
Ecom Indexer DDB Write Optimization

Problem

EcomIndexerJobsTable write costs grow super-linearly with job size. A typical 200-chunk indexing job consumes ~300–500 WCU today, driven by three compounding factors:

  1. Item size grows during the job. Each per-chunk flush list_appends to failed_items_details, skipped_items_details, and conflict_details. DynamoDB charges UpdateItem based on the post-update item size, so every subsequent write costs more than the previous one.
  2. GSI write amplification. Both GSI_JobLookup and GSI_JobsByStatus use ProjectionType.ALL, replicating the full item (including the growing detail maps) on every write. Each base-table write becomes three writes at the projected size.
  3. Per-chunk write multiplicity. The chunk batcher already collapses N chunks into one flush, but each flush is 1–2 writes, IN_PROGRESS / platform_data / completion are separate writes, and chunk_ids ({s3_key}#part#{idx}) and job_ids (36-char UUIDs) inflate both the item itself and the completed_chunks set.

Counted writes per 200-chunk job today (components/ecom_indexer/ecom_indexer/):

  • update_job_status(IN_PROGRESS) — 1
  • update_platform_data(settings) — 1
  • _apply_chunk_delta preparse (when applicable) — 1
  • ChunkDeltaBatcher flushes — ⌈200/50⌉ × (1 or 2) = 4–8
  • check_and_complete_job — 1
  • Retry / error paths — variable

Total: ~10 writes per job, each replicating a fat item to two ALL-projected GSIs.
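The compounding can be made concrete with a rough cost model. The sketch below assumes the standard rule of 1 WCU per started KB of item size, charged once for the base table and once for each GSI that projects a changed attribute. The item sizes are illustrative, not measured, and the function names are hypothetical.

```python
import math

def write_wcu(item_size_kb: float, gsi_count: int) -> int:
    """WCU for one UpdateItem: 1 WCU per started KB, for the base table plus each affected GSI."""
    return math.ceil(item_size_kb) * (1 + gsi_count)

def job_wcu(item_sizes_kb: list[float], gsi_count: int) -> int:
    """Total WCU for a job, given the item size at the time of each write."""
    return sum(write_wcu(size, gsi_count) for size in item_sizes_kb)

# Hypothetical job: 10 writes, item grows from 2 KB to 20 KB, two ALL-projected GSIs.
growing = [2 * (i + 1) for i in range(10)]
# Same 10 writes against a constant 2 KB item that touches no GSI.
constant = [2] * 10

print(job_wcu(growing, gsi_count=2))   # 330
print(job_wcu(constant, gsi_count=0))  # 20
```

Under these assumptions the cost is dominated by item growth multiplied by GSI fan-out, not by the write count.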

Goals

  • Reduce WCU per typical job by ~30–50x (see Estimated impact).
  • Keep idempotency semantics intact (per-chunk dedup via completed_chunks continues to work).
  • Preserve diagnostic visibility into failed / skipped / conflict items.
  • Avoid behavior changes on the read path that callers depend on.

Non-goals: changing job semantics, SQS message flow, or the workflow topology.

Design overview

Four orthogonal changes, each independently valuable:

  1. Offload unbounded fields to S3. Failed / skipped / conflict details and platform_data move out of DDB into a derived-key S3 layout. The DDB item stays small and constant-size for the life of the job.
  2. Shrink remaining keys. Job IDs go from 36-char UUIDs to ~13-char base32. Chunk IDs drop the embedded s3_key prefix, becoming short suffix-only tokens.
  3. Reduce GSI projections. GSI_JobLookup becomes KEYS_ONLY; GSI_JobsByStatus becomes INCLUDE with a deliberately chosen attribute list that excludes counters and completed_chunks. Per-chunk flushes then skip both GSIs entirely.
  4. Merge writes. Fold IN_PROGRESS into the first chunk flush, route preparse skips through the batcher, and replace the strongly-consistent read in check_and_complete_job with ReturnValues="UPDATED_NEW" on the last flush.

Phase 1 — S3 offload

JobDetailsStore

Lives at components/ecom_utils/ecom_utils/job_details_store/job_details_store.py so both writers (the indexer) and readers (SyncJobResponse builders in admin_server / admin_lambda) can share it without taking a dependency on the indexer component.

from pydantic import BaseModel

# ConflictItem and JobDetails are existing models, not shown here.

class JobDetailsPayload(BaseModel):
    chunk_ids: list[str]
    failed: dict[str, list[str]]
    skipped: dict[str, list[str]]
    conflict: dict[str, list[ConflictItem]]

class JobDetailsStore:
    def __init__(self, s3_client, bucket: str): ...

    @staticmethod
    def details_key(job_id: str, flush_id: str) -> str:
        return f"job-details/{job_id}/{flush_id}.json.gz"

    @staticmethod
    def platform_data_key(job_id: str) -> str:
        return f"job-details/{job_id}/platform-data.json"

    def write_flush(self, job_id: str, flush_id: str, payload: JobDetailsPayload) -> None:
        # Gzip + PutObject. Idempotent on flush_id.
        ...

    def write_platform_data(self, job_id: str, data: dict) -> None:
        # PutObject; same key per job, overwrite-safe.
        ...

    def read_all_details(self, job_id: str) -> JobDetails:
        # List prefix, parallel GETs, merge by reason.
        ...

    def read_platform_data(self, job_id: str) -> dict | None:
        # GET; returns None if absent.
        ...

Module-level helpers flush_id_for_chunks, dedupe_doc_ids, and dedupe_conflict_items are exposed alongside the store so writers don't reimplement them.

S3 keys are fully derivable from job_id — no pointer stored in DDB. Payloads are gzipped (Content-Encoding: gzip) since doc_id lists compress 5–10x.
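A minimal sketch of the gzip leg of write_flush, assuming a JSON body and a boto3-style client. The helper names encode_flush, decode_flush, and put_flush are hypothetical; only the key layout and the Content-Encoding header come from the design above. Serializing with sorted keys and a fixed gzip mtime is an added assumption that makes a re-flush of the same payload byte-identical.

```python
import gzip
import json

def encode_flush(payload: dict) -> bytes:
    """Serialize deterministically, then gzip with a fixed mtime so output is reproducible."""
    raw = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
    return gzip.compress(raw, mtime=0)

def decode_flush(body: bytes) -> dict:
    """Inverse of encode_flush; boto3 does not decompress gzip bodies on GET."""
    return json.loads(gzip.decompress(body).decode("utf-8"))

def put_flush(s3_client, bucket: str, job_id: str, flush_id: str, payload: dict) -> str:
    """PUT one flush object under the derived key and return that key."""
    key = f"job-details/{job_id}/{flush_id}.json.gz"
    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=encode_flush(payload),
        ContentType="application/json",
        ContentEncoding="gzip",
    )
    return key
```

Because the key depends only on job_id and flush_id, a retried PUT overwrites the same object instead of creating a duplicate.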

Empty-bucket contract. The store constructor accepts any bucket string. The indexer's _get_job_details_store() is the single source of truth for "is the bucket configured" — it returns None when ECOM_JOB_DETAILS_BUCKET is unset, and writers no-op the S3 leg in that case. The singleton lookup is guarded by a threading.Lock since the indexer's parallel handler invokes from worker threads.

ChunkDeltaBatcher rewrite

components/ecom_indexer/ecom_indexer/chunk_delta_batcher.py:

  • _flush_combined becomes one DDB write instead of two.
  • Compute flush_id = sha256(sorted(chunk_ids))[:16] — deterministic so re-flushing the same set re-PUTs to the same S3 key (idempotent at the S3 layer).
  • If any details exist: call JobDetailsStore.write_flush(...) before the DDB update.
  • DDB update: ADD counters + completed_chunks, conditional on no chunk_id collision. Drop all failed_items_details / skipped_items_details / conflict_details set parts.
  • Drop _write_details_with_retry and the _DETAIL_WRITE_* constants entirely.
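The flush_id rule needs a concrete byte encoding of the sorted chunk list before hashing. The sketch below is one hypothetical version of flush_id_for_chunks; joining with newlines, deduplicating, and taking the first 16 hex characters are assumptions, since the text only specifies sha256 over the sorted ids truncated to 16.

```python
import hashlib
from typing import Iterable

def flush_id_for_chunks(chunk_ids: Iterable[str]) -> str:
    """Deterministic id for a set of chunks: order and duplicates do not matter."""
    canonical = "\n".join(sorted(set(chunk_ids)))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]

# The same chunk set always maps to the same S3 key, whatever the arrival order.
assert flush_id_for_chunks(["p2", "p0", "p1"]) == flush_id_for_chunks(["p0", "p1", "p2"])
```

A retried flush of the same chunks therefore overwrites its own S3 object, while a flush of a different chunk set gets a different key.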

Failure modes:

| S3 PUT | DDB update | Result |
|---|---|---|
| ok | ok | committed |
| ok | conditional reject | counters already applied; S3 object is a harmless extra |
| ok | transient fail | SQS retry re-PUTs same key, retries DDB update |
| fail | not attempted | clean retry on next message delivery |
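The four outcomes follow from doing the S3 PUT first and the conditional DDB update second. A minimal sketch of that ordering, assuming the two operations are passed in as callables; ConditionalReject is a stand-in for DynamoDB's ConditionalCheckFailedException and the return strings are hypothetical.

```python
from typing import Callable

class ConditionalReject(Exception):
    """Stand-in for a ConditionalCheckFailedException raised by the DDB update."""

def flush(write_details: Callable[[], None],
          update_counters: Callable[[], None],
          has_details: bool) -> str:
    """S3 first, then DDB. Any other exception propagates so SQS redelivers the message."""
    if has_details:
        write_details()  # idempotent: same flush_id maps to the same S3 key
    try:
        update_counters()  # conditional on no chunk_id collision
    except ConditionalReject:
        # An earlier attempt already committed these chunks; the S3 object is a harmless extra.
        return "already-applied"
    return "committed"
```

If write_details raises, update_counters is never called, which matches the "fail / not attempted" row.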

Platform data move

Replace the update_platform_data(...) call at lambda_function.py:289 with JobDetailsStore.write_platform_data(job_id, settings). Delete update_platform_data in job_management.py.

DDB schema changes

Drop these attributes from new writes (readers keep a fallback to the inline attributes during migration; see step #2 in Rollout sequence):

  • failed_items_details
  • skipped_items_details
  • conflict_details
  • platform_data
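  • skipped_summary, error_summary: derived at read time in SyncJobData.from_sync_job from the detail dicts, so there is no independent DDB attribute to drop (see Risks and open questions, item 2)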

Consumer audit

Anything that reads the dropped attributes switches to JobDetailsStore. Known sites:

  • components/shopify/admin_server/admin_server/services/sync_jobs_service.py
  • components/admin_lambda/admin_lambda/services/sync_jobs_service.py
  • components/ecom_indexer/ecom_indexer/job_metrics.py (if it surfaces detail counts)
  • components/ecom_utils/ecom_utils/sync_jobs_service/sync_job_repository.py (model deserialization)

Hot path (check_and_complete_job) only reads counters, so it's unaffected.

Phase 2 — Short chunk_ids + write merging

Short chunk_ids

Chunk uniqueness is scoped per-job (the completed_chunks set is per-row), so chunk_ids don't need the s3_key prefix:

| Site | Before | After |
|---|---|---|
| lambda_function.py:248 | f"{s3_key}#preparse" | "preparse" |
| lambda_function.py:465, 560, 575 | f"{s3_key}#part#{idx}" | f"p{idx}" |
| lambda_function.py:1797 | f"{record.s3_key}#timeout_fallback" | "timeout" |

A 1000-chunk job's completed_chunks set drops from ~100 KB to ~10 KB — a ~10x reduction on every write.

Write merging

Fold IN_PROGRESS into the first chunk flush. Today: lambda_function.py:1642 (sequential), :1330 (polled), :612 (parallel) each do update_job_status(IN_PROGRESS) before processing starts. After: drop those calls, and have the first batcher flush conditionally set the status:

UpdateExpression:
    ADD processed_items :s, failed_items :f, ..., completed_chunks :cs
    SET last_updated = :ts,
        job_status = if-eligible(:inprog),
        started_at = if_not_exists(started_at, :ts),
        status_created_at = if-eligible(:sca)
ConditionExpression:
    (attribute_not_exists(job_status) OR job_status IN (:pending, :failed, :conflict))
    AND <existing chunk-id no-collision check>

DynamoDB doesn't support inline conditional SETs, so this is implemented as: read current job_status via _get_job_item_for_update (which we already do for pk/sk), include the appropriate SET parts in the flush only if the status warrants it. The condition guards against races.
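One way to build the expression in application code is sketched below. It is an illustration, not the indexer's implementation: build_flush_update and its parameters are hypothetical, the status strings are assumed, and the existing chunk-id collision check is passed in as an opaque condition string with its own values, since its exact form is not shown here.

```python
ELIGIBLE = {None, "PENDING", "FAILED", "CONFLICT"}  # assumed status strings

def build_flush_update(current_status, now_iso, deltas, chunk_ids,
                       status_created_at, collision_condition, collision_values):
    """Return update_item kwargs; status SET parts are included only when eligible."""
    values = {f":{name}": amount for name, amount in deltas.items()}
    values.update(collision_values)
    values.update({":cs": set(chunk_ids), ":ts": now_iso})

    add_parts = [f"{name} :{name}" for name in deltas] + ["completed_chunks :cs"]
    set_parts = ["last_updated = :ts", "started_at = if_not_exists(started_at, :ts)"]
    condition = collision_condition

    if current_status in ELIGIBLE:
        # Only now do the status placeholders exist; unused ones would fail validation.
        set_parts += ["job_status = :inprog", "status_created_at = :sca"]
        values.update({":inprog": "IN_PROGRESS", ":sca": status_created_at,
                       ":pending": "PENDING", ":failed": "FAILED", ":conflict": "CONFLICT"})
        condition = ("(attribute_not_exists(job_status) OR "
                     "job_status IN (:pending, :failed, :conflict)) "
                     f"AND ({collision_condition})")

    return {
        "UpdateExpression": f"ADD {', '.join(add_parts)} SET {', '.join(set_parts)}",
        "ConditionExpression": condition,
        "ExpressionAttributeValues": values,
    }
```

If the status changes between the read and the write, the condition fails and the caller can re-read and rebuild, which is the race guard described above.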

Route preparse skips through the batcher. lambda_function.py:244 does a standalone _apply_chunk_delta for JSONL parse failures. Replace with batcher.add(chunk_id="preparse", success=0, skipped_details=preparse_skipped, ...) and let the batcher merge it with the first real flush.

Drop the strongly-consistent read in check_and_complete_job. job_management.py:309 does get_item(..., ConsistentRead=True) to decide whether to write terminal status. Replace with: the batcher's last update_item passes ReturnValues="UPDATED_NEW", and _process_documents_common inspects the returned counters in-memory before deciding to call into check_and_complete_job. If counters already sum to total_items, do the conditional terminal write directly without a re-read.
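A sketch of the in-memory check on the returned counters. It assumes the four counters below are the ones that sum to total_items, and that total_items is already known to the caller, because UPDATED_NEW returns only attributes the update touched. The function name is hypothetical.

```python
from decimal import Decimal

COUNTERS = ("processed_items", "failed_items", "skipped_items", "conflict_items")

def all_items_accounted_for(updated_attrs: dict, total_items: int) -> bool:
    """updated_attrs is response["Attributes"] from update_item(..., ReturnValues="UPDATED_NEW")."""
    # boto3's resource API returns numbers as Decimal, hence the int() conversion.
    done = sum(int(updated_attrs.get(name, 0)) for name in COUNTERS)
    return done == total_items

attrs = {"processed_items": Decimal(190), "failed_items": Decimal(6),
         "skipped_items": Decimal(3), "conflict_items": Decimal(1)}
print(all_items_accounted_for(attrs, total_items=200))  # True
```

A counter missing from the response is treated as zero here, so the check is only sound if every flush ADDs all four counters, even with a zero delta.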

Fold first-retry attribution into IN_PROGRESS. retry_observability.py:53 writes set_first_retry_attribution as a dedicated conditional write on first retry. Since the redrive path's flush already sets IN_PROGRESS (after the merge above), include retry_attribution in that same update, conditional on attribute_not_exists(retry_attribution.first_retry_at).

Phase 3 — Short job IDs

Generation

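Replace UUID4 with 13-char base32 (64 bits of entropy from 8 random bytes):

import base64
import secrets

def generate_job_id() -> str:
    # 8 random bytes encode to 16 base32 chars including "===" padding; stripping it leaves 13.
    # Example: "k3jqxr2m7zaba"
    return base64.b32encode(secrets.token_bytes(8)).decode().rstrip("=").lower()

At ~1M lifetime jobs and 64 bits of entropy, the birthday bound gives P(any collision) ≈ n²/2⁶⁵ ≈ 3 × 10⁻⁸. The collision case is handled by the existing attribute_not_exists(pk) conditional create: retry on ConditionalCheckFailedException with a fresh ID.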
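The ID length and the collision estimate can be checked directly. Eight random bytes carry 64 bits, and the sketch uses the usual birthday approximation n² / 2^(bits+1), which holds while the result is small. collision_probability is a hypothetical helper name.

```python
import base64
import secrets

def generate_job_id() -> str:
    """13-char lowercase base32 ID from 8 random bytes."""
    return base64.b32encode(secrets.token_bytes(8)).decode().rstrip("=").lower()

def collision_probability(n_ids: int, bits: int) -> float:
    """Birthday-bound approximation for n_ids random IDs of the given bit width."""
    return n_ids * n_ids / 2 ** (bits + 1)

print(len(generate_job_id()))                      # 13
print(f"{collision_probability(10**6, 64):.1e}")   # 2.7e-08
```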

Upstream changes

Job creation happens in:

  • components/shopify/admin_server/admin_server/services/sync_service.py
  • Possibly components/ecom_settings_exporter/

Need to audit:

  1. Where job_id is generated (likely a Pydantic default_factory=lambda: str(uuid.uuid4()) on SyncJob).
  2. Where s3_key is constructed and whether it embeds job_id (shorter job_id → naturally shorter s3_keys).
  3. Any logs or external systems that pattern-match on UUID-shaped job_ids.

Migration

Old UUID-shaped jobs and new short-id jobs coexist — no migration needed for in-flight rows. Once long enough has passed (e.g., 30 days, matching the TTL), all rows will be short-id and dashboards/queries can drop UUID-specific assumptions if any.

Phase 4 — GSI projection migration

Target projections

GSI_JobLookup: ProjectionType.KEYS_ONLY

  • Used only for "find pk/sk by job_id" lookups (sync_job_repository.py:get_shop_job_by_id, job_management.py:_get_job_item_for_update).
  • Callers do a follow-up get_item(pk, sk) on the base table. One extra read per lookup (~0.5 RCU), saves a full-item replicate on every write to that item.

GSI_JobsByStatus: ProjectionType.INCLUDE with explicit attribute list:

non_key_attributes = [
    "job_id", "job_type", "is_internal", "platform",
    "started_at", "completed_at", "error_message",
    "ttl", "created_at", "retry_count",
    "estimated_time_minutes", "initiator_user_id",
]

Deliberately excluded: counters (total_items, processed_items, failed_items, skipped_items, conflict_items), completed_chunks, retry_attribution, last_updated.

Why this works

DynamoDB only updates a GSI when a write touches an attribute projected into it. Per-chunk flushes modify only counters + completed_chunks + last_updated — none of which are projected — so they don't write to either GSI. Only status transitions update GSI_JobsByStatus (via status_created_at, which is a key attribute).
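The rule reduces to a set intersection between the attributes a write changes and the attributes an index carries. The sketch below is illustrative only: the table key names pk and sk and job_id as the GSI_JobLookup key are assumptions, and the partition key of GSI_JobsByStatus is omitted because it is not named here.

```python
TABLE_KEYS = {"pk", "sk"}

# KEYS_ONLY: index key plus table keys. job_id as the index key is an assumption.
JOB_LOOKUP_ATTRS = TABLE_KEYS | {"job_id"}

# INCLUDE: status_created_at (a key attribute) plus the non_key_attributes list.
JOBS_BY_STATUS_ATTRS = TABLE_KEYS | {
    "status_created_at",
    "job_id", "job_type", "is_internal", "platform",
    "started_at", "completed_at", "error_message",
    "ttl", "created_at", "retry_count",
    "estimated_time_minutes", "initiator_user_id",
}

def gsi_is_written(changed_attrs: set, index_attrs: set) -> bool:
    """A write propagates to an index only if it changes an attribute the index carries."""
    return bool(changed_attrs & index_attrs)

chunk_flush = {"processed_items", "failed_items", "skipped_items",
               "conflict_items", "completed_chunks", "last_updated"}
status_transition = {"job_status", "status_created_at", "last_updated"}

print(gsi_is_written(chunk_flush, JOBS_BY_STATUS_ATTRS))        # False
print(gsi_is_written(status_transition, JOBS_BY_STATUS_ATTRS))  # True
```

The same check is a cheap way to review future attribute additions: anything added to non_key_attributes that a chunk flush modifies would bring the GSI write back.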

Listing UX trade-off

Confirmed offline: the job-list view does not need to show live progress counters. Detail view fetches counters from the base table when the user clicks a row. List view shows status, timestamps, and error_message — all projected.

Migration plan

GSI projection changes require dropping and recreating the GSI in DynamoDB. To avoid downtime:

  1. Add GSI_JobLookup_v2 (KEYS_ONLY) and GSI_JobsByStatus_v2 (INCLUDE) alongside the existing GSIs. Both backfill automatically as DDB scans the base table.
  2. Wait for backfill to complete. Verify item counts and a sample of expected attributes via console / aws dynamodb describe-table.
  3. In application code, switch all readers to query the _v2 indexes. Deploy and bake for one release.
  4. Drop the original GSI_JobLookup and GSI_JobsByStatus.
  5. (Optional, cosmetic) Add _v3 indexes with the original names and switch back, then drop _v2. Skip unless naming bothers operators.

The CDK change for step 1 is non-destructive (add); step 4 is destructive (remove) but only after readers have migrated.
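The backfill check in step 2 can be scripted against the DescribeTable response. A minimal sketch, assuming the response dict comes from boto3's describe_table; v2_indexes_ready is a hypothetical helper, while IndexName, IndexStatus, and Backfilling are fields of that response.

```python
def v2_indexes_ready(describe_table_response: dict,
                     names=("GSI_JobLookup_v2", "GSI_JobsByStatus_v2")) -> bool:
    """True when every named GSI exists, is ACTIVE, and is no longer backfilling."""
    gsis = describe_table_response["Table"].get("GlobalSecondaryIndexes", [])
    by_name = {g["IndexName"]: g for g in gsis}
    return all(
        name in by_name
        and by_name[name]["IndexStatus"] == "ACTIVE"
        and not by_name[name].get("Backfilling", False)
        for name in names
    )

# Usage (requires AWS credentials; table name is a placeholder):
#   import boto3
#   resp = boto3.client("dynamodb").describe_table(TableName="<env>-EcomIndexerJobsTable")
#   assert v2_indexes_ready(resp)
```

ItemCount in the same response is refreshed only periodically, so it is better suited to a tolerance check than to an exact comparison.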

Estimated impact

Per 200-chunk job, comparing today vs. fully migrated state:

| Metric | Today | After |
|---|---|---|
| DDB writes | ~10 | ~5 |
| Average item size on write | grows to ~50 KB | ~1–2 KB constant |
| GSI replications per write | 2 (full item) | 0 for chunk flushes, 1 small for status transitions |
| Approximate WCU per job | ~300–500 | ~8–12 |
| S3 PUTs per job | 0 | ~5 (gzipped, < $0.0001) |

That's roughly a 30–50x WCU reduction. The S3 cost is negligible by comparison.

Risks and open questions

  1. completed_chunks is still in DDB. Considered moving to S3 marker objects with IfNoneMatch: "*" for chunk-level idempotency. Decision: leave it in DDB — adds an S3 round-trip on the hot path per chunk for marginal savings once it's shrunk by short chunk_ids.

  2. skipped_summary and error_summary. Derived in SyncJobData.from_sync_job from the detail dicts, so they follow whichever source the reader picked (inline or S3). No independent DDB attribute to preserve.

  3. Read latency on GSI_JobLookup KEYS_ONLY. Every job lookup gains one get_item. DynamoDB get_item typically completes in single-digit milliseconds for small items, so it should not be noticeable on the hot path, but it is worth measuring on the indexer's GSI lookup loop in _get_job_item_for_update after step #6.

  4. DDB Streams. infra/ecom/stacks/ecom_stack.py:250 does not enable streams on EcomIndexerJobsTable (TableV2 default is off). Confirm no consumer depends on streams before assuming this stays disabled.

  5. SQS event source batch size. Worth checking the event source mapping config — larger batches amortize GSI lookups and other fixed costs across messages.

  6. List-view detail summaries. After step #4, list-endpoint responses will show empty errorSummary / skippedSummary / conflictSummary for new jobs unless the listing route also constructs a JobDetailsStore and passes it through from_sync_job. The current step #3 plan wires only the single-job route to limit per-request S3 traffic; if list summaries are needed live, the listing route would have to parallelise an S3 fetch per visible job — explicit trade-off to revisit if the UI regresses.

Rollout sequence

Carved so every step is independently deployable. Each PR's additive changes are safe to land first; the destructive / behavior-changing steps gate on the additive predecessors being in production.

| # | Branch / PR | Type | Depends on (deployed) |
|---|---|---|---|
| 1 | claude/ddb-infra | Additive infra: _v2 GSIs (CDK), ECOM_JOB_DETAILS_BUCKET env var, JobDetailsStore module skeleton, moto + test fixtures updated to mirror the v2 projection shape | |
| 2 | claude/ddb-reader | Additive: optional details_store: JobDetailsStore \| None parameter on SyncJobData.from_sync_job with S3 dual-read fallback. Default behaviour unchanged. | #1 |
| 3 | (follow-up) | Reader wiring: construct a JobDetailsStore in admin_server / admin_lambda single-job route handlers and pass it. Adds ECOM_JOB_DETAILS_BUCKET env var to those Lambdas' CDK config. | #2 |
| 4 | claude/ddb-logic | Behavior change: writer drops inline detail / platform_data DDB writes, single combined UpdateItem per flush, short chunk_ids, write-merging. Stacked on claude/ddb-reader so it can't merge before the reader fallback is in. | #3 (so new jobs aren't UX-regressed) |
| 5 | (follow-up) | Short job IDs (Phase 3): 13-char base32 generator in upstream SyncJob. Independent of #4; can deploy any time. | |
| 6 | (follow-up) | GSI reader switch: get_shop_job_by_id and _get_job_item_for_update start querying GSI_JobLookup_v2 (KEYS_ONLY) with a follow-up get_item on the base table; listing endpoints query GSI_JobsByStatus_v2. Requires v2 backfill to be complete in prod (operator check). | #1 backfill complete |
| 7 | (follow-up) | GSI v1 drop: CDK removes the original ALL-projection indexes. Destructive; only after readers are off them for a release cycle. | #6 |

The git stack (#1 → #2 → #4) enforces the deploy-order dependency that matters most: the writer-drop-inline change literally cannot merge before the reader fallback is in main. Steps #3, #5, #6, #7 are independent enough to ship as flat siblings without restructuring the stack.

Each step is independently reversible until the destructive v1 GSI drop in #7.

Production verification

A four-pronged plan: synthetic dry-run before deploy → metrics during deploy → spot-checks after deploy → rollback levers.

Pre-deploy

  • GSI v2 backfill confirmation. After #1 deploys, before #4: aws dynamodb describe-table --table-name {env}-EcomIndexerJobsTable and assert both _v2 GSIs are ACTIVE with ItemCount matching the base table within tolerance. CDK doesn't tell you when backfill finishes; this is the operator check that gates #6 and (indirectly) #4 deployment safety.
  • Env-var smoke test. aws lambda get-function-configuration on the indexer Lambda → verify ECOM_JOB_DETAILS_BUCKET is set. If empty, _get_job_details_store() returns None and the S3 leg is skipped: before #4 the writer still writes details inline (i.e. no behavior change), while after #4 new detail data is dropped. Useful as a kill-switch (see rollback).
  • Bucket IAM smoke. Invoke the indexer once after deploy; check CloudWatch for any AccessDenied on s3:PutObject against job-details/.

During deploy — metrics to watch

  • EcomIndexerJobsTable CloudWatch:
    • ConsumedWriteCapacityUnits drops. 7-day average pre/post comparison. Expect ~30–50x reduction for large jobs, ~3–5x for small.
    • SuccessfulRequestLatency.UpdateItem p50/p99 flat or better.
    • No spike in UserErrors / ConditionalCheckFailedRequests.
  • Bucket CloudWatch (or S3 Storage Lens):
    • PutObject count with prefix job-details/ rises as expected (≈ 1 + ⌈chunks/50⌉ per job).
    • 4xx/5xx rate stays at baseline.
  • Lambda CloudWatch: error rate, duration, DLQ depth unchanged. Duration ticks up ~50–100ms per flush from the added S3 PUT.
  • Custom log alarm: grep Failed to write job-details S3 object — expect zero or very few.

Post-deploy functional checks

For a known production job that just completed under the new path:

  • aws dynamodb get-item on its row — verify item is a few KB, contains counters + completed_chunks, and does not carry failed_items_details / skipped_items_details / conflict_details / the indexer's platform_data.indexer subtree.
  • aws s3 ls s3://{bucket}/job-details/{job_id}/ — verify one or more *.json.gz flush files plus one platform-data.json.
  • GET /shops/{shop}/sync-jobs/{job_id} — single-job view still surfaces failedItemsDetails, errorSummary, platformData.indexer.* (populated from S3 via the reader fallback wiring from #3).
  • GET /shops/{shop}/sync-jobs — counters and status still display correctly. List view may show empty details for new jobs if #3 only wired the single-job route; explicit trade-off in that PR.

Canary order

dev-* → staging → prod, with at least 24 h bake at each. Where cell-level prod rollout is supported, stage across cells.

Rollback levers

In ascending severity:

  1. Kill-switch (no redeploy). Set ECOM_JOB_DETAILS_BUCKET="" on the indexer Lambda. Post-#4 this means new detail data is dropped, but counters and progress still work. Recover by re-setting the env var.
  2. Revert #4 (writer-drop-inline). Inline DDB writes resume. Jobs that ran during the gap keep their S3 data; readers prefer inline so the S3 data simply isn't consulted. No data loss.
  3. Revert #3 (reader wiring). The reader fallback API still exists but no one calls it. Combined with #4 still deployed, this is the worst UX state (new-job detail drill-down empty); recoverable by re-applying.
  4. CDK changes (#1) are safe to leave deployed during any rollback. Nothing depends on the _v2 GSIs disappearing.

Future: cold-tier archival

The hot-path online layout (one flush file per chunk batch, one platform-data.json per job) is right for the write path: simple, idempotent, partial-failure friendly, no cross-job coordination. It is not the right shape for long-term storage of completed jobs:

  • Many tiny objects compress poorly across job boundaries.
  • Per-object overhead dominates Glacier-tier storage costs.
  • A 30-day-old, terminal job's flush files are read at most once during forensic queries — perfect for archival.

The proposal is a separate async consolidator (Lambda on EventBridge schedule, or Step Functions task), not an online bundling change to the writer:

  1. Find terminal jobs older than N days: job_status IN (COMPLETED, FAILED, CONFLICT) AND completed_at < cutoff. Query via GSI_JobsByStatus_v2.
  2. For each shop's eligible jobs, bundle the per-flush files into one archive per {shop_id}/{yyyy-mm}. Inside: shared platform-data section, per-job JSONL records.
  3. Write the archive to s3://{bucket}/job-details-cold/{shop_id}/{yyyy-mm}/jobs.jsonl.gz. Verify, then delete the original per-flush objects.
  4. S3 lifecycle policy transitions job-details-cold/ to Glacier Instant Retrieval ($0.004/GB-mo, ms-latency) or Glacier Flexible Retrieval ($0.0036/GB-mo, minutes-latency) depending on how often forensic lookups happen.
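The bundling in steps 2 and 3 amounts to grouping terminal jobs by the archive key they will land in. A minimal sketch, assuming each job is a dict with shop_id, job_id, and a datetime completed_at; that record shape and both function names are hypothetical, while the key layout is the one given in step 3.

```python
from collections import defaultdict
from datetime import datetime

def cold_archive_key(shop_id: str, completed_at: datetime) -> str:
    """Archive key for one shop and calendar month."""
    return f"job-details-cold/{shop_id}/{completed_at:%Y-%m}/jobs.jsonl.gz"

def group_for_archive(jobs: list) -> dict:
    """Map each archive key to the job_ids that belong in it."""
    groups = defaultdict(list)
    for job in jobs:
        key = cold_archive_key(job["shop_id"], job["completed_at"])
        groups[key].append(job["job_id"])
    return dict(groups)

jobs = [
    {"shop_id": "s1", "job_id": "a", "completed_at": datetime(2024, 3, 2)},
    {"shop_id": "s1", "job_id": "b", "completed_at": datetime(2024, 3, 28)},
    {"shop_id": "s2", "job_id": "c", "completed_at": datetime(2024, 4, 1)},
]
print(group_for_archive(jobs))
```

Because the key is derivable from shop_id and completion month, the reader can locate the archive for an old job without an extra index.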

Reads stay simple:

  • Recent jobs (< N days): same code path as today, hits job-details/ (S3 Standard).
  • Old jobs: the reader transparently falls back to the cold-tier archive — one GET, extract by job_id. Slower but acceptable for forensic queries.

Why not bundle online (during write) instead:

  • A single SQS batch can contain 1–10 distinct jobs in any combination; a single job spans multiple invocations over its lifetime. The natural archive unit (per-invocation) doesn't match a useful read unit (per-job), so reads degrade to "find every archive containing job X" — needs an index in DDB, which defeats the savings.
  • Archives can't be "finalized" until the invocation knows it's done. Partial-progress visibility (consumers watching a long-running job) is useful and the current model gives it for free.
  • Compression gain across jobs is real but small: ~20%, because the bulk of the payload is per-job doc-id lists that don't share content. The S3 PUT-count reduction is also negligible at this volume ($5/M PUTs).

Cold-tier consolidation is plausibly 5–6x cheaper than online bundling and keeps the hot path simple. Scope as a follow-up component once #1–#4 are stable.