Перейти к содержанию

Connectors

DataFlow Operator поддерживает различные коннекторы для источников и приемников данных. Каждый коннектор реализует стандартный интерфейс и может использоваться как источник (source) или приемник (sink) данных.

Обзор коннекторов

Коннектор Источник Приемник Особенности
Kafka Consumer groups, TLS, SASL, Avro, Schema Registry
PostgreSQL SQL запросы, батч-вставки, автосоздание таблиц, UPSERT режим
Trino SQL запросы, аутентификация Keycloak OAuth2, батч-вставки
ClickHouse Опрос таблиц, батч-вставки, автосоздание MergeTree таблиц
Nessie Iceberg через каталог Nessie, опрос, батч-дозапись, sink rawMode (data + _metadata); spec.replicas только 1
Iceberg Apache Iceberg через официальный REST Catalog API; опрос, батч-дозапись, Bearer/Basic/OAuth2, опционально S3; spec.replicas только 1

Kafka

Kafka коннектор поддерживает чтение и запись сообщений из/в топики Apache Kafka. Поддерживает consumer groups для масштабирования, TLS и SASL аутентификацию, а также работу с Avro форматом через Confluent Schema Registry или статическую схему.

Источник (Source)

Конфигурация Kafka источника:

source:
  type: kafka
  config:
    # Список брокеров Kafka (обязательно)
    brokers:
      - kafka1:9092
      - kafka2:9092
      - kafka3:9092

    # Топик для чтения (обязательно)
    topic: input-topic

    # Consumer group (опционально, по умолчанию: dataflow-operator)
    consumerGroup: my-group

    # Протокол безопасности Kafka (опционально)
    # Значения: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
    # Если не указано — протокол выводится из секций tls/sasl (обратная совместимость)
    securityProtocol: SASL_PLAINTEXT

    # TLS конфигурация (опционально)
    tls:
      # Пропустить проверку сертификата (не рекомендуется для production)
      insecureSkipVerify: false
      # Путь к CA сертификату
      caFile: /path/to/ca.crt
      # Путь к клиентскому сертификату
      certFile: /path/to/client.crt
      # Путь к приватному ключу
      keyFile: /path/to/client.key

    # SASL аутентификация (опционально)
    sasl:
      # Механизм: plain, scram-sha-256, scram-sha-512
      mechanism: scram-sha-256
      username: kafka-user
      password: kafka-password

    # Формат сообщений (опционально, по умолчанию: "json")
    # Поддерживаемые форматы: "json", "avro"
    format: json

    # Конфигурация Avro (требуется если format: "avro")
    # Вариант 1: Использование Confluent Schema Registry (рекомендуется)
    schemaRegistry:
      # URL Schema Registry (обязательно)
      url: https://schema-registry:8081
      # Basic Auth для Schema Registry (опционально)
      basicAuth:
        username: schema-user
        password: schema-password
      # TLS конфигурация для Schema Registry (опционально)
      tls:
        insecureSkipVerify: false
        caFile: /path/to/schema-registry-ca.crt

    # Вариант 2: Статическая Avro схема (альтернатива Schema Registry)
    # avroSchema: |
    #   {
    #     "type": "record",
    #     "name": "MyRecord",
    #     "fields": [
    #       {"name": "id", "type": "long"},
    #       {"name": "name", "type": "string"}
    #     ]
    #   }
    # Или путь к файлу со схемой:
    # avroSchemaFile: /path/to/schema.avsc

Особенности Kafka источника

  • Consumer Groups: Используйте разные consumer groups для масштабирования обработки
  • Начальная позиция: По умолчанию читает с самого старого сообщения (OffsetOldest)
  • Форматы данных: Поддерживает JSON (по умолчанию) и Avro форматы
  • Avro поддержка:
  • Confluent Schema Registry: Автоматическое получение схем по ID из сообщений (формат: magic byte + schema ID + data)
  • Статическая схема: Использование предопределенной схемы из конфигурации или файла
  • Кэширование схем: Схемы из Schema Registry кэшируются для повышения производительности
  • Метаданные: Каждое сообщение содержит метаданные:
  • topic - название топика
  • partition - номер партиции
  • offset - смещение сообщения
  • key - ключ сообщения (если есть)
  • securityProtocol: Явная настройка Kafka security.protocol. Поддерживаемые значения: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL (регистр и -/_ нормализуются). Если поле не задано, протокол определяется неявно: только saslSASL_PLAINTEXT, tls + saslSASL_SSL, только tlsSSL, без обоих → PLAINTEXT. При явном указании webhook проверяет согласованность с секциями tls/sasl (например, SASL_PLAINTEXT требует sasl и не допускает tls).

Пример с TLS и SASL

source:
  type: kafka
  config:
    brokers:
      - secure-kafka:9093
    topic: secure-topic
    consumerGroup: secure-group
    securityProtocol: SASL_SSL
    tls:
      caFile: /etc/kafka/ca.crt
      certFile: /etc/kafka/client.crt
      keyFile: /etc/kafka/client.key
    sasl:
      mechanism: scram-sha-512
      username: kafka-user
      password: ${KAFKA_PASSWORD}  # Используйте Secrets в Kubernetes

Пример с SASL_PLAINTEXT (без TLS)

source:
  type: kafka
  config:
    brokers:
      - kafka1:9092
      - kafka2:9092
    topic: input-topic
    consumerGroup: my-group
    securityProtocol: SASL_PLAINTEXT
    sasl:
      mechanism: scram-sha-256
      usernameSecretRef:
        name: kafka-credentials
        key: username
      passwordSecretRef:
        name: kafka-credentials
        key: password

Пример с Avro и Schema Registry

source:
  type: kafka
  config:
    brokers:
      - kafka:9092
    topic: avro-topic
    consumerGroup: avro-group
    format: avro
    schemaRegistry:
      url: https://schema-registry:8081
      basicAuth:
        username: schema-user
        password: schema-password
      tls:
        insecureSkipVerify: true  # Для self-signed сертификатов
        # caFile: /path/to/ca.crt  # Или укажите CA сертификат

Пример с Avro и статической схемой

source:
  type: kafka
  config:
    brokers:
      - kafka:9092
    topic: avro-topic
    consumerGroup: avro-group
    format: avro
    avroSchema: |
      {
        "type": "record",
        "name": "Stock",
        "namespace": "com.example",
        "fields": [
          {"name": "id", "type": "long"},
          {"name": "symbol", "type": "string"},
          {"name": "price", "type": "double"},
          {"name": "quantity", "type": "int"}
        ]
      }
    # Или используйте файл:
    # avroSchemaFile: /path/to/schema.avsc

Приемник (Sink)

Конфигурация Kafka приемника:

sink:
  type: kafka
  config:
    # Список брокеров Kafka (обязательно)
    brokers:
      - kafka1:9092

    # Топик для записи (обязательно)
    topic: output-topic

    # Протокол безопасности (опционально, см. источник)
    securityProtocol: SASL_PLAINTEXT

    # TLS конфигурация (опционально, аналогично источнику)
    tls:
      caFile: /path/to/ca.crt
      certFile: /path/to/client.crt
      keyFile: /path/to/client.key

    # SASL конфигурация (опционально, аналогично источнику)
    sasl:
      mechanism: scram-sha-256
      username: kafka-user
      password: kafka-password

Особенности Kafka приемника

  • Синхронная запись: Использует SyncProducer для гарантированной доставки
  • Ключи сообщений: Сохраняет ключи из метаданных сообщения
  • Метаданные: После записи обновляет метаданные с partition и offset

PostgreSQL

