
Marqo Reindexing Pipeline

A serverless pipeline that reindexes documents from Vespa indexes to the Ecommerce API. It supports parallel extraction via configurable concurrency (slices), pause/resume functionality, and checkpoint-based crash recovery.

Architecture


Components

| Component | Purpose |
|---|---|
| Control Plane | External system that initiates reindexing by writing a job record to the Reindexing Table |
| Reindexing Table | DynamoDB table storing job metadata, status, and progress counters. DDB Stream triggers the Extraction Lambda on new or resumed jobs |
| Progress Table | DynamoDB table storing Vespa continuation tokens per slice for checkpoint-based crash recovery |
| EcomJobs Table | DynamoDB table tracking pending Ecommerce API job IDs awaiting completion (with 30-day TTL) |
| Extraction Lambda | Triggered by DDB Stream on INSERT (pending) or MODIFY (paused → pending); creates a K8s Job in EKS |
| K8s Extraction Job | Runs in a multi-tenant EKS cluster; extracts documents from Vespa using parallel sliced visits |
| Replay Queue | SQS queue containing individual documents for replay to the Ecommerce API |
| Replay Lambda | Batches up to 100 documents from the Replay Queue and sends them to the Ecommerce API |
| Progress Queue | SQS queue for progress events tracking extracted, replayed, and completed document counts |
| Progress Lambda | Consumes progress events and atomically updates counters on the Reindexing Table |
| EventBridge Rule | Scheduled trigger that invokes the Ecommerce Jobs Lambda every minute |
| Ecommerce Jobs Lambda | Polls pending/in-progress jobs from the EcomJobs Table and checks completion status via the Ecommerce API |
| Dead Letter Queue | SQS queue capturing failed messages for investigation (14-day retention) |
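
As a rough illustration of the Replay Lambda's batching, the sketch below splits documents into batches of at most 100 and maps the job's reindex_type to the HTTP verb (POST for full reindex, PATCH for field updates). The helper names batched and http_method_for are hypothetical and are not taken from app/lambdas/replay_lambda.py.

```python
from collections.abc import Iterator, Sequence
from typing import TypeVar

T = TypeVar("T")

# Batch limit taken from the Replay Lambda description.
MAX_BATCH_SIZE = 100


def batched(docs: Sequence[T], size: int = MAX_BATCH_SIZE) -> Iterator[list[T]]:
    """Yield consecutive batches of at most `size` documents, preserving order."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(docs), size):
        yield list(docs[start:start + size])


def http_method_for(reindex_type: str) -> str:
    """Map a job's reindex_type to the HTTP verb sent to the Ecommerce API."""
    methods = {"post": "POST", "patch": "PATCH"}  # full reindex vs field update
    try:
        return methods[reindex_type]
    except KeyError:
        raise ValueError(f"unknown reindex_type: {reindex_type!r}") from None
```

On Python 3.12+, itertools.batched(docs, 100) produces the same groupings as tuples.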

Data Flow

  1. The Control Plane creates a reindexing job by writing a record (status pending) to the Reindexing Table
  2. The DDB Stream triggers the Extraction Lambda, which authenticates with EKS and creates a K8s Job
  3. The K8s Job extracts documents from Vespa using parallel sliced visits, transforms them to Marqo format, and sends them to the Replay Queue. It saves continuation tokens to the Progress Table after each page for crash recovery
  4. The Replay Lambda batches documents and sends them to the Ecommerce API (POST for full reindex, PATCH for field updates), storing the returned job IDs in the EcomJobs Table
  5. The EventBridge Rule triggers the Ecommerce Jobs Lambda every minute to poll job completion status from the Ecommerce API
  6. Progress counters (extracted_doc_count, replayed_doc_count, completed_doc_count) are updated atomically via the Progress Queue and Progress Lambda
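
The atomic counter updates can be sketched with DynamoDB's ADD update action, which increments a numeric attribute server-side so that concurrent Progress Lambda invocations do not overwrite one another. This is a minimal sketch assuming the key layout described under Key Models (pk = system_account_id, sk = {index_name}#{reindex_id}); build_counter_update is a hypothetical helper that only builds the request arguments.

```python
from typing import Any

# Counter attributes on the Reindexing Table, as listed in the data flow.
COUNTERS = ("extracted_doc_count", "replayed_doc_count", "completed_doc_count")


def build_counter_update(pk: str, sk: str, deltas: dict[str, int]) -> dict[str, Any]:
    """Build UpdateItem arguments that atomically add `deltas` to the job's counters."""
    unknown = set(deltas) - set(COUNTERS)
    if unknown:
        raise ValueError(f"unknown counters: {sorted(unknown)}")
    nonzero = sorted((name, n) for name, n in deltas.items() if n)
    if not nonzero:
        raise ValueError("no non-zero deltas to apply")
    # ADD increments server-side; a missing numeric attribute is treated as 0.
    clauses = [f"{name} :v{i}" for i, (name, _) in enumerate(nonzero)]
    values = {f":v{i}": n for i, (_, n) in enumerate(nonzero)}
    return {
        "Key": {"pk": pk, "sk": sk},
        "UpdateExpression": "ADD " + ", ".join(clauses),
        "ExpressionAttributeValues": values,
    }
```

The result can be passed to boto3's Table.update_item(**kwargs) on the Reindexing Table.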

Pause and Resume

Jobs can be paused by setting job_status to paused on the Reindexing Table. The K8s extraction job checks the job status and stops gracefully, preserving continuation tokens in the Progress Table. To resume, set job_status back to pending. The resulting DDB Stream MODIFY event triggers the Extraction Lambda, which creates a new K8s Job that picks up from the saved checkpoints.

Key Design Decisions

  • K8s Job over Lambda for extraction: Avoids Lambda's 15-minute timeout and eliminates repeated EKS authentication. A single auth at Job startup replaces per-page auth
  • EventBridge + DynamoDB over SQS polling: Avoids AWS Lambda recursive loop detection, which stops SQS-based polling chains after ~16 iterations
  • Checkpoint recovery: Continuation tokens are persisted to the Progress Table after each page fetch, enabling crash recovery without re-extraction
  • At-least-once delivery: The Ecommerce API is idempotent on document ID, so duplicate sends result in the same indexed state
  • Strategy pattern for replay sinks: The ReplayEventSink abstraction supports different index types (Ecommerce, Classic, NoOp), making it extensible for future backends

Project Structure

reindexing-pipeline/
├── app/
│ ├── job/
│ │ └── main.py # K8s extraction job (Vespa → SQS)
│ ├── lambdas/
│ │ ├── extraction_lambda.py # DDB Stream → K8s Job creation
│ │ ├── progress_lambda.py # SQS → DynamoDB counter updates
│ │ ├── replay_lambda.py # SQS → Ecommerce API
│ │ ├── replay_event_sink.py # Strategy pattern for replay targets
│ │ └── ecom_jobs_lambda.py # Scheduled polling of Ecommerce job status
│ ├── shared/
│ │ ├── configs.py # HOCON config parsing into Pydantic models
│ │ ├── models.py # Pydantic v2 data models
│ │ └── utils.py # DynamoDB/SQS utilities
│ └── application.conf # HOCON configuration with env var overrides
├── test/ # pytest test suite (169 tests)
├── template.yaml # SAM/CloudFormation infrastructure
├── Dockerfile # Extraction job container (Python 3.14 Alpine)
├── Makefile # Lambda build targets for SAM
├── deploy.sh # Manual staging deployment script
├── samconfig.toml # SAM CLI configuration
├── requirements.txt # Runtime dependencies
└── requirements.dev.txt # Dev/test dependencies (pytest, moto, ruff, mypy)

Key Models

ReindexingJob

The central job record stored in the Reindexing Table with composite keys:

  • pk (HASH): system_account_id
  • sk (RANGE): {index_name}#{reindex_id}
  • GSI reindex_id_index (HASH): reindex_id

| Field | Type | Description |
|---|---|---|
| reindex_id | str | Unique ID in format YYYY-MM-DD-HH-MM-SS-<8 hex chars> |
| reindex_type | post \| patch | Full document replacement or field-level update |
| job_status | pending \| in_progress \| paused \| failed | Current job state |
| concurrency | int | Number of parallel Vespa visit slices |
| fields | list[str]? | Fields to include (Patch mode only) |
| skip_extraction_if_fields_present | list[str]? | If set, drop docs at extraction time when their transformed Marqo doc already contains every named field; useful for cheap field-derivation backfills |
| destination_index_name | str? | Optional target index name |
| extracted_doc_count | int? | Documents extracted from Vespa |
| replayed_doc_count | int? | Documents sent to Ecommerce API |
| completed_doc_count | int? | Documents confirmed indexed |

IndexDetails

Index configuration read from the Index Config Table. Determines the index type based on metadata:

  • Ecommerce: metadata.shopId is present — uses Ecommerce API for replay
  • Classic: No shopId — currently unsupported (NoOp sink)

IndexDetails also validates that workflow_version starts with "4" (only v4 workflows are supported).
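
A minimal sketch of how the index details could drive sink selection under the strategy pattern. Only ReplayEventSink is named in this README; EcommerceSink, NoOpSink and sink_for are hypothetical, the index config is assumed to be a plain dict with workflow_version and metadata keys, and the Ecommerce sink here just records documents instead of calling the API.

```python
from abc import ABC, abstractmethod
from typing import Any


class ReplayEventSink(ABC):
    @abstractmethod
    def send(self, docs: list[dict[str, Any]]) -> int:
        """Deliver a batch and return how many documents were accepted."""


class EcommerceSink(ReplayEventSink):
    def __init__(self, shop_id: str) -> None:
        self.shop_id = shop_id
        self.sent: list[dict[str, Any]] = []

    def send(self, docs: list[dict[str, Any]]) -> int:
        self.sent.extend(docs)  # a real sink would call the Ecommerce API here
        return len(docs)


class NoOpSink(ReplayEventSink):
    def send(self, docs: list[dict[str, Any]]) -> int:
        return 0  # Classic indexes are unsupported: accept nothing


def sink_for(index_config: dict[str, Any]) -> ReplayEventSink:
    """Validate the workflow version, then pick a sink from metadata.shopId."""
    version = str(index_config.get("workflow_version", ""))
    if not version.startswith("4"):
        raise ValueError(f"unsupported workflow_version: {version!r}")
    shop_id = (index_config.get("metadata") or {}).get("shopId")
    return EcommerceSink(shop_id) if shop_id else NoOpSink()
```

Adding a backend then means adding one subclass and one branch in the selector, without touching the Replay Lambda's batching loop.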

Configuration

Configuration is managed via HOCON in app/application.conf. Each component has its own section with sensible defaults that can be overridden by environment variables:

extraction-lambda = {
  reindexing-table = "reindexing-pipeline-reindexing-table"
  reindexing-table = ${?REINDEXING_TABLE} # env var override
}
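
In HOCON, ${?REINDEXING_TABLE} is an optional substitution: it overrides the preceding value only when the environment variable is set. The pipeline parses HOCON into Pydantic models; the stdlib-only sketch below just mirrors the resulting precedence for this one key, and ExtractionLambdaConfig is a hypothetical class name.

```python
import os
from collections.abc import Mapping
from dataclasses import dataclass


@dataclass(frozen=True)
class ExtractionLambdaConfig:
    reindexing_table: str = "reindexing-pipeline-reindexing-table"

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "ExtractionLambdaConfig":
        # Like ${?REINDEXING_TABLE}: use the env var when present, else keep the default.
        overrides: dict[str, str] = {}
        if "REINDEXING_TABLE" in env:
            overrides["reindexing_table"] = env["REINDEXING_TABLE"]
        return cls(**overrides)
```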

The SAM template injects environment variables from CloudFormation parameters and resource references (e.g., REINDEXING_TABLE: !Ref ReindexingTable).

Development

Prerequisites

  • Python 3.14+
  • AWS SAM CLI
  • Docker (for sam build --use-container)

Setup

python -m venv .venv
source .venv/bin/activate
pip install -r requirements.dev.txt

Build

sam build --use-container

Validate

sam validate

Local Invocation

Lambdas can be invoked locally using SAM with the sample events in events/:

sam local invoke ExtractionLambda --event events/ddb_reindex_jobs.json
sam local invoke ReplayLambda --event events/sqs_docs_replay.json
sam local invoke ProgressLambda --event events/sqs_progress.json
sam local invoke EcomJobsLambda

| Function | Event file | Event type |
|---|---|---|
| ExtractionLambda | events/ddb_reindex_jobs.json | DynamoDB Stream INSERT |
| ReplayLambda | events/sqs_docs_replay.json | SQS message with document payload |
| ProgressLambda | events/sqs_progress.json | SQS message with progress counters |
| EcomJobsLambda | (none; scheduled) | CloudWatch scheduled event |
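
To exercise the Extraction Lambda against a different job without editing the checked-in sample, a minimal DynamoDB Stream INSERT event can be generated. This sketch covers only the key layout and the job_status trigger field; real stream records carry more fields (eventID, awsRegion, SequenceNumber, and so on), and which attributes the handler actually reads is an assumption, so compare the output with events/ddb_reindex_jobs.json before use. ddb_insert_event is a hypothetical helper.

```python
import json
from typing import Any


def ddb_insert_event(system_account_id: str, index_name: str, reindex_id: str,
                     reindex_type: str = "post", concurrency: int = 4) -> dict[str, Any]:
    """Build a minimal stream event for a new pending job (attribute-value format)."""
    new_image = {
        "pk": {"S": system_account_id},
        "sk": {"S": f"{index_name}#{reindex_id}"},
        "reindex_id": {"S": reindex_id},
        "reindex_type": {"S": reindex_type},
        "job_status": {"S": "pending"},
        "concurrency": {"N": str(concurrency)},  # numbers are strings in stream images
    }
    return {"Records": [{
        "eventName": "INSERT",
        "eventSource": "aws:dynamodb",
        "dynamodb": {
            "Keys": {"pk": new_image["pk"], "sk": new_image["sk"]},
            "NewImage": new_image,
            "StreamViewType": "NEW_AND_OLD_IMAGES",
        },
    }]}
```

Writing the result with json.dump to a file under events/ lets it be passed to sam local invoke ExtractionLambda --event.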

Alternatively, test/main.py can invoke lambdas directly against a real AWS environment (e.g., staging) without SAM:

python test/main.py

Uncomment the desired function call in test/main.py and it will load the event JSON and invoke the handler using the staging AWS profile.

Deploy

sam deploy --no-fail-on-empty-changeset --capabilities CAPABILITY_NAMED_IAM

Testing

The test suite uses pytest with moto for AWS service mocking, responses for HTTP mocking, and unittest.mock for general mocking.

# Run all tests
pytest test

# Run with coverage
pytest test --cov=app --cov-report=term-missing

# Run a specific test file
pytest test/test_extraction_lambda.py -v

# Lint and format
ruff check
ruff format --check

# Type check
mypy app

CI/CD

The GitHub Actions workflow (.github/workflows/build-and-deploy.yml) runs on every push:

  1. Check code: mypy, ruff, pytest with coverage, sam validate
  2. Publish Docker image: Builds multi-arch (amd64/arm64) extraction job image, pushes to ECR
  3. Deploy (main branch only): Staging → Preprod → Production (with manual approval gates)

Production deployments create a GitHub release tagged YYYY-MM-DD.<short-sha>.
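
The release tag format can be reproduced as below, assuming a 7-character short SHA (Git's usual default abbreviation, which can grow in large repositories); release_tag is a hypothetical helper, not part of the workflow.

```python
from datetime import date


def release_tag(day: date, sha: str, length: int = 7) -> str:
    """Format a tag as YYYY-MM-DD.<short-sha>."""
    if len(sha) < length:
        raise ValueError("sha is shorter than the requested abbreviation")
    return f"{day:%Y-%m-%d}.{sha[:length]}"
```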