Connectors
DataFlow Operator supports several connectors for data sources and sinks. Each connector implements a standard interface and can act as a source, a sink, or both, as listed in the overview below.
Connector Overview
| Connector | Source | Sink | Features |
|---|---|---|---|
| Kafka | ✅ | ✅ | Consumer groups, TLS, SASL, Avro, Schema Registry |
| PostgreSQL | ✅ | ✅ | SQL queries, batch inserts, auto-create tables, UPSERT mode |
| PostgreSQL CDC | ✅ | — | Logical replication (pgoutput), initial snapshot, LSN checkpoint |
| Trino | ✅ | ✅ | SQL queries, Keycloak OAuth2 authentication, batch inserts |
| ClickHouse | ✅ | ✅ | Polling, batch inserts, auto-create MergeTree tables |
| Nessie | ✅ | ✅ | Iceberg via Nessie catalog, polling, batch appends, sink rawMode (data + _metadata); spec.replicas must be 1 |
| Iceberg | ✅ | ✅ | Apache Iceberg via official REST Catalog API; polling, batch appends, Bearer/Basic/OAuth2, optional S3 credentials; spec.replicas must be 1 |
Using Kubernetes Secrets
DataFlow Operator supports configuring connectors from Kubernetes Secrets. This allows secure storage of sensitive data (passwords, tokens, connection strings) without explicitly specifying them in the DataFlow specification.
Overview
Instead of specifying values directly in the configuration, you can use references to Kubernetes Secrets through *SecretRef fields. The operator automatically reads values from secrets and substitutes them into connector configuration.
SecretRef Structure
Each secret reference has the following structure:
secretRef:
  name: my-secret           # Secret name (required)
  namespace: my-namespace   # Secret namespace (optional, defaults to DataFlow namespace)
  key: my-key               # Key in secret (required)
Supported Fields
All connectors support secret references for the following fields:
Kafka
- brokersSecretRef - broker list (comma-separated)
- topicSecretRef - topic name
- consumerGroupSecretRef - consumer group
- sasl.usernameSecretRef - SASL username
- sasl.passwordSecretRef - SASL password
- tls.certSecretRef - client certificate
- tls.keySecretRef - private key
- tls.caSecretRef - CA certificate
- schemaRegistry.urlSecretRef - Schema Registry URL
- schemaRegistry.basicAuth.usernameSecretRef - Schema Registry username
- schemaRegistry.basicAuth.passwordSecretRef - Schema Registry password
- avroSchemaSecretRef - Avro schema from secret (for static schema)
PostgreSQL
- connectionStringSecretRef - connection string
- tableSecretRef - table name
PostgreSQL CDC (source)
- connectionStringSecretRef - connection string
- slotNameSecretRef - replication slot name
- publicationNameSecretRef - publication name
ClickHouse
- connectionStringSecretRef - connection string
- tableSecretRef - table name
Trino
- serverURLSecretRef - Trino server URL
- catalogSecretRef - catalog name
- schemaSecretRef - schema name
- tableSecretRef - table name
- keycloak.serverURLSecretRef - Keycloak server URL
- keycloak.realmSecretRef - Keycloak realm name
- keycloak.clientIDSecretRef - OAuth2 client ID
- keycloak.clientSecretSecretRef - OAuth2 client secret
- keycloak.usernameSecretRef - username for password grant
- keycloak.passwordSecretRef - password for password grant
- keycloak.tokenSecretRef - OAuth2 token (for long-lived tokens)
Nessie
- baseURLSecretRef - Nessie server base URL
- tokenSecretRef - Bearer token for Nessie/Iceberg REST
- namespaceSecretRef - namespace (schema) name
- tableSecretRef - table name
Usage Examples
Example 1: Kafka with SASL Authentication
apiVersion: v1
kind: Secret
metadata:
  name: kafka-credentials
  namespace: default
type: Opaque
stringData:
  brokers: "kafka1:9092,kafka2:9092,kafka3:9092"
  topic: "input-topic"
  consumerGroup: "dataflow-group"
  username: "kafka-user"
  password: "kafka-password"
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: kafka-example
spec:
  source:
    type: kafka
    config:
      brokersSecretRef:
        name: kafka-credentials
        key: brokers
      topicSecretRef:
        name: kafka-credentials
        key: topic
      consumerGroupSecretRef:
        name: kafka-credentials
        key: consumerGroup
      securityProtocol: SASL_PLAINTEXT
      sasl:
        mechanism: "scram-sha-256"
        usernameSecretRef:
          name: kafka-credentials
          key: username
        passwordSecretRef:
          name: kafka-credentials
          key: password
Example 2: PostgreSQL with Connection String from Secret
apiVersion: v1
kind: Secret
metadata:
  name: postgres-credentials
  namespace: default
type: Opaque
stringData:
  connectionString: "postgres://user:password@postgres:5432/dbname?sslmode=disable"
  table: "output_table"
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: postgres-example
spec:
  source:
    type: postgresql
    config:
      connectionStringSecretRef:
        name: postgres-credentials
        key: connectionString
      tableSecretRef:
        name: postgres-credentials
        key: table
Priority of Values
If both a direct value and SecretRef are specified, SecretRef takes priority. The value from the secret will be used instead of the direct value.
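As a hedged illustration of this rule, the fragment below sets both a direct table value and a tableSecretRef. Following the priority rule above, the value stored under the table key of the Secret would be used and the direct value ignored. The names postgres-credentials and fallback_table are placeholders, not names the operator requires.

```yaml
source:
  type: postgresql
  config:
    connectionStringSecretRef:
      name: postgres-credentials   # placeholder Secret name
      key: connectionString
    table: fallback_table          # direct value (placeholder); overridden by the SecretRef below
    tableSecretRef:                # takes priority over `table`
      name: postgres-credentials
      key: table
```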
Secrets in Different Namespaces
By default, the operator looks for secrets in the same namespace where the DataFlow resource is located. You can specify a different namespace:
connectionStringSecretRef:
  name: postgres-credentials
  namespace: shared-secrets
  key: connectionString
TLS Certificates from Secrets
For TLS configuration, the operator automatically determines whether the value from the secret is a file path or certificate content.
How it works:
- If the value starts with -----BEGIN (e.g., -----BEGIN CERTIFICATE----- or -----BEGIN PRIVATE KEY-----), the operator recognizes it as certificate content and creates a temporary file
- If the value doesn't start with -----BEGIN and exists as a file, it's used as a file path
- If the value doesn't start with -----BEGIN and the file doesn't exist, it's also treated as certificate content
Supported formats:
1. Certificate content (PEM format) - stored in stringData or decoded from data
2. Base64-encoded content - stored in secret's data field (Kubernetes automatically decodes it)
3. File path - path to an existing certificate file
Example:
apiVersion: v1
kind: Secret
metadata:
  name: kafka-tls-certs
type: Opaque
stringData:
  ca.crt: |
    -----BEGIN CERTIFICATE-----
    ...
    -----END CERTIFICATE-----
  client.crt: |
    -----BEGIN CERTIFICATE-----
    ...
    -----END CERTIFICATE-----
  client.key: |
    -----BEGIN PRIVATE KEY-----
    ...
    -----END PRIVATE KEY-----
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: kafka-tls-example
spec:
  source:
    type: kafka
    config:
      brokers:
        - secure-kafka:9093
      topic: secure-topic
      tls:
        caSecretRef:
          name: kafka-tls-certs
          key: ca.crt
        certSecretRef:
          name: kafka-tls-certs
          key: client.crt
        keySecretRef:
          name: kafka-tls-certs
          key: client.key
Important:
- Temporary files are automatically created for certificate content and cleaned up after use
- When using base64-encoded values in the data field, Kubernetes automatically decodes them when reading
- Ensure certificates are in proper PEM format with -----BEGIN and -----END headers
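The file-path format listed under Supported formats can be sketched as follows. This assumes the CA file already exists at /etc/kafka/ca.crt inside the container that resolves the secret; how that file gets there is not covered in this section. The Secret name kafka-tls-paths and the key ca.path are hypothetical. Because the value does not start with -----BEGIN and the file exists, it would be treated as a path rather than as certificate content.

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: kafka-tls-paths          # hypothetical Secret name
type: Opaque
stringData:
  ca.path: /etc/kafka/ca.crt     # a path, not PEM content (assumed to exist in the container)
---
# Fragment of a DataFlow spec that references the path stored in the Secret
source:
  type: kafka
  config:
    brokers:
      - secure-kafka:9093
    topic: secure-topic
    tls:
      caSecretRef:
        name: kafka-tls-paths
        key: ca.path
```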
Security
- RBAC: The operator requires permissions to read secrets (get, list, watch)
- Isolation: Secrets should be in the same namespace or in a namespace the operator has access to
- Temporary Files: For TLS certificates, the operator creates temporary files that are automatically deleted
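A minimal sketch of RBAC objects that grant the read permissions above in one extra namespace. It assumes the operator runs as the dataflow-operator service account in the default namespace (the identity used in the troubleshooting command in this document) and that secrets live in a shared-secrets namespace. The Role and RoleBinding name dataflow-secret-reader is hypothetical, and an installation may already ship equivalent rules.

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: dataflow-secret-reader     # hypothetical name
  namespace: shared-secrets        # namespace that holds the secrets
rules:
  - apiGroups: [""]                # core API group
    resources: ["secrets"]
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: dataflow-secret-reader
  namespace: shared-secrets
subjects:
  - kind: ServiceAccount
    name: dataflow-operator        # assumed service account name
    namespace: default             # assumed operator namespace
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: dataflow-secret-reader
```

The grant can then be checked with kubectl auth can-i get secrets -n shared-secrets --as=system:serviceaccount:default:dataflow-operator.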
Troubleshooting
Secret Not Found
# Check if secret exists
kubectl get secret <secret-name> -n <namespace>
# Check operator permissions
kubectl auth can-i get secrets --as=system:serviceaccount:default:dataflow-operator
Key Not Found in Secret
# Check keys in secret
kubectl get secret <secret-name> -n <namespace> -o jsonpath='{.data}' | jq 'keys'
Secret Resolution Error
Check operator logs:
kubectl logs -l app.kubernetes.io/name=dataflow-operator | grep -i secret
Ensure:
1. Secret exists in the specified namespace
2. The specified key exists in the secret
3. The operator has permissions to read secrets
TLS Certificate Issues
If you encounter errors with TLS certificates:
- "file name too long" error: Ensure the certificate is stored correctly in the secret:
  - If using stringData, the certificate should be in PEM format with -----BEGIN and -----END headers
  - If using data (base64), ensure the value is correctly encoded
  - The operator automatically detects certificate content by the -----BEGIN prefix
- "failed to read CA file" error: Check the certificate format:
  # Check secret content
  kubectl get secret <secret-name> -n <namespace> -o jsonpath='{.data.ca\.crt}' | base64 -d
  Ensure the certificate starts with -----BEGIN CERTIFICATE-----
- Temporary file creation error: Check operator permissions to create files in the temporary directory
Kafka
The Kafka connector reads messages from and writes messages to Apache Kafka topics. It supports consumer groups for scaling, TLS and SASL authentication, and the Avro format through either Confluent Schema Registry or a static schema.
Source
source:
  type: kafka
  config:
    brokers:
      - kafka1:9092
    topic: input-topic
    consumerGroup: my-group
    # Kafka security protocol (optional)
    # Values: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
    # If omitted, protocol is inferred from tls/sasl sections (backward compatible)
    # SASL_SSL is used here because both tls and sasl are configured below
    securityProtocol: SASL_SSL
    # TLS configuration (optional)
    tls:
      insecureSkipVerify: false
      caFile: /path/to/ca.crt
      certFile: /path/to/client.crt
      keyFile: /path/to/client.key
    # SASL authentication (optional)
    sasl:
      # Mechanism: plain, scram-sha-256, scram-sha-512
      mechanism: scram-sha-256
      username: kafka-user
      password: kafka-password
    # Message format (optional, default: "json")
    # Supported formats: "json", "avro"
    format: json
    # Avro configuration (required if format: "avro")
    # Option 1: Use Confluent Schema Registry (recommended)
    schemaRegistry:
      url: https://schema-registry:8081
      basicAuth:
        username: schema-user
        password: schema-password
      tls:
        insecureSkipVerify: false
        caFile: /path/to/schema-registry-ca.crt
    # Option 2: Static Avro schema (alternative to Schema Registry)
    # avroSchema: |
    #   {
    #     "type": "record",
    #     "name": "MyRecord",
    #     "fields": [
    #       {"name": "id", "type": "long"},
    #       {"name": "name", "type": "string"}
    #     ]
    #   }
    # Or path to schema file:
    # avroSchemaFile: /path/to/schema.avsc
Features
- Consumer Groups: Consumers that share a consumerGroup split the topic's partitions between them; use a separate group for each independent flow that reads the same topic
- Initial Offset: Reads from oldest message by default (OffsetOldest)
- Data Formats: Supports JSON (default) and Avro formats
- Avro Support:
  - Confluent Schema Registry: Automatic schema retrieval by ID from messages (format: magic byte + schema ID + data)
  - Static Schema: Use predefined schema from configuration or file
  - Schema Caching: Schemas from Schema Registry are cached for performance
- Metadata: Each message contains metadata:
  - topic - topic name
  - partition - partition number
  - offset - message offset
  - key - message key (if present)
- securityProtocol: Explicit Kafka security.protocol. Supported values: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL (case and -/_ are normalized). When omitted, the protocol is inferred: sasl only → SASL_PLAINTEXT, tls + sasl → SASL_SSL, tls only → SSL, neither → PLAINTEXT. When set explicitly, the webhook validates consistency with tls/sasl (e.g. SASL_PLAINTEXT requires sasl and forbids tls).
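The inference rule for an omitted securityProtocol can be read off the three config fragments below. They are a sketch built only from fields documented in this section; broker addresses, topic names, and credentials are placeholders.

```yaml
# tls only, no securityProtocol: inferred as SSL
config:
  brokers:
    - secure-kafka:9093
  topic: secure-topic
  consumerGroup: my-group
  tls:
    caFile: /etc/kafka/ca.crt
---
# sasl only, no securityProtocol: inferred as SASL_PLAINTEXT
config:
  brokers:
    - kafka1:9092
  topic: input-topic
  consumerGroup: my-group
  sasl:
    mechanism: scram-sha-256
    username: kafka-user
    password: kafka-password
---
# tls + sasl, no securityProtocol: inferred as SASL_SSL
config:
  brokers:
    - secure-kafka:9093
  topic: secure-topic
  consumerGroup: my-group
  tls:
    caFile: /etc/kafka/ca.crt
  sasl:
    mechanism: scram-sha-512
    username: kafka-user
    password: kafka-password
```

With neither section present the protocol falls back to PLAINTEXT. Setting securityProtocol explicitly is mainly useful when you want the webhook to reject a config whose tls/sasl sections do not match the intended protocol.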
Sink
sink:
  type: kafka
  config:
    brokers:
      - kafka1:9092
    topic: output-topic
    # Security protocol (optional, same as source)
    # SASL_SSL because both tls and sasl are configured below
    securityProtocol: SASL_SSL
    # TLS and SASL configuration similar to source
    tls:
      caFile: /path/to/ca.crt
    sasl:
      mechanism: scram-sha-256
      username: kafka-user
      password: kafka-password
Example: SASL_SSL
source:
  type: kafka
  config:
    brokers:
      - secure-kafka:9093
    topic: secure-topic
    consumerGroup: secure-group
    securityProtocol: SASL_SSL
    tls:
      caFile: /etc/kafka/ca.crt
      certFile: /etc/kafka/client.crt
      keyFile: /etc/kafka/client.key
    sasl:
      mechanism: scram-sha-512
      username: kafka-user
      password: kafka-password
Example: SASL_PLAINTEXT (no TLS)
source:
  type: kafka
  config:
    brokers:
      - kafka1:9092
    topic: input-topic
    consumerGroup: my-group
    securityProtocol: SASL_PLAINTEXT
    sasl:
      mechanism: scram-sha-256
      usernameSecretRef:
        name: kafka-credentials
        key: username
      passwordSecretRef:
        name: kafka-credentials
        key: password
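For Avro topics, the Schema Registry URL and basic-auth credentials can also come from a Secret through the schemaRegistry.*SecretRef fields listed under Supported Fields. The fragment below is a sketch; the Secret name schema-registry-credentials and its keys (url, username, password) are hypothetical.

```yaml
source:
  type: kafka
  config:
    brokers:
      - kafka1:9092
    topic: avro-topic                      # placeholder topic
    consumerGroup: my-group
    format: avro
    schemaRegistry:
      urlSecretRef:
        name: schema-registry-credentials  # hypothetical Secret name
        key: url
      basicAuth:
        usernameSecretRef:
          name: schema-registry-credentials
          key: username
        passwordSecretRef:
          name: schema-registry-credentials
          key: password
```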
PostgreSQL
The PostgreSQL connector supports reading from and writing to PostgreSQL tables. It supports custom SQL queries, periodic polling, batch inserts, auto-create tables, UPSERT mode, CDC-style change tracking (inserts and updates), soft delete, and SecretRef for credentials.
For native logical replication CDC (WAL / pgoutput), use the separate postgresql-cdc source type.
Source
source:
  type: postgresql
  config:
    # Connection string (required, or use connectionStringSecretRef)
    connectionString: "postgres://user:password@localhost:5432/dbname?sslmode=disable"
    # Table to read from (required if query not specified). Supports schema.table (e.g. public.products)
    table: source_table
    # Custom SQL query (optional)
    query: "SELECT * FROM source_table WHERE updated_at > NOW() - INTERVAL '1 hour'"
    # Poll interval in seconds (optional, default: 5)
    pollInterval: 60
    # CDC-style options (optional)
    readBatchSize: 1000               # Limit rows per poll to reduce DB load (0 = no limit)
    changeTrackingColumn: updated_at  # Column to track changes (default: updated_at). In query mode, set explicitly to enable incremental subquery wrapper
    orderByColumn: id                 # Secondary sort key for stable pagination (default: id). Example: price_id
    autoCreateTable: true             # Create table if it doesn't exist before reading
    # SecretRef (optional) - use instead of direct values
    # connectionStringSecretRef:
    #   name: postgres-credentials
    #   key: connectionString
    # tableSecretRef:
    #   name: postgres-credentials
    #   key: table
Source Features
- Periodic Polling: Regularly polls the table for new data
- Custom Queries: Support for complex SQL with JOIN, WHERE, etc.
- Metadata: Each message contains table metadata and operation (insert/update)
- Read Batch Size: Limits rows per poll to reduce database load when many new records appear
- Change Tracking: By default tracks changes via updated_at column (or changeTrackingColumn), captures both INSERTs and UPDATEs
- Stable ordering: Table mode adds ORDER BY changeTrackingColumn, orderByColumn (default secondary key: id). When query and changeTrackingColumn are both set, your SQL is wrapped in a subquery with the same composite checkpoint filter (changeTrackingColumn, orderByColumn) > (lastTime, lastId) and ORDER BY. Without explicit changeTrackingColumn, query mode runs your SQL as-is (legacy). Row value from orderByColumn is stored in message metadata key id.
- Query mode requirements: When using incremental query mode, the SELECT must include changeTrackingColumn and orderByColumn columns; otherwise checkpoint advancement will not work.
- Auto-create Table: When autoCreateTable: true, creates the table with CDC-friendly schema (id SERIAL PRIMARY KEY, created_at, updated_at) if it doesn't exist. Creation happens at Connect time.
- Schema notation: Table name supports schema.table format (e.g. public.products)
- Checkpoint persistence: By default, read position (lastReadChangeTime, lastReadOrderByValue) is persisted to ConfigMap; on restart, reading resumes from the last position. Set checkpointPersistence: false in spec to store only in memory. For pg→pg flows, enable upsertMode: true in sink to update duplicates instead of inserting them again.
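A sketch of incremental query mode combined with an upsert sink, using only the fields described above. The table public.products, its columns name and price, and the target public.products_clone are illustrative assumptions. Note that the SELECT list contains both checkpoint columns (updated_at and id), as the query mode requirements call for.

```yaml
source:
  type: postgresql
  config:
    connectionStringSecretRef:
      name: postgres-credentials
      key: connectionString
    # The SELECT list includes both checkpoint columns: updated_at and id
    query: "SELECT id, name, price, updated_at FROM public.products WHERE price > 0"
    changeTrackingColumn: updated_at   # set explicitly to enable the incremental wrapper
    orderByColumn: id
    readBatchSize: 1000
    pollInterval: 60
sink:
  type: postgresql
  config:
    connectionStringSecretRef:
      name: postgres-credentials
      key: connectionString
    table: public.products_clone       # illustrative target table
    upsertMode: true                   # re-read rows update instead of inserting again
    conflictKey: "id"
```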
One-time migration (batch exhaust)
For a single snapshot copy (e.g. DataFlowCron job that must stop after all rows are read), use table mode with explicit pagination and a stable key column:
- Prefer a MATERIALIZED VIEW on the source DB when the business SQL is heavy: run CREATE MATERIALIZED VIEW + REFRESH once, then point table at the MVIEW.
- Set changeTrackingColumn and orderByColumn to the same non-timestamp key (e.g. material_id) when no updated_at exists in the result.
- Set readBatchSize (e.g. 10000) so reads are batched.
- On the sink: upsertMode: true, explicit conflictKey, matching column types (e.g. BIGINT for numeric IDs, not TEXT).
- The processor stops when the next poll returns zero rows (source exhausted). Checkpoint must advance via message Ack after each successful sink write.
Example source fragment:
source:
  type: postgresql
  config:
    table: price_calendar.mv_one_p_prices_migration
    changeTrackingColumn: material_id
    orderByColumn: material_id
    readBatchSize: 10000
    pollInterval: 5
The same pattern applies to Trino and ClickHouse incremental sources (order-by-only checkpoint). Kafka and Nessie use different offset/snapshot models.
Sink
sink:
  type: postgresql
  config:
    connectionString: "postgres://user:password@localhost:5432/dbname?sslmode=disable"
    # Table to write to. Supports schema.table (e.g. public.products_clone)
    table: target_table
    # Batch size (optional, default: 1). 0 = flush only by timer
    batchSize: 100
    # Flush interval in seconds (optional, default: 10). 0 = disable timer
    batchFlushIntervalSeconds: 10
    autoCreateTable: true
    # Raw mode (optional, default: false)
    # When true, expects {"value": <data>, "_metadata": {...}} or plain body with msg.Metadata
    # Table is created with data JSONB and _metadata JSONB columns
    rawMode: false
    # UPSERT mode (optional, default: false)
    upsertMode: true
    conflictKey: "id"
    upsertStrategy: ifNewer          # always (default) | ifNewer
    upsertVersionColumn: updated_at  # required when upsertStrategy is ifNewer
    # Soft delete column (optional). When set, DELETE operations UPDATE this column instead of physical delete
    softDeleteColumn: "deleted_at"
    # SecretRef (optional)
    # connectionStringSecretRef: { name: postgres-credentials, key: connectionString }
    # tableSecretRef: { name: postgres-credentials, key: table }
Sink Features
- Batch Inserts: Groups messages for efficient writing. Flush when batchSize reached or on timer. Use batchFlushIntervalSeconds: 0 for size-only flush; batchSize: 0 for timer-only flush.
- Auto-create Tables (when autoCreateTable: true):
  - With rawMode: true, the table is created at Connect time. Schema: id SERIAL PRIMARY KEY, data JSONB, _metadata JSONB, created_at, updated_at, deleted_at, GIN index on data
  - With rawMode: false, the table is created at first write from the first message structure (replicates source schema). Column types are inferred automatically (TEXT, BIGINT, NUMERIC, JSONB, etc.)
- Raw Mode: When true, stores message body in data and metadata in _metadata (or flattened columns). When false, uses columnar format from message.
- UPSERT Mode: Updates existing records on conflict (PRIMARY KEY or conflictKey). Batch flushes run in an explicit PostgreSQL transaction.
- Upsert strategy: always (default) updates on every conflict; ifNewer updates only when the incoming upsertVersionColumn is greater than the stored value (prevents stale replays from overwriting newer rows).
- Soft Delete: When softDeleteColumn is set and message has metadata.operation=delete, performs UPDATE ... SET deleted_at = NOW() instead of physical DELETE
PostgreSQL CDC (logical replication)
Native Change Data Capture via PostgreSQL logical replication (pgoutput). Reads INSERT, UPDATE, and DELETE from the WAL through a replication slot and publication. Sample: config/samples/postgresql-cdc-to-postgres.yaml.
Requirements
PostgreSQL must have logical replication enabled:
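# postgresql.conf (changing wal_level requires a server restart)
wal_level = logical
max_replication_slots = 1   # minimum; any value >= 1 works
max_wal_senders = 1         # minimum; any value >= 1 works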
The connection user needs REPLICATION privilege and permission to create publications/slots (or pre-create them). For tables without a primary key, set REPLICA IDENTITY FULL for reliable UPDATE/DELETE events.
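# pg_hba.conf (example)
host replication repl_user 0.0.0.0/0 scram-sha-256
-- privileges (SQL): REPLICATION is a role attribute, not a database privilege
ALTER ROLE repl_user WITH REPLICATION;
-- needed only if the connector creates the publication itself
GRANT CREATE ON DATABASE db TO repl_user;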
Source
source:
  type: postgresql-cdc
  config:
    connectionString: "postgres://repl_user:pass@pg:5432/db?sslmode=disable"
    slotName: dataflow_orders_slot
    publicationName: dataflow_orders_pub
    tables:
      - public.orders
      - public.customers
    snapshotMode: initial             # initial | never | always (default: initial)
    createSlotIfNotExists: true
    createPublicationIfNotExists: true
    heartbeatIntervalSeconds: 10      # standby status updates when idle (0 = disable)
    primaryKeyColumn: id              # metadata.id when column exists (default: id)
    includeColumns: []                # optional column allow-list
    excludeColumns: []
    envelopeFormat: row               # row (default) | debezium
Source features
- Streaming WAL: Continuous logical replication (not polling)
- Initial snapshot: snapshotMode: initial copies existing rows before streaming (Debezium-style)
- Checkpoint: LSN stored in ConfigMap under key postgresql-cdc; advanced only after sink Ack
- Message metadata: operation (insert/update/delete), table, lsn, id (when PK column present)
- Multi-table: One publication, filter via tables list
- Schema evolution: Relation metadata refreshed on each RelationMessageV2 from PostgreSQL
- Debezium envelope: envelopeFormat: debezium emits payload.before / payload.after / payload.op / payload.source (compatible with debeziumUnwrap without Kafka Connect)
- replicas: Must be 1 (same as polling sources)
Debezium parity notes
| Feature | Debezium | postgresql-cdc |
|---|---|---|
| Initial snapshot | op: r in envelope | Same when envelopeFormat: debezium |
| UPDATE before/after | Both in envelope | Both when old tuple available in WAL |
| Schema history topic | Yes | No — relation cache refreshed inline on DDL |
| DDL events | Separate messages | Not emitted (schema refresh only) |
| Kafka Connect admin | Required | Not required |
Idempotent sink
Use ackGranularity: message and an idempotent sink (upsertMode + conflictKey) to minimize duplicates on restart.
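A sketch of a CDC flow into an idempotent PostgreSQL sink, assembled from fields documented in this section. The resource name, the Secret postgres-cdc-credentials with its keys, and the target table public.orders_replica are hypothetical. ackGranularity is left out because this section does not show where in the spec it belongs; see the sample config/samples/postgresql-cdc-to-postgres.yaml for a complete manifest.

```yaml
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: orders-cdc-to-postgres          # hypothetical name
spec:
  replicas: 1                           # CDC sources must run with a single replica
  source:
    type: postgresql-cdc
    config:
      connectionStringSecretRef:
        name: postgres-cdc-credentials  # hypothetical Secret
        key: sourceConnectionString
      slotName: dataflow_orders_slot
      publicationName: dataflow_orders_pub
      tables:
        - public.orders
      snapshotMode: initial
  sink:
    type: postgresql
    config:
      connectionStringSecretRef:
        name: postgres-cdc-credentials
        key: sinkConnectionString
      table: public.orders_replica      # hypothetical target table
      autoCreateTable: true
      upsertMode: true                  # replayed events update rows instead of duplicating them
      conflictKey: "id"
```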
ClickHouse
The ClickHouse connector supports reading from and writing to ClickHouse tables. It supports polling for incremental reads, custom SQL queries, batch inserts, and auto-creation of MergeTree tables.
Source
source:
  type: clickhouse
  config:
    # Connection string (required)
    # Native: clickhouse://host:9000?username=default&password=xxx&database=default
    # HTTP: http://host:8123/default?username=default&password=xxx
    connectionString: "clickhouse://default@clickhouse:9000/default?dial_timeout=10s"
    # Table to read from (required, if not using query)
    table: source_table
    # Custom SQL query (optional)
    # If specified, used instead of reading from table
    query: "SELECT * FROM source_table WHERE id > 100"
    # Poll interval in seconds (optional, default: 5)
    pollInterval: 60
    # Column for incremental pagination and stable ORDER BY (optional, default: id)
    orderByColumn: price_id
    # Column used to track changes (optional, default: created_at)
    changeTrackingColumn: created_at
    # Limit rows per poll (optional, 0 = no limit)
    readBatchSize: 1000
Source Features
- Polling: Periodically polls the table for new data (interval set by pollInterval)
- Read batching: readBatchSize limits rows per query within a poll cycle (same as PostgreSQL source)
- Incremental Reads: Table mode uses composite checkpoint (changeTrackingColumn, orderByColumn) > (lastTime, lastKey) with tuple WHERE; legacy checkpoints with only lastReadOrderByValue fall back to WHERE orderByColumn > N until a timestamp is recorded
- Custom Queries: When query and changeTrackingColumn are both set, SQL is wrapped in a subquery with the same composite filter; without explicit changeTrackingColumn, query mode runs your SQL as-is (legacy)
- Stable ordering: Table mode adds ORDER BY changeTrackingColumn, orderByColumn (defaults: created_at, id)
- Checkpoint persistence: Read position (lastReadChangeTime, lastReadOrderByValue) is persisted to ConfigMap; legacy lastReadID / lastReadTime are migrated on load
- Metadata: Each message contains table and id (value from orderByColumn column, if present)
Sink
sink:
  type: clickhouse
  config:
    connectionString: "clickhouse://default@clickhouse:9000/default?dial_timeout=10s"
    table: output_table
    # Batch size for inserts (optional, default: 100). 0 = flush only by timer
    batchSize: 100
    # Flush interval in seconds (optional, default: 10). 0 = disable timer
    batchFlushIntervalSeconds: 10
    # Auto-create table (optional)
    autoCreateTable: true
    # rawMode: false (default): creates table from first message structure (columnar, replicates source schema)
    # rawMode: true: creates MergeTree table with data String, created_at columns (JSON storage)
    rawMode: false
    # Idempotent writes (optional)
    upsertMode: true
    conflictKey: id
    replacingVersionColumn: updated_at
    tableEngine: ReplacingMergeTree  # default when upsertMode is true and unset
Sink Features
- Batch Inserts: Groups messages; flushes when the batch size is reached or the timer fires (default 10s). Size only: batchFlushIntervalSeconds: 0. Timer only: batchSize: 0
- Upsert mode: With upsertMode: true, auto-created tables use ReplacingMergeTree; duplicates are resolved on background merge. See Fault Tolerance.
- Retry on Transient Errors: Batch writes automatically retry on connection refused, TOO_MANY_PARTS, memory limit, HTTP 502/503, and similar transient errors (up to 5 attempts with exponential backoff)
Resilience and Fault Tolerance
- Error sink: Use spec.errors with ClickHouse (or Kafka) to capture failed messages for replay or analysis. See Error Handling.
- Connection string: Recommended parameters: dial_timeout=10s, max_execution_time=60 (e.g. clickhouse://host:9000/default?dial_timeout=10s&max_execution_time=60).
- Batch tuning: For throughput and resilience, use batchSize 100–1000 and batchFlushIntervalSeconds 5–10. Larger batches reduce insert frequency and can help avoid TOO_MANY_PARTS.
- TOO_MANY_PARTS: If you see this error, increase batchSize, decrease flush frequency, or tune ClickHouse merge settings (background_pool_size, parts_to_throw_insert). Consider ReplacingMergeTree for deduplication. See Fault Tolerance.
- Auto-create Tables (when autoCreateTable: true):
  - With rawMode: true, the table is created at Connect time. Schema: data String, created_at DateTime DEFAULT now(), MergeTree engine, ORDER BY created_at. Messages are stored as JSON string in data column
  - With rawMode: false, the table is created at first write from the first message structure. Column types are inferred automatically (String, Int32/Int64, Float64, Decimal, DateTime, etc.). Supports {"value": {...}, "_metadata": {...}} format and uses fields from value
- Raw Mode: When true, expects/stores JSON in data column. When false, uses columnar format from message (INSERT with named columns)
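Putting the recommendations above together, a resilience-oriented sink might look like the sketch below. The specific values (batchSize: 1000, batchFlushIntervalSeconds: 5) are picks from the recommended ranges rather than measured optima, and the host and table names are placeholders.

```yaml
sink:
  type: clickhouse
  config:
    # Recommended connection parameters: dial_timeout and max_execution_time
    connectionString: "clickhouse://default@clickhouse:9000/default?dial_timeout=10s&max_execution_time=60"
    table: output_table
    batchSize: 1000                  # larger batches mean fewer inserts and fewer parts
    batchFlushIntervalSeconds: 5     # flush partially filled batches every 5 seconds
    autoCreateTable: true
    upsertMode: true                 # auto-created table uses ReplacingMergeTree
    conflictKey: id
    replacingVersionColumn: updated_at
```

Because ReplacingMergeTree resolves duplicates during background merges, queries issued right after a retry may still see duplicate rows for a short time.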
Example: Kafka to ClickHouse
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: kafka-to-clickhouse
spec:
  source:
    type: kafka
    config:
      brokers:
        - kafka:9092
      topic: input-topic
      consumerGroup: dataflow-group
  sink:
    type: clickhouse
    config:
      connectionString: "clickhouse://default@clickhouse:9000/default"
      table: output_table
      batchSize: 100
      autoCreateTable: true
Example: ClickHouse to ClickHouse (schema replication)
With autoCreateTable: true and rawMode: false, the target table is created automatically from the source structure:
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: clickhouse-to-clickhouse
spec:
  source:
    type: clickhouse
    config:
      connectionString: "clickhouse://dataflow:dataflow@localhost:9000/dataflow?dial_timeout=10s"
      table: products
      pollInterval: 5
  sink:
    type: clickhouse
    config:
      connectionString: "clickhouse://dataflow:dataflow@localhost:9000/dataflow?dial_timeout=10s"
      table: products_clone
      batchSize: 100
      autoCreateTable: true
Trino
The Trino connector supports reading from and writing to Trino (formerly PrestoSQL) tables. It supports SQL queries, Keycloak OAuth2/OIDC authentication, and batch inserts.
Source
source:
  type: trino
  config:
    # Trino server URL (required)
    serverURL: "http://trino:8080"
    # Catalog to use (required)
    catalog: hive
    # Schema to use (required)
    schema: default
    # Table to read from (required, if not using query)
    table: source_table
    # Custom SQL query (optional)
    # If specified, used instead of reading from table
    query: "SELECT * FROM hive.default.source_table WHERE id > 100"
    # Poll interval in seconds (optional, default: 5)
    # Used for periodic reading of new data
    pollInterval: 60
    # Column for incremental pagination and stable ORDER BY (optional, default: id)
    orderByColumn: price_id
    # Limit rows per poll (optional, 0 = no limit)
    readBatchSize: 1000
    # Keycloak authentication (optional)
    keycloak:
      # Option 1: Use long-lived token directly (recommended for long-lived tokens)
      token: "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."
      # Option 2: Use OAuth2 flow (alternative to direct token)
      # serverURL: "https://keycloak.example.com"
      # realm: myrealm
      # clientID: trino-client
      # clientSecret: client-secret
      # username: trino-user
      # password: trino-password
Features
- SQL Queries: Support for custom SQL queries with WHERE, JOIN, etc. When query and changeTrackingColumn are both set, SQL is wrapped with composite checkpoint filter; otherwise custom queries get ORDER BY orderByColumn only (legacy)
- Read batching: readBatchSize limits rows per query within a poll cycle (same as PostgreSQL source)
- Periodic Polling: Table mode uses composite checkpoint (changeTrackingColumn, orderByColumn) > (lastTime, lastKey); legacy lastReadID checkpoints use WHERE orderByColumn > N until timestamp is recorded
- Checkpoint persistence: lastReadChangeTime and lastReadOrderByValue in ConfigMap; legacy lastReadID migrated on load
- Keycloak Authentication: OAuth2/OIDC authentication via Keycloak
  - Direct Token: Use a long-lived token obtained from Keycloak (recommended for long-lived tokens)
  - Password Grant: Use username/password for authentication
  - Client Credentials: Use client ID/secret for service-to-service authentication
  - Automatic Token Refresh: Tokens are automatically refreshed before expiration (only for OAuth2 flow, not for direct tokens)
- Metadata: Each message contains metadata:
  - catalog - catalog name
  - schema - schema name
  - table - table name
  - id - row value from orderByColumn
Obtaining a Token from Keycloak
To use a long-lived token, you can obtain it from Keycloak using the following methods:
Method 1: Using curl (Password Grant)
curl -X POST "https://keycloak.example.com/realms/myrealm/protocol/openid-connect/token" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "grant_type=password" \
-d "client_id=trino-client" \
-d "client_secret=client-secret" \
-d "username=trino-user" \
-d "password=trino-password"
Method 2: Using curl (Client Credentials)
curl -X POST "https://keycloak.example.com/realms/myrealm/protocol/openid-connect/token" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "grant_type=client_credentials" \
-d "client_id=trino-client" \
-d "client_secret=client-secret"
The response will contain an access_token field. Use this token value in the token field of the Keycloak configuration.
Note: For long-lived tokens, configure the token lifespan in Keycloak realm settings or client settings.
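Rather than pasting the token into the DataFlow spec, it can be kept in a Secret and referenced through keycloak.tokenSecretRef (listed under Supported Fields). A sketch, with a hypothetical Secret name trino-keycloak and key token:

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: trino-keycloak             # hypothetical Secret name
type: Opaque
stringData:
  token: "<access_token value from the Keycloak response>"
---
# Fragment of a DataFlow spec that reads the token from the Secret
source:
  type: trino
  config:
    serverURL: "http://trino:8080"
    catalog: hive
    schema: default
    table: source_table
    keycloak:
      tokenSecretRef:
        name: trino-keycloak
        key: token
```

Direct tokens are not refreshed automatically, so the Secret has to be updated before the token expires.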
Sink
sink:
  type: trino
  config:
    # Trino server URL (required)
    serverURL: "http://trino:8080"
    # Catalog to use (required)
    catalog: hive
    # Schema to use (required)
    schema: default
    # Table to write to (required)
    table: target_table
    # Batch size for inserts (optional, default: 1). 0 = flush only by timer
    batchSize: 100
    # Flush interval in seconds (optional, default: 10). 0 = disable timer
    batchFlushIntervalSeconds: 10
    # Auto-create table (optional, default: false)
    # If true, creates table with VARCHAR column for JSON data
    autoCreateTable: true
    # Raw mode (optional, default: false)
    # When true, creates table with data VARCHAR column; messages stored as JSON string
    # When false, uses columnar format matching message keys to table columns
    rawMode: true
    # MERGE upsert for Iceberg catalogs only (catalog name must contain "iceberg"),
    # so it is commented out for the hive catalog used in this listing
    # upsertMode: true
    # conflictKey: id
    queryTimeoutSeconds: 600
    # Keycloak authentication (optional)
    keycloak:
      serverURL: "https://keycloak.example.com"
      realm: myrealm
      clientID: trino-client
      clientSecret: client-secret
      username: trino-user
      password: trino-password
Features
- Batch Inserts: Groups messages; flushes when the batch size is reached or the timer fires (default 10s). Size only: batchFlushIntervalSeconds: 0. Timer only: batchSize: 0
- Upsert mode: upsertMode: true uses MERGE INTO for Iceberg tables (conflictKey required). See Fault Tolerance.
- Auto-create Tables: Automatically creates tables if they don't exist
- Raw Mode: When rawMode: true, creates table with data VARCHAR column; messages stored as JSON string. When false (default), uses columnar format matching message keys to table columns
- Keycloak Authentication: OAuth2/OIDC authentication via Keycloak
- Automatic Token Refresh: Tokens are automatically refreshed
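Since MERGE-based upsert applies only to Iceberg catalogs, an upsert sink would point at a catalog whose name contains "iceberg". The sketch below assumes a Trino catalog literally named iceberg; schema and table names are placeholders.

```yaml
sink:
  type: trino
  config:
    serverURL: "http://trino:8080"
    catalog: iceberg          # assumed catalog name; must contain "iceberg" for upsertMode
    schema: default
    table: target_table
    batchSize: 100
    batchFlushIntervalSeconds: 10
    upsertMode: true          # rows are written with MERGE INTO
    conflictKey: id           # required in upsert mode
    queryTimeoutSeconds: 600
```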
Example: Kafka to Trino with Keycloak
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: kafka-to-trino
spec:
source:
type: kafka
config:
brokers:
- kafka:9092
topic: input-topic
consumerGroup: dataflow-group
sink:
type: trino
config:
serverURL: "http://trino:8080"
catalog: hive
schema: default
table: output_table
batchSize: 100
keycloak:
# Use long-lived token obtained from Keycloak
token: "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."
# Alternative: Use OAuth2 flow
# serverURL: "https://keycloak.example.com"
# realm: myrealm
# clientID: trino-client
# clientSecret: client-secret
# username: trino-user
# password: trino-password
Nessie
The Nessie connector reads from and writes to Apache Iceberg tables via the Nessie catalog (Iceberg REST API). All operations are performed in the context of a Nessie branch; metadata and data are managed by the catalog.
Nessie Core API vs Iceberg REST
- Nessie Core API (/api/v1, /api/v2) manages repository metadata: references, commits, and content entries.
- Iceberg REST catalog API ({baseURL}/iceberg[/{branch}][|{warehouse}]) is used by DataFlow for table reads and writes.
- Physical table files are stored in the configured warehouse/object storage (for example S3-compatible storage), while Nessie tracks catalog metadata and versioned pointers.
If you need Core API details for administrative operations, use the OpenAPI document at repository root: nessie-openapi-0.107.5.yaml.
Source
source:
type: nessie
config:
# Nessie server base URL (required), e.g. https://nessie:19120
baseURL: "http://nessie:19120"
# Nessie branch to read from (optional, default: main)
branch: main
# Warehouse name for storage location (optional), e.g. for /iceberg/|warehouse
warehouse: ""
# Namespace (schema) of the Iceberg table (required)
namespace: my_schema
# Table name (required)
table: my_table
# Poll interval in seconds (optional, default: 10)
pollInterval: 10
# Incremental read by Iceberg snapshot (optional, default: false)
# incrementalBySnapshot: true
# startSnapshotID: "1234567890123456789" # first run without checkpoint
# snapshotCheckpoints: true # default
# Authentication (optional)
# authenticationType: AUTO (default) | BEARER | BASIC | NONE
authenticationType: BEARER
bearerToken: "your-token"
# Or Basic auth:
# basicAuth:
# username: user
# password: pass
Features
- Branch context: Reads from the specified Nessie branch; table metadata is resolved from the catalog.
- Polling: Periodically scans the Iceberg table for new data.
- Full scan (default): Each poll scans the table’s current snapshot in full.
- Incremental mode (incrementalBySnapshot: true): Reads only snapshots newer than the last ack; position is stored in the checkpoint ConfigMap (see fault tolerance). The query field is not supported in this mode.
- Authentication: Bearer token (OAuth2) or Basic auth for Nessie/Iceberg REST.
- No rawMode on source: Each Iceberg row is emitted as a JSON object keyed by column names (for example data and _metadata if the table was written with sink rawMode: true). Incremental mode reduces duplicates on restart but does not enable multiple processor pods; see Horizontal scaling below.
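To make the incremental mode more concrete, here is a rough Python sketch of selecting the snapshots that still need to be read after a restart. The function snapshots_to_read and its arguments are hypothetical. The fallback to a full read when there is no checkpoint, or when the checkpointed snapshot is no longer in the history, is an assumption made for this illustration, not documented connector behavior.

```python
from typing import List, Optional


def snapshots_to_read(history: List[int], last_acked: Optional[int]) -> List[int]:
    """Return snapshot IDs (oldest first) that come after the last acked one.

    history holds the table's snapshot IDs in commit order, oldest first.
    Position in the history decides what is "newer"; the IDs themselves
    are not compared numerically.
    """
    if last_acked is None or last_acked not in history:
        # Assumption for this sketch: without a usable checkpoint, read everything.
        return list(history)
    return history[history.index(last_acked) + 1:]
```

For example, with history [11, 7, 42] and a checkpoint at snapshot 7, only snapshot 42 remains to be read.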
Sink
sink:
type: nessie
config:
baseURL: "http://nessie:19120"
branch: main
warehouse: ""
namespace: my_schema
table: my_table
# Batch size for appends (optional, default: 100). 0 = flush only by timer
batchSize: 100
# Flush interval in seconds (optional, default: 10). 0 = disable timer
batchFlushIntervalSeconds: 10
# Create table if it does not exist (optional)
autoCreateTable: true
# Raw mode (optional, default: false)
# When true, creates table with data and _metadata string columns; plain messages use msg.Metadata for _metadata
rawMode: false
authenticationType: BEARER
bearerToken: "your-token"
# Or basicAuth: { username, password }
# Optional: Iceberg warehouse object storage (S3-compatible). Separate from Nessie REST auth above.
# Set accessKeySecretRef and secretAccessKeySecretRef together.
# If namespace is omitted, defaults to the DataFlow namespace and values use secretKeyRef (recommended).
# If namespace points elsewhere, the operator reads those Secrets at reconcile time and sets literal env on the Deployment (values appear in the Deployment object in the API).
# Credential refs stay as refs inside mounted spec JSON for Nessie sink (plaintext keys do not go through resolve into ConfigMap for these fields).
# s3Endpoint: "https://storage.yandexcloud.net"
# s3Region: "ru-central1"
# accessKeySecretRef:
# name: iceberg-s3-creds
# key: AWS_ACCESS_KEY_ID
# secretAccessKeySecretRef:
# name: iceberg-s3-creds
# key: AWS_SECRET_ACCESS_KEY
Features
- Branch context: Writes are committed to the specified Nessie branch via the catalog.
- Batch appends: Groups messages; flushes when the batch size or the flush timer (10s) is reached. For size-only flushing, set batchFlushIntervalSeconds: 0. For timer-only flushing, set batchSize: 0
- Auto-create table: When autoCreateTable: true, creates an Iceberg table if missing. Default: one data (string) column. With rawMode: true: data and _metadata (string) columns; see rawMode and _metadata.
- Authentication: Same as source (Bearer or Basic).
- Warehouse object storage: Optional static credentials for the Parquet warehouse (accessKeySecretRef + secretAccessKeySecretRef) set iceberg-go / AWS SDK env (AWS_S3_ENDPOINT, AWS_REGION when s3Endpoint / s3Region are set). Same-namespace refs use secretKeyRef; other namespaces are resolved by the operator into Deployment env literals. Changing a referenced Secret triggers reconcile via normal Secret watches.
Horizontal scaling (spec.replicas)
Nessie is a polling source. The processor Deployment follows spec.replicas (default 1).
| spec.source.type | spec.replicas | Behavior |
|---|---|---|
| kafka | > 1 allowed | Consumer group assigns topic partitions across pods. Nessie sink can run on multiple pods only in this case (each pod writes batches independently). |
| nessie (or any other polling source) | must be 1 or unset | Admission webhook rejects replicas > 1. Multiple pods would share one checkpoint ConfigMap and duplicate reads/writes. |
| any (DataFlowCron) | must be 1 or unset | One processor Job per schedule tick; replicas > 1 is always rejected. |
incrementalBySnapshot on the Nessie source improves restart behavior (checkpoint by Iceberg snapshot) but does not replace Kafka-style horizontal scaling.
For higher throughput with Nessie sink, tune batchSize, batchFlushIntervalSeconds, and channelBufferSize instead of increasing replicas. Details: fault tolerance — horizontal scaling, DataFlow spec — replicas.
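The replica rules in this section can be summarized as a small decision function. The Python sketch below mirrors the table only; validate_replicas and its parameters are hypothetical names, and the real admission webhook may apply additional checks. It assumes that kafka is the only source type for which more than one replica is accepted.

```python
from typing import Optional


def validate_replicas(source_type: str, replicas: Optional[int],
                      is_cron: bool = False) -> Optional[str]:
    """Return an error message if the replica count is not allowed, else None."""
    effective = 1 if replicas is None else replicas  # unset defaults to 1
    if effective <= 1:
        return None
    if is_cron:
        return "DataFlowCron: replicas > 1 is always rejected"
    if source_type != "kafka":
        return f"source type {source_type!r}: replicas must be 1 or unset"
    return None
```

A Kafka-to-Nessie flow with three replicas passes, while a Nessie-to-anything flow with three replicas is rejected.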
rawMode and _metadata column (sink only)
rawMode is configured on the sink (sink.config.rawMode). It is not available on the Nessie source.
Iceberg schema when autoCreateTable: true and rawMode: true:
| Column | Iceberg type | Content |
|---|---|---|
| data | string | Payload JSON (message body) |
| _metadata | string | JSON object with lineage fields (Kafka offset, partition, topic, etc.) |
Unlike PostgreSQL sink raw mode (data / _metadata as JSONB), Nessie stores both fields as string columns containing JSON text.
How messages are mapped on write
- Plain message: msg.Data is the payload; msg.Metadata (for example from Kafka source) is serialized into _metadata:
{"id": 1, "event": "login"}
→ data = body JSON, _metadata = {"offset":100,"partition":0,"topic":"events",...}
- Pre-wrapped message: body already has value and _metadata keys (common when chaining from another rawMode sink):
{"value": {"id": 1}, "_metadata": {"offset": 10, "topic": "t1"}}
→ inner value → data column, inner _metadata → _metadata column (same convention as Trino/PostgreSQL rawMode).
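The two mapping cases above can be sketched in Python as follows. The function to_raw_columns is a hypothetical helper written for this illustration and is not part of the operator. The compact JSON separators are an arbitrary choice here, so the exact text the sink stores may differ.

```python
import json
from typing import Any, Dict, Tuple


def to_raw_columns(body: Any, metadata: Dict[str, Any]) -> Tuple[str, str]:
    """Map one message to (data, _metadata) string columns holding JSON text."""
    if isinstance(body, dict) and "value" in body and "_metadata" in body:
        # Pre-wrapped message: unwrap the inner payload and inner metadata.
        payload, meta = body["value"], body["_metadata"]
    else:
        # Plain message: the body is the payload; metadata comes from the source.
        payload, meta = body, metadata
    return (json.dumps(payload, separators=(",", ":")),
            json.dumps(meta, separators=(",", ":")))
```

In the pre-wrapped case, the source-provided metadata argument is ignored in favor of the inner _metadata object, which matches the convention described for chaining from another rawMode sink.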
Existing tables
- With rawMode: true, Connect validates that the table has both data and _metadata string-compatible columns (case-insensitive). Otherwise the processor fails fast.
- A table created without _metadata cannot be used with rawMode: true until you recreate it or add the column manually.
Reading back (Nessie source)
The source does not unwrap rawMode; it emits one JSON object per row with Iceberg column names. A row from a rawMode table looks like:
{"data": "{\"id\":1}", "_metadata": "{\"offset\":100,\"topic\":\"events\"}"}
Parse data / _metadata in the downstream sink if you need structured fields.
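As a small illustration of that downstream parsing step, the Python sketch below decodes both string columns of a row read back from a rawMode table. The helper name parse_raw_row is hypothetical; the sample row is the one shown above.

```python
import json
from typing import Any, Dict

# Row as emitted by the source for a rawMode table: both columns hold JSON text.
row = {"data": "{\"id\":1}", "_metadata": "{\"offset\":100,\"topic\":\"events\"}"}


def parse_raw_row(raw: Dict[str, str]) -> Dict[str, Any]:
    """Decode the JSON text stored in the data and _metadata columns."""
    return {
        "data": json.loads(raw["data"]),
        "_metadata": json.loads(raw["_metadata"]),
    }


parsed = parse_raw_row(row)
```

After parsing, parsed["data"]["id"] and parsed["_metadata"]["topic"] are available as structured fields.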
flattenMetadataColumns — flat metadata columns (sink)
Supported on PostgreSQL, Trino, ClickHouse, and Nessie sinks when rawMode: true.
When flattenMetadataColumns: true, each key from msg.Metadata is written to a separate column instead of a JSON _metadata column. Optional flattenMetadataColumnsPrefix is prepended to column names (for example kafka_ + offset → kafka_offset).
| Parameter | Description |
|---|---|
| flattenMetadataColumns | true: expand metadata into columns |
| flattenMetadataColumnsPrefix | Column name prefix (for example kafka_) |
With autoCreateTable: true, the schema is data (string) plus columns for metadata keys from the first batch (types inferred: int/long/string/bool/timestamp). The metadata key timestamp (for example kafka_timestamp with prefix kafka_) is stored as timestamptz (Kafka source provides time.Time in UTC). Legacy string timestamps in metadata are parsed when possible. The _metadata column is not created. New metadata keys in later batches are skipped (warning logged).
Tables with an _metadata column are incompatible with flattenMetadataColumns: true — recreate the table or use a new table name.
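The flattening rules can be sketched in Python as two steps: derive the metadata columns from the first batch, then build rows while skipping keys that were not seen in that batch. Both function names are hypothetical, and type inference and timestamp handling are deliberately left out of this illustration.

```python
from typing import Any, Dict, List


def metadata_columns(first_batch: List[Dict[str, Any]], prefix: str = "") -> List[str]:
    """Collect prefixed column names from the metadata of the first batch."""
    columns: List[str] = []
    for metadata in first_batch:
        for key in metadata:
            column = prefix + key
            if column not in columns:
                columns.append(column)
    return columns


def flatten_row(data_json: str, metadata: Dict[str, Any], prefix: str,
                known_columns: List[str]) -> Dict[str, Any]:
    """Build one row: data plus one column per known metadata key."""
    row: Dict[str, Any] = {"data": data_json}
    for key, value in metadata.items():
        column = prefix + key
        if column in known_columns:
            row[column] = value
        # Keys that were not in the first batch are skipped, as described above.
    return row
```

With prefix kafka_, a metadata key offset lands in column kafka_offset, and no _metadata column appears in the row.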
Reading (Nessie source): rows with data and prefixed columns (without _metadata) are emitted as {"value": ..., "_metadata": ...}; metadata fields are also copied to msg.Metadata.
Example: Kafka to Nessie (Iceberg)
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: kafka-to-nessie
spec:
source:
type: kafka
config:
brokers:
- kafka:9092
topic: input-topic
consumerGroup: dataflow-group
sink:
type: nessie
config:
baseURL: "http://nessie:19120"
branch: main
namespace: analytics
table: events
batchSize: 100
autoCreateTable: true
rawMode: true
flattenMetadataColumns: true
flattenMetadataColumnsPrefix: kafka_
Iceberg (REST Catalog)
The Iceberg connector reads from and writes to Apache Iceberg tables via the official REST Catalog API. Use this connector for generic REST catalog implementations (Polaris, Lakekeeper, Tabular, iceberg-rest, etc.). For Nessie branch semantics, use the nessie connector instead.
Source
source:
type: iceberg
config:
# REST catalog base URL (required), e.g. https://catalog:8181
catalogURI: "https://iceberg-catalog.example.com"
# Optional REST path prefix (multi-tenant /v1/{prefix}/...)
# prefix: production
# Optional warehouse identifier (passed to GET /v1/config and catalog client)
# warehouse: main
namespace: my_schema
table: my_table
pollInterval: 10
# Incremental read by Iceberg snapshot (optional)
# incrementalBySnapshot: true
# startSnapshotID: "1234567890123456789"
# snapshotCheckpoints: true
# Authentication (optional)
authenticationType: BEARER
bearerToken: "your-token"
# OAuth2 client credentials (alternative to bearerToken):
# oauth2ClientID: client-id
# oauth2ClientSecret: client-secret
# oauth2ServerURI: https://auth.example.com/oauth/token
# oauth2Scope: catalog
Sink
sink:
type: iceberg
config:
catalogURI: "https://iceberg-catalog.example.com"
namespace: my_schema
table: my_table
batchSize: 100
autoCreateTable: true
rawMode: true
authenticationType: BEARER
bearerToken: "your-token"
# Optional S3-compatible warehouse credentials (injected as pod env)
# s3Endpoint: https://storage.example.com
# s3Region: us-east-1
# accessKeySecretRef:
# name: iceberg-s3-creds
# key: AWS_ACCESS_KEY_ID
# secretAccessKeySecretRef:
# name: iceberg-s3-creds
# key: AWS_SECRET_ACCESS_KEY
Behavior matches the Nessie connector for polling, incremental snapshots, batch appends, rawMode, and flattenMetadataColumns. Checkpoint store key is iceberg. spec.replicas must be 1 for Iceberg sources (same as Nessie polling sources).
Example: Kafka to Iceberg
See sample manifest: dataflow/config/samples/kafka-to-iceberg.yaml.
Error Sink
DataFlow Operator supports configuring a separate sink for messages that failed to be written to the main sink. The errors section uses the same connector types (Kafka, PostgreSQL, Trino, ClickHouse, Nessie, Iceberg) as the main sink.
For configuration, error message structure, and error types, see Error Handling.
Performance Recommendations
Spec-level settings
channelBufferSize
Buffer size for message channels between source, processor, and sink (default 100). For high Kafka throughput (tens of thousands of msg/s), increase to 500–1000 to reduce blocking when the sink is slower than the source.
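The effect of the buffer size can be illustrated with a bounded queue. The Python sketch below is only an analogy for the channel between pipeline stages: the function fill_until_blocked is hypothetical, and it counts how many messages a producer can hand off before it would have to wait for a consumer that has stalled.

```python
import queue


def fill_until_blocked(buffer_size: int, produced: int) -> int:
    """Count messages accepted by a bounded buffer while nothing is consuming."""
    channel: "queue.Queue[int]" = queue.Queue(maxsize=buffer_size)
    accepted = 0
    for message in range(produced):
        try:
            channel.put_nowait(message)
        except queue.Full:
            break  # a blocking producer would wait here for the sink
        accepted += 1
    return accepted
```

With a burst of 250 messages, a buffer of 100 absorbs only the first 100 before the producer stalls, while a buffer of 1000 absorbs the whole burst.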
ackGranularity
Controls when source offsets / checkpoints are committed relative to sink writes: batch (default) or message. With message, batch sinks flush one row at a time and the Kafka source commits offsets immediately after each ack. See Fault Tolerance: Ack granularity.
checkpointSyncOnAck and checkpointSaveInterval
checkpointSyncOnAck: true flushes polling checkpoints to the ConfigMap after each sink ack (coalesced by checkpointSaveInterval, default 30s). Recommended for migration and cron workloads. Details: Fault Tolerance.
Kafka
- Use multiple brokers for fault tolerance
- Configure an appropriate consumer group size for parallel processing
- Use batch writes for higher throughput
PostgreSQL
- Increase batchSize for the sink (recommended 50–100)
- Add indexes on frequently queried columns
- Tune pollInterval based on data update frequency
Troubleshooting
Connection issues
- Verify data source accessibility from the cluster
- Ensure credentials are correct
- Check Kubernetes network policies
- For TLS, verify certificates
Performance issues
- Increase channelBufferSize (500–1000) for high Kafka load
- Increase batch sizes for sinks
- Tune pollInterval for sources
- For Kafka sources, increase spec.replicas (up to topic partition count). For Nessie and other polling sources, keep replicas: 1 and tune batchSize / channelBufferSize instead
- Monitor message processing metrics