Skip to content

Examples

Практические примеры использования DataFlow Operator для различных сценариев обработки данных.

Поток данных в каждом конвейере следует схеме Источник → Трансформации → Приёмник. См. Архитектура — Поток данных для концептуальной диаграммы.

Простой Kafka → PostgreSQL поток

Базовый пример передачи данных из Kafka топика в PostgreSQL таблицу.

apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: kafka-to-postgres
spec:
  source:
    type: kafka
    config:
      brokers:
        - localhost:9092
      topic: input-topic
      consumerGroup: dataflow-group
  sink:
    type: postgresql
    config:
      connectionString: "postgres://dataflow:dataflow@postgres:5432/dataflow?sslmode=disable"
      table: output_table
      autoCreateTable: true

Применение:

kubectl apply -f dataflow/config/samples/kafka-to-postgres.yaml

Kafka с режимом сырой записи (rawMode)

Пример сохранения полного контекста Kafka-сообщения: value + метаданные (offset, partition, timestamp, key, topic). Используйте rawMode: true в sink для сохранения сообщений как JSON с колонками value и _metadata.

apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: kafka-raw-to-clickhouse
spec:
  source:
    type: kafka
    config:
      brokers:
        - localhost:9092
      topic: input-topic
      consumerGroup: dataflow-group
  sink:
    type: clickhouse
    config:
      connectionString: "clickhouse://default@clickhouse:9000/default"
      table: raw_events
      autoCreateTable: true
      rawMode: true  # Сохраняет каждое сообщение как {"value": ..., "_metadata": {...}}

Формат выходного сообщения при rawMode (sink оборачивает используя msg.Metadata):

{
  "value": {"id": 1, "event": "user_login"},
  "_metadata": {
    "offset": 100,
    "partition": 0,
    "timestamp": "2024-02-27T10:13:20.000Z",
    "key": "user-123",
    "topic": "input-topic"
  }
}

Для sink, ожидающего только данные без обёртки, добавьте трансформацию select с полем value:

transformations:
  - type: select
    config:
      fields: ["value"]

С трансформациями (Flatten + Timestamp)

Пример обработки сообщений с массивом товаров, развертывание в отдельные сообщения и добавление временной метки.

apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: stock-flatten
spec:
  source:
    type: kafka
    config:
      brokers:
        - localhost:9092
      topic: stock-topic
      consumerGroup: dataflow-group
  sink:
    type: postgresql
    config:
      connectionString: "postgres://dataflow:dataflow@postgres:5432/dataflow?sslmode=disable"
      table: stock_items
      autoCreateTable: true
      batchSize: 50
  transformations:
    # Развернуть массив rowsStock в отдельные сообщения
    - type: flatten
      config:
        field: rowsStock
    # Добавить временную метку
    - type: timestamp
      config:
        fieldName: created_at

Входное сообщение:

{
  "type": "stock",
  "version": 32476984,
  "rowsStock": [
    {"sku": 400125868, "section": "A015"},
    {"sku": 400125868, "section": "A001"}
  ]
}

Выходные сообщения:

{
  "type": "stock",
  "version": 32476984,
  "sku": 400125868,
  "section": "A015",
  "created_at": "2024-01-15T10:30:00Z"
}

{
  "type": "stock",
  "version": 32476984,
  "sku": 400125868,
  "section": "A001",
  "created_at": "2024-01-15T10:30:00Z"
}

Обработка ошибок с error sink

Пример настройки отдельного приёмника для сообщений, которые не удалось записать в основной sink.

apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: kafka-to-postgres-with-errors
spec:
  source:
    type: kafka
    config:
      brokers:
        - localhost:9092
      topic: input-topic
      consumerGroup: dataflow-group
  sink:
    type: postgresql
    config:
      connectionString: "postgres://dataflow:dataflow@postgres:5432/dataflow?sslmode=disable"
      table: output_table
      autoCreateTable: true
  errors:
    type: kafka
    config:
      brokers:
        - localhost:9092
      topic: error-topic

Применение:

kubectl apply -f dataflow/config/samples/kafka-to-postgres-with-errors.yaml

Структура сообщений об ошибках и детали конфигурации — в разделе Обработка ошибок.

С роутером для множественных приемников

Пример маршрутизации сообщений в разные приемники на основе условий.

apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: router-example
spec:
  source:
    type: kafka
    config:
      brokers:
        - localhost:9092
      topic: events
      consumerGroup: dataflow-group
  # Основной приемник для сообщений, не соответствующих условиям
  sink:
    type: kafka
    config:
      brokers:
        - localhost:9092
      topic: default-events
  transformations:
    - type: router
      config:
        routes:
          # Ошибки → отдельный топик
          - condition: "$.level"
            sink:
              type: kafka
              config:
                brokers:
                  - localhost:9092
                topic: error-events
          # Предупреждения → PostgreSQL
          - condition: "$.priority"
            sink:
              type: postgresql
              config:
                connectionString: "postgres://dataflow:dataflow@postgres:5432/dataflow?sslmode=disable"
                table: warnings
                autoCreateTable: true

Входные сообщения:

{"level": "error", "message": "Critical error"}     // → error-events топик
{"priority": "high", "message": "Warning"}          // → warnings таблица
{"message": "Info"}                                 // → default-events топик

С фильтрацией и маскированием

Пример обработки пользовательских данных с фильтрацией активных пользователей, маскированием чувствительных данных и удалением внутренних полей.

apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: secure-pipeline
spec:
  source:
    type: postgresql
    config:
      connectionString: "postgres://dataflow:dataflow@postgres:5432/dataflow?sslmode=disable"
      table: users
      query: "SELECT * FROM users WHERE updated_at > NOW() - INTERVAL '1 hour'"
      pollInterval: 300
  sink:
    type: kafka
    config:
      brokers:
        - localhost:9092
      topic: public-users
  transformations:
    # Фильтровать только активных пользователей
    - type: filter
      config:
        condition: "$.active"

    # Маскировать чувствительные данные
    - type: mask
      config:
        fields:
          - password
          - email
        keepLength: true

    # Удалить внутренние поля
    - type: remove
      config:
        fields:
          - internal_id
          - secret_token
          - debug_info

    # Добавить временную метку экспорта
    - type: timestamp
      config:
        fieldName: exported_at

Входное сообщение:

{
  "id": 1,
  "username": "john",
  "email": "john@example.com",
  "password": "secret123",
  "active": true,
  "internal_id": 999,
  "secret_token": "abc123"
}

Выходное сообщение:

{
  "id": 1,
  "username": "john",
  "email": "***********",
  "password": "*********",
  "active": true,
  "exported_at": "2024-01-15T10:30:00Z"
}

PostgreSQL → Kafka с выбором полей

Пример чтения из PostgreSQL, выборки определенных полей и отправки в Kafka.

apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: postgres-to-kafka-select
spec:
  source:
    type: postgresql
    config:
      connectionString: "postgres://dataflow:dataflow@postgres:5432/dataflow?sslmode=disable"
      table: orders
      query: "SELECT * FROM orders WHERE created_at > NOW() - INTERVAL '1 day'"
      pollInterval: 60
  sink:
    type: kafka
    config:
      brokers:
        - localhost:9092
      topic: order-events
  transformations:
    - type: select
      config:
        fields:
          - order_id
          - customer_id
          - total
          - status
          - created_at
    - type: timestamp
      config:
        fieldName: processed_at

PostgreSQL → PostgreSQL (репликация / ETL)

Пример чтения данных из одной PostgreSQL базы и записи преобразованных данных в другую PostgreSQL базу.

apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: postgres-to-postgres
spec:
  source:
    type: postgresql
    config:
      connectionString: "postgres://dataflow:dataflow@source-postgres:5432/source_db?sslmode=disable"
      table: source_orders
      query: "SELECT * FROM source_orders WHERE updated_at > NOW() - INTERVAL '5 minutes'"
      pollInterval: 60
  sink:
    type: postgresql
    config:
      connectionString: "postgres://dataflow:dataflow@target-postgres:5432/target_db?sslmode=disable"
      table: target_orders
      autoCreateTable: true
      batchSize: 100
      upsertMode: true  # Включает обновление существующих записей вместо пропуска
  transformations:
    # Оставляем только нужные поля
    - type: select
      config:
        fields:
          - id
          - customer_id
          - total
          - status
          - updated_at
    # Добавляем время синхронизации
    - type: timestamp
      config:
        fieldName: synced_at

Варианты использования:

  • Онлайн-репликация: периодическое копирование обновленных записей из операционной БД в аналитическую
  • ETL-пайплайн: подготовка и очистка данных при переносе между схемами/кластерами PostgreSQL

Важно: При использовании upsertMode: true существующие записи в целевой таблице будут обновляться при конфликте по PRIMARY KEY (или указанному conflictKey). Без upsertMode обновленные записи из источника будут пропускаться, если они уже существуют в целевой таблице.

Комплексный пример: ETL пайплайн

Полноценный ETL пайплайн с множественными трансформациями.

apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: etl-pipeline
spec:
  source:
    type: kafka
    config:
      brokers:
        - kafka1:9092
        - kafka2:9092
      topic: raw-events
      consumerGroup: etl-group
  sink:
    type: postgresql
    config:
      connectionString: "postgres://dataflow:dataflow@postgres:5432/analytics?sslmode=disable"
      table: processed_events
      autoCreateTable: true
      batchSize: 100
  transformations:
    # 1. Развернуть вложенные массивы
    - type: flatten
      config:
        field: items

    # 2. Добавить временную метку обработки
    - type: timestamp
      config:
        fieldName: processed_at
        format: RFC3339

    # 3. Фильтровать только валидные события
    - type: filter
      config:
        condition: "$.valid"

    # 4. Маскировать PII данные
    - type: mask
      config:
        fields:
          - user.email
          - user.phone
        keepLength: true

    # 5. Удалить отладочную информацию
    - type: remove
      config:
        fields:
          - debug
          - internal_metadata
          - test_flag

    # 6. Выбрать только нужные поля для финального результата
    - type: select
      config:
        fields:
          - event_id
          - user.id
          - item.sku
          - item.quantity
          - processed_at

Kafka → Kafka с роутингом по типам

Пример чтения из одного Kafka топика и маршрутизации в разные топики на основе типа события.

apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: kafka-router
spec:
  source:
    type: kafka
    config:
      brokers:
        - localhost:9092
      topic: all-events
      consumerGroup: router-group
  sink:
    type: kafka
    config:
      brokers:
        - localhost:9092
      topic: default-events
  transformations:
    - type: router
      config:
        routes:
          - condition: "$.type"
            sink:
              type: kafka
              config:
                brokers:
                  - localhost:9092
                topic: user-events
          - condition: "$.category"
            sink:
              type: kafka
              config:
                brokers:
                  - localhost:9092
                topic: product-events

Использование Secrets для credentials

DataFlow Operator поддерживает конфигурацию коннекторов из Kubernetes Secrets через поля *SecretRef.

apiVersion: v1
kind: Secret
metadata:
  name: kafka-credentials
  namespace: default
type: Opaque
stringData:
  brokers: "kafka1:9092,kafka2:9092"
  topic: "input-topic"
  consumerGroup: "dataflow-group"
  username: "kafka-user"
  password: "kafka-password"
---
apiVersion: v1
kind: Secret
metadata:
  name: postgres-credentials
  namespace: default
type: Opaque
stringData:
  connectionString: "postgres://user:password@postgres:5432/dbname?sslmode=disable"
  table: "output_table"
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: secure-dataflow
spec:
  source:
    type: kafka
    config:
      brokersSecretRef:
        name: kafka-credentials
        key: brokers
      topicSecretRef:
        name: kafka-credentials
        key: topic
      consumerGroupSecretRef:
        name: kafka-credentials
        key: consumerGroup
      sasl:
        mechanism: scram-sha-256
        usernameSecretRef:
          name: kafka-credentials
          key: username
        passwordSecretRef:
          name: kafka-credentials
          key: password
  sink:
    type: postgresql
    config:
      connectionStringSecretRef:
        name: postgres-credentials
        key: connectionString
      tableSecretRef:
        name: postgres-credentials
        key: table
      autoCreateTable: true

Применение:

kubectl apply -f dataflow/config/samples/kafka-to-postgres-secrets.yaml

Поддерживаемые поля, TLS сертификаты и устранение неполадок — в разделе Использование Secrets в Kubernetes.

Мониторинг и отладка

Проверка статуса DataFlow

# Получить список всех DataFlow
kubectl get dataflow

# Детальная информация
kubectl describe dataflow <name>

# Статус в формате YAML
kubectl get dataflow <name> -o yaml

Просмотр логов

# Логи оператора
kubectl logs -l app.kubernetes.io/name=dataflow-operator -f

# События Kubernetes
kubectl get events --sort-by='.lastTimestamp' | grep dataflow

Проверка обработанных сообщений

Статус DataFlow содержит метрики:

status:
  phase: Running
  processedCount: 1500
  errorCount: 2
  lastProcessedTime: "2024-01-15T10:30:00Z"
  message: "Processing messages successfully"

Рекомендации

Производительность

  • Используйте batchSize для PostgreSQL приемников
  • Настройте правильный pollInterval для PostgreSQL источников
  • Используйте несколько инстансов оператора для масштабирования

Безопасность

  • Используйте Kubernetes Secrets для credentials
  • Включайте TLS для Kafka соединений
  • Маскируйте чувствительные данные перед отправкой

Надежность

  • Настройте правильные consumer groups для Kafka
  • Мониторьте статус DataFlow ресурсов

