DataFlow Operator
DataFlow Operator - это Kubernetes оператор для потоковой передачи данных между различными источниками данных с поддержкой трансформаций сообщений.
Текущие версии
| Компонент | Версия |
|---|---|
| DataFlow Operator | — |
| Helm Charts | — |
| DataFlow MCP | — |
| DataFlow Web | — |
Обзор
DataFlow Operator позволяет декларативно определять потоки данных между различными источниками и приемниками через Kubernetes Custom Resource Definitions (CRD). Оператор автоматически управляет жизненным циклом потоков данных, обрабатывает сообщения и применяет необходимые трансформации.
Основные возможности
Поддержка множественных источников данных
- Kafka — чтение и запись сообщений (TLS, SASL, Avro, Schema Registry)
- PostgreSQL — кастомные SQL, батч-вставки, UPSERT режим
- ClickHouse — опрос таблиц, батч-вставки, автосоздание MergeTree
- Trino — SQL запросы, Keycloak OAuth2, батч-вставки
- Nessie — таблицы Apache Iceberg через каталог Nessie (ветки, Basic/Bearer auth, опрос, батч-дозапись)
Богатый набор трансформаций
- Timestamp - добавление временной метки к каждому сообщению
- Flatten - развертывание массивов в отдельные сообщения с сохранением родительских полей
- Filter - фильтрация сообщений на основе JSONPath условий
- Mask - маскирование чувствительных данных с сохранением длины или без
- Router - маршрутизация сообщений в разные приемники на основе условий
- Select - выбор определенных полей из сообщений
- Remove - удаление указанных полей из сообщений
- SnakeCase - преобразование имен полей в snake_case
- CamelCase - преобразование имен полей в CamelCase
Гибкая маршрутизация
Оператор поддерживает условную маршрутизацию сообщений в разные приемники на основе JSONPath выражений, что позволяет создавать сложные сценарии обработки данных.
Простое управление
Декларативная конфигурация через Kubernetes CRD позволяет легко управлять потоками данных, версионировать конфигурации и интегрироваться с CI/CD системами.
Безопасная конфигурация
Поддержка конфигурации коннекторов из Kubernetes Secrets через SecretRef позволяет безопасно хранить credentials, токены и connection strings без их явного указания в спецификации DataFlow.
Быстрый старт
Установка за минуты
Установите оператор через Helm и создайте первый поток данных менее чем за 5 минут.
Установка оператора
# Установка оператора через Helm из OCI registry
helm install dataflow-operator oci://ghcr.io/dataflow-operator/helm-charts/dataflow-operator
# Проверка установки
kubectl get pods -l app.kubernetes.io/name=dataflow-operator
# Проверка CRD
kubectl get crd dataflows.dataflow.dataflow.io
Создание первого потока данных
Создайте простой поток данных из Kafka в PostgreSQL:
kubectl apply -f dataflow/config/samples/kafka-to-postgres.yaml
Проверьте статус:
kubectl get dataflow kafka-to-postgres
kubectl describe dataflow kafka-to-postgres
Локальная разработка
Для локальной разработки и тестирования:
# Запуск зависимостей (Kafka, PostgreSQL)
docker-compose up -d
# Запуск оператора локально
make run
Архитектура
Оператор состоит из следующих компонентов:
CRD (Custom Resource Definitions)
Определяет схему ресурса DataFlow, который описывает конфигурацию потока данных, включая источник, приемник и список трансформаций.
Controller
Kubernetes контроллер, который отслеживает изменения ресурсов DataFlow и управляет их жизненным циклом. Контроллер создает и управляет процессорами для каждого активного потока данных.
Connectors
Модульная система коннекторов для различных источников и приемников данных. Каждый коннектор реализует стандартный интерфейс для чтения или записи данных.
Transformers
Модули трансформации сообщений, которые применяются последовательно к каждому сообщению в порядке, указанном в конфигурации.
Processor
Оркестратор обработки сообщений, который координирует работу источника, трансформаций и приемника. Обрабатывает ошибки, ведет статистику и управляет жизненным циклом потока данных.
Мониторинг и статус
Каждый ресурс DataFlow имеет статус, который включает:
- Phase - текущая фаза потока данных (Running, Error, etc.)
- Message - дополнительная информация о статусе
- LastProcessedTime - время последнего обработанного сообщения
- ProcessedCount - количество обработанных сообщений
- ErrorCount - количество ошибок
Оператор также экспортирует метрики Prometheus и поддерживает Sentry для детального мониторинга: - Количество полученных/отправленных сообщений по каждому манифесту - Ошибки в коннекторах и трансформерах - Время обработки сообщений и выполнения трансформеров - Статус подключения коннекторов - Мониторинг ошибок и трейсинг через Sentry (опционально)
Подробнее см. раздел Метрики.
Документация
- Начало работы — установка и первый поток данных
- Установка и Helm Configuration — Helm charts, values, опции установки
- Web GUI — веб-интерфейс: работа и возможности
- Коннекторы — Kafka, PostgreSQL, ClickHouse, Trino, Nessie (источники и приёмники)
- Трансформации — трансформации сообщений
- Примеры — практические примеры
- Обработка ошибок — error sink и обработка ошибок
- Метрики — метрики Prometheus
- Разработка — руководство для разработчиков
Лицензия
Apache License 2.0