
Collection Architecture

Overview

Data enters Polo through collectors — independent Lambda functions, each responsible for one data source. Collectors normalise raw data into ResourceEvent rows and insert directly into ClickHouse via the HTTP interface (clickhouse-connect).

Collector Inventory

Implemented

AWS — Config (every 15 min)

| Collector | Source API | Resource types |
|---|---|---|
| config_ec2 | ec2.describe_instances | ec2:instance |
| config_ebs | ec2.describe_volumes | ebs:volume |
| config_network | ec2.describe_nat_gateways, describe_addresses, elbv2.describe_load_balancers, ec2.describe_network_interfaces | vpc:nat_gateway, ec2:eip, elbv2:load_balancer, ec2:eni |
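
As a rough illustration of the per-page work config_ec2 has to do, the sketch below flattens one describe_instances response page (Reservations, then Instances) into (ARN, instance ID, tags) tuples. The ARN layout is the standard EC2 instance format; the helper name and the assumption of the commercial aws partition are ours, not taken from the collector.

```python
from typing import Any, Iterator


def iter_instances(
    page: dict[str, Any], region: str, account_id: str
) -> Iterator[tuple[str, str, dict[str, str]]]:
    """Yield (arn, instance_id, tags) for every instance in one describe_instances page.

    Hypothetical helper; assumes the commercial 'aws' partition.
    """
    for reservation in page.get("Reservations", []):
        for instance in reservation.get("Instances", []):
            instance_id = instance["InstanceId"]
            arn = f"arn:aws:ec2:{region}:{account_id}:instance/{instance_id}"
            # EC2 returns tags as a list of {"Key": ..., "Value": ...}; flatten to a dict.
            tags = {t["Key"]: t["Value"] for t in instance.get("Tags", [])}
            yield arn, instance_id, tags
```

With boto3, the pages would come from `ec2.get_paginator("describe_instances").paginate()`.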

AWS — Cost (daily)

| Collector | Source | Notes |
|---|---|---|
| cost_explorer | Cost Explorer API | Daily cost data ($0.01 per query). |
| savings_plans | Savings Plans API | Commitment, utilisation, coverage. Emits daily commitment values for net-cost computation. |

AWS — Metrics (every 5 min)

| Collector | Source API | Resource types |
|---|---|---|
| metrics | CloudWatch GetMetricData | ec2:instance |
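
GetMetricData accepts up to 500 metric queries per request, and each query Id must start with a lowercase letter. A minimal sketch of how the metrics collector might build and batch per-instance queries follows; CPUUtilization with a 300-second Average is an assumed example, since the metrics Polo actually collects are not listed here.

```python
from typing import Any, Iterator

MAX_QUERIES_PER_CALL = 500  # GetMetricData limit on MetricDataQueries per request


def build_queries(
    instance_ids: list[str],
    metric_name: str = "CPUUtilization",  # assumed metric, for illustration only
    period: int = 300,
    stat: str = "Average",
) -> list[dict[str, Any]]:
    queries = []
    for i, instance_id in enumerate(instance_ids):
        queries.append({
            # Ids must start with a lowercase letter; the index keeps them unique.
            "Id": f"m{i}",
            "MetricStat": {
                "Metric": {
                    "Namespace": "AWS/EC2",
                    "MetricName": metric_name,
                    "Dimensions": [{"Name": "InstanceId", "Value": instance_id}],
                },
                "Period": period,
                "Stat": stat,
            },
            "ReturnData": True,
        })
    return queries


def chunk(queries: list[dict[str, Any]], size: int = MAX_QUERIES_PER_CALL) -> Iterator[list[dict[str, Any]]]:
    """Split queries into request-sized batches."""
    for start in range(0, len(queries), size):
        yield queries[start:start + size]
```

Each batch would then go to `cloudwatch.get_metric_data(MetricDataQueries=batch, StartTime=..., EndTime=...)`, with results matched back to instances through the Id field.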

AWS — Real-time / Event-driven

| Collector | Source | Trigger |
|---|---|---|
| lifecycle | CloudTrail via EventBridge | Real-time (RunInstances, TerminateInstances, CreateVolume, etc.) |
| tags | Resource Groups Tagging API | Every 15 min |
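
CloudTrail management events reach EventBridge with the detail-type "AWS API Call via CloudTrail", so the lifecycle collector can subscribe with an event pattern and read the CloudTrail record from the event's detail. The sketch below shows such a pattern and the extraction of a few ResourceEvent fields; the three event names are the examples listed above, and the field mapping is an assumption rather than the collector's actual code.

```python
from datetime import datetime, timezone
from typing import Any

# EventBridge rule pattern matching selected EC2 API calls recorded by CloudTrail.
LIFECYCLE_PATTERN = {
    "source": ["aws.ec2"],
    "detail-type": ["AWS API Call via CloudTrail"],
    "detail": {
        "eventSource": ["ec2.amazonaws.com"],
        "eventName": ["RunInstances", "TerminateInstances", "CreateVolume"],
    },
}


def lifecycle_fields(event: dict[str, Any]) -> dict[str, Any]:
    """Pull ResourceEvent-relevant fields out of one EventBridge/CloudTrail event."""
    detail = event["detail"]
    return {
        "event_type": "lifecycle",
        "event_action": detail["eventName"],
        # CloudTrail timestamps look like 2024-05-01T12:00:00Z (UTC, second precision).
        "event_time": datetime.strptime(detail["eventTime"], "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc),
        "aws_region": detail["awsRegion"],
        # Prefer CloudTrail's recipient account; fall back to the EventBridge envelope.
        "aws_account_id": detail.get("recipientAccountId", event.get("account", "")),
        "actor_arn": detail.get("userIdentity", {}).get("arn", ""),
    }
```

The pattern can be registered with `events.put_rule(Name=..., EventPattern=json.dumps(LIFECYCLE_PATTERN))`. Extracting the affected resource ARN is call-specific (for RunInstances the IDs sit under responseElements), so it is left out of this sketch.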

AWS — Derived

| Collector | Source | Schedule | Output |
|---|---|---|---|
| relationships | Composite (EC2, ELB, VPC APIs) | Every 15 min | resource_relationships |
| hierarchy_builder | resource_snapshots + resource_relationships + hierarchy_nodes | Every 15 min | resource_ancestry closure table |
| snapshot_builder | resource_events (config events) | Every 15 min | resource_snapshots |
| data_quality | resource_snapshots + resource_ancestry + resource_events | Daily 07:00 UTC | data_quality_daily |
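
A closure table such as resource_ancestry stores one row per (ancestor, descendant) pair, including each node paired with itself at depth 0. The in-memory sketch below assumes each resource has at most one physical parent (for example volume, then instance, then cluster); the real hierarchy_builder reads its edges from resource_relationships and hierarchy_nodes, whose schemas are not shown here, and the function name is hypothetical.

```python
def closure_rows(parent_of: dict[str, str]) -> list[tuple[str, str, int]]:
    """Return (ancestor, descendant, depth) rows for a forest given child -> parent edges."""
    nodes = set(parent_of) | set(parent_of.values())
    rows = []
    for node in sorted(nodes):
        current, depth, seen = node, 0, set()
        # Walk up the parent chain, emitting one row per ancestor (self at depth 0).
        while current is not None and current not in seen:
            rows.append((current, node, depth))
            seen.add(current)  # guard against accidental cycles in the input
            current = parent_of.get(current)
            depth += 1
    return rows
```

Because every ancestor/descendant pair is materialised, "everything under X" becomes a single filter on the ancestor side rather than a recursive walk at query time.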

Planned — Not Yet Implemented

AWS — Config

| Collector | Source API | Resource types |
|---|---|---|
| config_sagemaker | sagemaker.list/describe_notebook_instances | sagemaker:notebook |
| ipv4 | ec2.describe_addresses + IPAM | ec2:ipv4 |

AWS — Cost

| Collector | Source | Notes |
|---|---|---|
| cost_cur | CUR v2 Parquet files in S3 | Planned primary cost source. Hourly line items. See CUR setup. |

AWS — Real-time

| Collector | Source | Trigger |
|---|---|---|
| flow_logs | VPC Flow Logs (S3) | Continuous (S3 trigger). Network traffic per ENI. |

AWS — Derived

| Collector | Source | Output |
|---|---|---|
| suggestion_engine | resource_snapshots + metrics | action_suggestions |
| savings_calculator | action_log + resource_events | Updates total_savings_usd |

Non-AWS Platforms (daily)

| Collector | Source | Auth |
|---|---|---|
| cost_github | GitHub Billing API + Workflow Runs API | PAT with admin:org |
| cost_cloudflare | Cloudflare Analytics + Billing API | API token with Billing:Read |
| cost_datadog | Datadog Usage API | API key + App key |

Governance (hourly, daily, weekly)

| Collector | Schedule | Output |
|---|---|---|
| rule_evaluator | Hourly (budget/ratio rules), every 15 min (existence/config/lifecycle rules) | rule_violations |
| anomaly_detector | Daily | anomalies |
| weekly_digest | Weekly | Slack notification |
| account_discovery | Daily | aws_accounts |
| daily_snapshot | Nightly | resource_snapshots_daily |
| forecaster | Daily | cost_forecasts |
| cost_allocator | Daily | allocated_costs |
| pricing | Weekly | aws_pricing |

Normalisation Layer

Every collector calls the normalisation layer (components/collectors/common/), which:

  1. Parses ARNs (arn.py): extracts the service, region, resource type and resource ID.
  2. Resolves tags (tag_resolver.py): maps AWS tags to marqo_* fields via a priority cascade.
  3. Stamps hierarchy metadata (hierarchy_resolver.py): looks up account_role in the hierarchy dictionary and walks physical parent chains for tag inheritance.
  4. Produces a ResourceEvent (models.py): the Pydantic model that maps 1:1 to the resource_events columns.

from datetime import datetime
from uuid import UUID, uuid4

from pydantic import BaseModel, Field


class ResourceEvent(BaseModel):
    event_id: UUID = Field(default_factory=uuid4)
    event_time: datetime
    collector: str
    aws_account_id: str
    aws_region: str
    resource_arn: str
    resource_type: str
    resource_id: str
    resource_name: str = ''
    event_type: str  # 'lifecycle', 'cost', 'metric', 'config', 'tag_change'
    event_action: str
    actor_arn: str = ''
    actor_name: str = ''
    # Attribution fields resolved from AWS tags by tag_resolver.py
    marqo_customer: str = ''
    marqo_cluster: str = ''
    marqo_index: str = ''
    marqo_env: str = ''
    marqo_purpose: str = ''
    # Stamped by hierarchy_resolver.py from the hierarchy dictionary
    account_role: str = ''
    value: float = 0.0
    unit: str = ''
    properties: dict[str, str] = Field(default_factory=dict)
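
Steps 1 to 3 of the normalisation layer can be approximated with plain functions. In the sketch below, parse_arn follows the documented ARN layout (arn:partition:service:region:account-id:resource). The priority cascade is an assumption: this section does not list the tag keys tag_resolver.py checks, so TAG_PRIORITY is invented for illustration, and parent-chain inheritance is reduced to falling back on fields already resolved for the parent resource.

```python
from typing import NamedTuple, Optional


class ParsedArn(NamedTuple):
    partition: str
    service: str
    region: str
    account_id: str
    resource_type: str
    resource_id: str


def parse_arn(arn: str) -> ParsedArn:
    """Split arn:partition:service:region:account:resource into its parts."""
    parts = arn.split(":", 5)
    if len(parts) != 6 or parts[0] != "arn":
        raise ValueError(f"not an ARN: {arn!r}")
    _, partition, service, region, account_id, resource = parts
    # The resource part is 'type/id', 'type:id' or a bare id depending on the service;
    # cut at whichever separator appears first.
    cuts = [i for i in (resource.find("/"), resource.find(":")) if i != -1]
    if not cuts:
        return ParsedArn(partition, service, region, account_id, "", resource)
    cut = min(cuts)
    return ParsedArn(partition, service, region, account_id, resource[:cut], resource[cut + 1:])


# Hypothetical candidate tag keys per marqo_* field, highest priority first.
TAG_PRIORITY: dict[str, list[str]] = {
    "marqo_customer": ["marqo:customer", "customer", "Customer"],
    "marqo_env": ["marqo:env", "env", "Environment"],
}


def resolve_tags(tags: dict[str, str], parent_fields: Optional[dict[str, str]] = None) -> dict[str, str]:
    """First non-empty candidate tag wins; otherwise inherit the physical parent's value."""
    resolved = {}
    for field, candidates in TAG_PRIORITY.items():
        value = next((tags[key] for key in candidates if tags.get(key)), "")
        if not value and parent_fields:
            # e.g. an untagged EBS volume inheriting from its attached instance
            value = parent_fields.get(field, "")
        resolved[field] = value
    return resolved
```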

Ingestion Pipeline

Collectors → ClickHouse (direct HTTP insert)
  • ClickHouse client (components/collectors/common/clickhouse_client.py) uses clickhouse-connect with batching (DEFAULT_BATCH_SIZE = 1000), retry (3 attempts), and exponential backoff.
  • Collectors call insert_events(events) for direct batch inserts or buffer_event(event) which auto-flushes at batch size.
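
The batching and retry behaviour described above can be sketched without a live ClickHouse by injecting the insert call. Only DEFAULT_BATCH_SIZE, the three attempts, insert_events and buffer_event come from the description; BufferedInserter, base_delay and the injected insert_fn/sleep parameters are hypothetical. In production, insert_fn would wrap clickhouse-connect's `Client.insert(table, data, column_names=...)`.

```python
import time
from typing import Any, Callable

DEFAULT_BATCH_SIZE = 1000
MAX_ATTEMPTS = 3


class BufferedInserter:
    """Sketch of batching + retry + exponential backoff; not Polo's actual client."""

    def __init__(self, insert_fn: Callable[[list[Any]], None], batch_size: int = DEFAULT_BATCH_SIZE,
                 base_delay: float = 0.5, sleep: Callable[[float], None] = time.sleep) -> None:
        self._insert_fn = insert_fn
        self._batch_size = batch_size
        self._base_delay = base_delay
        self._sleep = sleep
        self._buffer: list[Any] = []

    def insert_events(self, events: list[Any]) -> None:
        # Send in slices of batch_size so one large call never becomes one huge insert.
        for start in range(0, len(events), self._batch_size):
            self._insert_with_retry(events[start:start + self._batch_size])

    def buffer_event(self, event: Any) -> None:
        self._buffer.append(event)
        if len(self._buffer) >= self._batch_size:
            self.flush()  # auto-flush once the batch is full

    def flush(self) -> None:
        if self._buffer:
            batch, self._buffer = self._buffer, []
            self._insert_with_retry(batch)

    def _insert_with_retry(self, batch: list[Any]) -> None:
        for attempt in range(MAX_ATTEMPTS):
            try:
                self._insert_fn(batch)
                return
            except Exception:
                if attempt == MAX_ATTEMPTS - 1:
                    raise  # give up after the final attempt
                # Exponential backoff: base_delay, then 2 * base_delay, ...
                self._sleep(self._base_delay * 2 ** attempt)
```

In this simple design a batch that fails its final attempt inside flush has already left the buffer, which is the kind of loss the planned SQS dead-letter handling would cover.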

Planned: SQS buffer

At scale (10-20+ accounts), the ingestion pipeline will add an SQS buffer between collectors and ClickHouse for backpressure, retry, and dead-letter handling. This is not yet implemented.

Cross-Account Execution

Status: PLANNED — Currently collectors work with a single configured AWS account. The multi-account pattern described below will be implemented alongside the aws_accounts table and account_discovery collector.

Polo will monitor 10-20+ AWS accounts. Every collector will iterate over all active accounts:

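def collect_across_accounts(collector_fn, ch_client):
    # FINAL makes ClickHouse apply the table engine's merge logic at read time
    # (e.g. deduplicating ReplacingMergeTree row versions).
    accounts = ch_client.query("""
        SELECT account_id, polo_read_role FROM polo.aws_accounts FINAL
        WHERE polo_status = 'active' AND polo_read_role != ''
    """)
    for account_id, role_arn in accounts.rows:
        try:
            # assume_role and update_last_scan are helpers not shown in this section.
            session = assume_role(role_arn)
            events = collector_fn(session=session, account_id=account_id)
            ch_client.insert_events(events)
            update_last_scan(ch_client, account_id, success=True)
        except Exception as e:
            # A failing account is recorded and skipped; the loop continues with the rest.
            update_last_scan(ch_client, account_id, success=False, error=str(e))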

See multi-account.md for IAM role details and provisioning.
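
The loop relies on an assume_role helper that this section does not define. A minimal sketch using the standard STS AssumeRole call follows; the session name, the one-hour duration and the injectable sts_client/session_factory parameters are assumptions made for illustration and testability.

```python
from typing import Any, Callable, Optional


def assume_role(
    role_arn: str,
    session_name: str = "polo-collector",  # hypothetical session name
    duration_seconds: int = 3600,
    sts_client: Optional[Any] = None,
    session_factory: Optional[Callable[..., Any]] = None,
) -> Any:
    """Return a session whose temporary credentials come from sts:AssumeRole on role_arn."""
    if sts_client is None or session_factory is None:
        import boto3  # imported lazily so the helper can be exercised with stubs

        sts_client = sts_client or boto3.client("sts")
        session_factory = session_factory or boto3.Session
    response = sts_client.assume_role(
        RoleArn=role_arn,
        RoleSessionName=session_name,
        DurationSeconds=duration_seconds,
    )
    creds = response["Credentials"]
    return session_factory(
        aws_access_key_id=creds["AccessKeyId"],
        aws_secret_access_key=creds["SecretAccessKey"],
        aws_session_token=creds["SessionToken"],
    )
```

The returned boto3 Session carries no region, so collectors would pass region_name when creating clients from it. The credentials expire after duration_seconds, so a long-running collector would need to re-assume the role.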

Scheduling & Dependencies

Status: PLANNED — The EventBridge schedule below is the target configuration. Currently collectors are run manually or via the polo CLI.

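Collectors will run on EventBridge schedules. Some have soft dependencies: hierarchy_builder reads resource_snapshots and resource_relationships, so it should run after the config collectors, snapshot_builder and relationships. These dependencies are handled by staggering the cron expressions within each 15-minute cycle: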

| Minute offset | Collectors |
|---|---|
| :00 | config_ec2, config_ebs, config_network, tags |
| :05 | snapshot_builder, relationships |
| :10 | hierarchy_builder |

Daily jobs (cost_explorer, and eventually CUR, pricing, forecaster, etc.) run at fixed times.
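
The stagger can be expressed as EventBridge cron expressions, which take six fields (minutes, hours, day-of-month, month, day-of-week, year), require "?" in one of the two day fields, and are evaluated in UTC. The sketch below is an assumption about how the target configuration might be generated, not the rules actually deployed; the function names are hypothetical.

```python
# Minute offsets within each 15-minute cycle, taken from the stagger table.
STAGGER: dict[int, list[str]] = {
    0: ["config_ec2", "config_ebs", "config_network", "tags"],
    5: ["snapshot_builder", "relationships"],
    10: ["hierarchy_builder"],
}


def every_15_min(offset: int) -> str:
    """EventBridge cron expression firing at offset, offset+15, offset+30 and offset+45."""
    if not 0 <= offset < 15:
        raise ValueError("offset must fall within the 15-minute cycle")
    return f"cron({offset}/15 * * * ? *)"


def daily_at(hour: int, minute: int = 0) -> str:
    """EventBridge cron expression firing once a day at hour:minute UTC."""
    return f"cron({minute} {hour} * * ? *)"


def schedule() -> dict[str, str]:
    """Map each 15-minute collector to its staggered cron expression."""
    return {name: every_15_min(offset) for offset, names in STAGGER.items() for name in names}
```

For example, daily_at(7) matches the 07:00 UTC slot used by data_quality.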

Non-AWS Resource IDs

Status: PLANNED — For when non-AWS platform collectors are implemented.

For platforms without ARNs, use a consistent prefix convention:

| Platform | Format | Example |
|---|---|---|
| GitHub | github:{org}/{type}/{name} | github:marqo-ai/repo/marqo |
| Cloudflare | cloudflare:{product}/{id} | cloudflare:worker/polo-api |
| Datadog | datadog:{product} | datadog:infrastructure |
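
Building these IDs through small helpers keeps the prefix convention in one place. The sketch below mirrors the formats in the table; the helper names are hypothetical. Splitting at the first colon also works on AWS ARNs, which always begin with "arn:", so one check can tell AWS and non-AWS resources apart.

```python
def github_id(org: str, kind: str, name: str) -> str:
    return f"github:{org}/{kind}/{name}"


def cloudflare_id(product: str, resource_id: str) -> str:
    return f"cloudflare:{product}/{resource_id}"


def datadog_id(product: str) -> str:
    return f"datadog:{product}"


def split_platform_id(resource_id: str) -> tuple[str, str]:
    """Split 'platform:rest' at the first colon into (platform, rest)."""
    platform, sep, rest = resource_id.partition(":")
    if not sep:
        raise ValueError(f"missing platform prefix: {resource_id!r}")
    return platform, rest
```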