Data Pipeline
GreenKube’s data pipeline is the heart of the system — an asynchronous, multi-stage pipeline that collects raw metrics, transforms them into energy and carbon data, and stores combined results.
Pipeline Overview
Phase 1: Data Collection
All collectors run concurrently via asyncio.gather for maximum throughput.
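As a rough sketch of this fan-out pattern (the collector objects and their collect() method are illustrative stand-ins, not GreenKube's actual classes):

```python
import asyncio

async def run_collectors(collectors: list) -> list:
    # Fan out all collectors at once; return_exceptions=True keeps one
    # failing collector (e.g. OpenCost not installed) from aborting the run.
    results = await asyncio.gather(
        *(c.collect() for c in collectors), return_exceptions=True
    )
    successful = []
    for collector, result in zip(collectors, results):
        if isinstance(result, Exception):
            print(f"{type(collector).__name__} failed: {result}")
        else:
            successful.append(result)
    return successful
```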
Prometheus Collector
The most complex collector; it executes 8 concurrent PromQL queries:
| Query # | Metric | PromQL Pattern |
|---|---|---|
| 1 | CPU usage | rate(container_cpu_usage_seconds_total[5m]) |
| 2 | Memory usage | container_memory_working_set_bytes |
| 3 | Network Rx | rate(container_network_receive_bytes_total[5m]) |
| 4 | Network Tx | rate(container_network_transmit_bytes_total[5m]) |
| 5 | Disk Read | rate(container_fs_reads_bytes_total[5m]) |
| 6 | Disk Write | rate(container_fs_writes_bytes_total[5m]) |
| 7 | Restarts | kube_pod_container_status_restarts_total |
| 8 | Node Labels | kube_node_labels |
Auto-discovery: The collector probes multiple common Prometheus endpoints in the cluster before connecting.
Fallback queries: If container-level metrics are unavailable, the collector tries pod-level aggregations.
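A minimal sketch of how two of the eight queries could be issued concurrently against Prometheus's HTTP API, with a pod-level aggregation as the fallback; endpoint discovery, retries, and the remaining queries are omitted, and the helper names and fallback expression are illustrative:

```python
import asyncio
import aiohttp

CPU_QUERY = "rate(container_cpu_usage_seconds_total[5m])"
# Pod-level aggregation tried when container-level series are missing
# (illustrative; the actual fallback queries may differ).
CPU_FALLBACK = "sum by (namespace, pod) (rate(container_cpu_usage_seconds_total[5m]))"
MEM_QUERY = "container_memory_working_set_bytes"

async def instant_query(session: aiohttp.ClientSession, base_url: str, promql: str) -> list:
    """Run one instant PromQL query and return the result vector."""
    async with session.get(f"{base_url}/api/v1/query", params={"query": promql}) as resp:
        resp.raise_for_status()
        body = await resp.json()
        return body["data"]["result"]

async def collect_cpu(session: aiohttp.ClientSession, base_url: str) -> list:
    results = await instant_query(session, base_url, CPU_QUERY)
    if not results:  # container-level metrics unavailable -> try pod-level aggregation
        results = await instant_query(session, base_url, CPU_FALLBACK)
    return results

async def collect(base_url: str) -> dict:
    async with aiohttp.ClientSession() as session:
        cpu, memory = await asyncio.gather(
            collect_cpu(session, base_url),
            instant_query(session, base_url, MEM_QUERY),
        )
    return {"cpu": cpu, "memory": memory}
```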
Node Collector
Uses the kubernetes_asyncio library to query the Kubernetes API:
Node metadata:
├── cpu_capacity_cores (cores)
├── memory_capacity_bytes (bytes)
├── instance_type (from labels)
├── zone (availability zone)
├── region (cloud region)
├── architecture (amd64/arm64)
├── node_pool (node pool/agent pool name)
├── cloud_provider (from labels)
└── embodied_emissions_kg (from Boavizta, if available)
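A simplified sketch of gathering this metadata with kubernetes_asyncio, reading the standard well-known node labels; the actual collector extracts more fields (node pool, cloud provider, embodied emissions) and converts memory to bytes:

```python
from kubernetes_asyncio import client, config

async def collect_nodes() -> list[dict]:
    await config.load_kube_config()  # inside the cluster: config.load_incluster_config()
    async with client.ApiClient() as api:
        nodes = await client.CoreV1Api(api).list_node()
    out = []
    for node in nodes.items:
        labels = node.metadata.labels or {}
        out.append({
            "name": node.metadata.name,
            "cpu_capacity_cores": int(node.status.capacity["cpu"]),
            "memory_capacity": node.status.capacity["memory"],   # e.g. "16374428Ki"
            "instance_type": labels.get("node.kubernetes.io/instance-type"),
            "zone": labels.get("topology.kubernetes.io/zone"),
            "region": labels.get("topology.kubernetes.io/region"),
            "architecture": labels.get("kubernetes.io/arch"),
        })
    return out
```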
Pod Collector
Collects resource requests from pod specifications:
Per pod:
├── cpu_request (millicores)
├── memory_request (bytes)
├── ephemeral_storage_request (bytes)
├── owner_kind (Deployment, StatefulSet, etc.)
└── owner_name
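Resource requests arrive as Kubernetes quantity strings, so the collector has to normalize them into the units above. A small sketch of that conversion (simplified; decimal suffixes and exponent forms are omitted):

```python
_MEM_UNITS = {"Ki": 1024, "Mi": 1024**2, "Gi": 1024**3, "Ti": 1024**4}

def cpu_to_millicores(quantity: str) -> int:
    """"250m" -> 250, "2" -> 2000, "0.5" -> 500."""
    if quantity.endswith("m"):
        return int(quantity[:-1])
    return round(float(quantity) * 1000)

def memory_to_bytes(quantity: str) -> int:
    """"512Mi" -> 536870912."""
    for suffix, factor in _MEM_UNITS.items():
        if quantity.endswith(suffix):
            return round(float(quantity[: -len(suffix)]) * factor)
    return round(float(quantity))

# Example: a container requesting cpu="250m", memory="512Mi" is stored as
# cpu_request=250 millicores and memory_request=536870912 bytes.
```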
OpenCost Collector
Fetches cost allocation data via the OpenCost API:
Per pod:
├── cpu_cost ($)
├── ram_cost ($)
└── total_cost ($)
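A sketch of fetching this data, assuming the standard OpenCost /allocation endpoint (typically served on port 9003) and its cpuCost/ramCost/totalCost fields; GreenKube's actual client and query parameters may differ:

```python
import aiohttp

async def fetch_opencost_allocation(base_url: str, window: str = "1h") -> dict:
    """Query the OpenCost allocation API and return {pod_name: cost fields}."""
    params = {"window": window, "aggregate": "pod"}
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{base_url}/allocation", params=params) as resp:
            resp.raise_for_status()
            payload = await resp.json()
    costs = {}
    # One allocation map per window; field names follow OpenCost's allocation schema.
    for allocation_set in payload.get("data", []):
        for name, alloc in (allocation_set or {}).items():
            costs[name] = {
                "cpu_cost": alloc.get("cpuCost", 0.0),
                "ram_cost": alloc.get("ramCost", 0.0),
                "total_cost": alloc.get("totalCost", 0.0),
            }
    return costs
```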
HPA Collector
Detects existing Horizontal Pod Autoscalers in the cluster. This data is used by the recommendation engine to avoid suggesting autoscaling for workloads that already have an HPA configured.
Per HPA:
├── namespace
├── target_kind (Deployment, StatefulSet, etc.)
└── target_name
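A minimal sketch of this detection with kubernetes_asyncio, assuming the autoscaling/v2 API is available; the result is the exclusion set the recommendation engine can consult:

```python
from kubernetes_asyncio import client, config

async def collect_hpa_targets() -> set[tuple[str, str, str]]:
    """Return (namespace, target_kind, target_name) for every HPA in the cluster."""
    await config.load_kube_config()
    async with client.ApiClient() as api:
        hpas = await client.AutoscalingV2Api(api).list_horizontal_pod_autoscaler_for_all_namespaces()
    return {
        (h.metadata.namespace, h.spec.scale_target_ref.kind, h.spec.scale_target_ref.name)
        for h in hpas.items
    }
```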
Phase 2: Processing
The DataProcessor orchestrates the transformation pipeline.
Step 1: Resource Aggregation
Builds per-pod maps from raw collector outputs:
pod_cpu_usage_map     → {pod_key: millicores}
pod_memory_usage_map  → {pod_key: bytes}
pod_network_rx_map    → {pod_key: bytes}
pod_network_tx_map    → {pod_key: bytes}
pod_disk_read_map     → {pod_key: bytes}
pod_disk_write_map    → {pod_key: bytes}
pod_restart_map       → {pod_key: count}
pod_request_map       → {pod_key: cpu_request}
pod_mem_map           → {pod_key: memory_request}
pod_ephemeral_map     → {pod_key: storage_request}
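The exact pod_key shape is an implementation detail; the sketch below assumes a "namespace/pod" string and shows how one of these maps could be built from a Prometheus result vector:

```python
def pod_key(namespace: str, pod_name: str) -> str:
    # Assumed key shape; what matters is that every map uses the same key
    # so later steps can join the data per pod.
    return f"{namespace}/{pod_name}"

def build_cpu_usage_map(result_vector: list[dict]) -> dict[str, float]:
    """Turn a Prometheus instant-query result into {pod_key: millicores}."""
    usage: dict[str, float] = {}
    for sample in result_vector:
        labels = sample["metric"]
        cores = float(sample["value"][1])       # rate() yields cores per second
        key = pod_key(labels.get("namespace", ""), labels.get("pod", ""))
        usage[key] = usage.get(key, 0.0) + cores * 1000.0   # convert to millicores
    return usage
```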
Step 2: Energy Estimation
The BasicEstimator converts CPU metrics into energy (Joules):
For each node:
1. Look up instance power profile → (min_watts, max_watts) per vCPU
2. Calculate total node power:
   P_node = P_idle + (P_max - P_idle) × CPU_utilization
   Where:
     P_idle = min_watts × vCPUs
     P_max  = max_watts × vCPUs
3. Distribute to pods proportionally:
   E_pod = (cpu_pod / cpu_total_node) × P_node × duration

If the instance type is unknown, GreenKube falls back to a configurable default profile.
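The same formula as a small sketch in code, with a worked example using an assumed power profile (the 0.74–3.5 W per-vCPU figures are illustrative, not GreenKube's built-in values):

```python
def node_power_watts(min_watts: float, max_watts: float,
                     vcpus: int, cpu_utilization: float) -> float:
    """P_node = P_idle + (P_max - P_idle) * utilization."""
    p_idle = min_watts * vcpus
    p_max = max_watts * vcpus
    return p_idle + (p_max - p_idle) * cpu_utilization

def pod_energy_joules(pod_millicores: float, node_millicores_used: float,
                      node_power: float, duration_seconds: float) -> float:
    """Share of node energy attributed to one pod over the interval."""
    if node_millicores_used <= 0:
        return 0.0
    return (pod_millicores / node_millicores_used) * node_power * duration_seconds

# Example: assumed profile of 0.74-3.5 W per vCPU on a 4-vCPU node at 40% CPU:
# P_node = 2.96 + (14.0 - 2.96) * 0.4 ≈ 7.38 W. A pod using half of the node's
# CPU over a 300 s interval is attributed ≈ 0.5 * 7.38 * 300 ≈ 1106 J.
```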
Step 3: Zone Mapping
Maps cloud regions to Electricity Maps carbon zones:
AWS eu-west-1      → IE (Ireland)
GCP europe-west1   → BE (Belgium)
Azure westeurope   → NL (Netherlands)
OVH GRA            → FR (France)
Scaleway fr-par-1  → FR (France)
Step 4: Carbon Intensity Prefetching
Pre-fetches intensity data for all (zone, timestamp) pairs to minimize API calls:
For each unique (zone, normalized_timestamp):
1. Check in-memory cache
2. Check database repository
3. Call Electricity Maps API
4. Fallback to per-zone default intensity (built-in table)
5. Fallback to global default intensity (500 gCO₂e/kWh)
6. Cache result for rest of run
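A sketch of that fallback chain; the repository and API client interfaces (repo.lookup, api.fetch) and the per-zone values are hypothetical stand-ins for whatever GreenKube actually uses:

```python
GLOBAL_DEFAULT = 500.0                         # gCO2e/kWh, final fallback
ZONE_DEFAULTS = {"FR": 56.0, "IE": 346.0}      # illustrative per-zone values

async def get_intensity(zone: str, ts, cache: dict, repo, api) -> float:
    key = (zone, ts)
    if key in cache:                           # 1. in-memory cache
        return cache[key]
    value = await repo.lookup(zone, ts)        # 2. database repository (hypothetical API)
    if value is None:
        try:
            value = await api.fetch(zone, ts)  # 3. Electricity Maps API (hypothetical API)
        except Exception:
            value = None
    if value is None:                          # 4./5. built-in defaults
        value = ZONE_DEFAULTS.get(zone, GLOBAL_DEFAULT)
    cache[key] = value                         # 6. cache for the rest of the run
    return value
```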
Step 5: Carbon Calculation
The CarbonCalculator converts energy to emissions:
CO₂e (grams) = Energy (kWh) × Grid Intensity (gCO₂e/kWh) × PUE
Where:
  Energy (kWh)   = Joules / 3,600,000
  Grid Intensity = from Electricity Maps API
                   → fallback: per-zone default (built-in table)
                   → fallback: global default (500 gCO₂e/kWh)
  PUE            = Power Usage Effectiveness (provider-specific)
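The same calculation as a small helper, with a worked example:

```python
def co2e_grams(joules: float, grid_intensity_g_per_kwh: float, pue: float) -> float:
    kwh = joules / 3_600_000          # 1 kWh = 3.6 MJ
    return kwh * grid_intensity_g_per_kwh * pue

# Example: 36,000 J (0.01 kWh) on a grid at 300 gCO2e/kWh with PUE 1.2
# -> 0.01 * 300 * 1.2 = 3.6 gCO2e
```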
Step 6: CombinedMetric Assembly
Merges all data sources into a single CombinedMetric object per pod:
CombinedMetric:
├── Identity: pod_name, namespace, timestamp
├── Energy: joules, co2e_grams, grid_intensity, pue
├── Cost: total_cost
├── Resources: cpu, memory, network, disk, storage, restarts, gpu
├── Metadata: node, instance_type, zone, owner
└── Quality: is_estimated, estimation_reasons, embodied_co2e
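A sketch of what such an object might look like as a dataclass; the field names follow the groups above, but the real model likely differs in detail:

```python
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class CombinedMetric:
    # Identity
    pod_name: str
    namespace: str
    timestamp: datetime
    # Energy
    joules: float = 0.0
    co2e_grams: float = 0.0
    grid_intensity: float = 0.0
    pue: float = 1.0
    # Cost
    total_cost: float = 0.0
    # Resources (subset shown)
    cpu_millicores: float = 0.0
    memory_bytes: int = 0
    restarts: int = 0
    # Metadata
    node: str = ""
    instance_type: str = ""
    zone: str = ""
    owner_kind: str = ""
    owner_name: str = ""
    # Quality
    is_estimated: bool = False
    estimation_reasons: list[str] = field(default_factory=list)
    embodied_co2e: float = 0.0
```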
Phase 3: Storage
Combined metrics are batch-inserted into the configured database:
PostgreSQL: COPY or INSERT ... ON CONFLICT
SQLite:     INSERT OR REPLACE
Elastic:    Bulk index operation
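A minimal sketch of the SQLite path with hypothetical table and column names; the PostgreSQL path would use COPY or INSERT ... ON CONFLICT instead:

```python
import sqlite3

UPSERT_SQL = """
INSERT OR REPLACE INTO combined_metrics
    (pod_name, namespace, timestamp, joules, co2e_grams, total_cost)
VALUES (?, ?, ?, ?, ?, ?)
"""

def store_batch(conn: sqlite3.Connection, rows: list[tuple]) -> None:
    # executemany gives batch semantics; `with conn` wraps it in one transaction.
    with conn:
        conn.executemany(UPSERT_SQL, rows)
```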
Historical Analysis (Range Queries)
For reporting on past time periods, the pipeline uses a chunked processing approach:
run_range(start, end):
1. Load historical node snapshots
   └── Reconstruct node timeline
2. For each 1-day chunk:
   ├── Prometheus range queries (5 concurrent)
   ├── Parse time-series data
   ├── Energy estimation per timestamp
   ├── Carbon calculation with historical intensity
   └── Generate CombinedMetrics
3. Collect all chunks → Filter → Return

Why 1-day chunks? To prevent out-of-memory errors on large clusters. Each chunk processes data for a single day, with explicit memory cleanup (del statements) between chunks.
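A sketch of the chunking loop, where process_chunk stands in for the per-chunk collection, estimation, and carbon steps described above:

```python
from datetime import datetime, timedelta

async def run_range(start: datetime, end: datetime, process_chunk) -> list:
    """Process a historical window in 1-day chunks to bound memory use."""
    results = []
    chunk_start = start
    while chunk_start < end:
        chunk_end = min(chunk_start + timedelta(days=1), end)
        chunk_metrics = await process_chunk(chunk_start, chunk_end)
        results.extend(chunk_metrics)
        del chunk_metrics            # explicit cleanup between chunks, as noted above
        chunk_start = chunk_end
    return results
```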
Performance Characteristics
| Operation | Typical Duration | Notes |
|---|---|---|
| Instant collection | 2-5 seconds | Parallel collection + processing |
| 1-day range report | 5-10 seconds | Single chunk processing |
| 7-day range report | 30-60 seconds | 7 sequential chunks |
| 30-day range report | 2-4 minutes | 30 sequential chunks |
Performance scales with cluster size and Prometheus response times.