# Connector Protocol (stdin/stdout)
DataFlow supports running connectors as separate binaries that communicate with the processor via stdin/stdout using JSON Lines (one JSON object per line).
## Overview
When DATAFLOW_USE_SUBPROCESS_CONNECTORS=1 is set and a connector binary is found (see Connector Discovery), the processor spawns the binary and exchanges commands and responses over pipes.
```mermaid
flowchart LR
    subgraph Processor [Processor]
        Core[Core]
    end
    subgraph Connector [Connector Binary]
        Bin[dataflow-connector-kafka]
    end
    Core -->|stdin: JSON Lines| Bin
    Bin -->|stdout: JSON Lines| Core
```
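Conceptually, the processor side reduces to spawning the binary with piped stdin/stdout and exchanging one JSON object per line. A minimal sketch, assuming Go (DataFlow's actual implementation language and structure are not specified here; error handling and process supervision are elided):

```go
package main

import (
	"bufio"
	"fmt"
	"os/exec"
)

// spawnConnector starts a connector binary and returns a buffered writer
// for its stdin and a line scanner over its stdout. Illustrative sketch
// only; real code would also capture stderr and handle process exit.
func spawnConnector(path string) (*bufio.Writer, *bufio.Scanner, error) {
	cmd := exec.Command(path)
	stdin, err := cmd.StdinPipe()
	if err != nil {
		return nil, nil, err
	}
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return nil, nil, err
	}
	if err := cmd.Start(); err != nil {
		return nil, nil, err
	}
	return bufio.NewWriter(stdin), bufio.NewScanner(stdout), nil
}

func main() {
	in, out, err := spawnConnector("dataflow-connector-kafka")
	if err != nil {
		panic(err)
	}
	// Write one command, flush (the protocol requires a flush per
	// command), then read one response line.
	fmt.Fprintln(in, `{"cmd":"ping"}`)
	in.Flush()
	if out.Scan() {
		fmt.Println("response:", out.Text()) // expect {"type":"pong"}
	}
}
```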
## Protocol Format
- Encoding: UTF-8
- Format: One JSON object per line, newline-separated
- Direction: Processor writes to connector stdin, reads from connector stdout
- Flush: Processor flushes stdin after each command
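On the connector side, this framing is a line-oriented loop: read one JSON object from stdin, reply with one or more JSON lines on stdout. A minimal sketch in Go (the dispatch body is a placeholder; see the command and response tables below):

```go
package main

import (
	"bufio"
	"encoding/json"
	"os"
)

func main() {
	scanner := bufio.NewScanner(os.Stdin)
	// json.Encoder appends the newline the protocol requires after
	// each object; os.Stdout is unbuffered, so each Encode is flushed.
	enc := json.NewEncoder(os.Stdout)
	for scanner.Scan() {
		var cmd map[string]any
		if err := json.Unmarshal(scanner.Bytes(), &cmd); err != nil {
			enc.Encode(map[string]any{"type": "error", "message": err.Error()})
			continue
		}
		// Dispatch on the "cmd" field; respond per the tables below.
		switch cmd["cmd"] {
		case "ping":
			enc.Encode(map[string]string{"type": "pong"})
		default:
			enc.Encode(map[string]string{"type": "ready"})
		}
	}
}
```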
## Commands (Processor → Connector)
| cmd | Description | Fields |
|---|---|---|
| `init` | Initialize with config | `role`, `type`, `config`, `options` |
| `connect` | Establish connection | — |
| `read` | Read messages (source only) | `limit` (optional) |
| `write` | Write messages (sink only) | `messages` |
| `close` | Close connection | — |
| `ping` | Liveness check | — |
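Decoded into a typed structure, the command envelope might look like the following Go sketch (the struct and field layout are illustrative, not a published API; only the fields relevant to a given `cmd` are populated):

```go
package connector

import "encoding/json"

// Command is the processor→connector envelope, one per stdin line.
type Command struct {
	Cmd      string            `json:"cmd"`
	Role     string            `json:"role,omitempty"`     // init: "source" or "sink"
	Type     string            `json:"type,omitempty"`     // init: connector type, e.g. "kafka"
	Config   map[string]any    `json:"config,omitempty"`   // init
	Options  map[string]any    `json:"options,omitempty"`  // init
	Limit    int               `json:"limit,omitempty"`    // read (optional)
	Messages []json.RawMessage `json:"messages,omitempty"` // write
}
```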
## Responses (Connector → Processor)
| type | Description | Fields |
|---|---|---|
| `ready` | Ready for next command | — |
| `message` | Single message | `data`, `metadata`, `timestamp` |
| `done` | Operation complete | — |
| `error` | Error occurred | `message`, `code` |
| `checkpoint` | Checkpoint data (source) | `data` |
| `pong` | Response to `ping` | — |
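The response side can be modeled the same way; a hedged Go sketch (the wire type of `code` is not specified above, so a string is assumed here):

```go
package connector

// Response is the connector→processor envelope, one per stdout line.
// Type discriminates the variant; the other fields are populated per
// the table above.
type Response struct {
	Type      string         `json:"type"`
	Data      string         `json:"data,omitempty"`      // message, checkpoint
	Metadata  map[string]any `json:"metadata,omitempty"`  // message
	Timestamp string         `json:"timestamp,omitempty"` // message, RFC3339
	Message   string         `json:"message,omitempty"`   // error
	Code      string         `json:"code,omitempty"`      // error (type assumed)
}
```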
## Message Format (wire)
```json
{
  "data": "{\"id\":1,\"name\":\"foo\"}",
  "metadata": {
    "kafka_partition": 0,
    "kafka_offset": 42
  },
  "timestamp": "2024-01-15T10:30:00Z"
}
```
- `data`: JSON string (raw payload)
- `metadata`: object (partition, offset, etc.)
- `timestamp`: RFC3339 timestamp
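Because `data` is a JSON string rather than an embedded object, consumers decode in two steps: first the envelope, then the payload carried inside `data`. A small Go illustration:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	wire := []byte(`{"data":"{\"id\":1,\"name\":\"foo\"}","metadata":{"kafka_partition":0,"kafka_offset":42},"timestamp":"2024-01-15T10:30:00Z"}`)

	// First decode: the message envelope.
	var msg struct {
		Data      string         `json:"data"`
		Metadata  map[string]any `json:"metadata"`
		Timestamp string         `json:"timestamp"`
	}
	if err := json.Unmarshal(wire, &msg); err != nil {
		panic(err)
	}
	// Second decode: the raw payload carried inside the "data" string.
	var payload map[string]any
	if err := json.Unmarshal([]byte(msg.Data), &payload); err != nil {
		panic(err)
	}
	fmt.Println(payload["name"]) // foo
}
```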
## Lifecycle
### Source Connector
1. Processor: `{"cmd":"init","role":"source","type":"kafka","config":{...},"options":{...}}`
2. Connector: `{"type":"ready"}`
3. Processor: `{"cmd":"connect"}`
4. Connector: `{"type":"ready"}` or `{"type":"error","message":"..."}`
5. Loop: Processor sends `{"cmd":"read"}` → Connector replies `{"type":"message",...}` × N, then `{"type":"done"}` (see the sketch after this list)
6. (Optional) Connector: `{"type":"checkpoint","data":"..."}` for polling sources
7. Processor: `{"cmd":"close"}`
8. Connector: `{"type":"done"}`
### Sink Connector
1. Processor: `{"cmd":"init","role":"sink","type":"kafka","config":{...}}`
2. Connector: `{"type":"ready"}`
3. Processor: `{"cmd":"connect"}`
4. Connector: `{"type":"ready"}`
5. Processor: `{"cmd":"write","messages":[...]}`
6. Connector: `{"type":"done"}` or `{"type":"error","message":"..."}` (a handler sketch follows this list)
7. Processor: `{"cmd":"close"}`
8. Connector: `{"type":"done"}`
## Connector Discovery
The processor searches for connector binaries in this order (sketched in code after the list):

1. `DATAFLOW_CONNECTOR_PATH`: colon-separated (Unix) or semicolon-separated (Windows) list of directories
2. `./connectors/`: relative to the working directory
3. `/usr/local/lib/dataflow/connectors/`
4. `PATH`: looked up by binary name `dataflow-connector-<type>`
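A sketch of that lookup order in Go (illustrative; the real resolution logic, including executable-bit checks, may differ):

```go
package connector

import (
	"os"
	"os/exec"
	"path/filepath"
)

// findConnector resolves the binary for a connector type using the
// documented search order. filepath.SplitList honors the OS list
// separator (":" on Unix, ";" on Windows).
func findConnector(typ string) (string, bool) {
	name := "dataflow-connector-" + typ
	dirs := filepath.SplitList(os.Getenv("DATAFLOW_CONNECTOR_PATH"))
	dirs = append(dirs, "connectors", "/usr/local/lib/dataflow/connectors")
	for _, dir := range dirs {
		p := filepath.Join(dir, name)
		if info, err := os.Stat(p); err == nil && !info.IsDir() {
			return p, true
		}
	}
	// Finally, fall back to PATH.
	if p, err := exec.LookPath(name); err == nil {
		return p, true
	}
	return "", false
}
```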
Example:

```bash
export DATAFLOW_CONNECTOR_PATH=/opt/dataflow/connectors:/plugins
export DATAFLOW_USE_SUBPROCESS_CONNECTORS=1
```
## Enabling Subprocess Connectors
Set the environment variable in the processor pod:
```bash
DATAFLOW_USE_SUBPROCESS_CONNECTORS=1
```
Ensure the connector binary is available (in `PATH` or `DATAFLOW_CONNECTOR_PATH`). If the binary is not found, the processor falls back to in-process connectors.
## Building Connector Binaries
### dataflow-connector-kafka

```bash
cd dataflow
task build-connector-kafka
# Output: bin/dataflow-connector-kafka
```
Add to `PATH` or `DATAFLOW_CONNECTOR_PATH`:

```bash
export PATH="$PATH:$(pwd)/bin"
# or
mkdir -p connectors && cp bin/dataflow-connector-kafka connectors/
```
## See Also
- Connector Development — Adding in-process connectors
- Subprocess connectors: design notes — Goals and tradeoffs only; the wire format on this page is authoritative