PostgreSQL коннектор поддерживает чтение из таблиц и запись в таблицы PostgreSQL. Поддерживает кастомные SQL запросы, периодический опрос, батч-вставки, CDC-стиль отслеживания изменений (INSERT и UPDATE), soft delete и SecretRef для учётных данных.

Для нативного CDC через logical replication (WAL / pgoutput) используйте отдельный тип источника postgresql-cdc.

Источник (Source)

Конфигурация PostgreSQL источника:

source:
  type: postgresql
  config:
    # Connection string (обязательно, или connectionStringSecretRef)
    # Формат: postgres://user:password@host:port/dbname?sslmode=mode
    connectionString: "postgres://user:password@localhost:5432/dbname?sslmode=disable"

    # Таблица для чтения (обязательно, если не указан query). Поддерживает schema.table (напр. public.products)
    table: source_table

    # Кастомный SQL запрос (опционально)
    # Если указан, используется вместо чтения всей таблицы
    query: "SELECT * FROM source_table WHERE updated_at > NOW() - INTERVAL '1 hour'"

    # Интервал опроса в секундах (опционально, по умолчанию: 5)
    pollInterval: 60

    # Опции CDC-стиля (опционально)
    readBatchSize: 1000           # Ограничение строк за один опрос (0 = без лимита)
    changeTrackingColumn: updated_at  # Колонка для отслеживания изменений (по умолчанию: updated_at). В query mode задайте явно для инкрементальной обёртки subquery
    orderByColumn: id             # Вторичный ключ сортировки для стабильной пагинации (по умолчанию: id). Пример: price_id
    autoCreateTable: true         # Создать таблицу, если не существует, перед чтением

    # SecretRef (опционально)
    # connectionStringSecretRef: { name: postgres-credentials, key: connectionString }
    # tableSecretRef: { name: postgres-credentials, key: table }

Особенности PostgreSQL источника

  • Периодический опрос: Регулярно опрашивает таблицу на наличие новых данных
  • Кастомные запросы: Поддержка сложных SQL запросов с JOIN, WHERE, и т.д.
  • Метаданные: Каждое сообщение содержит метаданные table и operation (insert/update)
  • Размер батча чтения: Ограничивает количество строк за один опрос для снижения нагрузки на БД
  • Отслеживание обновлений: По умолчанию отслеживает изменения по колонке updated_at (или changeTrackingColumn), захватывает INSERT и UPDATE
  • Стабильная сортировка: В table mode добавляется ORDER BY changeTrackingColumn, orderByColumn (по умолчанию вторичный ключ — id). При одновременном указании query и changeTrackingColumn SQL оборачивается в subquery с composite checkpoint (changeTrackingColumn, orderByColumn) > (lastTime, lastId) и ORDER BY. Без явного changeTrackingColumn query mode выполняет SQL как есть (legacy). Значение колонки orderByColumn попадает в metadata id.
  • Требования query mode: При инкрементальном query mode в SELECT должны быть колонки changeTrackingColumn и orderByColumn, иначе checkpoint не продвигается.
  • Автосоздание таблицы: При autoCreateTable: true создаёт таблицу с CDC-совместимой схемой (id SERIAL PRIMARY KEY, created_at, updated_at), если не существует. Создание выполняется при подключении (Connect).
  • Схема таблицы: Имя таблицы поддерживает формат schema.table (напр. public.products)
  • Персистенция checkpoint: По умолчанию позиция чтения (lastReadChangeTime, lastReadOrderByValue) сохраняется в ConfigMap и при перезапуске чтение продолжается с последней позиции. Задайте checkpointPersistence: false в spec, чтобы хранить только в памяти. Для pg→pg включите upsertMode: true в sink, чтобы дубликаты обновлялись, а не вставлялись повторно.

Разовая миграция (batch exhaust)

Для однократного копирования снимка (например, job DataFlowCron, который должен завершиться после чтения всех строк):

  1. Предпочтительно MATERIALIZED VIEW на source DB для тяжёлого бизнес-SQL: CREATE MATERIALIZED VIEW + REFRESH, затем table указывает на MVIEW.
  2. changeTrackingColumn и orderByColumn — одна и та же колонка-ключ без timestamp (например material_id), если в результате нет updated_at.
  3. readBatchSize (например 10000) для батчевого чтения.
  4. В sink: upsertMode: true, явный conflictKey, совпадающие типы колонок (например BIGINT для числовых ID, не TEXT).
  5. Processor останавливается, когда следующий poll возвращает 0 строк (source exhausted). Checkpoint продвигается через Ack после успешной записи в sink.

Пример source:

source:
  type: postgresql
  config:
    table: price_calendar.mv_one_p_prices_migration
    changeTrackingColumn: material_id
    orderByColumn: material_id
    readBatchSize: 10000
    pollInterval: 5

Тот же паттерн применим к инкрементальным source Trino и ClickHouse. Kafka и Nessie используют другую модель offset/snapshot.

Пример с кастомным запросом

source:
  type: postgresql
  config:
    connectionString: "postgres://user:password@localhost:5432/analytics"
    query: |
      SELECT
        u.id,
        u.email,
        o.order_id,
        o.total
      FROM users u
      JOIN orders o ON u.id = o.user_id
      WHERE o.created_at > NOW() - INTERVAL '1 day'
    pollInterval: 300  # Опрос каждые 5 минут

Приемник (Sink)

Конфигурация PostgreSQL приемника:

sink:
  type: postgresql
  config:
    connectionString: "postgres://user:password@localhost:5432/dbname?sslmode=disable"
    # Таблица для записи. Поддерживает schema.table (напр. public.products_clone)
    table: target_table

    # Размер батча (опционально, по умолчанию: 1). 0 — сброс только по таймеру
    batchSize: 100
    # Интервал сброса в секундах (опционально, по умолчанию: 10). 0 — отключить сброс по таймеру
    batchFlushIntervalSeconds: 10
    autoCreateTable: true

    # Режим rawMode (опционально, по умолчанию: false)
    # При true ожидает сообщения в формате {"value": <данные>, "_metadata": {...}} или plain + msg.Metadata
    # Таблица создаётся с колонками data JSONB и _metadata JSONB
    rawMode: false

    # Режим UPSERT (опционально, по умолчанию: false)
    upsertMode: true
    conflictKey: "id"
    upsertStrategy: ifNewer       # always (по умолчанию) | ifNewer
    upsertVersionColumn: updated_at  # обязательно при upsertStrategy: ifNewer
    # Колонка для soft delete (опционально)
    softDeleteColumn: "deleted_at"

    # SecretRef (опционально)
    # connectionStringSecretRef: { name: postgres-credentials, key: connectionString }
    # tableSecretRef: { name: postgres-credentials, key: table }

Особенности PostgreSQL приемника

  • Батч-вставки: Группирует сообщения для эффективной записи. Сброс при достижении batchSize или по таймеру. Только по размеру: batchFlushIntervalSeconds: 0. Только по времени: batchSize: 0
  • Автосоздание таблиц (при autoCreateTable: true):
  • rawMode: true — таблица создаётся при подключении (Connect). Схема: id SERIAL PRIMARY KEY, data JSONB, _metadata JSONB, created_at, updated_at, deleted_at, GIN индекс по data
  • rawMode: false — таблица создаётся при первой записи из структуры первого сообщения (реплицирует схему источника). Типы колонок выводятся автоматически (TEXT, BIGINT, NUMERIC, JSONB и т.д.)
  • Режим rawMode: При true тело сообщения пишется в колонку data, metadata — в _metadata (или в отдельные колонки с flattenMetadataColumns). При false использует колоночный формат из сообщения
  • flattenMetadataColumns: при rawMode: true можно развернуть metadata в отдельные колонки (см. flattenMetadataColumns)
  • UPSERT режим: Обновляет существующие записи при конфликте по PRIMARY KEY или conflictKey. Batch flush выполняется в явной транзакции PostgreSQL.
  • Стратегия upsert: always (по умолчанию) — обновление при каждом конфликте; ifNewer — только если входящее значение upsertVersionColumn больше сохранённого (защита от устаревших replay).
  • Soft delete: При заданном softDeleteColumn и metadata.operation=delete выполняет UPDATE ... SET deleted_at = NOW() вместо физического DELETE

