
Polo Legacy — Collectors & Data Pipeline

Sync Architecture

The sync pipeline collects AWS resources from 16 accounts and stores them in DynamoDB.

EventBridge schedule
      │
      ▼
lambda_sync(target_account_id, target_role_arn, dry_run)
      │
      ├── Assume role into target AWS account
      │
      ▼
SyncService.sync_all()
      │
      ├── ThreadPoolExecutor: run all 15 SERVICE_TYPES in parallel
      │     ├── AwsAccountService.map_all()
      │     ├── OtherService.map_all()
      │     ├── SavingsPlanService.map_all()
      │     ├── UserService.map_all()
      │     ├── AccountService.map_all()
      │     ├── IndexService.map_all()
      │     ├── BucketService.map_all()
      │     ├── VpcService.map_all()
      │     ├── SubnetService.map_all()
      │     ├── NatGatewayService.map_all()
      │     ├── LoadBalancerService.map_all()
      │     ├── InstanceService.map_all()
      │     ├── VolumeService.map_all()
      │     ├── PublicIPv4Service.map_all()
      │     └── NotebookService.map_all()
      │
      ├── Build Everything object (all resources by type)
      │
      ├── Enrich all (pricing, relationships, roles)
      │
      ├── Post-enrich (reverse relationships)
      │
      ├── Diff against existing DynamoDB data
      │
      └── Write changes to DynamoDB ResourceTable
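The fan-out and diff steps above can be sketched as follows. This is a minimal illustration, not the actual SyncService. Several details are assumptions: resources are plain dicts keyed by a unique `path`, the mappers are stand-in callables, and a resource that has vanished from AWS is soft-deleted by stamping `deleted_at` rather than removed. The soft delete is only suggested by the pruning pipeline, which later removes items whose `deleted_at` is more than a day old.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Tuple

# Simplified stand-in for the Resource dataclass (assumption).
ResourceDict = Dict[str, object]


def collect_all(mappers: Dict[str, Callable[[], List[ResourceDict]]]) -> Dict[str, List[ResourceDict]]:
    """Run every service's map_all() concurrently and group results by type code."""
    with ThreadPoolExecutor(max_workers=max(1, len(mappers))) as pool:
        futures = {code: pool.submit(fn) for code, fn in mappers.items()}
        # .result() re-raises any exception thrown inside a worker thread.
        return {code: fut.result() for code, fut in futures.items()}


def diff(new: List[ResourceDict], existing: List[ResourceDict], now: str) -> Tuple[List[ResourceDict], List[ResourceDict]]:
    """Return (items to write, items to soft-delete), matching on "path"."""
    old_by_path = {r["path"]: r for r in existing}
    new_by_path = {r["path"]: r for r in new}
    # Write only resources that are new or whose content changed.
    writes = [r for p, r in new_by_path.items() if old_by_path.get(p) != r]
    # Resources no longer returned by AWS get a deleted_at stamp (assumed soft delete).
    deletes = [
        {**r, "deleted_at": now}
        for p, r in old_by_path.items()
        if p not in new_by_path and "deleted_at" not in r
    ]
    return writes, deletes
```

Diffing before writing keeps the DynamoDB write volume proportional to what changed, not to the total number of resources.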

Service Types (15)

Each service implements:

  • list_all() — boto3 API calls to fetch raw AWS resources
  • map() — converts AWS API response to Resource dataclass
  • enrich() — adds pricing, relationships, role inference
  • post_enrich() — adds derived/reverse-lookup fields
  • map_all() — orchestrates the above
| # | Service | Resource Type | AWS API |
|---|---|---|---|
| 1 | AwsAccountService | AWS (Account) | Organizations |
| 2 | OtherService | OT | Various |
| 3 | SavingsPlanService | SP | Savings Plans |
| 4 | UserService | US | Cognito |
| 5 | AccountService | AC | Internal API/DynamoDB |
| 6 | IndexService | IX | Internal API/DynamoDB |
| 7 | BucketService | BU | S3 |
| 8 | VpcService | V | EC2 (VPCs) |
| 9 | SubnetService | S | EC2 (Subnets) |
| 10 | NatGatewayService | NG | EC2 (NAT Gateways) |
| 11 | LoadBalancerService | L | ELB/ALB |
| 12 | InstanceService | I | EC2 (Instances) |
| 13 | VolumeService | VO | EC2 (EBS) |
| 14 | PublicIPv4Service | IP | EC2 (EIPs) |
| 15 | NotebookService | NB | SageMaker |
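The five-method contract shared by the services can be read as a template method on a common base class. The sketch below is one such reading, under stated assumptions. The names `BaseService` and `FakeVolumeService` are hypothetical, as is the small set of `Resource` fields. Real services call boto3 inside `list_all()` rather than returning canned data.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class Resource:
    # Hypothetical subset of fields; the real dataclass is richer.
    type: str
    aws_id: str
    path: str
    cost: Optional[float] = None
    extra: Dict[str, Any] = field(default_factory=dict)


class BaseService(ABC):
    type_code: str = ""

    @abstractmethod
    def list_all(self) -> List[Dict[str, Any]]:
        """Fetch raw API records (boto3 calls in the real services)."""

    @abstractmethod
    def map(self, raw: Dict[str, Any]) -> Resource:
        """Convert one raw API record into a Resource."""

    def enrich(self, resource: Resource) -> Resource:
        """Per-resource enrichment; this default is a no-op."""
        return resource

    def post_enrich(self, resources: List[Resource]) -> List[Resource]:
        """Cross-resource pass; this default is a no-op."""
        return resources

    def map_all(self) -> List[Resource]:
        # Template method: fetch, map, enrich each record, then post-enrich the batch.
        mapped = [self.enrich(self.map(raw)) for raw in self.list_all()]
        return self.post_enrich(mapped)


class FakeVolumeService(BaseService):
    type_code = "VO"

    def list_all(self):
        # Canned stand-in for ec2.describe_volumes()["Volumes"].
        return [{"VolumeId": "vol-1", "Size": 100}, {"VolumeId": "vol-2", "Size": 8}]

    def map(self, raw):
        return Resource(type=self.type_code, aws_id=raw["VolumeId"],
                        path=f"/volumes/{raw['VolumeId']}", extra={"size_gb": raw["Size"]})
```

Because `list_all()` and `map()` are abstract, a service that forgets either one fails at instantiation rather than midway through a sync.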

Local Sync (CLI via tasks.py)

For development, tasks.py provides a CLI-based sync using invoke:

inv sync # sync all resources from all profiles
inv sync -r instances # sync only instances
inv sync -p prod,staging # sync from specific AWS profiles

It uses multiprocessing.Pool() to sync profiles in parallel. After the sync completes, combine() merges the per-type JSON files and the pricing data into a single data.json.
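A hypothetical version of the combine() step might look like this. The file layout is assumed, not taken from tasks.py: each per-type file is `<type>.json` holding a JSON list, pricing lives in `pricing.json`, and the merged shape `{"resources": {...}, "pricing": {...}}` is illustrative only.

```python
import json
from pathlib import Path
from typing import Any, Dict


def combine(data_dir: Path, out_name: str = "data.json") -> Dict[str, Any]:
    """Merge per-type resource files plus pricing into one JSON document (assumed layout)."""
    pricing_file = data_dir / "pricing.json"
    resources: Dict[str, Any] = {}
    for path in sorted(data_dir.glob("*.json")):
        # Skip the pricing input and any combined output from a previous run.
        if path.name in {pricing_file.name, out_name}:
            continue
        resources[path.stem] = json.loads(path.read_text())
    pricing = json.loads(pricing_file.read_text()) if pricing_file.exists() else {}
    combined = {"resources": resources, "pricing": pricing}
    (data_dir / out_name).write_text(json.dumps(combined, indent=2))
    return combined
```

Excluding the output file from the glob makes the step idempotent: running it twice yields the same data.json.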

Profile-to-Account Mapping

prod → 651774330118
staging → 468036072962
controller → 023568249301
core → 707042731317
oss → 424082663841
preprod → 339712831429
commercial → 780949682512
ml → 940994029740
marqtune → 975050354766
marqtune_prod → 905418443936
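The mapping above fits naturally in a lookup table, against which a flag such as `-p prod,staging` can be resolved. The dict copies the listed IDs. The `resolve_profiles` helper and its error behavior are hypothetical and not taken from tasks.py. IDs are stored as strings because an account ID can start with a zero (controller).

```python
from typing import Dict, List, Optional, Tuple

PROFILE_ACCOUNTS: Dict[str, str] = {
    "prod": "651774330118",
    "staging": "468036072962",
    "controller": "023568249301",
    "core": "707042731317",
    "oss": "424082663841",
    "preprod": "339712831429",
    "commercial": "780949682512",
    "ml": "940994029740",
    "marqtune": "975050354766",
    "marqtune_prod": "905418443936",
}


def resolve_profiles(arg: Optional[str]) -> List[Tuple[str, str]]:
    """Turn a comma-separated -p value into (profile, account_id) pairs.

    None or an empty string selects every known profile.
    """
    if not arg:
        return list(PROFILE_ACCOUNTS.items())
    names = [name.strip() for name in arg.split(",") if name.strip()]
    unknown = [n for n in names if n not in PROFILE_ACCOUNTS]
    if unknown:
        raise ValueError(f"unknown profile(s): {', '.join(unknown)}")
    return [(n, PROFILE_ACCOUNTS[n]) for n in names]
```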

Enrichment Pipeline

Phase 1: Enrich (per resource)

  • Pricing: Look up INSTANCE_PRICES[subtype] for $/hr cost
  • Role inference: Derived from instance name, tags, cluster membership
  • Relationship linking: Set cluster_id, marqo_index, account_id from path/tags
  • Cloud version detection:
    • Version 1: Legacy instances tagged with system_account_id
    • Version 2: Current role-based architecture
    • Null: Not part of cloud infrastructure
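A condensed sketch of the pricing and cloud-version rules in Phase 1 follows. Only `INSTANCE_PRICES` and the `system_account_id` tag come from the text. Everything else is an assumption for illustration: the price values, the `role` tag key used to recognize the Version 2 architecture, and the order in which the checks run.

```python
from typing import Dict, Optional

# Hypothetical $/hr prices keyed by instance subtype (illustrative values).
INSTANCE_PRICES: Dict[str, float] = {"t3.micro": 0.0104, "m5.large": 0.096}


def hourly_price(subtype: str) -> Optional[float]:
    # Unknown subtypes yield None instead of raising, so one missing price
    # does not abort the whole enrichment pass.
    return INSTANCE_PRICES.get(subtype)


def cloud_version(tags: Dict[str, str]) -> Optional[int]:
    """1 = legacy system_account_id tag, 2 = role-based, None = not cloud infrastructure."""
    if "system_account_id" in tags:
        return 1
    if "role" in tags:  # hypothetical tag key for the current role-based architecture
        return 2
    return None


def enrich(resource: Dict) -> Dict:
    tags = resource.get("tags", {})
    return {
        **resource,
        "price_per_hour": hourly_price(resource.get("subtype", "")),
        "cloud_version": cloud_version(tags),
    }
```

Checking the legacy tag first means an instance carrying both markers is classified as Version 1. That ordering is a guess.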

Phase 2: Post-enrich (cross-resource)

  • sum_cost: Roll up descendant costs to parents
  • Reverse lookups: Build volumes_by_instance, instances_by_cluster, etc.
  • Audience/team propagation: Inherit from parent account/cluster
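The cost roll-up and a reverse lookup can be sketched over path-addressed resources. Two assumptions apply. First, each resource has a slash-separated `path` whose prefixes are its ancestors; `list_tree(root_path)` hints at this, but it is not confirmed. Second, volumes carry a hypothetical `instance_id` field.

```python
from collections import defaultdict
from typing import Dict, List


def ancestors(path: str) -> List[str]:
    """'/acct/cluster/i-1' -> ['/acct', '/acct/cluster']."""
    parts = path.strip("/").split("/")
    return ["/" + "/".join(parts[:i]) for i in range(1, len(parts))]


def sum_costs(resources: List[Dict]) -> Dict[str, float]:
    """Each resource's own cost plus the cost of every descendant."""
    totals: Dict[str, float] = defaultdict(float)
    for r in resources:
        cost = r.get("cost") or 0.0
        totals[r["path"]] += cost
        # Push this resource's cost up to every ancestor path.
        for parent in ancestors(r["path"]):
            totals[parent] += cost
    return dict(totals)


def volumes_by_instance(resources: List[Dict]) -> Dict[str, List[str]]:
    """Reverse lookup from instance id to attached volume paths."""
    index: Dict[str, List[str]] = defaultdict(list)
    for r in resources:
        if r.get("type") == "VO" and r.get("instance_id"):
            index[r["instance_id"]].append(r["path"])
    return dict(index)
```

Pushing each cost up its ancestor chain needs only one pass over the resources, with no recursion over the tree.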

Report Generation Pipeline

EventBridge schedule
      │
      ▼
lambda_report
      │
      ├── Load all resources via DynamoDataService.get_everything()
      │
      ├── ThreadPoolExecutor: run all reporters in parallel
      │     ├── MultiCostReport: cost by account × usage_type × period
      │     ├── Budget: cost vs target tracking
      │     ├── PruneClusters: empty cluster candidates
      │     ├── PruneVolumes: unattached volume candidates
      │     ├── PruneIPs: unused IP candidates
      │     ├── PruneNatGateways: unused NAT candidates
      │     └── StopNotebooks: idle notebook candidates
      │
      └── Write reports to DynamoDB ReportTable (PK: REPORT#{ClassName})
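A minimal sketch of the reporter fan-out and the `REPORT#{ClassName}` key scheme follows. Only the key pattern comes from the text. The two toy reporters are named after real ones but are simplified stand-ins. The `run()` method, the dict standing in for Everything (keyed by type code), and the `report` attribute are all assumptions.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List


class PruneVolumes:
    """Toy stand-in: flags volumes with no attached instance (hypothetical field)."""

    def run(self, everything: Dict[str, List[Dict]]) -> List[Dict]:
        return [v for v in everything.get("VO", []) if not v.get("instance_id")]


class StopNotebooks:
    """Toy stand-in: flags notebooks marked idle (hypothetical field)."""

    def run(self, everything: Dict[str, List[Dict]]) -> List[Dict]:
        return [n for n in everything.get("NB", []) if n.get("idle")]


def build_report_items(reporters: List[Any], everything: Dict[str, List[Dict]]) -> List[Dict]:
    # Run every reporter concurrently; pool.map preserves input order.
    with ThreadPoolExecutor(max_workers=max(1, len(reporters))) as pool:
        results = list(pool.map(lambda r: r.run(everything), reporters))
    # One item per reporter, keyed by class name as in REPORT#{ClassName}.
    return [
        {"PK": f"REPORT#{type(r).__name__}", "report": result}
        for r, result in zip(reporters, results)
    ]
```

Deriving the key from the class name means each run overwrites the previous report of the same kind, provided no sort key is involved.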

Cloud Cop Pipeline

EventBridge schedule
      │
      ▼
lambda_cop
      │
      ├── Load resources
      │
      ├── Detect issues:
      │     ├── find_persistent_dev_indexes(): high-cost dev indexes
      │     ├── find_no_role_instances(): EC2 without IAM roles
      │     └── find_orphaned_index_instances(): instances without parent index
      │
      ├── Build Slack blocks via cop_sections()
      │
      └── POST to SLACK_WEBHOOK_URL
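The last two steps amount to building a Slack Block Kit payload and POSTing it as JSON to an incoming webhook. The section layout and the shape of the findings input (check name mapped to a list of offending resource IDs) are assumptions, not the real `cop_sections()`. `section` blocks with `mrkdwn` text and a `{"blocks": [...]}` request body are standard Slack webhook usage.

```python
import json
import os
import urllib.request
from typing import Dict, List


def cop_sections(findings: Dict[str, List[str]]) -> List[Dict]:
    """One section block per non-empty check, listing the offending resources."""
    blocks: List[Dict] = []
    for check, items in findings.items():
        if not items:
            continue  # skip checks with nothing to report
        lines = "\n".join(f"• {item}" for item in items)
        blocks.append({
            "type": "section",
            "text": {"type": "mrkdwn", "text": f"*{check}* ({len(items)})\n{lines}"},
        })
    return blocks


def post_to_slack(blocks: List[Dict], url: str = "") -> int:
    """POST the blocks to the webhook (SLACK_WEBHOOK_URL by default) and return the HTTP status."""
    url = url or os.environ["SLACK_WEBHOOK_URL"]
    body = json.dumps({"blocks": blocks}).encode("utf-8")
    req = urllib.request.Request(url, data=body, headers={"Content-Type": "application/json"}, method="POST")
    with urllib.request.urlopen(req, timeout=10) as resp:
        return resp.status
```

Keeping block construction separate from the HTTP call lets the message format be tested without network access. Very long finding lists may need truncating, because Slack caps the text length of a section block.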

Data Pruning Pipeline

EventBridge schedule
      │
      ▼
lambda_prune
      │
      ├── Scan ResourceTable for deleted_at < (now - 1 day)
      │
      ├── Batch delete matching items
      │
      └── Publish CloudWatch metric (count of deleted items)
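A sketch of the prune steps against the boto3 resource API follows. The DynamoDB table and the CloudWatch client are passed in, so the logic can be exercised without AWS. The assumptions are stated loudly here. `deleted_at` is taken to be an ISO-8601 UTC string written in the same format as `isoformat()`, so that string order matches time order. The table's key attributes are assumed to be `PK` and `SK`. The metric namespace and name are invented.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

KEY_ATTRS = ("PK", "SK")  # hypothetical ResourceTable key schema


def cutoff(now: datetime, age: timedelta = timedelta(days=1)) -> str:
    return (now - age).isoformat()


def scan_expired(table: Any, cutoff_iso: str) -> List[Dict]:
    """Scan the whole table, following pagination, for items deleted before the cutoff."""
    kwargs: Dict[str, Any] = {
        "FilterExpression": "#d < :cutoff",
        "ExpressionAttributeNames": {"#d": "deleted_at"},
        "ExpressionAttributeValues": {":cutoff": cutoff_iso},
    }
    items: List[Dict] = []
    while True:
        page = table.scan(**kwargs)
        items.extend(page.get("Items", []))
        if "LastEvaluatedKey" not in page:
            return items
        kwargs["ExclusiveStartKey"] = page["LastEvaluatedKey"]


def prune(table: Any, cloudwatch: Any, now: Optional[datetime] = None) -> int:
    expired = scan_expired(table, cutoff(now or datetime.now(timezone.utc)))
    # batch_writer() buffers deletes into BatchWriteItem calls and resends unprocessed items.
    with table.batch_writer() as batch:
        for item in expired:
            batch.delete_item(Key={k: item[k] for k in KEY_ATTRS})
    cloudwatch.put_metric_data(
        Namespace="Polo/Prune",  # hypothetical namespace
        MetricData=[{"MetricName": "DeletedItems", "Value": len(expired), "Unit": "Count"}],
    )
    return len(expired)
```

In a Lambda handler, the arguments would be `boto3.resource("dynamodb").Table(...)` and `boto3.client("cloudwatch")`. Items without `deleted_at` never match the filter, so live resources are untouched.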

Data Service Abstraction

Both local dev and production use the same DataService interface:

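from __future__ import annotations  # Resource and Everything are defined elsewhere
from abc import ABC
from typing import List

class DataService(ABC):
    # Signatures only; DynamoDataService and LocalDataService override each method.
    def write_sync(self, data): ...                                 # persist resources
    def get(self, type, path) -> Resource: ...                      # single resource
    def get_everything(self) -> Everything: ...                     # all resources
    def list_all(self, fields=None) -> List[Resource]: ...          # all, optional projection
    def list_by_type(self, type) -> List[Resource]: ...             # by type code
    def list_by_type_and_aws_id(self, type, aws_id): ...            # filtered
    def list_by_column(self, column, value): ...                    # custom filter
    def list_tree(self, root_path) -> List[Resource]: ...           # hierarchical subtree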
| Implementation | Backing Store | Used When |
|---|---|---|
| DynamoDataService | DynamoDB tables | Production Lambda |
| LocalDataService | local_data.json file | USE_LOCAL_DATA=true |
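Below is a hedged sketch of how the two implementations might be selected, with a JSON-backed store covering part of the interface. Only `get` and `list_by_type` are shown. The `get_data_service()` factory, the file layout (a flat JSON list of resource dicts), and the `DynamoDataService` placeholder body are assumptions. The env check treats only the string `true` (case-insensitive) as enabled.

```python
import json
import os
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, List, Optional


class DataService(ABC):
    # Two-method subset of the interface, for illustration.
    @abstractmethod
    def get(self, type: str, path: str) -> Optional[Dict]: ...

    @abstractmethod
    def list_by_type(self, type: str) -> List[Dict]: ...


class LocalDataService(DataService):
    def __init__(self, file: Path = Path("local_data.json")):
        # Assumed layout: a JSON list of resource dicts with "type" and "path" keys.
        self._items: List[Dict] = json.loads(Path(file).read_text())

    def get(self, type: str, path: str) -> Optional[Dict]:
        return next((r for r in self._items if r["type"] == type and r["path"] == path), None)

    def list_by_type(self, type: str) -> List[Dict]:
        return [r for r in self._items if r["type"] == type]


class DynamoDataService(DataService):
    # Placeholder: the production class reads DynamoDB tables.
    def get(self, type: str, path: str) -> Optional[Dict]:
        raise NotImplementedError

    def list_by_type(self, type: str) -> List[Dict]:
        raise NotImplementedError


def get_data_service() -> DataService:
    if os.environ.get("USE_LOCAL_DATA", "").lower() == "true":
        return LocalDataService()
    return DynamoDataService()
```

Callers depend only on `DataService`, so reporters and the cop can run unchanged against a local snapshot.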