Connectors
DataFlow Operator поддерживает различные коннекторы для источников и приемников данных. Каждый коннектор реализует стандартный интерфейс и может использоваться как источник (source) или приемник (sink) данных.
Обзор коннекторов
| Коннектор | Источник | Приемник | Особенности |
|---|---|---|---|
| Kafka | ✅ | ✅ | Consumer groups, TLS, SASL, Avro, Schema Registry |
| PostgreSQL | ✅ | ✅ | SQL запросы, батч-вставки, автосоздание таблиц, UPSERT режим |
| Trino | ✅ | ✅ | SQL запросы, аутентификация Keycloak OAuth2, батч-вставки |
| ClickHouse | ✅ | ✅ | Опрос таблиц, батч-вставки, автосоздание MergeTree таблиц |
| Nessie | ✅ | ✅ | Iceberg через каталог Nessie, опрос, батч-дозапись, sink rawMode (data + _metadata); spec.replicas только 1 |
| Iceberg | ✅ | ✅ | Apache Iceberg через официальный REST Catalog API; опрос, батч-дозапись, Bearer/Basic/OAuth2, опционально S3; spec.replicas только 1 |
Kafka
Kafka коннектор поддерживает чтение и запись сообщений из/в топики Apache Kafka. Поддерживает consumer groups для масштабирования, TLS и SASL аутентификацию, а также работу с Avro форматом через Confluent Schema Registry или статическую схему.
Источник (Source)
Конфигурация Kafka источника:
source:
type: kafka
config:
# Список брокеров Kafka (обязательно)
brokers:
- kafka1:9092
- kafka2:9092
- kafka3:9092
# Топик для чтения (обязательно)
topic: input-topic
# Consumer group (опционально, по умолчанию: dataflow-operator)
consumerGroup: my-group
# Протокол безопасности Kafka (опционально)
# Значения: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
# Если не указано — протокол выводится из секций tls/sasl (обратная совместимость)
securityProtocol: SASL_PLAINTEXT
# TLS конфигурация (опционально)
tls:
# Пропустить проверку сертификата (не рекомендуется для production)
insecureSkipVerify: false
# Путь к CA сертификату
caFile: /path/to/ca.crt
# Путь к клиентскому сертификату
certFile: /path/to/client.crt
# Путь к приватному ключу
keyFile: /path/to/client.key
# SASL аутентификация (опционально)
sasl:
# Механизм: plain, scram-sha-256, scram-sha-512
mechanism: scram-sha-256
username: kafka-user
password: kafka-password
# Формат сообщений (опционально, по умолчанию: "json")
# Поддерживаемые форматы: "json", "avro"
format: json
# Конфигурация Avro (требуется если format: "avro")
# Вариант 1: Использование Confluent Schema Registry (рекомендуется)
schemaRegistry:
# URL Schema Registry (обязательно)
url: https://schema-registry:8081
# Basic Auth для Schema Registry (опционально)
basicAuth:
username: schema-user
password: schema-password
# TLS конфигурация для Schema Registry (опционально)
tls:
insecureSkipVerify: false
caFile: /path/to/schema-registry-ca.crt
# Вариант 2: Статическая Avro схема (альтернатива Schema Registry)
# avroSchema: |
# {
# "type": "record",
# "name": "MyRecord",
# "fields": [
# {"name": "id", "type": "long"},
# {"name": "name", "type": "string"}
# ]
# }
# Или путь к файлу со схемой:
# avroSchemaFile: /path/to/schema.avsc
Особенности Kafka источника
- Consumer Groups: Используйте разные consumer groups для масштабирования обработки
- Начальная позиция: По умолчанию читает с самого старого сообщения (
OffsetOldest) - Форматы данных: Поддерживает JSON (по умолчанию) и Avro форматы
- Avro поддержка:
- Confluent Schema Registry: Автоматическое получение схем по ID из сообщений (формат: magic byte + schema ID + data)
- Статическая схема: Использование предопределенной схемы из конфигурации или файла
- Кэширование схем: Схемы из Schema Registry кэшируются для повышения производительности
- Метаданные: Каждое сообщение содержит метаданные:
topic- название топикаpartition- номер партицииoffset- смещение сообщенияkey- ключ сообщения (если есть)- securityProtocol: Явная настройка Kafka
security.protocol. Поддерживаемые значения:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL(регистр и-/_нормализуются). Если поле не задано, протокол определяется неявно: толькоsasl→SASL_PLAINTEXT,tls+sasl→SASL_SSL, толькоtls→SSL, без обоих →PLAINTEXT. При явном указании webhook проверяет согласованность с секциямиtls/sasl(например,SASL_PLAINTEXTтребуетsaslи не допускаетtls).
Пример с TLS и SASL
source:
type: kafka
config:
brokers:
- secure-kafka:9093
topic: secure-topic
consumerGroup: secure-group
securityProtocol: SASL_SSL
tls:
caFile: /etc/kafka/ca.crt
certFile: /etc/kafka/client.crt
keyFile: /etc/kafka/client.key
sasl:
mechanism: scram-sha-512
username: kafka-user
password: ${KAFKA_PASSWORD} # Используйте Secrets в Kubernetes
Пример с SASL_PLAINTEXT (без TLS)
source:
type: kafka
config:
brokers:
- kafka1:9092
- kafka2:9092
topic: input-topic
consumerGroup: my-group
securityProtocol: SASL_PLAINTEXT
sasl:
mechanism: scram-sha-256
usernameSecretRef:
name: kafka-credentials
key: username
passwordSecretRef:
name: kafka-credentials
key: password
Пример с Avro и Schema Registry
source:
type: kafka
config:
brokers:
- kafka:9092
topic: avro-topic
consumerGroup: avro-group
format: avro
schemaRegistry:
url: https://schema-registry:8081
basicAuth:
username: schema-user
password: schema-password
tls:
insecureSkipVerify: true # Для self-signed сертификатов
# caFile: /path/to/ca.crt # Или укажите CA сертификат
Пример с Avro и статической схемой
source:
type: kafka
config:
brokers:
- kafka:9092
topic: avro-topic
consumerGroup: avro-group
format: avro
avroSchema: |
{
"type": "record",
"name": "Stock",
"namespace": "com.example",
"fields": [
{"name": "id", "type": "long"},
{"name": "symbol", "type": "string"},
{"name": "price", "type": "double"},
{"name": "quantity", "type": "int"}
]
}
# Или используйте файл:
# avroSchemaFile: /path/to/schema.avsc
Приемник (Sink)
Конфигурация Kafka приемника:
sink:
type: kafka
config:
# Список брокеров Kafka (обязательно)
brokers:
- kafka1:9092
# Топик для записи (обязательно)
topic: output-topic
# Протокол безопасности (опционально, см. источник)
securityProtocol: SASL_PLAINTEXT
# TLS конфигурация (опционально, аналогично источнику)
tls:
caFile: /path/to/ca.crt
certFile: /path/to/client.crt
keyFile: /path/to/client.key
# SASL конфигурация (опционально, аналогично источнику)
sasl:
mechanism: scram-sha-256
username: kafka-user
password: kafka-password
Особенности Kafka приемника
- Синхронная запись: Использует
SyncProducerдля гарантированной доставки - Ключи сообщений: Сохраняет ключи из метаданных сообщения
- Метаданные: После записи обновляет метаданные с
partitionиoffset
PostgreSQL
PostgreSQL коннектор поддерживает чтение из таблиц и запись в таблицы PostgreSQL. Поддерживает кастомные SQL запросы, периодический опрос, батч-вставки, CDC-стиль отслеживания изменений (INSERT и UPDATE), soft delete и SecretRef для учётных данных.
Для нативного CDC через logical replication (WAL / pgoutput) используйте отдельный тип источника postgresql-cdc.
Источник (Source)
Конфигурация PostgreSQL источника:
source:
type: postgresql
config:
# Connection string (обязательно, или connectionStringSecretRef)
# Формат: postgres://user:password@host:port/dbname?sslmode=mode
connectionString: "postgres://user:password@localhost:5432/dbname?sslmode=disable"
# Таблица для чтения (обязательно, если не указан query). Поддерживает schema.table (напр. public.products)
table: source_table
# Кастомный SQL запрос (опционально)
# Если указан, используется вместо чтения всей таблицы
query: "SELECT * FROM source_table WHERE updated_at > NOW() - INTERVAL '1 hour'"
# Интервал опроса в секундах (опционально, по умолчанию: 5)
pollInterval: 60
# Опции CDC-стиля (опционально)
readBatchSize: 1000 # Ограничение строк за один опрос (0 = без лимита)
changeTrackingColumn: updated_at # Колонка для отслеживания изменений (по умолчанию: updated_at). В query mode задайте явно для инкрементальной обёртки subquery
orderByColumn: id # Вторичный ключ сортировки для стабильной пагинации (по умолчанию: id). Пример: price_id
autoCreateTable: true # Создать таблицу, если не существует, перед чтением
# SecretRef (опционально)
# connectionStringSecretRef: { name: postgres-credentials, key: connectionString }
# tableSecretRef: { name: postgres-credentials, key: table }
Особенности PostgreSQL источника
- Периодический опрос: Регулярно опрашивает таблицу на наличие новых данных
- Кастомные запросы: Поддержка сложных SQL запросов с JOIN, WHERE, и т.д.
- Метаданные: Каждое сообщение содержит метаданные
tableиoperation(insert/update) - Размер батча чтения: Ограничивает количество строк за один опрос для снижения нагрузки на БД
- Отслеживание обновлений: По умолчанию отслеживает изменения по колонке
updated_at(илиchangeTrackingColumn), захватывает INSERT и UPDATE - Стабильная сортировка: В table mode добавляется
ORDER BY changeTrackingColumn, orderByColumn(по умолчанию вторичный ключ —id). При одновременном указанииqueryиchangeTrackingColumnSQL оборачивается в subquery с composite checkpoint(changeTrackingColumn, orderByColumn) > (lastTime, lastId)иORDER BY. Без явногоchangeTrackingColumnquery mode выполняет SQL как есть (legacy). Значение колонкиorderByColumnпопадает в metadataid. - Требования query mode: При инкрементальном query mode в SELECT должны быть колонки
changeTrackingColumnиorderByColumn, иначе checkpoint не продвигается. - Автосоздание таблицы: При
autoCreateTable: trueсоздаёт таблицу с CDC-совместимой схемой (id SERIAL PRIMARY KEY,created_at,updated_at), если не существует. Создание выполняется при подключении (Connect). - Схема таблицы: Имя таблицы поддерживает формат
schema.table(напр.public.products) - Персистенция checkpoint: По умолчанию позиция чтения (
lastReadChangeTime,lastReadOrderByValue) сохраняется в ConfigMap и при перезапуске чтение продолжается с последней позиции. ЗадайтеcheckpointPersistence: falseв spec, чтобы хранить только в памяти. Для pg→pg включитеupsertMode: trueв sink, чтобы дубликаты обновлялись, а не вставлялись повторно.
Разовая миграция (batch exhaust)
Для однократного копирования снимка (например, job DataFlowCron, который должен завершиться после чтения всех строк):
- Предпочтительно MATERIALIZED VIEW на source DB для тяжёлого бизнес-SQL:
CREATE MATERIALIZED VIEW+REFRESH, затемtableуказывает на MVIEW. changeTrackingColumnиorderByColumn— одна и та же колонка-ключ без timestamp (напримерmaterial_id), если в результате нетupdated_at.readBatchSize(например10000) для батчевого чтения.- В sink:
upsertMode: true, явныйconflictKey, совпадающие типы колонок (напримерBIGINTдля числовых ID, неTEXT). - Processor останавливается, когда следующий poll возвращает 0 строк (
source exhausted). Checkpoint продвигается черезAckпосле успешной записи в sink.
Пример source:
source:
type: postgresql
config:
table: price_calendar.mv_one_p_prices_migration
changeTrackingColumn: material_id
orderByColumn: material_id
readBatchSize: 10000
pollInterval: 5
Тот же паттерн применим к инкрементальным source Trino и ClickHouse. Kafka и Nessie используют другую модель offset/snapshot.
Пример с кастомным запросом
source:
type: postgresql
config:
connectionString: "postgres://user:password@localhost:5432/analytics"
query: |
SELECT
u.id,
u.email,
o.order_id,
o.total
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE o.created_at > NOW() - INTERVAL '1 day'
pollInterval: 300 # Опрос каждые 5 минут
Приемник (Sink)
Конфигурация PostgreSQL приемника:
sink:
type: postgresql
config:
connectionString: "postgres://user:password@localhost:5432/dbname?sslmode=disable"
# Таблица для записи. Поддерживает schema.table (напр. public.products_clone)
table: target_table
# Размер батча (опционально, по умолчанию: 1). 0 — сброс только по таймеру
batchSize: 100
# Интервал сброса в секундах (опционально, по умолчанию: 10). 0 — отключить сброс по таймеру
batchFlushIntervalSeconds: 10
autoCreateTable: true
# Режим rawMode (опционально, по умолчанию: false)
# При true ожидает сообщения в формате {"value": <данные>, "_metadata": {...}} или plain + msg.Metadata
# Таблица создаётся с колонками data JSONB и _metadata JSONB
rawMode: false
# Режим UPSERT (опционально, по умолчанию: false)
upsertMode: true
conflictKey: "id"
upsertStrategy: ifNewer # always (по умолчанию) | ifNewer
upsertVersionColumn: updated_at # обязательно при upsertStrategy: ifNewer
# Колонка для soft delete (опционально)
softDeleteColumn: "deleted_at"
# SecretRef (опционально)
# connectionStringSecretRef: { name: postgres-credentials, key: connectionString }
# tableSecretRef: { name: postgres-credentials, key: table }
Особенности PostgreSQL приемника
- Батч-вставки: Группирует сообщения для эффективной записи. Сброс при достижении
batchSizeили по таймеру. Только по размеру:batchFlushIntervalSeconds: 0. Только по времени:batchSize: 0 - Автосоздание таблиц (при
autoCreateTable: true): - rawMode: true — таблица создаётся при подключении (Connect). Схема:
id SERIAL PRIMARY KEY,dataJSONB,_metadataJSONB,created_at,updated_at,deleted_at, GIN индекс поdata - rawMode: false — таблица создаётся при первой записи из структуры первого сообщения (реплицирует схему источника). Типы колонок выводятся автоматически (TEXT, BIGINT, NUMERIC, JSONB и т.д.)
- Режим rawMode: При true тело сообщения пишется в колонку
data, metadata — в_metadata(или в отдельные колонки сflattenMetadataColumns). При false использует колоночный формат из сообщения - flattenMetadataColumns: при
rawMode: trueможно развернуть metadata в отдельные колонки (см. flattenMetadataColumns) - UPSERT режим: Обновляет существующие записи при конфликте по PRIMARY KEY или
conflictKey. Batch flush выполняется в явной транзакции PostgreSQL. - Стратегия upsert:
always(по умолчанию) — обновление при каждом конфликте;ifNewer— только если входящее значениеupsertVersionColumnбольше сохранённого (защита от устаревших replay). - Soft delete: При заданном
softDeleteColumnиmetadata.operation=deleteвыполняетUPDATE ... SET deleted_at = NOW()вместо физического DELETE
Пример с автосозданием таблицы (rawMode)
sink:
type: postgresql
config:
connectionString: "postgres://user:password@localhost:5432/analytics"
table: events
autoCreateTable: true
rawMode: true # Таблица с data и _metadata
batchSize: 50
При rawMode: true и autoCreateTable: true таблица создаётся со структурой:
CREATE TABLE events (
id SERIAL PRIMARY KEY,
data JSONB NOT NULL,
_metadata JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
deleted_at TIMESTAMP
);
CREATE INDEX idx_events_value ON events USING GIN (value);
При rawMode: false структура таблицы выводится из первого сообщения (колоночный формат).
Пример с UPSERT для обновления существующих записей
sink:
type: postgresql
config:
connectionString: "postgres://user:password@localhost:5432/analytics"
table: orders
upsertMode: true # Включает обновление существующих записей
conflictKey: "id" # Использует поле 'id' для определения конфликта
batchSize: 100
При включенном upsertMode:
- Если запись с таким же id (или указанным conflictKey) уже существует, она будет обновлена
- Если записи нет, она будет вставлена
- Это особенно полезно для синхронизации данных, когда источник периодически отправляет обновленные записи
Важно: Для работы UPSERT таблица должна иметь PRIMARY KEY или UNIQUE constraint на указанном conflictKey.
PostgreSQL CDC (logical replication)
Нативный Change Data Capture через logical replication PostgreSQL (pgoutput). Читает INSERT, UPDATE и DELETE из WAL через replication slot и publication. Пример: config/samples/postgresql-cdc-to-postgres.yaml.
Требования
PostgreSQL должен иметь включённую logical replication:
-- postgresql.conf
wal_level = logical
max_replication_slots >= 1
max_wal_senders >= 1
-- pg_hba.conf (пример)
host replication repl_user 0.0.0.0/0 scram-sha-256
-- privileges
GRANT REPLICATION ON DATABASE db TO repl_user;
Пользователю нужны права REPLICATION и создание publication/slot (или создайте их заранее). Для таблиц без PK: ALTER TABLE ... REPLICA IDENTITY FULL.
Источник (Source)
source:
type: postgresql-cdc
config:
connectionString: "postgres://repl_user:pass@pg:5432/db?sslmode=disable"
slotName: dataflow_orders_slot
publicationName: dataflow_orders_pub
tables:
- public.orders
- public.customers
snapshotMode: initial # initial | never | always (по умолчанию: initial)
createSlotIfNotExists: true
createPublicationIfNotExists: true
heartbeatIntervalSeconds: 10 # standby status при простое (0 = выключить)
primaryKeyColumn: id # metadata.id (по умолчанию: id)
includeColumns: [] # опциональный allow-list колонок
excludeColumns: []
envelopeFormat: row # row (по умолчанию) | debezium
Особенности
- Streaming WAL — continuous logical replication (не polling)
- Initial snapshot —
snapshotMode: initialкопирует существующие строки перед streaming - Checkpoint — LSN в ConfigMap (
postgresql-cdc), продвигается только послеAcksink - Metadata:
operation(insert/update/delete),table,lsn,id(при наличии PK) - Multi-table — одна publication, фильтр через список
tables - Schema evolution — relation cache обновляется при каждом
RelationMessageV2(ALTER ADD/DROP COLUMN) - Debezium envelope —
envelopeFormat: debeziumдляpayload.before/after/op/source(совместимо сdebeziumUnwrap) - replicas: только
1
Идемпотентный sink
Используйте ackGranularity: message и идемпотентный sink (upsertMode + conflictKey), чтобы минимизировать дубликаты при restart.
Отличия от Debezium
| Возможность | Debezium | postgresql-cdc |
|---|---|---|
| Initial snapshot | op: r |
То же при envelopeFormat: debezium |
| UPDATE before/after | В envelope | Оба поля, когда WAL содержит old tuple |
| Schema history topic | Да | Нет — inline refresh relation cache |
| DDL events | Отдельные сообщения | Не эмитятся (только refresh схемы) |
| Kafka Connect | Нужен | Не нужен |
ClickHouse
ClickHouse коннектор поддерживает чтение из таблиц и запись в таблицы ClickHouse. Поддерживает периодический опрос для инкрементального чтения, кастомные SQL запросы, батч-вставки и автосоздание MergeTree таблиц.
Источник (Source)
source:
type: clickhouse
config:
# Connection string (обязательно)
# Native: clickhouse://host:9000?username=default&password=xxx&database=default
# HTTP: http://host:8123/default?username=default&password=xxx
connectionString: "clickhouse://default@clickhouse:9000/default?dial_timeout=10s"
# Таблица для чтения (обязательно, если не указан query)
table: source_table
# Кастомный SQL запрос (опционально)
query: "SELECT * FROM source_table WHERE id > 100"
# Интервал опроса в секундах (опционально, по умолчанию: 5)
pollInterval: 60
# Колонка для инкрементальной пагинации и стабильного ORDER BY (опционально, по умолчанию: id)
orderByColumn: price_id
# Колонка для отслеживания изменений (опционально, по умолчанию: created_at)
changeTrackingColumn: created_at
# Лимит строк за один запрос в цикле опроса (опционально, 0 = без лимита)
readBatchSize: 1000
Особенности ClickHouse источника
- Опрос: Периодически опрашивает таблицу на наличие новых данных (интервал задаётся
pollInterval) - Батчинг чтения:
readBatchSizeограничивает число строк за запрос в цикле опроса (как у PostgreSQL source) - Инкрементальное чтение: Table mode использует composite checkpoint
(changeTrackingColumn, orderByColumn) > (lastTime, lastKey)с tuple WHERE; legacy checkpoint только сlastReadOrderByValue—WHERE orderByColumn > Nдо появления timestamp - Кастомные запросы: При указании
queryиchangeTrackingColumnSQL оборачивается в subquery с composite filter; без явногоchangeTrackingColumnquery mode выполняет SQL как есть (legacy) - Стабильная сортировка: Table mode —
ORDER BY changeTrackingColumn, orderByColumn(по умолчаниюcreated_at,id) - Персистенция checkpoint: Позиция (
lastReadChangeTime,lastReadOrderByValue) сохраняется в ConfigMap; legacylastReadID/lastReadTimeмигрируются при загрузке - Метаданные:
tableиid(значение колонкиorderByColumn, если есть)
Приемник (Sink)
sink:
type: clickhouse
config:
connectionString: "clickhouse://default@clickhouse:9000/default?dial_timeout=10s"
table: output_table
# Размер батча для вставок (опционально, по умолчанию: 100). 0 — сброс только по таймеру
batchSize: 100
# Интервал сброса батча в секундах (опционально, по умолчанию: 10). 0 — отключить сброс по таймеру
batchFlushIntervalSeconds: 10
# Автосоздание таблицы (опционально)
autoCreateTable: true
# rawMode: false (по умолчанию) — создаёт таблицу по структуре первого сообщения (колоночный формат, репликация схемы источника)
# rawMode: true — создаёт MergeTree таблицу с колонками data String, created_at (JSON-хранилище)
rawMode: false
# Идемпотентная запись (опционально)
upsertMode: true
conflictKey: id
replacingVersionColumn: updated_at
tableEngine: ReplacingMergeTree
Особенности ClickHouse приемника
- Батч-вставки: Группирует сообщения; по умолчанию сброс при достижении
batchSizeили по таймеру (10 с). Только по размеру:batchFlushIntervalSeconds: 0. Только по времени:batchSize: 0 - Upsert mode: При
upsertMode: trueавтосозданные таблицы используютReplacingMergeTree; дедупликация при background merge. См. Отказоустойчивость. - Retry при транзиентных ошибках: Batch-запись автоматически повторяется при connection refused, TOO_MANY_PARTS, memory limit, HTTP 502/503 и подобных ошибках (до 5 попыток с экспоненциальным backoff)
- Автосоздание таблиц (при
autoCreateTable: true): - rawMode: true — таблица создаётся при подключении (Connect). Схема:
data String,created_at DateTime DEFAULT now(), движок MergeTree, ORDER BY created_at. Сообщения сохраняются как JSON-строка в колонкеdata. СflattenMetadataColumns: true—data+ отдельные колонки metadata (см. ниже) - rawMode: false — таблица создаётся при первой записи из структуры первого сообщения. Типы колонок выводятся автоматически (String, Int32/Int64, Float64, Decimal, DateTime и т.д.). Поддерживается формат
{"value": {...}, "_metadata": {...}}— используются поля изvalue - Режим rawMode: При true ожидает/хранит JSON в колонке
data. При false использует колоночный формат из сообщения (INSERT с именованными колонками)
Отказоустойчивость ClickHouse
- Error sink: Используйте
spec.errorsс ClickHouse (или Kafka) для сохранения неудачных сообщений. См. Обработка ошибок. - Connection string: Рекомендуемые параметры:
dial_timeout=10s,max_execution_time=60(например,clickhouse://host:9000/default?dial_timeout=10s&max_execution_time=60). - Настройка batch: Для пропускной способности и устойчивости используйте
batchSize100–1000 иbatchFlushIntervalSeconds5–10. Большие батчи снижают частоту вставок и помогают избежать TOO_MANY_PARTS. - TOO_MANY_PARTS: При этой ошибке увеличьте
batchSize, уменьшите частоту flush или настройте merge в ClickHouse (background_pool_size,parts_to_throw_insert). РассмотритеReplacingMergeTreeдля дедупликации. См. Отказоустойчивость.
Пример: Kafka → ClickHouse
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: kafka-to-clickhouse
spec:
source:
type: kafka
config:
brokers:
- kafka:9092
topic: input-topic
consumerGroup: dataflow-group
sink:
type: clickhouse
config:
connectionString: "clickhouse://default@clickhouse:9000/default"
table: output_table
batchSize: 100
autoCreateTable: true
Пример: ClickHouse → ClickHouse (репликация схемы)
При autoCreateTable: true и rawMode: false целевая таблица создаётся автоматически по структуре источника:
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: clickhouse-to-clickhouse
spec:
source:
type: clickhouse
config:
connectionString: "clickhouse://dataflow:dataflow@localhost:9000/dataflow?dial_timeout=10s"
table: products
pollInterval: 5
sink:
type: clickhouse
config:
connectionString: "clickhouse://dataflow:dataflow@localhost:9000/dataflow?dial_timeout=10s"
table: products_clone
batchSize: 100
autoCreateTable: true
Trino
Trino коннектор поддерживает чтение из таблиц и запись в таблицы Trino (ранее PrestoSQL). Поддерживает SQL запросы, аутентификацию через Keycloak OAuth2/OIDC и батч-вставки.
Источник (Source)
Конфигурация Trino источника:
source:
type: trino
config:
# URL сервера Trino (обязательно)
serverURL: "http://trino:8080"
# Каталог для использования (обязательно)
catalog: hive
# Схема для использования (обязательно)
schema: default
# Таблица для чтения (обязательно, если не указан query)
table: source_table
# Кастомный SQL запрос (опционально)
# Если указан, используется вместо чтения всей таблицы
query: "SELECT * FROM hive.default.source_table WHERE id > 100"
# Интервал опроса в секундах (опционально, по умолчанию: 5)
# Используется для периодического чтения новых данных
pollInterval: 60
# Колонка для инкрементальной пагинации и стабильного ORDER BY (опционально, по умолчанию: id)
orderByColumn: price_id
# Лимит строк за один запрос в цикле опроса (опционально, 0 = без лимита)
readBatchSize: 1000
# Аутентификация Keycloak (опционально)
keycloak:
# Вариант 1: Использование долгоживущего токена напрямую (рекомендуется для долгоживущих токенов)
token: "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."
# Вариант 2: Использование OAuth2 flow (альтернатива прямому токену)
# serverURL: "https://keycloak.example.com"
# realm: myrealm
# clientID: trino-client
# clientSecret: client-secret
# username: trino-user
# password: trino-password
Особенности Trino источника
- SQL запросы: При указании
queryиchangeTrackingColumnSQL оборачивается с composite checkpoint filter; иначе — толькоORDER BY orderByColumn(legacy) - Батчинг чтения:
readBatchSizeограничивает число строк за запрос в цикле опроса (как у PostgreSQL source) - Периодический опрос: Table mode использует composite checkpoint
(changeTrackingColumn, orderByColumn) > (lastTime, lastKey); legacylastReadID—WHERE orderByColumn > Nдо появления timestamp - Персистенция checkpoint:
lastReadChangeTimeиlastReadOrderByValueв ConfigMap; legacylastReadIDмигрируется при загрузке - Метаданные: Значение колонки
orderByColumnпопадает в metadataid - Аутентификация Keycloak: OAuth2/OIDC аутентификация через Keycloak
- Прямой токен: Использование долгоживущего токена, полученного из Keycloak (рекомендуется для долгоживущих токенов)
- Password Grant: Использование username/password для аутентификации
- Client Credentials: Использование client ID/secret для сервис-к-сервис аутентификации
- Автоматическое обновление токенов: Токены автоматически обновляются до истечения срока действия (только для OAuth2 flow, не для прямых токенов)
Получение токена из Keycloak
Для использования долгоживущего токена вы можете получить его из Keycloak следующими способами:
Способ 1: Использование curl (Password Grant)
curl -X POST "https://keycloak.example.com/realms/myrealm/protocol/openid-connect/token" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "grant_type=password" \
-d "client_id=trino-client" \
-d "client_secret=client-secret" \
-d "username=trino-user" \
-d "password=trino-password"
Способ 2: Использование curl (Client Credentials)
curl -X POST "https://keycloak.example.com/realms/myrealm/protocol/openid-connect/token" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "grant_type=client_credentials" \
-d "client_id=trino-client" \
-d "client_secret=client-secret"
Ответ будет содержать поле access_token. Используйте это значение токена в поле token конфигурации Keycloak.
Примечание: Для долгоживущих токенов настройте время жизни токена в настройках realm или клиента Keycloak.
- Метаданные: Каждое сообщение содержит метаданные:
- catalog - название каталога
- schema - название схемы
- table - название таблицы
Пример с кастомным запросом
source:
type: trino
config:
serverURL: "http://trino:8080"
catalog: hive
schema: analytics
query: |
SELECT
u.id,
u.email,
o.order_id,
o.total
FROM hive.analytics.users u
JOIN hive.analytics.orders o ON u.id = o.user_id
WHERE o.created_at > CURRENT_TIMESTAMP - INTERVAL '1' DAY
pollInterval: 300 # Опрос каждые 5 минут
keycloak:
serverURL: "https://keycloak.example.com"
realm: myrealm
clientID: trino-client
clientSecret: client-secret
username: trino-user
password: trino-password
Приемник (Sink)
Конфигурация Trino приемника:
sink:
type: trino
config:
# URL сервера Trino (обязательно)
serverURL: "http://trino:8080"
# Каталог для использования (обязательно)
catalog: hive
# Схема для использования (обязательно)
schema: default
# Таблица для записи (обязательно)
table: target_table
# Размер батча для вставки (опционально, по умолчанию: 1). 0 — сброс только по таймеру
batchSize: 100
# Интервал сброса батча в секундах (опционально, по умолчанию: 10). 0 — отключить сброс по таймеру
batchFlushIntervalSeconds: 10
# Автоматическое создание таблицы (опционально, по умолчанию: false)
# Если true, создает таблицу с VARCHAR колонкой для JSON данных
autoCreateTable: true
# Режим rawMode (опционально, по умолчанию: false)
# При true создает таблицу с колонкой data VARCHAR; сообщения сохраняются как JSON-строка
# При false используется колоночный формат из сообщения
rawMode: true
# MERGE upsert для Iceberg-каталогов (имя catalog должно содержать "iceberg")
upsertMode: true
conflictKey: id
queryTimeoutSeconds: 600
# Аутентификация Keycloak (опционально)
keycloak:
serverURL: "https://keycloak.example.com"
realm: myrealm
clientID: trino-client
clientSecret: client-secret
username: trino-user
password: trino-password
Особенности Trino приемника
- Батч-вставки: Группирует сообщения; по умолчанию сброс при достижении
batchSizeили по таймеру (10 с). Только по размеру:batchFlushIntervalSeconds: 0. Только по времени:batchSize: 0 - Upsert mode:
upsertMode: trueиспользуетMERGE INTOдля Iceberg-таблиц (conflictKeyобязателен). См. Отказоустойчивость. - Автосоздание таблиц: Автоматически создает таблицы, если они не существуют
- Режим rawMode: При
rawMode: trueсоздает таблицу с колонкойdata VARCHAR; сообщения сохраняются как JSON-строка. Приfalse(по умолчанию) — колоночный формат из сообщения - Аутентификация Keycloak: OAuth2/OIDC аутентификация через Keycloak
- Автоматическое обновление токенов: Токены автоматически обновляются
Пример: Kafka → Trino с Keycloak
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: kafka-to-trino
spec:
source:
type: kafka
config:
brokers:
- kafka:9092
topic: input-topic
consumerGroup: dataflow-group
sink:
type: trino
config:
serverURL: "http://trino:8080"
catalog: hive
schema: default
table: output_table
batchSize: 100
keycloak:
# Использование долгоживущего токена, полученного из Keycloak
token: "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."
# Альтернатива: Использование OAuth2 flow
# serverURL: "https://keycloak.example.com"
# realm: myrealm
# clientID: trino-client
# clientSecret: client-secret
# username: trino-user
# password: trino-password
Пример: Trino с secrets
apiVersion: v1
kind: Secret
metadata:
name: trino-credentials
namespace: default
type: Opaque
stringData:
serverURL: "http://trino:8080"
catalog: "hive"
schema: "default"
table: "output_table"
---
apiVersion: v1
kind: Secret
metadata:
name: keycloak-credentials
namespace: default
type: Opaque
stringData:
serverURL: "https://keycloak.example.com"
realm: "myrealm"
clientID: "trino-client"
clientSecret: "client-secret"
username: "trino-user"
password: "trino-password"
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: kafka-to-trino-secrets
spec:
source:
type: kafka
config:
brokers:
- kafka:9092
topic: input-topic
consumerGroup: dataflow-group
sink:
type: trino
config:
serverURLSecretRef:
name: trino-credentials
key: serverURL
catalogSecretRef:
name: trino-credentials
key: catalog
schemaSecretRef:
name: trino-credentials
key: schema
tableSecretRef:
name: trino-credentials
key: table
batchSize: 100
keycloak:
# Вариант 1: Использование токена из secret (рекомендуется для долгоживущих токенов)
tokenSecretRef:
name: keycloak-credentials
key: token
# Вариант 2: Использование OAuth2 flow из secrets (альтернатива)
# serverURLSecretRef:
# name: keycloak-credentials
# key: serverURL
# realmSecretRef:
# name: keycloak-credentials
# key: realm
# clientIDSecretRef:
# name: keycloak-credentials
# key: clientID
# clientSecretSecretRef:
# name: keycloak-credentials
# key: clientSecret
# usernameSecretRef:
# name: keycloak-credentials
# key: username
# passwordSecretRef:
# name: keycloak-credentials
# key: password
Nessie
Коннектор Nessie читает из таблиц Apache Iceberg и записывает в них через каталог Nessie (Iceberg REST API). Все операции выполняются в контексте ветки Nessie; метаданные и данные управляются каталогом.
Nessie Core API и Iceberg REST
- Nessie Core API (
/api/v1,/api/v2) отвечает за метаданные репозитория: ссылки, коммиты и content-объекты. - Iceberg REST каталог (
{baseURL}/iceberg[/{branch}][|{warehouse}]) используется DataFlow для чтения и записи табличных данных. - Физические файлы таблиц хранятся в настроенном warehouse/object storage (например, S3-совместимом), а Nessie хранит и версионирует каталог и указатели на данные.
Для административных операций по Core API см. OpenAPI-файл в корне репозитория: nessie-openapi-0.107.5.yaml.
Источник (Source)
source:
type: nessie
config:
# Базовый URL сервера Nessie (обязательно), например https://nessie:19120
baseURL: "http://nessie:19120"
# Ветка Nessie для чтения (опционально, по умолчанию: main)
branch: main
# Имя warehouse для хранилища (опционально)
warehouse: ""
# Namespace (схема) таблицы Iceberg (обязательно)
namespace: my_schema
# Имя таблицы (обязательно)
table: my_table
# Интервал опроса в секундах (опционально, по умолчанию: 10)
pollInterval: 10
# Инкрементальное чтение по Iceberg snapshot (опционально, по умолчанию: false)
# incrementalBySnapshot: true
# startSnapshotID: "1234567890123456789" # при первом запуске без checkpoint
# snapshotCheckpoints: true # по умолчанию
# Аутентификация (опционально)
# authenticationType: AUTO (по умолчанию) | BEARER | BASIC | NONE
authenticationType: BEARER
bearerToken: "your-token"
# Или из Secret:
# tokenSecretRef:
# name: nessie-credentials
# key: bearerToken
# Или Basic auth:
# basicAuth:
# username: user
# password: pass
# Или Basic auth из Secret:
# basicAuth:
# usernameSecretRef:
# name: nessie-credentials
# key: username
# passwordSecretRef:
# name: nessie-credentials
# key: password
Особенности Nessie источника
- Контекст ветки: Чтение из указанной ветки Nessie; метаданные таблицы берутся из каталога.
- Опрос: Периодическое сканирование таблицы Iceberg для новых данных.
- Полный scan (по умолчанию): На каждом poll выполняется полное сканирование текущего snapshot таблицы.
- Инкрементальный режим (
incrementalBySnapshot: true): Читаются только snapshot новее последнего подтверждённогоAck; позиция сохраняется в checkpoint ConfigMap (см. отказоустойчивость). Полеqueryв этом режиме не поддерживается. - Аутентификация: Bearer токен (OAuth2) или Basic auth для Nessie/Iceberg REST.
- Без
rawModeу источника: Каждая строка Iceberg отдаётся как JSON-объект с именами колонок (напримерdataи_metadata, если таблица создана sink сrawMode: true). Инкрементальный режим уменьшает дубликаты при рестарте, но не включает несколько подов процессора — см. Горизонтальное масштабирование ниже.
Приемник (Sink)
sink:
type: nessie
config:
baseURL: "http://nessie:19120"
branch: main
warehouse: ""
namespace: my_schema
table: my_table
# Размер батча для дозаписи (опционально, по умолчанию: 100). 0 — сброс только по таймеру
batchSize: 100
# Интервал сброса батча в секундах (опционально, по умолчанию: 10). 0 — отключить сброс по таймеру
batchFlushIntervalSeconds: 10
# Создать таблицу, если не существует (опционально)
autoCreateTable: true
# Режим rawMode (опционально, по умолчанию: false)
# При true создаёт таблицу с колонками data и _metadata (string); plain-сообщения оборачиваются с msg.Metadata
rawMode: false
authenticationType: BEARER
bearerToken: "your-token"
# Или basicAuth: { username, password }
# Опционально: объектное хранилище warehouse (S3-совместимое). Не путать с аутентификацией Nessie REST выше.
# Задавайте accessKeySecretRef и secretAccessKeySecretRef вместе.
# Если namespace не указан, считается namespace DataFlow и используется secretKeyRef в Pod (предпочтительно).
# Если указан другой namespace, оператор читает Secret при reconcile и подставляет литералы в env Deployment (аналогично другим коннекторам с секретами в ConfigMap по модели раскрытия в etcd API).
# s3Endpoint: "https://storage.yandexcloud.net"
# s3Region: "ru-central1"
# accessKeySecretRef:
# name: iceberg-s3-creds
# key: AWS_ACCESS_KEY_ID
# secretAccessKeySecretRef:
# name: iceberg-s3-creds
# key: AWS_SECRET_ACCESS_KEY
Особенности Nessie приемника
- Контекст ветки: Запись выполняется в указанную ветку Nessie через каталог.
- Батч-дозапись: Группировка сообщений и дозапись батчами в Iceberg; по умолчанию сброс при достижении
batchSizeили по таймеру (10 с). Только по размеру:batchFlushIntervalSeconds: 0. Только по времени:batchSize: 0 - Автосоздание таблицы: При
autoCreateTable: trueсоздаёт таблицу Iceberg при отсутствии. По умолчанию — одна колонкаdata(string). ПриrawMode: true— колонкиdataи_metadata(string) — см. rawMode и колонка_metadata. - Аутентификация: Аналогично источнику (Bearer или Basic).
- Объектное хранилище warehouse: Опциональные статические ключи (
accessKeySecretRef+secretAccessKeySecretRef) задают переменные iceberg-go / AWS SDK; при необходимости укажитеs3Endpointиs3Region(например Yandex Object Storage). В том же namespace, что DataFlow, используютсяsecretKeyRef; для другого namespace оператор подставляет значения в env Deployment. Изменение целевого Secret приводит к reconcile через watch.
Горизонтальное масштабирование (spec.replicas)
Nessie — polling-источник. Число подов процессора задаётся полем spec.replicas (по умолчанию 1).
spec.source.type |
spec.replicas |
Поведение |
|---|---|---|
kafka |
допускается > 1 |
Consumer group распределяет партиции топика между подами. Sink Nessie в таком пайплайне может работать на нескольких подах (каждый пишет батчи независимо). |
nessie (или другой polling-источник) |
только 1 или не задан |
Admission webhook отклоняет replicas > 1. Несколько подов с общим checkpoint ConfigMap дублируют чтение/запись. |
| любой (DataFlowCron) | только 1 или не задан |
Один Job процессора на тик расписания; replicas > 1 всегда запрещён. |
incrementalBySnapshot у Nessie-источника улучшает поведение при рестарте (checkpoint по Iceberg snapshot), но не заменяет горизонтальное масштабирование как у Kafka.
Для большей пропускной способности при записи в Nessie настраивайте batchSize, batchFlushIntervalSeconds и channelBufferSize, а не увеличивайте replicas. Подробнее: отказоустойчивость — горизонтальное масштабирование, DataFlow spec — replicas.
rawMode и колонка _metadata (только sink)
rawMode задаётся в sink (sink.config.rawMode). У Nessie-источника этого параметра нет.
Схема Iceberg при autoCreateTable: true и rawMode: true:
| Колонка | Тип Iceberg | Содержимое |
|---|---|---|
data |
string | JSON полезной нагрузки (тело сообщения) |
_metadata |
string | JSON с полями происхождения (offset, partition, topic Kafka и т.д.) |
В отличие от PostgreSQL sink (data / _metadata как JSONB), Nessie хранит оба поля как string с JSON-текстом внутри.
Как сообщение попадает в таблицу при записи
- Обычное сообщение —
msg.Data— тело;msg.Metadata(например от Kafka) сериализуется в_metadata:
{"id": 1, "event": "login"}
→ data = JSON тела, _metadata = {"offset":100,"partition":0,"topic":"events",...}
- Уже обёрнутое сообщение — в теле есть ключи
valueи_metadata(типично при цепочке из другого rawMode sink):
{"value": {"id": 1}, "_metadata": {"offset": 10, "topic": "t1"}}
→ внутренний value → колонка data, внутренний _metadata → колонка _metadata (как у Trino/PostgreSQL rawMode).
Существующие таблицы
- При
rawMode: trueнаConnectпроверяется наличие колонокdataи_metadata(без учёта регистра). Иначе процессор завершится с ошибкой. - Таблица без
_metadataнесовместима сrawMode: true, пока вы не пересоздадите её или не добавите колонку вручную.
Чтение обратно (Nessie source)
Источник не разворачивает rawMode: одна строка Iceberg → один JSON с именами колонок. Строка из rawMode-таблицы выглядит так:
{"data": "{\"id\":1}", "_metadata": "{\"offset\":100,\"topic\":\"events\"}"}
При необходимости распарсите data / _metadata на стороне downstream sink.
flattenMetadataColumns — плоские колонки метаданных (sink)
Поддерживается в PostgreSQL, Trino, ClickHouse и Nessie sink при rawMode: true.
При flattenMetadataColumns: true каждый ключ из msg.Metadata записывается в отдельную колонку вместо JSON в _metadata. Опциональный префикс flattenMetadataColumnsPrefix добавляется к имени ключа (например kafka_ + offset → kafka_offset).
| Параметр | Описание |
|---|---|
flattenMetadataColumns |
true — развернуть metadata в колонки |
flattenMetadataColumnsPrefix |
Префикс имён колонок (например kafka_) |
Схема при autoCreateTable: true: колонка data (string) + колонки по ключам metadata из первого батча (типы выводятся автоматически: int/long/string/bool/timestamp). Ключ metadata timestamp (например kafka_timestamp с префиксом kafka_) сохраняется как timestamptz (Kafka source передаёт time.Time в UTC). Строковые timestamp из старых пайплайнов парсятся при возможности. Колонка _metadata не создаётся. Новые ключи metadata в последующих батчах игнорируются (логируется предупреждение).
Таблица с колонкой _metadata несовместима с flattenMetadataColumns: true — пересоздайте таблицу или используйте новое имя.
Чтение (Nessie source): строки с колонками data и kafka_* (без _metadata) преобразуются в сообщения {"value": ..., "_metadata": ...}; поля metadata также попадают в msg.Metadata.
Пример: Kafka → Nessie (Iceberg)
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: kafka-to-nessie
spec:
source:
type: kafka
config:
brokers:
- kafka:9092
topic: input-topic
consumerGroup: dataflow-group
sink:
type: nessie
config:
baseURL: "http://nessie:19120"
branch: main
namespace: analytics
table: events
batchSize: 100
autoCreateTable: true
rawMode: true
flattenMetadataColumns: true
flattenMetadataColumnsPrefix: kafka_ # kafka_offset, kafka_partition, ...
Iceberg (REST Catalog)
Коннектор Iceberg читает и записывает таблицы Apache Iceberg через официальный REST Catalog API. Используйте его для универсальных REST-каталогов (Polaris, Lakekeeper, Tabular, iceberg-rest и т.д.). Для веток Nessie используйте коннектор nessie.
Source
source:
type: iceberg
config:
catalogURI: "https://iceberg-catalog.example.com"
# prefix: production
# warehouse: main
namespace: my_schema
table: my_table
pollInterval: 10
authenticationType: BEARER
bearerToken: "your-token"
# oauth2ClientID / oauth2ClientSecret — альтернатива bearerToken
Sink
sink:
type: iceberg
config:
catalogURI: "https://iceberg-catalog.example.com"
namespace: my_schema
table: my_table
batchSize: 100
autoCreateTable: true
rawMode: true
authenticationType: BEARER
bearerToken: "your-token"
# s3Endpoint, s3Region, accessKeySecretRef, secretAccessKeySecretRef — для S3 warehouse
Поведение совпадает с Nessie: опрос, инкрементальные snapshot, батч-дозапись, rawMode, flattenMetadataColumns. Ключ checkpoint — iceberg. Для Iceberg source spec.replicas должен быть 1.
Пример: dataflow/config/samples/kafka-to-iceberg.yaml.
Error Sink
DataFlow Operator поддерживает настройку отдельного приёмника для сообщений, которые не удалось записать в основной sink. Секция errors использует те же типы коннекторов (Kafka, PostgreSQL, Trino, ClickHouse, Nessie, Iceberg), что и основной sink.
Конфигурация, структура сообщений об ошибках и типы ошибок описаны в разделе Обработка ошибок.
Использование Secrets в Kubernetes
DataFlow Operator поддерживает конфигурацию коннекторов из Kubernetes Secrets. Это позволяет безопасно хранить чувствительные данные (пароли, токены, connection strings) без их явного указания в спецификации DataFlow.
Обзор
Вместо указания значений напрямую в конфигурации, вы можете использовать ссылки на Kubernetes Secrets через поля *SecretRef. Оператор автоматически читает значения из secrets и подставляет их в конфигурацию коннекторов.
Структура SecretRef
Каждая ссылка на secret имеет следующую структуру:
secretRef:
name: my-secret # Имя secret (обязательно)
namespace: my-namespace # Namespace secret (опционально, по умолчанию - namespace DataFlow)
key: my-key # Ключ в secret (обязательно)
Поддерживаемые поля
Все коннекторы поддерживают ссылки на secrets для следующих полей:
Kafka
brokersSecretRef- список брокеров (через запятую)topicSecretRef- название топикаconsumerGroupSecretRef- consumer groupsasl.usernameSecretRef- имя пользователя SASLsasl.passwordSecretRef- пароль SASLtls.certSecretRef- сертификат клиентаtls.keySecretRef- приватный ключtls.caSecretRef- CA сертификатschemaRegistry.urlSecretRef- URL Schema RegistryschemaRegistry.basicAuth.usernameSecretRef- имя пользователя для Schema RegistryschemaRegistry.basicAuth.passwordSecretRef- пароль для Schema RegistryavroSchemaSecretRef- Avro схема из секрета (для статической схемы)
PostgreSQL
connectionStringSecretRef- строка подключенияtableSecretRef- название таблицы
PostgreSQL CDC (source)
connectionStringSecretRef- строка подключенияslotNameSecretRef- имя replication slotpublicationNameSecretRef- имя publication
ClickHouse
connectionStringSecretRef- строка подключенияtableSecretRef- название таблицы
Trino
serverURLSecretRef- URL сервера TrinocatalogSecretRef- название каталогаschemaSecretRef- название схемыtableSecretRef- название таблицыkeycloak.serverURLSecretRef- URL сервера Keycloakkeycloak.realmSecretRef- название realm в Keycloakkeycloak.clientIDSecretRef- OAuth2 client IDkeycloak.clientSecretSecretRef- OAuth2 client secretkeycloak.usernameSecretRef- имя пользователя для password grantkeycloak.passwordSecretRef- пароль для password grantkeycloak.tokenSecretRef- OAuth2 токен (для долгоживущих токенов)
Nessie
baseURLSecretRef- базовый URL сервера NessietokenSecretRef- Bearer токен для Nessie/Iceberg RESTnamespaceSecretRef- namespace (схема)tableSecretRef- название таблицы
Примеры использования
Пример 1: Kafka с SASL аутентификацией
apiVersion: v1
kind: Secret
metadata:
name: kafka-credentials
namespace: default
type: Opaque
stringData:
brokers: "kafka1:9092,kafka2:9092,kafka3:9092"
topic: "input-topic"
consumerGroup: "dataflow-group"
username: "kafka-user"
password: "kafka-password"
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: kafka-example
spec:
source:
type: kafka
config:
brokersSecretRef:
name: kafka-credentials
key: brokers
topicSecretRef:
name: kafka-credentials
key: topic
consumerGroupSecretRef:
name: kafka-credentials
key: consumerGroup
securityProtocol: SASL_PLAINTEXT
sasl:
mechanism: "scram-sha-256"
usernameSecretRef:
name: kafka-credentials
key: username
passwordSecretRef:
name: kafka-credentials
key: password
Пример 2: PostgreSQL с connection string из secret
apiVersion: v1
kind: Secret
metadata:
name: postgres-credentials
namespace: default
type: Opaque
stringData:
connectionString: "postgres://user:password@postgres:5432/dbname?sslmode=disable"
table: "output_table"
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: postgres-example
spec:
source:
type: postgresql
config:
connectionStringSecretRef:
name: postgres-credentials
key: connectionString
tableSecretRef:
name: postgres-credentials
key: table
Пример 3: Kafka → PostgreSQL с secrets
apiVersion: v1
kind: Secret
metadata:
name: kafka-credentials
type: Opaque
stringData:
brokers: "localhost:9092"
topic: "input-topic"
consumerGroup: "dataflow-group"
---
apiVersion: v1
kind: Secret
metadata:
name: postgres-credentials
type: Opaque
stringData:
connectionString: "postgres://dataflow:dataflow@postgres:5432/dataflow?sslmode=disable"
table: "output_table"
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: kafka-to-postgres-secrets
spec:
source:
type: kafka
config:
brokersSecretRef:
name: kafka-credentials
key: brokers
topicSecretRef:
name: kafka-credentials
key: topic
consumerGroupSecretRef:
name: kafka-credentials
key: consumerGroup
sink:
type: postgresql
config:
connectionStringSecretRef:
name: postgres-credentials
key: connectionString
tableSecretRef:
name: postgres-credentials
key: table
Пример 4: TLS сертификаты из secrets
Для TLS конфигурации оператор автоматически определяет, является ли значение из secret путем к файлу или содержимым сертификата.
Определение типа значения:
- Если значение начинается с -----BEGIN (например, -----BEGIN CERTIFICATE----- или -----BEGIN PRIVATE KEY-----), оператор считает его содержимым сертификата и создает временный файл
- Если значение не начинается с -----BEGIN и существует как файл в файловой системе, оно используется как путь к файлу
- Если значение не начинается с -----BEGIN и файл не существует, оно также обрабатывается как содержимое сертификата
Поддерживаемые форматы: 1. Содержимое сертификата (PEM формат):
ca.crt: |
-----BEGIN CERTIFICATE-----
MIIDXTCCAkWgAwIBAgIJAK...
-----END CERTIFICATE-----
-
Base64-кодированное содержимое (в поле
dataсекрета):Kubernetes автоматически декодирует base64 при чтении из секрета.apiVersion: v1 kind: Secret metadata: name: kafka-tls-certs type: Opaque data: ca.crt: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0t... # base64 -
Путь к файлу:
ca.crt: /etc/kafka/ca.crt
Пример с содержимым сертификата:
apiVersion: v1
kind: Secret
metadata:
name: kafka-tls-certs
type: Opaque
stringData:
# Содержимое CA сертификата
ca.crt: |
-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----
# Содержимое клиентского сертификата
client.crt: |
-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----
# Содержимое приватного ключа
client.key: |
-----BEGIN PRIVATE KEY-----
...
-----END PRIVATE KEY-----
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: kafka-tls-example
spec:
source:
type: kafka
config:
brokers:
- secure-kafka:9093
topic: secure-topic
tls:
caSecretRef:
name: kafka-tls-certs
key: ca.crt
certSecretRef:
name: kafka-tls-certs
key: client.crt
keySecretRef:
name: kafka-tls-certs
key: client.key
Важно:
- Временные файлы создаются автоматически для содержимого сертификатов и удаляются после использования
- При использовании base64-кодированных значений в поле data, Kubernetes автоматически декодирует их при чтении
- Убедитесь, что сертификаты в правильном PEM формате с заголовками -----BEGIN и -----END
Приоритет значений
Если указаны и прямое значение, и SecretRef, приоритет имеет SecretRef. Значение из secret будет использовано вместо прямого значения.
Secrets в разных namespace
По умолчанию оператор ищет secrets в том же namespace, где находится DataFlow ресурс. Вы можете указать другой namespace:
connectionStringSecretRef:
name: postgres-credentials
namespace: shared-secrets
key: connectionString
Безопасность
- RBAC: Оператору требуются права на чтение secrets (
get,list,watch) - Изоляция: Secrets должны быть в том же namespace или в namespace, к которому у оператора есть доступ
- Временные файлы: Для TLS сертификатов оператор создает временные файлы, которые автоматически удаляются после использования
- Файлы создаются только для содержимого сертификатов (начинающихся с
-----BEGIN) - Файлы удаляются автоматически при завершении работы оператора
- Временные файлы создаются в системной временной директории с уникальными именами
Устранение неполадок
Secret не найден
# Проверьте существование secret
kubectl get secret <secret-name> -n <namespace>
# Проверьте права оператора
kubectl auth can-i get secrets --as=system:serviceaccount:default:dataflow-operator
Ключ не найден в secret
# Проверьте ключи в secret
kubectl get secret <secret-name> -n <namespace> -o jsonpath='{.data}' | jq 'keys'
Ошибка разрешения secrets
Проверьте логи оператора:
kubectl logs -l app.kubernetes.io/name=dataflow-operator | grep -i secret
Убедитесь, что: 1. Secret существует в указанном namespace 2. Указанный ключ существует в secret 3. У оператора есть права на чтение secrets
Проблемы с TLS сертификатами
Если возникают ошибки при работе с TLS сертификатами:
- Ошибка "file name too long": Убедитесь, что сертификат в секрете хранится правильно:
- Если используете
stringData, сертификат должен быть в PEM формате с заголовками-----BEGINи-----END - Если используете
data(base64), убедитесь, что значение корректно закодировано -
Оператор автоматически определяет содержимое сертификата по префиксу
-----BEGIN -
Ошибка "failed to read CA file": Проверьте формат сертификата:
Убедитесь, что сертификат начинается с# Проверьте содержимое секрета kubectl get secret <secret-name> -n <namespace> -o jsonpath='{.data.ca\.crt}' | base64 -d-----BEGIN CERTIFICATE----- -
Ошибка создания временного файла: Проверьте права оператора на создание файлов в временной директории
Рекомендации по производительности
Общие настройки spec
channelBufferSize
Размер буфера каналов между source, processor и sink (по умолчанию 100). При высокой нагрузке Kafka (десятки тысяч msg/s) увеличьте до 500–1000, чтобы снизить блокировки, когда sink медленнее source.
ackGranularity
Момент коммита offset / checkpoint относительно записи в sink: batch (по умолчанию) или message. При message batch sink сбрасывает по одному сообщению, Kafka source сразу коммитит offset. См. Отказоустойчивость — гранулярность ack.
checkpointSyncOnAck и checkpointSaveInterval
checkpointSyncOnAck: true сбрасывает checkpoint polling-источников в ConfigMap после ack sink (с coalescing по checkpointSaveInterval, по умолчанию 30s). Рекомендуется для migration и cron. Подробнее: Отказоустойчивость.
Kafka
- Используйте несколько брокеров для отказоустойчивости
- Настройте правильный размер consumer group для параллельной обработки
- Используйте батчи при записи для повышения throughput
PostgreSQL
- Увеличьте
batchSizeдля приемника (рекомендуется 50-100) - Используйте индексы на часто запрашиваемых полях
- Настройте
pollIntervalв зависимости от частоты обновления данных
Устранение неполадок
Проблемы с подключением
- Проверьте доступность источника данных из кластера
- Убедитесь, что credentials корректны
- Проверьте сетевые политики Kubernetes
- Для TLS проверьте сертификаты
Проблемы с производительностью
- Увеличьте
channelBufferSize(500–1000) при высокой нагрузке Kafka - Увеличьте размер батчей для приемников
- Настройте правильный
pollIntervalдля источников - Для Kafka увеличивайте
spec.replicas(не больше числа партиций топика). Для Nessie и других polling-источников оставляйтеreplicas: 1и настраивайтеbatchSize/channelBufferSize - Мониторьте метрики обработки сообщений
Логирование
Включите детальное логирование для отладки:
kubectl logs -l app.kubernetes.io/name=dataflow-operator -f --tail=100
Проверьте статус DataFlow ресурса:
kubectl describe dataflow <name>