Marqo Reindexing Pipeline
A serverless pipeline that reindexes documents from Vespa indexes to the Ecommerce API. It supports parallel extraction via configurable concurrency (slices), pause/resume functionality, and checkpoint-based crash recovery.
Architecture

Components
| Component | Purpose |
|---|---|
| Control Plane | External system that initiates reindexing by writing a job record to the Reindexing Table |
| Reindexing Table | DynamoDB table storing job metadata, status, and progress counters. DDB Stream triggers the Extraction Lambda on new or resumed jobs |
| Progress Table | DynamoDB table storing Vespa continuation tokens per slice for checkpoint-based crash recovery |
| EcomJobs Table | DynamoDB table tracking pending Ecommerce API job IDs awaiting completion (with 30-day TTL) |
| Extraction Lambda | Triggered by DDB Stream on INSERT (pending) or MODIFY (paused → pending); creates a K8s Job in EKS |
| K8s Extraction Job | Runs in a multi-tenant EKS cluster; extracts documents from Vespa using parallel sliced visits |
| Replay Queue | SQS queue containing individual documents for replay to the Ecommerce API |
| Replay Lambda | Batches up to 100 documents from the Replay Queue and sends them to the Ecommerce API |
| Progress Queue | SQS queue for progress events tracking extracted, replayed, and completed document counts |
| Progress Lambda | Consumes progress events and atomically updates counters on the Reindexing Table |
| EventBridge Rule | Scheduled trigger that invokes the Ecommerce Jobs Lambda every minute |
| Ecommerce Jobs Lambda | Polls pending/in-progress jobs from the EcomJobs Table and checks completion status via the Ecommerce API |
| Dead Letter Queue | SQS queue capturing failed messages for investigation (14-day retention) |
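The Replay Lambda row describes batches of up to 100 documents, and the Data Flow below describes POST for full reindexes and PATCH for field updates. The sketch shows one way to turn a list of documents into API-sized requests. It assumes the 100-document cap applies per API call. `ReplayRequest`, `chunk` and `build_replay_requests` are hypothetical names, not the project's code.

```python
from dataclasses import dataclass
from typing import Any, Iterator

# Assumption: the Replay Lambda's 100-document cap applies per Ecommerce API request.
MAX_BATCH_SIZE = 100

# Hypothetical mapping from the job's reindex_type to the HTTP verb used for replay.
HTTP_METHOD_BY_REINDEX_TYPE = {"post": "POST", "patch": "PATCH"}


@dataclass(frozen=True)
class ReplayRequest:
    """One hypothetical Ecommerce API call: an HTTP method plus its document batch."""

    method: str
    documents: list[dict[str, Any]]


def chunk(docs: list[dict[str, Any]], size: int = MAX_BATCH_SIZE) -> Iterator[list[dict[str, Any]]]:
    """Yield consecutive slices of at most `size` documents, preserving order."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(docs), size):
        yield docs[start : start + size]


def build_replay_requests(docs: list[dict[str, Any]], reindex_type: str) -> list[ReplayRequest]:
    """Group documents into API-sized batches, choosing POST or PATCH from reindex_type."""
    try:
        method = HTTP_METHOD_BY_REINDEX_TYPE[reindex_type]
    except KeyError:
        raise ValueError(f"unsupported reindex_type: {reindex_type!r}") from None
    return [ReplayRequest(method, batch) for batch in chunk(docs)]
```

For example, 250 documents from a `post` job become three POST requests of 100, 100 and 50 documents.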
Data Flow
- The Control Plane creates a reindexing job by writing a record (status `pending`) to the Reindexing Table
- The DDB Stream triggers the Extraction Lambda, which authenticates with EKS and creates a K8s Job
- The K8s Job extracts documents from Vespa using parallel sliced visits, transforms them to Marqo format, and sends them to the Replay Queue. It saves continuation tokens to the Progress Table after each page for crash recovery
- The Replay Lambda batches documents and sends them to the Ecommerce API (POST for full reindex, PATCH for field updates), storing the returned job IDs in the EcomJobs Table
- The EventBridge Rule triggers the Ecommerce Jobs Lambda every minute to poll job completion status from the Ecommerce API
- Progress counters (`extracted_doc_count`, `replayed_doc_count`, `completed_doc_count`) are updated atomically via the Progress Queue and Progress Lambda
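A minimal sketch of how the Progress Lambda's atomic counter updates could be expressed with DynamoDB's `ADD` update action. It assumes each progress event carries the job's `pk`/`sk` plus integer deltas named after the three counters; that event shape, and the names `aggregate` and `build_update_kwargs`, are hypothetical. Summing a batch first means one write per job rather than one per event.

```python
from collections import defaultdict
from typing import Any

COUNTERS = ("extracted_doc_count", "replayed_doc_count", "completed_doc_count")


def aggregate(events: list[dict[str, Any]]) -> dict[tuple[str, str], dict[str, int]]:
    """Sum counter deltas per job key so a batch of events costs one write per job."""
    totals: defaultdict[tuple[str, str], dict[str, int]] = defaultdict(
        lambda: dict.fromkeys(COUNTERS, 0)
    )
    for event in events:
        key = (event["pk"], event["sk"])
        for name in COUNTERS:
            totals[key][name] += int(event.get(name, 0))
    return dict(totals)


def build_update_kwargs(pk: str, sk: str, deltas: dict[str, int]) -> dict[str, Any]:
    """Keyword arguments for boto3's DynamoDB Table.update_item (resource API)."""
    nonzero = {name: delta for name, delta in deltas.items() if delta}
    if not nonzero:
        raise ValueError("nothing to update")
    return {
        "Key": {"pk": pk, "sk": sk},
        # ADD increments atomically on the server and treats a missing attribute as 0.
        "UpdateExpression": "ADD " + ", ".join(f"{name} :{name}" for name in nonzero),
        "ExpressionAttributeValues": {f":{name}": delta for name, delta in nonzero.items()},
        # Without this guard, ADD would create a new item for an unknown job key.
        "ConditionExpression": "attribute_exists(pk)",
    }
```

Note that `ADD` is not idempotent: if SQS redelivers a progress message, this sketch counts it twice, because it does no deduplication.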
Pause and Resume
Jobs can be paused by setting `job_status` to `paused` on the Reindexing Table. The K8s extraction job checks the job status and stops gracefully, preserving its continuation tokens in the Progress Table. To resume, set `job_status` back to `pending`: the DDB Stream MODIFY event triggers the Extraction Lambda, which creates a new K8s Job that picks up from the saved checkpoints.
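The trigger rule (INSERT with status `pending`, or MODIFY from `paused` to `pending`) can be expressed as a predicate over a DynamoDB Stream record. This sketch assumes the stream uses the `NEW_AND_OLD_IMAGES` view type so that `OldImage` is available, and that `job_status` is stored as a string attribute; `should_start_extraction` is a hypothetical name.

```python
from typing import Any, Optional


def _job_status(image: Optional[dict[str, Any]]) -> Optional[str]:
    """Read job_status from a stream image in DynamoDB's typed JSON ({"S": "..."})."""
    if not image:
        return None
    attr = image.get("job_status")
    return attr.get("S") if attr else None


def should_start_extraction(record: dict[str, Any]) -> bool:
    """True for a new pending job, or for a job resumed from paused back to pending."""
    event_name = record.get("eventName")
    change = record.get("dynamodb", {})
    new_status = _job_status(change.get("NewImage"))
    if event_name == "INSERT":
        return new_status == "pending"
    if event_name == "MODIFY":
        # Assumes NEW_AND_OLD_IMAGES; without OldImage a resume cannot be recognised.
        return _job_status(change.get("OldImage")) == "paused" and new_status == "pending"
    return False  # REMOVE and anything else never starts a job
```

Checking the old status as well as the new one keeps unrelated MODIFY events (for example, counter updates from the Progress Lambda) from starting a second extraction job.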
Key Design Decisions
- K8s Job over Lambda for extraction: Avoids Lambda's 15-minute timeout and eliminates repeated EKS authentication. A single auth at Job startup replaces per-page auth
- EventBridge + DynamoDB over SQS polling: Avoids AWS Lambda recursive loop detection, which stops SQS-based polling chains after ~16 iterations
- Checkpoint recovery: Continuation tokens are persisted to the Progress Table after each page fetch, enabling crash recovery without re-extraction
- At-least-once delivery: The Ecommerce API is idempotent on document ID, so duplicate sends result in the same indexed state
- Strategy pattern for replay sinks: The `ReplayEventSink` abstraction supports different index types (Ecommerce, Classic, NoOp), making it extensible for future backends
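The checkpoint-recovery, pause and at-least-once decisions above fit together in a per-slice extraction loop. The sketch below is a hypothetical illustration rather than `app/job/main.py`. `fetch_page` stands in for a Vespa visit call; a real job would presumably call Vespa's `/document/v1` visit API with its `slices`, `sliceId` and `continuation` parameters. `InMemoryCheckpoints` stands in for the Progress Table.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Optional

Doc = dict[str, Any]
# fetch_page(slice_id, slices, continuation) -> (documents, next continuation or None when exhausted)
FetchPage = Callable[[int, int, Optional[str]], tuple[list[Doc], Optional[str]]]


class InMemoryCheckpoints:
    """Hypothetical stand-in for the Progress Table: slice_id -> (continuation token, finished)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._state: dict[int, tuple[Optional[str], bool]] = {}

    def load(self, slice_id: int) -> tuple[Optional[str], bool]:
        with self._lock:
            return self._state.get(slice_id, (None, False))

    def save(self, slice_id: int, token: Optional[str], finished: bool) -> None:
        with self._lock:
            self._state[slice_id] = (token, finished)


def run_slice(
    slice_id: int,
    slices: int,
    fetch_page: FetchPage,
    send: Callable[[list[Doc]], None],
    checkpoints: InMemoryCheckpoints,
    is_paused: Callable[[], bool],
) -> str:
    """Visit one slice page by page, resuming from (and updating) its saved checkpoint."""
    token, finished = checkpoints.load(slice_id)
    while not finished:
        if is_paused():
            # The saved token still points just past the last fully sent page.
            return "paused"
        docs, next_token = fetch_page(slice_id, slices, token)
        send(docs)  # e.g. enqueue on the Replay Queue
        # Checkpoint only after sending: a crash in between re-sends one page,
        # which the idempotent Ecommerce API tolerates (at-least-once delivery).
        token, finished = next_token, next_token is None
        checkpoints.save(slice_id, token, finished)
    return "done"


def run_all_slices(
    slices: int,
    fetch_page: FetchPage,
    send: Callable[[list[Doc]], None],
    checkpoints: InMemoryCheckpoints,
    is_paused: Callable[[], bool],
) -> list[str]:
    """Run every slice concurrently, one worker thread per slice (the job's concurrency)."""
    with ThreadPoolExecutor(max_workers=slices) as pool:
        futures = [
            pool.submit(run_slice, i, slices, fetch_page, send, checkpoints, is_paused)
            for i in range(slices)
        ]
        return [f.result() for f in futures]
```

Rerunning a slice that is already marked finished is a no-op, so a replacement K8s Job (after a crash or a resume) only visits the slices and pages that were not yet checkpointed.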
Project Structure
```
reindexing-pipeline/
├── app/
│   ├── job/
│   │   └── main.py                 # K8s extraction job (Vespa → SQS)
│   ├── lambdas/
│   │   ├── extraction_lambda.py    # DDB Stream → K8s Job creation
│   │   ├── progress_lambda.py      # SQS → DynamoDB counter updates
│   │   ├── replay_lambda.py        # SQS → Ecommerce API
│   │   ├── replay_event_sink.py    # Strategy pattern for replay targets
│   │   └── ecom_jobs_lambda.py     # Scheduled polling of Ecommerce job status
│   ├── shared/
│   │   ├── configs.py              # HOCON config parsing into Pydantic models
│   │   ├── models.py               # Pydantic v2 data models
│   │   └── utils.py                # DynamoDB/SQS utilities
│   └── application.conf            # HOCON configuration with env var overrides
├── test/                           # pytest test suite (169 tests)
├── template.yaml                   # SAM/CloudFormation infrastructure
├── Dockerfile                      # Extraction job container (Python 3.14 Alpine)
├── Makefile                        # Lambda build targets for SAM
├── deploy.sh                       # Manual staging deployment script
├── samconfig.toml                  # SAM CLI configuration
├── requirements.txt                # Runtime dependencies
└── requirements.dev.txt            # Dev/test dependencies (pytest, moto, ruff, mypy)
```
Key Models
ReindexingJob
The central job record, stored in the Reindexing Table with a composite key:
- `pk` (HASH): `system_account_id`
- `sk` (RANGE): `{index_name}#{reindex_id}`
- GSI `reindex_id_index` (HASH): `reindex_id`
| Field | Type | Description |
|---|---|---|
| `reindex_id` | `str` | Unique ID in the format `YYYY-MM-DD-HH-MM-SS-<8 hex chars>` |
| `reindex_type` | `post` \| `patch` | Full document replacement or field-level update |
| `job_status` | `pending` \| `in_progress` \| `paused` \| `failed` | Current job state |
| `concurrency` | `int` | Number of parallel Vespa visit slices |
| `fields` | `list[str]?` | Fields to include (patch mode only) |
| `skip_extraction_if_fields_present` | `list[str]?` | If set, documents are dropped at extraction time when the transformed Marqo document already contains every named field; useful for cheap field-derivation backfills |
| `destination_index_name` | `str?` | Optional target index name |
| `extracted_doc_count` | `int?` | Documents extracted from Vespa |
| `replayed_doc_count` | `int?` | Documents sent to the Ecommerce API |
| `completed_doc_count` | `int?` | Documents confirmed indexed |
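A small sketch of three rules from the model above: generating a `reindex_id` in the documented format, building the `pk`/`sk` key, and the `skip_extraction_if_fields_present` check. Several details are assumptions: timestamps in UTC, lowercase hex, and "contains a field" meaning the key is present (regardless of value). `new_reindex_id`, `job_key` and `should_skip` are hypothetical helpers.

```python
import re
import secrets
from datetime import datetime, timezone
from typing import Any, Optional

# YYYY-MM-DD-HH-MM-SS-<8 hex chars>; lowercase hex is an assumption.
REINDEX_ID_RE = re.compile(r"^\d{4}-\d{2}-\d{2}-\d{2}-\d{2}-\d{2}-[0-9a-f]{8}$")


def new_reindex_id(now: Optional[datetime] = None) -> str:
    """Timestamp (assumed UTC) plus 4 random bytes rendered as 8 hex characters."""
    now = now or datetime.now(timezone.utc)
    return f"{now:%Y-%m-%d-%H-%M-%S}-{secrets.token_hex(4)}"


def job_key(system_account_id: str, index_name: str, reindex_id: str) -> dict[str, str]:
    """Composite primary key for the Reindexing Table."""
    return {"pk": system_account_id, "sk": f"{index_name}#{reindex_id}"}


def should_skip(marqo_doc: dict[str, Any], skip_fields: Optional[list[str]]) -> bool:
    """Drop a document only if every named field is already present (unset or empty: keep)."""
    if not skip_fields:
        return False
    return all(field in marqo_doc for field in skip_fields)
```

The empty-list guard matters: `all()` over an empty list is `True`, so without it an empty `skip_extraction_if_fields_present` would drop every document.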
IndexDetails
Index configuration read from the Index Config Table. Determines the index type based on metadata:
- Ecommerce: `metadata.shopId` is present; uses the Ecommerce API for replay
- Classic: no `shopId`; currently unsupported (NoOp sink)
Validates that workflow_version starts with "4" (only v4 workflows supported).
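The index-type rules above, combined with the `ReplayEventSink` strategy pattern, might look roughly like this. Only the `ReplayEventSink` name comes from this README. `EcommerceSink`, `NoOpSink`, `select_sink` and the injected `submit_batch` callable (a placeholder for a real Ecommerce API client that returns a job ID) are hypothetical. Placing the `workflow_version` check in the selector is also an assumption.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Mapping

Doc = dict[str, Any]


class ReplayEventSink(ABC):
    """Destination for replayed documents; one concrete strategy per index type."""

    @abstractmethod
    def send(self, documents: list[Doc]) -> list[str]:
        """Replay a batch and return any asynchronous job IDs that need tracking."""


class EcommerceSink(ReplayEventSink):
    """Hypothetical Ecommerce strategy; `submit_batch` stands in for the real API client."""

    def __init__(self, shop_id: str, submit_batch: Callable[[str, list[Doc]], str]) -> None:
        self.shop_id = shop_id
        self._submit_batch = submit_batch

    def send(self, documents: list[Doc]) -> list[str]:
        if not documents:
            return []
        return [self._submit_batch(self.shop_id, documents)]


class NoOpSink(ReplayEventSink):
    """Used for Classic indexes, which are currently unsupported: documents are dropped."""

    def send(self, documents: list[Doc]) -> list[str]:
        return []


def select_sink(
    metadata: Mapping[str, Any],
    workflow_version: str,
    submit_batch: Callable[[str, list[Doc]], str],
) -> ReplayEventSink:
    """Pick a strategy from index metadata, rejecting non-v4 workflows."""
    if not workflow_version.startswith("4"):
        raise ValueError(f"unsupported workflow_version: {workflow_version!r}")
    if "shopId" in metadata:
        return EcommerceSink(str(metadata["shopId"]), submit_batch)
    return NoOpSink()
```

Adding a backend (for example, a real Classic sink) then means adding one subclass and one branch in the selector, without touching the Replay Lambda's batching logic.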
Configuration
Configuration is managed via HOCON in app/application.conf. Each component has its own section with sensible defaults that can be overridden by environment variables:
```hocon
extraction-lambda = {
  reindexing-table = "reindexing-pipeline-reindexing-table"
  reindexing-table = ${?REINDEXING_TABLE}  # env var override
}
```
The SAM template injects environment variables from CloudFormation parameters and resource references (e.g., REINDEXING_TABLE: !Ref ReindexingTable).
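In the HOCON snippet, the optional substitution `${?REINDEXING_TABLE}` replaces the default only when that environment variable is defined. The plain-Python sketch below emulates that behaviour for a single setting. It does not use the project's `configs.py` or a HOCON parser, and `ExtractionLambdaConfig` and `load_extraction_lambda_config` are hypothetical names.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

DEFAULT_REINDEXING_TABLE = "reindexing-pipeline-reindexing-table"


@dataclass(frozen=True)
class ExtractionLambdaConfig:
    reindexing_table: str


def load_extraction_lambda_config(env: Optional[Mapping[str, str]] = None) -> ExtractionLambdaConfig:
    """Emulate `reindexing-table = ${?REINDEXING_TABLE}`: the env var wins only if it is set."""
    source = os.environ if env is None else env
    return ExtractionLambdaConfig(
        reindexing_table=source.get("REINDEXING_TABLE", DEFAULT_REINDEXING_TABLE)
    )
```

Passing the environment in as a mapping keeps the function easy to test without mutating `os.environ`.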
Development
Prerequisites
- Python 3.14+
- AWS SAM CLI
- Docker (for `sam build --use-container`)
Setup
```bash
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.dev.txt
```
Build
```bash
sam build --use-container
```
Validate
```bash
sam validate
```
Local Invocation
Lambdas can be invoked locally using SAM with the sample events in `events/`:
```bash
sam local invoke ExtractionLambda --event events/ddb_reindex_jobs.json
sam local invoke ReplayLambda --event events/sqs_docs_replay.json
sam local invoke ProgressLambda --event events/sqs_progress.json
sam local invoke EcomJobsLambda
```
| Function | Event file | Event type |
|---|---|---|
| ExtractionLambda | `events/ddb_reindex_jobs.json` | DynamoDB Stream INSERT |
| ReplayLambda | `events/sqs_docs_replay.json` | SQS message with document payload |
| ProgressLambda | `events/sqs_progress.json` | SQS message with progress counters |
| EcomJobsLambda | (none; scheduled) | CloudWatch scheduled event |
Alternatively, `test/main.py` can invoke lambdas directly against a real AWS environment (e.g., staging) without SAM:
```bash
python test/main.py
```
Uncomment the desired function call in `test/main.py`; it loads the event JSON and invokes the handler using the staging AWS profile.
Deploy
```bash
sam deploy --no-fail-on-empty-changeset --capabilities CAPABILITY_NAMED_IAM
```
Testing
The test suite uses pytest with moto for AWS service mocking, responses for HTTP mocking, and unittest.mock for general mocking.
```bash
# Run all tests
pytest test

# Run with coverage
pytest test --cov=app --cov-report=term-missing

# Run a specific test file
pytest test/test_extraction_lambda.py -v

# Lint and format
ruff check
ruff format --check

# Type check
mypy app
```
CI/CD
The GitHub Actions workflow (.github/workflows/build-and-deploy.yml) runs on every push:
- Check code: mypy, ruff, pytest with coverage, sam validate
- Publish Docker image: Builds multi-arch (amd64/arm64) extraction job image, pushes to ECR
- Deploy (main branch only): Staging → Preprod → Production (with manual approval gates)
Production deployments create a GitHub release tagged YYYY-MM-DD.<short-sha>.