Пример с автосозданием таблицы (rawMode)

sink:
  type: postgresql
  config:
    connectionString: "postgres://user:password@localhost:5432/analytics"
    table: events
    autoCreateTable: true
    rawMode: true  # Таблица с data и _metadata
    batchSize: 50

При rawMode: true и autoCreateTable: true таблица создаётся со структурой:

CREATE TABLE events (
    id SERIAL PRIMARY KEY,
    data JSONB NOT NULL,
    _metadata JSONB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    deleted_at TIMESTAMP
);
CREATE INDEX idx_events_value ON events USING GIN (value);

При rawMode: false структура таблицы выводится из первого сообщения (колоночный формат).

Пример с UPSERT для обновления существующих записей

sink:
  type: postgresql
  config:
    connectionString: "postgres://user:password@localhost:5432/analytics"
    table: orders
    upsertMode: true  # Включает обновление существующих записей
    conflictKey: "id"  # Использует поле 'id' для определения конфликта
    batchSize: 100

При включенном upsertMode: - Если запись с таким же id (или указанным conflictKey) уже существует, она будет обновлена - Если записи нет, она будет вставлена - Это особенно полезно для синхронизации данных, когда источник периодически отправляет обновленные записи

Важно: Для работы UPSERT таблица должна иметь PRIMARY KEY или UNIQUE constraint на указанном conflictKey.

PostgreSQL CDC (logical replication)

Нативный Change Data Capture через logical replication PostgreSQL (pgoutput). Читает INSERT, UPDATE и DELETE из WAL через replication slot и publication. Пример: config/samples/postgresql-cdc-to-postgres.yaml.

Требования

PostgreSQL должен иметь включённую logical replication:

-- postgresql.conf
wal_level = logical
max_replication_slots >= 1
max_wal_senders >= 1

-- pg_hba.conf (пример)
host replication repl_user 0.0.0.0/0 scram-sha-256

-- privileges
GRANT REPLICATION ON DATABASE db TO repl_user;

Пользователю нужны права REPLICATION и создание publication/slot (или создайте их заранее). Для таблиц без PK: ALTER TABLE ... REPLICA IDENTITY FULL.

Источник (Source)

source:
  type: postgresql-cdc
  config:
    connectionString: "postgres://repl_user:pass@pg:5432/db?sslmode=disable"
    slotName: dataflow_orders_slot
    publicationName: dataflow_orders_pub
    tables:
      - public.orders
      - public.customers
    snapshotMode: initial          # initial | never | always (по умолчанию: initial)
    createSlotIfNotExists: true
    createPublicationIfNotExists: true
    heartbeatIntervalSeconds: 10   # standby status при простое (0 = выключить)
    primaryKeyColumn: id           # metadata.id (по умолчанию: id)
    includeColumns: []             # опциональный allow-list колонок
    excludeColumns: []
    envelopeFormat: row            # row (по умолчанию) | debezium

Особенности

  • Streaming WAL — continuous logical replication (не polling)
  • Initial snapshotsnapshotMode: initial копирует существующие строки перед streaming
  • Checkpoint — LSN в ConfigMap (postgresql-cdc), продвигается только после Ack sink
  • Metadata: operation (insert/update/delete), table, lsn, id (при наличии PK)
  • Multi-table — одна publication, фильтр через список tables
  • Schema evolution — relation cache обновляется при каждом RelationMessageV2 (ALTER ADD/DROP COLUMN)
  • Debezium envelopeenvelopeFormat: debezium для payload.before/after/op/source (совместимо с debeziumUnwrap)
  • replicas: только 1

Идемпотентный sink

Используйте ackGranularity: message и идемпотентный sink (upsertMode + conflictKey), чтобы минимизировать дубликаты при restart.

Отличия от Debezium

Возможность Debezium postgresql-cdc
Initial snapshot op: r То же при envelopeFormat: debezium
UPDATE before/after В envelope Оба поля, когда WAL содержит old tuple
Schema history topic Да Нет — inline refresh relation cache
DDL events Отдельные сообщения Не эмитятся (только refresh схемы)
Kafka Connect Нужен Не нужен

ClickHouse

ClickHouse коннектор поддерживает чтение из таблиц и запись в таблицы ClickHouse. Поддерживает периодический опрос для инкрементального чтения, кастомные SQL запросы, батч-вставки и автосоздание MergeTree таблиц.

Источник (Source)

source:
  type: clickhouse
  config:
    # Connection string (обязательно)
    # Native: clickhouse://host:9000?username=default&password=xxx&database=default
    # HTTP: http://host:8123/default?username=default&password=xxx
    connectionString: "clickhouse://default@clickhouse:9000/default?dial_timeout=10s"

    # Таблица для чтения (обязательно, если не указан query)
    table: source_table

    # Кастомный SQL запрос (опционально)
    query: "SELECT * FROM source_table WHERE id > 100"

    # Интервал опроса в секундах (опционально, по умолчанию: 5)
    pollInterval: 60

    # Колонка для инкрементальной пагинации и стабильного ORDER BY (опционально, по умолчанию: id)
    orderByColumn: price_id

    # Колонка для отслеживания изменений (опционально, по умолчанию: created_at)
    changeTrackingColumn: created_at

    # Лимит строк за один запрос в цикле опроса (опционально, 0 = без лимита)
    readBatchSize: 1000

Особенности ClickHouse источника

  • Опрос: Периодически опрашивает таблицу на наличие новых данных (интервал задаётся pollInterval)
  • Батчинг чтения: readBatchSize ограничивает число строк за запрос в цикле опроса (как у PostgreSQL source)
  • Инкрементальное чтение: Table mode использует composite checkpoint (changeTrackingColumn, orderByColumn) > (lastTime, lastKey) с tuple WHERE; legacy checkpoint только с lastReadOrderByValueWHERE orderByColumn > N до появления timestamp
  • Кастомные запросы: При указании query и changeTrackingColumn SQL оборачивается в subquery с composite filter; без явного changeTrackingColumn query mode выполняет SQL как есть (legacy)
  • Стабильная сортировка: Table mode — ORDER BY changeTrackingColumn, orderByColumn (по умолчанию created_at, id)
  • Персистенция checkpoint: Позиция (lastReadChangeTime, lastReadOrderByValue) сохраняется в ConfigMap; legacy lastReadID/lastReadTime мигрируются при загрузке
  • Метаданные: table и id (значение колонки orderByColumn, если есть)

Приемник (Sink)

sink:
  type: clickhouse
  config:
    connectionString: "clickhouse://default@clickhouse:9000/default?dial_timeout=10s"
    table: output_table

    # Размер батча для вставок (опционально, по умолчанию: 100). 0 — сброс только по таймеру
    batchSize: 100

    # Интервал сброса батча в секундах (опционально, по умолчанию: 10). 0 — отключить сброс по таймеру
    batchFlushIntervalSeconds: 10

    # Автосоздание таблицы (опционально)
    autoCreateTable: true

    # rawMode: false (по умолчанию) — создаёт таблицу по структуре первого сообщения (колоночный формат, репликация схемы источника)
    # rawMode: true — создаёт MergeTree таблицу с колонками data String, created_at (JSON-хранилище)
    rawMode: false

    # Идемпотентная запись (опционально)
    upsertMode: true
    conflictKey: id
    replacingVersionColumn: updated_at
    tableEngine: ReplacingMergeTree

