Архитектура
В этом разделе описано, как работает DataFlow Operator: его роль в Kubernetes, цикл реконсиляции и поток данных в рантайме внутри каждого процессора.
Обзор
DataFlow Operator обеспечивает декларативное управление потоками данных через пользовательский ресурс Kubernetes (CRD). Вы описываете ресурс DataFlow с источником, опциональными трансформациями и приёмником; оператор создаёт и поддерживает процессор — под, который непрерывно читает из источника, применяет трансформации и пишет в приёмник (и при необходимости в приёмник ошибок).
Общая схема:
- Вы создаёте или обновляете DataFlow (например, через
kubectl apply). - Оператор следит за ресурсами DataFlow и для каждого создаёт или обновляет ConfigMap (с подставленным spec) и Deployment (один под с бинарником процессора).
- Под процессора читает spec из смонтированного ConfigMap, подключается к источнику и приёмнику(ам) и выполняет конвейер: чтение → трансформация → запись.
Поток данных (концептуально)
Поток данных в каждом процессоре следует линейному конвейеру: Источник → Трансформации → Приёмник. Опционально неудачные записи направляются в Приёмник ошибок.
flowchart LR
subgraph Input[" "]
Source["Источник\n(Kafka / PostgreSQL / Trino / ClickHouse / Nessie)"]
end
subgraph Transform[" "]
T1["Трансформация 1"]
T2["Трансформация 2"]
TN["Трансформация N"]
T1 --> T2 --> TN
end
subgraph Output[" "]
MainSink["Основной приёмник"]
ErrSink["Приёмник ошибок\n(опционально)"]
end
Source -->|"чтение"| T1
TN -->|"запись"| MainSink
TN -.->|"при ошибке"| ErrSink
Трансформации применяются по порядку: timestamp, flatten, filter, mask, router, select, remove, snakeCase, camelCase. Каждое сообщение проходит цепочку перед записью в приёмник.
Архитектура в Kubernetes
Custom Resource Definition (CRD)
- API group:
dataflow.dataflow.io - Ресурс:
dataflows(kindDataFlow), namespaced. - Spec включает:
- Source: тип (например
kafka,postgresql,trino) и конфигурацию (брокеры, топик, строка подключения и т.д.). - Sink: тип и конфигурация основного приёмника.
- Transformations: упорядоченный список трансформаций (timestamp, flatten, filter, mask, router, select, remove, snakeCase, camelCase и др.).
- Errors: опциональный приёмник для сообщений, которые не удалось записать в основной.
- Resources: опциональные CPU/память для пода процессора.
- Scheduling: опционально
nodeSelector,affinity,tolerations. - CheckpointPersistence: опционально; по умолчанию
true. При включении polling-источники (PostgreSQL, ClickHouse, Trino) сохраняют позицию чтения в ConfigMap, уменьшая дубликаты при перезапуске. Задайтеfalseдля отключения. - ChannelBufferSize: опционально; по умолчанию
100. Размер буфера каналов между source, processor и sink. Используйте 500–1000 при высокой нагрузке Kafka, чтобы снизить блокировки, когда sink медленнее source.
Секреты задаются через SecretRef в spec; оператор подставляет их перед записью spec в ConfigMap.
Структура DataFlow CRD
Схема основных полей spec DataFlow для быстрого понимания:
flowchart TB
subgraph DataFlow["DataFlow"]
Spec["spec"]
Status["status"]
end
subgraph SpecFields["поля spec"]
Source["source (обязательно)"]
Sink["sink (обязательно)"]
Trans["transformations (опционально)"]
Errors["errors (опционально)"]
Resources["resources (опционально)"]
Scheduling["scheduling (опционально)"]
Checkpoint["checkpointPersistence (опционально)"]
ChannelBuffer["channelBufferSize (опционально)"]
Image["processorImage / processorVersion (опционально)"]
end
Source --> SourceTypes["type: kafka | postgresql | trino | clickhouse | nessie"]
Sink --> SinkTypes["type: kafka | postgresql | trino | clickhouse | nessie"]
Trans --> TransTypes["timestamp | flatten | filter | mask | router | select | remove | snakeCase | camelCase"]
Spec --> Source
Spec --> Sink
Spec --> Trans
Spec --> Errors
Spec --> Resources
Spec --> Scheduling
Spec --> Checkpoint
Spec --> ChannelBuffer
Spec --> Image
Deployment оператора
Оператор работает как один Deployment в кластере (например, установленный через Helm). Он использует controller-runtime и один контроллер DataFlowReconciler, который реконсилирует ресурсы DataFlow. Включено Leader election (ID dataflow-operator.dataflow.io): при нескольких репликах только один активный лидер выполняет цикл реконсиляции (высокая доступность).
Контроллер
DataFlowReconciler:
- Следит за:
DataFlow(основной ресурс), владеет объектамиDeploymentиConfigMapдля каждого DataFlow (их жизненный цикл привязан к DataFlow). - Опционально следит за собственным Deployment оператора (когда заданы
OPERATOR_DEPLOYMENT_NAMEиOPERATOR_NAMESPACE). При обновлении этого Deployment (например, новый образ) контроллер запускает реконсиляцию всех DataFlow, чтобы поды процессоров могли перейти на новый образ процессора.
Ресурсы на один DataFlow
Для каждого DataFlow с именем <name> в namespace создаются:
| Ресурс | Имя | Назначение |
|---|---|---|
| ConfigMap | dataflow-<name>-spec |
Хранит spec.json (spec с подставленными секретами). |
| ConfigMap | dataflow-<name>-checkpoint |
Хранит позицию чтения для polling-источников (по умолчанию). Не создаётся при checkpointPersistence: false. |
| Deployment | dataflow-<name> |
Одна реплика; под запускает контейнер processor. |
| ServiceAccount, Role, RoleBinding | dataflow-<name>-processor |
RBAC для доступа процессора к checkpoint ConfigMap (по умолчанию). Не создаётся при checkpointPersistence: false. |
Контейнер процессора:
- Образ: из переменной окружения
PROCESSOR_IMAGE(часто тот же образ и тег, что и у оператора). - Аргументы:
--spec-path=/etc/dataflow/spec.json,--namespace=...,--name=.... - Том: ConfigMap
dataflow-<name>-specсмонтирован в/etc/dataflow(только чтение). - Переменные:
LOG_LEVEL(например изPROCESSOR_LOG_LEVEL).
Контроллер выставляет owner reference от DataFlow к ConfigMap и Deployment, чтобы при удалении DataFlow эти объекты тоже удалялись.
RBAC
Оператор использует ClusterRole (и ClusterRoleBinding на свой ServiceAccount) с правами на:
- Чтение/запись DataFlow и статуса (CRD).
- Создание/обновление events.
- Чтение secrets (для подстановки).
- Создание/обновление/удаление ConfigMap и Deployment в тех же namespace, где находятся DataFlow.
- При включённой персистенции checkpoint: создание ServiceAccount, Role и RoleBinding для доступа подов процессора к checkpoint ConfigMap.
Точные правила заданы в Helm-шаблонах (например clusterrole.yaml, clusterrolebinding.yaml).
Опционально: GUI
Helm-чарт может развернуть опциональный GUI (отдельный Deployment, Service и при необходимости Ingress) для просмотра и управления потоками данных. Он не входит в ядро архитектуры оператора и процессора.
Admission Webhook (Validating)
Оператор может принимать запросы Validating Admission Webhook: при создании или обновлении ресурса DataFlow API-сервер Kubernetes отправляет объект оператору на порт 9443; оператор проверяет spec (типы source/sink, наличие обязательных полей, допустимые трансформации и т.д.) и либо разрешает операцию, либо отклоняет её с понятным сообщением об ошибке.
Зачем это нужно: без webhook некорректный spec (неверный тип source/sink, пустые обязательные поля, неизвестная трансформация) приведёт к ошибке только в рантайме — когда контроллер уже создал ConfigMap и Deployment, а процессор при старте не смог построить коннектор или трансформацию. Пользователь увидит ошибку в статусе DataFlow или в логах пода, а не при kubectl apply. С включённым webhook невалидный spec отклоняется до записи в etcd: kubectl apply и GUI получают ответ 4xx с текстом ошибки, в кластере не появляются лишние объекты и не создаётся под процессора с заведомо нерабочей конфигурацией.
Опциональность: webhook включается только на уровне Helm (value webhook.enabled). По умолчанию он выключен: ValidatingWebhookConfiguration не создаётся, API-сервер не вызывает оператор при create/update DataFlow — поведение как раньше. Подробная настройка (TLS, caBundle, включение в production) описана в разделе Настройка Validating Webhook руководства по разработке.
Схема архитектуры в Kubernetes
На схеме показано взаимодействие пользователя, API server, оператора и подов процессора.
flowchart LR
User["User (kubectl)"]
API["API Server"]
CRD["DataFlow CRD"]
Operator["Operator Pod"]
CMSpec["ConfigMap spec"]
CMCheckpoint["ConfigMap checkpoint"]
Dep["Deployment"]
Proc["Processor Pod"]
Ext["Kafka / PostgreSQL / Trino / Nessie"]
User -->|"apply DataFlow"| API
API --> CRD
Operator -->|watch| CRD
Operator -->|create/update| CMSpec
Operator -->|create/update| CMCheckpoint
Operator -->|create/update| Dep
Dep --> Proc
Proc -->|mount spec| CMSpec
Proc -->|read/write checkpoint| CMCheckpoint
Proc -->|connect| Ext
Цикл реконсиляции
Для каждого DataFlow контроллер выполняет следующие шаги (при создании, обновлении или изменении принадлежащих ресурсов):
-
Получить DataFlow
Если ресурс не найден — выйти. Если задан DeletionTimestamp: удалить Deployment, ConfigMap (spec и checkpoint) и RBAC процессора (очистка), обновить статус наStoppedи выйти. -
Подставить секреты
SecretResolver подставляет все поля сSecretRefв spec значениями из Kubernetes Secrets. Результат: resolved spec. -
ConfigMap
Создать или обновить ConfigMapdataflow-<name>-specс ключомspec.json= JSON resolved spec. Установить controller reference на DataFlow. -
Checkpoint ConfigMap и RBAC (когда
checkpointPersistenceнеfalse, по умолчанию включено)
Создать ConfigMapdataflow-<name>-checkpointи RBAC (ServiceAccount, Role, RoleBinding), чтобы под процессора мог читать и записывать checkpoint. Процессор сохраняет позицию чтения источника (lastReadID, lastReadChangeTime), уменьшая дубликаты при перезапуске. -
Deployment
Создать или обновить Deploymentdataflow-<name>: образ процессора, том из spec ConfigMap, аргументы и переменные окружения как выше. При включённой персистенции checkpoint задатьserviceAccountNameдля использования выделенного ServiceAccount. Ресурсы и affinity из spec DataFlow, если заданы. Установить controller reference на DataFlow. -
Статус Deployment
Прочитать Deployment; выставить в статусе DataFlow Phase и Message по нему (напримерRunningприReadyReplicas > 0,Pendingпри запуске реплик,Errorпри отсутствии реплик). -
Обновить статус DataFlow
Записать Phase, Message и остальные поля статуса в ресурс DataFlow (с повторными попытками при конфликте).
Схема цикла реконсиляции
flowchart TD
A[Get DataFlow] --> B{Deleted?}
B -->|Yes| C[Cleanup Deployment, ConfigMaps, RBAC]
C --> D[Update Status Stopped]
B -->|No| E[Resolve Secrets]
E --> F[Create or Update ConfigMap]
F --> F2{CheckpointPersistence?}
F2 -->|Yes| F3[Create Checkpoint ConfigMap and RBAC]
F2 -->|No| G
F3 --> G[Create or Update Deployment]
G --> H[Read Deployment Status]
H --> I[Update DataFlow Status]
Процессор данных (рантайм)
Процессор — компонент, который перемещает данные: читает из источника, применяет трансформации и пишет в приёмник(и). Он работает в поде, созданном оператором.
Точка входа
Бинарник процессора (собирается из cmd/processor/main.go) запускается с аргументами:
--spec-path(по умолчанию/etc/dataflow/spec.json)--namespace,--name(namespace и имя DataFlow для логов и метрик)
Он читает spec из файла, строит по нему Processor и вызывает Processor.Start(ctx) до отмены контекста (например по SIGTERM).
Структура процессора
Processor (в internal/processor/processor.go) строится из spec и содержит:
- Source: SourceConnector (Kafka, PostgreSQL, Trino или Nessie) —
Connect,Read,Close. По умолчанию polling-источники загружают начальный checkpoint из ConfigMap и сохраняют его после каждой успешной записи в sink (с debounce). Отключить:checkpointPersistence: false. - Sink: SinkConnector основного приёмника —
Connect,Write,Close. - Error sink (опционально): ещё один SinkConnector для неудачных записей.
- Transformations: упорядоченный список реализаций Transformer (timestamp, flatten, filter, mask, router, select, remove, snakeCase, camelCase).
- Router sinks: при использовании трансформации
router— отображение условие → SinkSpec для динамических приёмников.
Коннекторы создаются фабрикой из spec.Source, spec.Sink (и spec.Errors); см. internal/connectors/.
Поток выполнения
-
Подключение
source.Connect(ctx), затемsink.Connect(ctx), при необходимостиerrorSink.Connect(ctx). -
Чтение
source.Read(ctx)возвращает канал Message (каждое сообщение — payload и опциональные метаданные, напримерrouted_conditionот router). -
Обработка
Горутина processMessages: для каждого сообщения из канала по порядку применяются трансформации. Каждая трансформация принимает одно или несколько сообщений и возвращает одно или несколько (filter отбрасывает, flatten может порождать несколько, router добавляетrouted_conditionи может направлять по разным маршрутам). Результат отправляется во внутренний канал. -
Запись
writeMessages читает из этого канала: - Если заданы router-приёмники: сообщения распределяются по метаданным в соответствующий приёмник маршрута или в приёмник по умолчанию (основной); для каждого маршрута может создаваться свой коннектор (по требованию).
- Иначе: все сообщения идут в основной приёмник.
- При ошибке записи, если задан error sink, неудачное сообщение может быть отправлено туда.
Интерфейсы коннекторов описаны в internal/connectors/interface.go: SourceConnector (Connect, Read, Close), SinkConnector (Connect, Write, Close).
Коннекторы и трансформации
- Типы source/sink: Kafka, PostgreSQL, Trino, Nessie (выбор по
spec.source.typeиspec.sink.type). - Трансформации (порядок задаётся в spec): timestamp, flatten, filter, mask, router, select, remove, snakeCase, camelCase.
Поток данных в процессоре (схема)
flowchart LR
Src[Source Connector]
ReadChan[Read Channel]
Trans[Transform 1 .. N]
Write[writeMessages]
MainSink[Main Sink]
ErrSink[Error Sink]
RouteSinks[Router Sinks]
Src -->|Connect, Read| ReadChan
ReadChan --> Trans
Trans --> Write
Write --> MainSink
Write --> ErrSink
Write --> RouteSinks
Кратко
- Kubernetes: вы объявляете ресурс DataFlow; оператор приводит его к ConfigMap (spec) и Deployment (под процессора). По умолчанию создаётся второй ConfigMap и RBAC для хранения checkpoint (задайте
checkpointPersistence: falseдля отключения). RBAC и опциональный GUI дополняют картину. - Реконсиляция: получить DataFlow → подставить секреты → обновить ConfigMap → обновить Deployment → отразить статус Deployment в статусе DataFlow.
- Рантайм: каждый под процессора выполняет один конвейер: источник → канал чтения → трансформации → запись в основной (и при необходимости в приёмник ошибок и маршруты router), с подключаемыми коннекторами и фиксированным набором трансформаций.