Load agentic cached queries for an index
Use the following script to load agentic cached queries into DDB for an index. Note that this script overwrites any existing rows, but it will not delete any rows.
Filter partitioning
Cached queries support filter partitioning — the same query can have different cached results depending on a filter
(e.g., gender:(womens) vs gender:(mens)). Set FILTER in the configuration to upload queries for a specific
partition. Leave it as None to upload filter-agnostic queries that apply to all search requests.
A successful run will have an output similar to this:
Filter: gender:(womens)
Summary:
Total queries: 10030
Success: 10030
Failed: 0
Transform errs: 0
Time: 4.6m
Throughput: 36.6 queries/sec
Once the upload is done, verify it by calling agentic search with the invalidateCache=true query parameter set, which bypasses
the cache.
# Batch-index cached queries using the batch API
import json
import requests
import time
from typing import Dict, Any, List, Tuple, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
# Configuration — fill in the placeholder values below before running.
# API_BASE_URL is the host that the batch endpoint path is appended to;
# keep exactly one of the two assignments active.
API_BASE_URL = "https://yks9phn9k2.execute-api.us-east-1.amazonaws.com" # staging
# API_BASE_URL = "https://stv37h2wrc.execute-api.us-east-1.amazonaws.com" # prod
# ACCOUNT_ID and INDEX_NAME are interpolated into the request URL path.
ACCOUNT_ID = "ACCOUNT ID"
INDEX_NAME = "INDEX NAME"
# Sent as the bearer token in the Authorization header of every batch request.
API_KEY = "Cloudflare API key"
# Path to a JSON file containing an object that maps each query string to its data.
INPUT_FILE = "FILE PATH"
BATCH_SIZE = 80 # queries per batch
MAX_WORKERS = 1 # parallel batch requests, a high concurrency could lead to cancelled transactions
# Filter partitioning — set to a Marqo DSL filter string to partition the cache,
# or None for filter-agnostic queries that apply to all search requests.
# The same value is attached to every query uploaded in a run.
# Examples:
# FILTER = None # filter-agnostic (applies to all)
# FILTER = "gender:(womens)" # womens partition
# FILTER = "gender:(mens)" # mens partition
FILTER: Optional[str] = None
def transform_query_data(query: str, data: Dict[str, Any], filter: Optional[str] = None) -> Dict[str, Any]:
    """Build the batch-API payload entry for a single query.

    Args:
        query: The query string the cached result belongs to.
        data: Raw entry for the query. Must contain the keys
            ``query_expansion``, ``query_expansion_category``,
            ``query_expansion_scores`` (parallel sequences) and ``summaries``.
        filter: Optional filter string. When truthy it is attached to the
            payload under ``"filter"``; otherwise the key is omitted.

    Returns:
        A dict with ``query``, ``summary`` and ``categories`` keys, plus
        ``filter`` when one was supplied.

    Raises:
        KeyError: If ``data`` lacks one of the required keys.
    """
    # The three expansion sequences are walked in lockstep; zip stops at the
    # shortest one, so surplus items in longer sequences are dropped.
    category_entries = [
        {"query": text, "category": label, "confidence": confidence}
        for text, label, confidence in zip(
            data["query_expansion"],
            data["query_expansion_category"],
            data["query_expansion_scores"],
        )
    ]

    payload = {
        "query": query,
        "summary": data["summaries"],
        "categories": category_entries,
    }

    if not filter:
        return payload

    payload["filter"] = filter
    return payload
def send_batch(batch: List[Dict[str, Any]], batch_num: int) -> Tuple[int, int, int, List[str]]:
    """Send a batch of queries. Returns (batch_num, success_count, fail_count, errors).

    The batch is POSTed as ``{"queries": batch}`` to the
    ``agentic-cached-queries/batch`` endpoint for the configured account and
    index. A batch is counted as a whole: every query in it is reported as
    succeeded when the response status is OK, and every query as failed
    otherwise.

    Args:
        batch: Transformed query payloads to upload in one request.
        batch_num: Index of this batch, echoed back in the result and used
            to label error messages.

    Returns:
        ``(batch_num, success_count, fail_count, errors)`` where ``errors``
        holds at most one truncated, human-readable message.
    """
    url = f"{API_BASE_URL}/api/v1/accounts/{ACCOUNT_ID}/indexes/{INDEX_NAME}/agentic-cached-queries/batch"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {API_KEY}"
    }
    payload = {"queries": batch}
    try:
        response = requests.post(url, json=payload, headers=headers, timeout=120)
        response.raise_for_status()
        return (batch_num, len(batch), 0, [])
    except requests.exceptions.HTTPError as e:
        # Guard both attributes consistently: an HTTPError is not guaranteed
        # to carry a response, and failing here would escape the handler.
        resp = e.response
        if resp is not None:
            status, body = resp.status_code, resp.text
        else:
            status, body = "no response", ""
        error = f"Batch {batch_num}: {status} - {body[:200]}"
        return (batch_num, 0, len(batch), [error])
    except Exception as e:
        # Best-effort boundary: connection errors, timeouts, etc. are reported
        # as a failed batch so the remaining batches still get uploaded.
        error = f"Batch {batch_num}: {str(e)[:100]}"
        return (batch_num, 0, len(batch), [error])
