Polo Legacy — Collectors & Data Pipeline
Sync Architecture
The sync pipeline collects AWS resources from 16 accounts and stores them in DynamoDB.
EventBridge schedule
│
▼
lambda_sync(target_account_id, target_role_arn, dry_run)
│
├── Assume role into target AWS account
│
▼
SyncService.sync_all()
│
├── ThreadPoolExecutor: run all 15 SERVICE_TYPES in parallel
│ ├── AwsAccountService.map_all()
│ ├── OtherService.map_all()
│ ├── SavingsPlanService.map_all()
│ ├── UserService.map_all()
│ ├── AccountService.map_all()
│ ├── IndexService.map_all()
│ ├── BucketService.map_all()
│ ├── VpcService.map_all()
│ ├── SubnetService.map_all()
│ ├── NatGatewayService.map_all()
│ ├── LoadBalancerService.map_all()
│ ├── InstanceService.map_all()
│ ├── VolumeService.map_all()
│ ├── PublicIPv4Service.map_all()
│ └── NotebookService.map_all()
│
├── Build Everything object (all resources by type)
│
├── Enrich all (pricing, relationships, roles)
│
├── Post-enrich (reverse relationships)
│
├── Diff against existing DynamoDB data
│
└── Write changes to DynamoDB ResourceTable
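The fan-out and diff steps in the diagram above can be sketched roughly as follows. This is a minimal illustration, not the project's code: `FakeService`, `collect_all`, and `diff` are hypothetical names, and keying resources by path for the diff is an assumption.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


class FakeService:
    """Hypothetical stand-in for a real service; only map_all() matters here."""

    def __init__(self, type_code: str, resources: List[dict]):
        self.type_code = type_code
        self._resources = resources

    def map_all(self) -> List[dict]:
        return list(self._resources)


def collect_all(services: List[FakeService], max_workers: int = 15) -> Dict[str, List[dict]]:
    """Run every service's map_all() in parallel and group results by type code."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {svc.type_code: pool.submit(svc.map_all) for svc in services}
        # .result() re-raises any exception raised inside the worker thread.
        return {code: fut.result() for code, fut in futures.items()}


def diff(existing: Dict[str, dict], fresh: Dict[str, dict]) -> Dict[str, List[str]]:
    """Compare path-keyed resources and report which paths would be written."""
    return {
        "created": sorted(fresh.keys() - existing.keys()),
        "deleted": sorted(existing.keys() - fresh.keys()),
        "updated": sorted(k for k in fresh.keys() & existing.keys() if fresh[k] != existing[k]),
    }
```

Only the paths reported by `diff` would need to be written, which keeps DynamoDB writes proportional to what changed rather than to the total resource count.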
Service Types (15)
Each service implements:
- list_all(): boto3 API calls to fetch raw AWS resources
- map(): converts AWS API response to Resource dataclass
- enrich(): adds pricing, relationships, role inference
- post_enrich(): adds derived/reverse-lookup fields
- map_all(): orchestrates the above
| # | Service | Resource Type | AWS API |
|---|---|---|---|
| 1 | AwsAccountService | AWS (Account) | Organizations |
| 2 | OtherService | OT | Various |
| 3 | SavingsPlanService | SP | Savings Plans |
| 4 | UserService | US | Cognito |
| 5 | AccountService | AC | Internal API/DynamoDB |
| 6 | IndexService | IX | Internal API/DynamoDB |
| 7 | BucketService | BU | S3 |
| 8 | VpcService | V | EC2 (VPCs) |
| 9 | SubnetService | S | EC2 (Subnets) |
| 10 | NatGatewayService | NG | EC2 (NAT Gateways) |
| 11 | LoadBalancerService | L | ELB/ALB |
| 12 | InstanceService | I | EC2 (Instances) |
| 13 | VolumeService | VO | EC2 (EBS) |
| 14 | PublicIPv4Service | IP | EC2 (EIPs) |
| 15 | NotebookService | NB | SageMaker |
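The per-service contract could look something like the sketch below. The `Resource` fields, `BaseService`, and `StubVolumeService` are illustrative assumptions, since the real dataclass and base class are not shown in this document. In this sketch `map_all()` only chains `list_all()` and `map()`, because the pipeline diagram shows enrichment running after all types have been collected; the real method may do more.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class Resource:
    # Illustrative fields only; the real Resource dataclass is not shown here.
    type: str
    aws_id: str
    extra: Dict[str, Any] = field(default_factory=dict)


class BaseService(ABC):
    type_code: str = ""

    @abstractmethod
    def list_all(self) -> List[dict]:
        """Fetch raw AWS API responses (boto3 calls in the real services)."""

    @abstractmethod
    def map(self, raw: dict) -> Resource:
        """Convert one raw API response into a Resource."""

    def enrich(self, resource: Resource) -> Resource:
        return resource  # default: nothing to add

    def post_enrich(self, resource: Resource) -> Resource:
        return resource  # default: nothing to derive

    def map_all(self) -> List[Resource]:
        return [self.map(raw) for raw in self.list_all()]


class StubVolumeService(BaseService):
    type_code = "VO"

    def list_all(self) -> List[dict]:
        # Canned data standing in for an EC2 describe_volumes call.
        return [{"VolumeId": "vol-0abc", "Size": 100}]

    def map(self, raw: dict) -> Resource:
        return Resource(type=self.type_code, aws_id=raw["VolumeId"],
                        extra={"size_gb": raw["Size"]})
```

Calling `StubVolumeService().map_all()` returns a single `Resource` with type code `VO`, which is the shape the sync step would then group by type.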
Local Sync (CLI via tasks.py)
For development, tasks.py provides a CLI-based sync using invoke:
inv sync # sync all resources from all profiles
inv sync -r instances # sync only instances
inv sync -p prod,staging # sync from specific AWS profiles
Uses multiprocessing.Pool() for parallelism across profiles.
After sync, combine() merges per-type JSON files + pricing into a single data.json.
Profile-to-Account Mapping
prod → 651774330118
staging → 468036072962
controller → 023568249301
core → 707042731317
oss → 424082663841
preprod → 339712831429
commercial → 780949682512
ml → 940994029740
marqtune → 975050354766
marqtune_prod → 905418443936
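One way to represent this mapping in code, and to resolve the value passed to `-p`, is sketched below. `PROFILE_ACCOUNTS` and `resolve_profiles` are hypothetical names, and treating a missing `-p` as "all profiles" is an assumption based on the `inv sync` examples above.

```python
from typing import List, Optional, Tuple

# Account IDs are kept as strings: "023568249301" would lose its leading
# zero if stored as an integer.
PROFILE_ACCOUNTS = {
    "prod": "651774330118",
    "staging": "468036072962",
    "controller": "023568249301",
    "core": "707042731317",
    "oss": "424082663841",
    "preprod": "339712831429",
    "commercial": "780949682512",
    "ml": "940994029740",
    "marqtune": "975050354766",
    "marqtune_prod": "905418443936",
}


def resolve_profiles(arg: Optional[str]) -> List[Tuple[str, str]]:
    """Turn a comma-separated -p value into (profile, account_id) pairs.

    None or an empty string selects every known profile (assumed default).
    """
    if arg:
        names = [p.strip() for p in arg.split(",") if p.strip()]
    else:
        names = list(PROFILE_ACCOUNTS)
    unknown = [n for n in names if n not in PROFILE_ACCOUNTS]
    if unknown:
        raise ValueError(f"unknown profile(s): {', '.join(unknown)}")
    return [(n, PROFILE_ACCOUNTS[n]) for n in names]
```

For example, `resolve_profiles("prod,staging")` yields the two matching pairs in the order given, and an unrecognised profile name fails fast instead of being silently skipped.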
Enrichment Pipeline
Phase 1: Enrich (per resource)
- Pricing: Look up INSTANCE_PRICES[subtype] for $/hr cost
- Role inference: Derived from instance name, tags, cluster membership
- Relationship linking: Set cluster_id, marqo_index, account_id from path/tags
- Cloud version detection:
  - Version 1: Legacy tagged instances with system_account_id
  - Version 2: Current role-based architecture
  - Null: Not part of cloud infrastructure
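The pricing lookup and cloud version rules above might be expressed as in this sketch. The prices are placeholders rather than real AWS pricing, and the detection logic is an assumption: a `system_account_id` tag is taken to mean version 1 and to take precedence, and any inferred role is taken to mean version 2.

```python
from typing import Optional

# Placeholder subtypes and prices for illustration only; not real AWS pricing.
INSTANCE_PRICES = {"example.large": 0.10, "example.xlarge": 0.20}


def hourly_cost(subtype: str) -> Optional[float]:
    """Return the $/hr price for an instance subtype, or None if unknown."""
    return INSTANCE_PRICES.get(subtype)


def cloud_version(tags: dict, role: Optional[str]) -> Optional[int]:
    """Classify an instance as cloud version 1, 2, or None (not cloud infra)."""
    if "system_account_id" in tags:
        return 1  # legacy tagged instance (assumed to win over a role)
    if role is not None:
        return 2  # current role-based architecture
    return None
```

Returning `None` for an unknown subtype, rather than `0.0`, keeps "no price available" distinguishable from "free" when costs are later rolled up.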
Phase 2: Post-enrich (cross-resource)
- sum_cost: Roll up descendant costs to parents
- Reverse lookups: Build volumes_by_instance, instances_by_cluster, etc.
- Audience/team propagation: Inherit from parent account/cluster
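A rough sketch of the cost roll-up and reverse lookups follows. It assumes resources are addressed by `/`-separated paths and that each resource dict carries an `id` plus a foreign-key field such as `instance_id`; `sum_costs` and `group_by` are hypothetical helper names.

```python
from collections import defaultdict
from typing import Dict, List


def sum_costs(own_costs: Dict[str, float]) -> Dict[str, float]:
    """Add each resource's own cost to itself and to every ancestor path."""
    totals: Dict[str, float] = defaultdict(float)
    for path, cost in own_costs.items():
        parts = path.split("/")
        for depth in range(1, len(parts) + 1):
            totals["/".join(parts[:depth])] += cost
    return dict(totals)


def group_by(resources: List[dict], key: str) -> Dict[str, List[str]]:
    """Build a reverse lookup: value of `key` -> ids of resources pointing at it."""
    index: Dict[str, List[str]] = defaultdict(list)
    for res in resources:
        if res.get(key) is not None:
            index[res[key]].append(res["id"])
    return dict(index)


# Usage: two instances under one cluster, and two volumes on one instance.
totals = sum_costs({"acct/cluster/i-1": 0.5, "acct/cluster/i-2": 0.25})
volumes_by_instance = group_by(
    [{"id": "vol-1", "instance_id": "i-1"},
     {"id": "vol-2", "instance_id": "i-1"},
     {"id": "vol-3", "instance_id": None}],
    "instance_id",
)
```

Here the unattached `vol-3` is left out of `volumes_by_instance`, which is the kind of gap the pruning reports can then look for.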
Report Generation Pipeline
EventBridge schedule
│
▼
lambda_report
│
├── Load all resources via DynamoDataService.get_everything()
│
├── ThreadPoolExecutor: run all reporters in parallel
│ ├── MultiCostReport — cost by account × usage_type × period
│ ├── Budget — cost vs target tracking
│ ├── PruneClusters — empty cluster candidates
│ ├── PruneVolumes — unattached volume candidates
│ ├── PruneIPs — unused IP candidates
│ ├── PruneNatGateways — unused NAT candidates
│ └── StopNotebooks — idle notebook candidates
│
└── Write reports to DynamoDB ReportTable (PK: REPORT#{ClassName})
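The report fan-out and the `REPORT#{ClassName}` partition key could be sketched as below. The reporter bodies, the `run()` method name, and the resource field names are assumptions made for illustration; only the class names and the key format come from the diagram.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


class PruneVolumes:
    """Illustrative reporter: volumes with no attached instance."""

    def run(self, everything: Dict[str, List[dict]]) -> List[str]:
        return [v["id"] for v in everything.get("VO", []) if not v.get("instance_id")]


class PruneIPs:
    """Illustrative reporter: public IPs not attached to anything."""

    def run(self, everything: Dict[str, List[dict]]) -> List[str]:
        return [ip["id"] for ip in everything.get("IP", []) if not ip.get("attached_to")]


def report_key(reporter: object) -> str:
    """Partition key for a report, derived from the reporter's class name."""
    return f"REPORT#{type(reporter).__name__}"


def run_reports(everything: Dict[str, List[dict]], reporters: list) -> Dict[str, List[str]]:
    """Run all reporters in parallel; the result maps partition key -> report body."""
    with ThreadPoolExecutor() as pool:
        futures = {report_key(r): pool.submit(r.run, everything) for r in reporters}
        return {pk: fut.result() for pk, fut in futures.items()}
```

Deriving the key from the class name means each run overwrites the previous report for that reporter, so the table holds one current item per report type.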
Cloud Cop Pipeline
EventBridge schedule
│
▼
lambda_cop
│
├── Load resources
│
├── Detect issues:
│ ├── find_persistent_dev_indexes() — high-cost dev indexes
│ ├── find_no_role_instances() — EC2 without IAM roles
│ └── find_orphaned_index_instances() — instances without parent index
│
├── Build Slack blocks via cop_sections()
│
└── POST to SLACK_WEBHOOK_URL
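As an illustration of the detect, build, and post steps, the sketch below flags instances without a role and wraps them in Slack Block Kit blocks. The `iam_role` field and the helper signatures are assumptions; `build_sections` is a hypothetical stand-in, since the real `cop_sections()` signature is not shown here.

```python
import json
import urllib.request
from typing import List


def find_no_role_instances(instances: List[dict]) -> List[dict]:
    """Return instances whose (assumed) iam_role field is missing or empty."""
    return [i for i in instances if not i.get("iam_role")]


def build_sections(title: str, items: List[dict]) -> List[dict]:
    """Build Block Kit blocks: a header plus one bulleted section, or nothing."""
    if not items:
        return []
    lines = "\n".join(f"• `{item['id']}`" for item in items)
    return [
        {"type": "header", "text": {"type": "plain_text", "text": title}},
        {"type": "section", "text": {"type": "mrkdwn", "text": lines}},
    ]


def build_payload(blocks: List[dict]) -> bytes:
    """Serialize blocks into the JSON body an incoming webhook expects."""
    return json.dumps({"blocks": blocks}).encode("utf-8")


def post_to_slack(webhook_url: str, payload: bytes) -> int:
    """POST the payload to the webhook and return the HTTP status code."""
    request = urllib.request.Request(
        webhook_url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status
```

Returning an empty block list when nothing is flagged lets the caller skip the post entirely, so quiet runs do not produce empty Slack messages.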
Data Pruning Pipeline
EventBridge schedule
│
▼
lambda_prune
│
├── Scan ResourceTable for deleted_at < (now - 1 day)
│
├── Batch delete matching items
│
└── Publish CloudWatch metric (count of deleted items)
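The prune steps could be written along the following lines. The sketch takes a boto3 DynamoDB `Table` resource and a CloudWatch client as arguments instead of creating them. It rests on several assumptions: `deleted_at` is stored as an ISO-8601 UTC string (so string comparison matches time order), the table key is `PK` plus `SK`, and the metric namespace and name are placeholders.

```python
from datetime import datetime, timedelta
from typing import Any, List


def prune_deleted(table: Any, now: datetime, retention: timedelta = timedelta(days=1)) -> int:
    """Delete items whose deleted_at is older than the retention window."""
    cutoff = (now - retention).isoformat()
    scan_kwargs = {
        "FilterExpression": "deleted_at < :cutoff",
        "ExpressionAttributeValues": {":cutoff": cutoff},
    }
    expired: List[dict] = []
    while True:  # scan results are paginated; follow LastEvaluatedKey to the end
        page = table.scan(**scan_kwargs)
        expired.extend(page.get("Items", []))
        last_key = page.get("LastEvaluatedKey")
        if not last_key:
            break
        scan_kwargs["ExclusiveStartKey"] = last_key

    # batch_writer() groups the deletes into batch requests on our behalf.
    with table.batch_writer() as batch:
        for item in expired:
            batch.delete_item(Key={"PK": item["PK"], "SK": item["SK"]})  # assumed key schema
    return len(expired)


def publish_count(cloudwatch: Any, count: int) -> None:
    """Publish the number of deleted items as a CloudWatch metric."""
    cloudwatch.put_metric_data(
        Namespace="Example/Prune",  # placeholder namespace
        MetricData=[{"MetricName": "DeletedItems", "Value": count, "Unit": "Count"}],
    )
```

Items that never had a `deleted_at` attribute do not match the filter, so live resources are left alone. Note that a scan still reads the whole table, so this approach suits tables of modest size.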
Data Service Abstraction
Both local dev and production use the same DataService interface:
class DataService(ABC):
    write_sync(data)                          # persist resources
    get(type, path) -> Resource               # single resource
    get_everything() -> Everything            # all resources
    list_all(fields) -> List[Resource]        # all, optional projection
    list_by_type(type) -> List[Resource]      # by type code
    list_by_type_and_aws_id(type, aws_id)     # filtered
    list_by_column(column, value)             # custom filter
    list_tree(root_path) -> List[Resource]    # hierarchical subtree
| Implementation | Backing Store | Used When |
|---|---|---|
| DynamoDataService | DynamoDB tables | Production Lambda |
| LocalDataService | local_data.json file | USE_LOCAL_DATA=true |
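To show how one interface can sit in front of different stores, here is a cut-down sketch with an in-memory implementation. It covers only a subset of the methods listed above, represents resources as plain dicts with `type` and `path` keys, and treats `list_tree` as a path-prefix match. All of these are simplifying assumptions, and `InMemoryDataService` is a hypothetical class.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple


class DataService(ABC):
    @abstractmethod
    def write_sync(self, data: List[dict]) -> None: ...

    @abstractmethod
    def get(self, type: str, path: str) -> Optional[dict]: ...

    @abstractmethod
    def list_by_type(self, type: str) -> List[dict]: ...

    @abstractmethod
    def list_tree(self, root_path: str) -> List[dict]: ...


class InMemoryDataService(DataService):
    """Keeps resources in a dict keyed by (type, path)."""

    def __init__(self) -> None:
        self._items: Dict[Tuple[str, str], dict] = {}

    def write_sync(self, data: List[dict]) -> None:
        for res in data:
            self._items[(res["type"], res["path"])] = res

    def get(self, type: str, path: str) -> Optional[dict]:
        return self._items.get((type, path))

    def list_by_type(self, type: str) -> List[dict]:
        return [res for (t, _), res in self._items.items() if t == type]

    def list_tree(self, root_path: str) -> List[dict]:
        # Match the root itself plus anything below it; the trailing slash
        # stops "acct/c1" from also matching "acct/c10".
        prefix = root_path.rstrip("/") + "/"
        return [res for res in self._items.values()
                if res["path"] == root_path or res["path"].startswith(prefix)]
```

Code written against `DataService` does not need to know which backing store is in use, which is what allows local development and the production Lambdas to share the same reporters and cop checks.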