Ecom Indexer DDB Write Optimization
Problem
EcomIndexerJobsTable write costs grow super-linearly with job size. A typical 200-chunk indexing job consumes ~300–500 WCU today, driven by three compounding factors:
- Item size grows during the job. Each per-chunk flush appends (via list_append) to failed_items_details, skipped_items_details, and conflict_details. DynamoDB charges UpdateItem on the larger of the pre- and post-update item size, which for a growing item is always the post-update size, so every subsequent write costs more than the previous one.
- GSI write amplification. Both GSI_JobLookup and GSI_JobsByStatus use ProjectionType.ALL, replicating the full item (including the growing detail maps) on every write. Each base-table write becomes three writes at the projected size.
- Per-chunk write multiplicity. The chunk batcher already collapses N chunks into one flush, but each flush is still 1–2 writes, and IN_PROGRESS, platform_data, and completion are separate writes. On top of that, chunk_ids ({s3_key}#part#{idx}) and job_ids (36-char UUIDs) inflate both the item itself and the completed_chunks set.
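To make the compounding concrete, here is a back-of-envelope sketch of what a single standard (non-transactional) write costs: 1 WCU per 1 KB of item size, rounded up, plus the same again for every ALL-projected GSI that receives a full copy. The helper name and the example sizes are illustrative assumptions, not measurements from this table.

```python
import math


def write_wcu(item_bytes: int, all_projection_gsis: int) -> int:
    """WCU for one standard write of an item of `item_bytes`, replicated in full
    to `all_projection_gsis` indexes that use ProjectionType.ALL.

    1 WCU per 1 KB (1024 bytes), rounded up, per copy. Ignores transactional
    writes (2x) and the extra GSI delete when an index key changes.
    """
    per_copy = max(1, math.ceil(item_bytes / 1024))
    return per_copy * (1 + all_projection_gsis)


# Late in a large job: a ~50 KB item with two ALL-projected GSIs.
print(write_wcu(50 * 1024, 2))  # 150
# After the redesign: a ~1.5 KB item whose chunk flush touches no GSI.
print(write_wcu(1536, 0))  # 2
```

The per-write gap (150 vs. 2 WCU in this illustration) is why the fixes below attack item size and GSI projection rather than only the write count.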
Counted writes per 200-chunk job today (components/ecom_indexer/ecom_indexer/):
- update_job_status(IN_PROGRESS): 1
- update_platform_data(settings): 1
- _apply_chunk_delta preparse (when applicable): 1
- ChunkDeltaBatcher flushes: ⌈200/50⌉ × (1 or 2) = 4–8
- check_and_complete_job: 1
- Retry / error paths: variable
Total: ~10 writes per job (8–12 before retries), each replicating a fat item to two ALL-projected GSIs.
Goals
- Reduce WCU per typical job by ~50x.
- Keep idempotency semantics intact (per-chunk dedup via completed_chunks continues to work).
- Preserve diagnostic visibility into failed / skipped / conflict items.
- Avoid behavior changes on the read path that callers depend on.
Non-goals: changing job semantics, SQS message flow, or the workflow topology.
Design overview
Four orthogonal changes, each independently valuable:
- Offload unbounded fields to S3. Failed / skipped / conflict details and platform_data move out of DDB into a derived-key S3 layout. The DDB item stays small and constant-size for the life of the job.
- Shrink remaining keys. Job IDs go from 36-char UUIDs to ~13-char base32. Chunk IDs drop the embedded s3_key prefix, becoming short suffix-only tokens.
- Reduce GSI projections. GSI_JobLookup becomes KEYS_ONLY; GSI_JobsByStatus becomes INCLUDE with a deliberately chosen attribute list that excludes counters and completed_chunks. Per-chunk flushes then skip both GSIs entirely.
- Merge writes. Fold IN_PROGRESS into the first chunk flush, route preparse skips through the batcher, and replace the strongly-consistent read in check_and_complete_job with ReturnValues="UPDATED_NEW" on the last flush.
Phase 1 — S3 offload
JobDetailsStore
Lives at components/ecom_utils/ecom_utils/job_details_store/job_details_store.py so both writers (the indexer) and readers (SyncJobResponse builders in admin_server / admin_lambda) can share it without taking a dependency on the indexer component.
from pydantic import BaseModel

# ConflictItem and JobDetails are existing models; their imports are omitted here.

class JobDetailsPayload(BaseModel):
    chunk_ids: list[str]
    failed: dict[str, list[str]]
    skipped: dict[str, list[str]]
    conflict: dict[str, list[ConflictItem]]

class JobDetailsStore:
    def __init__(self, s3_client, bucket: str): ...

    @staticmethod
    def details_key(job_id: str, flush_id: str) -> str:
        return f"job-details/{job_id}/{flush_id}.json.gz"

    @staticmethod
    def platform_data_key(job_id: str) -> str:
        return f"job-details/{job_id}/platform-data.json"

    def write_flush(self, job_id: str, flush_id: str, payload: JobDetailsPayload) -> None:
        ...  # Gzip + PutObject. Idempotent on flush_id.

    def write_platform_data(self, job_id: str, data: dict) -> None:
        ...  # PutObject; same key per job, overwrite-safe.

    def read_all_details(self, job_id: str) -> "JobDetails":
        ...  # List prefix, parallel GETs, merge by reason.

    def read_platform_data(self, job_id: str) -> dict | None:
        ...  # GET; returns None if absent.
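A runnable sketch of the write and read legs, assuming payloads are plain dicts rather than the Pydantic models above and using an in-memory stand-in for the boto3 S3 client. `InMemoryS3` and `JobDetailsStoreSketch` are hypothetical names; the real client's `put_object`, `get_object`, and `list_objects_v2` accept the same keyword arguments used here. The sketch shows the derived key, gzip with a fixed header timestamp (so a retried flush re-PUTs byte-identical content), paginated listing of the job's prefix, parallel GETs, and an order-preserving merge by reason.

```python
import gzip
import io
import json
from concurrent.futures import ThreadPoolExecutor


class InMemoryS3:
    """Hypothetical stand-in for a boto3 S3 client, covering only the calls used below."""

    def __init__(self):
        self.objects = {}  # (bucket, key) -> bytes

    def put_object(self, Bucket, Key, Body, **_headers):
        self.objects[(Bucket, Key)] = Body

    def get_object(self, Bucket, Key):
        return {"Body": io.BytesIO(self.objects[(Bucket, Key)])}

    def list_objects_v2(self, Bucket, Prefix, **_paging):
        keys = sorted(k for (b, k) in self.objects if b == Bucket and k.startswith(Prefix))
        return {"Contents": [{"Key": k} for k in keys], "IsTruncated": False}


class JobDetailsStoreSketch:
    def __init__(self, s3_client, bucket: str):
        self.s3 = s3_client
        self.bucket = bucket

    @staticmethod
    def details_key(job_id: str, flush_id: str) -> str:
        return f"job-details/{job_id}/{flush_id}.json.gz"

    def write_flush(self, job_id: str, flush_id: str, payload: dict) -> None:
        # Deterministic key + deterministic bytes (mtime=0): a retried flush overwrites itself.
        body = gzip.compress(json.dumps(payload, sort_keys=True).encode("utf-8"), mtime=0)
        self.s3.put_object(
            Bucket=self.bucket,
            Key=self.details_key(job_id, flush_id),
            Body=body,
            ContentType="application/json",
            ContentEncoding="gzip",
        )

    def _read_flush(self, key: str) -> dict:
        raw = self.s3.get_object(Bucket=self.bucket, Key=key)["Body"].read()
        return json.loads(gzip.decompress(raw))

    def read_all_details(self, job_id: str) -> dict:
        # 1. List the job's prefix (paginated; a page holds up to 1000 keys).
        kwargs = {"Bucket": self.bucket, "Prefix": f"job-details/{job_id}/"}
        keys = []
        while True:
            page = self.s3.list_objects_v2(**kwargs)
            keys += [o["Key"] for o in page.get("Contents", []) if o["Key"].endswith(".json.gz")]
            if not page.get("IsTruncated"):
                break
            kwargs["ContinuationToken"] = page["NextContinuationToken"]
        # 2. Fetch flush files in parallel; map() preserves listing order.
        with ThreadPoolExecutor(max_workers=8) as pool:
            payloads = list(pool.map(self._read_flush, keys))
        # 3. Merge by reason, dropping duplicates but keeping first-seen order.
        merged = {"failed": {}, "skipped": {}, "conflict": {}}
        for payload in payloads:
            for section, by_reason in merged.items():
                for reason, items in payload.get(section, {}).items():
                    target = by_reason.setdefault(reason, [])
                    for item in items:
                        if item not in target:
                            target.append(item)
        return merged
```

The listing filter on `.json.gz` keeps `platform-data.json` out of the detail merge, since both live under the same derived prefix.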
Module-level helpers flush_id_for_chunks, dedupe_doc_ids, and dedupe_conflict_items are exposed alongside the store so writers don't reimplement them.
S3 keys are fully derivable from job_id — no pointer stored in DDB. Payloads are gzipped (Content-Encoding: gzip) since doc_id lists compress 5–10x.
Empty-bucket contract. The store constructor accepts any bucket string. The indexer's _get_job_details_store() is the single source of truth for "is the bucket configured" — it returns None when ECOM_JOB_DETAILS_BUCKET is unset, and writers no-op the S3 leg in that case. The singleton lookup is guarded by a threading.Lock since the indexer's parallel handler invokes from worker threads.
ChunkDeltaBatcher rewrite
components/ecom_indexer/ecom_indexer/chunk_delta_batcher.py:
- _flush_combined becomes one DDB write instead of two.
- Compute flush_id = sha256(sorted(chunk_ids))[:16], deterministic so re-flushing the same set re-PUTs to the same S3 key (idempotent at the S3 layer).
- If any details exist: call JobDetailsStore.write_flush(...) before the DDB update.
- DDB update: ADD counters + completed_chunks, conditional on no chunk_id collision. Drop all failed_items_details / skipped_items_details / conflict_details SET parts.
- Drop _write_details_with_retry and the _DETAIL_WRITE_* constants entirely.
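A sketch of the flush_id computation and the single combined UpdateItem, under two stated assumptions: the sorted chunk ids are newline-joined before hashing, and the existing no-collision check is expressed as one `NOT contains(completed_chunks, :cN)` clause per chunk (`contains` does test set membership in DynamoDB condition expressions). `build_flush_update` is a hypothetical helper returning kwargs for a boto3 `Table.update_item` call; the `pk`/`sk` key names follow this doc.

```python
import hashlib
import time


def flush_id_for_chunks(chunk_ids) -> str:
    """16 hex chars of sha256 over the sorted chunk ids (order-independent)."""
    # Assumption: newline-joined before hashing; any stable encoding works.
    joined = "\n".join(sorted(chunk_ids))
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()[:16]


def build_flush_update(pk: str, sk: str, chunk_ids, counters: dict, now=None) -> dict:
    """kwargs for one update_item: ADD counters and completed_chunks, refresh
    last_updated, and reject the write if any chunk was already recorded."""
    values = {":cs": set(chunk_ids), ":ts": int(time.time()) if now is None else now}
    adds = []
    for i, (name, delta) in enumerate(sorted(counters.items())):
        values[f":n{i}"] = delta
        adds.append(f"{name} :n{i}")
    adds.append("completed_chunks :cs")
    guards = []
    for i, chunk_id in enumerate(sorted(chunk_ids)):
        values[f":c{i}"] = chunk_id
        guards.append(f"NOT contains(completed_chunks, :c{i})")
    return {
        "Key": {"pk": pk, "sk": sk},
        "UpdateExpression": "ADD " + ", ".join(adds) + " SET last_updated = :ts",
        "ConditionExpression": " AND ".join(guards),
        "ExpressionAttributeValues": values,
    }
```

No detail attributes appear in the request, so its size no longer grows with the job's failure count.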
Failure modes:
| S3 PUT | DDB update | Result |
|---|---|---|
| ok | ok | committed |
| ok | conditional reject | counters already applied; S3 object is a harmless extra |
| ok | transient fail | SQS retry re-PUTs same key, retries DDB update |
| fail | not attempted | clean retry on next message delivery |
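The table follows from one ordering rule: S3 first, DDB second, and a rejected DDB condition is treated as "already applied" rather than an error. A sketch of that control flow with injected callables; `ConditionalCheckFailed` is a hypothetical stand-in for botocore's `ConditionalCheckFailedException`, and `flush` is an illustrative name.

```python
class ConditionalCheckFailed(Exception):
    """Hypothetical stand-in for ConditionalCheckFailedException."""


def flush(write_details, update_counters, has_details: bool) -> str:
    # 1. S3 first. If this raises, DDB is untouched and SQS redelivery retries cleanly.
    if has_details:
        write_details()
    # 2. DDB second. Any exception other than a condition reject propagates, so the
    #    message is retried: same S3 key is re-PUT, same DDB update is re-attempted.
    try:
        update_counters()
    except ConditionalCheckFailed:
        # Chunks already counted; the S3 object just written is a harmless extra.
        return "already-applied"
    return "committed"
```

Each row of the failure-mode table corresponds to one path through this function.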
Platform data move
Replace the update_platform_data(...) call at lambda_function.py:289 with JobDetailsStore.write_platform_data(job_id, settings). Delete update_platform_data in job_management.py.
DDB schema changes
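Drop these attributes from new writes (readers dual-read during migration: inline attributes when present, S3 otherwise; see Rollout sequence, step #2):
- failed_items_details
- skipped_items_details
- conflict_details
- platform_data
- skipped_summary, error_summary (only if stored; Risks and open questions notes they are derived in SyncJobData.from_sync_job from the detail dicts)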
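A sketch of the read-side dual-read, assuming the reader receives the raw DDB item and an optional store exposing `read_all_details(job_id)` that returns a dict with `failed` / `skipped` / `conflict` keys. `resolve_job_details` is a hypothetical name; in the plan this logic sits behind the optional `details_store` parameter on `SyncJobData.from_sync_job`.

```python
SECTIONS = ("failed", "skipped", "conflict")


def resolve_job_details(item: dict, job_id: str, details_store=None) -> dict:
    """Prefer inline DDB detail attributes (rows written before the change);
    otherwise read from S3; otherwise return empty details."""
    inline = {
        "failed": item.get("failed_items_details"),
        "skipped": item.get("skipped_items_details"),
        "conflict": item.get("conflict_details"),
    }
    if any(v is not None for v in inline.values()):
        # Old row: inline data wins; missing sections become empty maps.
        return {k: v or {} for k, v in inline.items()}
    if details_store is None:
        # Route not wired with a store (or bucket unset): degrade to empty details.
        return {k: {} for k in SECTIONS}
    return details_store.read_all_details(job_id)
```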
Consumer audit
Anything that reads the dropped attributes switches to JobDetailsStore. Known sites:
- components/shopify/admin_server/admin_server/services/sync_jobs_service.py
- components/admin_lambda/admin_lambda/services/sync_jobs_service.py
- components/ecom_indexer/ecom_indexer/job_metrics.py (if it surfaces detail counts)
- components/ecom_utils/ecom_utils/sync_jobs_service/sync_job_repository.py (model deserialization)
Hot path (check_and_complete_job) only reads counters, so it's unaffected.
Phase 2 — Short chunk_ids + write merging
Short chunk_ids
Chunk uniqueness is scoped per-job (the completed_chunks set is per-row), so chunk_ids don't need the s3_key prefix:
| Site | Before | After |
|---|---|---|
| lambda_function.py:248 | f"{s3_key}#preparse" | "preparse" |
| lambda_function.py:465, 560, 575 | f"{s3_key}#part#{idx}" | f"p{idx}" |
| lambda_function.py:1797 | f"{record.s3_key}#timeout_fallback" | "timeout" |
A 1000-chunk job's completed_chunks set drops from ~100 KB to ~10 KB — a ~10x reduction on every write.
Write merging
Fold IN_PROGRESS into the first chunk flush. Today: lambda_function.py:1642 (sequential), :1330 (polled), :612 (parallel) each do update_job_status(IN_PROGRESS) before processing starts. After: drop those calls, and have the first batcher flush conditionally set the status:
UpdateExpression:
ADD processed_items :s, failed_items :f, ..., completed_chunks :cs
SET last_updated = :ts,
job_status = if-eligible(:inprog),
started_at = if_not_exists(started_at, :ts),
status_created_at = if-eligible(:sca)
ConditionExpression:
(attribute_not_exists(job_status) OR job_status IN (:pending, :failed, :conflict))
AND <existing chunk-id no-collision check>
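DynamoDB has no inline conditional SET (beyond if_not_exists), so this is implemented as: read the current job_status via _get_job_item_for_update (which we already call for pk/sk) and include the IN_PROGRESS SET parts in the flush only if the status warrants it. The job_status clause of the condition is attached only to flushes that carry those SET parts (otherwise every later flush on an IN_PROGRESS job would be rejected); it guards against a concurrent status change between the read and the write.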
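A sketch of augmenting a counters-only flush request with the IN_PROGRESS transition. Assumptions, all hypothetical: the base request's UpdateExpression ends with its SET clause and already defines `:ts`; status strings are `PENDING` / `FAILED` / `CONFLICT` / `IN_PROGRESS`; the caller supplies the `status_created_at` value, whose format this doc does not specify.

```python
ELIGIBLE_FOR_IN_PROGRESS = {"PENDING", "FAILED", "CONFLICT"}


def with_in_progress_transition(request: dict, current_status, status_created_at: str) -> dict:
    """Return update_item kwargs that also move the job to IN_PROGRESS when the
    status read via _get_job_item_for_update makes it eligible."""
    if current_status is not None and current_status not in ELIGIBLE_FOR_IN_PROGRESS:
        return request  # already IN_PROGRESS (or terminal): counters-only flush
    values = dict(request["ExpressionAttributeValues"])  # never mutate the caller's dict
    values.update({
        ":inprog": "IN_PROGRESS",
        ":sca": status_created_at,
        ":pending": "PENDING",
        ":failed": "FAILED",
        ":conflict": "CONFLICT",
    })
    status_guard = "(attribute_not_exists(job_status) OR job_status IN (:pending, :failed, :conflict))"
    existing = request.get("ConditionExpression")
    return {
        **request,
        # Extends the trailing SET clause (assumed to exist and to define :ts).
        "UpdateExpression": request["UpdateExpression"]
        + ", job_status = :inprog, started_at = if_not_exists(started_at, :ts), status_created_at = :sca",
        "ConditionExpression": f"{status_guard} AND ({existing})" if existing else status_guard,
        "ExpressionAttributeValues": values,
    }
```

If the guard fails, another writer changed the status between the read and the flush; re-reading the status and rebuilding the request is the natural retry.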
Route preparse skips through the batcher. lambda_function.py:244 does a standalone _apply_chunk_delta for JSONL parse failures. Replace with batcher.add(chunk_id="preparse", success=0, skipped_details=preparse_skipped, ...) and let the batcher merge it with the first real flush.
Drop the strongly-consistent read in check_and_complete_job. job_management.py:309 does get_item(..., ConsistentRead=True) to decide whether to write terminal status. Replace with: the batcher's last update_item passes ReturnValues="UPDATED_NEW", and _process_documents_common inspects the returned counters in-memory before deciding to call into check_and_complete_job. If counters already sum to total_items, do the conditional terminal write directly without a re-read.
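A sketch of the in-memory decision on the last flush's response. Assumptions (hypothetical): the four counters below are disjoint and sum to total_items when the job is done, and total_items is already known to the caller because UPDATED_NEW only returns attributes the update touched. If any counter is missing from the response, the sketch returns None so the caller can fall back to the existing check_and_complete_job path instead of guessing.

```python
from decimal import Decimal

# Assumption: disjoint counters that sum to total_items on completion.
COUNTER_NAMES = ("processed_items", "failed_items", "skipped_items", "conflict_items")


def should_finalize(updated_attributes: dict, total_items: int):
    """Decide from an update_item(..., ReturnValues="UPDATED_NEW") response.

    Returns True/False when decidable, or None when a counter is absent
    from the response (the caller then falls back to a read).
    """
    if any(name not in updated_attributes for name in COUNTER_NAMES):
        return None
    done = sum(int(updated_attributes[name]) for name in COUNTER_NAMES)
    return done >= total_items


# boto3's Table resource returns numbers as Decimal.
attrs = {
    "processed_items": Decimal(180),
    "failed_items": Decimal(15),
    "skipped_items": Decimal(5),
    "conflict_items": Decimal(0),
}
print(should_finalize(attrs, total_items=200))  # True
```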
Fold first-retry attribution into IN_PROGRESS. retry_observability.py:53 writes set_first_retry_attribution as a dedicated conditional write on first retry. Since the redrive path's flush already sets IN_PROGRESS (after the merge above), include retry_attribution in that same update, conditional on attribute_not_exists(retry_attribution.first_retry_at).
Phase 3 — Short job IDs
Generation
Replace UUID4 with 13-char base32 (64 bits of entropy):
import base64
import secrets

def generate_job_id() -> str:
    # 8 random bytes -> 16 base32 chars including "===" padding -> 13 after stripping.
    return base64.b32encode(secrets.token_bytes(8)).decode().rstrip("=").lower()

# Example: "k3jqxr2m7zaba"
At ~1M lifetime jobs, P(any collision) ≈ n² / 2⁶⁵ ≈ 3 × 10⁻⁸. The collision case is handled by the existing attribute_not_exists(pk) conditional create: retry on ConditionalCheckFailedException with a fresh ID.
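A sketch that checks the ID length, works the birthday-bound arithmetic, and shows the retry-on-collision loop. `collision_probability`, `create_job_with_retry`, and `AlreadyExists` (a stand-in for `ConditionalCheckFailedException` raised by the `attribute_not_exists(pk)` create) are hypothetical names.

```python
import base64
import math
import secrets


def generate_job_id() -> str:
    # 8 random bytes = 64 bits -> 13 base32 chars once "===" padding is stripped.
    return base64.b32encode(secrets.token_bytes(8)).decode().rstrip("=").lower()


def collision_probability(n: int, bits: int) -> float:
    # Birthday bound: P ≈ 1 - exp(-n(n-1) / 2N), N = 2**bits; expm1 keeps precision for tiny P.
    return -math.expm1(-n * (n - 1) / (2 * 2**bits))


class AlreadyExists(Exception):
    """Stand-in for ConditionalCheckFailedException on attribute_not_exists(pk)."""


def create_job_with_retry(put_new_job, attempts: int = 3) -> str:
    for _ in range(attempts):
        job_id = generate_job_id()
        try:
            put_new_job(job_id)  # conditional create guarded by attribute_not_exists(pk)
            return job_id
        except AlreadyExists:
            continue  # astronomically rare: draw a fresh id
    raise RuntimeError("could not allocate a unique job_id")


print(f"{collision_probability(1_000_000, 64):.1e}")  # 2.7e-08
```

For comparison, the same formula with 60 bits gives roughly 4 × 10⁻⁷, so the extra 4 bits are worth about 16x on collision odds.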
Upstream changes
Job creation happens in:
- components/shopify/admin_server/admin_server/services/sync_service.py
- Possibly components/ecom_settings_exporter/
Need to audit:
- Where job_id is generated (likely a Pydantic default_factory=lambda: str(uuid.uuid4()) on SyncJob).
- Where s3_key is constructed and whether it embeds job_id (shorter job_id → naturally shorter s3_keys).
- Any logs or external systems that pattern-match on UUID-shaped job_ids.
Migration
Old UUID-shaped jobs and new short-id jobs coexist — no migration needed for in-flight rows. Once long enough has passed (e.g., 30 days, matching the TTL), all rows will be short-id and dashboards/queries can drop UUID-specific assumptions if any.
Phase 4 — GSI projection migration
Target projections
GSI_JobLookup → ProjectionType.KEYS_ONLY
- Used only for "find pk/sk by job_id" lookups (sync_job_repository.py:get_shop_job_by_id, job_management.py:_get_job_item_for_update).
- Callers do a follow-up get_item(pk, sk) on the base table. That costs one extra read per lookup (~0.5 RCU for an eventually consistent read of an item up to 4 KB) and saves a full-item replicate on every write to that item.
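A sketch of the two-step lookup, assuming `table` is a boto3 DynamoDB `Table` resource (its `query` and `get_item` accept these keyword arguments, including a string KeyConditionExpression), that the GSI's partition key attribute is `job_id`, and that the base keys are named `pk` / `sk`. `find_job_item` is a hypothetical name.

```python
def find_job_item(table, job_id: str, index_name: str = "GSI_JobLookup_v2"):
    """KEYS_ONLY lookup: the GSI yields only key attributes, so fetch the full item."""
    resp = table.query(
        IndexName=index_name,
        KeyConditionExpression="job_id = :jid",
        ExpressionAttributeValues={":jid": job_id},
        Limit=1,
    )
    items = resp.get("Items", [])
    if not items:
        return None
    keys = {"pk": items[0]["pk"], "sk": items[0]["sk"]}
    # Eventually consistent by default: 0.5 RCU for an item up to 4 KB.
    return table.get_item(Key=keys).get("Item")
```

GSI queries are always eventually consistent, so a job created milliseconds earlier may briefly be missing from the index; callers that race job creation should treat None as retryable.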
GSI_JobsByStatus → ProjectionType.INCLUDE with explicit attribute list:
non_key_attributes = [
"job_id", "job_type", "is_internal", "platform",
"started_at", "completed_at", "error_message",
"ttl", "created_at", "retry_count",
"estimated_time_minutes", "initiator_user_id",
]
Deliberately excluded: counters (total_items, processed_items, failed_items, skipped_items, conflict_items), completed_chunks, retry_attribution, last_updated.
Why this works
DynamoDB only updates a GSI when a write touches an attribute projected into it. Per-chunk flushes modify only counters + completed_chunks + last_updated — none of which are projected — so they don't write to either GSI. Only status transitions update GSI_JobsByStatus (via status_created_at, which is a key attribute).
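The rule can be checked mechanically with a simplified model: an ALL index is written whenever the item changes, while KEYS_ONLY and INCLUDE indexes are written only when a key attribute or an INCLUDE-listed attribute changes. The model assumes every item is present in each index (no sparse-index effects). `gsis_written` is a hypothetical helper, and the GSI_JobsByStatus_v2 key set lists only status_created_at because this doc does not name its other key.

```python
def gsis_written(modified: set, indexes: dict) -> list:
    """Names of GSIs that receive a write when an update modifies `modified`."""
    touched = []
    for name, spec in indexes.items():
        if spec["projection"] == "ALL":
            hit = bool(modified)  # any attribute change is replicated
        else:  # KEYS_ONLY projects nothing extra; INCLUDE adds `non_key`
            hit = bool(modified & (spec["keys"] | spec.get("non_key", set())))
        if hit:
            touched.append(name)
    return touched


V2_INDEXES = {
    "GSI_JobLookup_v2": {"projection": "KEYS_ONLY", "keys": {"job_id"}},
    "GSI_JobsByStatus_v2": {
        "projection": "INCLUDE",
        "keys": {"status_created_at"},  # other key attribute not named in this doc
        "non_key": {
            "job_id", "job_type", "is_internal", "platform",
            "started_at", "completed_at", "error_message",
            "ttl", "created_at", "retry_count",
            "estimated_time_minutes", "initiator_user_id",
        },
    },
}

chunk_flush = {"processed_items", "failed_items", "completed_chunks", "last_updated"}
print(gsis_written(chunk_flush, V2_INDEXES))  # []
```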
Listing UX trade-off
Confirmed offline: the job-list view does not need to show live progress counters. Detail view fetches counters from the base table when the user clicks a row. List view shows status, timestamps, and error_message — all projected.
Migration plan
GSI projection changes require dropping and recreating the GSI in DynamoDB. To avoid downtime:
1. Add GSI_JobLookup_v2 (KEYS_ONLY) and GSI_JobsByStatus_v2 (INCLUDE) alongside the existing GSIs. Both backfill automatically as DDB scans the base table.
2. Wait for backfill to complete. Verify item counts and a sample of expected attributes via console / aws dynamodb describe-table.
3. In application code, switch all readers to query the _v2 indexes. Deploy and bake for one release.
4. Drop the original GSI_JobLookup and GSI_JobsByStatus.
5. (Optional, cosmetic) Re-create indexes under the original names, switch readers back, then drop the _v2 indexes. Skip unless naming bothers operators.
The CDK change for step 1 is non-destructive (add); step 4 is destructive (remove) but only after readers have migrated.
Estimated impact
Per 200-chunk job, comparing today vs. fully migrated state:
| Metric | Today | After |
|---|---|---|
| DDB writes | ~10 | ~5 |
| Average item size on write | grows to ~50 KB | ~1–2 KB constant |
| GSI replications per write | 2 (full item) | 0 for chunk flushes; status transitions write only GSI_JobsByStatus (small, though a key change is a delete + put) |
| Approximate WCU per job | ~300–500 | ~8–12 |
| S3 PUTs per job | 0 | ~5 (gzipped, < $0.0001) |
That's roughly a 30–50x WCU reduction. The S3 cost is negligible by comparison.
Risks and open questions
- completed_chunks is still in DDB. Considered moving to S3 marker objects with IfNoneMatch: "*" for chunk-level idempotency. Decision: leave it in DDB; the move would add an S3 round-trip per chunk on the hot path for marginal savings once the set is shrunk by short chunk_ids.
- skipped_summary and error_summary. Derived in SyncJobData.from_sync_job from the detail dicts, so they follow whichever source the reader picked (inline or S3). No independent DDB attribute to preserve.
- Read latency on GSI_JobLookup KEYS_ONLY. Every job lookup gains one get_item. This should be single-digit milliseconds for small items and not noticeable on the hot path, but it is worth measuring on the indexer's GSI lookup loop in _get_job_item_for_update after step #6.
- DDB Streams. infra/ecom/stacks/ecom_stack.py:250 does not enable streams on EcomIndexerJobsTable (TableV2 default is off). Confirm no consumer depends on streams before assuming this stays disabled.
- SQS event source batch size. Worth checking the event source mapping config: larger batches amortize GSI lookups and other fixed costs across messages.
- List-view detail summaries. After step #4, list-endpoint responses will show empty errorSummary / skippedSummary / conflictSummary for new jobs unless the listing route also constructs a JobDetailsStore and passes it through from_sync_job. The current step #3 plan wires only the single-job route to limit per-request S3 traffic; if list summaries are needed live, the listing route would have to parallelise an S3 fetch per visible job. This is an explicit trade-off to revisit if the UI regresses.
Rollout sequence
Carved so every step is independently deployable. Each PR's additive changes are safe to land first; the destructive / behavior-changing steps gate on the additive predecessors being in production.
| # | Branch / PR | Type | Depends on (deployed) |
|---|---|---|---|
| 1 | claude/ddb-infra | Additive infra: _v2 GSIs (CDK), ECOM_JOB_DETAILS_BUCKET env var, JobDetailsStore module skeleton, moto + test fixtures updated to mirror the v2 projection shape | — |
| 2 | claude/ddb-reader | Additive: optional details_store: JobDetailsStore | None parameter on SyncJobData.from_sync_job with S3 dual-read fallback. Default behaviour unchanged. | #1 |
| 3 | (follow-up) | Reader wiring: construct a JobDetailsStore in admin_server / admin_lambda single-job route handlers and pass it. Adds ECOM_JOB_DETAILS_BUCKET env var to those Lambdas' CDK config. | #2 |
| 4 | claude/ddb-logic | Behavior change: writer drops inline detail / platform_data DDB writes, single combined UpdateItem per flush, short chunk_ids, write-merging. Stacked on claude/ddb-reader so it can't merge before the reader fallback is in. | #3 (so new jobs aren't UX-regressed) |
| 5 | (follow-up) | Short job IDs (Phase 3) — 13-char base32 generator in upstream SyncJob. Independent of #4; can deploy any time. | — |
| 6 | (follow-up) | GSI reader switch — get_shop_job_by_id and _get_job_item_for_update start querying GSI_JobLookup_v2 (KEYS_ONLY) with a follow-up get_item on the base table; listing endpoints query GSI_JobsByStatus_v2. Requires v2 backfill to be complete in prod (operator check). | #1 backfill complete |
| 7 | (follow-up) | GSI v1 drop — CDK removes the original ALL-projection indexes. Destructive; only after readers are off them for a release cycle. | #6 |
The git stack (#1 → #2 → #4) enforces the deploy-order dependency that matters most: the writer-drop-inline change literally cannot merge before the reader fallback is in main. Steps #3, #5, #6, #7 are independent enough to ship as flat siblings without restructuring the stack.
Each step is independently reversible until the destructive v1 GSI drop in #7.
Production verification
A four-pronged plan: synthetic dry-run before deploy → metrics during deploy → spot-checks after deploy → rollback levers.
Pre-deploy
- GSI v2 backfill confirmation. After #1 deploys, before #4: run aws dynamodb describe-table --table-name {env}-EcomIndexerJobsTable and assert both _v2 GSIs report IndexStatus ACTIVE (not Backfilling) with an ItemCount matching the base table within tolerance. ItemCount is only refreshed about every six hours, so allow for lag. CDK doesn't tell you when backfill finishes; this is the operator check that gates #6 and (indirectly) #4 deployment safety.
- Env-var smoke test. Run aws lambda get-function-configuration on the indexer Lambda and verify ECOM_JOB_DETAILS_BUCKET is set. If it is empty, _get_job_details_store() returns None and the S3 leg is skipped: before #4 that means writes stay inline (no behavior change); after #4 it means new detail data is dropped. This doubles as a kill-switch (see Rollback levers).
- Bucket IAM smoke. Invoke the indexer once after deploy; check CloudWatch for any AccessDenied on s3:PutObject against job-details/.
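The backfill check can be scripted rather than eyeballed. A sketch, assuming the response shape of boto3's `describe_table` (`Table.ItemCount`, and `Table.GlobalSecondaryIndexes[*]` with `IndexName`, `IndexStatus`, `Backfilling`, `ItemCount`); `v2_backfill_ready` and the 1% default tolerance are hypothetical choices.

```python
def v2_backfill_ready(describe_table_response: dict, expected: int = 2,
                      suffix: str = "_v2", tolerance: float = 0.01) -> bool:
    """True when all `expected` *_v2 GSIs are ACTIVE, not backfilling, and their
    (approximate) ItemCount is within `tolerance` of the base table's."""
    table = describe_table_response["Table"]
    indexes = [g for g in table.get("GlobalSecondaryIndexes", [])
               if g["IndexName"].endswith(suffix)]
    if len(indexes) != expected:
        return False
    base_count = table.get("ItemCount", 0)
    for gsi in indexes:
        if gsi.get("IndexStatus") != "ACTIVE" or gsi.get("Backfilling", False):
            return False
        # ItemCount is refreshed roughly every six hours, so only a loose check is meaningful.
        if gsi.get("ItemCount", 0) < base_count * (1 - tolerance):
            return False
    return True
```

Usage would look like `v2_backfill_ready(boto3.client("dynamodb").describe_table(TableName=...))`, run from the same operator context as the CLI check.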
During deploy — metrics to watch
- EcomIndexerJobsTable CloudWatch:
  - ConsumedWriteCapacityUnits drops. Compare 7-day averages pre/post. Expect ~30–50x reduction for large jobs, ~3–5x for small.
  - SuccessfulRequestLatency (Operation=UpdateItem) p50/p99 flat or better.
  - No spike in UserErrors / ConditionalCheckFailedRequests.
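- Bucket CloudWatch (or S3 Storage Lens):
  - PutRequests for prefix job-details/ rises as expected (up to 1 + ⌈chunks/50⌉ per job, since flush files are only written when a flush carries details). This requires an S3 request-metrics configuration filtered on that prefix.
  - 4xxErrors / 5xxErrors stay at baseline.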
- Lambda CloudWatch: error rate, duration, DLQ depth unchanged. Duration ticks up ~50–100ms per flush from the added S3 PUT.
- Custom log alarm: grep for "Failed to write job-details S3 object"; expect zero or very few.
Post-deploy functional checks
For a known production job that just completed under the new path (ideally one with some failed or skipped items):
- aws dynamodb get-item on its row: verify the item is a few KB, contains counters + completed_chunks, and does not carry failed_items_details / skipped_items_details / conflict_details / the indexer's platform_data.indexer subtree.
- aws s3 ls s3://{bucket}/job-details/{job_id}/: verify one or more *.json.gz flush files (present only if the job had failed / skipped / conflict items) plus one platform-data.json.
- GET /shops/{shop}/sync-jobs/{job_id}: single-job view still surfaces failedItemsDetails, errorSummary, platformData.indexer.* (populated from S3 via the reader fallback wiring from #3).
- GET /shops/{shop}/sync-jobs: counters and status still display correctly (until #6 moves listing onto GSI_JobsByStatus_v2, which does not project counters). List view may show empty details for new jobs if #3 only wired the single-job route; explicit trade-off in that PR.
Canary order
dev-* → staging → prod, with at least 24 h bake at each. Where cell-level prod rollout is supported, stage across cells.
Rollback levers
In ascending severity:
- Kill-switch (no redeploy). Set ECOM_JOB_DETAILS_BUCKET="" on the indexer Lambda. Post-#4 this means new detail data is dropped, but counters and progress still work. Recover by re-setting the env var.
- Revert #4 (writer-drop-inline). Inline DDB writes resume. Jobs that ran during the gap keep their S3 data; readers prefer inline data when present and, with #3 still deployed, fall back to S3 for those jobs. No data loss.
- Revert #3 (reader wiring). The reader fallback API still exists but no one calls it. Combined with #4 still deployed, this is the worst UX state (new-job detail drill-down empty); recoverable by re-applying.
- CDK changes (#1) are safe to leave deployed during any rollback. Nothing depends on the _v2 GSIs disappearing.
Future: cold-tier archival
The hot-path online layout (one flush file per chunk batch, one platform-data.json per job) is right for the write path: simple, idempotent, partial-failure friendly, no cross-job coordination. It is not the right shape for long-term storage of completed jobs:
- Many tiny objects compress poorly across job boundaries.
- Per-object overhead dominates Glacier-tier storage costs.
- A 30-day-old, terminal job's flush files are read at most once during forensic queries — perfect for archival.
The proposal is a separate async consolidator (Lambda on EventBridge schedule, or Step Functions task), not an online bundling change to the writer:
- Find terminal jobs older than N days: job_status IN (COMPLETED, FAILED, CONFLICT) AND completed_at < cutoff. Query via GSI_JobsByStatus_v2.
- For each shop's eligible jobs, bundle the per-flush files into one archive per {shop_id}/{yyyy-mm}. Inside: a shared platform-data section and per-job JSONL records.
- Write the archive to s3://{bucket}/job-details-cold/{shop_id}/{yyyy-mm}/jobs.jsonl.gz. Verify, then delete the original per-flush objects.
- An S3 lifecycle policy transitions job-details-cold/ to Glacier Instant Retrieval ($0.004/GB-mo, millisecond latency) or Glacier Flexible Retrieval ($0.0036/GB-mo, minutes-to-hours latency) depending on how often forensic lookups happen.
Reads stay simple:
- Recent jobs (< N days): same code path as today, hits job-details/ (S3 Standard).
- Old jobs: the reader transparently falls back to the cold-tier archive: one GET, extract by job_id. Slower but acceptable for forensic queries.
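A sketch of the archive format and the by-job_id extraction, under explicit assumptions: one JSON line per job, with platform data kept on each job's record (the shared platform-data section is omitted for brevity). `build_monthly_archive` and `extract_job` are hypothetical names; the output bytes would be written to the `job-details-cold/{shop_id}/{yyyy-mm}/jobs.jsonl.gz` key above.

```python
import gzip
import json


def build_monthly_archive(jobs: dict) -> bytes:
    """Bundle terminal jobs for one {shop_id}/{yyyy-mm} into gzipped JSONL.

    `jobs` maps job_id -> {"platform_data": ..., "flushes": [payload, ...]}.
    """
    lines = [
        json.dumps({"job_id": job_id, **job}, sort_keys=True)
        for job_id, job in sorted(jobs.items())
    ]
    return gzip.compress(("\n".join(lines) + "\n").encode("utf-8"))


def extract_job(archive: bytes, job_id: str):
    """Cold-tier read: one GET of the archive, then scan for the job's record."""
    for line in gzip.decompress(archive).decode("utf-8").splitlines():
        record = json.loads(line)
        if record["job_id"] == job_id:
            return record
    return None
```

A linear scan is acceptable here because cold reads are rare forensic lookups; archive size is bounded by one shop-month of terminal jobs.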
Why not bundle online (during write) instead:
- A single SQS batch can contain 1–10 distinct jobs in any combination; a single job spans multiple invocations over its lifetime. The natural archive unit (per-invocation) doesn't match a useful read unit (per-job), so reads degrade to "find every archive containing job X" — needs an index in DDB, which defeats the savings.
- Archives can't be "finalized" until the invocation knows it's done. Partial-progress visibility (consumers watching a long-running job) is useful and the current model gives it for free.
- Compression gain across jobs is real but small: ~20%, because the bulk of the payload is per-job doc-id lists that don't share content. The S3 PUT-count reduction is also negligible at this volume ($5/M PUTs).
Cold-tier consolidation is plausibly 5–6x cheaper than online bundling and keeps the hot path simple. Scope as a follow-up component once #1–#4 are stable.