DataFlow Operator Metrics
DataFlow Operator exports Prometheus metrics for monitoring the operator and data processing.
How metrics collection works
Prometheus scrapes only the operator Service (port 9090). The operator aggregates metrics from all processor pods.
On each `/metrics` request, the operator:
- returns its own metrics (controller-runtime and `dataflow_status`);
- discovers DataFlows and their processor pods by the labels `app=dataflow-processor` and `dataflow.dataflow.io/name`;
- fetches `http://<podIP>:9090/metrics` from each processor pod;
- keeps only `dataflow_*` metrics and drops `go_*` and `process_*`;
- merges operator and processor metrics and returns the combined output.
```mermaid
flowchart TB
    subgraph PrometheusStack [Prometheus Stack]
        Prometheus[Prometheus]
    end
    subgraph DataFlowOperator [DataFlow Operator]
        Operator[Operator Pod]
        OperatorMetrics[Operator Metrics]
    end
    subgraph ProcessorPods [Processor Pods]
        Proc1[Processor Pod 1]
        Proc2[Processor Pod 2]
    end
    Prometheus -->|"GET /metrics :9090"| Operator
    Operator -->|"1. Serve operator metrics"| OperatorMetrics
    Operator -->|"2. Scrape podIP:9090/metrics"| Proc1
    Operator -->|"2. Scrape podIP:9090/metrics"| Proc2
    Operator -->|"3. Merge dataflow_* only"| Prometheus
```
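The "keep only `dataflow_*`" step can be sketched in Go. This is a minimal illustration, not the operator's code. `metricName` and `filterDataflowMetrics` are hypothetical helpers that work line by line on the Prometheus text format and keep the `# HELP`, `# TYPE` and sample lines of `dataflow_*` families.

A real merge of several pods' output must also group lines by metric family. The text format allows only one `TYPE` line per metric and expects each family's lines to be contiguous. Parsing with the `expfmt` package from `github.com/prometheus/common` is one way to handle that.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// metricName returns the metric family name referenced by one line of the
// Prometheus text format, or "" for blank lines and free-form comments.
func metricName(line string) string {
	if strings.HasPrefix(line, "# HELP ") || strings.HasPrefix(line, "# TYPE ") {
		if fields := strings.Fields(line); len(fields) >= 3 {
			return fields[2]
		}
		return ""
	}
	if line == "" || strings.HasPrefix(line, "#") {
		return ""
	}
	// Sample line: the name ends at the label set or at the first space.
	if i := strings.IndexAny(line, "{ "); i >= 0 {
		return line[:i]
	}
	return line
}

// filterDataflowMetrics keeps the HELP, TYPE and sample lines of dataflow_*
// families and drops everything else (go_*, process_*, ...).
// Hypothetical helper for illustration only.
func filterDataflowMetrics(body string) string {
	var out strings.Builder
	sc := bufio.NewScanner(strings.NewReader(body))
	for sc.Scan() {
		line := sc.Text()
		if strings.HasPrefix(metricName(line), "dataflow_") {
			out.WriteString(line)
			out.WriteByte('\n')
		}
	}
	return out.String()
}

func main() {
	// Hypothetical response body from one processor pod's /metrics endpoint.
	processorBody := "# HELP go_goroutines Number of goroutines.\n" +
		"# TYPE go_goroutines gauge\n" +
		"go_goroutines 42\n" +
		"# TYPE dataflow_messages_received_total counter\n" +
		`dataflow_messages_received_total{namespace="default",name="my-dataflow",source_type="kafka"} 17` + "\n" +
		"process_open_fds 12\n"
	fmt.Print(filterDataflowMetrics(processorBody))
}
```

Only the `TYPE` line and the sample of `dataflow_messages_received_total` survive the filter.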
Available metrics
DataFlow manifest metrics
- `dataflow_messages_received_total`: total number of messages received per manifest. Labels: `namespace`, `name`, `source_type`.
- `dataflow_messages_sent_total`: total number of messages sent per manifest. Labels: `namespace`, `name`, `sink_type`, `route`.
- `dataflow_processing_duration_seconds`: message processing time (histogram). Labels: `namespace`, `name`.
- `dataflow_status`: DataFlow manifest status (1 = Running, 0 = Stopped or Error). Labels: `namespace`, `name`, `phase`.
Connector metrics
- `dataflow_connector_messages_read_total`: number of messages read from the source connector. Labels: `namespace`, `name`, `connector_type`, `connector_name`.
- `dataflow_connector_messages_written_total`: number of messages written to the sink connector. Labels: `namespace`, `name`, `connector_type`, `connector_name`, `route`.
- `dataflow_connector_errors_total`: number of connector errors. Labels: `namespace`, `name`, `connector_type`, `connector_name`, `operation`, `error_type`. For possible `error_type` values, see Error types.
- `dataflow_connector_connection_status`: connector connection status (1 = connected, 0 = disconnected). Labels: `namespace`, `name`, `connector_type`, `connector_name`.
Transformer metrics
- `dataflow_transformer_executions_total`: number of transformer executions. Labels: `namespace`, `name`, `transformer_type`, `transformer_index`.
- `dataflow_transformer_errors_total`: number of transformer errors. Labels: `namespace`, `name`, `transformer_type`, `transformer_index`, `error_type`. For possible `error_type` values, see Error types.
- `dataflow_transformer_duration_seconds`: transformer execution time (histogram). Labels: `namespace`, `name`, `transformer_type`, `transformer_index`.
- `dataflow_transformer_messages_in_total`: number of messages passed into the transformer. Labels: `namespace`, `name`, `transformer_type`, `transformer_index`.
- `dataflow_transformer_messages_out_total`: number of messages emitted by the transformer. Labels: `namespace`, `name`, `transformer_type`, `transformer_index`.
Task execution metrics
- `dataflow_task_stage_duration_seconds`: execution time of an individual task stage (histogram). Labels: `namespace`, `name`, `stage`. Stages: `read`, `transformation`, `write`, `sink_write`, `error_sink_write`, `transformer_0`, `transformer_1`, and so on.
- `dataflow_task_message_size_bytes`: message size at different processing stages (histogram). Labels: `namespace`, `name`, `stage`. Stages: `input`, `output`, `transformer_0_input`, `transformer_0_output`, and so on.
- `dataflow_task_stage_latency_seconds`: latency between processing stages (histogram). Labels: `namespace`, `name`, `from_stage`, `to_stage`.
- `dataflow_task_throughput_messages_per_second`: current throughput in messages per second. Labels: `namespace`, `name`.
- `dataflow_task_success_rate`: task success rate (0.0 to 1.0). Labels: `namespace`, `name`.
- `dataflow_task_end_to_end_latency_seconds`: full message lifetime from receipt to delivery (histogram). Labels: `namespace`, `name`.
- `dataflow_task_active_messages`: number of messages currently being processed. Labels: `namespace`, `name`.
- `dataflow_task_queue_size`: current message queue size. Labels: `namespace`, `name`, `queue_type`. Queue types: `routing`, `output`, `default`, and router routes.
- `dataflow_task_queue_wait_time_seconds`: time messages spend waiting in a queue (histogram). Labels: `namespace`, `name`, `queue_type`.
- `dataflow_task_operations_total`: total operations by type. Labels: `namespace`, `name`, `operation`, `status`. Operations: `transform`, `write`, `sink_write`, `error_sink_write`. Statuses: `success`, `error`, `cancelled`.
- `dataflow_task_stage_errors_total`: number of errors per stage. Labels: `namespace`, `name`, `stage`, `error_type`. For possible `error_type` values, see Error types.
Labels
Metrics use the `namespace` and `name` labels to identify the DataFlow they belong to, which makes filtering and aggregation in Prometheus and Grafana straightforward. Every `dataflow_*` metric carries at least these two labels. Additional labels (`source_type`, `sink_type`, `connector_type`, `stage`, etc.) allow more detailed queries.
Examples:
- Filter by a specific DataFlow: `dataflow_messages_received_total{namespace="default", name="my-dataflow"}`
- Aggregate by namespace: `sum(rate(dataflow_messages_received_total[5m])) by (namespace)`
Histograms
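Histograms use exponential buckets suited to latency and message sizes. `ExponentialBuckets(start, factor, count)` creates `count` buckets whose upper bounds are start, start × factor, ..., start × factor^(count − 1). Observations above the largest bound are counted only in the implicit `+Inf` bucket.

| Metric | Buckets | Range (first to largest finite bound) |
|---|---|---|
| `dataflow_processing_duration_seconds` | `ExponentialBuckets(0.001, 2, 10)` | 1 ms to ~512 ms |
| `dataflow_transformer_duration_seconds` | `ExponentialBuckets(0.0001, 2, 12)` | 0.1 ms to ~205 ms |
| `dataflow_task_stage_duration_seconds` | `ExponentialBuckets(0.0001, 2, 14)` | 0.1 ms to ~819 ms |
| `dataflow_task_message_size_bytes` | `ExponentialBuckets(64, 2, 16)` | 64 bytes to 2 MiB |
| `dataflow_task_stage_latency_seconds` | `ExponentialBuckets(0.0001, 2, 12)` | 0.1 ms to ~205 ms |
| `dataflow_task_end_to_end_latency_seconds` | `ExponentialBuckets(0.001, 2, 12)` | 1 ms to ~2 s |
| `dataflow_task_queue_wait_time_seconds` | `ExponentialBuckets(0.0001, 2, 12)` | 0.1 ms to ~205 ms |

- Latency (seconds): buckets start at 0.1 ms or 1 ms, with a multiplier of 2.
- Message sizes (bytes): buckets start at 64 bytes, with a multiplier of 2.

`histogram_quantile` cannot return a value above the largest finite bucket bound. A quantile that falls into the `+Inf` bucket is reported as that bound, for example 0.512 s for `dataflow_processing_duration_seconds`.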
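The Range column can be checked with a few lines of Go. This sketch reimplements the bucket layout locally instead of importing `github.com/prometheus/client_golang`. It assumes the documented `ExponentialBuckets(start, factor, count)` semantics, where the largest finite bound is start * factor^(count-1).

```go
package main

import "fmt"

// exponentialBuckets follows the documented semantics of client_golang's
// prometheus.ExponentialBuckets: count upper bounds, the first equal to start
// and each subsequent one factor times the previous. The +Inf bucket is
// implicit and not part of the returned slice.
func exponentialBuckets(start, factor float64, count int) []float64 {
	bounds := make([]float64, count)
	for i := range bounds {
		bounds[i] = start
		start *= factor
	}
	return bounds
}

func main() {
	// Parameters copied from the histogram table.
	histograms := []struct {
		metric        string
		start, factor float64
		count         int
	}{
		{"dataflow_processing_duration_seconds", 0.001, 2, 10},
		{"dataflow_transformer_duration_seconds", 0.0001, 2, 12},
		{"dataflow_task_stage_duration_seconds", 0.0001, 2, 14},
		{"dataflow_task_message_size_bytes", 64, 2, 16},
		{"dataflow_task_stage_latency_seconds", 0.0001, 2, 12},
		{"dataflow_task_end_to_end_latency_seconds", 0.001, 2, 12},
		{"dataflow_task_queue_wait_time_seconds", 0.0001, 2, 12},
	}
	for _, h := range histograms {
		bounds := exponentialBuckets(h.start, h.factor, h.count)
		fmt.Printf("%-42s first=%g largest finite=%g\n", h.metric, bounds[0], bounds[len(bounds)-1])
	}
}
```

Running it prints the first and largest finite bound for each histogram in the table.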
Monitoring setup
Prometheus ServiceMonitor
For automatic Prometheus discovery, create a ServiceMonitor:
```yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: dataflow-operator
  labels:
    app: dataflow-operator
spec:
  selector:
    matchLabels:
      app: dataflow-operator
  endpoints:
    - port: metrics
      path: /metrics
      interval: 30s
```
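`port: metrics` refers to the named metrics port on the operator Service, and `selector.matchLabels` must match the Service's labels. Alternatively, enable the ServiceMonitor in the Helm chart: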
```yaml
serviceMonitor:
  enabled: true
  interval: 30s
  scrapeTimeout: 10s
```
Grafana Dashboard
Import the dashboard from grafana-dashboard.json into Grafana to visualize metrics.
Via Helm chart (ConfigMap): set `monitoring.dashboard.enabled` when installing. The dashboard is bundled in the chart (`dashboards/grafana-dashboard.json`), and the Grafana sidecar from kube-prometheus-stack picks up ConfigMaps labeled `grafana_dashboard: "1"`.
The dashboard includes:
- Message received/sent rate charts
- Connector and transformer error charts
- Message processing time
- Connector connection status
- DataFlow manifest status
- Transformer statistics
- SLO panels aligned with the PrometheusRule alerts (error rate, p95 processing, p99 E2E, queue, Kafka timeouts)
- Pipeline health (source poll health, stage error rates, active messages)
- Stage breakdown and operator reconcile metrics
Processor health probes (/livez)
Each processor pod exposes HTTP probes on port 9090:
| Path | Probe | Behavior |
|---|---|---|
| `/readyz` | Startup | Returns 200 once the processor has connected to the source and started reading |
| `/livez` | Liveness | Returns 200 when the processor is ready and pipeline progress was recorded within `PROCESSOR_PROGRESS_TIMEOUT_SECONDS` |
| `/healthz` | None | Always returns 200 while the process is alive |
Progress is updated after successful sink batch commits (all sink types). It is also updated periodically while a Kafka consumer group session is active, so idle topics are not treated as stalled. If the liveness probe fails with 503 and the pod Events show `stale: no pipeline progress`, the pod was restarted because no progress was recorded within the timeout. This is common for low-traffic pipelines when the timeout is too short.

Configure the timeout with the operator Helm value `processorProgressTimeoutSeconds` (default 3600, i.e. one hour). Set it to 0 to disable stale-progress checks on `/livez`. This is not recommended unless you have other deadlock monitoring in place.
Sentry (Error Monitoring and Tracing)
DataFlow Operator supports Sentry for error monitoring and distributed tracing. When enabled, the operator and processor pods report errors and performance traces to Sentry.
Enable via Helm chart:
```yaml
sentry:
  enabled: true
  dsn: "https://xxx@o0.ingest.sentry.io/123"
  environment: production   # or staging, development
  tracesSampleRate: 0.1     # 0.0–1.0, fraction of traces to sample (default 10%)
  debug: false              # enable Sentry SDK debug output
  release: ""               # optional: git commit or version tag
```
Environment variables (passed to operator and processor pods when sentry.enabled and sentry.dsn are set):
| Variable | Description |
|---|---|
| `SENTRY_DSN` | Sentry project DSN (required) |
| `SENTRY_ENVIRONMENT` | Environment name (production, staging, etc.) |
| `SENTRY_TRACES_SAMPLE_RATE` | Sampling rate for traces (0.0 to 1.0) |
| `SENTRY_DEBUG` | Enable debug mode (`true`/`false`) |
| `SENTRY_RELEASE` | Release version for grouping |
The operator forwards SENTRY_* variables to processor pods, so they report to the same Sentry project. If SENTRY_DSN is empty, Sentry is not initialized.
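A hypothetical, dependency-free helper below sketches how the `SENTRY_*` variables could be turned into client settings. The field names mirror sentry-go's `ClientOptions` (`Dsn`, `Environment`, `Release`, `Debug`, `TracesSampleRate`), but the call to `sentry.Init` is omitted. The 0.1 fallback for an unset sample rate is an assumption borrowed from the Helm chart default. As described above, an empty `SENTRY_DSN` leaves Sentry disabled.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// sentryConfig holds the SENTRY_* settings; fields mirror sentry-go's
// ClientOptions so they could be passed to sentry.Init (not done here).
type sentryConfig struct {
	Dsn              string
	Environment      string
	Release          string
	Debug            bool
	TracesSampleRate float64
}

// loadSentryConfig (hypothetical helper) reads settings through getenv so it
// can be tested. It reports enabled=false when SENTRY_DSN is empty.
func loadSentryConfig(getenv func(string) string) (sentryConfig, bool, error) {
	cfg := sentryConfig{Dsn: getenv("SENTRY_DSN")}
	if cfg.Dsn == "" {
		return sentryConfig{}, false, nil // no DSN: Sentry is not initialized
	}
	cfg.Environment = getenv("SENTRY_ENVIRONMENT")
	cfg.Release = getenv("SENTRY_RELEASE")
	cfg.TracesSampleRate = 0.1 // assumption: fallback mirrors the Helm default of 10%
	if v := getenv("SENTRY_TRACES_SAMPLE_RATE"); v != "" {
		rate, err := strconv.ParseFloat(v, 64)
		// The negated range check also rejects NaN.
		if err != nil || !(rate >= 0 && rate <= 1) {
			return sentryConfig{}, false, fmt.Errorf("SENTRY_TRACES_SAMPLE_RATE must be in [0, 1], got %q", v)
		}
		cfg.TracesSampleRate = rate
	}
	if v := getenv("SENTRY_DEBUG"); v != "" {
		debug, err := strconv.ParseBool(v)
		if err != nil {
			return sentryConfig{}, false, fmt.Errorf("SENTRY_DEBUG must be true or false, got %q", v)
		}
		cfg.Debug = debug
	}
	return cfg, true, nil
}

func main() {
	cfg, enabled, err := loadSentryConfig(os.Getenv)
	switch {
	case err != nil:
		fmt.Println("invalid Sentry settings:", err)
	case !enabled:
		fmt.Println("SENTRY_DSN is empty: Sentry is not initialized")
	default:
		fmt.Printf("Sentry enabled: environment=%q release=%q tracesSampleRate=%.2f debug=%t\n",
			cfg.Environment, cfg.Release, cfg.TracesSampleRate, cfg.Debug)
	}
}
```

Passing `getenv` as a function instead of calling `os.Getenv` directly keeps the parsing testable without touching the process environment.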
Prometheus query examples
Messages per second per manifest
```promql
sum(rate(dataflow_messages_received_total[5m])) by (namespace, name)
```
Transformer error rate
```promql
sum(rate(dataflow_transformer_errors_total[5m])) by (namespace, name, transformer_type)
/
sum(rate(dataflow_transformer_executions_total[5m])) by (namespace, name, transformer_type)
* 100
```
p95 message processing time
```promql
histogram_quantile(0.95, sum(rate(dataflow_processing_duration_seconds_bucket[5m])) by (namespace, name, le))
```
Running DataFlow manifests per namespace (`dataflow_status` is 1 only for Running)
```promql
sum(dataflow_status) by (namespace)
```
Task throughput
```promql
dataflow_task_throughput_messages_per_second
```
Task success rate
```promql
dataflow_task_success_rate * 100
```
p95 read stage duration
```promql
histogram_quantile(0.95, sum(rate(dataflow_task_stage_duration_seconds_bucket{stage="read"}[5m])) by (namespace, name, le))
```
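Average input message size (a histogram exposes `_sum` and `_count` series, not the bare metric name)
```promql
sum(rate(dataflow_task_message_size_bytes_sum{stage="input"}[5m])) by (namespace, name)
/
sum(rate(dataflow_task_message_size_bytes_count{stage="input"}[5m])) by (namespace, name)
```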
p99 end-to-end latency
```promql
histogram_quantile(0.99, sum(rate(dataflow_task_end_to_end_latency_seconds_bucket[5m])) by (namespace, name, le))
```
Active messages in processing
```promql
dataflow_task_active_messages
```
Message queue size
```promql
dataflow_task_queue_size
```
Average queue wait time
```promql
sum(rate(dataflow_task_queue_wait_time_seconds_sum[5m])) by (namespace, name, queue_type)
/
sum(rate(dataflow_task_queue_wait_time_seconds_count[5m])) by (namespace, name, queue_type)
```
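Error rate by stage (the numerator carries an extra `stage` label, so the division needs many-to-one matching)
```promql
sum(rate(dataflow_task_stage_errors_total[5m])) by (namespace, name, stage)
/ on (namespace, name) group_left
sum(rate(dataflow_task_operations_total[5m])) by (namespace, name)
* 100
```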
Alerts
Ready-to-use manifests
A PrometheusRule manifest with pre-configured alerts is available in the repository at `monitoring/alerts/prometheusrule.yaml`. Apply it with:
```bash
kubectl apply -f monitoring/alerts/prometheusrule.yaml
```
Via Helm chart: Enable monitoring.prometheusRule.enabled and set monitoring.prometheusRule.additionalLabels (e.g. release: kube-prometheus-stack) to match your Prometheus ruleSelector.
Requirements: Prometheus Operator (e.g. kube-prometheus-stack). The release: kube-prometheus-stack label must match your Prometheus instance's ruleSelector. Adjust the label if you use a different Prometheus deployment.
| Alert | Description |
|---|---|
| DataFlowInError | DataFlow is in Error state |
| DataFlowConnectorDisconnected | Connector is disconnected |
| DataFlowHighErrorRate | Error rate (connector + transformer) > 1% |
| DataFlowSlowProcessing | p95 message processing duration > 1s |
| DataFlowLowTaskSuccessRate | Task success rate < 95% |
| DataFlowHighQueueSize | Queue size > 1000 messages |
| DataFlowHighE2ELatency | p99 end-to-end latency > 5s |
Additional queries
Use these PromQL expressions for custom dashboards or ad-hoc alerts:
High error rate
```promql
(
  sum(rate(dataflow_connector_errors_total[5m])) by (namespace, name)
  + sum(rate(dataflow_transformer_errors_total[5m])) by (namespace, name)
)
/
sum(rate(dataflow_messages_received_total[5m])) by (namespace, name)
> 0.01
```
Disconnected connectors
```promql
dataflow_connector_connection_status == 0
```
DataFlow in Error or Stopped (`dataflow_status` is 0 for these phases)
```promql
dataflow_status{phase=~"Error|Stopped"} == 0
```