# Connectors

DataFlow Operator supports a set of connectors for data sources and sinks. Each connector implements a standard interface and can act as a source, a sink, or both.

## Connector Overview
| Connector | Source | Sink | Features |
|---|---|---|---|
| Kafka | ✅ | ✅ | Consumer groups, TLS, SASL, Avro, Schema Registry |
| PostgreSQL | ✅ | ✅ | SQL queries, batch inserts, auto-create tables, UPSERT mode |
| Trino | ✅ | ✅ | SQL queries, Keycloak OAuth2 authentication, batch inserts |
| ClickHouse | ✅ | ✅ | Polling, batch inserts, auto-create MergeTree tables |
| Nessie | ✅ | ✅ | Iceberg tables via Nessie catalog, branches, Basic/Bearer auth, polling, batch appends |
## Using Kubernetes Secrets

DataFlow Operator can read connector configuration from Kubernetes Secrets. This keeps sensitive data (passwords, tokens, connection strings) out of the DataFlow specification.

### Overview

Instead of specifying values directly in the configuration, you can reference Kubernetes Secrets through `*SecretRef` fields. The operator automatically reads the values from the secrets and substitutes them into the connector configuration.

### SecretRef Structure

Each secret reference has the following structure:

```yaml
secretRef:
  name: my-secret          # Secret name (required)
  namespace: my-namespace  # Secret namespace (optional, defaults to the DataFlow namespace)
  key: my-key              # Key in the secret (required)
```
### Supported Fields

All connectors support secret references for the following fields:

#### Kafka

- `brokersSecretRef` - broker list (comma-separated)
- `topicSecretRef` - topic name
- `consumerGroupSecretRef` - consumer group
- `sasl.usernameSecretRef` - SASL username
- `sasl.passwordSecretRef` - SASL password
- `tls.certSecretRef` - client certificate
- `tls.keySecretRef` - private key
- `tls.caSecretRef` - CA certificate
- `schemaRegistry.urlSecretRef` - Schema Registry URL
- `schemaRegistry.basicAuth.usernameSecretRef` - Schema Registry username
- `schemaRegistry.basicAuth.passwordSecretRef` - Schema Registry password
- `avroSchemaSecretRef` - Avro schema from a secret (for a static schema)

#### PostgreSQL

- `connectionStringSecretRef` - connection string
- `tableSecretRef` - table name

#### ClickHouse

- `connectionStringSecretRef` - connection string
- `tableSecretRef` - table name

#### Trino

- `serverURLSecretRef` - Trino server URL
- `catalogSecretRef` - catalog name
- `schemaSecretRef` - schema name
- `tableSecretRef` - table name
- `keycloak.serverURLSecretRef` - Keycloak server URL
- `keycloak.realmSecretRef` - Keycloak realm name
- `keycloak.clientIDSecretRef` - OAuth2 client ID
- `keycloak.clientSecretSecretRef` - OAuth2 client secret
- `keycloak.usernameSecretRef` - username for password grant
- `keycloak.passwordSecretRef` - password for password grant
- `keycloak.tokenSecretRef` - OAuth2 token (for long-lived tokens)

#### Nessie

- `baseURLSecretRef` - Nessie server base URL
- `tokenSecretRef` - Bearer token for Nessie/Iceberg REST
- `namespaceSecretRef` - namespace (schema) name
- `tableSecretRef` - table name
### Usage Examples

#### Example 1: Kafka with SASL Authentication

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: kafka-credentials
  namespace: default
type: Opaque
stringData:
  brokers: "kafka1:9092,kafka2:9092,kafka3:9092"
  topic: "input-topic"
  consumerGroup: "dataflow-group"
  username: "kafka-user"
  password: "kafka-password"
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: kafka-example
spec:
  source:
    type: kafka
    config:
      brokersSecretRef:
        name: kafka-credentials
        key: brokers
      topicSecretRef:
        name: kafka-credentials
        key: topic
      consumerGroupSecretRef:
        name: kafka-credentials
        key: consumerGroup
      sasl:
        mechanism: "scram-sha-256"
        usernameSecretRef:
          name: kafka-credentials
          key: username
        passwordSecretRef:
          name: kafka-credentials
          key: password
```
#### Example 2: PostgreSQL with Connection String from Secret

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: postgres-credentials
  namespace: default
type: Opaque
stringData:
  connectionString: "postgres://user:password@postgres:5432/dbname?sslmode=disable"
  table: "output_table"
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: postgres-example
spec:
  source:
    type: postgresql
    config:
      connectionStringSecretRef:
        name: postgres-credentials
        key: connectionString
      tableSecretRef:
        name: postgres-credentials
        key: table
```
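#### Example 3: Nessie with Bearer Token from Secret

The Nessie `*SecretRef` fields listed above can be wired the same way. A minimal sketch (the secret layout and names here are illustrative):

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: nessie-credentials   # illustrative name
  namespace: default
type: Opaque
stringData:
  token: "your-token"
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: nessie-example
spec:
  source:
    type: kafka
    config:
      brokers:
        - kafka:9092
      topic: input-topic
      consumerGroup: dataflow-group
  sink:
    type: nessie
    config:
      baseURL: "http://nessie:19120"
      branch: main
      namespace: analytics
      table: events
      tokenSecretRef:          # Bearer token read from the secret
        name: nessie-credentials
        key: token
```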
### Priority of Values

If both a direct value and a `*SecretRef` are specified, the `*SecretRef` takes priority: the value from the secret is used instead of the direct value.
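For example, in the following fragment the table name is taken from the secret, and `fallback_table` is ignored (names are illustrative):

```yaml
sink:
  type: postgresql
  config:
    connectionString: "postgres://user:password@postgres:5432/dbname?sslmode=disable"
    table: fallback_table      # ignored: the secret value takes priority
    tableSecretRef:
      name: postgres-credentials
      key: table
```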
### Secrets in Different Namespaces

By default, the operator looks for secrets in the same namespace as the DataFlow resource. You can specify a different namespace:

```yaml
connectionStringSecretRef:
  name: postgres-credentials
  namespace: shared-secrets
  key: connectionString
```
### TLS Certificates from Secrets

For TLS configuration, the operator automatically determines whether a value from a secret is a file path or certificate content.

How it works:

- If the value starts with `-----BEGIN` (e.g., `-----BEGIN CERTIFICATE-----` or `-----BEGIN PRIVATE KEY-----`), the operator treats it as certificate content and writes it to a temporary file
- If the value doesn't start with `-----BEGIN` and exists as a file, it's used as a file path
- If the value doesn't start with `-----BEGIN` and no such file exists, it's also treated as certificate content

Supported formats:

1. Certificate content (PEM format) - stored in `stringData` or decoded from `data`
2. Base64-encoded content - stored in the secret's `data` field (Kubernetes decodes it automatically)
3. File path - path to an existing certificate file
Example:

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: kafka-tls-certs
type: Opaque
stringData:
  ca.crt: |
    -----BEGIN CERTIFICATE-----
    ...
    -----END CERTIFICATE-----
  client.crt: |
    -----BEGIN CERTIFICATE-----
    ...
    -----END CERTIFICATE-----
  client.key: |
    -----BEGIN PRIVATE KEY-----
    ...
    -----END PRIVATE KEY-----
---
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: kafka-tls-example
spec:
  source:
    type: kafka
    config:
      brokers:
        - secure-kafka:9093
      topic: secure-topic
      tls:
        caSecretRef:
          name: kafka-tls-certs
          key: ca.crt
        certSecretRef:
          name: kafka-tls-certs
          key: client.crt
        keySecretRef:
          name: kafka-tls-certs
          key: client.key
```
Important:

- Temporary files are created automatically for certificate content and cleaned up after use
- When using base64-encoded values in the `data` field, Kubernetes decodes them automatically on read
- Ensure certificates are in proper PEM format with `-----BEGIN` and `-----END` headers
### Security

- RBAC: The operator requires permissions to read secrets (`get`, `list`, `watch`)
- Isolation: Secrets should be in the same namespace or in a namespace the operator has access to
- Temporary Files: For TLS certificates, the operator creates temporary files that are automatically deleted
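A minimal RBAC sketch granting those verbs (resource names are illustrative; bind the Role to the operator's service account with a matching RoleBinding):

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: dataflow-secret-reader   # illustrative name
  namespace: shared-secrets      # namespace holding the secrets
rules:
  - apiGroups: [""]              # core API group
    resources: ["secrets"]
    verbs: ["get", "list", "watch"]
```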
### Troubleshooting

#### Secret Not Found

```bash
# Check if the secret exists
kubectl get secret <secret-name> -n <namespace>

# Check operator permissions
kubectl auth can-i get secrets --as=system:serviceaccount:default:dataflow-operator
```

#### Key Not Found in Secret

```bash
# List the keys in the secret
kubectl get secret <secret-name> -n <namespace> -o jsonpath='{.data}' | jq 'keys'
```

#### Secret Resolution Error

Check the operator logs:

```bash
kubectl logs -l app.kubernetes.io/name=dataflow-operator | grep -i secret
```

Ensure that:

1. The secret exists in the specified namespace
2. The specified key exists in the secret
3. The operator has permissions to read secrets
#### TLS Certificate Issues

If you encounter errors with TLS certificates:

- "file name too long" error: Ensure the certificate is stored correctly in the secret:
  - If using `stringData`, the certificate should be in PEM format with `-----BEGIN` and `-----END` headers
  - If using `data` (base64), ensure the value is correctly encoded
  - The operator detects certificate content automatically by the `-----BEGIN` prefix
- "failed to read CA file" error: Check the certificate format:

  ```bash
  # Check the secret content
  kubectl get secret <secret-name> -n <namespace> -o jsonpath='{.data.ca\.crt}' | base64 -d
  ```

  Ensure the certificate starts with `-----BEGIN CERTIFICATE-----`
- Temporary file creation error: Check the operator's permissions to create files in the temporary directory
## Kafka

The Kafka connector reads and writes messages from/to Apache Kafka topics. It supports consumer groups for scaling, TLS and SASL authentication, and the Avro format via Confluent Schema Registry or a static schema.

### Source

```yaml
source:
  type: kafka
  config:
    brokers:
      - kafka1:9092
    topic: input-topic
    consumerGroup: my-group

    # Message format (optional, default: "json")
    # Supported formats: "json", "avro"
    format: json

    # Avro configuration (required if format: "avro")
    # Option 1: Use Confluent Schema Registry (recommended)
    schemaRegistry:
      url: https://schema-registry:8081
      basicAuth:
        username: schema-user
        password: schema-password
      tls:
        insecureSkipVerify: false
        caFile: /path/to/schema-registry-ca.crt

    # Option 2: Static Avro schema (alternative to Schema Registry)
    # avroSchema: |
    #   {
    #     "type": "record",
    #     "name": "MyRecord",
    #     "fields": [
    #       {"name": "id", "type": "long"},
    #       {"name": "name", "type": "string"}
    #     ]
    #   }
    # Or a path to a schema file:
    # avroSchemaFile: /path/to/schema.avsc
```
### Features

- Consumer Groups: Use different consumer groups to scale processing
- Initial Offset: Reads from the oldest message by default (`OffsetOldest`)
- Data Formats: Supports JSON (default) and Avro
- Avro Support:
  - Confluent Schema Registry: Automatic schema retrieval by the ID embedded in messages (format: magic byte + schema ID + data)
  - Static Schema: Use a predefined schema from the configuration or a file
  - Schema Caching: Schemas fetched from Schema Registry are cached for performance
- Metadata: Each message carries metadata:
  - `topic` - topic name
  - `partition` - partition number
  - `offset` - message offset
  - `key` - message key (if present)
### Sink

```yaml
sink:
  type: kafka
  config:
    brokers:
      - kafka1:9092
    topic: output-topic
    # TLS and SASL configuration is the same as for the source
```
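For example, a sink against a secured cluster might mirror the source-side fields like this (broker address and secret names are illustrative):

```yaml
sink:
  type: kafka
  config:
    brokers:
      - secure-kafka:9093
    topic: output-topic
    sasl:
      mechanism: "scram-sha-256"
      usernameSecretRef:
        name: kafka-credentials
        key: username
      passwordSecretRef:
        name: kafka-credentials
        key: password
    tls:
      caSecretRef:
        name: kafka-tls-certs
        key: ca.crt
```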
## PostgreSQL

The PostgreSQL connector reads from and writes to PostgreSQL tables. It supports custom SQL queries, periodic polling, batch inserts, auto-created tables, UPSERT mode, CDC-style change tracking (inserts and updates), soft delete, and SecretRef for credentials.

### Source

```yaml
source:
  type: postgresql
  config:
    # Connection string (required, or use connectionStringSecretRef)
    connectionString: "postgres://user:password@localhost:5432/dbname?sslmode=disable"

    # Table to read from (required if query is not specified). Supports schema.table (e.g. public.products)
    table: source_table

    # Custom SQL query (optional)
    query: "SELECT * FROM source_table WHERE updated_at > NOW() - INTERVAL '1 hour'"

    # Poll interval in seconds (optional, default: 5)
    pollInterval: 60

    # CDC-style options (optional)
    readBatchSize: 1000              # Limit rows per poll to reduce DB load (0 = no limit)
    changeTrackingColumn: updated_at # Column used to track changes (default: updated_at). Ignored when query is specified
    autoCreateTable: true            # Create the table if it doesn't exist before reading

    # SecretRef (optional) - use instead of direct values
    # connectionStringSecretRef:
    #   name: postgres-credentials
    #   key: connectionString
    # tableSecretRef:
    #   name: postgres-credentials
    #   key: table
```
### Source Features

- Periodic Polling: Regularly polls the table for new data
- Custom Queries: Supports complex SQL with JOIN, WHERE, etc.
- Metadata: Each message carries `table` metadata and an `operation` (insert/update)
- Read Batch Size: Limits rows per poll to reduce database load when many new records appear
- Change Tracking: By default, tracks changes via the `updated_at` column (or `changeTrackingColumn`), capturing both INSERTs and UPDATEs
- Auto-create Table: When `autoCreateTable: true`, creates the table with a CDC-friendly schema (`id SERIAL PRIMARY KEY`, `created_at`, `updated_at`) if it doesn't exist. Creation happens at Connect time.
- Schema Notation: The table name supports the `schema.table` format (e.g. `public.products`)
- Checkpoint Persistence: By default, the read position (`lastReadChangeTime`) is persisted to a ConfigMap; on restart, reading resumes from the last position. Set `checkpointPersistence: false` in the spec to keep it in memory only. For pg-to-pg flows, enable `upsertMode: true` in the sink to update duplicates instead of inserting them again (see the sketch after this list).
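A minimal pg-to-pg sketch combining these options (connection strings and table names are illustrative):

```yaml
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: postgres-to-postgres
spec:
  source:
    type: postgresql
    config:
      connectionString: "postgres://user:password@postgres:5432/src?sslmode=disable"
      table: public.products
      pollInterval: 5
  sink:
    type: postgresql
    config:
      connectionString: "postgres://user:password@postgres:5432/dst?sslmode=disable"
      table: public.products_clone
      autoCreateTable: true
      upsertMode: true       # update duplicates instead of re-inserting them
      conflictKey: "id"
```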
### Sink

```yaml
sink:
  type: postgresql
  config:
    connectionString: "postgres://user:password@localhost:5432/dbname?sslmode=disable"

    # Table to write to. Supports schema.table (e.g. public.products_clone)
    table: target_table

    # Batch size (optional, default: 1). 0 = flush only by timer
    batchSize: 100

    # Flush interval in seconds (optional, default: 10). 0 = disable the timer
    batchFlushIntervalSeconds: 10

    autoCreateTable: true

    # Raw mode (optional, default: false)
    # When true, expects messages in the format {"value": <data>, "_metadata": {...}}
    # The table is created with value JSONB and _metadata JSONB columns
    rawMode: false

    # UPSERT mode (optional, default: false)
    upsertMode: true
    conflictKey: "id"

    # Soft delete column (optional). When set, DELETE operations UPDATE this column instead of physically deleting
    softDeleteColumn: "deleted_at"

    # SecretRef (optional)
    # connectionStringSecretRef: { name: postgres-credentials, key: connectionString }
    # tableSecretRef: { name: postgres-credentials, key: table }
```
### Sink Features

- Batch Inserts: Groups messages for efficient writing. Flushes when `batchSize` is reached or on a timer. Use `batchFlushIntervalSeconds: 0` for size-only flushing; `batchSize: 0` for timer-only flushing.
- Auto-create Tables (when `autoCreateTable: true`):
  - `rawMode: true` - the table is created at Connect time. Schema: `id SERIAL PRIMARY KEY`, `value` JSONB, `_metadata` JSONB, `created_at`, `updated_at`, `deleted_at`, with a GIN index on `value`
  - `rawMode: false` - the table is created at first write from the structure of the first message (replicating the source schema). Column types are inferred automatically (TEXT, BIGINT, NUMERIC, JSONB, etc.)
- Raw Mode: When true, expects `{"value": <data>, "_metadata": {...}}` and creates a table with value/_metadata columns. When false, uses the columnar format from the message. An illustrative raw-mode message is shown below.
- UPSERT Mode: Updates existing records on conflict (PRIMARY KEY or `conflictKey`)
- Soft Delete: When `softDeleteColumn` is set and a message has `metadata.operation=delete`, performs `UPDATE ... SET deleted_at = NOW()` instead of a physical DELETE
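For reference, a raw-mode message could look like the following sketch. Only the `value`/`_metadata` envelope is prescribed above; the field values here are illustrative:

```json
{
  "value": {"id": 42, "name": "widget"},
  "_metadata": {"table": "source_table", "operation": "insert"}
}
```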
## ClickHouse

The ClickHouse connector reads from and writes to ClickHouse tables. It supports polling for incremental reads, custom SQL queries, batch inserts, and auto-creation of MergeTree tables.

### Source

```yaml
source:
  type: clickhouse
  config:
    # Connection string (required)
    # Native: clickhouse://host:9000?username=default&password=xxx&database=default
    # HTTP:   http://host:8123/default?username=default&password=xxx
    connectionString: "clickhouse://default@clickhouse:9000/default?dial_timeout=10s"

    # Table to read from (required if query is not specified)
    table: source_table

    # Custom SQL query (optional)
    # If specified, it is used instead of reading from the table
    query: "SELECT * FROM source_table WHERE id > 100"

    # Poll interval in seconds (optional, default: 5)
    pollInterval: 60
```
### Source Features

- Polling: Periodically polls the table for new data (interval set by `pollInterval`)
- Incremental Reads: When `query` is not specified, uses the `id` or `created_at` column to filter already-read rows (`WHERE id > lastReadID` or `WHERE created_at > lastReadTime`), avoiding duplicates on restart
- Custom Queries: When `query` is specified, only that query is used; the incremental logic is not applied
- Metadata: Each message carries `table` and `id` (if the column exists)
### Sink

```yaml
sink:
  type: clickhouse
  config:
    connectionString: "clickhouse://default@clickhouse:9000/default?dial_timeout=10s"
    table: output_table

    # Batch size for inserts (optional, default: 100). 0 = flush only by timer
    batchSize: 100

    # Flush interval in seconds (optional, default: 10). 0 = disable the timer
    batchFlushIntervalSeconds: 10

    # Auto-create table (optional)
    autoCreateTable: true

    # rawMode: false (default) - creates the table from the first message structure (columnar, replicates the source schema)
    # rawMode: true - creates a MergeTree table with data String, created_at columns (JSON storage)
    rawMode: false
```
### Sink Features

- Batch Inserts: Groups messages; flushes when the batch size or the timer (10s) is reached. Size only: `batchFlushIntervalSeconds: 0`. Timer only: `batchSize: 0`
- Retry on Transient Errors: Batch writes are automatically retried on connection refused, TOO_MANY_PARTS, memory limit, HTTP 502/503, and similar transient errors (up to 5 attempts with exponential backoff)
### Resilience and Fault Tolerance

- Error Sink: Use `spec.errors` with ClickHouse (or Kafka) to capture failed messages for replay or analysis. See Error Handling.
- Connection String: Recommended parameters: `dial_timeout=10s`, `max_execution_time=60` (e.g. `clickhouse://host:9000/default?dial_timeout=10s&max_execution_time=60`).
- Batch Tuning: For throughput and resilience, use a `batchSize` of 100–1000 and `batchFlushIntervalSeconds` of 5–10. Larger batches reduce insert frequency and can help avoid TOO_MANY_PARTS. A tuned example follows this list.
- TOO_MANY_PARTS: If you see this error, increase `batchSize`, decrease the flush frequency, or tune ClickHouse merge settings (`background_pool_size`, `parts_to_throw_insert`). Consider `ReplacingMergeTree` for deduplication. See Fault Tolerance.
- Auto-create Tables (when `autoCreateTable: true`):
  - `rawMode: true` - the table is created at Connect time. Schema: `data String`, `created_at DateTime DEFAULT now()`, MergeTree engine, ORDER BY created_at. Messages are stored as a JSON string in the `data` column
  - `rawMode: false` - the table is created at first write from the structure of the first message. Column types are inferred automatically (String, Int32/Int64, Float64, Decimal, DateTime, etc.). Supports the `{"value": {...}, "_metadata": {...}}` format, using the fields from `value`
- Raw Mode: When true, expects/stores JSON in the `data` column. When false, uses the columnar format from the message (INSERT with named columns)
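For example, a sink tuned per the batch recommendations above might look like this (values are illustrative):

```yaml
sink:
  type: clickhouse
  config:
    connectionString: "clickhouse://default@clickhouse:9000/default?dial_timeout=10s&max_execution_time=60"
    table: output_table
    batchSize: 1000               # larger batches reduce insert frequency (helps avoid TOO_MANY_PARTS)
    batchFlushIntervalSeconds: 5
    autoCreateTable: true
```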
### Example: Kafka to ClickHouse

```yaml
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: kafka-to-clickhouse
spec:
  source:
    type: kafka
    config:
      brokers:
        - kafka:9092
      topic: input-topic
      consumerGroup: dataflow-group
  sink:
    type: clickhouse
    config:
      connectionString: "clickhouse://default@clickhouse:9000/default"
      table: output_table
      batchSize: 100
      autoCreateTable: true
```
### Example: ClickHouse to ClickHouse (schema replication)

With `autoCreateTable: true` and `rawMode: false`, the target table is created automatically from the source structure:

```yaml
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: clickhouse-to-clickhouse
spec:
  source:
    type: clickhouse
    config:
      connectionString: "clickhouse://dataflow:dataflow@localhost:9000/dataflow?dial_timeout=10s"
      table: products
      pollInterval: 5
  sink:
    type: clickhouse
    config:
      connectionString: "clickhouse://dataflow:dataflow@localhost:9000/dataflow?dial_timeout=10s"
      table: products_clone
      batchSize: 100
      autoCreateTable: true
```
## Trino

The Trino connector reads from and writes to Trino (formerly PrestoSQL) tables. It supports SQL queries, Keycloak OAuth2/OIDC authentication, and batch inserts.

### Source

```yaml
source:
  type: trino
  config:
    # Trino server URL (required)
    serverURL: "http://trino:8080"

    # Catalog to use (required)
    catalog: hive

    # Schema to use (required)
    schema: default

    # Table to read from (required if query is not specified)
    table: source_table

    # Custom SQL query (optional)
    # If specified, it is used instead of reading from the table
    query: "SELECT * FROM hive.default.source_table WHERE id > 100"

    # Poll interval in seconds (optional, default: 5)
    # Used for periodic reading of new data
    pollInterval: 60

    # Keycloak authentication (optional)
    keycloak:
      # Option 1: Use a long-lived token directly (recommended)
      token: "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."

      # Option 2: Use an OAuth2 flow (alternative to a direct token)
      # serverURL: "https://keycloak.example.com"
      # realm: myrealm
      # clientID: trino-client
      # clientSecret: client-secret
      # username: trino-user
      # password: trino-password
```
### Features

- SQL Queries: Supports custom SQL queries with WHERE, JOIN, etc.
- Periodic Polling: Regularly polls tables for new data
- Keycloak Authentication: OAuth2/OIDC authentication via Keycloak
  - Direct Token: Use a long-lived token obtained from Keycloak
  - Password Grant: Use a username/password for authentication
  - Client Credentials: Use a client ID/secret for service-to-service authentication
- Automatic Token Refresh: Tokens are refreshed automatically before expiration (only for the OAuth2 flows, not for direct tokens)
- Metadata: Each message carries metadata:
  - `catalog` - catalog name
  - `schema` - schema name
  - `table` - table name
### Obtaining a Token from Keycloak

To use a long-lived token, you can obtain it from Keycloak in one of the following ways.

#### Method 1: Using curl (Password Grant)

```bash
curl -X POST "https://keycloak.example.com/realms/myrealm/protocol/openid-connect/token" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "grant_type=password" \
  -d "client_id=trino-client" \
  -d "client_secret=client-secret" \
  -d "username=trino-user" \
  -d "password=trino-password"
```

#### Method 2: Using curl (Client Credentials)

```bash
curl -X POST "https://keycloak.example.com/realms/myrealm/protocol/openid-connect/token" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "grant_type=client_credentials" \
  -d "client_id=trino-client" \
  -d "client_secret=client-secret"
```

The response contains an `access_token` field. Use that value in the `token` field of the Keycloak configuration.

Note: For long-lived tokens, configure the token lifespan in the Keycloak realm settings or client settings.
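To extract the token directly in a shell (assuming `jq` is available):

```bash
# Fetch a token and extract the access_token field
TOKEN=$(curl -s -X POST "https://keycloak.example.com/realms/myrealm/protocol/openid-connect/token" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "grant_type=client_credentials" \
  -d "client_id=trino-client" \
  -d "client_secret=client-secret" | jq -r '.access_token')
echo "$TOKEN"
```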
### Sink

```yaml
sink:
  type: trino
  config:
    # Trino server URL (required)
    serverURL: "http://trino:8080"

    # Catalog to use (required)
    catalog: hive

    # Schema to use (required)
    schema: default

    # Table to write to (required)
    table: target_table

    # Batch size for inserts (optional, default: 1). 0 = flush only by timer
    batchSize: 100

    # Flush interval in seconds (optional, default: 10). 0 = disable the timer
    batchFlushIntervalSeconds: 10

    # Auto-create table (optional, default: false)
    # If true, creates the table with a VARCHAR column for JSON data
    autoCreateTable: true

    # Raw mode (optional, default: false)
    # When true, creates a table with a data VARCHAR column; messages are stored as a JSON string
    # When false, uses a columnar format matching message keys to table columns
    rawMode: true

    # Keycloak authentication (optional)
    keycloak:
      serverURL: "https://keycloak.example.com"
      realm: myrealm
      clientID: trino-client
      clientSecret: client-secret
      username: trino-user
      password: trino-password
```
### Features

- Batch Inserts: Groups messages; flushes when the batch size or the timer (10s) is reached. Size only: `batchFlushIntervalSeconds: 0`. Timer only: `batchSize: 0`
- Auto-create Tables: Automatically creates tables if they don't exist
- Raw Mode: When `rawMode: true`, creates a table with a `data VARCHAR` column and stores messages as a JSON string. When `false` (default), uses a columnar format matching message keys to table columns
- Keycloak Authentication: OAuth2/OIDC authentication via Keycloak
- Automatic Token Refresh: Tokens are refreshed automatically
### Example: Kafka to Trino with Keycloak

```yaml
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: kafka-to-trino
spec:
  source:
    type: kafka
    config:
      brokers:
        - kafka:9092
      topic: input-topic
      consumerGroup: dataflow-group
  sink:
    type: trino
    config:
      serverURL: "http://trino:8080"
      catalog: hive
      schema: default
      table: output_table
      batchSize: 100
      keycloak:
        # Use a long-lived token obtained from Keycloak
        token: "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."
        # Alternative: use an OAuth2 flow
        # serverURL: "https://keycloak.example.com"
        # realm: myrealm
        # clientID: trino-client
        # clientSecret: client-secret
        # username: trino-user
        # password: trino-password
```
## Nessie

The Nessie connector reads from and writes to Apache Iceberg tables via the Nessie catalog (Iceberg REST API). All operations are performed in the context of a Nessie branch; metadata and data are managed by the catalog.

### Source

```yaml
source:
  type: nessie
  config:
    # Nessie server base URL (required), e.g. https://nessie:19120
    baseURL: "http://nessie:19120"

    # Nessie branch to read from (optional, default: main)
    branch: main

    # Warehouse name for the storage location (optional)
    warehouse: ""

    # Namespace (schema) of the Iceberg table (required)
    namespace: my_schema

    # Table name (required)
    table: my_table

    # Poll interval in seconds (optional, default: 10)
    pollInterval: 10

    # Authentication (optional)
    bearerToken: "your-token"
    # Or Basic auth:
    # basicAuth:
    #   username: user
    #   password: pass
```
### Features

- Branch Context: Reads from the specified Nessie branch; table metadata is resolved from the catalog
- Polling: Periodically scans the Iceberg table for new data
- Authentication: Bearer token (OAuth2) or Basic auth for the Nessie/Iceberg REST API
### Sink

```yaml
sink:
  type: nessie
  config:
    baseURL: "http://nessie:19120"
    branch: main
    warehouse: ""
    namespace: my_schema
    table: my_table

    # Batch size for appends (optional, default: 100). 0 = flush only by timer
    batchSize: 100

    # Flush interval in seconds (optional, default: 10). 0 = disable the timer
    batchFlushIntervalSeconds: 10

    # Create the table if it does not exist (optional); creates a table with a single "data" (string) column
    autoCreateTable: true

    bearerToken: "your-token"
    # Or basicAuth: { username, password }
```
### Features

- Branch Context: Writes are committed to the specified Nessie branch via the catalog
- Batch Appends: Groups messages; flushes when the batch size or the timer (10s) is reached. Size only: `batchFlushIntervalSeconds: 0`. Timer only: `batchSize: 0`
- Auto-create Table: Creates an Iceberg table with a single `data` (string) column for JSON payloads when the table does not exist
- Authentication: Same as the source (Bearer or Basic)
### Example: Kafka to Nessie (Iceberg)

```yaml
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: kafka-to-nessie
spec:
  source:
    type: kafka
    config:
      brokers:
        - kafka:9092
      topic: input-topic
      consumerGroup: dataflow-group
  sink:
    type: nessie
    config:
      baseURL: "http://nessie:19120"
      branch: main
      namespace: analytics
      table: events
      batchSize: 100
      autoCreateTable: true
```
## Error Sink

DataFlow Operator supports configuring a separate sink for messages that failed to be written to the main sink. The `errors` section uses the same connector types (Kafka, PostgreSQL, Trino, ClickHouse, Nessie) as the main sink.

For configuration, the error message structure, and error types, see Error Handling.
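As a quick orientation, a minimal sketch assuming `spec.errors` takes the same `type`/`config` shape as the main sink (see Error Handling for the authoritative structure):

```yaml
spec:
  # ... source and sink ...
  errors:
    type: kafka
    config:
      brokers:
        - kafka:9092
      topic: dataflow-errors   # illustrative topic name
```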
## Performance Recommendations

### Spec-level Settings

#### channelBufferSize

Buffer size for the message channels between source, processor, and sink (default: 100). For high Kafka throughput (tens of thousands of msg/s), increase it to 500–1000 to reduce blocking when the sink is slower than the source; a sketch follows below.
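A sketch of the setting in context (assuming `channelBufferSize` sits directly under `spec`, as "spec-level" suggests; values are illustrative):

```yaml
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: high-throughput-flow
spec:
  channelBufferSize: 1000   # assumption: spec-level field as described above
  source:
    type: kafka
    config:
      brokers:
        - kafka:9092
      topic: input-topic
      consumerGroup: dataflow-group
  sink:
    type: clickhouse
    config:
      connectionString: "clickhouse://default@clickhouse:9000/default"
      table: output_table
      batchSize: 1000
```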
### Kafka

- Use multiple brokers for fault tolerance
- Configure an appropriate consumer group size for parallel processing
- Use batch writes for higher throughput

### PostgreSQL

- Increase `batchSize` for the sink (recommended: 50–100)
- Add indexes on frequently queried columns
- Tune `pollInterval` based on the data update frequency
## Troubleshooting

### Connection Issues

- Verify the data source is reachable from the cluster
- Ensure credentials are correct
- Check Kubernetes network policies
- For TLS, verify the certificates

### Performance Issues

- Increase `channelBufferSize` (500–1000) for high Kafka load
- Increase batch sizes for sinks
- Tune `pollInterval` for sources
- Scale operator instances if needed
- Monitor message processing metrics