# Architecture
This section describes how the DataFlow Operator works: its role in Kubernetes, the reconciliation loop, and the runtime data flow inside each processor.
## Overview
The DataFlow Operator provides declarative management of data pipelines via a Kubernetes Custom Resource (CRD). You define a DataFlow resource with a source, optional transformations, and a sink; the operator ensures that a processor workload runs in the cluster and continuously reads from the source, applies transformations, and writes to the sink (and optionally to an error sink).
High-level flow:

- You create or update a DataFlow (e.g. with `kubectl apply`).
- The operator watches DataFlow resources and, for each one, creates or updates a ConfigMap (with the resolved spec) and a Deployment (one pod running the processor binary).
- Each processor pod reads the spec from the mounted ConfigMap, connects to the configured source and sink(s), and runs the pipeline: read → transform → write.
## Data Flow Pipeline (Conceptual)
The data flow in each processor follows a linear pipeline: Source → Transformations → Sink. Optionally, failed writes go to an Error Sink.
```mermaid
flowchart LR
    subgraph Input[" "]
        Source["Source\n(Kafka / PostgreSQL / Trino / ClickHouse / Nessie)"]
    end
    subgraph Transform[" "]
        T1["Transform 1"]
        T2["Transform 2"]
        TN["Transform N"]
        T1 --> T2 --> TN
    end
    subgraph Output[" "]
        MainSink["Main Sink"]
        ErrSink["Error Sink\n(optional)"]
    end
    Source -->|"read"| T1
    TN -->|"write"| MainSink
    TN -.->|"on failure"| ErrSink
```
Transformations are applied in order: `timestamp`, `flatten`, `filter`, `mask`, `router`, `select`, `remove`, `snakeCase`, `camelCase`. Each message passes through the chain before being written to the sink.
## Kubernetes Architecture
### Custom Resource Definition (CRD)
- API group: `dataflow.dataflow.io`
- Resource: `dataflows` (kind `DataFlow`), namespaced.
- Spec includes:
  - Source: type (e.g. `kafka`, `postgresql`, `trino`) and type-specific config (brokers, topic, connection string, etc.).
  - Sink: type and config for the main destination.
  - Transformations: ordered list (`timestamp`, `flatten`, `filter`, `mask`, `router`, `select`, `remove`, `snakeCase`, `camelCase`, etc.).
  - Errors: optional sink for messages that fail to be written to the main sink.
  - Resources: optional CPU/memory for the processor pod.
  - Scheduling: optional `nodeSelector`, `affinity`, `tolerations`.
  - CheckpointPersistence: optional; defaults to `true`. When enabled, polling sources (PostgreSQL, ClickHouse, Trino) persist their read position to a ConfigMap, reducing duplicates on restart. Set to `false` to disable.
  - ChannelBufferSize: optional; defaults to `100`. Buffer size for the message channels between source, processor, and sink. Use 500–1000 for high Kafka throughput to reduce blocking when the sink is slower than the source.
Secrets can be referenced via `SecretRef` in the spec; the operator resolves them before writing the spec into the ConfigMap.
### DataFlow CRD Structure
The following diagram shows the main fields of the DataFlow spec for quick reference:
```mermaid
flowchart TB
    subgraph DataFlow["DataFlow"]
        Spec["spec"]
        Status["status"]
    end
    subgraph SpecFields["spec fields"]
        Source["source (required)"]
        Sink["sink (required)"]
        Trans["transformations (optional)"]
        Errors["errors (optional)"]
        Resources["resources (optional)"]
        Scheduling["scheduling (optional)"]
        Checkpoint["checkpointPersistence (optional)"]
        ChannelBuffer["channelBufferSize (optional)"]
        Image["processorImage / processorVersion (optional)"]
    end
    Source --> SourceTypes["type: kafka | postgresql | trino | clickhouse | nessie"]
    Sink --> SinkTypes["type: kafka | postgresql | trino | clickhouse | nessie"]
    Trans --> TransTypes["timestamp | flatten | filter | mask | router | select | remove | snakeCase | camelCase"]
    Spec --> Source
    Spec --> Sink
    Spec --> Trans
    Spec --> Errors
    Spec --> Resources
    Spec --> Scheduling
    Spec --> Checkpoint
    Spec --> ChannelBuffer
    Spec --> Image
```
### Operator Deployment
The operator runs as a single Deployment in the cluster (e.g. installed via Helm). It uses controller-runtime and a single controller, `DataFlowReconciler`, which reconciles DataFlow resources. Leader election (ID `dataflow-operator.dataflow.io`) is enabled so that with multiple replicas only one active leader runs the reconciliation loop (high availability).
### Controller
The DataFlowReconciler:
- Watches `DataFlow` (primary), and owns the `Deployment` and `ConfigMap` for each DataFlow (so their lifecycle is tied to the DataFlow).
- Optionally watches the operator’s own Deployment (when `OPERATOR_DEPLOYMENT_NAME` and `OPERATOR_NAMESPACE` are set). When that Deployment is updated (e.g. new image), the controller triggers reconciliation for all DataFlow resources so that processor Deployments can be updated to use the new processor image.
### Resources Created per DataFlow
For each DataFlow `<name>` in a namespace:

| Resource | Name | Purpose |
|---|---|---|
| ConfigMap | `dataflow-<name>-spec` | Holds `spec.json` (the resolved spec with secrets inlined). |
| ConfigMap | `dataflow-<name>-checkpoint` | Stores the read position for polling sources (default). Omitted when `checkpointPersistence: false`. |
| Deployment | `dataflow-<name>` | One replica; the pod runs the processor container. |
| ServiceAccount, Role, RoleBinding | `dataflow-<name>-processor` | RBAC for the processor to read/write the checkpoint ConfigMap (default). Omitted when `checkpointPersistence: false`. |
The processor container:

- Image: from env `PROCESSOR_IMAGE` (often the same as the operator image/tag).
- Args: `--spec-path=/etc/dataflow/spec.json`, `--namespace=...`, `--name=...`.
- Volume: ConfigMap `dataflow-<name>-spec` mounted at `/etc/dataflow` (read-only).
- Env: `LOG_LEVEL` (e.g. from `PROCESSOR_LOG_LEVEL`).
The controller sets owner references from the DataFlow to the ConfigMap and Deployment so they are deleted when the DataFlow is deleted.
### RBAC
The operator uses a ClusterRole (and ClusterRoleBinding to its ServiceAccount) with permissions to:
- Read/write DataFlow and status (CRD).
- Create/patch events.
- Read secrets (for resolution).
- Create/update/delete ConfigMaps and Deployments in the same namespaces as DataFlow resources.
- When checkpoint persistence is enabled: create ServiceAccounts, Roles, and RoleBindings for processor pods to access the checkpoint ConfigMap.
See the Helm templates (e.g. `clusterrole.yaml`, `clusterrolebinding.yaml`) for the exact rules.
### Optional: GUI
The Helm chart can deploy an optional GUI (separate Deployment, Service, and optionally Ingress) for viewing and managing data flows. It is independent of the core operator and processor architecture.
### Admission Webhook (Validating)
The operator can serve a Validating Admission Webhook: when a DataFlow resource is created or updated, the Kubernetes API server sends the object to the operator on port 9443; the operator validates the spec (source/sink types, required fields, allowed transformations, etc.) and either allows the operation or rejects it with a clear error message.
Why it matters: without the webhook, an invalid spec (wrong source/sink type, missing required fields, unknown transformation) only fails at runtime — after the controller has created the ConfigMap and Deployment and the processor fails to build a connector or transformer on startup. Users see the error in the DataFlow status or pod logs, not at kubectl apply. With the webhook enabled, invalid specs are rejected before being stored in etcd: kubectl apply and the GUI get a 4xx response with the error text, no extra resources are created, and no processor pod is started with a broken configuration.
Optional: the webhook is controlled only at the Helm level (value `webhook.enabled`). By default it is disabled: no ValidatingWebhookConfiguration is created, and the API server does not call the operator on DataFlow create/update. See Configuring the Validating Webhook in the development guide for TLS, `caBundle`, and production setup.
### Architecture Diagram (Kubernetes)
The following diagram shows how the user, API server, operator, and processor pods interact.
```mermaid
flowchart LR
    User["User (kubectl)"]
    API["API Server"]
    CRD["DataFlow CRD"]
    Operator["Operator Pod"]
    CMSpec["ConfigMap spec"]
    CMCheckpoint["ConfigMap checkpoint"]
    Dep["Deployment"]
    Proc["Processor Pod"]
    Ext["Kafka / PostgreSQL / Trino / ClickHouse / Nessie"]
    User -->|"apply DataFlow"| API
    API --> CRD
    Operator -->|watch| CRD
    Operator -->|create/update| CMSpec
    Operator -->|create/update| CMCheckpoint
    Operator -->|create/update| Dep
    Dep --> Proc
    Proc -->|mount spec| CMSpec
    Proc -->|read/write checkpoint| CMCheckpoint
    Proc -->|connect| Ext
```
## Reconciliation Loop
For each DataFlow, the controller runs the following steps (on create, update, or when owned resources change):
1. **Get DataFlow**
   If not found, return. If `DeletionTimestamp` is set: delete the Deployment, the ConfigMaps (spec and checkpoint), and the processor RBAC (cleanup), update status to `Stopped`, then return.
2. **Resolve secrets**
   Use SecretResolver to substitute all `SecretRef` fields in the spec with values from Kubernetes Secrets. Result: the resolved spec.
3. **ConfigMap**
   Create or update the ConfigMap `dataflow-<name>-spec` with key `spec.json` set to the JSON of the resolved spec. Set the controller reference to the DataFlow.
4. **Checkpoint ConfigMap and RBAC** (when `checkpointPersistence` is not `false`; default: enabled)
   Create the ConfigMap `dataflow-<name>-checkpoint` and the RBAC (ServiceAccount, Role, RoleBinding) so the processor pod can read/write the checkpoint. The processor persists the source read position (`lastReadID`, `lastReadChangeTime`) there, reducing duplicates on restart.
5. **Deployment**
   Create or update the Deployment `dataflow-<name>`: processor image, volume from the spec ConfigMap, args and env as above. When checkpoint persistence is enabled, set `serviceAccountName` so the pod uses the dedicated ServiceAccount. Use resources/affinity from the DataFlow spec if set. Set the controller reference to the DataFlow.
6. **Deployment status**
   Read the Deployment; set the DataFlow status `Phase` and `Message` from it (e.g. `Running` when `ReadyReplicas > 0`, `Pending` when replicas are starting, `Error` when there are no replicas).
7. **Update DataFlow status**
   Write `Phase`, `Message`, and other status fields back to the DataFlow resource (with retry on conflict).
### Reconcile Loop Diagram
```mermaid
flowchart TD
    A[Get DataFlow] --> B{Deleted?}
    B -->|Yes| C[Cleanup Deployment, ConfigMaps, RBAC]
    C --> D[Update Status Stopped]
    B -->|No| E[Resolve Secrets]
    E --> F[Create or Update ConfigMap]
    F --> F2{CheckpointPersistence?}
    F2 -->|Yes| F3[Create Checkpoint ConfigMap and RBAC]
    F2 -->|No| G
    F3 --> G[Create or Update Deployment]
    G --> H[Read Deployment Status]
    H --> I[Update DataFlow Status]
```
## Data Processor (Runtime)
The processor is the component that actually moves data: it reads from the source, applies transformations, and writes to the sink(s). It runs inside the pod created by the operator.
### Entrypoint
The processor binary (built from `cmd/processor/main.go`) is started with:

- `--spec-path` (default `/etc/dataflow/spec.json`)
- `--namespace`, `--name` (DataFlow namespace and name, for logging and metrics)

It reads the spec from the file, builds a Processor from it, and runs `Processor.Start(ctx)` until the context is cancelled (e.g. on SIGTERM).
### Processor Structure
The Processor (in `internal/processor/processor.go`) is built from the spec and contains:

- Source: a SourceConnector (Kafka, PostgreSQL, Trino, ClickHouse, or Nessie) with `Connect`, `Read`, and `Close`. By default, polling sources load their initial checkpoint from the ConfigMap and save it after each successful sink write (debounced). Disable with `checkpointPersistence: false`.
- Sink: a SinkConnector for the main destination, with `Connect`, `Write`, and `Close`.
- Error sink (optional): another SinkConnector for failed writes.
- Transformations: an ordered list of Transformer implementations (`timestamp`, `flatten`, `filter`, `mask`, `router`, `select`, `remove`, `snakeCase`, `camelCase`).
- Router sinks: if a `router` transformation is used, a map from condition to SinkSpec for dynamic sinks.
Connectors are created by a factory from `spec.Source` and `spec.Sink` (and `spec.Errors`); see `internal/connectors/`.
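The factory dispatch amounts to a switch on the connector type. The interface, the dummy `kafkaSource`, and the constructor name below are illustrative stand-ins for the real code in `internal/connectors/`, which is not reproduced here.

```go
package main

import (
	"context"
	"fmt"
)

// SourceConnector is a simplified stand-in for the real interface
// (the actual one also has a Read method returning a message channel).
type SourceConnector interface {
	Connect(ctx context.Context) error
	Close() error
}

// kafkaSource is a dummy implementation used only to show the dispatch.
type kafkaSource struct{}

func (kafkaSource) Connect(ctx context.Context) error { return nil }
func (kafkaSource) Close() error                      { return nil }

// newSource picks a connector implementation from spec.source.type.
func newSource(typ string) (SourceConnector, error) {
	switch typ {
	case "kafka":
		return kafkaSource{}, nil
	case "postgresql", "trino", "clickhouse", "nessie":
		// Valid types whose implementations are omitted from this sketch.
		return nil, fmt.Errorf("connector %q not shown in this sketch", typ)
	default:
		return nil, fmt.Errorf("unknown source type %q", typ)
	}
}

func main() {
	src, err := newSource("kafka")
	fmt.Println(src != nil, err) // true <nil>
}
```

An unknown type fails fast here, which is also the class of error the validating webhook catches before the processor ever starts.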
### Execution Flow
1. **Connect**
   `source.Connect(ctx)`, then `sink.Connect(ctx)`, then optionally `errorSink.Connect(ctx)`.
2. **Read**
   `source.Read(ctx)` returns a channel of Message (each message has a payload and optional metadata, e.g. `routed_condition` from the router).
3. **Process**
   A goroutine runs processMessages: for each message from the channel, it applies the transformations in order. Each transformer takes one or more messages and returns zero or more (e.g. filter drops some, flatten can emit many, router attaches `routed_condition` and can emit to different logical routes). Output messages are sent to an internal channel.
4. **Write**
   writeMessages consumes that channel:
   - If there are router sinks: messages are routed by metadata to the matching route sink or to the default (main) sink; each route may have its own connector (created and connected on demand).
   - Otherwise: all messages go to the main sink.
   - On write failure, if an error sink is configured, the failed message can be sent there.
Connector interfaces are defined in `internal/connectors/interface.go`: SourceConnector (`Connect`, `Read`, `Close`) and SinkConnector (`Connect`, `Write`, `Close`).
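The read → process → write flow above can be sketched with Go channels. Everything here is a simplified stand-in for the real processor: `Message`, `memorySink`, and `runPipeline` are assumptions for the sketch, and the transform is reduced to a single function; the example also shows the error-sink path by making the main sink fail.

```go
package main

import (
	"fmt"
	"strings"
)

// Message is a simplified payload with optional metadata.
type Message struct {
	Payload  string
	Metadata map[string]string
}

// memorySink is a stand-in SinkConnector that records writes and can be
// configured to fail, to exercise the error-sink path.
type memorySink struct {
	name string
	fail bool
	got  []string
}

func (s *memorySink) Write(m Message) error {
	if s.fail {
		return fmt.Errorf("%s: write failed", s.name)
	}
	s.got = append(s.got, m.Payload)
	return nil
}

// runPipeline mimics processMessages + writeMessages: a goroutine applies
// the transform and forwards to an internal channel; the writer drains
// that channel and falls back to the error sink on write failure.
func runPipeline(in <-chan Message, transform func(Message) Message, main, errSink *memorySink) {
	processed := make(chan Message)
	go func() {
		defer close(processed)
		for m := range in {
			processed <- transform(m)
		}
	}()
	for m := range processed {
		if err := main.Write(m); err != nil && errSink != nil {
			errSink.Write(m) // failed writes go to the error sink
		}
	}
}

func main() {
	in := make(chan Message, 2)
	in <- Message{Payload: "hello"}
	in <- Message{Payload: "world"}
	close(in)

	mainSink := &memorySink{name: "main", fail: true} // simulate a broken sink
	errSink := &memorySink{name: "errors"}
	upper := func(m Message) Message { m.Payload = strings.ToUpper(m.Payload); return m }

	runPipeline(in, upper, mainSink, errSink)
	fmt.Println(errSink.got) // [HELLO WORLD]
}
```

The unbuffered internal channel here is where `channelBufferSize` applies in the real processor: a larger buffer lets the reader and transformer stay ahead of a slow sink.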
### Connectors and Transformations
- Source/Sink types: Kafka, PostgreSQL, Trino, ClickHouse, Nessie (selected by `spec.source.type` and `spec.sink.type`).
- Transformations (order matters): `timestamp`, `flatten`, `filter`, `mask`, `router`, `select`, `remove`, `snakeCase`, `camelCase`. They are applied in the order listed in `spec.transformations`.
### Data Flow in the Processor (Diagram)
```mermaid
flowchart LR
    Src[Source Connector]
    ReadChan[Read Channel]
    Trans[Transform 1 .. N]
    Write[writeMessages]
    MainSink[Main Sink]
    ErrSink[Error Sink]
    RouteSinks[Router Sinks]
    Src -->|Connect, Read| ReadChan
    ReadChan --> Trans
    Trans --> Write
    Write --> MainSink
    Write --> ErrSink
    Write --> RouteSinks
```
## Summary
- Kubernetes: you declare a DataFlow CR; the operator reconciles it into a ConfigMap (spec) and a Deployment (processor pod). By default, a second ConfigMap and RBAC are created for checkpoint storage (set `checkpointPersistence: false` to disable). RBAC and an optional GUI complete the picture.
- Reconciliation: get the DataFlow → resolve secrets → update the ConfigMap → update the Deployment → reflect the Deployment status in the DataFlow status.
- Runtime: each processor pod runs a single pipeline: source → read channel → transformations → write to the main (and optionally error and router) sinks, using pluggable connectors and a fixed set of transformations.