Fault Tolerance and Data Consistency
DataFlow Operator processes messages with at-least-once delivery semantics. When the processor pod crashes or restarts, some messages may be re-read and written again. This document explains the behavior, risks of data desynchronization, and how to configure idempotent sinks to prevent duplicates.
Delivery Semantics
- At-least-once: Each message is delivered at least once. Duplicates are possible on processor restart or crash.
- Exactly-once: Not supported natively. Use idempotent sinks to achieve effectively-once semantics.
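To make the difference between the two guarantees concrete, here is a minimal sketch of how a replay after restart affects an append-only sink versus a keyed (idempotent) sink. The message shape and the helper names (`deliver_with_replay`, `append_sink`, `upsert_sink`) are hypothetical and only illustrate the idea; they are not part of the operator.

```python
from typing import Dict, List, Tuple

Message = Tuple[int, str]  # (key, payload); hypothetical message shape


def deliver_with_replay(messages: List[Message], replay_from: int) -> List[Message]:
    """Return the delivery sequence a sink sees when the processor restarts
    once and re-reads everything from index `replay_from`."""
    return messages + messages[replay_from:]


def append_sink(deliveries: List[Message]) -> List[Message]:
    """Non-idempotent sink: every delivery becomes a new row."""
    return list(deliveries)


def upsert_sink(deliveries: List[Message]) -> Dict[int, str]:
    """Idempotent sink: rows are keyed, so a replayed row overwrites itself."""
    table: Dict[int, str] = {}
    for key, payload in deliveries:
        table[key] = payload
    return table


messages = [(1, "a"), (2, "b"), (3, "c")]
deliveries = deliver_with_replay(messages, replay_from=1)

print(len(append_sink(deliveries)))  # 5 rows: keys 2 and 3 are duplicated
print(len(upsert_sink(deliveries)))  # 3 rows: one per key
```

The delivery sequence is identical in both cases; only the sink's write semantics decide whether the replay is visible.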
Source Behavior on Restart
| Source | State storage | On restart |
|---|---|---|
| Kafka | Consumer group (Kafka) | Resumes from last committed offset. No duplicates if offset was committed after sink write. |
| PostgreSQL | ConfigMap (default); in-memory when checkpointPersistence: false | By default resumes from last position. Without persistence: re-reads from beginning. |
| PostgreSQL CDC | ConfigMap (lastAckedLSN) | Resumes logical replication from last acked LSN after sink write. |
| ClickHouse | ConfigMap (default); in-memory when checkpointPersistence: false | By default resumes from last position. Without persistence: re-reads from beginning. |
| Trino | ConfigMap (default); in-memory when checkpointPersistence: false | By default resumes from last position. Without persistence: re-reads from beginning. |
| Nessie | ConfigMap when incrementalBySnapshot: true and checkpointPersistence (default) | Incremental reads along the Iceberg snapshot chain; without incrementalBySnapshot, full scan on every poll (no checkpoint). |
| Iceberg | ConfigMap when incrementalBySnapshot: true and checkpointPersistence (default) | Same as Nessie; checkpoint store key is iceberg. |
Horizontal scaling (spec.replicas)
- Kafka: you may set spec.replicas > 1. All pods share one consumer group; parallelism is capped by the topic partition count.
- PostgreSQL, PostgreSQL CDC, ClickHouse, Trino, Nessie: replicas must be 1 (or unset). Multiple pods with a shared checkpoint ConfigMap will duplicate data.
- DataFlowCron: replicas > 1 is not supported (one processor Job per schedule tick).
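The partition cap for Kafka can be illustrated with a small sketch. It assumes a simplified round-robin assignment (Kafka's real assignors differ in detail), and `assign_partitions` and `active_consumers` are hypothetical helpers. What it relies on is only that a partition is consumed by at most one member of a consumer group.

```python
from typing import Dict, List


def assign_partitions(partitions: int, replicas: int) -> Dict[int, List[int]]:
    """Round-robin partitions over pods; a pod with an empty list is idle."""
    assignment: Dict[int, List[int]] = {pod: [] for pod in range(replicas)}
    for partition in range(partitions):
        assignment[partition % replicas].append(partition)
    return assignment


def active_consumers(partitions: int, replicas: int) -> int:
    """Number of pods that own at least one partition."""
    owned_lists = assign_partitions(partitions, replicas).values()
    return sum(1 for owned in owned_lists if owned)


print(assign_partitions(partitions=3, replicas=5))
print(active_consumers(partitions=3, replicas=5))   # 3: two pods stay idle
print(active_consumers(partitions=12, replicas=4))  # 4: every pod has work
```

Under this model, replicas beyond the partition count add no throughput.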
Kafka Source
The Kafka consumer marks an offset only after the message has been successfully written to the sink (via msg.Ack()). With ackGranularity: message (see below), offsets are also committed to the consumer group immediately after each mark.
If the processor crashes:
- Before sink write: Offset not committed. On restart, message is re-read. No duplicate in sink.
- After sink write, before Ack: Data may be in sink, offset not committed. On restart, re-read → duplicate in sink.
- After Ack: Offset marked (and committed when ackGranularity: message). On restart, resume from next message. No duplicate.
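The three crash points above can be replayed in a small simulation. This is a sketch under simplifying assumptions: a single message, an append-only sink, and an Ack that reaches the consumer group immediately (as with ackGranularity: message). The function name `sink_rows_after_restart` is hypothetical.

```python
from typing import List

CRASH_POINTS = ("before_write", "after_write_before_ack", "after_ack")


def sink_rows_after_restart(crash_point: str) -> List[str]:
    """Process one message, crash at `crash_point`, restart, and return the
    rows found in an append-only (non-idempotent) sink."""
    sink: List[str] = []
    committed_offset = 0  # next offset to read, stored in the consumer group
    log = ["m0"]

    # First attempt, interrupted at the given crash point.
    if crash_point != "before_write":
        sink.append(log[0])  # the sink write succeeded
        if crash_point == "after_ack":
            committed_offset = 1  # the Ack reached the consumer group
    # Crash: in-memory state is lost, committed_offset survives.

    # Restart: resume from the committed offset.
    for message in log[committed_offset:]:
        sink.append(message)
    return sink


for point in CRASH_POINTS:
    print(point, sink_rows_after_restart(point))
```

Only the middle case leaves two copies of the message in the sink, which is the window an idempotent sink is meant to cover.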
Nessie source (incremental mode)
When source.config.incrementalBySnapshot: true, the processor reads only new Iceberg snapshots since the last Ack. Checkpoint (lastAckedSnapshotID, lastAckedSnapshotSequence) is stored in a ConfigMap when snapshotCheckpoints (default) and spec.checkpointPersistence are enabled.
See nessie-incremental-snapshots-design.md.
Polling Sources (PostgreSQL, ClickHouse, Trino)
Checkpoint persistence is enabled by default. The read position (lastReadChangeTime, lastReadOrderByValue) is persisted to ConfigMap df-<name>-checkpoint. On restart, the source resumes from the last committed position after sink Ack, reducing duplicates. Set checkpointPersistence: false in spec to store checkpoint only in memory (lost on pod crash).
Legacy checkpoint keys (lastReadID, lastReadTime) are migrated on load; see the migration table below.
Idempotent sink required
For polling sources, always configure an idempotent sink (UPSERT, ReplacingMergeTree) to handle duplicates safely.
Batch Sink Behavior
PostgreSQL, ClickHouse, and Trino sinks write in batches. The flow is:
- Accumulate messages in a batch
- Execute the batch write (PostgreSQL wraps all statements in a single transaction and commits atomically)
- Call Ack() for each message in the batch (commits Kafka offset / advances polling checkpoint)
If the processor crashes after a successful batch commit but before Ack:
- Data is already in the sink
- Source offset / checkpoint may not be advanced
- On restart: re-read → duplicate writes to sink (safe with an idempotent sink)
Reduce duplicate window
Set ackGranularity: message to ack after each message (effective batchSize: 1 for batch sinks), or use a smaller batchSize with ackGranularity: batch (default).
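As a rough illustration of why a smaller batch shrinks the duplicate window, the sketch below counts rows written twice to an append-only sink when a crash happens after a batch commit but before its Ack. The function `duplicates_after_crash` and its parameters are hypothetical; the model ignores flush intervals and assumes the source re-reads from the last acked position.

```python
from typing import List


def duplicates_after_crash(total: int, batch_size: int, crash_in_batch: int) -> int:
    """Count rows written twice when the processor crashes after batch number
    `crash_in_batch` (0-based) commits but before its messages are acked."""
    messages = list(range(total))
    batches = [messages[i:i + batch_size] for i in range(0, total, batch_size)]
    sink: List[int] = []
    acked = 0  # number of messages whose offset/checkpoint was advanced

    for index, batch in enumerate(batches):
        sink.extend(batch)  # the batch commit succeeds
        if index == crash_in_batch:
            break  # crash before Ack()
        acked += len(batch)

    sink.extend(messages[acked:])  # restart: re-read from last acked position
    return len(sink) - len(set(sink))


print(duplicates_after_crash(total=1000, batch_size=100, crash_in_batch=3))  # 100
print(duplicates_after_crash(total=1000, batch_size=1, crash_in_batch=3))    # 1
```

In this model the worst-case number of duplicates equals the batch size, which is the trade-off against throughput.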
Ack Granularity (spec.ackGranularity)
Controls when source offsets are committed relative to sink writes:
| Value | Behavior |
|---|---|
| batch (default) | Batch sinks ack all messages after a successful batch flush. Kafka source relies on consumer auto-commit interval after MarkMessage. |
| message | Each message is acked immediately after a successful write. Batch sinks flush one message at a time. Kafka source calls Commit() after each mark for faster offset persistence. |
ackGranularity: message is recommended for Kafka → batch sink pipelines where you want a smaller re-read window without tuning batchSize manually:
spec:
  ackGranularity: message
  sink:
    type: postgresql
    config:
      upsertMode: true
      conflictKey: material_id
The Kafka sink always acks per message, regardless of this setting.
Trino long-running INSERTs
For large JSON payloads and Iceberg/Nessie tables, keep batchSize low (often 1) and set sink.config.queryTimeoutSeconds to cover the full Trino execution window (including nextUri polling).
Timeouts during nextUri follow can happen after Trino already started processing the INSERT, so retries may produce duplicates.
Idempotent Sink Configuration
PostgreSQL Sink
Enable UPSERT mode so that duplicate inserts update existing rows instead of failing. Batch writes run inside an explicit transaction (all-or-nothing per flush).
sink:
  type: postgresql
  config:
    connectionString: "postgres://..."
    table: output_table
    upsertMode: true
    conflictKey: id # Optional; defaults to PRIMARY KEY
    # Optional: skip stale replays when a version column exists in the payload
    upsertStrategy: ifNewer # always (default) | ifNewer
    upsertVersionColumn: updated_at # required when upsertStrategy is ifNewer
UPSERT mode requires the table to have a PRIMARY KEY or UNIQUE constraint on the conflict columns. With upsertStrategy: ifNewer, an update is applied only when EXCLUDED.<version> > target.<version>, so a stale or identical replay leaves the existing row untouched.
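The ifNewer guard can be tried out locally. The sketch below uses Python's built-in sqlite3 module purely as a stand-in for PostgreSQL, since both accept INSERT ... ON CONFLICT ... DO UPDATE ... WHERE. The statement is an assumption about what such an upsert could look like, not the SQL the sink actually generates, and `write_batch` is a hypothetical helper.

```python
import sqlite3

# SQLite (3.24+) is only a stand-in for PostgreSQL here; the statement is an
# illustrative assumption, not the SQL emitted by the sink.
UPSERT_IF_NEWER = """
INSERT INTO output_table (id, data, updated_at)
VALUES (?, ?, ?)
ON CONFLICT (id) DO UPDATE
SET data = excluded.data,
    updated_at = excluded.updated_at
WHERE excluded.updated_at > output_table.updated_at
"""

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE output_table (id INTEGER PRIMARY KEY, data TEXT, updated_at TEXT)"
)


def write_batch(rows):
    """Apply one batch atomically: commit on success, roll back on error."""
    with conn:
        conn.executemany(UPSERT_IF_NEWER, rows)


write_batch([(1, "v2", "2024-06-02T00:00:00Z")])
write_batch([(1, "v1", "2024-06-01T00:00:00Z")])  # stale replay: skipped
write_batch([(1, "v2", "2024-06-02T00:00:00Z")])  # exact duplicate: skipped
write_batch([(1, "v3", "2024-06-03T00:00:00Z")])  # newer version: applied

print(conn.execute("SELECT id, data, updated_at FROM output_table").fetchall())
```

Timestamps are compared as ISO-8601 strings in this sketch, which works only because every value uses the same format.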
ClickHouse Sink
Enable upsertMode for idempotent writes via ReplacingMergeTree (auto-created tables use this engine when upsertMode: true):
sink:
  type: clickhouse
  config:
    connectionString: "clickhouse://..."
    table: output_table
    upsertMode: true
    conflictKey: id
    replacingVersionColumn: updated_at # optional version column for ReplacingMergeTree
    tableEngine: ReplacingMergeTree # optional; default when upsertMode is true
Or create the table manually:
CREATE TABLE output_table (
    id UInt64,
    data String,
    created_at DateTime DEFAULT now()
) ENGINE = ReplacingMergeTree(created_at)
ORDER BY id;
Duplicates may remain visible until a background merge runs. Query with FINAL to deduplicate at read time, or accept temporary duplicates until the merge completes.
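The effect of a ReplacingMergeTree merge can be approximated in a few lines. This is a simplified model, not ClickHouse code: rows that share a sorting key collapse to the one with the highest version, and the most recently inserted row wins a tie. The row shape and the `replacing_merge` helper are hypothetical.

```python
from typing import Dict, List, Tuple

Row = Tuple[int, str, int]  # (id, data, version); hypothetical row shape


def replacing_merge(rows: List[Row]) -> List[Row]:
    """Keep one row per id: highest version wins, later insert wins ties."""
    latest: Dict[int, Row] = {}
    for row in rows:  # rows are given in insertion order
        current = latest.get(row[0])
        if current is None or row[2] >= current[2]:
            latest[row[0]] = row
    return sorted(latest.values())


parts = [(1, "a", 10), (2, "b", 10), (1, "a", 10), (1, "a2", 11), (2, "old", 9)]
print(replacing_merge(parts))  # [(1, 'a2', 11), (2, 'b', 10)]
```

Until such a merge happens, a plain SELECT would still return all five rows, which is why read-time deduplication matters for freshly replayed data.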
Trino Sink
For Iceberg catalogs, enable MERGE-based upsert:
sink:
  type: trino
  config:
    serverURL: "http://trino:8080"
    catalog: iceberg # catalog name must contain "iceberg"
    schema: default
    table: output_table
    upsertMode: true
    conflictKey: id
On match, rows are updated; there is no ifNewer version guard for Trino yet.
Kafka Sink
The Kafka producer uses RequiredAcks = WaitForAll and Producer.Idempotent = true for durability and to prevent duplicate messages on producer retry. This does not cover messages that the processor re-sends after a restart, so consumers should still handle potential duplicates (e.g., by idempotent processing or deduplication by key) to get effectively-once results end to end.
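Deduplication by key on the consumer side could look like the following sketch. It assumes that each record key identifies exactly one event and that the set of seen keys is kept somewhere durable in a real consumer; `deduplicate` and the record shape are hypothetical.

```python
from typing import Iterable, Iterator, Set, Tuple

Record = Tuple[str, str]  # (key, value); hypothetical record shape


def deduplicate(records: Iterable[Record], seen: Set[str]) -> Iterator[Record]:
    """Yield each key once; `seen` must be durable in a real consumer."""
    for key, value in records:
        if key in seen:
            continue  # already processed: drop the replayed record
        seen.add(key)
        yield key, value


seen_keys: Set[str] = set()
stream = [("order-1", "created"), ("order-2", "created"), ("order-1", "created")]
print(list(deduplicate(stream, seen_keys)))
```

If keys identify entities rather than events, comparing a version or timestamp field is a safer criterion than the key alone.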
Best Practices
- Use idempotent sinks for PostgreSQL (UPSERT), ClickHouse (upsertMode / ReplacingMergeTree), and Trino Iceberg (MERGE) when using polling sources or when duplicates are possible.
- Kafka source: Consumer group stores offset; at-least-once is preserved. Idempotent sink recommended for batch sinks. Use ackGranularity: message to shrink the re-read window.
- batchSize / ackGranularity: Smaller batches or ackGranularity: message reduce the duplicate window on crash. Balance with throughput.
- Migration / cron workloads: combine checkpointSyncOnAck: true, idempotent sink, and optionally upsertStrategy: ifNewer when a version column exists.
- Trino queryTimeoutSeconds: Use a timeout large enough for peak load; too low values increase false failures on long INSERTs.
- batchFlushIntervalSeconds: Shorter intervals flush more frequently, reducing in-flight data at risk.
- Error sink: Configure spec.errors to capture failed messages for replay or analysis.
Graceful Shutdown
On SIGTERM (e.g., pod eviction, node drain):
- The processor receives the signal and cancels the context.
- Sinks flush in-flight batches before exiting.
- PreStop: sleep 5 gives time for the load balancer to stop routing traffic.
Ensure terminationGracePeriodSeconds is sufficient for large batches to flush (default: 600 seconds).
Checkpoint Persistence
Enabled by default
The checkpointPersistence field in the DataFlow spec defaults to true. You do not need to set it explicitly — checkpoint persistence is enabled for all DataFlows with polling sources.
The read position (lastReadChangeTime, lastReadOrderByValue) is persisted to ConfigMap df-<name>-checkpoint. On processor restart, polling sources (PostgreSQL, ClickHouse, Trino) resume from the last committed position, reducing duplicates.
Canonical checkpoint JSON per source type:
{
"lastReadChangeTime": "2024-06-01T12:00:00.123456789Z",
"lastReadOrderByValue": 5042
}
Legacy formats are normalized on load:
| Legacy | Canonical |
|---|---|
| Trino: {"lastReadID": 100} | {"lastReadOrderByValue": 100} |
| ClickHouse: {"lastReadID": 100, "lastReadTime": "..."} | composite fields above |
| Time-only: {"lastReadChangeTime": "..."} | unchanged (single-column WHERE until order key appears) |
After restart, Trino/ClickHouse checkpoints that only had lastReadID use order-key filtering (WHERE orderByColumn > N) until the first ack with a timestamp; then tuple filtering (changeTrackingColumn, orderByColumn) > (time, key) applies.
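The normalization and the two filter modes can be sketched as follows. The mapping of lastReadTime to lastReadChangeTime is an assumption based on the "composite fields" row of the table, and `normalize` and `unread` are hypothetical helpers. Timestamps are compared as ISO-8601 strings of identical format, and Python's tuple comparison stands in for the SQL row-value comparison.

```python
from typing import Any, Dict, List, Optional, Tuple

Checkpoint = Dict[str, Any]
Row = Tuple[str, int]  # (changeTrackingColumn value, orderByColumn value)

# Assumed legacy-to-canonical key mapping (see the table above).
RENAMES = {"lastReadID": "lastReadOrderByValue", "lastReadTime": "lastReadChangeTime"}


def normalize(checkpoint: Checkpoint) -> Checkpoint:
    """Map legacy keys to canonical ones; canonical keys pass through."""
    return {RENAMES.get(key, key): value for key, value in checkpoint.items()}


def unread(rows: List[Row], checkpoint: Checkpoint) -> List[Row]:
    """Select the rows that lie strictly after the checkpoint."""
    time: Optional[str] = checkpoint.get("lastReadChangeTime")
    key: Optional[int] = checkpoint.get("lastReadOrderByValue")
    if time is not None and key is not None:
        return [row for row in rows if row > (time, key)]  # tuple filter
    if key is not None:
        return [row for row in rows if row[1] > key]  # order-key filter
    if time is not None:
        return [row for row in rows if row[0] > time]  # time-only filter
    return list(rows)


rows = [
    ("2024-06-01T12:00:00Z", 100),
    ("2024-06-01T12:00:00Z", 101),
    ("2024-06-01T12:00:05Z", 102),
]
legacy = normalize({"lastReadID": 100})
print(unread(rows, legacy))  # order-key filter: rows 101 and 102

canonical = {"lastReadChangeTime": "2024-06-01T12:00:00Z", "lastReadOrderByValue": 100}
print(unread(rows, canonical))  # tuple filter: rows 101 and 102
```

The tuple filter still returns row 101 even though it shares a timestamp with the checkpoint, which a time-only filter would skip.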
To disable, set checkpointPersistence: false:
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: my-dataflow
spec:
  checkpointPersistence: false # Disable (default: true)
  source:
    type: postgresql
    # ...
The controller creates the ConfigMap and RBAC (ServiceAccount, Role, RoleBinding) for the processor. Checkpoint is saved with debounce (every 30 seconds by default) and on graceful shutdown.
Sync checkpoint on ack (spec.checkpointSyncOnAck)
By default, pending checkpoints are flushed to the ConfigMap on a debounce timer (checkpointSaveInterval, default 30s) and on graceful shutdown. After a pod crash, polling sources may re-read up to one debounce interval of data.
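The debounce behavior described above can be modeled with a small class. This is a simplified sketch, not the operator's implementation: it checks the elapsed time on each ack instead of running a timer, and the class name, the injected clock, and the `save` callback are all hypothetical.

```python
from typing import Callable, List, Optional


class DebouncedCheckpoint:
    """Hold the latest checkpoint in memory; persist at most once per interval."""

    def __init__(self, interval: float, clock: Callable[[], float],
                 save: Callable[[int], None]) -> None:
        self.interval = interval
        self.clock = clock
        self.save = save
        self.pending: Optional[int] = None
        self.last_saved_at = clock()

    def ack(self, position: int) -> None:
        """Record the newest acked position and persist it if the interval elapsed."""
        self.pending = position
        if self.clock() - self.last_saved_at >= self.interval:
            self.flush()

    def flush(self) -> None:
        """Persist the pending position; also the graceful-shutdown path."""
        if self.pending is not None:
            self.save(self.pending)
            self.pending = None
            self.last_saved_at = self.clock()


now = [0.0]  # fake clock so the example runs instantly
saved: List[int] = []
checkpoint = DebouncedCheckpoint(interval=30.0, clock=lambda: now[0], save=saved.append)

for second, position in [(1, 10), (12, 20), (31, 30), (45, 40)]:
    now[0] = float(second)
    checkpoint.ack(position)

print(saved)               # [30]
print(checkpoint.pending)  # 40: lost on a crash, persisted by flush() on shutdown
```

In this run, a crash at second 45 would restart the source from position 30 and re-read everything acked since then.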
Set checkpointSyncOnAck: true to flush the checkpoint immediately after each sink batch ack (coalesced, not more often than checkpointSaveInterval). Recommended for migration and cron workloads:
spec:
  checkpointSyncOnAck: true
  checkpointSaveInterval: 5s
  source:
    type: postgresql
    # ...
  sink:
    type: postgresql
    config:
      upsertMode: true
      conflictKey: material_id
Checkpoint reset
To re-run a migration or cron job from the beginning without manually editing df-<name>-checkpoint:
spec:
  checkpointReset: true # one-shot; cleared by the controller after the next reconcile
Or set an annotation on the DataFlow:
metadata:
  annotations:
    dataflow.dataflow.io/reset-checkpoint: "true"
The processor clears the persisted checkpoint for the source type on startup, then reads from the beginning.
Strict idempotency (spec.strictIdempotency)
When strictIdempotency: true, admission rejects polling sources paired with a main sink that does not have upsertMode enabled. When false (default), a warning is emitted instead.
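The admission rule can be expressed as a short decision function. This is only a sketch of the logic described above, operating on a spec given as a plain dict. The `admit` function, the lowercase source type names for ClickHouse and Trino, and the message text are assumptions, not the webhook's actual code or output.

```python
from typing import Tuple

# Assumed type identifiers for the polling sources named in this document.
POLLING_SOURCES = {"postgresql", "clickhouse", "trino"}


def admit(spec: dict) -> Tuple[bool, str]:
    """Return (allowed, message) for a DataFlow spec given as a plain dict."""
    source_type = spec.get("source", {}).get("type")
    upsert = spec.get("sink", {}).get("config", {}).get("upsertMode", False)
    if source_type not in POLLING_SOURCES or upsert:
        return True, ""
    message = f"polling source {source_type!r} without sink upsertMode may duplicate rows"
    if spec.get("strictIdempotency", False):
        return False, message  # strict mode: reject
    return True, "warning: " + message  # default: allow with a warning


risky = {"source": {"type": "postgresql"}, "sink": {"type": "postgresql", "config": {}}}
print(admit(risky))
print(admit({**risky, "strictIdempotency": True}))
```

Enabling upsertMode on the sink makes both calls return an allowed result with no message.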
Summary Checklist
| Scenario | Recommendation |
|---|---|
| PostgreSQL sink | upsertMode: true + conflictKey; upsertStrategy: ifNewer when version column exists |
| ClickHouse sink | upsertMode: true or manual ReplacingMergeTree + ORDER BY dedup key |
| Trino sink (Iceberg) | upsertMode: true + conflictKey |
| Kafka → batch sink | ackGranularity: message or smaller batchSize + idempotent sink |
| Kafka source | Idempotent sink; ackGranularity: message for faster offset commit |
| Polling sources | Idempotent sink; checkpointSyncOnAck: true for migration/cron |
| batchSize | Smaller values or ackGranularity: message to reduce duplicate window |