Flow: Add/Update/Delete Documents (Product Indexing)
How products are written to a Marqo ecommerce search index via direct API calls. This is the primary data ingestion path for the ecommerce platform.
Request Path
graph TD
A["Client (e.g. customer service)"]
B["Ecom API Cloudflare Worker ({env}-ecom-api)"]
subgraph lambda["Ecom API Lambda ({env}-ShopifyAppAdminFunction)"]
C["Product transform → S3 (JSONL chunks of max 1000 docs)"]
D["SQS message per chunk"]
end
Q["Queue ({env}-{shop_id}-doc-queue)"]
subgraph indexer["Ecom Indexer Lambda ({env}-EcomIndexerFunction)"]
E["Download chunks from S3"]
F["Validation, write protection check, internal field enrichment"]
G["Marqo add/update/delete_documents API (via data plane cell gateway)"]
H["Job status update in DynamoDB ({env}-EcomIndexerJobsTable)"]
end
A --> B
B --> C
C --> D
D --> Q
Q --> E
E --> F
F --> G
G --> H
Step-by-Step
1. Trigger: Client -> Ecom API Call
Where: cloud_control_plane/components/search_proxy/
2. Trigger: Ecom API -> Ecom Lambda Call
Where: cloud_control_plane/components/shopify/admin_server/
Inspect: Check Ecom Lambda logs — see Lambda.
aws logs tail /aws/lambda/{env}-ShopifyAppAdminFunction --since 15m
3. Product Transformation & Chunking
Where: components/shopify/admin_server/admin_server/handlers/bulk_sync_handler.py
- Download bulk JSONL from Shopify (streamed, not loaded into memory)
- ProductAccumulator reconstructs complete products from flattened JSONL (variants, metafields, and images linked via __parentId)
- ProductTransformer converts them to Marqo document format with tensor fields
- Documents are chunked (configurable chunk size) and written to S3
S3 location: {env}-ecom-product-data-bucket / shopify/{shop_id}/bulk/{job_id}/chunk_{NNNN}.jsonl
Inspect: Check S3 for chunk files — see S3.
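The reconstruction and chunking in step 3 can be sketched as a pair of generators. This is a minimal illustration, not the ProductAccumulator or ProductTransformer code. The function names, the flat "children" list, and the rule that a row without __parentId starts a new product are assumptions, and the transform to Marqo document format is omitted. The chunk size of 1000 matches the maximum shown in the request path diagram.

```python
import json
from typing import Iterable, Iterator


def accumulate_products(lines: Iterable[str]) -> Iterator[dict]:
    """Rebuild nested products from a flattened bulk JSONL stream, one at a time.

    Assumes each child row carries "__parentId" and appears after its parent,
    and that a row without "__parentId" starts a new product.
    """
    current = None        # product currently being assembled
    current_ids = set()   # ids of that product and all of its descendants
    for line in lines:
        row = json.loads(line)
        parent_id = row.pop("__parentId", None)
        if parent_id is None:
            if current is not None:
                yield current  # previous product is complete
            current = {**row, "children": []}
            current_ids = {row["id"]}
        elif parent_id in current_ids:
            current["children"].append(row)  # variant, metafield, image, ...
            if "id" in row:
                current_ids.add(row["id"])  # grandchildren may point at this row
        # Rows whose parent is unknown are dropped in this sketch.
    if current is not None:
        yield current


def chunk(docs: Iterable[dict], size: int = 1000) -> Iterator[list]:
    """Group documents into lists of at most `size` items."""
    batch = []
    for doc in docs:
        batch.append(doc)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


def chunk_key(shop_id: str, job_id: str, index: int) -> str:
    """S3 key in the documented layout, ending in chunk_{NNNN}.jsonl."""
    return f"shopify/{shop_id}/bulk/{job_id}/chunk_{index:04d}.jsonl"


def to_jsonl(docs: list) -> str:
    """Serialize one chunk as JSON Lines (one document per line)."""
    return "".join(json.dumps(doc) + "\n" for doc in docs)
```

Because both functions are generators, only the product being assembled and the current chunk are held in memory, which fits the streamed download described above.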
4. SQS Message Per Chunk
Where: components/shopify/admin_server/admin_server/services/aws_operations.py
Each chunk gets an SQS message with:
{
"job_id": "uuid",
"shop_id": "example.myshopify.com",
"index_name": "shopify-example-store",
"operation": "ADD_DOCUMENTS",
"s3_bucket": "{env}-ecom-product-data-bucket",
"s3_key": "shopify/.../chunk_0000.jsonl"
}
Queue: {env}-ecom-indexer-{shop_id}.fifo — per-shop FIFO queues (created dynamically). Message group ID ensures ordering per shop.
Inspect: Check SQS queue depth — see SQS.
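A sketch of how one chunk message could be built for a FIFO queue, returned as keyword arguments for boto3's SQS send_message (for example sqs.send_message(**msg)). The body mirrors the fields listed above, and using shop_id as MessageGroupId follows the ordering note. The function name and deriving MessageDeduplicationId from a SHA-256 hash of the S3 key are assumptions.

```python
import hashlib
import json


def build_chunk_message(queue_url, job_id, shop_id, index_name, s3_bucket, s3_key,
                        operation="ADD_DOCUMENTS"):
    """Return kwargs for boto3 SQS send_message() for one chunk (hypothetical helper)."""
    body = {
        "job_id": job_id,
        "shop_id": shop_id,
        "index_name": index_name,
        "operation": operation,
        "s3_bucket": s3_bucket,
        "s3_key": s3_key,
    }
    return {
        "QueueUrl": queue_url,
        "MessageBody": json.dumps(body),
        # FIFO queues require a group ID; messages sharing it are delivered in order.
        "MessageGroupId": shop_id,
        # Assumption: the S3 key is unique per chunk, so its hash can serve as the
        # deduplication ID (SQS limits this field to 128 characters).
        "MessageDeduplicationId": hashlib.sha256(s3_key.encode("utf-8")).hexdigest(),
    }
```

If the queue has ContentBasedDeduplication enabled, the deduplication ID can be omitted and SQS derives one from the message body instead.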
5. Job Creation in DynamoDB
Where: components/shopify/admin_server/admin_server/services/sync_service.py
A job record is created in {env}-EcomIndexerJobsTable:
- pk: PLATFORM#shopify#SHOP#{shop_id}, sk: JOB#{created_at}#{job_id}
- Status: PENDING
- Includes a total_items count for completion tracking
Inspect: Query jobs table — see DynamoDB and Ecommerce for schema.
aws dynamodb query --table-name {env}-EcomIndexerJobsTable \
--index-name GSI_JobLookup \
--key-condition-expression "job_id = :jid" \
--expression-attribute-values '{":jid": {"S": "the-job-id"}}'
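The job record keys and the GSI lookup above can be expressed in Python as follows. The pk/sk formats, the PENDING status, total_items, and GSI_JobLookup come from this page. The function names, the ISO 8601 created_at format, and storing job_id as a top-level attribute (implied by the GSI key condition) are assumptions.

```python
from datetime import datetime, timezone


def job_keys(shop_id, job_id, created_at):
    """Primary key of a job record, following the documented pk/sk layout."""
    return {
        "pk": f"PLATFORM#shopify#SHOP#{shop_id}",
        "sk": f"JOB#{created_at}#{job_id}",
    }


def new_job_item(shop_id, job_id, total_items, created_at=None):
    """Initial job item for boto3 Table.put_item(Item=...) (hypothetical helper).

    Assumption: created_at is an ISO 8601 UTC string, so sk values for one
    shop sort by creation time.
    """
    created_at = created_at or datetime.now(timezone.utc).isoformat()
    return {
        **job_keys(shop_id, job_id, created_at),
        "job_id": job_id,  # partition key of GSI_JobLookup
        "status": "PENDING",
        "total_items": total_items,
    }


def job_lookup_query(table_name, job_id):
    """boto3 client.query() kwargs equivalent to the CLI command above."""
    return {
        "TableName": table_name,
        "IndexName": "GSI_JobLookup",
        "KeyConditionExpression": "job_id = :jid",
        "ExpressionAttributeValues": {":jid": {"S": job_id}},
    }
```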
6. Indexer Lambda Processes Chunk
Where: components/ecom_indexer/ecom_indexer/lambda_function.py
- Parse SQS records from the event
- Prefetch credentials: API key + AddDocsConfig (tensor fields, mappings) from EcomIndexSettingsTable
- Time budget management: if less than 120s of Lambda time remains, defer the remaining messages back to SQS
- Mark the job IN_PROGRESS in DynamoDB
Inspect: Check indexer Lambda logs — see Lambda.
aws logs tail /aws/lambda/{env}-EcomIndexerFunction --since 15m
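A minimal sketch of the step 6 time budget check, assuming deferral is done through an SQS partial batch response. The 120 s threshold comes from this page. The function name, the process_record callback, and the partial batch mechanism (which requires ReportBatchItemFailures on the event source mapping) are assumptions; the real handler may defer messages differently.

```python
DEFER_THRESHOLD_MS = 120_000  # documented rule: defer when less than 120 s remain


def process_with_time_budget(event, context, process_record):
    """Process SQS records in order until the time budget runs low (sketch).

    Records not yet processed are reported as batch item failures so SQS
    makes them visible again later.
    """
    records = event["Records"]
    deferred = []
    for i, record in enumerate(records):
        # context.get_remaining_time_in_millis() is provided by the Lambda runtime.
        if context.get_remaining_time_in_millis() < DEFER_THRESHOLD_MS:
            deferred = records[i:]  # defer this record and everything after it
            break
        process_record(record)
    return {"batchItemFailures": [{"itemIdentifier": r["messageId"]} for r in deferred]}
```

Deferring every record from the first unprocessed one onward is consistent with the guidance for FIFO event sources: once one record in a batch is returned as failed, all later records in that batch should be returned too, so ordering within the message group is preserved.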
7. Download & Parse Documents from S3
Where: components/ecom_indexer/ecom_indexer/document_operations.py
Downloads JSONL from S3, parses each line. Invalid JSON lines are tracked in skipped_items_details.
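Step 7's parse-and-skip behavior could look like this. It assumes the chunk body has already been read (for example with boto3's s3.get_object(Bucket=..., Key=...)["Body"].read().decode("utf-8")). The function name, the shape of each skipped_items_details entry, and the handling of blank lines and non-object lines are assumptions.

```python
import json


def parse_jsonl(text):
    """Parse a JSONL chunk into (documents, skipped_items_details) (sketch)."""
    docs, skipped = [], []
    for line_no, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue  # assumption: blank lines are ignored, not reported
        try:
            doc = json.loads(line)
        except json.JSONDecodeError as exc:
            skipped.append({"line": line_no, "reason": f"invalid JSON: {exc.msg}"})
            continue
        if not isinstance(doc, dict):
            # Valid JSON but not a document object, e.g. a bare list.
            skipped.append({"line": line_no, "reason": "not a JSON object"})
            continue
        docs.append(doc)
    return docs, skipped
```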
8. Write Protection (Optimistic Locking)
Where: components/ecom_indexer/ecom_indexer/document_operations.py
Before writing, fetches existing documents from Marqo via get_documents API:
- Compares updated_at timestamps (ISO 8601 string comparison)
- Falls back to _mq_version integer comparison
- Skips the write if the incoming version <= the stored version
Conflicts tracked in conflict_details on the job record.
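The write protection rule in step 8 can be sketched as a pure comparison. It follows the documented order (updated_at first, then _mq_version, skip when incoming <= stored). The function names, the conflict entry shape, and allowing the write when neither side has a comparable version are assumptions. ISO 8601 string comparison is only chronological when both timestamps use the same format and UTC offset.

```python
def should_write(incoming, stored):
    """True if `incoming` is strictly newer than `stored` (None = not yet in Marqo)."""
    if stored is None:
        return True
    new_ts, old_ts = incoming.get("updated_at"), stored.get("updated_at")
    if new_ts is not None and old_ts is not None:
        # Lexicographic order equals time order only for a shared format and offset.
        return new_ts > old_ts
    new_v, old_v = incoming.get("_mq_version"), stored.get("_mq_version")
    if new_v is not None and old_v is not None:
        return int(new_v) > int(old_v)
    return True  # assumption: nothing comparable, so allow the write


def partition_by_version(incoming_docs, stored_by_id):
    """Split docs into (to_write, conflict_details) using the stored Marqo copies."""
    to_write, conflicts = [], []
    for doc in incoming_docs:
        stored = stored_by_id.get(doc["_id"])
        if should_write(doc, stored):
            to_write.append(doc)
        else:
            conflicts.append({"_id": doc["_id"], "reason": "incoming version <= stored version"})
    return to_write, conflicts
```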
9. Call Marqo add_documents API
Where: components/ecom_indexer/ecom_indexer/document_operations.py
POST {index_endpoint}/indexes/{index_name}/documents
{
"documents": [...],
"tensorFields": [...],
"useExistingTensors": true/false,
"mappings": {...}
}
Dual endpoint strategy:
- Primary: index-specific endpoint
- Fallback: data plane cell gateway (on transport errors)
Retry on 409 conflict: Exponential backoff, up to 9 attempts, max 40s sleep.
Inspect: Check data plane cell gateway — see API Gateway and Ecommerce.
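A sketch of the step 9 call pattern with the dual endpoint and 409 retry. The 9 attempts and 40 s cap come from this page. The 1 s base delay, the absence of jitter, the send helper, the TransportError type, and the assumption that the gateway accepts the same /indexes/{index_name}/documents path are all hypothetical.

```python
import time


class TransportError(Exception):
    """Hypothetical: raised by `send` when the connection itself fails."""


def add_documents(send, primary_url, gateway_url, index_name, body,
                  max_attempts=9, base_delay_s=1.0, max_delay_s=40.0, sleep=time.sleep):
    """POST a batch to /indexes/{index_name}/documents with fallback and 409 retry.

    `send(url, body) -> (status_code, payload)` is a hypothetical HTTP helper.
    """
    path = f"/indexes/{index_name}/documents"
    for attempt in range(max_attempts):
        try:
            status, payload = send(primary_url + path, body)
        except TransportError:
            # Transport failure on the index endpoint: repeat via the cell gateway.
            status, payload = send(gateway_url + path, body)
        if status != 409:
            return status, payload
        if attempt < max_attempts - 1:
            # Exponential backoff: 1, 2, 4, ... seconds, capped at max_delay_s.
            sleep(min(max_delay_s, base_delay_s * 2 ** attempt))
    raise RuntimeError(f"409 conflict persisted after {max_attempts} attempts")
```

With these assumed defaults the sleeps between attempts are 1, 2, 4, 8, 16, 32, 40 and 40 s (143 s in total).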
10. Readback Integrity Check (Optional)
Where: components/ecom_indexer/ecom_indexer/readback_consistency.py
If the ecom_indexer_readback feature flag is enabled:
- Queries Marqo for written documents to verify consistency
- Retries 2 times, sleeping 15s between attempts
- Cleans up partial documents (incomplete tensor fields)
CloudWatch metrics:
- PartialDocumentsDetected (triggers an alarm)
- PartialDocumentsRecoveredAfterRetry
Inspect: Check CloudWatch alarms — see CloudWatch.
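The readback loop in step 10 can be sketched as below. fetch_tensor_fields is a hypothetical wrapper around a Marqo read that reports which tensor fields each document actually has, and treating "missing an expected tensor field" as partial is an assumption about how partial documents are detected. The retry count (2) and 15 s sleep come from this page. The second return value is the kind of count a PartialDocumentsRecoveredAfterRetry metric could be built from.

```python
import time


def find_partial_documents(fetch_tensor_fields, doc_ids, expected_fields,
                           retries=2, delay_s=15.0, sleep=time.sleep):
    """Return (still_partial_ids, recovered_count) after an initial check plus retries.

    fetch_tensor_fields(ids) -> {doc_id: set_of_present_tensor_fields} is hypothetical.
    expected_fields is a set of tensor field names every document should have.
    """
    pending = list(doc_ids)
    first_partial = None
    for attempt in range(retries + 1):
        present = fetch_tensor_fields(pending)
        # A document is partial if any expected tensor field is missing.
        pending = [d for d in pending if not expected_fields <= present.get(d, set())]
        if first_partial is None:
            first_partial = len(pending)  # partial count on the first check
        if not pending or attempt == retries:
            break
        sleep(delay_s)
    # Whatever is still pending would be the cleanup candidates.
    return pending, first_partial - len(pending)
```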
11. Job Completion
Where: components/ecom_indexer/ecom_indexer/job_management.py
After each chunk, the job counters are updated atomically using DynamoDB conditional writes:
- The completed_chunks set ensures idempotence (Lambda retries won't double-count)
- When processed + failed + skipped + conflict == total_items, the job status transitions to one of:
  - COMPLETED (all succeeded)
  - CONFLICT (some version conflicts)
  - FAILED (some hard failures)
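The idempotent counter update in step 11 can be sketched as boto3 client.update_item keyword arguments plus a status function. ADD on a string set combined with a contains() condition is one standard DynamoDB way to make per-chunk increments safe against Lambda retries. The counter attribute names, chunk identifiers, and status precedence are assumptions.

```python
def chunk_completion_update(key, chunk_id, processed, failed, skipped, conflict):
    """Kwargs (minus TableName) for boto3 client.update_item() that add one
    chunk's counts exactly once. Counter attribute names are hypothetical."""
    return {
        "Key": key,
        "UpdateExpression": "ADD #p :p, #f :f, #s :s, #c :c, #cc :chunk_set",
        # A retried chunk fails this check (ConditionalCheckFailedException)
        # instead of being counted twice.
        "ConditionExpression": "attribute_not_exists(#cc) OR NOT contains(#cc, :chunk_id)",
        "ExpressionAttributeNames": {
            "#p": "processed", "#f": "failed", "#s": "skipped",
            "#c": "conflict", "#cc": "completed_chunks",
        },
        "ExpressionAttributeValues": {
            ":p": {"N": str(processed)},
            ":f": {"N": str(failed)},
            ":s": {"N": str(skipped)},
            ":c": {"N": str(conflict)},
            ":chunk_set": {"SS": [chunk_id]},
            ":chunk_id": {"S": chunk_id},
        },
        "ReturnValues": "ALL_NEW",  # updated counters, used to decide completion
    }


def final_status(processed, failed, skipped, conflict, total_items):
    """Terminal status once every item is accounted for, else None.

    Assumption: FAILED outranks CONFLICT, and skipped items alone do not
    prevent COMPLETED.
    """
    if processed + failed + skipped + conflict != total_items:
        return None
    if failed:
        return "FAILED"
    if conflict:
        return "CONFLICT"
    return "COMPLETED"
```

After a successful update, the ALL_NEW counters can be passed to final_status; a separate conditional write guarded on the status still being IN_PROGRESS would then set the terminal status exactly once.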
12. Error Handling & DLQ
Transient errors (SQS retry): 429 rate limits, image download timeouts, 404 document not found.
Terminal errors (self-DLQ): Enriched with stack trace and job context before sending to DLQ. Job marked FAILED.
Inspect: Check DLQ — see SQS.
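Step 12's split between transient and terminal errors might be sketched like this. The transient cases (429, image download timeouts, 404) come from this page. The exception classes, payload fields, and callback names are hypothetical.

```python
import json
import traceback


class ImageDownloadTimeout(Exception):
    """Hypothetical: a product image fetch timed out."""


class MarqoHTTPError(Exception):
    """Hypothetical: a Marqo call returned an error status."""

    def __init__(self, status, message=""):
        super().__init__(message or f"HTTP {status}")
        self.status = status


TRANSIENT_STATUSES = {429, 404}  # rate limit, document not found (per this page)


def is_transient(exc):
    if isinstance(exc, ImageDownloadTimeout):
        return True
    return isinstance(exc, MarqoHTTPError) and exc.status in TRANSIENT_STATUSES


def dlq_payload(original_body, exc, job_id):
    """Enrich the failed message with job context and the stack trace."""
    return json.dumps({
        "original_message": original_body,
        "job_id": job_id,
        "error_type": type(exc).__name__,
        "error_message": str(exc),
        "stack_trace": "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)),
    })


def handle_failure(exc, original_body, job_id, send_to_dlq, mark_failed):
    if is_transient(exc):
        raise exc  # leave the message on the queue so SQS redelivers it
    send_to_dlq(dlq_payload(original_body, exc, job_id))
    mark_failed(job_id, str(exc))  # e.g. status FAILED plus error_message
```

In this sketch, re-raising a transient error lets SQS redeliver the message after the visibility timeout, while a terminal error returns normally only after the enriched copy has been sent to the DLQ.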
What to Look For
| Symptom | Where to Check |
|---|---|
| HTTP 4XX errors | Ecom API Lambda logs. |
| Job stuck in PENDING | SQS queue — messages not being consumed? Indexer Lambda errors? |
| Job stuck in IN_PROGRESS | Indexer Lambda timeouts. Check remaining chunks via completed_chunks count. |
| Partial documents | PartialDocumentsDetected CloudWatch alarm. Readback check logs. |
| Version conflicts | conflict_details on job record. Another sync running concurrently? |
| Slow indexing | Marqo latency (data plane cell). Image download timeouts. Queue depth. |
| DLQ messages | Read DLQ messages for enriched error context. Check error_message on job. |
| Missing products | Check S3 chunks — all products accounted for? Check skipped_items_details. |
Related Docs
- Ecommerce — all DDB table schemas, Lambda details
- Lambda — how to check Lambda logs and errors
- SQS — queue depth, DLQ inspection
- S3 — product data bucket
- DynamoDB — query job records
- CloudWatch — partial document alarms
- Settings Sync — how index settings reach the indexer