Overview
Data enters Polo through collectors — independent Lambda functions, each responsible for one data source. Collectors normalise raw data into ResourceEvent rows and insert directly into ClickHouse via the HTTP interface (clickhouse-connect).
Collector Inventory
Implemented
AWS — Config (every 15 min)
| Collector | Source API | Resource types |
|---|---|---|
| config_ec2 | ec2.describe_instances | ec2:instance |
| config_ebs | ec2.describe_volumes | ebs:volume |
| config_network | ec2.describe_nat_gateways, ec2.describe_addresses, elbv2.describe_load_balancers, ec2.describe_network_interfaces | vpc:nat_gateway, ec2:eip, elbv2:load_balancer, ec2:eni |
AWS — Cost (daily)
| Collector | Source | Notes |
|---|---|---|
| cost_explorer | Cost Explorer API | Daily cost data ($0.01/query). |
| savings_plans | Savings Plans API | Commitment, utilisation, coverage. Emits daily commitment values for net-cost computation. |
AWS — Metrics (every 5 min)
| Collector | Source API | Resource types |
|---|---|---|
| metrics | CloudWatch GetMetricData | ec2:instance |
AWS — Real-time / Event-driven
| Collector | Source | Trigger |
|---|---|---|
| lifecycle | CloudTrail via EventBridge | Real-time. RunInstances, TerminateInstances, CreateVolume, etc. |
| tags | Resource Groups Tagging API | Every 15 min. |
AWS — Derived
| Collector | Source | Schedule | Output |
|---|---|---|---|
| relationships | Composite (EC2, ELB, VPC APIs) | Every 15 min | resource_relationships |
| hierarchy_builder | resource_snapshots + resource_relationships + hierarchy_nodes | Every 15 min | resource_ancestry closure table |
| snapshot_builder | resource_events (config events) | Every 15 min | resource_snapshots |
| data_quality | resource_snapshots + resource_ancestry + resource_events | Daily 07:00 UTC | data_quality_daily |
Planned — Not Yet Implemented
AWS — Config
| Collector | Source API | Resource types |
|---|---|---|
| config_sagemaker | sagemaker.list/describe_notebook_instances | sagemaker:notebook |
| ipv4 | ec2.describe_addresses + IPAM | ec2:ipv4 |
AWS — Cost
| Collector | Source | Notes |
|---|---|---|
| cost_cur | CUR v2 Parquet files in S3 | Planned primary cost source. Hourly line items. See CUR setup. |
AWS — Real-time
| Collector | Source | Trigger |
|---|---|---|
| flow_logs | VPC Flow Logs (S3) | Continuous (S3 trigger). Network traffic per ENI. |
AWS — Derived
| Collector | Source | Output |
|---|---|---|
| suggestion_engine | resource_snapshots + metrics | action_suggestions |
| savings_calculator | action_log + resource_events | Updates total_savings_usd |
Non-AWS Platforms
| Collector | Source | Auth |
|---|---|---|
| cost_github | GitHub Billing API + Workflow Runs API | PAT with admin:org |
| cost_cloudflare | Cloudflare Analytics + Billing API | API token with Billing:Read |
| cost_datadog | Datadog Usage API | API key + App key |
Governance (hourly/daily)
| Collector | Schedule | Output |
|---|---|---|
| rule_evaluator | Hourly (budget/ratio), every 15 min (existence/config/lifecycle) | rule_violations |
| anomaly_detector | Daily | anomalies |
| weekly_digest | Weekly | Slack notification |
| account_discovery | Daily | aws_accounts |
| daily_snapshot | Nightly | resource_snapshots_daily |
| forecaster | Daily | cost_forecasts |
| cost_allocator | Daily | allocated_costs |
| pricing | Weekly | aws_pricing |
Normalisation Layer
Every collector calls the normalisation layer (components/collectors/common/), which:
- Parses ARNs (arn.py): extracts service, region, resource type, resource ID
- Resolves tags (tag_resolver.py): maps AWS tags to marqo_* fields via priority cascade
- Stamps hierarchy metadata (hierarchy_resolver.py): looks up account_role from the hierarchy dictionary, walks physical parent chains for tag inheritance
- Produces ResourceEvent (models.py): the Pydantic model that maps 1:1 to resource_events columns
from datetime import datetime
from uuid import UUID, uuid4

from pydantic import BaseModel, Field


class ResourceEvent(BaseModel):
    event_id: UUID = Field(default_factory=uuid4)
    event_time: datetime
    collector: str
    aws_account_id: str
    aws_region: str
    resource_arn: str
    resource_type: str
    resource_id: str
    resource_name: str = ''
    event_type: str
    event_action: str
    actor_arn: str = ''
    actor_name: str = ''
    marqo_customer: str = ''
    marqo_cluster: str = ''
    marqo_index: str = ''
    marqo_env: str = ''
    marqo_purpose: str = ''
    account_role: str = ''
    value: float = 0.0
    unit: str = ''
    properties: dict[str, str] = Field(default_factory=dict)
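
As an illustration of the ARN-parsing step, here is a minimal sketch. It is not the contents of arn.py: the ParsedArn dataclass and the parse_arn function are hypothetical names, and the mapping from parsed fields to Polo resource types such as ebs:volume is deliberately left out.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ParsedArn:
    """Hypothetical container for the fields the normalisation layer extracts."""
    partition: str
    service: str
    region: str
    account_id: str
    resource_type: str
    resource_id: str


def parse_arn(arn: str) -> ParsedArn:
    """Split an ARN of the form arn:partition:service:region:account-id:resource.

    The resource part is either "id", "type/id", or "type:id".
    """
    parts = arn.split(":", 5)
    if len(parts) != 6 or parts[0] != "arn":
        raise ValueError(f"not a valid ARN: {arn!r}")
    _, partition, service, region, account_id, resource = parts

    # Split the resource on whichever separator ("/" or ":") appears first.
    for i, ch in enumerate(resource):
        if ch in "/:":
            resource_type, resource_id = resource[:i], resource[i + 1:]
            break
    else:
        # No separator: the whole resource part is the ID (e.g. S3 buckets).
        resource_type, resource_id = "", resource

    return ParsedArn(partition, service, region, account_id, resource_type, resource_id)
```

For example, parse_arn("arn:aws:ec2:us-east-1:123456789012:instance/i-0abc") yields service "ec2", region "us-east-1", resource type "instance", and resource ID "i-0abc".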
Ingestion Pipeline
Collectors → ClickHouse (direct HTTP insert)
- ClickHouse client (components/collectors/common/clickhouse_client.py) uses clickhouse-connect with batching (DEFAULT_BATCH_SIZE = 1000), retry (3 attempts), and exponential backoff.
- Collectors call insert_events(events) for direct batch inserts or buffer_event(event), which auto-flushes at batch size.
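
The batching and retry behaviour described above might look roughly like the following. This is a sketch, not the code in clickhouse_client.py: EventBuffer, insert_with_retry, the 0.5 s base delay, and the injected insert_fn (standing in for the real clickhouse-connect insert call) are all assumptions. Only DEFAULT_BATCH_SIZE = 1000 and the 3 attempts come from the text.

```python
import time
from typing import Callable, Sequence

DEFAULT_BATCH_SIZE = 1000
MAX_ATTEMPTS = 3


def insert_with_retry(
    insert_fn: Callable[[Sequence[dict]], None],
    rows: Sequence[dict],
    attempts: int = MAX_ATTEMPTS,
    base_delay: float = 0.5,  # assumed value; the real delay is not documented
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Call insert_fn(rows), retrying with exponential backoff on failure."""
    for attempt in range(1, attempts + 1):
        try:
            insert_fn(rows)
            return
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the error to the caller
            sleep(base_delay * 2 ** (attempt - 1))  # delay doubles each retry


class EventBuffer:
    """Accumulates events and flushes once batch_size is reached."""

    def __init__(self, insert_fn, batch_size: int = DEFAULT_BATCH_SIZE,
                 sleep: Callable[[float], None] = time.sleep):
        self._insert_fn = insert_fn
        self._batch_size = batch_size
        self._sleep = sleep
        self._pending: list = []

    def buffer_event(self, event: dict) -> None:
        self._pending.append(event)
        if len(self._pending) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        if not self._pending:
            return
        insert_with_retry(self._insert_fn, list(self._pending), sleep=self._sleep)
        self._pending.clear()  # cleared only after a successful insert
```

A collector using this shape would still need to call flush() once at the end of a run so that a final partial batch is not left in memory.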
Planned: SQS buffer
At scale (10-20+ accounts), the ingestion pipeline will add an SQS buffer between collectors and ClickHouse for backpressure, retry, and dead-letter handling. This is not yet implemented.
Cross-Account Execution
Status: PLANNED — Currently collectors work with a single configured AWS account. The multi-account pattern described below will be implemented alongside the aws_accounts table and account_discovery collector.
Polo will monitor 10-20+ AWS accounts. Every collector will iterate over all active accounts:
def collect_across_accounts(collector_fn, ch_client):
    # Only accounts that are active and have a read role configured.
    accounts = ch_client.query("""
        SELECT account_id, polo_read_role FROM polo.aws_accounts FINAL
        WHERE polo_status = 'active' AND polo_read_role != ''
    """)
    for account_id, role_arn in accounts.rows:
        try:
            session = assume_role(role_arn)
            events = collector_fn(session=session, account_id=account_id)
            ch_client.insert_events(events)
            update_last_scan(ch_client, account_id, success=True)
        except Exception as e:
            # Record the failure and continue so one bad account does not block the rest.
            update_last_scan(ch_client, account_id, success=False, error=str(e))
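
The assume_role helper called above is not defined in this document. One plausible shape, assuming boto3 and STS AssumeRole, is sketched below. The session name "polo-collector" and the session_kwargs_for_role helper are hypothetical; the actual role setup is covered in multi-account.md.

```python
from typing import Any, Optional


def session_kwargs_for_role(sts_client: Any, role_arn: str,
                            session_name: str = "polo-collector") -> dict:
    """Assume role_arn via STS and return keyword arguments for boto3.Session."""
    resp = sts_client.assume_role(RoleArn=role_arn, RoleSessionName=session_name)
    creds = resp["Credentials"]
    return {
        "aws_access_key_id": creds["AccessKeyId"],
        "aws_secret_access_key": creds["SecretAccessKey"],
        "aws_session_token": creds["SessionToken"],
    }


def assume_role(role_arn: str, sts_client: Optional[Any] = None):
    """Return a boto3 Session holding temporary credentials for role_arn."""
    import boto3  # imported lazily so the helper above works without boto3

    sts_client = sts_client or boto3.client("sts")
    return boto3.Session(**session_kwargs_for_role(sts_client, role_arn))
```

The temporary credentials returned by STS expire, so a long-running collector would need to call assume_role again rather than cache the session indefinitely.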
See multi-account.md for IAM role details and provisioning.
Scheduling & Dependencies
Status: PLANNED — The EventBridge schedule below is the target configuration. Currently collectors are run manually or via the polo CLI.
Collectors will run on EventBridge schedules. Some have soft dependencies (for example, hierarchy_builder should run after the config collectors); these are handled by staggering the cron expressions:
| Minute | Collectors |
|---|---|
| :00 | config_ec2, config_ebs, config_network, tags |
| :05 | snapshot_builder, relationships |
| :10 | hierarchy_builder |
Daily jobs (cost_explorer, and eventually CUR, pricing, forecaster, etc.) run at fixed times.
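
To make the staggering concrete, the sketch below generates EventBridge cron expressions for a 15-minute cycle with a per-collector minute offset. The staggered_cron function and the SCHEDULES mapping are illustrative names, not part of Polo; the offsets are taken from the table above.

```python
def staggered_cron(offset_minutes: int, every_minutes: int = 15) -> str:
    """Build an EventBridge cron expression that fires every `every_minutes`,
    starting `offset_minutes` past the hour.

    EventBridge cron fields: minutes hours day-of-month month day-of-week year.
    """
    if every_minutes <= 0 or 60 % every_minutes != 0:
        raise ValueError("every_minutes must be a positive divisor of 60")
    if not 0 <= offset_minutes < every_minutes:
        raise ValueError("offset_minutes must be in [0, every_minutes)")
    return f"cron({offset_minutes}/{every_minutes} * * * ? *)"


# Offsets from the schedule table: upstream collectors first, derived ones later.
SCHEDULES = {
    "config_ec2": staggered_cron(0),
    "config_ebs": staggered_cron(0),
    "config_network": staggered_cron(0),
    "tags": staggered_cron(0),
    "snapshot_builder": staggered_cron(5),
    "relationships": staggered_cron(5),
    "hierarchy_builder": staggered_cron(10),
}
```

Staggering only orders start times. It does not guarantee that an upstream collector has finished, which is why the dependencies are described as soft.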
Non-AWS Resource IDs
Status: PLANNED — For when non-AWS platform collectors are implemented.
For platforms without ARNs, use a consistent prefix convention:
| Platform | Format | Example |
|---|---|---|
| GitHub | github:{org}/{type}/{name} | github:marqo-ai/repo/marqo |
| Cloudflare | cloudflare:{product}/{id} | cloudflare:worker/polo-api |
| Datadog | datadog:{product} | datadog:infrastructure |
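
The prefix convention in the table can be expressed as a few small helpers. This is a sketch under the assumption that IDs are built by plain string formatting; the function names are hypothetical, and platform_of treating any "arn" prefix as AWS is an illustrative choice rather than documented behaviour.

```python
def github_resource_id(org: str, resource_type: str, name: str) -> str:
    """github:{org}/{type}/{name}"""
    return f"github:{org}/{resource_type}/{name}"


def cloudflare_resource_id(product: str, resource_id: str) -> str:
    """cloudflare:{product}/{id}"""
    return f"cloudflare:{product}/{resource_id}"


def datadog_resource_id(product: str) -> str:
    """datadog:{product}"""
    return f"datadog:{product}"


def platform_of(resource_id: str) -> str:
    """Return the platform prefix of an ID; ARNs are reported as "aws"."""
    prefix = resource_id.split(":", 1)[0]
    return "aws" if prefix == "arn" else prefix
```

Because every ID starts with a platform prefix followed by a colon, non-AWS IDs can share a column with ARNs and still be told apart by that prefix.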