Высоконагруженный Kafka-пайплайн

При высокой скорости сообщений Kafka (десятки тысяч msg/s) увеличьте channelBufferSize и batchSize в sink:

spec:
  channelBufferSize: 500   # по умолчанию 100; снижает блокировки, когда sink медленнее source
  source:
    type: kafka
    config:
      brokers: [localhost:9092]
      topic: high-volume-topic
      consumerGroup: dataflow-group
  sink:
    type: postgresql
    config:
      connectionString: "..."
      table: events
      batchSize: 500
      batchFlushIntervalSeconds: 2

Настройка ресурсов и размещения подов

Каждый ресурс DataFlow создает отдельный под (Deployment) для обработки данных. Вы можете настроить ресурсы, выбор нод, affinity и tolerations для этих подов.

Пример: Кастомные ресурсы и выбор нод

apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: kafka-to-postgres-with-resources
spec:
  source:
    type: kafka
    config:
      brokers:
        - localhost:9092
      topic: input-topic
      consumerGroup: dataflow-group
  sink:
    type: postgresql
    config:
      connectionString: "postgres://dataflow:dataflow@postgres:5432/dataflow?sslmode=disable"
      table: output_table
  # Настройка ресурсов для пода процессора
  resources:
    requests:
      cpu: "200m"
      memory: "256Mi"
    limits:
      cpu: "1000m"
      memory: "1Gi"
  # Выбор нод для размещения пода
  nodeSelector:
    node-type: compute
    zone: us-east-1
  # Правила affinity для более точного контроля размещения
  affinity:
    nodeAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        nodeSelectorTerms:
        - matchExpressions:
          - key: kubernetes.io/arch
            operator: In
            values:
            - amd64
      preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 100
        preference:
          matchExpressions:
          - key: node-type
            operator: In
            values:
            - compute
    podAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 50
        podAffinityTerm:
          labelSelector:
            matchExpressions:
            - key: app
              operator: In
              values:
              - dataflow-processor
          topologyKey: kubernetes.io/hostname
  # Tolerations для работы с tainted нодами
  tolerations:
  - key: dedicated
    operator: Equal
    value: dataflow
    effect: NoSchedule
  - key: workload-type
    operator: Equal
    value: batch
    effect: NoSchedule

Применение:

kubectl apply -f dataflow/config/samples/kafka-to-postgres-with-resources.yaml

Настройка ресурсов

  • resources: Определяет запросы и лимиты CPU и памяти для пода процессора
  • Если не указано, используются значения по умолчанию: 100m CPU / 128Mi памяти (requests), 500m CPU / 512Mi памяти (limits)
  • Используйте это для обеспечения достаточных ресурсов для высоконагруженной обработки

Выбор нод

  • nodeSelector: Простые пары ключ-значение для выбора конкретных нод
  • Пример: node-type: compute гарантирует, что поды будут запускаться только на нодах с меткой node-type=compute

Правила Affinity

  • affinity: Продвинутые правила размещения с использованием Kubernetes affinity
  • nodeAffinity: Контроль того, на каких нодах могут запускаться поды
  • podAffinity: Предпочтение запуска подов рядом с другими подами (например, другими процессорами dataflow)
  • podAntiAffinity: Избегание запуска подов рядом с другими подами (например, распределение по нодам)

Tolerations

  • tolerations: Позволяют подам запускаться на tainted нодах
  • Полезно для выделенных compute нод или специализированного оборудования
  • Пример: Запуск процессоров dataflow на нодах, выделенных для batch workloads

Поведение по умолчанию

Если ресурсы, nodeSelector, affinity или tolerations не указаны: - Применяются ресурсы по умолчанию (100m CPU / 128Mi памяти requests, 500m CPU / 512Mi памяти limits) - Поды могут запускаться на любой ноде (нет nodeSelector) - Не применяются правила affinity - Поды не могут запускаться на tainted нодах (нет tolerations)

Проверка статуса подов

После создания DataFlow с кастомными ресурсами проверьте под:

# Список подов, созданных DataFlow
kubectl get pods -l app=dataflow-processor

# Описание конкретного пода
kubectl describe pod dataflow-<name>-<hash>

# Проверка использования ресурсов
kubectl top pod dataflow-<name>-<hash>

Дополнительные примеры

Больше примеров можно найти в директории dataflow/config/samples/:

  • kafka-to-postgres.yaml - базовый Kafka → PostgreSQL
  • kafka-to-postgres-with-resources.yaml - пример с настройкой ресурсов и размещения
  • flatten-example.yaml - пример с Flatten трансформацией
  • router-example.yaml - пример с Router трансформацией