EventBridge Webhook Dataflow
End-to-end dataflow for Shopify webhook processing, covering both the existing HTTP path and the new EventBridge batching path (PR #2597).
══════════════════════════════════════════════════════════════════════════════
WEBHOOK REGISTRATION (one-time per shop)
══════════════════════════════════════════════════════════════════════════════
Shopify App Install
│
▼
POST /api/webhooks/register
│
▼
WebhookService.register_webhooks()
│
├─── eventbridge_arns has ARN for this app's client_id?
│       │
│  YES  │ NO
│   │   │
│   │   ▼
│   │   _register_single_webhook()
│   │     → Shopify GraphQL: webhookSubscriptionCreate
│   │     → callback URL: https://admin.ecom.marqo.ai/api/webhooks/...
│   │
│   ▼
│   _register_eventbridge_webhook()
│     → Shopify GraphQL: eventBridgeWebhookSubscriptionCreate
│     → ARN: arn:aws:events:...:event-source/aws.partner/shopify.com/<id>
│
▼
_store_webhook_metadata() → DynamoDB (ShopifySettings)
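
The registration branch can be sketched as a small pure function. This is a minimal sketch, not the code in WebhookService: the `eventbridge_arns` mapping and the two GraphQL mutation names come from the diagram, while `choose_registration`, its return shape, and the sample values are hypothetical.

```python
from typing import Dict, Tuple


def choose_registration(
    client_id: str,
    eventbridge_arns: Dict[str, str],
    callback_url: str,
) -> Tuple[str, str]:
    """Return (graphql_mutation, destination) for one webhook subscription.

    Hypothetical helper: apps with a configured partner event source ARN
    register an EventBridge subscription, all others fall back to HTTP.
    """
    arn = eventbridge_arns.get(client_id)
    if arn:
        return ("eventBridgeWebhookSubscriptionCreate", arn)
    return ("webhookSubscriptionCreate", callback_url)


if __name__ == "__main__":
    arns = {"app-with-bus": "arn:aws:events:example"}  # placeholder values
    print(choose_registration("app-with-bus", arns, "https://example.invalid/hook"))
    print(choose_registration("app-without-bus", arns, "https://example.invalid/hook"))
```

Keeping the decision separate from the GraphQL call makes the YES/NO branch testable without a Shopify session.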
══════════════════════════════════════════════════════════════════════════════
WEBHOOK DELIVERY — HTTP PATH (existing, per-event)
══════════════════════════════════════════════════════════════════════════════
Shopify (product/collection change)
│
▼
HTTP POST → API Gateway → Admin Lambda
│ (HMAC signature validated)
▼
webhook_routes.py: handle_product_update()
│
▼
SyncService.initialize_product_webhook_job()
│
├── 1. Create SyncJob (PENDING) in DynamoDB
│
├── 2. Build ProductWebhookMessage
│
└── 3. invoke_lambda_async() ──────────┐
                                       │
                                       ▼
                             Webhook Worker Lambda
                                       │
                                       ▼
                           process_product_webhook()
                            (see COMMON PATH below)
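
The "HMAC signature validated" step on the HTTP path can be illustrated as follows. Shopify signs the raw request body with HMAC-SHA256 using the app secret and sends the base64 digest in the `X-Shopify-Hmac-Sha256` header. How the Admin Lambda actually performs the check is not shown in this document, so the function name and arguments below are assumptions.

```python
import base64
import hashlib
import hmac


def verify_shopify_hmac(raw_body: bytes, header_hmac: str, shared_secret: str) -> bool:
    """Check a webhook signature against the raw (unparsed) request body.

    Hypothetical helper; the real handler may differ.
    """
    digest = hmac.new(shared_secret.encode("utf-8"), raw_body, hashlib.sha256).digest()
    expected = base64.b64encode(digest).decode("ascii")
    # compare_digest avoids leaking the mismatch position through timing.
    return hmac.compare_digest(expected, header_hmac)


if __name__ == "__main__":
    secret = "example-secret"  # placeholder
    body = b'{"id": 1}'
    good = base64.b64encode(
        hmac.new(secret.encode("utf-8"), body, hashlib.sha256).digest()
    ).decode("ascii")
    print(verify_shopify_hmac(body, good, secret))
    print(verify_shopify_hmac(body + b" ", good, secret))
```

The check must run on the exact bytes received; re-serializing parsed JSON will generally change the digest.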
══════════════════════════════════════════════════════════════════════════════
WEBHOOK DELIVERY — EVENTBRIDGE PATH (new, batched)
══════════════════════════════════════════════════════════════════════════════
Shopify (product/collection change)
│
▼
Partner Event Source
│
▼
Partner Event Bus                  ┌─────────────────────────────┐
(aws.partner/shopify.com/<id>)     │ CDK EventBridge Rules       │
│                                  │ (created per partner bus)   │
│  detail-type: shopifyWebhook     │                             │
│  X-Shopify-Topic:                │                             │
├───── products/* ────────────────►│ Product Rule ──► Product    │
│                                  │                  SQS Queue  │
├───── collections/* ─────────────►│ Collection Rule ► Coll.     │
│                                  │                  SQS Queue  │
                                   └─────────────────────────────┘
                                                  │
         ┌────────────────────────────────────────┘
         │
         ▼
┌────────────────────────┐
│ SQS Batching Config    │
│                        │
│ Products:              │
│   batch_size=500       │
│   window=60s           │
│   max_concurrency=150  │
│                        │
│ Collections:           │
│   batch_size=1         │
│   max_concurrency=150  │
└────────┬───────────────┘
         │
         ▼
Webhook Worker Lambda
         │
_is_sqs_batch_event() ── YES
         │
_get_sqs_event_source()
         │
┌────────┴────────────────┐
│                         │
"product"                 "collection"
│                         │
▼                         ▼
process_product_          process_collection_
webhook_batch()           webhook_batch_from_sqs()
│                         │
│                         Per-record:
│                         ├ resolve domain
│                         ├ create SyncJob
│                         └ process_collection_webhook()
│                           │
│                           └──► (collection handler does
│                                 its own Shopify fetch +
│                                 Marqo partial updates)
│
▼
┌─────────────────────────────┐
│  PRODUCT BATCH PROCESSING   │
│                             │
│  1. Parse EventBridge events│
│  2. Dedup by product_id     │
│     (delete wins over       │
│      update/create)         │
│  3. Group by shop_domain    │
└────────────┬────────────────┘
             │
             ▼
SyncService.process_product_
      webhook_batch()
  (see COMMON PATH below)
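
The three product batch steps (parse, dedup with "delete wins", group by shop) can be sketched like this. It is an illustration under stated assumptions: `ProductEvent` and `dedup_and_group` are hypothetical names, events are assumed to arrive already parsed, and the dedup key is (shop_domain, product_id) so that identical product IDs from different shops are not merged.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, NamedTuple, Tuple


class ProductEvent(NamedTuple):
    shop_domain: str
    product_id: str
    topic: str  # e.g. "products/create", "products/update", "products/delete"


def dedup_and_group(events: Iterable[ProductEvent]) -> Dict[str, List[ProductEvent]]:
    """Keep one event per product, then bucket the survivors by shop."""
    latest: Dict[Tuple[str, str], ProductEvent] = {}
    for ev in events:
        key = (ev.shop_domain, ev.product_id)
        prev = latest.get(key)
        if prev is not None and prev.topic == "products/delete":
            continue  # delete wins over any other update/create in the batch
        latest[key] = ev  # otherwise the most recent event replaces the older one

    grouped: Dict[str, List[ProductEvent]] = defaultdict(list)
    for ev in latest.values():
        grouped[ev.shop_domain].append(ev)
    return dict(grouped)


if __name__ == "__main__":
    batch = [
        ProductEvent("a.myshopify.com", "1", "products/update"),
        ProductEvent("a.myshopify.com", "1", "products/delete"),
        ProductEvent("a.myshopify.com", "1", "products/update"),
        ProductEvent("b.myshopify.com", "1", "products/create"),
    ]
    for shop, evs in dedup_and_group(batch).items():
        print(shop, [(e.product_id, e.topic) for e in evs])
```

With a 60 s window and up to 500 records per batch, collapsing repeated updates this way reduces the number of Shopify fetches per invocation.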
══════════════════════════════════════════════════════════════════════════════
COMMON PATH — PRODUCT PROCESSING (both paths converge here)
══════════════════════════════════════════════════════════════════════════════
SyncService.process_product_webhook[_batch]()
│
├── 1. _resolve_shopify_domain()
│ → (system_account_id, index_name, shop_id)
│
├── 2. _get_access_token_for_webhook_sync()
│ → Shopify access token from DynamoDB
│
├── 3. Create SyncJob (batch path) or use existing job (HTTP path)
│
├── 4. Get index config (split_by, group_by, translation_locales, etc.)
│
├── 5. Fetch product data from Shopify GraphQL API
│   │
│   ├── HTTP path:  get_full_product_data_by_id() (single product, paginated variants)
│   └── Batch path: get_multiple_products_enrichment() (up to 50 products/call)
│
├── 6. For each product:
│   │
│   ├── _build_full_product_from_graphql()
│   │     → normalize GIDs, flatten images/media, extract metafields
│   │
│   ├── Active + published?
│   │               │
│   │   YES                   NO
│   │    │                     │
│   │    ▼                     ▼
│   │   transform_product_    find_variants_for_product_deletion()
│   │   result()                → search Marqo for existing docs
│   │     → MarqoResultDocs     → MarqoDeletionDocs
│   │
│   └── Track doc IDs per product (for stale cleanup)
│
├── 7. Update available option facets
│ → index_settings_repo.add_available_option_facets()
│
├── 8. Handle explicit deletes (products/delete topic)
│ → find_variants_for_product_deletion() → MarqoDeletionDocs
│
├── 9. Stale variant cleanup (per product)
│ → find_stale_variant_marqo_ids(current_ids)
│ → enqueue deletions for removed variants
│
└── 10. Write + enqueue
           │
           ▼
┌──────────────────────────────────┐
│  S3: Write JSONL                 │
│    s3://{bucket}/{prefix}/       │
│      batch-webhooks/{job_id}/    │
│        add_docs.jsonl            │
│        delete_docs.jsonl         │
│        {product_id}-stale.jsonl  │
└──────────┬───────────────────────┘
           │
           ▼
┌──────────────────────────────────┐
│  SQS: Per-shop indexer queue     │
│                                  │
│  Message:                        │
│    ├ job_id                      │
│    ├ shop_id                     │
│    ├ operation: ADD/DELETE_DOCS  │
│    ├ s3_bucket + s3_key          │
│    └ variant_count               │
└──────────┬───────────────────────┘
           │
           ▼
SyncJob.update_job_progress(total_items=N)
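
Steps 5 and 10 can be illustrated with three small helpers: chunking product IDs into groups of at most 50 per Shopify call, serializing documents as JSONL, and building the indexer queue message. The message field names are taken from the diagram. The helper names, the `"ADD_DOCS"` operation string, and the sample bucket and key are placeholders, not confirmed values.

```python
import json
from typing import Any, Dict, Iterable, Iterator, List, Sequence


def chunked(items: Sequence[str], size: int = 50) -> Iterator[List[str]]:
    """Yield consecutive slices of at most `size` items."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


def to_jsonl(docs: Iterable[Dict[str, Any]]) -> str:
    """Serialize one JSON object per line, as stored in the S3 files."""
    return "".join(json.dumps(doc, sort_keys=True) + "\n" for doc in docs)


def build_indexer_message(
    job_id: str,
    shop_id: str,
    operation: str,
    s3_bucket: str,
    s3_key: str,
    variant_count: int,
) -> Dict[str, Any]:
    """Assemble the per-shop queue message (field names from the diagram)."""
    return {
        "job_id": job_id,
        "shop_id": shop_id,
        "operation": operation,
        "s3_bucket": s3_bucket,
        "s3_key": s3_key,
        "variant_count": variant_count,
    }


if __name__ == "__main__":
    ids = [f"gid://shopify/Product/{i}" for i in range(120)]
    print([len(c) for c in chunked(ids)])
    print(to_jsonl([{"_id": "v1"}, {"_id": "v2"}]), end="")
    print(build_indexer_message("job-1", "shop-1", "ADD_DOCS", "bucket", "key.jsonl", 2))
```

Passing only the S3 location in the queue message keeps it small regardless of how many variants a batch produces.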
══════════════════════════════════════════════════════════════════════════════
INDEXER LAMBDA (ecom_indexer)
══════════════════════════════════════════════════════════════════════════════
Per-shop SQS queue
│
▼
ecom_indexer Lambda (lambda_function.py)
│
├── 1. Parse SQS message → Record(job_id, s3_key, operation, ...)
│
├── 2. Download JSONL from S3
│ → list of MarqoResultDocument or MarqoDeletionDocument
│
├── 3. Resolve API key + index config
│ → api_key_resolver(cell_id, system_account_id)
│
├── 4. Chunk documents (batch_size ~50-100)
│
├── 5. Send to Marqo HTTP API
│   │
│   ├── ADD:    POST /indexes/{name}/documents
│   └── DELETE: POST /indexes/{name}/documents/delete-batch
│         │
│         ├── Retry: exponential backoff, deadline-aware
│         ├── 429: reduce visibility, retry later
│         └── 409: conflict (stale write), skip
│
├── 6. Update job progress in DynamoDB
│ → processed_items, failed_items, skipped_items
│
└── 7. check_and_complete_job()
→ if all chunks done: status = COMPLETED
→ SyncJob lifecycle ends
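
The retry rules in step 5 can be sketched as two pure functions: one that maps an HTTP status to an action, and one that plans exponential backoff delays that still fit in the remaining time budget. All names, the action labels, and the numeric defaults are assumptions for illustration. The diagram only states "exponential backoff, deadline-aware", that 429 is deferred, and that 409 is skipped.

```python
from typing import List


def classify_status(status_code: int) -> str:
    """Map a Marqo HTTP status to an action label (labels are hypothetical)."""
    if 200 <= status_code < 300:
        return "ok"
    if status_code == 409:
        return "skip"   # conflict (stale write): nothing to redo
    if status_code == 429:
        return "defer"  # throttled: hand back to SQS and retry later
    if status_code >= 500:
        return "retry"  # transient server error: retry in-process
    return "fail"


def plan_retry_delays(
    budget_s: float,
    base_delay_s: float = 0.5,
    factor: float = 2.0,
    max_attempts: int = 5,
) -> List[float]:
    """Return backoff delays whose cumulative sleep fits inside budget_s."""
    delays: List[float] = []
    elapsed = 0.0
    delay = base_delay_s
    for _ in range(max_attempts):
        if elapsed + delay > budget_s:
            break  # deadline-aware: do not start a sleep we cannot finish
        delays.append(delay)
        elapsed += delay
        delay *= factor
    return delays


if __name__ == "__main__":
    print(plan_retry_delays(budget_s=5.0))
    print([classify_status(code) for code in (200, 409, 429, 503, 400)])
```

In a Lambda, `budget_s` would typically be derived from the remaining invocation time minus a safety margin.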
══════════════════════════════════════════════════════════════════════════════
JOB LIFECYCLE
══════════════════════════════════════════════════════════════════════════════
PENDING ──► IN_PROGRESS ──► COMPLETED
│
└──────────► FAILED (on error / max retries)
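
The lifecycle can be expressed as an explicit transition table. The four status names come from the diagram. Allowing FAILED from both PENDING and IN_PROGRESS is an assumption, since the diagram does not say which states can fail, and `JobStatus`, `can_transition`, and `advance` are hypothetical names.

```python
from enum import Enum
from typing import Dict, Set


class JobStatus(str, Enum):
    PENDING = "PENDING"
    IN_PROGRESS = "IN_PROGRESS"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


# Assumed transition table; COMPLETED and FAILED are terminal.
_ALLOWED: Dict[JobStatus, Set[JobStatus]] = {
    JobStatus.PENDING: {JobStatus.IN_PROGRESS, JobStatus.FAILED},
    JobStatus.IN_PROGRESS: {JobStatus.COMPLETED, JobStatus.FAILED},
    JobStatus.COMPLETED: set(),
    JobStatus.FAILED: set(),
}


def can_transition(current: JobStatus, new: JobStatus) -> bool:
    return new in _ALLOWED[current]


def advance(current: JobStatus, new: JobStatus) -> JobStatus:
    """Return the new status, or raise if the move is not allowed."""
    if not can_transition(current, new):
        raise ValueError(f"illegal transition {current.value} -> {new.value}")
    return new


if __name__ == "__main__":
    status = JobStatus.PENDING
    status = advance(status, JobStatus.IN_PROGRESS)
    status = advance(status, JobStatus.COMPLETED)
    print(status.value)
```

Rejecting moves out of a terminal state guards against a late retry reopening a job that has already finished.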
══════════════════════════════════════════════════════════════════════════════
FAILURE / RETRY PATHS
══════════════════════════════════════════════════════════════════════════════
EventBridge SQS path:
┌──────────────────────────────────────────────────────────┐
│  Worker Lambda returns batchItemFailures                 │
│        │                                                 │
│        ▼                                                 │
│  SQS redelivers failed messages (visibility=90min)       │
│        │                                                 │
│        ├── Up to 5 receives in total (maxReceiveCount=5) │
│        │                                                 │
│        └── After 5 failures → DLQ (14-day retention)     │
└──────────────────────────────────────────────────────────┘
Indexer SQS path:
┌──────────────────────────────────────────────────────────┐
│  Indexer Lambda returns batchItemFailures                │
│        │                                                 │
│        ▼                                                 │
│  Per-shop SQS redelivers → retry with backoff            │
│        │                                                 │
│        └── Self-DLQ: marks job FAILED before max retries │
└──────────────────────────────────────────────────────────┘
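
Both failure paths rely on the Lambda partial batch response for SQS: the handler returns the message IDs that failed, and only those are redelivered. The response shape (`batchItemFailures` with `itemIdentifier`) is the standard Lambda contract and requires `ReportBatchItemFailures` on the event source mapping. The handler below is a minimal sketch; `handle_batch` and `process_record` are hypothetical names, not the worker's actual functions.

```python
from typing import Any, Callable, Dict, List


def handle_batch(
    event: Dict[str, Any],
    process_record: Callable[[Dict[str, Any]], None],
) -> Dict[str, List[Dict[str, str]]]:
    """Process each SQS record and report only the failed ones."""
    failures: List[Dict[str, str]] = []
    for record in event.get("Records", []):
        try:
            process_record(record)
        except Exception:
            # Successful records are deleted by Lambda; listed ones return
            # to the queue once their visibility timeout expires.
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}


if __name__ == "__main__":
    def flaky(record: Dict[str, Any]) -> None:
        if record["body"] == "bad":
            raise RuntimeError("simulated failure")

    sample = {"Records": [
        {"messageId": "m1", "body": "ok"},
        {"messageId": "m2", "body": "bad"},
    ]}
    print(handle_batch(sample, flaky))
```

Returning an empty `batchItemFailures` list marks the whole batch as successful, so one bad record does not force a retry of the entire batch.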