Skip to content

DataFlowCron Examples

YAML samples for scheduled pipelines. Replace connection strings, table names, and service hosts with your own. Inside Job pods use Kubernetes service DNS (e.g. postgres.default.svc.cluster.local), not localhost.

Status fields

  • phase — high-level state (RunningTriggers, Completed, Failed, etc.).
  • currentRunID — identifier for the current logical run (used to name trigger Jobs).
  • currentTriggerIndex — which trigger step is active, when applicable.
  • activeJobName — Kubernetes Job name involved in the current failure or step.
  • lastScheduleTime, lastSuccessfulTime, lastFailedTime — timestamps for scheduling and outcomes.
  • conditions — standard Kubernetes-style conditions when populated.
kubectl get dataflowcron
kubectl describe dataflowcron <name>
kubectl get cronjob,job -l dataflow.dataflow.io/dataflow-cron=<name>

Processor only (no triggers)

PostgreSQL polling on a schedule and writing to another table. When the source batch is exhausted, the processor Job exits successfully.

apiVersion: dataflow.dataflow.io/v1
kind: DataFlowCron
metadata:
  name: pg-nightly-sync
spec:
  schedule: "0 2 * * *"   # daily 02:00 (timezone = kube-controller-manager zone)
  concurrencyPolicy: Forbid
  source:
    type: postgresql
    config:
      connectionString: "postgres://user:pass@postgres:5432/db?sslmode=disable"
      table: public.orders_staging
      pollInterval: 5
  sink:
    type: postgresql
    config:
      connectionString: "postgres://user:pass@postgres:5432/db?sslmode=disable"
      table: public.orders_warehouse
      autoCreateTable: true
      batchSize: 200

ClickHouse → ClickHouse on a schedule

Periodic copy between tables (same layout as dataflow/config/samples/clickhouse-to-clickhouse.yaml, wrapped in DataFlowCron).

apiVersion: dataflow.dataflow.io/v1
kind: DataFlowCron
metadata:
  name: ch-hourly-copy
spec:
  schedule: "0 * * * *"
  concurrencyPolicy: Forbid
  source:
    type: clickhouse
    config:
      connectionString: "clickhouse://default@clickhouse:9000/default?dial_timeout=10s"
      table: events_raw
      pollInterval: 10
  sink:
    type: clickhouse
    config:
      connectionString: "clickhouse://default@clickhouse:9000/default?dial_timeout=10s"
      table: events_hourly
      batchSize: 500
      autoCreateTable: true

Nessie (Iceberg) → Kafka

Export a Nessie-backed table to a topic on a schedule. See Nessie.

apiVersion: dataflow.dataflow.io/v1
kind: DataFlowCron
metadata:
  name: nessie-to-kafka-hourly
spec:
  schedule: "0 * * * *"
  source:
    type: nessie
    config:
      baseURL: "http://nessie:19120"
      branch: main
      authenticationType: BEARER
      bearerToken: "replace-with-token"
      namespace: analytics
      table: events
      pollInterval: 30
  sink:
    type: kafka
    config:
      brokers:
        - kafka:9092
      topic: iceberg-snapshot

With transformations

The embedded DataFlowSpec supports the same transformations chain as DataFlow:

  schedule: "*/30 * * * *"
  source:
    type: postgresql
    config:
      connectionString: "postgres://user:pass@postgres:5432/db?sslmode=disable"
      table: public.events
      pollInterval: 5
  transformations:
    - type: timestamp
      config: {}
    - type: select
      config:
        fields: ["id", "payload", "created_at"]
  sink:
    type: kafka
    config:
      brokers: [kafka:9092]
      topic: curated-events

One trigger after a successful run

HTTP webhook once the processor Job has completed successfully:

  schedule: "0 6 * * *"
  concurrencyPolicy: Forbid
  source:
    type: postgresql
    config:
      connectionString: "postgres://user:pass@postgres:5432/db?sslmode=disable"
      table: public.daily_export
      pollInterval: 5
  sink:
    type: postgresql
    config:
      connectionString: "postgres://user:pass@postgres:5432/db?sslmode=disable"
      table: public.daily_export_archive
      batchSize: 1000
  triggers:
    - name: notify-done
      image: curlimages/curl:8.8.0
      command: ["curl", "-fsS", "-X", "POST"]
      args: ["https://hooks.example.com/job-done"]

More trigger patterns: Triggers.

Sample manifest in the repo

Kafka source, Nessie sink, and two triggers (kubectl and curl with a Secret):

kubectl apply -f dataflow/config/samples/dataflowcron-example.yaml

File: dataflow/config/samples/dataflowcron-example.yaml. A shorter walkthrough is in Examples — DataFlowCron.

See also