
Flow: Add/Update/Delete Documents (Product Indexing)

How products are written to a Marqo ecommerce search index via direct API calls. This is the primary data ingestion path for the ecommerce platform.

Request Path

Step-by-Step

1. Trigger: Client -> Ecom API Call

Where: cloud_control_plane/components/search_proxy/

2. Trigger: Ecom API -> Ecom Lambda Call

Where: cloud_control_plane/components/shopify/admin_server/

Inspect: Check Ecom Lambda logs — see Lambda.

aws logs tail /aws/lambda/{env}-ShopifyAppAdminFunction --since 15m

3. Product Transformation & Chunking

Where: components/shopify/admin_server/admin_server/handlers/bulk_sync_handler.py

  1. Download bulk JSONL from Shopify (streamed, not loaded into memory)
  2. ProductAccumulator reconstructs complete products from flattened JSONL (variants, metafields, images linked via __parentId)
  3. ProductTransformer converts to Marqo document format with tensor fields
  4. Documents chunked (configurable chunk size) and written to S3

S3 location: {env}-ecom-product-data-bucket / shopify/{shop_id}/bulk/{job_id}/chunk_{NNNN}.jsonl

Inspect: Check S3 for chunk files — see S3.
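The accumulate-then-chunk steps above can be sketched as follows. This is a minimal illustration, not the actual ProductAccumulator: the function names are hypothetical, and it assumes every child row (variant, metafield, image) appears after its product row and carries a "__parentId" field, while product rows do not. The four-digit zero padding of the chunk number is inferred from the chunk_0000.jsonl example.

```python
import json
from typing import Iterable, Iterator, List


def accumulate_products(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one complete product at a time from flattened bulk JSONL.

    Assumption: a row without "__parentId" starts a new product, and all
    rows with "__parentId" that follow it belong to that product.
    """
    current = None
    for line in lines:
        if not line.strip():
            continue
        row = json.loads(line)
        if "__parentId" not in row:
            if current is not None:
                yield current  # previous product is complete
            current = {**row, "children": []}
        elif current is not None:
            current["children"].append(row)
    if current is not None:
        yield current


def chunked(items: Iterable, size: int) -> Iterator[List]:
    """Group items into lists of at most `size` without materialising all of them."""
    chunk = []
    for item in items:
        chunk.append(item)
        if len(chunk) == size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk


def chunk_key(shop_id: str, job_id: str, chunk_number: int) -> str:
    """Build the S3 key for a chunk (zero padding is an assumption)."""
    return f"shopify/{shop_id}/bulk/{job_id}/chunk_{chunk_number:04d}.jsonl"
```

Because both helpers are generators, only one product and one chunk need to be held in memory at a time, which matches the streamed download described in the first step.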

4. SQS Message Per Chunk

Where: components/shopify/admin_server/admin_server/services/aws_operations.py

Each chunk gets an SQS message with:

{
  "job_id": "uuid",
  "shop_id": "example.myshopify.com",
  "index_name": "shopify-example-store",
  "operation": "ADD_DOCUMENTS",
  "s3_bucket": "{env}-ecom-product-data-bucket",
  "s3_key": "shopify/.../chunk_0000.jsonl"
}

Queue: {env}-ecom-indexer-{shop_id}.fifo — per-shop FIFO queues (created dynamically). Message group ID ensures ordering per shop.
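As a rough sketch of how one such message might be assembled, the function below builds the keyword arguments that boto3's SQS `send_message` accepts for a FIFO queue. The function name is hypothetical, and two choices are assumptions rather than documented behaviour: using the shop ID as the message group ID, and deriving the deduplication ID from the job ID and chunk number.

```python
import json


def build_chunk_message(env: str, shop_id: str, index_name: str,
                        job_id: str, chunk_number: int) -> dict:
    """Return kwargs for sqs_client.send_message(QueueUrl=..., **kwargs)."""
    body = {
        "job_id": job_id,
        "shop_id": shop_id,
        "index_name": index_name,
        "operation": "ADD_DOCUMENTS",
        "s3_bucket": f"{env}-ecom-product-data-bucket",
        "s3_key": f"shopify/{shop_id}/bulk/{job_id}/chunk_{chunk_number:04d}.jsonl",
    }
    return {
        "MessageBody": json.dumps(body),
        # Assumption: one message group per shop, so chunks for a shop stay ordered.
        "MessageGroupId": shop_id,
        # Assumption: dedupe on job + chunk so a retried send is not enqueued twice.
        "MessageDeduplicationId": f"{job_id}-{chunk_number:04d}",
    }
```

The queue URL is left to the caller because the document does not say how the per-shop queue is resolved.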

Inspect: Check SQS queue depth — see SQS.

5. Job Creation in DynamoDB

Where: components/shopify/admin_server/admin_server/services/sync_service.py

A job record is created in {env}-EcomIndexerJobsTable:

  • pk: PLATFORM#shopify#SHOP#{shop_id}, sk: JOB#{created_at}#{job_id}
  • Status: PENDING
  • Includes total_items count for completion tracking
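The key layout above could be produced by something like the following sketch. The function name and the "status" attribute name are assumptions; pk, sk, job_id and total_items come from this page.

```python
from datetime import datetime, timezone
from typing import Optional


def build_job_item(shop_id: str, job_id: str, total_items: int,
                   created_at: Optional[str] = None) -> dict:
    """Build a PENDING job record keyed by platform/shop and creation time."""
    # ISO 8601 UTC timestamp, so sort keys order jobs chronologically per shop.
    created_at = created_at or datetime.now(timezone.utc).isoformat()
    return {
        "pk": f"PLATFORM#shopify#SHOP#{shop_id}",
        "sk": f"JOB#{created_at}#{job_id}",
        "job_id": job_id,
        "status": "PENDING",  # attribute name is an assumption
        "total_items": total_items,
    }
```

Putting created_at ahead of job_id in the sort key means a query on the partition key returns a shop's jobs in creation order.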

Inspect: Query jobs table — see DynamoDB and Ecommerce for schema.

aws dynamodb query --table-name {env}-EcomIndexerJobsTable \
--index-name GSI_JobLookup_v2 \
--key-condition-expression "job_id = :jid" \
--expression-attribute-values '{":jid": {"S": "the-job-id"}}'

6. Indexer Lambda Processes Chunk

Where: components/ecom_indexer/ecom_indexer/lambda_function.py

  1. Parse SQS records from event
  2. Prefetch credentials: API key + AddDocsConfig (tensor fields, mappings) from EcomIndexSettingsTable
  3. Time budget management: if less than 120 s of Lambda execution time remains, the unprocessed messages are deferred back to SQS
  4. Mark job IN_PROGRESS in DynamoDB
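The time budget check in the list above might look roughly like this. `get_remaining_time_in_millis()` is the standard method on the AWS Lambda Python context object; the function name is hypothetical, and how deferred records are actually returned to SQS is not described here, so the sketch only returns them.

```python
from typing import Callable, List, Sequence

TIME_BUDGET_MS = 120_000  # 120 s threshold from the step above


def process_within_budget(records: Sequence, context,
                          process: Callable) -> List:
    """Process records in order; return those deferred for lack of time."""
    for i, record in enumerate(records):
        if context.get_remaining_time_in_millis() < TIME_BUDGET_MS:
            # Not enough time left: defer this record and everything after it.
            return list(records[i:])
        process(record)
    return []
```

Checking the budget before each record, rather than once per invocation, keeps a slow chunk from pushing later chunks past the Lambda timeout.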

Inspect: Check indexer Lambda logs — see Lambda.

aws logs tail /aws/lambda/{env}-EcomIndexerFunction --since 15m

7. Download & Parse Documents from S3

Where: components/ecom_indexer/ecom_indexer/document_operations.py

Downloads JSONL from S3, parses each line. Invalid JSON lines are tracked in skipped_items_details.
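A minimal sketch of line-by-line parsing with skip tracking is shown below. The function name and the shape of each skipped entry are assumptions; only the skipped_items_details name comes from this page.

```python
import json
from typing import List, Tuple


def parse_jsonl(text: str) -> Tuple[List[dict], List[dict]]:
    """Return (documents, skipped_items_details) for a JSONL payload."""
    documents, skipped_items_details = [], []
    for line_number, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue  # ignore blank lines
        try:
            documents.append(json.loads(line))
        except json.JSONDecodeError as exc:
            # Record the bad line instead of failing the whole chunk.
            skipped_items_details.append({"line": line_number, "reason": str(exc)})
    return documents, skipped_items_details
```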

8. Write Protection (Optimistic Locking)

Where: components/ecom_indexer/ecom_indexer/document_operations.py

Before writing, fetches existing documents from Marqo via get_documents API:

  • Compares updated_at timestamps (ISO 8601 string comparison)
  • Falls back to _mq_version integer comparison
  • Skips write if incoming version <= stored version

Conflicts tracked in conflict_details on the job record.
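The comparison rules above can be sketched as a single predicate. The function name is hypothetical, and two behaviours are assumptions: a document with no stored counterpart is always written, and a pair with nothing comparable is also written.

```python
from typing import Optional


def should_write(incoming: dict, stored: Optional[dict]) -> bool:
    """Return False when the incoming version is not newer than the stored one."""
    if stored is None:
        return True  # assumption: nothing stored yet, so there is no conflict

    new_ts, old_ts = incoming.get("updated_at"), stored.get("updated_at")
    if new_ts is not None and old_ts is not None:
        # Plain string comparison, as described above.
        return new_ts > old_ts

    new_v, old_v = incoming.get("_mq_version"), stored.get("_mq_version")
    if new_v is not None and old_v is not None:
        return int(new_v) > int(old_v)

    return True  # assumption: no comparable version information
```

Note that comparing ISO 8601 timestamps as strings only orders them correctly when both use the same format and UTC offset.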

9. Call Marqo add_documents API

Where: components/ecom_indexer/ecom_indexer/document_operations.py

POST {index_endpoint}/indexes/{index_name}/documents
{
  "documents": [...],
  "tensorFields": [...],
  "useExistingTensors": true/false,
  "mappings": {...}
}

Dual endpoint strategy:

  • Primary: index-specific endpoint
  • Fallback: data plane cell gateway (on transport errors)
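One way to picture the dual endpoint strategy is the sketch below. The function names and the injected `post` callable are hypothetical, and treating ConnectionError and TimeoutError as the "transport errors" is an assumption; HTTP-level errors are deliberately not retried against the fallback here.

```python
from typing import Callable


def build_payload(documents: list, tensor_fields: list,
                  mappings: dict, use_existing_tensors: bool) -> dict:
    """Assemble the add_documents request body shown above."""
    return {
        "documents": documents,
        "tensorFields": tensor_fields,
        "useExistingTensors": use_existing_tensors,
        "mappings": mappings,
    }


def add_documents_with_fallback(post: Callable, primary_url: str,
                                fallback_url: str, payload: dict):
    """Try the index-specific endpoint, then the cell gateway on transport errors."""
    try:
        return post(primary_url, payload)
    except (ConnectionError, TimeoutError):
        # Assumption: only transport-level failures trigger the fallback.
        return post(fallback_url, payload)
```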

Retry on 409 conflict: exponential backoff, up to 9 attempts, with each sleep capped at 40 s.
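A retry loop with these limits could look like the following sketch. The 1 s base delay and the doubling factor are assumptions, as is counting the initial request as one of the 9 attempts; the function names and the injected `send` callable are hypothetical.

```python
import time
from typing import Callable, List

MAX_ATTEMPTS = 9
MAX_SLEEP_S = 40.0


def backoff_delays(base_s: float = 1.0) -> List[float]:
    """Delays between attempts: doubling from base_s, capped at MAX_SLEEP_S."""
    return [min(base_s * 2 ** i, MAX_SLEEP_S) for i in range(MAX_ATTEMPTS - 1)]


def call_with_409_retry(send: Callable[[], int],
                        sleep: Callable[[float], None] = time.sleep) -> int:
    """Call send() until it returns a non-409 status or attempts run out."""
    delays = backoff_delays()
    for attempt in range(MAX_ATTEMPTS):
        status = send()
        if status != 409 or attempt == MAX_ATTEMPTS - 1:
            return status
        sleep(delays[attempt])
    return status  # unreachable; keeps the return type explicit
```

Under these assumptions the sleeps are 1, 2, 4, 8, 16, 32, 40 and 40 seconds, so a request that conflicts on every attempt waits 143 seconds in total before giving up.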

Inspect: Check data plane cell gateway — see API Gateway and Ecommerce.

10. Readback Integrity Check (Optional)

Where: components/ecom_indexer/ecom_indexer/readback_consistency.py

If the ecom_indexer_readback feature flag is enabled:

  • Queries Marqo for written documents to verify consistency
  • Retries up to 2 times, sleeping 15 s between attempts
  • Cleans up partial documents (incomplete tensor fields)

CloudWatch metrics:

  • PartialDocumentsDetected — triggers alarm
  • PartialDocumentsRecoveredAfterRetry
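The retry behaviour and the two metrics might fit together as in this sketch. Everything about the control flow is an assumption: the function name, the injected `find_partial_ids` callable, and the points at which each metric is recorded. The metric names are returned rather than published because the CloudWatch namespace is not given here.

```python
import time
from typing import Callable, List, Tuple


def readback_check(find_partial_ids: Callable[[], List[str]],
                   retries: int = 2, sleep_s: float = 15.0,
                   sleep: Callable[[float], None] = time.sleep
                   ) -> Tuple[List[str], List[str]]:
    """Return (ids still partial after retries, metric names to emit)."""
    metrics: List[str] = []
    partial = find_partial_ids()
    if not partial:
        return [], metrics

    metrics.append("PartialDocumentsDetected")
    for _ in range(retries):
        sleep(sleep_s)
        partial = find_partial_ids()
        if not partial:
            metrics.append("PartialDocumentsRecoveredAfterRetry")
            return [], metrics

    # Still partial after all retries: the caller would clean these up.
    return partial, metrics
```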

Inspect: Check CloudWatch alarms — see CloudWatch.

11. Job Completion

Where: components/ecom_indexer/ecom_indexer/job_management.py

After each chunk, atomically updates job counters using DynamoDB conditional writes:

  • completed_chunks set ensures idempotence (Lambda retries won't double-count)
  • When processed + failed + skipped + conflict == total_items, the job transitions to one of:
    • COMPLETED (all succeeded)
    • CONFLICT (some version conflicts)
    • FAILED (some hard failures)
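The counting and status rules above can be modelled in memory as follows. This is not the DynamoDB implementation: the real code uses conditional writes, whereas this sketch uses a plain dict and set to show the logic. The function names are hypothetical, and giving FAILED precedence over CONFLICT when both occur is an assumption.

```python
from typing import Dict, Optional


def apply_chunk(job: dict, chunk_id: str, counts: Dict[str, int]) -> bool:
    """Add a chunk's counters once; return False if it was already counted."""
    if chunk_id in job["completed_chunks"]:
        return False  # Lambda retry of the same chunk: do not double-count
    job["completed_chunks"].add(chunk_id)
    for name, value in counts.items():
        job[name] += value
    return True


def resolve_status(job: dict) -> Optional[str]:
    """Return the terminal status, or None while items are still outstanding."""
    done = job["processed"] + job["failed"] + job["skipped"] + job["conflict"]
    if done != job["total_items"]:
        return None
    if job["failed"] > 0:
        return "FAILED"  # assumption: hard failures outrank conflicts
    if job["conflict"] > 0:
        return "CONFLICT"
    return "COMPLETED"
```

In DynamoDB, the membership test and the counter update would need to happen in one conditional write so that two concurrent invocations cannot both pass the check.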

12. Error Handling & DLQ

Transient errors (SQS retry): 429 rate limits, image download timeouts, 404 document not found.

Terminal errors (self-DLQ): Enriched with stack trace and job context before sending to DLQ. Job marked FAILED.
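The split between the two error classes could be expressed as in the sketch below. The function names and the "stack_trace" field are hypothetical, and treating every error outside the listed transient cases as terminal is an assumption.

```python
import traceback
from typing import Optional

# Transient cases listed above: 429 rate limits and 404 document not found.
TRANSIENT_STATUS_CODES = {429, 404}


def classify_error(status_code: Optional[int] = None,
                   is_image_timeout: bool = False) -> str:
    """Return "transient" (leave for SQS retry) or "terminal" (send to DLQ)."""
    if is_image_timeout or status_code in TRANSIENT_STATUS_CODES:
        return "transient"
    return "terminal"  # assumption: anything else is terminal


def enrich_for_dlq(message: dict, exc: BaseException, job_id: str) -> dict:
    """Attach stack trace and job context before sending a message to the DLQ."""
    return {
        **message,
        "job_id": job_id,
        "error_message": str(exc),
        "stack_trace": "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        ),
    }
```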

Inspect: Check DLQ — see SQS.

What to Look For

Symptom                     Where to Check
HTTP 4XX errors             Ecom API Lambda logs.
Job stuck in PENDING        SQS queue: messages not being consumed? Indexer Lambda errors?
Job stuck in IN_PROGRESS    Indexer Lambda timeouts. Check remaining chunks via completed_chunks count.
Partial documents           PartialDocumentsDetected CloudWatch alarm. Readback check logs.
Version conflicts           conflict_details on job record. Another sync running concurrently?
Slow indexing               Marqo latency (data plane cell). Image download timeouts. Queue depth.
DLQ messages                Read DLQ messages for enriched error context. Check error_message on job.
Missing products            Check S3 chunks: all products accounted for? Check skipped_items_details.

  • Ecommerce — all DDB table schemas, Lambda details
  • Lambda — how to check Lambda logs and errors
  • SQS — queue depth, DLQ inspection
  • S3 — product data bucket
  • DynamoDB — query job records
  • CloudWatch — partial document alarms
  • Settings Sync — how index settings reach the indexer