EventBridge Webhook Dataflow

End-to-end dataflow for Shopify webhook processing, covering both the existing HTTP path and the new EventBridge batching path (PR #2597).

══════════════════════════════════════════════════════════════════════════════
WEBHOOK REGISTRATION (one-time per shop)
══════════════════════════════════════════════════════════════════════════════

Shopify App Install
    │
    ▼
POST /api/webhooks/register
    │
    ▼
WebhookService.register_webhooks()
    │
    ├─── eventbridge_arns has ARN for this app's client_id?
    │      │
    │      ├── NO ──► _register_single_webhook()
    │      │            → Shopify GraphQL: webhookSubscriptionCreate
    │      │            → callback URL: https://admin.ecom.marqo.ai/api/webhooks/...
    │      │
    │      └── YES ─► _register_eventbridge_webhook()
    │                   → Shopify GraphQL: eventBridgeWebhookSubscriptionCreate
    │                   → ARN: arn:aws:events:...:event-source/aws.partner/shopify.com/<id>
    │
    ▼
_store_webhook_metadata() → DynamoDB (ShopifySettings)
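
The registration branch above can be sketched as a small pure function. This is a minimal illustration, not the service's actual code: RegistrationPlan and plan_webhook_registration are hypothetical names, and only the two GraphQL mutation names and the eventbridge_arns lookup are taken from the diagram.

```python
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class RegistrationPlan:
    """Hypothetical value object: which mutation to call and where events go."""
    mutation: str
    destination: str


def plan_webhook_registration(
    client_id: str,
    eventbridge_arns: Mapping[str, str],
    callback_url: str,
) -> RegistrationPlan:
    """Pick the EventBridge path when an ARN is configured for this app's client_id."""
    arn = eventbridge_arns.get(client_id)
    if arn:
        # YES branch: deliver to the partner event source ARN.
        return RegistrationPlan("eventBridgeWebhookSubscriptionCreate", arn)
    # NO branch: fall back to the per-event HTTP callback.
    return RegistrationPlan("webhookSubscriptionCreate", callback_url)
```

Keeping the decision separate from the GraphQL call makes it easy to unit-test which path a given app would take before any request is sent to Shopify.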


══════════════════════════════════════════════════════════════════════════════
WEBHOOK DELIVERY — HTTP PATH (existing, per-event)
══════════════════════════════════════════════════════════════════════════════

Shopify (product/collection change)
    │
    ▼
HTTP POST → API Gateway → Admin Lambda
    │   (HMAC signature validated)
    ▼
webhook_routes.py: handle_product_update()
    │
    ▼
SyncService.initialize_product_webhook_job()
    │
    ├── 1. Create SyncJob (PENDING) in DynamoDB
    │
    ├── 2. Build ProductWebhookMessage
    │
    └── 3. invoke_lambda_async() ──────────────────────┐
                                                       │
    ┌──────────────────────────────────────────────────┘
    │
    ▼
Webhook Worker Lambda
    │
    ▼
process_product_webhook()
(see COMMON PATH below)
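
The "HMAC signature validated" step on the HTTP path can be sketched as follows. Shopify signs the raw request body with HMAC-SHA256 using the app's secret and sends the base64 digest in the X-Shopify-Hmac-Sha256 header. The function name is hypothetical, and how the Admin Lambda actually obtains the secret and raw body is not shown in this document.

```python
import base64
import hashlib
import hmac


def is_valid_shopify_hmac(raw_body: bytes, header_hmac: str, secret: str) -> bool:
    """Return True when header_hmac matches the HMAC-SHA256 of the raw body."""
    digest = hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).digest()
    expected = base64.b64encode(digest)
    # Compare as bytes in constant time to avoid leaking timing information.
    return hmac.compare_digest(expected, header_hmac.encode("utf-8"))
```

The check must run on the exact bytes received, before any JSON parsing or re-serialization, because even whitespace changes alter the digest.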


══════════════════════════════════════════════════════════════════════════════
WEBHOOK DELIVERY — EVENTBRIDGE PATH (new, batched)
══════════════════════════════════════════════════════════════════════════════

Shopify (product/collection change)
    │
    ▼
Partner Event Source
    │
    ▼
Partner Event Bus                     ┌─────────────────────────────┐
(aws.partner/shopify.com/<id>)        │ CDK EventBridge Rules       │
    │                                 │ (created per partner bus)   │
    │  detail-type: shopifyWebhook    │                             │
    │  X-Shopify-Topic:               │                             │
    ├───── products/* ───────────────►│ Product Rule ──► Product    │
    │                                 │                  SQS Queue  │
    └───── collections/* ────────────►│ Collection Rule ► Coll.     │
                                      │                   SQS Queue │
                                      └──────────────┬──────────────┘
                                                     │
    ┌────────────────────────────────────────────────┘
    │
    ▼
┌────────────────────────┐
│ SQS Batching Config    │
│                        │
│ Products:              │
│   batch_size=500       │
│   window=60s           │
│   max_concurrency=150  │
│                        │
│ Collections:           │
│   batch_size=1         │
│   max_concurrency=150  │
└───┬────────────────────┘
    │
    ▼
Webhook Worker Lambda
    │
_is_sqs_batch_event() ── YES
    │
_get_sqs_event_source()
    │
    ├──────────────────────┐
    │                      │
"product"             "collection"
    │                      │
    ▼                      ▼
process_product_      process_collection_
webhook_batch()       webhook_batch_from_sqs()
    │                      │
    │                 Per-record:
    │                  ├ resolve domain
    │                  ├ create SyncJob
    │                  └ process_collection_webhook()
    │                        │
    │                        └──► (collection handler does
    │                              its own Shopify fetch +
    │                              Marqo partial updates)
    │
    ▼
┌─────────────────────────────┐
│ PRODUCT BATCH PROCESSING    │
│                             │
│ 1. Parse EventBridge events │
│ 2. Dedup by product_id      │
│    (delete wins over        │
│     update/create)          │
│ 3. Group by shop_domain     │
└───┬─────────────────────────┘
    │
    ▼
SyncService.process_product_
webhook_batch()
(see COMMON PATH below)
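
The three product batch steps (parse, dedup with delete winning, group by shop) can be sketched as below. This is an illustration under stated assumptions: ProductEvent, is_sqs_batch_event, parse_record, and dedup_and_group are hypothetical names; the envelope shape (detail.metadata carrying X-Shopify-Topic and X-Shopify-Shop-Domain, detail.payload carrying the product id) is assumed; keying the dedup on (shop_domain, product_id) and letting the last non-delete event win are also assumptions, since the diagram only says "dedup by product_id".

```python
import json
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Tuple


@dataclass(frozen=True)
class ProductEvent:
    shop_domain: str
    product_id: int
    topic: str  # e.g. "products/update" or "products/delete"


def is_sqs_batch_event(event: Dict[str, Any]) -> bool:
    """True when the Lambda event is a non-empty batch of SQS records."""
    records = event.get("Records") or []
    return bool(records) and all(r.get("eventSource") == "aws:sqs" for r in records)


def parse_record(record: Dict[str, Any]) -> ProductEvent:
    """Decode one SQS record whose body is an EventBridge envelope (assumed shape)."""
    detail = json.loads(record["body"])["detail"]
    meta = detail["metadata"]
    return ProductEvent(
        shop_domain=meta["X-Shopify-Shop-Domain"],
        product_id=int(detail["payload"]["id"]),
        topic=meta["X-Shopify-Topic"],
    )


def dedup_and_group(events: Iterable[ProductEvent]) -> Dict[str, List[ProductEvent]]:
    """Keep one event per product (a delete is never overwritten), grouped by shop."""
    latest: Dict[Tuple[str, int], ProductEvent] = {}
    for ev in events:
        key = (ev.shop_domain, ev.product_id)
        kept = latest.get(key)
        if kept is not None and kept.topic == "products/delete":
            continue  # delete wins over any other event for the same product
        latest[key] = ev
    grouped: Dict[str, List[ProductEvent]] = defaultdict(list)
    for ev in latest.values():
        grouped[ev.shop_domain].append(ev)
    return dict(grouped)
```

With a 60-second batching window, a product edited several times in quick succession collapses to a single fetch from Shopify, which is the main saving of the batched path over the per-event HTTP path.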


══════════════════════════════════════════════════════════════════════════════
COMMON PATH — PRODUCT PROCESSING (both paths converge here)
══════════════════════════════════════════════════════════════════════════════

SyncService.process_product_webhook[_batch]()
    │
    ├── 1. _resolve_shopify_domain()
    │       → (system_account_id, index_name, shop_id)
    │
    ├── 2. _get_access_token_for_webhook_sync()
    │       → Shopify access token from DynamoDB
    │
    ├── 3. Create SyncJob (batch path) or use existing job (HTTP path)
    │
    ├── 4. Get index config (split_by, group_by, translation_locales, etc.)
    │
    ├── 5. Fetch product data from Shopify GraphQL API
    │       │
    │       ├── HTTP path:  get_full_product_data_by_id() (single product, paginated variants)
    │       └── Batch path: get_multiple_products_enrichment() (up to 50 products/call)
    │
    ├── 6. For each product:
    │       │
    │       ├── _build_full_product_from_graphql()
    │       │     → normalize GIDs, flatten images/media, extract metafields
    │       │
    │       ├── Active + published?
    │       │     │
    │       │     ├── YES ─► transform_product_result()
    │       │     │            → MarqoResultDocs
    │       │     │
    │       │     └── NO ──► find_variants_for_product_deletion()
    │       │                  → search Marqo for existing docs
    │       │                  → MarqoDeletionDocs
    │       │
    │       └── Track doc IDs per product (for stale cleanup)
    │
    ├── 7. Update available option facets
    │       → index_settings_repo.add_available_option_facets()
    │
    ├── 8. Handle explicit deletes (products/delete topic)
    │       → find_variants_for_product_deletion() → MarqoDeletionDocs
    │
    ├── 9. Stale variant cleanup (per product)
    │       → find_stale_variant_marqo_ids(current_ids)
    │       → enqueue deletions for removed variants
    │
    └── 10. Write + enqueue
        │
        ▼
    ┌──────────────────────────────────┐
    │ S3: Write JSONL                  │
    │ s3://{bucket}/{prefix}/          │
    │   batch-webhooks/{job_id}/       │
    │     add_docs.jsonl               │
    │     delete_docs.jsonl            │
    │     {product_id}-stale.jsonl     │
    └───┬──────────────────────────────┘
        │
        ▼
    ┌──────────────────────────────────┐
    │ SQS: Per-shop indexer queue      │
    │                                  │
    │ Message:                         │
    │  ├ job_id                        │
    │  ├ shop_id                       │
    │  ├ operation: ADD/DELETE_DOCS    │
    │  ├ s3_bucket + s3_key            │
    │  └ variant_count                 │
    └───┬──────────────────────────────┘
        │
        ▼
SyncJob.update_job_progress(total_items=N)
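
Step 10 (write JSONL to S3, then enqueue a pointer message) can be sketched without any AWS calls. The helper names below are hypothetical, the message field names follow the box above, and the exact spelling of the operation value (shown here as "ADD_DOCS") is an assumption based on the "ADD/DELETE_DOCS" shorthand.

```python
import json
from typing import Any, Dict, Iterable


def batch_s3_key(prefix: str, job_id: str, filename: str) -> str:
    """Build {prefix}/batch-webhooks/{job_id}/{filename}, tolerating an empty prefix."""
    parts = [prefix.strip("/"), "batch-webhooks", job_id, filename]
    return "/".join(p for p in parts if p)


def to_jsonl(docs: Iterable[Dict[str, Any]]) -> str:
    """Serialize one JSON document per line, each terminated by a newline."""
    return "".join(json.dumps(doc, separators=(",", ":")) + "\n" for doc in docs)


def build_indexer_message(
    job_id: str,
    shop_id: str,
    operation: str,
    s3_bucket: str,
    s3_key: str,
    variant_count: int,
) -> str:
    """Build the SQS message body that points the indexer at the S3 object."""
    return json.dumps(
        {
            "job_id": job_id,
            "shop_id": shop_id,
            "operation": operation,  # passed through verbatim
            "s3_bucket": s3_bucket,
            "s3_key": s3_key,
            "variant_count": variant_count,
        }
    )
```

Sending only a pointer keeps each queue message small regardless of how many variants a batch contains; the documents themselves travel through S3.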


══════════════════════════════════════════════════════════════════════════════
INDEXER LAMBDA (ecom_indexer)
══════════════════════════════════════════════════════════════════════════════

Per-shop SQS queue
    │
    ▼
ecom_indexer Lambda (lambda_function.py)
    │
    ├── 1. Parse SQS message → Record(job_id, s3_key, operation, ...)
    │
    ├── 2. Download JSONL from S3
    │       → list of MarqoResultDocument or MarqoDeletionDocument
    │
    ├── 3. Resolve API key + index config
    │       → api_key_resolver(cell_id, system_account_id)
    │
    ├── 4. Chunk documents (batch_size ~50-100)
    │
    ├── 5. Send to Marqo HTTP API
    │       │
    │       ├── ADD:    POST /indexes/{name}/documents
    │       └── DELETE: POST /indexes/{name}/documents/delete-batch
    │             │
    │             ├── Retry: exponential backoff, deadline-aware
    │             ├── 429: reduce visibility, retry later
    │             └── 409: conflict (stale write), skip
    │
    ├── 6. Update job progress in DynamoDB
    │       → processed_items, failed_items, skipped_items
    │
    └── 7. check_and_complete_job()
            → if all chunks done: status = COMPLETED
            → SyncJob lifecycle ends
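
Steps 4 and 5 can be sketched as three small helpers: chunking, classifying the HTTP status, and a deadline-aware backoff. All names are hypothetical. The 409 (skip) and 429 (retry later) handling follows the diagram; treating 5xx as retryable and other 4xx as failed, and the base/cap values, are assumptions.

```python
from enum import Enum
from typing import Iterator, List, Optional, Sequence, TypeVar

T = TypeVar("T")


class Outcome(Enum):
    PROCESSED = "processed"
    SKIPPED = "skipped"
    RETRY = "retry"
    FAILED = "failed"


def chunked(items: Sequence[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive chunks of at most `size` items."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


def classify_status(status: int) -> Outcome:
    """Map an HTTP status code to how the chunk should be counted."""
    if 200 <= status < 300:
        return Outcome.PROCESSED
    if status == 409:
        return Outcome.SKIPPED  # conflict (stale write): skip
    if status == 429 or status >= 500:
        return Outcome.RETRY  # throttled or server error: try again later
    return Outcome.FAILED


def next_backoff(
    attempt: int, remaining_s: float, base_s: float = 0.5, cap_s: float = 30.0
) -> Optional[float]:
    """Exponential delay for this attempt, or None if it would overrun the deadline."""
    delay = min(cap_s, base_s * (2 ** attempt))
    return delay if delay < remaining_s else None
```

Returning None from next_backoff lets the caller stop retrying in-process and hand the message back to SQS instead of sleeping past the Lambda timeout.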


══════════════════════════════════════════════════════════════════════════════
JOB LIFECYCLE
══════════════════════════════════════════════════════════════════════════════

PENDING ──► IN_PROGRESS ──► COMPLETED
               │
               └──────────► FAILED (on error / max retries)
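
The lifecycle can be expressed as a transition table. This sketch reads the FAILED branch as leaving IN_PROGRESS, which is an interpretation of the diagram; whether a job can also fail directly from PENDING is not stated. JobStatus and can_transition are hypothetical names.

```python
from enum import Enum
from typing import Dict, FrozenSet


class JobStatus(str, Enum):
    PENDING = "PENDING"
    IN_PROGRESS = "IN_PROGRESS"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


# Allowed next states for each state; terminal states map to an empty set.
_ALLOWED: Dict[JobStatus, FrozenSet[JobStatus]] = {
    JobStatus.PENDING: frozenset({JobStatus.IN_PROGRESS}),
    JobStatus.IN_PROGRESS: frozenset({JobStatus.COMPLETED, JobStatus.FAILED}),
    JobStatus.COMPLETED: frozenset(),
    JobStatus.FAILED: frozenset(),
}


def can_transition(current: JobStatus, target: JobStatus) -> bool:
    """True when the lifecycle allows moving from current to target."""
    return target in _ALLOWED[current]


def is_terminal(status: JobStatus) -> bool:
    """True for states with no outgoing transitions."""
    return not _ALLOWED[status]
```

Guarding status writes with a check like this prevents a late retry from moving a COMPLETED or FAILED job back into IN_PROGRESS.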


══════════════════════════════════════════════════════════════════════════════
FAILURE / RETRY PATHS
══════════════════════════════════════════════════════════════════════════════

EventBridge SQS path:
┌──────────────────────────────────────────────────────────┐
│  Worker Lambda returns batchItemFailures                 │
│       │                                                  │
│       ▼                                                  │
│  SQS redelivers failed messages (visibility=90min)       │
│       │                                                  │
│       ├── Up to 5 retries (maxReceiveCount=5)            │
│       │                                                  │
│       └── After 5 failures → DLQ (14-day retention)      │
└──────────────────────────────────────────────────────────┘

Indexer SQS path:
┌──────────────────────────────────────────────────────────┐
│  Indexer Lambda returns batchItemFailures                │
│       │                                                  │
│       ▼                                                  │
│  Per-shop SQS redelivers → retry with backoff            │
│       │                                                  │
│       └── Self-DLQ: marks job FAILED before max retries  │
└──────────────────────────────────────────────────────────┘
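
Both retry paths rely on Lambda's partial batch response for SQS: the handler returns the messageId of each failed record under batchItemFailures, and only those messages are redelivered. A minimal sketch, assuming ReportBatchItemFailures is enabled on the event source mapping; process_batch and the handler argument are hypothetical names.

```python
from typing import Any, Callable, Dict, List


def process_batch(
    event: Dict[str, Any],
    handler: Callable[[Dict[str, Any]], None],
) -> Dict[str, List[Dict[str, str]]]:
    """Run handler on each SQS record and report only the failed ones."""
    failures: List[Dict[str, str]] = []
    for record in event.get("Records", []):
        try:
            handler(record)
        except Exception:
            # Only this message becomes visible again; the rest are deleted.
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}
```

An empty batchItemFailures list tells Lambda the whole batch succeeded. Raising from the function instead would cause every message in the batch to be retried, including those already processed.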