Skip to content

DataFlow Operator

DataFlow Operator - это Kubernetes оператор для потоковой передачи данных между различными источниками данных с поддержкой трансформаций сообщений.

Текущие версии

Компонент Версия
DataFlow Operator
Helm Charts
DataFlow MCP
DataFlow Web

Обзор

DataFlow Operator позволяет декларативно определять потоки данных между различными источниками и приемниками через Kubernetes Custom Resource Definitions (CRD). Оператор автоматически управляет жизненным циклом потоков данных, обрабатывает сообщения и применяет необходимые трансформации.

Основные возможности

Поддержка множественных источников данных

  • Kafka — чтение и запись сообщений (TLS, SASL, Avro, Schema Registry)
  • PostgreSQL — кастомные SQL, батч-вставки, UPSERT режим
  • ClickHouse — опрос таблиц, батч-вставки, автосоздание MergeTree
  • Trino — SQL запросы, Keycloak OAuth2, батч-вставки
  • Nessie — таблицы Apache Iceberg через каталог Nessie (ветки, Basic/Bearer auth, опрос, батч-дозапись)

Богатый набор трансформаций

  • Timestamp - добавление временной метки к каждому сообщению
  • Flatten - развертывание массивов в отдельные сообщения с сохранением родительских полей
  • Filter - фильтрация сообщений на основе JSONPath условий
  • Mask - маскирование чувствительных данных с сохранением длины или без
  • Router - маршрутизация сообщений в разные приемники на основе условий
  • Select - выбор определенных полей из сообщений
  • Remove - удаление указанных полей из сообщений
  • SnakeCase - преобразование имен полей в snake_case
  • CamelCase - преобразование имен полей в CamelCase

Гибкая маршрутизация

Оператор поддерживает условную маршрутизацию сообщений в разные приемники на основе JSONPath выражений, что позволяет создавать сложные сценарии обработки данных.

Простое управление

Декларативная конфигурация через Kubernetes CRD позволяет легко управлять потоками данных, версионировать конфигурации и интегрироваться с CI/CD системами.

Безопасная конфигурация

Поддержка конфигурации коннекторов из Kubernetes Secrets через SecretRef позволяет безопасно хранить credentials, токены и connection strings без их явного указания в спецификации DataFlow.

Быстрый старт

Установка за минуты

Установите оператор через Helm и создайте первый поток данных менее чем за 5 минут.

Установка оператора

# Установка оператора через Helm из OCI registry
helm install dataflow-operator oci://ghcr.io/dataflow-operator/helm-charts/dataflow-operator

# Проверка установки
kubectl get pods -l app.kubernetes.io/name=dataflow-operator

# Проверка CRD
kubectl get crd dataflows.dataflow.dataflow.io

Создание первого потока данных

Создайте простой поток данных из Kafka в PostgreSQL:

kubectl apply -f dataflow/config/samples/kafka-to-postgres.yaml

Проверьте статус:

kubectl get dataflow kafka-to-postgres
kubectl describe dataflow kafka-to-postgres

Локальная разработка

Для локальной разработки и тестирования:

# Запуск зависимостей (Kafka, PostgreSQL)
docker-compose up -d

# Запуск оператора локально
make run

Архитектура

Оператор состоит из следующих компонентов:

CRD (Custom Resource Definitions)

Определяет схему ресурса DataFlow, который описывает конфигурацию потока данных, включая источник, приемник и список трансформаций.

Controller

Kubernetes контроллер, который отслеживает изменения ресурсов DataFlow и управляет их жизненным циклом. Контроллер создает и управляет процессорами для каждого активного потока данных.

Connectors

Модульная система коннекторов для различных источников и приемников данных. Каждый коннектор реализует стандартный интерфейс для чтения или записи данных.

Transformers

Модули трансформации сообщений, которые применяются последовательно к каждому сообщению в порядке, указанном в конфигурации.

Processor

Оркестратор обработки сообщений, который координирует работу источника, трансформаций и приемника. Обрабатывает ошибки, ведет статистику и управляет жизненным циклом потока данных.

Мониторинг и статус

Каждый ресурс DataFlow имеет статус, который включает:

  • Phase - текущая фаза потока данных (Running, Error, etc.)
  • Message - дополнительная информация о статусе
  • LastProcessedTime - время последнего обработанного сообщения
  • ProcessedCount - количество обработанных сообщений
  • ErrorCount - количество ошибок

Оператор также экспортирует метрики Prometheus и поддерживает Sentry для детального мониторинга: - Количество полученных/отправленных сообщений по каждому манифесту - Ошибки в коннекторах и трансформерах - Время обработки сообщений и выполнения трансформеров - Статус подключения коннекторов - Мониторинг ошибок и трейсинг через Sentry (опционально)

Подробнее см. раздел Метрики.

Документация

Лицензия

Apache License 2.0