Особенности ClickHouse приемника

  • Батч-вставки: Группирует сообщения; по умолчанию сброс при достижении batchSize или по таймеру (10 с). Только по размеру: batchFlushIntervalSeconds: 0. Только по времени: batchSize: 0
  • Upsert mode: При upsertMode: true автосозданные таблицы используют ReplacingMergeTree; дедупликация при background merge. См. Отказоустойчивость.
  • Retry при транзиентных ошибках: Batch-запись автоматически повторяется при connection refused, TOO_MANY_PARTS, memory limit, HTTP 502/503 и подобных ошибках (до 5 попыток с экспоненциальным backoff)
  • Автосоздание таблиц (при autoCreateTable: true):
  • rawMode: true — таблица создаётся при подключении (Connect). Схема: data String, created_at DateTime DEFAULT now(), движок MergeTree, ORDER BY created_at. Сообщения сохраняются как JSON-строка в колонке data. С flattenMetadataColumns: truedata + отдельные колонки metadata (см. ниже)
  • rawMode: false — таблица создаётся при первой записи из структуры первого сообщения. Типы колонок выводятся автоматически (String, Int32/Int64, Float64, Decimal, DateTime и т.д.). Поддерживается формат {"value": {...}, "_metadata": {...}} — используются поля из value
  • Режим rawMode: При true ожидает/хранит JSON в колонке data. При false использует колоночный формат из сообщения (INSERT с именованными колонками)

