GreenKube

Data Pipeline

GreenKube’s data pipeline is the heart of the system: an asynchronous, multi-stage process that collects raw metrics, transforms them into energy and carbon data, and stores the combined results.

1. Collection (parallel, async)
   ├── 🔥 Prometheus: 8 concurrent PromQL queries
   ├── ☸️ Kubernetes API: Nodes, Pods, HPAs
   ├── 💲 OpenCost: cost allocation
   └── Electricity Maps: gCO₂/kWh per zone
2. Processing (sequential)
   ├── 📋 Aggregate: build per-pod resource maps
   ├── 🔋 Estimate Energy: CPU × TDP → Joules
   ├── 🗺️ Map Zones: region → carbon zone
   ├── 📡 Fetch Intensity: cache + API + fallback
   ├── 🌿 Calculate CO₂: kWh × gCO₂/kWh × PUE
   └── 📦 Assemble: CombinedMetric per pod
3. Output (async batch)
   ├── 🗄️ Database: batch INSERT / COPY
   ├── 🚀 REST API: real-time endpoints
   └── 🧠 Recommender: optimization analysis

All collectors run concurrently via asyncio.gather for maximum throughput.
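A minimal sketch of this fan-out (collector names and payloads are illustrative, not GreenKube's actual API):

```python
import asyncio

# Hypothetical collector coroutines -- names are illustrative,
# not GreenKube's real collector classes.
async def collect_prometheus():
    return {"source": "prometheus"}

async def collect_kubernetes():
    return {"source": "kubernetes"}

async def collect_opencost():
    return {"source": "opencost"}

async def collect_intensity():
    return {"source": "electricity_maps"}

async def collect_all():
    # asyncio.gather runs all collectors concurrently; total latency is
    # bounded by the slowest collector, not the sum of all four.
    return await asyncio.gather(
        collect_prometheus(),
        collect_kubernetes(),
        collect_opencost(),
        collect_intensity(),
        return_exceptions=True,  # one failing collector does not abort the run
    )

results = asyncio.run(collect_all())
```

With `return_exceptions=True`, a failed collector surfaces as an exception object in the results list instead of cancelling its siblings.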

The Prometheus collector is the most complex of the four; it executes 8 concurrent PromQL queries:

Query #  Metric        PromQL Pattern
1        CPU usage     rate(container_cpu_usage_seconds_total[5m])
2        Memory usage  container_memory_working_set_bytes
3        Network Rx    rate(container_network_receive_bytes_total[5m])
4        Network Tx    rate(container_network_transmit_bytes_total[5m])
5        Disk Read     rate(container_fs_reads_bytes_total[5m])
6        Disk Write    rate(container_fs_writes_bytes_total[5m])
7        Restarts      kube_pod_container_status_restarts_total
8        Node Labels   kube_node_labels

Auto-discovery: The collector probes multiple common Prometheus endpoints in the cluster before connecting.

Fallback queries: If container-level metrics are unavailable, the collector tries pod-level aggregations.
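The fallback behaviour might be sketched as follows; the executor is a stub standing in for an HTTP call to Prometheus's /api/v1/query endpoint, and the pod-level query shown is an illustrative aggregation, not necessarily GreenKube's exact fallback:

```python
CONTAINER_QUERY = 'rate(container_cpu_usage_seconds_total[5m])'
# Hypothetical pod-level aggregation used when container metrics are absent.
POD_QUERY = 'sum by (pod, namespace) (rate(container_cpu_usage_seconds_total[5m]))'

def query_with_fallback(execute, primary, fallback):
    """Run the primary PromQL query; if it returns no series, try the fallback."""
    result = execute(primary)
    if result:
        return result
    return execute(fallback)

# Stub executor: pretend container-level metrics are unavailable.
fake_results = {POD_QUERY: [{"metric": {"pod": "web-1"}, "value": [0, "0.25"]}]}
series = query_with_fallback(
    lambda q: fake_results.get(q, []), CONTAINER_QUERY, POD_QUERY
)
```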

The Kubernetes collector uses the kubernetes_asyncio library to query the Kubernetes API:

Node metadata:
├── cpu_capacity_cores (cores)
├── memory_capacity_bytes (bytes)
├── instance_type (from labels)
├── zone (availability zone)
├── region (cloud region)
├── architecture (amd64/arm64)
├── node_pool (node pool/agent pool name)
├── cloud_provider (from labels)
└── embodied_emissions_kg (from Boavizta, if available)
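Flattening a node's labels and capacity into these fields might look like the sketch below. The label keys are the standard well-known Kubernetes labels; the exact field handling is an assumption:

```python
# Well-known node labels (standard Kubernetes labels; GreenKube's exact
# label handling may differ).
INSTANCE_TYPE = "node.kubernetes.io/instance-type"
ZONE = "topology.kubernetes.io/zone"
REGION = "topology.kubernetes.io/region"
ARCH = "kubernetes.io/arch"

def node_metadata(name, labels, capacity):
    """Flatten a V1Node's labels and capacity into the fields listed above."""
    return {
        "name": name,
        "cpu_capacity_cores": int(capacity["cpu"]),
        # Raw Kubernetes quantity string (e.g. "8Gi"); unit conversion omitted.
        "memory_capacity_raw": capacity["memory"],
        "instance_type": labels.get(INSTANCE_TYPE),
        "zone": labels.get(ZONE),
        "region": labels.get(REGION),
        "architecture": labels.get(ARCH, "amd64"),
    }

# With kubernetes_asyncio the nodes would come from:
#   await config.load_kube_config()
#   nodes = await client.CoreV1Api().list_node()
meta = node_metadata(
    "node-a",
    {INSTANCE_TYPE: "m5.large", ZONE: "eu-west-1a", REGION: "eu-west-1"},
    {"cpu": "2", "memory": "8Gi"},
)
```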

It also collects resource requests from pod specifications:

Per pod:
├── cpu_request (millicores)
├── memory_request (bytes)
├── ephemeral_storage_request (bytes)
├── owner_kind (Deployment, StatefulSet, etc.)
└── owner_name
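Kubernetes expresses these requests as quantity strings ("500m" CPU, "128Mi" memory). A simplified converter, assuming only binary memory suffixes (decimal SI suffixes like "M" and "G" are omitted here):

```python
def cpu_to_millicores(q):
    """Convert a Kubernetes CPU quantity ('500m' or '2') to millicores."""
    return int(q[:-1]) if q.endswith("m") else int(float(q) * 1000)

_BINARY = {"Ki": 1024, "Mi": 1024**2, "Gi": 1024**3, "Ti": 1024**4}

def mem_to_bytes(q):
    """Convert a memory quantity ('128Mi', '1Gi', '512') to bytes.

    Simplified: only binary suffixes are handled.
    """
    for suffix, factor in _BINARY.items():
        if q.endswith(suffix):
            return int(q[: -len(suffix)]) * factor
    return int(q)

req = {
    "cpu_request": cpu_to_millicores("500m"),
    "memory_request": mem_to_bytes("128Mi"),
}
```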

The OpenCost collector fetches cost allocation data via the OpenCost API:

Per pod:
├── cpu_cost ($)
├── ram_cost ($)
└── total_cost ($)
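Extracting these fields from an OpenCost allocation response might look like this. The response shape and field names (cpuCost, ramCost, totalCost) follow OpenCost's allocation API, but should be checked against the version in use:

```python
def pod_costs(allocation):
    """Extract per-pod costs from one OpenCost allocation set.

    `allocation` is assumed to be {"<namespace>/<pod>": {cpuCost, ramCost,
    totalCost, ...}}, i.e. one item of the /allocation response's data list.
    """
    return {
        name: {
            "cpu_cost": a.get("cpuCost", 0.0),
            "ram_cost": a.get("ramCost", 0.0),
            "total_cost": a.get("totalCost", 0.0),
        }
        for name, a in allocation.items()
    }

costs = pod_costs(
    {"default/web-1": {"cpuCost": 0.04, "ramCost": 0.01, "totalCost": 0.05}}
)
```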

The HPA collector detects existing Horizontal Pod Autoscalers in the cluster. The recommendation engine uses this data to avoid suggesting autoscaling for workloads that already have an HPA configured.

Per HPA:
├── namespace
├── target_kind (Deployment, StatefulSet, etc.)
└── target_name

The DataProcessor orchestrates the transformation pipeline.

It first builds per-pod maps from raw collector outputs:

pod_cpu_usage_map → {pod_key: millicores}
pod_memory_usage_map → {pod_key: bytes}
pod_network_rx_map → {pod_key: bytes}
pod_network_tx_map → {pod_key: bytes}
pod_disk_read_map → {pod_key: bytes}
pod_disk_write_map → {pod_key: bytes}
pod_restart_map → {pod_key: count}
pod_request_map → {pod_key: cpu_request}
pod_mem_map → {pod_key: memory_request}
pod_ephemeral_map → {pod_key: storage_request}
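Each map can be built by indexing a Prometheus instant-query result by pod key; the helper below is a hypothetical sketch using (namespace, pod) as the key:

```python
def build_pod_map(series, transform=float):
    """Index a Prometheus instant-query result by (namespace, pod).

    `series` is the `result` list of a query response:
    [{"metric": {"namespace": ..., "pod": ...}, "value": [ts, "<str>"]}, ...]
    """
    out = {}
    for s in series:
        key = (s["metric"]["namespace"], s["metric"]["pod"])
        out[key] = transform(s["value"][1])  # Prometheus returns values as strings
    return out

cpu_map = build_pod_map(
    [{"metric": {"namespace": "default", "pod": "web-1"}, "value": [0, "0.25"]}],
    transform=lambda v: float(v) * 1000,  # cores -> millicores
)
```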

The BasicEstimator converts CPU metrics into energy (Joules):

For each node:
  1. Look up the instance power profile
     → (min_watts, max_watts) per vCPU
  2. Calculate total node power:
     P_node = P_idle + (P_max - P_idle) × CPU_utilization
     where:
       P_idle = min_watts × vCPUs
       P_max  = max_watts × vCPUs
  3. Distribute to pods proportionally:
     E_pod = (cpu_pod / cpu_total_node) × P_node × duration

If the instance type is unknown, GreenKube falls back to a configurable default profile.
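The estimation steps above reduce to two small functions; the power profile and workload numbers in the example are made up for illustration:

```python
def node_power_watts(min_watts, max_watts, vcpus, cpu_utilization):
    """P_node = P_idle + (P_max - P_idle) x CPU_utilization."""
    p_idle = min_watts * vcpus
    p_max = max_watts * vcpus
    return p_idle + (p_max - p_idle) * cpu_utilization

def pod_energy_joules(cpu_pod_millicores, cpu_total_millicores, p_node_watts, duration_s):
    """Distribute node power to a pod in proportion to its CPU usage."""
    share = cpu_pod_millicores / cpu_total_millicores
    return share * p_node_watts * duration_s

# A 4-vCPU node at 50% utilization with a 1.0/4.0 W-per-vCPU profile:
p = node_power_watts(1.0, 4.0, 4, 0.5)     # 4 + (16 - 4) * 0.5 = 10 W
e = pod_energy_joules(500, 2000, p, 3600)  # quarter share for one hour
```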

Next, cloud regions are mapped to Electricity Maps carbon zones:

AWS eu-west-1 → IE (Ireland)
GCP europe-west1 → BE (Belgium)
Azure westeurope → NL (Netherlands)
OVH GRA → FR (France)
Scaleway fr-par-1 → FR (France)
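In code this is a simple lookup table; only the mappings listed above are included in this sketch, and the full built-in table covers many more regions:

```python
# Subset of the region -> zone table shown above.
REGION_TO_ZONE = {
    ("aws", "eu-west-1"): "IE",
    ("gcp", "europe-west1"): "BE",
    ("azure", "westeurope"): "NL",
    ("ovh", "GRA"): "FR",
    ("scaleway", "fr-par-1"): "FR",
}

def carbon_zone(provider, region):
    """Map a (provider, region) pair to an Electricity Maps zone code.

    Unknown regions return None and fall through to the intensity defaults.
    """
    return REGION_TO_ZONE.get((provider, region))

zone = carbon_zone("aws", "eu-west-1")
```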

Intensity data is pre-fetched for all (zone, timestamp) pairs to minimize API calls:

For each unique (zone, normalized_timestamp):
1. Check in-memory cache
2. Check database repository
3. Call Electricity Maps API
4. Fallback to per-zone default intensity (built-in table)
5. Fallback to global default intensity (500 gCO₂e/kWh)
6. Cache result for rest of run
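The cascade can be sketched as a single resolver; the repository and API lookups are stubbed here, and the per-zone default values shown are illustrative, not GreenKube's built-in table:

```python
GLOBAL_DEFAULT = 500.0  # gCO2e/kWh, the documented last-resort fallback
ZONE_DEFAULTS = {"FR": 56.0, "IE": 296.0}  # illustrative, not the real table

def fetch_intensity(zone, ts, cache, repo_lookup, api_lookup):
    """Resolve grid intensity through the documented fallback chain."""
    key = (zone, ts)
    if key in cache:                       # 1. in-memory cache
        return cache[key]
    value = repo_lookup(zone, ts)          # 2. database repository
    if value is None:
        value = api_lookup(zone, ts)       # 3. Electricity Maps API
    if value is None:
        value = ZONE_DEFAULTS.get(zone)    # 4. per-zone default
    if value is None:
        value = GLOBAL_DEFAULT             # 5. global default
    cache[key] = value                     # 6. cache for the rest of the run
    return value

cache = {}
v = fetch_intensity("XX", "2024-01-01T00:00", cache,
                    lambda z, t: None, lambda z, t: None)
```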

The CarbonCalculator converts energy to emissions:

CO₂e (grams) = Energy (kWh) × Grid Intensity (gCO₂e/kWh) × PUE
Where:
Energy (kWh) = Joules / 3,600,000
Grid Intensity = from Electricity Maps API
→ fallback: per-zone default (built-in table)
→ fallback: global default (500 gCO₂e/kWh)
PUE = Power Usage Effectiveness (provider-specific)
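As a worked example of this formula (the grid intensity and PUE values are illustrative):

```python
def co2e_grams(joules, grid_intensity_g_per_kwh, pue):
    """CO2e (g) = Energy (kWh) x Grid Intensity (gCO2e/kWh) x PUE."""
    kwh = joules / 3_600_000  # 1 kWh = 3.6 MJ
    return kwh * grid_intensity_g_per_kwh * pue

# 9000 J on a ~56 gCO2e/kWh grid with a PUE of 1.2:
g = co2e_grams(9000, 56.0, 1.2)  # 0.0025 kWh x 56 x 1.2 = 0.168 g
```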

Finally, the processor merges all data sources into a single CombinedMetric object per pod:

CombinedMetric:
├── Identity: pod_name, namespace, timestamp
├── Energy: joules, co2e_grams, grid_intensity, pue
├── Cost: total_cost
├── Resources: cpu, memory, network, disk, storage, restarts, gpu
├── Metadata: node, instance_type, zone, owner
└── Quality: is_estimated, estimation_reasons, embodied_co2e
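A rough sketch of the record as a Python dataclass; the defaults, and any detail beyond the fields documented above, are assumptions rather than GreenKube's actual model:

```python
from dataclasses import dataclass, field

@dataclass
class CombinedMetric:
    # Identity
    pod_name: str
    namespace: str
    timestamp: str
    # Energy
    joules: float = 0.0
    co2e_grams: float = 0.0
    grid_intensity: float = 0.0
    pue: float = 1.0
    # Cost
    total_cost: float = 0.0
    # Metadata
    node: str = ""
    instance_type: str = ""
    zone: str = ""
    # Quality
    is_estimated: bool = False
    estimation_reasons: list = field(default_factory=list)

m = CombinedMetric("web-1", "default", "2024-01-01T00:00:00Z", joules=9000.0)
```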

Combined metrics are batch-inserted into the configured database:

PostgreSQL: COPY or INSERT ... ON CONFLICT
SQLite: INSERT OR REPLACE
Elastic: Bulk index operation
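The SQLite path can be illustrated with an in-memory database (table schema is a simplified stand-in): `executemany` batches the writes, and `INSERT OR REPLACE` makes re-runs idempotent on the primary key.

```python
import sqlite3

# In-memory SQLite stand-in for the batch INSERT OR REPLACE path.
conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE metrics (
    pod_name TEXT, namespace TEXT, timestamp TEXT,
    joules REAL, co2e_grams REAL,
    PRIMARY KEY (pod_name, namespace, timestamp))""")

rows = [
    ("web-1", "default", "2024-01-01T00:00", 9000.0, 0.168),
    ("web-1", "default", "2024-01-01T00:00", 9100.0, 0.170),  # replaces the first
]
conn.executemany("INSERT OR REPLACE INTO metrics VALUES (?, ?, ?, ?, ?)", rows)
conn.commit()
count, joules = conn.execute("SELECT COUNT(*), MAX(joules) FROM metrics").fetchone()
```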

For reporting on past time periods, the pipeline uses a chunked processing approach:

run_range(start, end):
  1. Load historical node snapshots
     └── Reconstruct node timeline
  2. For each 1-day chunk:
     ├── Prometheus range queries (5 concurrent)
     ├── Parse time-series data
     ├── Energy estimation per timestamp
     ├── Carbon calculation with historical intensity
     └── Generate CombinedMetrics
  3. Collect all chunks → Filter → Return

Why 1-day chunks? To prevent out-of-memory errors on large clusters. Each chunk processes data for a single day, with explicit memory cleanup (del statements) between chunks.
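The chunking itself is straightforward date arithmetic; a sketch:

```python
from datetime import datetime, timedelta

def day_chunks(start, end):
    """Split [start, end) into at most 1-day chunks, as run_range does."""
    chunks = []
    cursor = start
    while cursor < end:
        nxt = min(cursor + timedelta(days=1), end)
        chunks.append((cursor, nxt))
        cursor = nxt
    return chunks

# A 7-day report becomes 7 sequential chunks; each chunk's intermediate
# data can be released before the next one starts.
chunks = day_chunks(datetime(2024, 1, 1), datetime(2024, 1, 8))
```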

Operation            Typical Duration  Notes
Instant collection   2-5 seconds       Parallel collection + processing
1-day range report   5-10 seconds      Single chunk processing
7-day range report   30-60 seconds     7 sequential chunks
30-day range report  2-4 minutes       30 sequential chunks

Performance scales with cluster size and Prometheus response times.