Fault Tolerance and Data Consistency
DataFlow Operator processes messages with at-least-once delivery semantics: when the processor pod crashes or restarts, some messages may be re-read and written again. This document explains this behavior, the resulting risk of duplicate or inconsistent data, and how to configure idempotent sinks to prevent duplicates.
Delivery Semantics
- At-least-once: Each message is delivered at least once. Duplicates are possible on processor restart or crash.
- Exactly-once: Not supported natively. Use idempotent sinks to achieve effectively-once semantics.
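The difference between the two semantics can be illustrated with a keyed (idempotent) sink: even when at-least-once delivery replays a message, writing by key makes the replay converge to the same final state. A minimal sketch (the message shape and sink class are illustrative, not the operator's API):

```python
# Illustrative only: a keyed in-memory "sink" that absorbs redelivered messages.
class IdempotentSink:
    def __init__(self):
        self.rows = {}  # key -> payload; a duplicate write overwrites, not appends

    def write(self, key, payload):
        self.rows[key] = payload  # UPSERT-like: same key, same final state

sink = IdempotentSink()
# At-least-once delivery: the message with id=1 is delivered twice after a restart.
for msg in [{"id": 1, "data": "a"}, {"id": 2, "data": "b"}, {"id": 1, "data": "a"}]:
    sink.write(msg["id"], msg["data"])

print(len(sink.rows))  # 2 rows, not 3: effectively-once in the sink
```

This is the pattern the UPSERT and ReplacingMergeTree configurations later in this document implement at the database level.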
Source Behavior on Restart
| Source | State storage | On restart |
|---|---|---|
| Kafka | Consumer group (Kafka) | Resumes from the last committed offset. No duplicates if the offset was committed after the sink write. |
| PostgreSQL | ConfigMap (default); in-memory when checkpointPersistence: false | Resumes from the last position by default. Without persistence: re-reads from the beginning. |
| ClickHouse | ConfigMap (default); in-memory when checkpointPersistence: false | Resumes from the last position by default. Without persistence: re-reads from the beginning. |
| Trino | ConfigMap (default); in-memory when checkpointPersistence: false | Resumes from the last position by default. Without persistence: re-reads from the beginning. |
Kafka Source
The Kafka consumer commits the offset only after the message has been successfully written to the sink (via msg.Ack()). If the processor crashes:
- Before sink write: Offset not committed. On restart, message is re-read. No duplicate in sink.
- After sink write, before Ack: Data may be in sink, offset not committed. On restart, re-read → duplicate in sink.
- After Ack: Offset committed. On restart, resume from next message. No duplicate.
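The three crash windows above follow directly from the write-then-commit ordering. The following sketch simulates a crash after the sink write but before the offset commit, the only window that produces a duplicate (function and variable names are illustrative, not the operator's code):

```python
# Simulation of the write-to-sink-then-commit-offset ordering.
def run(messages, committed_offset, sink, crash_before_ack_at=None):
    """Process messages starting after committed_offset; optionally crash
    after the sink write but before the offset commit (the duplicate window)."""
    for offset, payload in enumerate(messages):
        if offset <= committed_offset:
            continue  # already committed, skipped on restart
        sink.append(payload)            # 1) write to sink
        if offset == crash_before_ack_at:
            return committed_offset     # crash: offset NOT committed
        committed_offset = offset       # 2) Ack(): commit offset
    return committed_offset

msgs = ["a", "b", "c"]
sink = []
# First run crashes after writing "b" but before committing its offset.
off = run(msgs, committed_offset=-1, sink=sink, crash_before_ack_at=1)
# Restart: resumes after offset 0, so "b" is re-read -> duplicate in sink.
off = run(msgs, committed_offset=off, sink=sink)
print(sink)  # ['a', 'b', 'b', 'c'] -- "b" duplicated, nothing lost
```

Note that no ordering loses data; the only failure mode is duplication, which an idempotent sink absorbs.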
Polling Sources (PostgreSQL, ClickHouse, Trino)
The read position (lastReadID, lastReadChangeTime) is tracked per source. Checkpoint persistence is enabled by default: the position is persisted to a ConfigMap, and on restart the source resumes from the last committed position, reducing duplicates. Set checkpointPersistence: false in the spec to disable persistence and keep the position only in memory. Without persistence, on pod crash:
- State is lost.
- On restart, the source re-reads from the beginning (or from a wrong position).
- Duplicates or gaps are possible depending on when the crash occurred.
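The effect of persisting the read position can be sketched as follows (the store and field names mirror the ones above, but the polling function itself is illustrative):

```python
# Sketch: a polling source that checkpoints its read position.
checkpoint_store = {}  # stands in for the ConfigMap

def poll(rows, persist):
    last_id = checkpoint_store.get("lastReadID", 0) if persist else 0
    read = [r for r in rows if r["id"] > last_id]
    if read and persist:
        checkpoint_store["lastReadID"] = read[-1]["id"]  # persist the position
    return read

rows = [{"id": 1}, {"id": 2}, {"id": 3}]
first = poll(rows, persist=True)   # reads ids 1..3, checkpoints lastReadID=3
# Simulated restart: in-memory state is gone, but the checkpoint survives.
second = poll(rows, persist=True)  # resumes after id 3 -> nothing re-read
replay = poll(rows, persist=False) # no persistence: re-reads from the beginning
print(len(first), len(second), len(replay))  # 3 0 3
```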
Idempotent sink required
For polling sources, always configure an idempotent sink (UPSERT, ReplacingMergeTree) to handle duplicates safely.
Batch Sink Behavior
PostgreSQL, ClickHouse, and Trino sinks write in batches. The flow is:
- Accumulate messages in a batch
- Execute Commit (transaction)
- Call Ack() for each message (commits the Kafka offset, if applicable)
If the processor crashes between Commit and the last Ack:
- Data is already in the sink
- Kafka offset may not be committed
- On restart: re-read from Kafka → duplicate writes to sink
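Because the whole batch is committed before any message is acked, the number of messages at risk of duplication is bounded by the batch size. A sketch of this failure mode (illustrative names, not the operator's code):

```python
# Sketch: batch commit followed by per-message acks.
def flush(batch, sink, acked, crash_after_commit=False):
    sink.extend(batch)                 # Commit: the whole batch lands in the sink
    if crash_after_commit:
        return                         # crash before any Ack: offsets not committed
    acked.extend(batch)                # Ack each message -> Kafka offset advances

sink, acked = [], []
batch = ["m1", "m2", "m3"]
flush(batch, sink, acked, crash_after_commit=True)
# Restart: nothing was acked, so the whole batch is re-read and re-committed.
flush(batch, sink, acked)
print(sink)   # ['m1', 'm2', 'm3', 'm1', 'm2', 'm3'] -- up to batchSize duplicates
print(acked)  # ['m1', 'm2', 'm3']
```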
Reduce duplicate window
Use a smaller batchSize to reduce the number of messages at risk of duplication on crash.
Idempotent Sink Configuration
PostgreSQL Sink
Enable UPSERT mode so that duplicate inserts update existing rows instead of failing:
```yaml
sink:
  type: postgresql
  config:
    connectionString: "postgres://..."
    table: output_table
    upsertMode: true
    conflictKey: ["id"]  # Optional; defaults to PRIMARY KEY
```
Requires the table to have a PRIMARY KEY or UNIQUE constraint on the conflict columns.
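The upsert presumably resolves to an INSERT ... ON CONFLICT statement. The semantics can be demonstrated with SQLite (available in the Python standard library), which shares this syntax with PostgreSQL; the table mirrors the config above:

```python
import sqlite3

# Demonstrates ON CONFLICT upsert semantics (SQLite >= 3.24 shares the
# PostgreSQL syntax): a duplicate insert updates the row instead of failing.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE output_table (id INTEGER PRIMARY KEY, data TEXT)")

upsert = """
INSERT INTO output_table (id, data) VALUES (?, ?)
ON CONFLICT (id) DO UPDATE SET data = excluded.data
"""
conn.execute(upsert, (1, "first write"))
conn.execute(upsert, (1, "duplicate write"))  # replayed message: update, not error

rows = conn.execute("SELECT id, data FROM output_table").fetchall()
print(rows)  # [(1, 'duplicate write')] -- one row, last write wins
```

Without the ON CONFLICT clause, the second insert would raise a unique-constraint error and fail the batch.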
ClickHouse Sink
Use ReplacingMergeTree engine for automatic deduplication by a version column:
```sql
CREATE TABLE output_table (
    id UInt64,
    data String,
    created_at DateTime DEFAULT now()
) ENGINE = ReplacingMergeTree(created_at)
ORDER BY id;
```
Or create the table with autoCreateTable: true and rawMode: false — the connector infers column types. For deduplication, create the table manually with ReplacingMergeTree(version_column) and ORDER BY on the deduplication key.
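ReplacingMergeTree collapses rows that share the same ORDER BY key, keeping the row with the highest version column; the collapse happens asynchronously at merge time (or when querying with FINAL). The deduplication rule itself can be simulated like this:

```python
# Simulates ReplacingMergeTree(version) dedup: for each ORDER BY key,
# keep the row with the highest version column.
def dedup(rows, key, version):
    latest = {}
    for row in rows:
        k = row[key]
        if k not in latest or row[version] >= latest[k][version]:
            latest[k] = row
    return sorted(latest.values(), key=lambda r: r[key])

rows = [
    {"id": 1, "data": "a", "created_at": 100},
    {"id": 2, "data": "b", "created_at": 100},
    {"id": 1, "data": "a", "created_at": 101},  # duplicate write on replay
]
print(dedup(rows, "id", "created_at"))
# id=1 appears once, with the newer created_at; id=2 is untouched
```

Because merges are asynchronous, queries that must not see duplicates before a merge should use SELECT ... FINAL.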
Kafka Sink
The Kafka producer uses RequiredAcks = WaitForAll and Producer.Idempotent = true for durability and to prevent duplicate messages on retry. Consumers should still handle potential duplicates (e.g., by idempotent processing or deduplication by key) for end-to-end exactly-once semantics.
Best Practices
- Use idempotent sinks for PostgreSQL (UPSERT) and ClickHouse (ReplacingMergeTree) when using polling sources or when duplicates are possible.
- Kafka source: Consumer group stores offset; at-least-once is preserved. Idempotent sink recommended for batch sinks.
- batchSize: Smaller batches reduce the duplicate window on crash. Balance with throughput.
- batchFlushIntervalSeconds: Shorter intervals flush more frequently, reducing in-flight data at risk.
- Error sink: Configure spec.errors to capture failed messages for replay or analysis.
Graceful Shutdown
On SIGTERM (e.g., pod eviction, node drain):
- The processor receives the signal and cancels the context.
- Sinks flush in-flight batches before exiting.
A preStop hook (sleep 5) gives the load balancer time to stop routing traffic before the processor exits.
Ensure terminationGracePeriodSeconds is sufficient for large batches to flush (default: 600 seconds).
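The shutdown settings above map onto standard Kubernetes pod fields. A sketch of the relevant portion of the processor pod template (field names are standard Kubernetes; the container name and values are illustrative):

```yaml
spec:
  terminationGracePeriodSeconds: 600  # allow large in-flight batches to flush
  containers:
    - name: processor
      lifecycle:
        preStop:
          exec:
            command: ["sleep", "5"]   # let the load balancer drain traffic first
```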
Checkpoint Persistence
Enabled by default
The checkpointPersistence field in the DataFlow spec defaults to true. You do not need to set it explicitly — checkpoint persistence is enabled for all DataFlows with polling sources.
The read position (lastReadID, lastReadChangeTime) is persisted to the ConfigMap dataflow-<name>-checkpoint. On processor restart, polling sources (PostgreSQL, ClickHouse, Trino) resume from the last committed position, reducing duplicates.
To disable, set checkpointPersistence: false:
```yaml
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: my-dataflow
spec:
  checkpointPersistence: false  # Disable (default: true)
  source:
    type: postgresql
    # ...
```
The controller creates the ConfigMap and RBAC (ServiceAccount, Role, RoleBinding) for the processor. Checkpoint is saved with debounce (every 30 seconds) and on graceful shutdown.
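The 30-second debounce mentioned above can be sketched with an injected clock (class, interval handling, and names are illustrative, not the controller's code):

```python
# Sketch of debounced checkpointing: persist at most once per interval,
# plus unconditionally on graceful shutdown.
class DebouncedCheckpointer:
    def __init__(self, save, interval=30.0, clock=None):
        self.save, self.interval, self.clock = save, interval, clock
        self.last_saved = float("-inf")
        self.pending = None

    def update(self, position):
        self.pending = position
        now = self.clock()
        if now - self.last_saved >= self.interval:
            self.save(self.pending)   # persist to the ConfigMap
            self.last_saved = now
            self.pending = None

    def shutdown(self):
        if self.pending is not None:
            self.save(self.pending)   # flush the latest position on shutdown

saved = []
t = [0.0]
cp = DebouncedCheckpointer(saved.append, interval=30.0, clock=lambda: t[0])
for pos in range(1, 5):
    cp.update(pos)
    t[0] += 10  # updates arrive at t=0, 10, 20, 30
cp.shutdown()
print(saved)  # [1, 4] -- saved at t=0 and t=30; intermediate updates debounced
```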
Summary Checklist
| Scenario | Recommendation |
|---|---|
| PostgreSQL sink | Enable upsertMode: true with PRIMARY KEY or conflictKey |
| ClickHouse sink | Use ReplacingMergeTree with ORDER BY on deduplication key |
| Kafka source | Consumer group persists offset; idempotent sink recommended |
| Polling sources | Always use idempotent sink; checkpoint persistence enabled by default |
| batchSize | Consider smaller values to reduce duplicate window |