Отказоустойчивость ClickHouse

  • Error sink: Используйте spec.errors с ClickHouse (или Kafka) для сохранения неудачных сообщений. См. Обработка ошибок.
  • Connection string: Рекомендуемые параметры: dial_timeout=10s, max_execution_time=60 (например, clickhouse://host:9000/default?dial_timeout=10s&max_execution_time=60).
  • Настройка batch: Для пропускной способности и устойчивости используйте batchSize 100–1000 и batchFlushIntervalSeconds 5–10. Большие батчи снижают частоту вставок и помогают избежать TOO_MANY_PARTS.
  • TOO_MANY_PARTS: При этой ошибке увеличьте batchSize, уменьшите частоту flush или настройте merge в ClickHouse (background_pool_size, parts_to_throw_insert). Рассмотрите ReplacingMergeTree для дедупликации. См. Отказоустойчивость.

Пример: Kafka → ClickHouse

apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: kafka-to-clickhouse
spec:
  source:
    type: kafka
    config:
      brokers:
        - kafka:9092
      topic: input-topic
      consumerGroup: dataflow-group
  sink:
    type: clickhouse
    config:
      connectionString: "clickhouse://default@clickhouse:9000/default"
      table: output_table
      batchSize: 100
      autoCreateTable: true

Пример: ClickHouse → ClickHouse (репликация схемы)

При autoCreateTable: true и rawMode: false целевая таблица создаётся автоматически по структуре источника:

apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: clickhouse-to-clickhouse
spec:
  source:
    type: clickhouse
    config:
      connectionString: "clickhouse://dataflow:dataflow@localhost:9000/dataflow?dial_timeout=10s"
      table: products
      pollInterval: 5
  sink:
    type: clickhouse
    config:
      connectionString: "clickhouse://dataflow:dataflow@localhost:9000/dataflow?dial_timeout=10s"
      table: products_clone
      batchSize: 100
      autoCreateTable: true

Trino

Trino коннектор поддерживает чтение из таблиц и запись в таблицы Trino (ранее PrestoSQL). Поддерживает SQL запросы, аутентификацию через Keycloak OAuth2/OIDC и батч-вставки.

Источник (Source)

Конфигурация Trino источника:

source:
  type: trino
  config:
    # URL сервера Trino (обязательно)
    serverURL: "http://trino:8080"

    # Каталог для использования (обязательно)
    catalog: hive

    # Схема для использования (обязательно)
    schema: default

    # Таблица для чтения (обязательно, если не указан query)
    table: source_table

    # Кастомный SQL запрос (опционально)
    # Если указан, используется вместо чтения всей таблицы
    query: "SELECT * FROM hive.default.source_table WHERE id > 100"

    # Интервал опроса в секундах (опционально, по умолчанию: 5)
    # Используется для периодического чтения новых данных
    pollInterval: 60

    # Колонка для инкрементальной пагинации и стабильного ORDER BY (опционально, по умолчанию: id)
    orderByColumn: price_id

    # Лимит строк за один запрос в цикле опроса (опционально, 0 = без лимита)
    readBatchSize: 1000

    # Аутентификация Keycloak (опционально)
    keycloak:
      # Вариант 1: Использование долгоживущего токена напрямую (рекомендуется для долгоживущих токенов)
      token: "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."

      # Вариант 2: Использование OAuth2 flow (альтернатива прямому токену)
      # serverURL: "https://keycloak.example.com"
      # realm: myrealm
      # clientID: trino-client
      # clientSecret: client-secret
      # username: trino-user
      # password: trino-password

Особенности Trino источника

  • SQL запросы: При указании query и changeTrackingColumn SQL оборачивается с composite checkpoint filter; иначе — только ORDER BY orderByColumn (legacy)
  • Батчинг чтения: readBatchSize ограничивает число строк за запрос в цикле опроса (как у PostgreSQL source)
  • Периодический опрос: Table mode использует composite checkpoint (changeTrackingColumn, orderByColumn) > (lastTime, lastKey); legacy lastReadIDWHERE orderByColumn > N до появления timestamp
  • Персистенция checkpoint: lastReadChangeTime и lastReadOrderByValue в ConfigMap; legacy lastReadID мигрируется при загрузке
  • Метаданные: Значение колонки orderByColumn попадает в metadata id
  • Аутентификация Keycloak: OAuth2/OIDC аутентификация через Keycloak
  • Прямой токен: Использование долгоживущего токена, полученного из Keycloak (рекомендуется для долгоживущих токенов)
  • Password Grant: Использование username/password для аутентификации
  • Client Credentials: Использование client ID/secret для сервис-к-сервис аутентификации
  • Автоматическое обновление токенов: Токены автоматически обновляются до истечения срока действия (только для OAuth2 flow, не для прямых токенов)

Получение токена из Keycloak

Для использования долгоживущего токена вы можете получить его из Keycloak следующими способами:

Способ 1: Использование curl (Password Grant)

curl -X POST "https://keycloak.example.com/realms/myrealm/protocol/openid-connect/token" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "grant_type=password" \
  -d "client_id=trino-client" \
  -d "client_secret=client-secret" \
  -d "username=trino-user" \
  -d "password=trino-password"

Способ 2: Использование curl (Client Credentials)

curl -X POST "https://keycloak.example.com/realms/myrealm/protocol/openid-connect/token" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "grant_type=client_credentials" \
  -d "client_id=trino-client" \
  -d "client_secret=client-secret"

Ответ будет содержать поле access_token. Используйте это значение токена в поле token конфигурации Keycloak.

Примечание: Для долгоживущих токенов настройте время жизни токена в настройках realm или клиента Keycloak. - Метаданные: Каждое сообщение содержит метаданные: - catalog - название каталога - schema - название схемы - table - название таблицы

Пример с кастомным запросом

source:
  type: trino
  config:
    serverURL: "http://trino:8080"
    catalog: hive
    schema: analytics
    query: |
      SELECT
        u.id,
        u.email,
        o.order_id,
        o.total
      FROM hive.analytics.users u
      JOIN hive.analytics.orders o ON u.id = o.user_id
      WHERE o.created_at > CURRENT_TIMESTAMP - INTERVAL '1' DAY
    pollInterval: 300  # Опрос каждые 5 минут
    keycloak:
      serverURL: "https://keycloak.example.com"
      realm: myrealm
      clientID: trino-client
      clientSecret: client-secret
      username: trino-user
      password: trino-password

Приемник (Sink)

Конфигурация Trino приемника:

sink:
  type: trino
  config:
    # URL сервера Trino (обязательно)
    serverURL: "http://trino:8080"

    # Каталог для использования (обязательно)
    catalog: hive

    # Схема для использования (обязательно)
    schema: default

    # Таблица для записи (обязательно)
    table: target_table

    # Размер батча для вставки (опционально, по умолчанию: 1). 0 — сброс только по таймеру
    batchSize: 100

    # Интервал сброса батча в секундах (опционально, по умолчанию: 10). 0 — отключить сброс по таймеру
    batchFlushIntervalSeconds: 10

    # Автоматическое создание таблицы (опционально, по умолчанию: false)
    # Если true, создает таблицу с VARCHAR колонкой для JSON данных
    autoCreateTable: true

    # Режим rawMode (опционально, по умолчанию: false)
    # При true создает таблицу с колонкой data VARCHAR; сообщения сохраняются как JSON-строка
    # При false используется колоночный формат из сообщения
    rawMode: true

    # MERGE upsert для Iceberg-каталогов (имя catalog должно содержать "iceberg")
    upsertMode: true
    conflictKey: id
    queryTimeoutSeconds: 600

    # Аутентификация Keycloak (опционально)
    keycloak:
      serverURL: "https://keycloak.example.com"
      realm: myrealm
      clientID: trino-client
      clientSecret: client-secret
      username: trino-user
      password: trino-password

Особенности Trino приемника

  • Батч-вставки: Группирует сообщения; по умолчанию сброс при достижении batchSize или по таймеру (10 с). Только по размеру: batchFlushIntervalSeconds: 0. Только по времени: batchSize: 0
  • Upsert mode: upsertMode: true использует MERGE INTO для Iceberg-таблиц (conflictKey обязателен). См. Отказоустойчивость.
  • Автосоздание таблиц: Автоматически создает таблицы, если они не существуют
  • Режим rawMode: При rawMode: true создает таблицу с колонкой data VARCHAR; сообщения сохраняются как JSON-строка. При false (по умолчанию) — колоночный формат из сообщения
  • Аутентификация Keycloak: OAuth2/OIDC аутентификация через Keycloak
  • Автоматическое обновление токенов: Токены автоматически обновляются

Пример: Kafka → Trino с Keycloak

apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: kafka-to-trino
spec:
  source:
    type: kafka
    config:
      brokers:
        - kafka:9092
      topic: input-topic
      consumerGroup: dataflow-group
  sink:
    type: trino
    config:
      serverURL: "http://trino:8080"
      catalog: hive
      schema: default
      table: output_table
      batchSize: 100
      keycloak:
        # Использование долгоживущего токена, полученного из Keycloak
        token: "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."

        # Альтернатива: Использование OAuth2 flow
        # serverURL: "https://keycloak.example.com"
        # realm: myrealm
        # clientID: trino-client
        # clientSecret: client-secret
        # username: trino-user
        # password: trino-password

Пример: Trino с secrets

apiVersion: v1
kind: Secret
metadata:
  name: trino-credentials
  namespace: default
type: Opaque
stringData:
  serverURL: "http://trino:8080"
  catalog: "hive"
  schema: "default"
  table: "output_table"
---
apiVersion: v1
kind: Secret
metadata:
  name: keycloak-credentials
  namespace: default
type: Opaque
stringData:
  serverURL: "https://keycloak.example.com"
  realm: "myrealm"
  clientID: "trino-client"
  clientSecret: "client-secret"
  username: "trino-user"
  password: "trino-password"
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: kafka-to-trino-secrets
spec:
  source:
    type: kafka
    config:
      brokers:
        - kafka:9092
      topic: input-topic
      consumerGroup: dataflow-group
  sink:
    type: trino
    config:
      serverURLSecretRef:
        name: trino-credentials
        key: serverURL
      catalogSecretRef:
        name: trino-credentials
        key: catalog
      schemaSecretRef:
        name: trino-credentials
        key: schema
      tableSecretRef:
        name: trino-credentials
        key: table
      batchSize: 100
      keycloak:
        # Вариант 1: Использование токена из secret (рекомендуется для долгоживущих токенов)
        tokenSecretRef:
          name: keycloak-credentials
          key: token

        # Вариант 2: Использование OAuth2 flow из secrets (альтернатива)
        # serverURLSecretRef:
        #   name: keycloak-credentials
        #   key: serverURL
        # realmSecretRef:
        #   name: keycloak-credentials
        #   key: realm
        # clientIDSecretRef:
        #   name: keycloak-credentials
        #   key: clientID
        # clientSecretSecretRef:
        #   name: keycloak-credentials
        #   key: clientSecret
        # usernameSecretRef:
        #   name: keycloak-credentials
        #   key: username
        # passwordSecretRef:
        #   name: keycloak-credentials
        #   key: password

Nessie

Коннектор Nessie читает из таблиц Apache Iceberg и записывает в них через каталог Nessie (Iceberg REST API). Все операции выполняются в контексте ветки Nessie; метаданные и данные управляются каталогом.

Nessie Core API и Iceberg REST

  • Nessie Core API (/api/v1, /api/v2) отвечает за метаданные репозитория: ссылки, коммиты и content-объекты.
  • Iceberg REST каталог ({baseURL}/iceberg[/{branch}][|{warehouse}]) используется DataFlow для чтения и записи табличных данных.
  • Физические файлы таблиц хранятся в настроенном warehouse/object storage (например, S3-совместимом), а Nessie хранит и версионирует каталог и указатели на данные.

Для административных операций по Core API см. OpenAPI-файл в корне репозитория: nessie-openapi-0.107.5.yaml.

Источник (Source)

source:
  type: nessie
  config:
    # Базовый URL сервера Nessie (обязательно), например https://nessie:19120
    baseURL: "http://nessie:19120"

    # Ветка Nessie для чтения (опционально, по умолчанию: main)
    branch: main

    # Имя warehouse для хранилища (опционально)
    warehouse: ""

    # Namespace (схема) таблицы Iceberg (обязательно)
    namespace: my_schema

    # Имя таблицы (обязательно)
    table: my_table

    # Интервал опроса в секундах (опционально, по умолчанию: 10)
    pollInterval: 10

    # Инкрементальное чтение по Iceberg snapshot (опционально, по умолчанию: false)
    # incrementalBySnapshot: true
    # startSnapshotID: "1234567890123456789"  # при первом запуске без checkpoint
    # snapshotCheckpoints: true              # по умолчанию

    # Аутентификация (опционально)
    # authenticationType: AUTO (по умолчанию) | BEARER | BASIC | NONE
    authenticationType: BEARER
    bearerToken: "your-token"
    # Или из Secret:
    # tokenSecretRef:
    #   name: nessie-credentials
    #   key: bearerToken
    # Или Basic auth:
    # basicAuth:
    #   username: user
    #   password: pass
    # Или Basic auth из Secret:
    # basicAuth:
    #   usernameSecretRef:
    #     name: nessie-credentials
    #     key: username
    #   passwordSecretRef:
    #     name: nessie-credentials
    #     key: password

Особенности Nessie источника

  • Контекст ветки: Чтение из указанной ветки Nessie; метаданные таблицы берутся из каталога.
  • Опрос: Периодическое сканирование таблицы Iceberg для новых данных.
  • Полный scan (по умолчанию): На каждом poll выполняется полное сканирование текущего snapshot таблицы.
  • Инкрементальный режим (incrementalBySnapshot: true): Читаются только snapshot новее последнего подтверждённого Ack; позиция сохраняется в checkpoint ConfigMap (см. отказоустойчивость). Поле query в этом режиме не поддерживается.
  • Аутентификация: Bearer токен (OAuth2) или Basic auth для Nessie/Iceberg REST.
  • Без rawMode у источника: Каждая строка Iceberg отдаётся как JSON-объект с именами колонок (например data и _metadata, если таблица создана sink с rawMode: true). Инкрементальный режим уменьшает дубликаты при рестарте, но не включает несколько подов процессора — см. Горизонтальное масштабирование ниже.

Приемник (Sink)

sink:
  type: nessie
  config:
    baseURL: "http://nessie:19120"
    branch: main
    warehouse: ""
    namespace: my_schema
    table: my_table

    # Размер батча для дозаписи (опционально, по умолчанию: 100). 0 — сброс только по таймеру
    batchSize: 100

    # Интервал сброса батча в секундах (опционально, по умолчанию: 10). 0 — отключить сброс по таймеру
    batchFlushIntervalSeconds: 10

    # Создать таблицу, если не существует (опционально)
    autoCreateTable: true

    # Режим rawMode (опционально, по умолчанию: false)
    # При true создаёт таблицу с колонками data и _metadata (string); plain-сообщения оборачиваются с msg.Metadata
    rawMode: false

    authenticationType: BEARER
    bearerToken: "your-token"
    # Или basicAuth: { username, password }

    # Опционально: объектное хранилище warehouse (S3-совместимое). Не путать с аутентификацией Nessie REST выше.
    # Задавайте accessKeySecretRef и secretAccessKeySecretRef вместе.
    # Если namespace не указан, считается namespace DataFlow и используется secretKeyRef в Pod (предпочтительно).
    # Если указан другой namespace, оператор читает Secret при reconcile и подставляет литералы в env Deployment (аналогично другим коннекторам с секретами в ConfigMap по модели раскрытия в etcd API).
    # s3Endpoint: "https://storage.yandexcloud.net"
    # s3Region: "ru-central1"
    # accessKeySecretRef:
    #   name: iceberg-s3-creds
    #   key: AWS_ACCESS_KEY_ID
    # secretAccessKeySecretRef:
    #   name: iceberg-s3-creds
    #   key: AWS_SECRET_ACCESS_KEY

Особенности Nessie приемника

  • Контекст ветки: Запись выполняется в указанную ветку Nessie через каталог.
  • Батч-дозапись: Группировка сообщений и дозапись батчами в Iceberg; по умолчанию сброс при достижении batchSize или по таймеру (10 с). Только по размеру: batchFlushIntervalSeconds: 0. Только по времени: batchSize: 0
  • Автосоздание таблицы: При autoCreateTable: true создаёт таблицу Iceberg при отсутствии. По умолчанию — одна колонка data (string). При rawMode: true — колонки data и _metadata (string) — см. rawMode и колонка _metadata.
  • Аутентификация: Аналогично источнику (Bearer или Basic).
  • Объектное хранилище warehouse: Опциональные статические ключи (accessKeySecretRef + secretAccessKeySecretRef) задают переменные iceberg-go / AWS SDK; при необходимости укажите s3Endpoint и s3Region (например Yandex Object Storage). В том же namespace, что DataFlow, используются secretKeyRef; для другого namespace оператор подставляет значения в env Deployment. Изменение целевого Secret приводит к reconcile через watch.

Горизонтальное масштабирование (spec.replicas)

Nessie — polling-источник. Число подов процессора задаётся полем spec.replicas (по умолчанию 1).

spec.source.type spec.replicas Поведение
kafka допускается > 1 Consumer group распределяет партиции топика между подами. Sink Nessie в таком пайплайне может работать на нескольких подах (каждый пишет батчи независимо).
nessie (или другой polling-источник) только 1 или не задан Admission webhook отклоняет replicas > 1. Несколько подов с общим checkpoint ConfigMap дублируют чтение/запись.
любой (DataFlowCron) только 1 или не задан Один Job процессора на тик расписания; replicas > 1 всегда запрещён.

incrementalBySnapshot у Nessie-источника улучшает поведение при рестарте (checkpoint по Iceberg snapshot), но не заменяет горизонтальное масштабирование как у Kafka.

Для большей пропускной способности при записи в Nessie настраивайте batchSize, batchFlushIntervalSeconds и channelBufferSize, а не увеличивайте replicas. Подробнее: отказоустойчивость — горизонтальное масштабирование, DataFlow spec — replicas.

rawMode и колонка _metadata (только sink)

rawMode задаётся в sink (sink.config.rawMode). У Nessie-источника этого параметра нет.

Схема Iceberg при autoCreateTable: true и rawMode: true:

Колонка Тип Iceberg Содержимое
data string JSON полезной нагрузки (тело сообщения)
_metadata string JSON с полями происхождения (offset, partition, topic Kafka и т.д.)

В отличие от PostgreSQL sink (data / _metadata как JSONB), Nessie хранит оба поля как string с JSON-текстом внутри.

Как сообщение попадает в таблицу при записи

  1. Обычное сообщениеmsg.Data — тело; msg.Metadata (например от Kafka) сериализуется в _metadata:
{"id": 1, "event": "login"}

data = JSON тела, _metadata = {"offset":100,"partition":0,"topic":"events",...}

  1. Уже обёрнутое сообщение — в теле есть ключи value и _metadata (типично при цепочке из другого rawMode sink):
{"value": {"id": 1}, "_metadata": {"offset": 10, "topic": "t1"}}

→ внутренний value → колонка data, внутренний _metadata → колонка _metadata (как у Trino/PostgreSQL rawMode).

Существующие таблицы

  • При rawMode: true на Connect проверяется наличие колонок data и _metadata (без учёта регистра). Иначе процессор завершится с ошибкой.
  • Таблица без _metadata несовместима с rawMode: true, пока вы не пересоздадите её или не добавите колонку вручную.

Чтение обратно (Nessie source)

Источник не разворачивает rawMode: одна строка Iceberg → один JSON с именами колонок. Строка из rawMode-таблицы выглядит так:

{"data": "{\"id\":1}", "_metadata": "{\"offset\":100,\"topic\":\"events\"}"}

При необходимости распарсите data / _metadata на стороне downstream sink.

flattenMetadataColumns — плоские колонки метаданных (sink)

Поддерживается в PostgreSQL, Trino, ClickHouse и Nessie sink при rawMode: true.

При flattenMetadataColumns: true каждый ключ из msg.Metadata записывается в отдельную колонку вместо JSON в _metadata. Опциональный префикс flattenMetadataColumnsPrefix добавляется к имени ключа (например kafka_ + offsetkafka_offset).

Параметр Описание
flattenMetadataColumns true — развернуть metadata в колонки
flattenMetadataColumnsPrefix Префикс имён колонок (например kafka_)

Схема при autoCreateTable: true: колонка data (string) + колонки по ключам metadata из первого батча (типы выводятся автоматически: int/long/string/bool/timestamp). Ключ metadata timestamp (например kafka_timestamp с префиксом kafka_) сохраняется как timestamptz (Kafka source передаёт time.Time в UTC). Строковые timestamp из старых пайплайнов парсятся при возможности. Колонка _metadata не создаётся. Новые ключи metadata в последующих батчах игнорируются (логируется предупреждение).

Таблица с колонкой _metadata несовместима с flattenMetadataColumns: true — пересоздайте таблицу или используйте новое имя.

Чтение (Nessie source): строки с колонками data и kafka_* (без _metadata) преобразуются в сообщения {"value": ..., "_metadata": ...}; поля metadata также попадают в msg.Metadata.

Пример: Kafka → Nessie (Iceberg)

apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: kafka-to-nessie
spec:
  source:
    type: kafka
    config:
      brokers:
        - kafka:9092
      topic: input-topic
      consumerGroup: dataflow-group
  sink:
    type: nessie
    config:
      baseURL: "http://nessie:19120"
      branch: main
      namespace: analytics
      table: events
      batchSize: 100
      autoCreateTable: true
      rawMode: true
      flattenMetadataColumns: true
      flattenMetadataColumnsPrefix: kafka_  # kafka_offset, kafka_partition, ...

Iceberg (REST Catalog)

Коннектор Iceberg читает и записывает таблицы Apache Iceberg через официальный REST Catalog API. Используйте его для универсальных REST-каталогов (Polaris, Lakekeeper, Tabular, iceberg-rest и т.д.). Для веток Nessie используйте коннектор nessie.

Source

source:
  type: iceberg
  config:
    catalogURI: "https://iceberg-catalog.example.com"
    # prefix: production
    # warehouse: main
    namespace: my_schema
    table: my_table
    pollInterval: 10
    authenticationType: BEARER
    bearerToken: "your-token"
    # oauth2ClientID / oauth2ClientSecret — альтернатива bearerToken

Sink

sink:
  type: iceberg
  config:
    catalogURI: "https://iceberg-catalog.example.com"
    namespace: my_schema
    table: my_table
    batchSize: 100
    autoCreateTable: true
    rawMode: true
    authenticationType: BEARER
    bearerToken: "your-token"
    # s3Endpoint, s3Region, accessKeySecretRef, secretAccessKeySecretRef — для S3 warehouse

Поведение совпадает с Nessie: опрос, инкрементальные snapshot, батч-дозапись, rawMode, flattenMetadataColumns. Ключ checkpoint — iceberg. Для Iceberg source spec.replicas должен быть 1.

Пример: dataflow/config/samples/kafka-to-iceberg.yaml.

Error Sink

DataFlow Operator поддерживает настройку отдельного приёмника для сообщений, которые не удалось записать в основной sink. Секция errors использует те же типы коннекторов (Kafka, PostgreSQL, Trino, ClickHouse, Nessie, Iceberg), что и основной sink.

Конфигурация, структура сообщений об ошибках и типы ошибок описаны в разделе Обработка ошибок.

Использование Secrets в Kubernetes

DataFlow Operator поддерживает конфигурацию коннекторов из Kubernetes Secrets. Это позволяет безопасно хранить чувствительные данные (пароли, токены, connection strings) без их явного указания в спецификации DataFlow.

Обзор

Вместо указания значений напрямую в конфигурации, вы можете использовать ссылки на Kubernetes Secrets через поля *SecretRef. Оператор автоматически читает значения из secrets и подставляет их в конфигурацию коннекторов.

Структура SecretRef

Каждая ссылка на secret имеет следующую структуру:

secretRef:
  name: my-secret          # Имя secret (обязательно)
  namespace: my-namespace  # Namespace secret (опционально, по умолчанию - namespace DataFlow)
  key: my-key              # Ключ в secret (обязательно)

Поддерживаемые поля

Все коннекторы поддерживают ссылки на secrets для следующих полей:

Kafka

  • brokersSecretRef - список брокеров (через запятую)
  • topicSecretRef - название топика
  • consumerGroupSecretRef - consumer group
  • sasl.usernameSecretRef - имя пользователя SASL
  • sasl.passwordSecretRef - пароль SASL
  • tls.certSecretRef - сертификат клиента
  • tls.keySecretRef - приватный ключ
  • tls.caSecretRef - CA сертификат
  • schemaRegistry.urlSecretRef - URL Schema Registry
  • schemaRegistry.basicAuth.usernameSecretRef - имя пользователя для Schema Registry
  • schemaRegistry.basicAuth.passwordSecretRef - пароль для Schema Registry
  • avroSchemaSecretRef - Avro схема из секрета (для статической схемы)

PostgreSQL

  • connectionStringSecretRef - строка подключения
  • tableSecretRef - название таблицы

PostgreSQL CDC (source)

  • connectionStringSecretRef - строка подключения
  • slotNameSecretRef - имя replication slot
  • publicationNameSecretRef - имя publication

ClickHouse

  • connectionStringSecretRef - строка подключения
  • tableSecretRef - название таблицы

Trino

  • serverURLSecretRef - URL сервера Trino
  • catalogSecretRef - название каталога
  • schemaSecretRef - название схемы
  • tableSecretRef - название таблицы
  • keycloak.serverURLSecretRef - URL сервера Keycloak
  • keycloak.realmSecretRef - название realm в Keycloak
  • keycloak.clientIDSecretRef - OAuth2 client ID
  • keycloak.clientSecretSecretRef - OAuth2 client secret
  • keycloak.usernameSecretRef - имя пользователя для password grant
  • keycloak.passwordSecretRef - пароль для password grant
  • keycloak.tokenSecretRef - OAuth2 токен (для долгоживущих токенов)

Nessie

  • baseURLSecretRef - базовый URL сервера Nessie
  • tokenSecretRef - Bearer токен для Nessie/Iceberg REST
  • namespaceSecretRef - namespace (схема)
  • tableSecretRef - название таблицы

Примеры использования

Пример 1: Kafka с SASL аутентификацией

apiVersion: v1
kind: Secret
metadata:
  name: kafka-credentials
  namespace: default
type: Opaque
stringData:
  brokers: "kafka1:9092,kafka2:9092,kafka3:9092"
  topic: "input-topic"
  consumerGroup: "dataflow-group"
  username: "kafka-user"
  password: "kafka-password"
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: kafka-example
spec:
  source:
    type: kafka
    config:
      brokersSecretRef:
        name: kafka-credentials
        key: brokers
      topicSecretRef:
        name: kafka-credentials
        key: topic
      consumerGroupSecretRef:
        name: kafka-credentials
        key: consumerGroup
      securityProtocol: SASL_PLAINTEXT
      sasl:
        mechanism: "scram-sha-256"
        usernameSecretRef:
          name: kafka-credentials
          key: username
        passwordSecretRef:
          name: kafka-credentials
          key: password

Пример 2: PostgreSQL с connection string из secret

apiVersion: v1
kind: Secret
metadata:
  name: postgres-credentials
  namespace: default
type: Opaque
stringData:
  connectionString: "postgres://user:password@postgres:5432/dbname?sslmode=disable"
  table: "output_table"
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: postgres-example
spec:
  source:
    type: postgresql
    config:
      connectionStringSecretRef:
        name: postgres-credentials
        key: connectionString
      tableSecretRef:
        name: postgres-credentials
        key: table

Пример 3: Kafka → PostgreSQL с secrets

apiVersion: v1
kind: Secret
metadata:
  name: kafka-credentials
type: Opaque
stringData:
  brokers: "localhost:9092"
  topic: "input-topic"
  consumerGroup: "dataflow-group"
---
apiVersion: v1
kind: Secret
metadata:
  name: postgres-credentials
type: Opaque
stringData:
  connectionString: "postgres://dataflow:dataflow@postgres:5432/dataflow?sslmode=disable"
  table: "output_table"
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: kafka-to-postgres-secrets
spec:
  source:
    type: kafka
    config:
      brokersSecretRef:
        name: kafka-credentials
        key: brokers
      topicSecretRef:
        name: kafka-credentials
        key: topic
      consumerGroupSecretRef:
        name: kafka-credentials
        key: consumerGroup
  sink:
    type: postgresql
    config:
      connectionStringSecretRef:
        name: postgres-credentials
        key: connectionString
      tableSecretRef:
        name: postgres-credentials
        key: table

Пример 4: TLS сертификаты из secrets

Для TLS конфигурации оператор автоматически определяет, является ли значение из secret путем к файлу или содержимым сертификата.

Определение типа значения: - Если значение начинается с -----BEGIN (например, -----BEGIN CERTIFICATE----- или -----BEGIN PRIVATE KEY-----), оператор считает его содержимым сертификата и создает временный файл - Если значение не начинается с -----BEGIN и существует как файл в файловой системе, оно используется как путь к файлу - Если значение не начинается с -----BEGIN и файл не существует, оно также обрабатывается как содержимое сертификата

Поддерживаемые форматы: 1. Содержимое сертификата (PEM формат):

ca.crt: |
  -----BEGIN CERTIFICATE-----
  MIIDXTCCAkWgAwIBAgIJAK...
  -----END CERTIFICATE-----

  1. Base64-кодированное содержимое (в поле data секрета):

    apiVersion: v1
    kind: Secret
    metadata:
      name: kafka-tls-certs
    type: Opaque
    data:
      ca.crt: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0t...  # base64
    
    Kubernetes автоматически декодирует base64 при чтении из секрета.

  2. Путь к файлу:

    ca.crt: /etc/kafka/ca.crt
    

Пример с содержимым сертификата:

apiVersion: v1
kind: Secret
metadata:
  name: kafka-tls-certs
type: Opaque
stringData:
  # Содержимое CA сертификата
  ca.crt: |
    -----BEGIN CERTIFICATE-----
    ...
    -----END CERTIFICATE-----
  # Содержимое клиентского сертификата
  client.crt: |
    -----BEGIN CERTIFICATE-----
    ...
    -----END CERTIFICATE-----
  # Содержимое приватного ключа
  client.key: |
    -----BEGIN PRIVATE KEY-----
    ...
    -----END PRIVATE KEY-----
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: kafka-tls-example
spec:
  source:
    type: kafka
    config:
      brokers:
        - secure-kafka:9093
      topic: secure-topic
      tls:
        caSecretRef:
          name: kafka-tls-certs
          key: ca.crt
        certSecretRef:
          name: kafka-tls-certs
          key: client.crt
        keySecretRef:
          name: kafka-tls-certs
          key: client.key

Важно: - Временные файлы создаются автоматически для содержимого сертификатов и удаляются после использования - При использовании base64-кодированных значений в поле data, Kubernetes автоматически декодирует их при чтении - Убедитесь, что сертификаты в правильном PEM формате с заголовками -----BEGIN и -----END

Приоритет значений

Если указаны и прямое значение, и SecretRef, приоритет имеет SecretRef. Значение из secret будет использовано вместо прямого значения.

Secrets в разных namespace

По умолчанию оператор ищет secrets в том же namespace, где находится DataFlow ресурс. Вы можете указать другой namespace:

connectionStringSecretRef:
  name: postgres-credentials
  namespace: shared-secrets
  key: connectionString

Безопасность

  • RBAC: Оператору требуются права на чтение secrets (get, list, watch)
  • Изоляция: Secrets должны быть в том же namespace или в namespace, к которому у оператора есть доступ
  • Временные файлы: Для TLS сертификатов оператор создает временные файлы, которые автоматически удаляются после использования
  • Файлы создаются только для содержимого сертификатов (начинающихся с -----BEGIN)
  • Файлы удаляются автоматически при завершении работы оператора
  • Временные файлы создаются в системной временной директории с уникальными именами

Устранение неполадок

Secret не найден

# Проверьте существование secret
kubectl get secret <secret-name> -n <namespace>

# Проверьте права оператора
kubectl auth can-i get secrets --as=system:serviceaccount:default:dataflow-operator

Ключ не найден в secret

# Проверьте ключи в secret
kubectl get secret <secret-name> -n <namespace> -o jsonpath='{.data}' | jq 'keys'

Ошибка разрешения secrets

Проверьте логи оператора:

kubectl logs -l app.kubernetes.io/name=dataflow-operator | grep -i secret

Убедитесь, что: 1. Secret существует в указанном namespace 2. Указанный ключ существует в secret 3. У оператора есть права на чтение secrets

Проблемы с TLS сертификатами

Если возникают ошибки при работе с TLS сертификатами:

  1. Ошибка "file name too long": Убедитесь, что сертификат в секрете хранится правильно:
  2. Если используете stringData, сертификат должен быть в PEM формате с заголовками -----BEGIN и -----END
  3. Если используете data (base64), убедитесь, что значение корректно закодировано
  4. Оператор автоматически определяет содержимое сертификата по префиксу -----BEGIN

  5. Ошибка "failed to read CA file": Проверьте формат сертификата:

    # Проверьте содержимое секрета
    kubectl get secret <secret-name> -n <namespace> -o jsonpath='{.data.ca\.crt}' | base64 -d
    
    Убедитесь, что сертификат начинается с -----BEGIN CERTIFICATE-----

  6. Ошибка создания временного файла: Проверьте права оператора на создание файлов в временной директории

Рекомендации по производительности

Общие настройки spec

channelBufferSize

Размер буфера каналов между source, processor и sink (по умолчанию 100). При высокой нагрузке Kafka (десятки тысяч msg/s) увеличьте до 500–1000, чтобы снизить блокировки, когда sink медленнее source.

ackGranularity

Момент коммита offset / checkpoint относительно записи в sink: batch (по умолчанию) или message. При message batch sink сбрасывает по одному сообщению, Kafka source сразу коммитит offset. См. Отказоустойчивость — гранулярность ack.

checkpointSyncOnAck и checkpointSaveInterval

checkpointSyncOnAck: true сбрасывает checkpoint polling-источников в ConfigMap после ack sink (с coalescing по checkpointSaveInterval, по умолчанию 30s). Рекомендуется для migration и cron. Подробнее: Отказоустойчивость.

Kafka

  • Используйте несколько брокеров для отказоустойчивости
  • Настройте правильный размер consumer group для параллельной обработки
  • Используйте батчи при записи для повышения throughput

PostgreSQL

  • Увеличьте batchSize для приемника (рекомендуется 50-100)
  • Используйте индексы на часто запрашиваемых полях
  • Настройте pollInterval в зависимости от частоты обновления данных

Устранение неполадок

Проблемы с подключением

  1. Проверьте доступность источника данных из кластера
  2. Убедитесь, что credentials корректны
  3. Проверьте сетевые политики Kubernetes
  4. Для TLS проверьте сертификаты

Проблемы с производительностью

  1. Увеличьте channelBufferSize (500–1000) при высокой нагрузке Kafka
  2. Увеличьте размер батчей для приемников
  3. Настройте правильный pollInterval для источников
  4. Для Kafka увеличивайте spec.replicas (не больше числа партиций топика). Для Nessie и других polling-источников оставляйте replicas: 1 и настраивайте batchSize / channelBufferSize
  5. Мониторьте метрики обработки сообщений

Логирование

Включите детальное логирование для отладки:

kubectl logs -l app.kubernetes.io/name=dataflow-operator -f --tail=100

Проверьте статус DataFlow ресурса:

kubectl describe dataflow <name>