def format_eta(seconds: float) -> str:
    """Render a duration in seconds as a short human-readable string.

    Durations under a minute are shown as whole seconds (``"42s"``), under an
    hour as minutes with one decimal (``"4.6m"``), and anything longer as
    hours with one decimal (``"1.5h"``).
    """
    if seconds < 60:
        whole_seconds = int(seconds)
        return f"{whole_seconds}s"

    if seconds < 3600:
        minutes = seconds / 60
        return f"{minutes:.1f}m"

    hours = seconds / 3600
    return f"{hours:.1f}h"
def main():
    """Load queries from INPUT_FILE, transform them, and upload them in batches.

    Steps:
      1. Read the JSON object at INPUT_FILE (query string -> raw data).
      2. Transform each entry with ``transform_query_data``; entries that
         fail to transform are collected and reported, not uploaded.
      3. Split the transformed entries into batches of BATCH_SIZE and send
         them with ``send_batch`` on a pool of MAX_WORKERS threads, printing
         a single-line progress indicator as batches complete.
      4. Print a summary and up to ten error messages.
    """
    print(f"Loading queries from {INPUT_FILE}...")
    if FILTER:
        print(f"Filter: {FILTER}")
    else:
        print("Filter: None (filter-agnostic)")

    # Decode explicitly as UTF-8: relying on the platform default encoding
    # corrupts non-ASCII queries on systems whose locale is not UTF-8.
    with open(INPUT_FILE, 'r', encoding="utf-8") as f:
        queries_data = json.load(f)
    total_queries = len(queries_data)
    print(f"Found {total_queries} queries")

    # Transform all queries
    print("Transforming queries...")
    transformed = []
    transform_errors = []
    for query, data in queries_data.items():
        try:
            transformed.append(transform_query_data(query, data, FILTER))
        except Exception as e:
            # Keep going: one malformed entry should not abort the whole upload.
            transform_errors.append(f"{query}: {str(e)[:50]}")
    if transform_errors:
        print(f"Warning: {len(transform_errors)} queries failed to transform")
        for err in transform_errors[:5]:
            print(f" - {err}")

    # Create batches
    batches = [
        transformed[start:start + BATCH_SIZE]
        for start in range(0, len(transformed), BATCH_SIZE)
    ]
    total_batches = len(batches)
    print(f"Created {total_batches} batches of up to {BATCH_SIZE} queries each")
    print(f"Starting upload with {MAX_WORKERS} parallel workers...\n")

    success_count = 0
    failed_count = 0
    all_errors = []
    completed_batches = 0
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {
            executor.submit(send_batch, batch, i): i
            for i, batch in enumerate(batches)
        }
        for future in as_completed(futures):
            _, successes, failures, errors = future.result()
            success_count += successes
            failed_count += failures
            all_errors.extend(errors)
            completed_batches += 1

            # Progress update
            elapsed = time.time() - start_time
            throughput = completed_batches / elapsed if elapsed > 0 else 0
            remaining = total_batches - completed_batches
            eta_sec = remaining / throughput if throughput > 0 else 0
            # "\r" rewrites the same terminal line instead of scrolling.
            print(f"\rBatches: {completed_batches}/{total_batches} | "
                  f"Queries: {success_count + failed_count}/{len(transformed)} | "
                  f"ETA: {format_eta(eta_sec)} ", end="")

    elapsed = time.time() - start_time
    # Guard against a zero elapsed time (empty input or a coarse clock), the
    # same way the in-loop throughput is guarded.
    queries_per_sec = len(transformed) / elapsed if elapsed > 0 else 0.0

    print("\n\n" + "="*70)
    print("Summary:")
    print(f" Filter: {FILTER or 'None (filter-agnostic)'}")
    print(f" Total queries: {len(transformed)}")
    print(f" Success: {success_count}")
    print(f" Failed: {failed_count}")
    print(f" Transform errs: {len(transform_errors)}")
    print(f" Time: {format_eta(elapsed)}")
    print(f" Throughput: {queries_per_sec:.1f} queries/sec")
    print("="*70)

    if all_errors:
        print("\nErrors:")
        for err in all_errors[:10]:
            print(f" - {err}")
        if len(all_errors) > 10:
            print(f" ... and {len(all_errors) - 10} more")
if __name__ == "__main__":
main()