Skip to content

Метрики DataFlow Operator

DataFlow Operator экспортирует метрики Prometheus для мониторинга работы оператора и обработки данных.

Как работает сбор метрик

Prometheus scrape только Service оператора (порт 9090). Оператор агрегирует метрики всех processor pods.

При каждом запросе /metrics:

  1. Оператор отдаёт свои метрики (controller-runtime, dataflow_status)
  2. Находит DataFlow и processor pods по labels app=dataflow-processor, dataflow.dataflow.io/name
  3. Запрашивает http://<podIP>:9090/metrics у каждого processor pod
  4. Оставляет только метрики dataflow_* (исключает go_*, process_*)
  5. Объединяет метрики оператора и processor pods и возвращает результат
flowchart TB
    subgraph PrometheusStack [Prometheus Stack]
        Prometheus[Prometheus]
    end

    subgraph DataFlowOperator [DataFlow Operator]
        Operator[Operator Pod]
        OperatorMetrics[Operator Metrics]
    end

    subgraph ProcessorPods [Processor Pods]
        Proc1[Processor Pod 1]
        Proc2[Processor Pod 2]
    end

    Prometheus -->|"GET /metrics :9090"| Operator
    Operator -->|"1. Serve operator metrics"| OperatorMetrics
    Operator -->|"2. Scrape podIP:9090/metrics"| Proc1
    Operator -->|"2. Scrape podIP:9090/metrics"| Proc2
    Operator -->|"3. Merge dataflow_* only"| Prometheus

Доступные метрики

Метрики DataFlow манифестов

  • dataflow_messages_received_total - общее количество полученных сообщений по манифесту
  • Метки: namespace, name, source_type

  • dataflow_messages_sent_total - общее количество отправленных сообщений по манифесту

  • Метки: namespace, name, sink_type, route

  • dataflow_processing_duration_seconds - время обработки сообщений (гистограмма)

  • Метки: namespace, name

  • dataflow_status - статус DataFlow манифеста (1 = Running, 0 = Stopped/Error)

  • Метки: namespace, name, phase

Метрики коннекторов

  • dataflow_connector_messages_read_total - количество прочитанных сообщений из source коннектора
  • Метки: namespace, name, connector_type, connector_name

  • dataflow_connector_messages_written_total - количество записанных сообщений в sink коннектор

  • Метки: namespace, name, connector_type, connector_name, route

  • dataflow_connector_errors_total - количество ошибок в коннекторах

  • Метки: namespace, name, connector_type, connector_name, operation, error_type
  • Возможные значения error_type: см. Типы ошибок

  • dataflow_connector_connection_status - статус подключения коннектора (1 = connected, 0 = disconnected)

  • Метки: namespace, name, connector_type, connector_name

Метрики трансформеров

  • dataflow_transformer_executions_total - количество выполнений трансформера
  • Метки: namespace, name, transformer_type, transformer_index

  • dataflow_transformer_errors_total - количество ошибок в трансформерах

  • Метки: namespace, name, transformer_type, transformer_index, error_type
  • Возможные значения error_type: см. Типы ошибок

  • dataflow_transformer_duration_seconds - время выполнения трансформера (гистограмма)

  • Метки: namespace, name, transformer_type, transformer_index

  • dataflow_transformer_messages_in_total - количество входящих сообщений в трансформер

  • Метки: namespace, name, transformer_type, transformer_index

  • dataflow_transformer_messages_out_total - количество исходящих сообщений из трансформера

  • Метки: namespace, name, transformer_type, transformer_index

