Indexing Pipeline Stages — what data is available where
A field is only correct if it is computed at a stage where its inputs already
exist. In the Shopify bulk sync path, several per-variant inputs are
enrichment-deferred — they are NOT present when transform_product_result
runs, and only arrive in a later pass. Computing a field in the wrong stage fails
silently: the value is wrong, not absent, so nothing crashes and unit tests with
inline fixtures pass.
Read this before adding any computed/derived field to the indexing pipeline. See also Shopify Metafield Indexing for the metafield-specific flow.
Cautionary example: promoPriceMin / promoPriceMax (PR #3540 → #3588)
PR #3540 added config-driven promoPriceMin/promoPriceMax, computed from each
variant's regular price and its promo.price metafield, inside
ProductTransformer.transform_product_result. The per-doc logic
(compute_promo_price_range) was correct in isolation. But for the bulk path
the promo.price variant metafield is enrichment-deferred: at transform time the
variants carry no metafields, so compute_promo_price_range saw no promo inputs and
fell back to priceMin/priceMax, and nothing recomputed the range once enrichment
attached the metafield. The live index showed promo == regular price for every promo product.
The unit tests passed because their fixtures put the promo metafield inline on
the variant — a shape that only ever exists in the webhook paths, never in bulk.
The fix (PR #3588) recomputes the range in the enrichment merge step and carries a
per-variant price basis from transform so the inputs can be re-paired. The lasting
guardrail is this doc + a realistic enrichment-deferred test fixture
(build_enriched_bulk_product).
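The silent fallback described above can be reproduced in a few lines. This is a minimal sketch, not the real compute_promo_price_range: the function body, the variant dict layout, and the metafields mapping keyed by "promo.price" are all assumptions made for illustration.

```python
PROMO_KEY = "promo.price"  # metafield named in this doc; the dict layout is assumed


def compute_promo_price_range(variants):
    """Hypothetical stand-in: use the promo price where present, else the regular price."""
    effective = []
    for variant in variants:
        promo = (variant.get("metafields") or {}).get(PROMO_KEY)
        effective.append(float(promo) if promo is not None else float(variant["price"]))
    return min(effective), max(effective)


# Bulk shape at stage 2: variant scalars only, metafields not attached yet.
bulk_variants = [
    {"id": "v1", "price": "20.00"},
    {"id": "v2", "price": "30.00"},
]

# Webhook-style shape: the promo metafield is inline on each variant.
inline_variants = [
    {"id": "v1", "price": "20.00", "metafields": {PROMO_KEY: "15.00"}},
    {"id": "v2", "price": "30.00", "metafields": {PROMO_KEY: "25.00"}},
]

bulk_range = compute_promo_price_range(bulk_variants)      # equals the regular price range
inline_range = compute_promo_price_range(inline_variants)  # reflects the promo prices
```

Nothing raises in the bulk case: the result is a plausible-looking range that happens to equal the regular prices, which is why inline fixtures could not catch the bug.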
Bulk path stages (in order)
| Stage | Code location | Produces / merges |
|---|---|---|
| 1. Raw bulk export | graphql/mutations/bulk_operations_mutations.py BULK_EXPORT_ALL_PRODUCTS (5-connection limit) | Product + variant scalars (id, price, sku, options, inventory_quantity), media, collections, product-level metafields. NO variant metafields, NO per-location inventory, NO market/country data. |
| 2. Transform | transformers/product_transformer.py transform_product_result (called via ProductAccumulator; bulk call site services/sync_service.py → bulk_sync_handler.process_bulk_file_streaming) | Aggregates variants → MarqoResultDocuments: priceMin/priceMax, stockTotal, onSale, option arrays, product metafields, title sort. Sees ONLY stage-1 data. |
| 3. Enrichment merge | handlers/bulk_sync_handler.py _fetch_enrichment_for_batch (fetch, GET_VARIANT_ENRICHMENT_DATA) → _apply_enrichment_to_docs (merge, ~line 735) | Attaches variant metafields (variantMetafield*), per-location inventory (inventory<Location>), and market/country (marketCountries) onto the already-transformed docs via setattr. |
| 4. Field-copy | ecom_indexer/document_operations.py _copy_fields (line 621; called last, lines 853 / 1200) | Copies fields to new names per field_copy_map (e.g. a variant metafield → promoPrice). Runs AFTER enrichment, so a copy of an enrichment-deferred field looks correct even when a transform-stage computation of the same data did not. |
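
To illustrate why a stage-4 copy of an enrichment-deferred field looks correct, here is a rough sketch of a copy step driven by a source-to-target map. The function copy_fields is a hypothetical stand-in for _copy_fields, and the source field name variantMetafieldPromoPrice is an invented example of the variantMetafield* pattern, not a name taken from the codebase.

```python
def copy_fields(doc, field_copy_map):
    """Hypothetical stand-in for _copy_fields: copy source to target when the source is set."""
    out = dict(doc)
    for source, target in field_copy_map.items():
        if out.get(source) is not None:
            out[target] = out[source]
    return out


# Illustrative mapping only; the real field_copy_map comes from configuration.
field_copy_map = {"variantMetafieldPromoPrice": "promoPrice"}

# Doc as it looks right after transform (stage 2): no variant metafields yet.
after_transform = {"priceMin": 20.0, "priceMax": 30.0}

# Same doc after the enrichment merge (stage 3) attached the variant metafield.
after_enrichment = {**after_transform, "variantMetafieldPromoPrice": [15.0, 25.0]}

copied_early = copy_fields(after_transform, field_copy_map)   # nothing to copy
copied_late = copy_fields(after_enrichment, field_copy_map)   # promoPrice populated
```

Because the real copy runs last, it only ever sees the second shape, so it cannot reveal that a stage-2 computation over the same metafield saw nothing.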
What data is available at each stage
| Data | Stage 1 raw export | Stage 2 transform | Stage 3 enrichment | Stage 4 field-copy |
|---|---|---|---|---|
| Product scalars / tags / status | ✅ | ✅ | ✅ | ✅ |
| Product-level metafields | ✅ | ✅ | ✅ | ✅ |
| Per-variant price / sku / options | ✅ | ✅ | (on doc as aggregate priceMin/Max) | ✅ |
| Aggregate priceMin/priceMax/stockTotal | — | ✅ | ✅ | ✅ |
| Variant metafields (variantMetafield*) | ❌ | ❌ | ✅ | ✅ |
| Per-location inventory (inventory<Loc>) | ❌ | ❌ | ✅ | ✅ |
| Market / country (marketCountries) | ❌ | ❌ | ✅ | ✅ |
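
The availability table can also be encoded as a small lookup that answers "what is the earliest stage at which this field may be computed?". This is a sketch only: the Stage enum, the input category names, and earliest_safe_stage are hypothetical and do not exist in the pipeline.

```python
from enum import IntEnum


class Stage(IntEnum):
    RAW_EXPORT = 1
    TRANSFORM = 2
    ENRICHMENT = 3
    FIELD_COPY = 4


# First bulk-path stage at which each input category exists (category names are invented).
FIRST_AVAILABLE = {
    "product_scalars": Stage.RAW_EXPORT,
    "product_metafields": Stage.RAW_EXPORT,
    "variant_price_sku_options": Stage.RAW_EXPORT,
    "aggregate_price_stock": Stage.TRANSFORM,
    "variant_metafields": Stage.ENRICHMENT,
    "per_location_inventory": Stage.ENRICHMENT,
    "market_country": Stage.ENRICHMENT,
}


def earliest_safe_stage(inputs):
    """Return the earliest stage at which all of the given inputs are present."""
    unknown = set(inputs) - FIRST_AVAILABLE.keys()
    if unknown:
        raise KeyError(f"unknown input categories: {sorted(unknown)}")
    latest_input = max((FIRST_AVAILABLE[name] for name in inputs), default=Stage.TRANSFORM)
    # Stage 1 is the raw export itself, so no computation can run before transform.
    return max(latest_input, Stage.TRANSFORM)


promo_range_stage = earliest_safe_stage(["variant_price_sku_options", "variant_metafields"])
title_sort_stage = earliest_safe_stage(["product_scalars"])
```

A field like the promo price range mixes a stage-1 input with a stage-3 input, so the lookup lands on the enrichment stage.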
⚠️ Load-bearing rule: any field whose inputs include variant metafields, per-location inventory, or market/country data must be computed in or after the enrichment merge (stage 3), NOT in transform_product_result (stage 2). In the bulk path those inputs do not exist at stage 2 — a transform-stage computation silently sees nothing and falls back. The transform stage only has stage-1 data: product scalars/tags/status, product-level metafields, and per-variant price/sku/options.
If a field needs per-variant inputs from BOTH stages (e.g. the regular price from stage 1 and a promo metafield from stage 3), carry the stage-1 inputs forward on the doc as an internal field that survives the S3 round-trip, then complete the computation at stage 3. See _mq_promo_price_basis in models/marqo_documents.py + _recompute_promo_price_range in handlers/bulk_sync_handler.py.
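The carry-forward pattern can be sketched as follows. The field name _mq_promo_price_basis comes from this doc, but its shape here (variant id mapped to regular price), both function bodies, and the promo_by_variant argument are assumptions; JSON serialization stands in for the S3 round-trip.

```python
import json


def transform(variants):
    """Stage 2 stand-in: aggregate prices and carry a per-variant price basis."""
    prices = {v["id"]: float(v["price"]) for v in variants}
    return {
        "priceMin": min(prices.values()),
        "priceMax": max(prices.values()),
        # Internal field carried forward so stage 3 can re-pair inputs per variant.
        "_mq_promo_price_basis": prices,
    }


def recompute_promo_price_range(doc, promo_by_variant):
    """Stage 3 stand-in: pair each carried regular price with its promo price, if any."""
    basis = doc.get("_mq_promo_price_basis") or {}
    if not basis:
        return doc  # nothing was carried forward, so leave the doc untouched
    effective = [promo_by_variant.get(vid, regular) for vid, regular in basis.items()]
    doc["promoPriceMin"] = min(effective)
    doc["promoPriceMax"] = max(effective)
    return doc


variants = [{"id": "v1", "price": "20.00"}, {"id": "v2", "price": "30.00"}]

# Simulate the round-trip between stages: the basis must survive serialization.
stored_doc = json.loads(json.dumps(transform(variants)))

# Enrichment arrives later; only v1 has a promo price in this example.
final_doc = recompute_promo_price_range(stored_doc, {"v1": 15.0})
```

Keying the basis by variant id is what lets stage 3 fall back per variant: v2 has no promo metafield, so its regular price still participates in the range.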
Why the webhook / EventBridge paths differ
The single-product webhook and EventBridge batch paths do NOT use the bulk export.
They fetch via ENRICH_PRODUCT_DATA / get_multiple_products_enrichment, and
product_webhook_handler._build_full_product_from_graphql attaches variant
metafields inline on each variant dict before transform_product_result runs
(see handlers/product_webhook_handler.py, the v["metafields"] = ... line). So
in those paths the metafields ARE present at stage 2 and a transform-stage
computation is correct. This asymmetry is exactly the trap: a feature validated
only on the webhook path (or with inline-metafield fixtures) looks done but is
broken in bulk. Always validate computed per-variant fields against the
enrichment-deferred shape.
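One way to exercise the enrichment-deferred shape from an existing inline fixture is to split it into a bulk-shaped product plus a separate enrichment payload. The helper below is hypothetical and is not build_enriched_bulk_product; the "variants" and "metafields" keys are assumed fixture conventions.

```python
import copy


def to_enrichment_deferred(product):
    """Split an inline (webhook-shaped) fixture into a bulk-shaped product and its enrichment."""
    bulk_product = copy.deepcopy(product)  # never mutate the caller's fixture
    enrichment = {}
    for variant in bulk_product.get("variants", []):
        metafields = variant.pop("metafields", None)
        if metafields:
            enrichment[variant["id"]] = metafields
    return bulk_product, enrichment


inline_product = {
    "id": "p1",
    "variants": [
        {"id": "v1", "price": "20.00", "metafields": {"promo.price": "15.00"}},
        {"id": "v2", "price": "30.00"},
    ],
}

bulk_product, enrichment = to_enrichment_deferred(inline_product)
```

Feeding bulk_product to the transform step and enrichment to the merge step reproduces the ordering the bulk path actually uses, so a transform-stage computation over variant metafields fails in the test the same way it fails in production.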
Checklist for adding a computed field
- List the field's inputs. Are any of them variant metafields, per-location inventory, or market/country data?
- If no → computing in transform_product_result (stage 2) is fine.
- If yes → compute in/after _apply_enrichment_to_docs (stage 3). If you also need stage-1 per-variant inputs, carry them on the doc as an internal (_mq_*) field and re-pair them at stage 3.
- Write a test using build_enriched_bulk_product (enrichment-deferred shape), not an inline-metafield fixture. It must fail before your fix.