Connectors
DataFlow Operator поддерживает различные коннекторы для источников и приемников данных. Каждый коннектор реализует стандартный интерфейс и может использоваться как источник (source) или приемник (sink) данных.
Обзор коннекторов
| Коннектор | Источник | Приемник | Особенности |
|---|---|---|---|
| Kafka | ✅ | ✅ | Consumer groups, TLS, SASL, Avro, Schema Registry |
| PostgreSQL | ✅ | ✅ | SQL запросы, батч-вставки, автосоздание таблиц, UPSERT режим |
| Trino | ✅ | ✅ | SQL запросы, аутентификация Keycloak OAuth2, батч-вставки |
| ClickHouse | ✅ | ✅ | Опрос таблиц, батч-вставки, автосоздание MergeTree таблиц |
| Nessie | ✅ | ✅ | Таблицы Iceberg через каталог Nessie, ветки, Basic/Bearer auth, опрос, батч-дозапись |
Kafka
Kafka коннектор поддерживает чтение и запись сообщений из/в топики Apache Kafka. Поддерживает consumer groups для масштабирования, TLS и SASL аутентификацию, а также работу с Avro форматом через Confluent Schema Registry или статическую схему.
Источник (Source)
Конфигурация Kafka источника:
source:
type: kafka
config:
# Список брокеров Kafka (обязательно)
brokers:
- kafka1:9092
- kafka2:9092
- kafka3:9092
# Топик для чтения (обязательно)
topic: input-topic
# Consumer group (опционально, по умолчанию: dataflow-operator)
consumerGroup: my-group
# TLS конфигурация (опционально)
tls:
# Пропустить проверку сертификата (не рекомендуется для production)
insecureSkipVerify: false
# Путь к CA сертификату
caFile: /path/to/ca.crt
# Путь к клиентскому сертификату
certFile: /path/to/client.crt
# Путь к приватному ключу
keyFile: /path/to/client.key
# SASL аутентификация (опционально)
sasl:
# Механизм: plain, scram-sha-256, scram-sha-512
mechanism: scram-sha-256
username: kafka-user
password: kafka-password
# Формат сообщений (опционально, по умолчанию: "json")
# Поддерживаемые форматы: "json", "avro"
format: json
# Конфигурация Avro (требуется если format: "avro")
# Вариант 1: Использование Confluent Schema Registry (рекомендуется)
schemaRegistry:
# URL Schema Registry (обязательно)
url: https://schema-registry:8081
# Basic Auth для Schema Registry (опционально)
basicAuth:
username: schema-user
password: schema-password
# TLS конфигурация для Schema Registry (опционально)
tls:
insecureSkipVerify: false
caFile: /path/to/schema-registry-ca.crt
# Вариант 2: Статическая Avro схема (альтернатива Schema Registry)
# avroSchema: |
# {
# "type": "record",
# "name": "MyRecord",
# "fields": [
# {"name": "id", "type": "long"},
# {"name": "name", "type": "string"}
# ]
# }
# Или путь к файлу со схемой:
# avroSchemaFile: /path/to/schema.avsc
Особенности Kafka источника
- Consumer Groups: Используйте разные consumer groups для масштабирования обработки
- Начальная позиция: По умолчанию читает с самого старого сообщения (
OffsetOldest) - Форматы данных: Поддерживает JSON (по умолчанию) и Avro форматы
- Avro поддержка:
- Confluent Schema Registry: Автоматическое получение схем по ID из сообщений (формат: magic byte + schema ID + data)
- Статическая схема: Использование предопределенной схемы из конфигурации или файла
- Кэширование схем: Схемы из Schema Registry кэшируются для повышения производительности
- Метаданные: Каждое сообщение содержит метаданные:
topic- название топикаpartition- номер партицииoffset- смещение сообщенияkey- ключ сообщения (если есть)
Пример с TLS и SASL
source:
type: kafka
config:
brokers:
- secure-kafka:9093
topic: secure-topic
consumerGroup: secure-group
tls:
caFile: /etc/kafka/ca.crt
certFile: /etc/kafka/client.crt
keyFile: /etc/kafka/client.key
sasl:
mechanism: scram-sha-512
username: kafka-user
password: ${KAFKA_PASSWORD} # Используйте Secrets в Kubernetes
Пример с Avro и Schema Registry
source:
type: kafka
config:
brokers:
- kafka:9092
topic: avro-topic
consumerGroup: avro-group
format: avro
schemaRegistry:
url: https://schema-registry:8081
basicAuth:
username: schema-user
password: schema-password
tls:
insecureSkipVerify: true # Для self-signed сертификатов
# caFile: /path/to/ca.crt # Или укажите CA сертификат
Пример с Avro и статической схемой
source:
type: kafka
config:
brokers:
- kafka:9092
topic: avro-topic
consumerGroup: avro-group
format: avro
avroSchema: |
{
"type": "record",
"name": "Stock",
"namespace": "com.example",
"fields": [
{"name": "id", "type": "long"},
{"name": "symbol", "type": "string"},
{"name": "price", "type": "double"},
{"name": "quantity", "type": "int"}
]
}
# Или используйте файл:
# avroSchemaFile: /path/to/schema.avsc
Приемник (Sink)
Конфигурация Kafka приемника:
sink:
type: kafka
config:
# Список брокеров Kafka (обязательно)
brokers:
- kafka1:9092
# Топик для записи (обязательно)
topic: output-topic
# TLS конфигурация (опционально, аналогично источнику)
tls:
caFile: /path/to/ca.crt
certFile: /path/to/client.crt
keyFile: /path/to/client.key
# SASL конфигурация (опционально, аналогично источнику)
sasl:
mechanism: scram-sha-256
username: kafka-user
password: kafka-password
Особенности Kafka приемника
- Синхронная запись: Использует
SyncProducerдля гарантированной доставки - Ключи сообщений: Сохраняет ключи из метаданных сообщения
- Метаданные: После записи обновляет метаданные с
partitionиoffset
PostgreSQL
PostgreSQL коннектор поддерживает чтение из таблиц и запись в таблицы PostgreSQL. Поддерживает кастомные SQL запросы, периодический опрос, батч-вставки, CDC-стиль отслеживания изменений (INSERT и UPDATE), soft delete и SecretRef для учётных данных.
Источник (Source)
Конфигурация PostgreSQL источника:
source:
type: postgresql
config:
# Connection string (обязательно, или connectionStringSecretRef)
# Формат: postgres://user:password@host:port/dbname?sslmode=mode
connectionString: "postgres://user:password@localhost:5432/dbname?sslmode=disable"
# Таблица для чтения (обязательно, если не указан query). Поддерживает schema.table (напр. public.products)
table: source_table
# Кастомный SQL запрос (опционально)
# Если указан, используется вместо чтения всей таблицы
query: "SELECT * FROM source_table WHERE updated_at > NOW() - INTERVAL '1 hour'"
# Интервал опроса в секундах (опционально, по умолчанию: 5)
pollInterval: 60
# Опции CDC-стиля (опционально)
readBatchSize: 1000 # Ограничение строк за один опрос (0 = без лимита)
changeTrackingColumn: updated_at # Колонка для отслеживания изменений (по умолчанию: updated_at). Не используется при заданном query
autoCreateTable: true # Создать таблицу, если не существует, перед чтением
# SecretRef (опционально)
# connectionStringSecretRef: { name: postgres-credentials, key: connectionString }
# tableSecretRef: { name: postgres-credentials, key: table }
Особенности PostgreSQL источника
- Периодический опрос: Регулярно опрашивает таблицу на наличие новых данных
- Кастомные запросы: Поддержка сложных SQL запросов с JOIN, WHERE, и т.д.
- Метаданные: Каждое сообщение содержит метаданные
tableиoperation(insert/update) - Размер батча чтения: Ограничивает количество строк за один опрос для снижения нагрузки на БД
- Отслеживание обновлений: По умолчанию отслеживает изменения по колонке
updated_at(илиchangeTrackingColumn), захватывает INSERT и UPDATE - Автосоздание таблицы: При
autoCreateTable: trueсоздаёт таблицу с CDC-совместимой схемой (id SERIAL PRIMARY KEY,created_at,updated_at), если не существует. Создание выполняется при подключении (Connect). - Схема таблицы: Имя таблицы поддерживает формат
schema.table(напр.public.products) - Персистенция checkpoint: По умолчанию позиция чтения (lastReadChangeTime) сохраняется в ConfigMap и при перезапуске чтение продолжается с последней позиции. Задайте
checkpointPersistence: falseв spec, чтобы хранить только в памяти. Для pg→pg включитеupsertMode: trueв sink, чтобы дубликаты обновлялись, а не вставлялись повторно.
Пример с кастомным запросом
source:
type: postgresql
config:
connectionString: "postgres://user:password@localhost:5432/analytics"
query: |
SELECT
u.id,
u.email,
o.order_id,
o.total
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE o.created_at > NOW() - INTERVAL '1 day'
pollInterval: 300 # Опрос каждые 5 минут
Приемник (Sink)
Конфигурация PostgreSQL приемника:
sink:
type: postgresql
config:
connectionString: "postgres://user:password@localhost:5432/dbname?sslmode=disable"
# Таблица для записи. Поддерживает schema.table (напр. public.products_clone)
table: target_table
# Размер батча (опционально, по умолчанию: 1). 0 — сброс только по таймеру
batchSize: 100
# Интервал сброса в секундах (опционально, по умолчанию: 10). 0 — отключить сброс по таймеру
batchFlushIntervalSeconds: 10
autoCreateTable: true
# Режим rawMode (опционально, по умолчанию: false)
# При true ожидает сообщения в формате {"value": <данные>, "_metadata": {...}}
# Таблица создаётся с колонками value JSONB и _metadata JSONB
rawMode: false
# Режим UPSERT (опционально, по умолчанию: false)
upsertMode: true
conflictKey: "id"
# Колонка для soft delete (опционально)
softDeleteColumn: "deleted_at"
# SecretRef (опционально)
# connectionStringSecretRef: { name: postgres-credentials, key: connectionString }
# tableSecretRef: { name: postgres-credentials, key: table }
Особенности PostgreSQL приемника
- Батч-вставки: Группирует сообщения для эффективной записи. Сброс при достижении
batchSizeили по таймеру. Только по размеру:batchFlushIntervalSeconds: 0. Только по времени:batchSize: 0 - Автосоздание таблиц (при
autoCreateTable: true): - rawMode: true — таблица создаётся при подключении (Connect). Схема:
id SERIAL PRIMARY KEY,valueJSONB,_metadataJSONB,created_at,updated_at,deleted_at, GIN индекс поvalue - rawMode: false — таблица создаётся при первой записи из структуры первого сообщения (реплицирует схему источника). Типы колонок выводятся автоматически (TEXT, BIGINT, NUMERIC, JSONB и т.д.)
- Режим rawMode: При true ожидает
{"value": <данные>, "_metadata": {...}}и создаёт таблицу с колонками value/_metadata. При false использует колоночный формат из сообщения - UPSERT режим: Обновляет существующие записи при конфликте по PRIMARY KEY или указанному
conflictKey - Soft delete: При заданном
softDeleteColumnиmetadata.operation=deleteвыполняетUPDATE ... SET deleted_at = NOW()вместо физического DELETE
Пример с автосозданием таблицы (rawMode)
sink:
type: postgresql
config:
connectionString: "postgres://user:password@localhost:5432/analytics"
table: events
autoCreateTable: true
rawMode: true # Таблица с value и _metadata
batchSize: 50
При rawMode: true и autoCreateTable: true таблица создаётся со структурой:
CREATE TABLE events (
id SERIAL PRIMARY KEY,
value JSONB NOT NULL,
_metadata JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
deleted_at TIMESTAMP
);
CREATE INDEX idx_events_value ON events USING GIN (value);
При rawMode: false структура таблицы выводится из первого сообщения (колоночный формат).
Пример с UPSERT для обновления существующих записей
sink:
type: postgresql
config:
connectionString: "postgres://user:password@localhost:5432/analytics"
table: orders
upsertMode: true # Включает обновление существующих записей
conflictKey: "id" # Использует поле 'id' для определения конфликта
batchSize: 100
При включенном upsertMode:
- Если запись с таким же id (или указанным conflictKey) уже существует, она будет обновлена
- Если записи нет, она будет вставлена
- Это особенно полезно для синхронизации данных, когда источник периодически отправляет обновленные записи
Важно: Для работы UPSERT таблица должна иметь PRIMARY KEY или UNIQUE constraint на указанном conflictKey.
ClickHouse
ClickHouse коннектор поддерживает чтение из таблиц и запись в таблицы ClickHouse. Поддерживает периодический опрос для инкрементального чтения, кастомные SQL запросы, батч-вставки и автосоздание MergeTree таблиц.
Источник (Source)
source:
type: clickhouse
config:
# Connection string (обязательно)
# Native: clickhouse://host:9000?username=default&password=xxx&database=default
# HTTP: http://host:8123/default?username=default&password=xxx
connectionString: "clickhouse://default@clickhouse:9000/default?dial_timeout=10s"
# Таблица для чтения (обязательно, если не указан query)
table: source_table
# Кастомный SQL запрос (опционально)
query: "SELECT * FROM source_table WHERE id > 100"
# Интервал опроса в секундах (опционально, по умолчанию: 5)
pollInterval: 60
Особенности ClickHouse источника
- Опрос: Периодически опрашивает таблицу на наличие новых данных (интервал задаётся
pollInterval) - Инкрементальное чтение: При отсутствии
queryиспользует колонкиidилиcreated_atдля фильтрации уже прочитанных строк (WHERE id > lastReadIDилиWHERE created_at > lastReadTime), что избегает дубликатов при перезапуске - Кастомные запросы: При указании
queryиспользуется только он; инкрементальная логика не применяется - Метаданные: Каждое сообщение содержит
tableиid(если колонка есть)
Приемник (Sink)
sink:
type: clickhouse
config:
connectionString: "clickhouse://default@clickhouse:9000/default?dial_timeout=10s"
table: output_table
# Размер батча для вставок (опционально, по умолчанию: 100). 0 — сброс только по таймеру
batchSize: 100
# Интервал сброса батча в секундах (опционально, по умолчанию: 10). 0 — отключить сброс по таймеру
batchFlushIntervalSeconds: 10
# Автосоздание таблицы (опционально)
autoCreateTable: true
# rawMode: false (по умолчанию) — создаёт таблицу по структуре первого сообщения (колоночный формат, репликация схемы источника)
# rawMode: true — создаёт MergeTree таблицу с колонками data String, created_at (JSON-хранилище)
rawMode: false
Особенности ClickHouse приемника
- Батч-вставки: Группирует сообщения; по умолчанию сброс при достижении
batchSizeили по таймеру (10 с). Только по размеру:batchFlushIntervalSeconds: 0. Только по времени:batchSize: 0 - Retry при транзиентных ошибках: Batch-запись автоматически повторяется при connection refused, TOO_MANY_PARTS, memory limit, HTTP 502/503 и подобных ошибках (до 5 попыток с экспоненциальным backoff)
- Автосоздание таблиц (при
autoCreateTable: true): - rawMode: true — таблица создаётся при подключении (Connect). Схема:
data String,created_at DateTime DEFAULT now(), движок MergeTree, ORDER BY created_at. Сообщения сохраняются как JSON-строка в колонкеdata - rawMode: false — таблица создаётся при первой записи из структуры первого сообщения. Типы колонок выводятся автоматически (String, Int32/Int64, Float64, Decimal, DateTime и т.д.). Поддерживается формат
{"value": {...}, "_metadata": {...}}— используются поля изvalue - Режим rawMode: При true ожидает/хранит JSON в колонке
data. При false использует колоночный формат из сообщения (INSERT с именованными колонками)
Отказоустойчивость ClickHouse
- Error sink: Используйте
spec.errorsс ClickHouse (или Kafka) для сохранения неудачных сообщений. См. Обработка ошибок. - Connection string: Рекомендуемые параметры:
dial_timeout=10s,max_execution_time=60(например,clickhouse://host:9000/default?dial_timeout=10s&max_execution_time=60). - Настройка batch: Для пропускной способности и устойчивости используйте
batchSize100–1000 иbatchFlushIntervalSeconds5–10. Большие батчи снижают частоту вставок и помогают избежать TOO_MANY_PARTS. - TOO_MANY_PARTS: При этой ошибке увеличьте
batchSize, уменьшите частоту flush или настройте merge в ClickHouse (background_pool_size,parts_to_throw_insert). РассмотритеReplacingMergeTreeдля дедупликации. См. Отказоустойчивость.
Пример: Kafka → ClickHouse
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: kafka-to-clickhouse
spec:
source:
type: kafka
config:
brokers:
- kafka:9092
topic: input-topic
consumerGroup: dataflow-group
sink:
type: clickhouse
config:
connectionString: "clickhouse://default@clickhouse:9000/default"
table: output_table
batchSize: 100
autoCreateTable: true
Пример: ClickHouse → ClickHouse (репликация схемы)
При autoCreateTable: true и rawMode: false целевая таблица создаётся автоматически по структуре источника:
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: clickhouse-to-clickhouse
spec:
source:
type: clickhouse
config:
connectionString: "clickhouse://dataflow:dataflow@localhost:9000/dataflow?dial_timeout=10s"
table: products
pollInterval: 5
sink:
type: clickhouse
config:
connectionString: "clickhouse://dataflow:dataflow@localhost:9000/dataflow?dial_timeout=10s"
table: products_clone
batchSize: 100
autoCreateTable: true
Trino
Trino коннектор поддерживает чтение из таблиц и запись в таблицы Trino (ранее PrestoSQL). Поддерживает SQL запросы, аутентификацию через Keycloak OAuth2/OIDC и батч-вставки.
Источник (Source)
Конфигурация Trino источника:
source:
type: trino
config:
# URL сервера Trino (обязательно)
serverURL: "http://trino:8080"
# Каталог для использования (обязательно)
catalog: hive
# Схема для использования (обязательно)
schema: default
# Таблица для чтения (обязательно, если не указан query)
table: source_table
# Кастомный SQL запрос (опционально)
# Если указан, используется вместо чтения всей таблицы
query: "SELECT * FROM hive.default.source_table WHERE id > 100"
# Интервал опроса в секундах (опционально, по умолчанию: 5)
# Используется для периодического чтения новых данных
pollInterval: 60
# Аутентификация Keycloak (опционально)
keycloak:
# Вариант 1: Использование долгоживущего токена напрямую (рекомендуется для долгоживущих токенов)
token: "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."
# Вариант 2: Использование OAuth2 flow (альтернатива прямому токену)
# serverURL: "https://keycloak.example.com"
# realm: myrealm
# clientID: trino-client
# clientSecret: client-secret
# username: trino-user
# password: trino-password
Особенности Trino источника
- SQL запросы: Поддержка кастомных SQL запросов с WHERE, JOIN и т.д.
- Периодический опрос: Регулярно опрашивает таблицы на наличие новых данных
- Аутентификация Keycloak: OAuth2/OIDC аутентификация через Keycloak
- Прямой токен: Использование долгоживущего токена, полученного из Keycloak (рекомендуется для долгоживущих токенов)
- Password Grant: Использование username/password для аутентификации
- Client Credentials: Использование client ID/secret для сервис-к-сервис аутентификации
- Автоматическое обновление токенов: Токены автоматически обновляются до истечения срока действия (только для OAuth2 flow, не для прямых токенов)
Получение токена из Keycloak
Для использования долгоживущего токена вы можете получить его из Keycloak следующими способами:
Способ 1: Использование curl (Password Grant)
curl -X POST "https://keycloak.example.com/realms/myrealm/protocol/openid-connect/token" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "grant_type=password" \
-d "client_id=trino-client" \
-d "client_secret=client-secret" \
-d "username=trino-user" \
-d "password=trino-password"
Способ 2: Использование curl (Client Credentials)
curl -X POST "https://keycloak.example.com/realms/myrealm/protocol/openid-connect/token" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "grant_type=client_credentials" \
-d "client_id=trino-client" \
-d "client_secret=client-secret"
Ответ будет содержать поле access_token. Используйте это значение токена в поле token конфигурации Keycloak.
Примечание: Для долгоживущих токенов настройте время жизни токена в настройках realm или клиента Keycloak.
- Метаданные: Каждое сообщение содержит метаданные:
- catalog - название каталога
- schema - название схемы
- table - название таблицы
Пример с кастомным запросом
source:
type: trino
config:
serverURL: "http://trino:8080"
catalog: hive
schema: analytics
query: |
SELECT
u.id,
u.email,
o.order_id,
o.total
FROM hive.analytics.users u
JOIN hive.analytics.orders o ON u.id = o.user_id
WHERE o.created_at > CURRENT_TIMESTAMP - INTERVAL '1' DAY
pollInterval: 300 # Опрос каждые 5 минут
keycloak:
serverURL: "https://keycloak.example.com"
realm: myrealm
clientID: trino-client
clientSecret: client-secret
username: trino-user
password: trino-password
Приемник (Sink)
Конфигурация Trino приемника:
sink:
type: trino
config:
# URL сервера Trino (обязательно)
serverURL: "http://trino:8080"
# Каталог для использования (обязательно)
catalog: hive
# Схема для использования (обязательно)
schema: default
# Таблица для записи (обязательно)
table: target_table
# Размер батча для вставки (опционально, по умолчанию: 1). 0 — сброс только по таймеру
batchSize: 100
# Интервал сброса батча в секундах (опционально, по умолчанию: 10). 0 — отключить сброс по таймеру
batchFlushIntervalSeconds: 10
# Автоматическое создание таблицы (опционально, по умолчанию: false)
# Если true, создает таблицу с VARCHAR колонкой для JSON данных
autoCreateTable: true
# Режим rawMode (опционально, по умолчанию: false)
# При true создает таблицу с колонкой data VARCHAR; сообщения сохраняются как JSON-строка
# При false используется колоночный формат из сообщения
rawMode: true
# Аутентификация Keycloak (опционально)
keycloak:
serverURL: "https://keycloak.example.com"
realm: myrealm
clientID: trino-client
clientSecret: client-secret
username: trino-user
password: trino-password
Особенности Trino приемника
- Батч-вставки: Группирует сообщения; по умолчанию сброс при достижении
batchSizeили по таймеру (10 с). Только по размеру:batchFlushIntervalSeconds: 0. Только по времени:batchSize: 0 - Автосоздание таблиц: Автоматически создает таблицы, если они не существуют
- Режим rawMode: При
rawMode: trueсоздает таблицу с колонкойdata VARCHAR; сообщения сохраняются как JSON-строка. Приfalse(по умолчанию) — колоночный формат из сообщения - Аутентификация Keycloak: OAuth2/OIDC аутентификация через Keycloak
- Автоматическое обновление токенов: Токены автоматически обновляются
Пример: Kafka → Trino с Keycloak
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: kafka-to-trino
spec:
source:
type: kafka
config:
brokers:
- kafka:9092
topic: input-topic
consumerGroup: dataflow-group
sink:
type: trino
config:
serverURL: "http://trino:8080"
catalog: hive
schema: default
table: output_table
batchSize: 100
keycloak:
# Использование долгоживущего токена, полученного из Keycloak
token: "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."
# Альтернатива: Использование OAuth2 flow
# serverURL: "https://keycloak.example.com"
# realm: myrealm
# clientID: trino-client
# clientSecret: client-secret
# username: trino-user
# password: trino-password
Пример: Trino с secrets
apiVersion: v1
kind: Secret
metadata:
name: trino-credentials
namespace: default
type: Opaque
stringData:
serverURL: "http://trino:8080"
catalog: "hive"
schema: "default"
table: "output_table"
---
apiVersion: v1
kind: Secret
metadata:
name: keycloak-credentials
namespace: default
type: Opaque
stringData:
serverURL: "https://keycloak.example.com"
realm: "myrealm"
clientID: "trino-client"
clientSecret: "client-secret"
username: "trino-user"
password: "trino-password"
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: kafka-to-trino-secrets
spec:
source:
type: kafka
config:
brokers:
- kafka:9092
topic: input-topic
consumerGroup: dataflow-group
sink:
type: trino
config:
serverURLSecretRef:
name: trino-credentials
key: serverURL
catalogSecretRef:
name: trino-credentials
key: catalog
schemaSecretRef:
name: trino-credentials
key: schema
tableSecretRef:
name: trino-credentials
key: table
batchSize: 100
keycloak:
# Вариант 1: Использование токена из secret (рекомендуется для долгоживущих токенов)
tokenSecretRef:
name: keycloak-credentials
key: token
# Вариант 2: Использование OAuth2 flow из secrets (альтернатива)
# serverURLSecretRef:
# name: keycloak-credentials
# key: serverURL
# realmSecretRef:
# name: keycloak-credentials
# key: realm
# clientIDSecretRef:
# name: keycloak-credentials
# key: clientID
# clientSecretSecretRef:
# name: keycloak-credentials
# key: clientSecret
# usernameSecretRef:
# name: keycloak-credentials
# key: username
# passwordSecretRef:
# name: keycloak-credentials
# key: password
Nessie
Коннектор Nessie читает из таблиц Apache Iceberg и записывает в них через каталог Nessie (Iceberg REST API). Все операции выполняются в контексте ветки Nessie; метаданные и данные управляются каталогом.
Источник (Source)
source:
type: nessie
config:
# Базовый URL сервера Nessie (обязательно), например https://nessie:19120
baseURL: "http://nessie:19120"
# Ветка Nessie для чтения (опционально, по умолчанию: main)
branch: main
# Имя warehouse для хранилища (опционально)
warehouse: ""
# Namespace (схема) таблицы Iceberg (обязательно)
namespace: my_schema
# Имя таблицы (обязательно)
table: my_table
# Интервал опроса в секундах (опционально, по умолчанию: 10)
pollInterval: 10
# Аутентификация (опционально)
bearerToken: "your-token"
# Или Basic auth:
# basicAuth:
# username: user
# password: pass
Особенности Nessie источника
- Контекст ветки: Чтение из указанной ветки Nessie; метаданные таблицы берутся из каталога.
- Опрос: Периодическое сканирование таблицы Iceberg для новых данных.
- Аутентификация: Bearer токен (OAuth2) или Basic auth для Nessie/Iceberg REST.
Приемник (Sink)
sink:
type: nessie
config:
baseURL: "http://nessie:19120"
branch: main
warehouse: ""
namespace: my_schema
table: my_table
# Размер батча для дозаписи (опционально, по умолчанию: 100). 0 — сброс только по таймеру
batchSize: 100
# Интервал сброса батча в секундах (опционально, по умолчанию: 10). 0 — отключить сброс по таймеру
batchFlushIntervalSeconds: 10
# Создать таблицу, если не существует (опционально); создаётся таблица с одной колонкой "data" (string)
autoCreateTable: true
bearerToken: "your-token"
# Или basicAuth: { username, password }
Особенности Nessie приемника
- Контекст ветки: Запись выполняется в указанную ветку Nessie через каталог.
- Батч-дозапись: Группировка сообщений и дозапись батчами в Iceberg; по умолчанию сброс при достижении
batchSizeили по таймеру (10 с). Только по размеру:batchFlushIntervalSeconds: 0. Только по времени:batchSize: 0 - Автосоздание таблицы: Создание таблицы Iceberg с одной колонкой
data(string) для JSON при отсутствии таблицы. - Аутентификация: Аналогично источнику (Bearer или Basic).
Пример: Kafka → Nessie (Iceberg)
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: kafka-to-nessie
spec:
source:
type: kafka
config:
brokers:
- kafka:9092
topic: input-topic
consumerGroup: dataflow-group
sink:
type: nessie
config:
baseURL: "http://nessie:19120"
branch: main
namespace: analytics
table: events
batchSize: 100
autoCreateTable: true
Error Sink
DataFlow Operator поддерживает настройку отдельного приёмника для сообщений, которые не удалось записать в основной sink. Секция errors использует те же типы коннекторов (Kafka, PostgreSQL, Trino, ClickHouse, Nessie), что и основной sink.
Конфигурация, структура сообщений об ошибках и типы ошибок описаны в разделе Обработка ошибок.
Использование Secrets в Kubernetes
DataFlow Operator поддерживает конфигурацию коннекторов из Kubernetes Secrets. Это позволяет безопасно хранить чувствительные данные (пароли, токены, connection strings) без их явного указания в спецификации DataFlow.
Обзор
Вместо указания значений напрямую в конфигурации, вы можете использовать ссылки на Kubernetes Secrets через поля *SecretRef. Оператор автоматически читает значения из secrets и подставляет их в конфигурацию коннекторов.
Структура SecretRef
Каждая ссылка на secret имеет следующую структуру:
secretRef:
name: my-secret # Имя secret (обязательно)
namespace: my-namespace # Namespace secret (опционально, по умолчанию - namespace DataFlow)
key: my-key # Ключ в secret (обязательно)
Поддерживаемые поля
Все коннекторы поддерживают ссылки на secrets для следующих полей:
Kafka
brokersSecretRef- список брокеров (через запятую)topicSecretRef- название топикаconsumerGroupSecretRef- consumer groupsasl.usernameSecretRef- имя пользователя SASLsasl.passwordSecretRef- пароль SASLtls.certSecretRef- сертификат клиентаtls.keySecretRef- приватный ключtls.caSecretRef- CA сертификатschemaRegistry.urlSecretRef- URL Schema RegistryschemaRegistry.basicAuth.usernameSecretRef- имя пользователя для Schema RegistryschemaRegistry.basicAuth.passwordSecretRef- пароль для Schema RegistryavroSchemaSecretRef- Avro схема из секрета (для статической схемы)
PostgreSQL
connectionStringSecretRef- строка подключенияtableSecretRef- название таблицы
ClickHouse
connectionStringSecretRef- строка подключенияtableSecretRef- название таблицы
Trino
serverURLSecretRef- URL сервера TrinocatalogSecretRef- название каталогаschemaSecretRef- название схемыtableSecretRef- название таблицыkeycloak.serverURLSecretRef- URL сервера Keycloakkeycloak.realmSecretRef- название realm в Keycloakkeycloak.clientIDSecretRef- OAuth2 client IDkeycloak.clientSecretSecretRef- OAuth2 client secretkeycloak.usernameSecretRef- имя пользователя для password grantkeycloak.passwordSecretRef- пароль для password grantkeycloak.tokenSecretRef- OAuth2 токен (для долгоживущих токенов)
Nessie
baseURLSecretRef- базовый URL сервера NessietokenSecretRef- Bearer токен для Nessie/Iceberg RESTnamespaceSecretRef- namespace (схема)tableSecretRef- название таблицы
Примеры использования
Пример 1: Kafka с SASL аутентификацией
apiVersion: v1
kind: Secret
metadata:
name: kafka-credentials
namespace: default
type: Opaque
stringData:
brokers: "kafka1:9092,kafka2:9092,kafka3:9092"
topic: "input-topic"
consumerGroup: "dataflow-group"
username: "kafka-user"
password: "kafka-password"
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: kafka-example
spec:
source:
type: kafka
config:
brokersSecretRef:
name: kafka-credentials
key: brokers
topicSecretRef:
name: kafka-credentials
key: topic
consumerGroupSecretRef:
name: kafka-credentials
key: consumerGroup
sasl:
mechanism: "scram-sha-256"
usernameSecretRef:
name: kafka-credentials
key: username
passwordSecretRef:
name: kafka-credentials
key: password
Пример 2: PostgreSQL с connection string из secret
apiVersion: v1
kind: Secret
metadata:
name: postgres-credentials
namespace: default
type: Opaque
stringData:
connectionString: "postgres://user:password@postgres:5432/dbname?sslmode=disable"
table: "output_table"
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: postgres-example
spec:
source:
type: postgresql
config:
connectionStringSecretRef:
name: postgres-credentials
key: connectionString
tableSecretRef:
name: postgres-credentials
key: table
Пример 3: Kafka → PostgreSQL с secrets
apiVersion: v1
kind: Secret
metadata:
name: kafka-credentials
type: Opaque
stringData:
brokers: "localhost:9092"
topic: "input-topic"
consumerGroup: "dataflow-group"
---
apiVersion: v1
kind: Secret
metadata:
name: postgres-credentials
type: Opaque
stringData:
connectionString: "postgres://dataflow:dataflow@postgres:5432/dataflow?sslmode=disable"
table: "output_table"
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: kafka-to-postgres-secrets
spec:
source:
type: kafka
config:
brokersSecretRef:
name: kafka-credentials
key: brokers
topicSecretRef:
name: kafka-credentials
key: topic
consumerGroupSecretRef:
name: kafka-credentials
key: consumerGroup
sink:
type: postgresql
config:
connectionStringSecretRef:
name: postgres-credentials
key: connectionString
tableSecretRef:
name: postgres-credentials
key: table
Пример 4: TLS сертификаты из secrets
Для TLS конфигурации оператор автоматически определяет, является ли значение из secret путем к файлу или содержимым сертификата.
Определение типа значения:
- Если значение начинается с -----BEGIN (например, -----BEGIN CERTIFICATE----- или -----BEGIN PRIVATE KEY-----), оператор считает его содержимым сертификата и создает временный файл
- Если значение не начинается с -----BEGIN и существует как файл в файловой системе, оно используется как путь к файлу
- Если значение не начинается с -----BEGIN и файл не существует, оно также обрабатывается как содержимое сертификата
Поддерживаемые форматы: 1. Содержимое сертификата (PEM формат):
ca.crt: |
-----BEGIN CERTIFICATE-----
MIIDXTCCAkWgAwIBAgIJAK...
-----END CERTIFICATE-----
-
Base64-кодированное содержимое (в поле
dataсекрета):Kubernetes автоматически декодирует base64 при чтении из секрета.apiVersion: v1 kind: Secret metadata: name: kafka-tls-certs type: Opaque data: ca.crt: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0t... # base64 -
Путь к файлу:
ca.crt: /etc/kafka/ca.crt
Пример с содержимым сертификата:
apiVersion: v1
kind: Secret
metadata:
name: kafka-tls-certs
type: Opaque
stringData:
# Содержимое CA сертификата
ca.crt: |
-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----
# Содержимое клиентского сертификата
client.crt: |
-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----
# Содержимое приватного ключа
client.key: |
-----BEGIN PRIVATE KEY-----
...
-----END PRIVATE KEY-----
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: kafka-tls-example
spec:
source:
type: kafka
config:
brokers:
- secure-kafka:9093
topic: secure-topic
tls:
caSecretRef:
name: kafka-tls-certs
key: ca.crt
certSecretRef:
name: kafka-tls-certs
key: client.crt
keySecretRef:
name: kafka-tls-certs
key: client.key
Важно:
- Временные файлы создаются автоматически для содержимого сертификатов и удаляются после использования
- При использовании base64-кодированных значений в поле data, Kubernetes автоматически декодирует их при чтении
- Убедитесь, что сертификаты в правильном PEM формате с заголовками -----BEGIN и -----END
Приоритет значений
Если указаны и прямое значение, и SecretRef, приоритет имеет SecretRef. Значение из secret будет использовано вместо прямого значения.
Secrets в разных namespace
По умолчанию оператор ищет secrets в том же namespace, где находится DataFlow ресурс. Вы можете указать другой namespace:
connectionStringSecretRef:
name: postgres-credentials
namespace: shared-secrets
key: connectionString
Безопасность
- RBAC: Оператору требуются права на чтение secrets (
get,list,watch) - Изоляция: Secrets должны быть в том же namespace или в namespace, к которому у оператора есть доступ
- Временные файлы: Для TLS сертификатов оператор создает временные файлы, которые автоматически удаляются после использования
- Файлы создаются только для содержимого сертификатов (начинающихся с
-----BEGIN) - Файлы удаляются автоматически при завершении работы оператора
- Временные файлы создаются в системной временной директории с уникальными именами
Устранение неполадок
Secret не найден
# Проверьте существование secret
kubectl get secret <secret-name> -n <namespace>
# Проверьте права оператора
kubectl auth can-i get secrets --as=system:serviceaccount:default:dataflow-operator
Ключ не найден в secret
# Проверьте ключи в secret
kubectl get secret <secret-name> -n <namespace> -o jsonpath='{.data}' | jq 'keys'
Ошибка разрешения secrets
Проверьте логи оператора:
kubectl logs -l app.kubernetes.io/name=dataflow-operator | grep -i secret
Убедитесь, что: 1. Secret существует в указанном namespace 2. Указанный ключ существует в secret 3. У оператора есть права на чтение secrets
Проблемы с TLS сертификатами
Если возникают ошибки при работе с TLS сертификатами:
- Ошибка "file name too long": Убедитесь, что сертификат в секрете хранится правильно:
- Если используете
stringData, сертификат должен быть в PEM формате с заголовками-----BEGINи-----END - Если используете
data(base64), убедитесь, что значение корректно закодировано -
Оператор автоматически определяет содержимое сертификата по префиксу
-----BEGIN -
Ошибка "failed to read CA file": Проверьте формат сертификата:
Убедитесь, что сертификат начинается с# Проверьте содержимое секрета kubectl get secret <secret-name> -n <namespace> -o jsonpath='{.data.ca\.crt}' | base64 -d-----BEGIN CERTIFICATE----- -
Ошибка создания временного файла: Проверьте права оператора на создание файлов в временной директории
Рекомендации по производительности
Общие настройки spec
channelBufferSize
Размер буфера каналов между source, processor и sink (по умолчанию 100). При высокой нагрузке Kafka (десятки тысяч msg/s) увеличьте до 500–1000, чтобы снизить блокировки, когда sink медленнее source.
Kafka
- Используйте несколько брокеров для отказоустойчивости
- Настройте правильный размер consumer group для параллельной обработки
- Используйте батчи при записи для повышения throughput
PostgreSQL
- Увеличьте
batchSizeдля приемника (рекомендуется 50-100) - Используйте индексы на часто запрашиваемых полях
- Настройте
pollIntervalв зависимости от частоты обновления данных
Устранение неполадок
Проблемы с подключением
- Проверьте доступность источника данных из кластера
- Убедитесь, что credentials корректны
- Проверьте сетевые политики Kubernetes
- Для TLS проверьте сертификаты
Проблемы с производительностью
- Увеличьте
channelBufferSize(500–1000) при высокой нагрузке Kafka - Увеличьте размер батчей для приемников
- Настройте правильный
pollIntervalдля источников - Используйте несколько инстансов оператора для масштабирования
- Мониторьте метрики обработки сообщений
Логирование
Включите детальное логирование для отладки:
kubectl logs -l app.kubernetes.io/name=dataflow-operator -f --tail=100
Проверьте статус DataFlow ресурса:
kubectl describe dataflow <name>