Детальные метрики выполнения задач

  • dataflow_task_stage_duration_seconds - время выполнения отдельных этапов задачи (гистограмма)
  • Метки: namespace, name, stage
  • Этапы: read, transformation, write, sink_write, error_sink_write, transformer_0, transformer_1, и т.д.

  • dataflow_task_message_size_bytes - размер сообщений на разных этапах обработки (гистограмма)

  • Метки: namespace, name, stage
  • Этапы: input, output, transformer_0_input, transformer_0_output, и т.д.

  • dataflow_task_stage_latency_seconds - задержка между этапами обработки (гистограмма)

  • Метки: namespace, name, from_stage, to_stage

  • dataflow_task_throughput_messages_per_second - текущая пропускная способность (сообщений в секунду)

  • Метки: namespace, name

  • dataflow_task_success_rate - процент успешных задач (0.0 до 1.0)

  • Метки: namespace, name

  • dataflow_task_end_to_end_latency_seconds - полное время жизни сообщения от получения до отправки (гистограмма)

  • Метки: namespace, name

  • dataflow_task_active_messages - количество активных сообщений в обработке

  • Метки: namespace, name

  • dataflow_task_queue_size - текущий размер очереди сообщений

  • Метки: namespace, name, queue_type
  • Типы очередей: routing, output, default, и маршруты роутера

  • dataflow_task_queue_wait_time_seconds - время ожидания сообщений в очереди (гистограмма)

  • Метки: namespace, name, queue_type

  • dataflow_task_operations_total - общее количество операций по типу

  • Метки: namespace, name, operation, status
  • Операции: transform, write, sink_write, error_sink_write
  • Статусы: success, error, cancelled

  • dataflow_task_stage_errors_total - количество ошибок на каждом этапе

  • Метки: namespace, name, stage, error_type
  • Возможные значения error_type: см. Типы ошибок

6.2 Метки

Метрики используют метки namespace и name для привязки к DataFlow, что удобно для фильтрации и агрегации в Prometheus/Grafana. Все метрики содержат как минимум эти две метки; дополнительные метки (source_type, sink_type, connector_type, stage и др.) позволяют детализировать запросы.

Примеры: - Фильтрация по конкретному DataFlow: dataflow_messages_received_total{namespace="default", name="my-dataflow"} - Агрегация по namespace: sum(rate(dataflow_messages_received_total[5m])) by (namespace, name)

6.3 Гистограммы

Для гистограмм используются экспоненциальные buckets, подходящие для латентности и размеров сообщений:

Метрика Buckets Диапазон
dataflow_processing_duration_seconds ExponentialBuckets(0.001, 2, 10) 1 мс — ~1 с
dataflow_transformer_duration_seconds ExponentialBuckets(0.0001, 2, 12) 0.1 мс — ~400 мс
dataflow_task_stage_duration_seconds ExponentialBuckets(0.0001, 2, 14) 0.1 мс — ~1.6 с
dataflow_task_message_size_bytes ExponentialBuckets(64, 2, 16) 64 байт — ~4 МБ
dataflow_task_stage_latency_seconds ExponentialBuckets(0.0001, 2, 12) 0.1 мс — ~400 мс
dataflow_task_end_to_end_latency_seconds ExponentialBuckets(0.001, 2, 12) 1 мс — ~2 с
dataflow_task_queue_wait_time_seconds ExponentialBuckets(0.0001, 2, 12) 0.1 мс — ~400 мс
  • Латентность (время в секундах): начальное значение от 0.1 мс или 1 мс, множитель 2.
  • Размер сообщений (байты): начальное значение 64 байт, множитель 2.

Настройка мониторинга

Prometheus ServiceMonitor

Для автоматического обнаружения метрик Prometheus создайте ServiceMonitor:

apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: dataflow-operator
  labels:
    app: dataflow-operator
spec:
  selector:
    matchLabels:
      app: dataflow-operator
  endpoints:
    - port: metrics
      path: /metrics
      interval: 30s

Или включите ServiceMonitor в Helm chart:

serviceMonitor:
  enabled: true
  interval: 30s
  scrapeTimeout: 10s

Grafana Dashboard

Импортируйте дашборд из файла grafana-dashboard.json в Grafana для визуализации метрик.

Через Helm chart (ConfigMap): Включите monitoring.dashboard.enabled при установке. Дашборд входит в chart (dashboards/grafana-dashboard.json). Grafana sidecar (kube-prometheus-stack) подхватывает ConfigMap с меткой grafana_dashboard: "1".

Дашборд включает: - Графики количества полученных/отправленных сообщений - Графики ошибок в коннекторах и трансформерах - Время обработки сообщений - Статус подключения коннекторов - Статус DataFlow манифестов - Статистику по трансформерам

Sentry (мониторинг ошибок и трейсинг)

DataFlow Operator поддерживает Sentry для мониторинга ошибок и распределённого трейсинга. При включении оператор и поды процессоров отправляют ошибки и трейсы производительности в Sentry.

Включение через Helm chart:

sentry:
  enabled: true
  dsn: "https://xxx@o0.ingest.sentry.io/123"
  environment: production   # или staging, development
  tracesSampleRate: 0.1     # 0.0–1.0, доля трейсов для сэмплирования (по умолчанию 10%)
  debug: false             # вывод отладки Sentry SDK
  release: ""              # опционально: git commit или версия

