Examples
Практические примеры использования DataFlow Operator для различных сценариев обработки данных.
Поток данных в каждом конвейере следует схеме Источник → Трансформации → Приёмник. См. Архитектура — Поток данных для концептуальной диаграммы.
Простой Kafka → PostgreSQL поток
Базовый пример передачи данных из Kafka топика в PostgreSQL таблицу.
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: kafka-to-postgres
spec:
source:
type: kafka
config:
brokers:
- localhost:9092
topic: input-topic
consumerGroup: dataflow-group
sink:
type: postgresql
config:
connectionString: "postgres://dataflow:dataflow@postgres:5432/dataflow?sslmode=disable"
table: output_table
autoCreateTable: true
Применение:
kubectl apply -f dataflow/config/samples/kafka-to-postgres.yaml
Kafka с режимом сырой записи (rawMode)
Пример сохранения полного контекста Kafka-сообщения: value + метаданные (offset, partition, timestamp, key, topic). Используйте rawMode: true в sink для сохранения сообщений как JSON с колонками value и _metadata.
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: kafka-raw-to-clickhouse
spec:
source:
type: kafka
config:
brokers:
- localhost:9092
topic: input-topic
consumerGroup: dataflow-group
sink:
type: clickhouse
config:
connectionString: "clickhouse://default@clickhouse:9000/default"
table: raw_events
autoCreateTable: true
rawMode: true # Сохраняет каждое сообщение как {"value": ..., "_metadata": {...}}
Формат выходного сообщения при rawMode (sink оборачивает используя msg.Metadata):
{
"value": {"id": 1, "event": "user_login"},
"_metadata": {
"offset": 100,
"partition": 0,
"timestamp": "2024-02-27T10:13:20.000Z",
"key": "user-123",
"topic": "input-topic"
}
}
Для sink, ожидающего только данные без обёртки, добавьте трансформацию select с полем value:
transformations:
- type: select
config:
fields: ["value"]
С трансформациями (Flatten + Timestamp)
Пример обработки сообщений с массивом товаров, развертывание в отдельные сообщения и добавление временной метки.
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: stock-flatten
spec:
source:
type: kafka
config:
brokers:
- localhost:9092
topic: stock-topic
consumerGroup: dataflow-group
sink:
type: postgresql
config:
connectionString: "postgres://dataflow:dataflow@postgres:5432/dataflow?sslmode=disable"
table: stock_items
autoCreateTable: true
batchSize: 50
transformations:
# Развернуть массив rowsStock в отдельные сообщения
- type: flatten
config:
field: rowsStock
# Добавить временную метку
- type: timestamp
config:
fieldName: created_at
Входное сообщение:
{
"type": "stock",
"version": 32476984,
"rowsStock": [
{"sku": 400125868, "section": "A015"},
{"sku": 400125868, "section": "A001"}
]
}
Выходные сообщения:
{
"type": "stock",
"version": 32476984,
"sku": 400125868,
"section": "A015",
"created_at": "2024-01-15T10:30:00Z"
}
{
"type": "stock",
"version": 32476984,
"sku": 400125868,
"section": "A001",
"created_at": "2024-01-15T10:30:00Z"
}
Обработка ошибок с error sink
Пример настройки отдельного приёмника для сообщений, которые не удалось записать в основной sink.
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: kafka-to-postgres-with-errors
spec:
source:
type: kafka
config:
brokers:
- localhost:9092
topic: input-topic
consumerGroup: dataflow-group
sink:
type: postgresql
config:
connectionString: "postgres://dataflow:dataflow@postgres:5432/dataflow?sslmode=disable"
table: output_table
autoCreateTable: true
errors:
type: kafka
config:
brokers:
- localhost:9092
topic: error-topic
Применение:
kubectl apply -f dataflow/config/samples/kafka-to-postgres-with-errors.yaml
Структура сообщений об ошибках и детали конфигурации — в разделе Обработка ошибок.
С роутером для множественных приемников
Пример маршрутизации сообщений в разные приемники на основе условий.
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: router-example
spec:
source:
type: kafka
config:
brokers:
- localhost:9092
topic: events
consumerGroup: dataflow-group
# Основной приемник для сообщений, не соответствующих условиям
sink:
type: kafka
config:
brokers:
- localhost:9092
topic: default-events
transformations:
- type: router
config:
routes:
# Ошибки → отдельный топик
- condition: "$.level"
sink:
type: kafka
config:
brokers:
- localhost:9092
topic: error-events
# Предупреждения → PostgreSQL
- condition: "$.priority"
sink:
type: postgresql
config:
connectionString: "postgres://dataflow:dataflow@postgres:5432/dataflow?sslmode=disable"
table: warnings
autoCreateTable: true
Входные сообщения:
{"level": "error", "message": "Critical error"} // → error-events топик
{"priority": "high", "message": "Warning"} // → warnings таблица
{"message": "Info"} // → default-events топик
С фильтрацией и маскированием
Пример обработки пользовательских данных с фильтрацией активных пользователей, маскированием чувствительных данных и удалением внутренних полей.
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: secure-pipeline
spec:
source:
type: postgresql
config:
connectionString: "postgres://dataflow:dataflow@postgres:5432/dataflow?sslmode=disable"
table: users
query: "SELECT * FROM users WHERE updated_at > NOW() - INTERVAL '1 hour'"
pollInterval: 300
sink:
type: kafka
config:
brokers:
- localhost:9092
topic: public-users
transformations:
# Фильтровать только активных пользователей
- type: filter
config:
condition: "$.active"
# Маскировать чувствительные данные
- type: mask
config:
fields:
- password
- email
keepLength: true
# Удалить внутренние поля
- type: remove
config:
fields:
- internal_id
- secret_token
- debug_info
# Добавить временную метку экспорта
- type: timestamp
config:
fieldName: exported_at
Входное сообщение:
{
"id": 1,
"username": "john",
"email": "john@example.com",
"password": "secret123",
"active": true,
"internal_id": 999,
"secret_token": "abc123"
}
Выходное сообщение:
{
"id": 1,
"username": "john",
"email": "***********",
"password": "*********",
"active": true,
"exported_at": "2024-01-15T10:30:00Z"
}
PostgreSQL → Kafka с выбором полей
Пример чтения из PostgreSQL, выборки определенных полей и отправки в Kafka.
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: postgres-to-kafka-select
spec:
source:
type: postgresql
config:
connectionString: "postgres://dataflow:dataflow@postgres:5432/dataflow?sslmode=disable"
table: orders
query: "SELECT * FROM orders WHERE created_at > NOW() - INTERVAL '1 day'"
pollInterval: 60
sink:
type: kafka
config:
brokers:
- localhost:9092
topic: order-events
transformations:
- type: select
config:
fields:
- order_id
- customer_id
- total
- status
- created_at
- type: timestamp
config:
fieldName: processed_at
PostgreSQL → PostgreSQL (репликация / ETL)
Пример чтения данных из одной PostgreSQL базы и записи преобразованных данных в другую PostgreSQL базу.
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: postgres-to-postgres
spec:
source:
type: postgresql
config:
connectionString: "postgres://dataflow:dataflow@source-postgres:5432/source_db?sslmode=disable"
table: source_orders
query: "SELECT * FROM source_orders WHERE updated_at > NOW() - INTERVAL '5 minutes'"
pollInterval: 60
sink:
type: postgresql
config:
connectionString: "postgres://dataflow:dataflow@target-postgres:5432/target_db?sslmode=disable"
table: target_orders
autoCreateTable: true
batchSize: 100
upsertMode: true # Включает обновление существующих записей вместо пропуска
transformations:
# Оставляем только нужные поля
- type: select
config:
fields:
- id
- customer_id
- total
- status
- updated_at
# Добавляем время синхронизации
- type: timestamp
config:
fieldName: synced_at
Варианты использования:
- Онлайн-репликация: периодическое копирование обновленных записей из операционной БД в аналитическую
- ETL-пайплайн: подготовка и очистка данных при переносе между схемами/кластерами PostgreSQL
Важно: При использовании upsertMode: true существующие записи в целевой таблице будут обновляться при конфликте по PRIMARY KEY (или указанному conflictKey). Без upsertMode обновленные записи из источника будут пропускаться, если они уже существуют в целевой таблице.
Комплексный пример: ETL пайплайн
Полноценный ETL пайплайн с множественными трансформациями.
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: etl-pipeline
spec:
source:
type: kafka
config:
brokers:
- kafka1:9092
- kafka2:9092
topic: raw-events
consumerGroup: etl-group
sink:
type: postgresql
config:
connectionString: "postgres://dataflow:dataflow@postgres:5432/analytics?sslmode=disable"
table: processed_events
autoCreateTable: true
batchSize: 100
transformations:
# 1. Развернуть вложенные массивы
- type: flatten
config:
field: items
# 2. Добавить временную метку обработки
- type: timestamp
config:
fieldName: processed_at
format: RFC3339
# 3. Фильтровать только валидные события
- type: filter
config:
condition: "$.valid"
# 4. Маскировать PII данные
- type: mask
config:
fields:
- user.email
- user.phone
keepLength: true
# 5. Удалить отладочную информацию
- type: remove
config:
fields:
- debug
- internal_metadata
- test_flag
# 6. Выбрать только нужные поля для финального результата
- type: select
config:
fields:
- event_id
- user.id
- item.sku
- item.quantity
- processed_at
Kafka → Kafka с роутингом по типам
Пример чтения из одного Kafka топика и маршрутизации в разные топики на основе типа события.
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: kafka-router
spec:
source:
type: kafka
config:
brokers:
- localhost:9092
topic: all-events
consumerGroup: router-group
sink:
type: kafka
config:
brokers:
- localhost:9092
topic: default-events
transformations:
- type: router
config:
routes:
- condition: "$.type"
sink:
type: kafka
config:
brokers:
- localhost:9092
topic: user-events
- condition: "$.category"
sink:
type: kafka
config:
brokers:
- localhost:9092
topic: product-events
Использование Secrets для credentials
DataFlow Operator поддерживает конфигурацию коннекторов из Kubernetes Secrets через поля *SecretRef.
apiVersion: v1
kind: Secret
metadata:
name: kafka-credentials
namespace: default
type: Opaque
stringData:
brokers: "kafka1:9092,kafka2:9092"
topic: "input-topic"
consumerGroup: "dataflow-group"
username: "kafka-user"
password: "kafka-password"
---
apiVersion: v1
kind: Secret
metadata:
name: postgres-credentials
namespace: default
type: Opaque
stringData:
connectionString: "postgres://user:password@postgres:5432/dbname?sslmode=disable"
table: "output_table"
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: secure-dataflow
spec:
source:
type: kafka
config:
brokersSecretRef:
name: kafka-credentials
key: brokers
topicSecretRef:
name: kafka-credentials
key: topic
consumerGroupSecretRef:
name: kafka-credentials
key: consumerGroup
sasl:
mechanism: scram-sha-256
usernameSecretRef:
name: kafka-credentials
key: username
passwordSecretRef:
name: kafka-credentials
key: password
sink:
type: postgresql
config:
connectionStringSecretRef:
name: postgres-credentials
key: connectionString
tableSecretRef:
name: postgres-credentials
key: table
autoCreateTable: true
Применение:
kubectl apply -f dataflow/config/samples/kafka-to-postgres-secrets.yaml
Поддерживаемые поля, TLS сертификаты и устранение неполадок — в разделе Использование Secrets в Kubernetes.
Мониторинг и отладка
Проверка статуса DataFlow
# Получить список всех DataFlow
kubectl get dataflow
# Детальная информация
kubectl describe dataflow <name>
# Статус в формате YAML
kubectl get dataflow <name> -o yaml
Просмотр логов
# Логи оператора
kubectl logs -l app.kubernetes.io/name=dataflow-operator -f
# События Kubernetes
kubectl get events --sort-by='.lastTimestamp' | grep dataflow
Проверка обработанных сообщений
Статус DataFlow содержит метрики:
status:
phase: Running
processedCount: 1500
errorCount: 2
lastProcessedTime: "2024-01-15T10:30:00Z"
message: "Processing messages successfully"
Рекомендации
Производительность
- Используйте
batchSizeдля PostgreSQL приемников - Настройте правильный
pollIntervalдля PostgreSQL источников - Используйте несколько инстансов оператора для масштабирования
Безопасность
- Используйте Kubernetes Secrets для credentials
- Включайте TLS для Kafka соединений
- Маскируйте чувствительные данные перед отправкой
Надежность
- Настройте правильные consumer groups для Kafka
- Мониторьте статус DataFlow ресурсов
Высоконагруженный Kafka-пайплайн
При высокой скорости сообщений Kafka (десятки тысяч msg/s) увеличьте channelBufferSize и batchSize в sink:
spec:
channelBufferSize: 500 # по умолчанию 100; снижает блокировки, когда sink медленнее source
source:
type: kafka
config:
brokers: [localhost:9092]
topic: high-volume-topic
consumerGroup: dataflow-group
sink:
type: postgresql
config:
connectionString: "..."
table: events
batchSize: 500
batchFlushIntervalSeconds: 2
Настройка ресурсов и размещения подов
Каждый ресурс DataFlow создает отдельный под (Deployment) для обработки данных. Вы можете настроить ресурсы, выбор нод, affinity и tolerations для этих подов.
Пример: Кастомные ресурсы и выбор нод
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: kafka-to-postgres-with-resources
spec:
source:
type: kafka
config:
brokers:
- localhost:9092
topic: input-topic
consumerGroup: dataflow-group
sink:
type: postgresql
config:
connectionString: "postgres://dataflow:dataflow@postgres:5432/dataflow?sslmode=disable"
table: output_table
# Настройка ресурсов для пода процессора
resources:
requests:
cpu: "200m"
memory: "256Mi"
limits:
cpu: "1000m"
memory: "1Gi"
# Выбор нод для размещения пода
nodeSelector:
node-type: compute
zone: us-east-1
# Правила affinity для более точного контроля размещения
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: kubernetes.io/arch
operator: In
values:
- amd64
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
preference:
matchExpressions:
- key: node-type
operator: In
values:
- compute
podAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 50
podAffinityTerm:
labelSelector:
matchExpressions:
- key: app
operator: In
values:
- dataflow-processor
topologyKey: kubernetes.io/hostname
# Tolerations для работы с tainted нодами
tolerations:
- key: dedicated
operator: Equal
value: dataflow
effect: NoSchedule
- key: workload-type
operator: Equal
value: batch
effect: NoSchedule
Применение:
kubectl apply -f dataflow/config/samples/kafka-to-postgres-with-resources.yaml
Настройка ресурсов
- resources: Определяет запросы и лимиты CPU и памяти для пода процессора
- Если не указано, используются значения по умолчанию:
100mCPU /128Miпамяти (requests),500mCPU /512Miпамяти (limits) - Используйте это для обеспечения достаточных ресурсов для высоконагруженной обработки
Выбор нод
- nodeSelector: Простые пары ключ-значение для выбора конкретных нод
- Пример:
node-type: computeгарантирует, что поды будут запускаться только на нодах с меткойnode-type=compute
Правила Affinity
- affinity: Продвинутые правила размещения с использованием Kubernetes affinity
- nodeAffinity: Контроль того, на каких нодах могут запускаться поды
- podAffinity: Предпочтение запуска подов рядом с другими подами (например, другими процессорами dataflow)
- podAntiAffinity: Избегание запуска подов рядом с другими подами (например, распределение по нодам)
Tolerations
- tolerations: Позволяют подам запускаться на tainted нодах
- Полезно для выделенных compute нод или специализированного оборудования
- Пример: Запуск процессоров dataflow на нодах, выделенных для batch workloads
Поведение по умолчанию
Если ресурсы, nodeSelector, affinity или tolerations не указаны: - Применяются ресурсы по умолчанию (100m CPU / 128Mi памяти requests, 500m CPU / 512Mi памяти limits) - Поды могут запускаться на любой ноде (нет nodeSelector) - Не применяются правила affinity - Поды не могут запускаться на tainted нодах (нет tolerations)
Проверка статуса подов
После создания DataFlow с кастомными ресурсами проверьте под:
# Список подов, созданных DataFlow
kubectl get pods -l app=dataflow-processor
# Описание конкретного пода
kubectl describe pod dataflow-<name>-<hash>
# Проверка использования ресурсов
kubectl top pod dataflow-<name>-<hash>
Дополнительные примеры
Больше примеров можно найти в директории dataflow/config/samples/:
kafka-to-postgres.yaml- базовый Kafka → PostgreSQLkafka-to-postgres-with-resources.yaml- пример с настройкой ресурсов и размещенияflatten-example.yaml- пример с Flatten трансформациейrouter-example.yaml- пример с Router трансформацией