Переменные окружения (передаются в поды оператора и процессоров при sentry.enabled и sentry.dsn):

Переменная Описание
SENTRY_DSN DSN проекта Sentry (обязательно)
SENTRY_ENVIRONMENT Имя окружения (production, staging и т.д.)
SENTRY_TRACES_SAMPLE_RATE Доля сэмплирования трейсов (0.0–1.0)
SENTRY_DEBUG Режим отладки (true/false)
SENTRY_RELEASE Версия релиза для группировки

Оператор передаёт переменные SENTRY_* в поды процессоров, поэтому они отправляют данные в тот же проект Sentry. Если SENTRY_DSN пустой, Sentry не инициализируется.

Примеры запросов Prometheus

Количество сообщений в секунду по манифесту

sum(rate(dataflow_messages_received_total[5m])) by (namespace, name)

Процент ошибок в трансформерах

sum(rate(dataflow_transformer_errors_total[5m])) by (namespace, name, transformer_type)
/
sum(rate(dataflow_transformer_executions_total[5m])) by (namespace, name, transformer_type)
* 100

p95 время обработки сообщений

histogram_quantile(0.95, sum(rate(dataflow_processing_duration_seconds_bucket[5m])) by (namespace, name, le))

Количество активных DataFlow манифестов

sum(dataflow_status) by (namespace, name)

Средняя скорость обработки сообщений

avg(rate(dataflow_messages_received_total[5m])) by (namespace, name)

Топ манифестов по количеству ошибок

topk(10, sum(rate(dataflow_connector_errors_total[5m])) by (namespace, name))

Время выполнения трансформеров (p99)

histogram_quantile(0.99, sum(rate(dataflow_transformer_duration_seconds_bucket[5m])) by (namespace, name, transformer_type, transformer_index, le))

Интерпретация метрик

Метрики производительности

  • dataflow_messages_received_total - показывает общую нагрузку на систему. Резкий рост может указывать на увеличение трафика.
  • dataflow_processing_duration_seconds - время обработки сообщений. Высокие значения могут указывать на узкие места в обработке.
  • dataflow_transformer_duration_seconds - время выполнения трансформеров. Помогает выявить медленные трансформации.

Метрики ошибок

  • dataflow_connector_errors_total - ошибки подключения или записи/чтения в коннекторах. Высокое значение требует внимания.
  • dataflow_transformer_errors_total - ошибки в трансформерах. Может указывать на проблемы с данными или конфигурацией.

Метрики состояния

  • dataflow_connector_connection_status - статус подключения коннекторов. Значение 0 означает, что коннектор отключен.
  • dataflow_status - статус DataFlow манифеста. Значение 1 означает, что манифест работает.

Алерты

Готовые манифесты

Готовый манифест PrometheusRule с настроенными алертами доступен в репозитории:

Применить манифест:

kubectl apply -f monitoring/alerts/prometheusrule.yaml

Через Helm chart: Включите monitoring.prometheusRule.enabled и задайте monitoring.prometheusRule.additionalLabels (например, release: kube-prometheus-stack) в соответствии с ruleSelector вашего Prometheus.

Требования: Prometheus Operator (например, kube-prometheus-stack). Метка release: kube-prometheus-stack должна совпадать с ruleSelector вашего Prometheus. Измените метку, если используете другой Prometheus.

Алерт Описание
DataFlowInError DataFlow в состоянии Error
DataFlowConnectorDisconnected Коннектор отключён
DataFlowHighErrorRate Процент ошибок (коннектор + трансформер) > 1%
DataFlowSlowProcessing p95 время обработки сообщений > 1 с
DataFlowLowTaskSuccessRate Процент успешных задач < 95%
DataFlowHighQueueSize Размер очереди > 1000 сообщений
DataFlowHighE2ELatency p99 end-to-end латентность > 5 с

Дополнительные запросы

Используйте эти PromQL-выражения для кастомных дашбордов или разовых алертов:

Высокий процент ошибок

(
  sum(rate(dataflow_connector_errors_total[5m])) by (namespace, name)
  + sum(rate(dataflow_transformer_errors_total[5m])) by (namespace, name)
)
/
sum(rate(dataflow_messages_received_total[5m])) by (namespace, name)
> 0.01

Отключённые коннекторы

dataflow_connector_connection_status == 0

DataFlow в Error или Stopped

dataflow_status{phase=~"Error|Stopped"} == 1