Transformations
DataFlow Operator supports message transformations. They are applied to each message one after another, in the order they appear in the configuration. Field paths are resolved with the gjson library using JSONPath-style syntax.
Transformation Overview
| Transformation | Description | Input | Output |
|---|---|---|---|
| Timestamp | Adds a timestamp field | 1 message | 1 message |
| Flatten | Expands an array into separate messages | 1 message | N messages |
| Filter | Keeps messages where a field is truthy | 1 message | 0 or 1 message |
| Mask | Masks sensitive fields | 1 message | 1 message |
| Router | Sends matching messages to alternate sinks | 1 message | 0 or 1 message |
| Select | Keeps only specified fields | 1 message | 1 message |
| Remove | Removes specified fields | 1 message | 1 message |
| SnakeCase | Converts keys to snake_case | 1 message | 1 message |
| CamelCase | Converts keys to CamelCase (PascalCase) | 1 message | 1 message |
Timestamp
Adds a timestamp field to each message. Useful for tracking message processing time.
Configuration
```yaml
transformations:
  - type: timestamp
    config:
      # Field name for the timestamp (optional, default: created_at)
      fieldName: created_at
      # Timestamp format (optional, default: RFC3339)
      format: RFC3339
```
Format
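The format value is a Go time layout string. The default, RFC3339, corresponds to the layout 2006-01-02T15:04:05Z07:00. You can also use RFC3339Nano or a custom layout such as 2006-01-02 15:04:05. The value Unix is not a Go layout. As the Unix timestamp example below shows, it produces Unix time in seconds.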
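The sketch below shows how a format value might be turned into a timestamp using only the Go standard library. It is built on assumptions drawn from the examples on this page. The names RFC3339 and RFC3339Nano are taken to map to the matching time package constants, and Unix is taken to mean Unix seconds. The helpers formatTimestamp and addTimestamp are hypothetical and are not the operator's code.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
	"time"
)

// exampleNow is a fixed clock so the printed output is reproducible.
var exampleNow = time.Date(2024, 1, 15, 10, 30, 0, 0, time.UTC)

// formatTimestamp (hypothetical) renders t for a timestamp `format` value.
// Assumption: named values map to Go constants and "Unix" means Unix seconds.
func formatTimestamp(t time.Time, format string) string {
	switch format {
	case "", "RFC3339": // default
		return t.Format(time.RFC3339)
	case "RFC3339Nano":
		return t.Format(time.RFC3339Nano)
	case "Unix": // not a Go layout, so it is special-cased
		return strconv.FormatInt(t.Unix(), 10)
	default: // custom Go layout, e.g. "2006-01-02 15:04:05"
		return t.Format(format)
	}
}

// addTimestamp (hypothetical) sets fieldName (default "created_at") on a JSON object.
func addTimestamp(msg []byte, fieldName, format string, now time.Time) ([]byte, error) {
	var obj map[string]any
	if err := json.Unmarshal(msg, &obj); err != nil {
		return nil, err
	}
	if fieldName == "" {
		fieldName = "created_at"
	}
	obj[fieldName] = formatTimestamp(now, format)
	return json.Marshal(obj) // encoding/json writes map keys in sorted order
}

func main() {
	out, err := addTimestamp([]byte(`{"id":1,"name":"Test"}`), "processed_at", "", exampleNow)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
	// {"id":1,"name":"Test","processed_at":"2024-01-15T10:30:00Z"}
	fmt.Println(formatTimestamp(exampleNow, "2006-01-02 15:04:05")) // 2024-01-15 10:30:00
}
```

The Unix value is returned as a string here because the Unix timestamp example on this page shows it quoted.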
Examples
Basic usage
```yaml
transformations:
  - type: timestamp
    config:
      fieldName: processed_at
```
Input message:
```json
{
  "id": 1,
  "name": "Test"
}
```
Output message:
```json
{
  "id": 1,
  "name": "Test",
  "processed_at": "2024-01-15T10:30:00Z"
}
```
Custom format
```yaml
transformations:
  - type: timestamp
    config:
      fieldName: timestamp
      format: "2006-01-02 15:04:05"
```
Output message:
```json
{
  "id": 1,
  "timestamp": "2024-01-15 10:30:00"
}
```
Unix timestamp
```yaml
transformations:
  - type: timestamp
    config:
      fieldName: unix_time
      format: Unix
```
Output message:
```json
{
  "id": 1,
  "unix_time": "1705312200"
}
```
Flatten
Expands an array into separate messages and keeps all other fields of the original message. Each array element is merged into the root of its own output message, so the fields of an object element become top-level keys. If the field is not an array, the message is returned unchanged. Avro-style arrays, which are wrapped in an object under an array key, are also supported.
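The expansion rule above can be sketched in Go with the standard library. The flatten helper is hypothetical. It accepts plain dot paths only, with no $. prefix or indexes. The text describes only object elements, so storing a non-object element under the last path segment is a guess.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// flatten (hypothetical) expands the array at a dot path such as "orders.items"
// into one message per element, keeping all other fields of the original.
func flatten(msg []byte, path string) ([][]byte, error) {
	var root map[string]any
	if err := json.Unmarshal(msg, &root); err != nil {
		return nil, err
	}
	keys := strings.Split(path, ".")
	parent := root
	for _, k := range keys[:len(keys)-1] {
		next, ok := parent[k].(map[string]any)
		if !ok {
			return [][]byte{msg}, nil // path not found: message unchanged
		}
		parent = next
	}
	last := keys[len(keys)-1]
	value := parent[last]
	if wrapper, ok := value.(map[string]any); ok {
		value = wrapper["array"] // Avro-style wrapper: {"array": [...]}
	}
	items, ok := value.([]any)
	if !ok {
		return [][]byte{msg}, nil // not an array: message unchanged
	}
	delete(parent, last) // the array itself is not copied into the outputs
	base, err := json.Marshal(root)
	if err != nil {
		return nil, err
	}
	out := make([][]byte, 0, len(items))
	for _, item := range items {
		var copyOf map[string]any // fresh deep copy of the remaining fields
		if err := json.Unmarshal(base, &copyOf); err != nil {
			return nil, err
		}
		if obj, ok := item.(map[string]any); ok {
			for k, v := range obj {
				copyOf[k] = v // element fields become top-level keys
			}
		} else {
			copyOf[last] = item // assumption for scalar elements
		}
		b, err := json.Marshal(copyOf)
		if err != nil {
			return nil, err
		}
		out = append(out, b)
	}
	return out, nil
}

func main() {
	in := `{"order_id":12345,"customer":"John Doe","items":[{"product":"Apple","quantity":5},{"product":"Banana","quantity":3}]}`
	msgs, err := flatten([]byte(in), "items")
	if err != nil {
		panic(err)
	}
	for _, m := range msgs {
		fmt.Println(string(m))
	}
	// {"customer":"John Doe","order_id":12345,"product":"Apple","quantity":5}
	// {"customer":"John Doe","order_id":12345,"product":"Banana","quantity":3}
}
```

The sketch deletes the array from its parent before making copies. That is why the nested example keeps an empty "orders": {} object in each output message.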
Configuration
```yaml
transformations:
  - type: flatten
    config:
      # JSONPath to the array to expand (required)
      field: items
```
Examples
Simple flatten
```yaml
transformations:
  - type: flatten
    config:
      field: items
```
Input message:
```json
{
  "order_id": 12345,
  "customer": "John Doe",
  "items": [
    {"product": "Apple", "quantity": 5},
    {"product": "Banana", "quantity": 3}
  ]
}
```
Output messages:
```json
{
  "order_id": 12345,
  "customer": "John Doe",
  "product": "Apple",
  "quantity": 5
}
```
```json
{
  "order_id": 12345,
  "customer": "John Doe",
  "product": "Banana",
  "quantity": 3
}
```
Nested arrays
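```yaml
transformations:
  - type: flatten
    config:
      field: orders.items
```
Input message:
```json
{
  "customer_id": 100,
  "orders": {
    "items": [
      {"sku": "SKU001", "price": 10.99},
      {"sku": "SKU002", "price": 5.99}
    ]
  }
}
```
Output messages (the emptied parent object is kept):
```json
{
  "customer_id": 100,
  "orders": {},
  "sku": "SKU001",
  "price": 10.99
}
```
```json
{
  "customer_id": 100,
  "orders": {},
  "sku": "SKU002",
  "price": 5.99
}
```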
Combined with other transformations
```yaml
transformations:
  - type: flatten
    config:
      field: rowsStock
  - type: timestamp
    config:
      fieldName: created_at
```
This creates a separate message for each array element, each with an added timestamp.
Filter
Keeps only messages where the field at the given JSONPath exists and is truthy (boolean true, non-empty string, non-zero number). Other messages are dropped. Comparison expressions (e.g. ==) are not supported; use Router for value-based routing.
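The truthiness rule above can be sketched in Go with the standard library. The names lookup, isTruthy and passes are hypothetical. lookup is a simplified stand-in for the gjson lookup the operator uses and only understands dot paths. Treating objects and arrays as falsy, and dropping input that is not valid JSON, are assumptions the text does not cover.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// lookup resolves a dot path such as "$.user.status" in a decoded JSON object.
func lookup(obj map[string]any, path string) (any, bool) {
	var cur any = obj
	for _, k := range strings.Split(strings.TrimPrefix(path, "$."), ".") {
		m, ok := cur.(map[string]any)
		if !ok {
			return nil, false
		}
		if cur, ok = m[k]; !ok {
			return nil, false
		}
	}
	return cur, true
}

// isTruthy follows the documented rule: true, a non-empty string, or a
// non-zero number. null, objects and arrays are treated as falsy (assumption).
func isTruthy(v any) bool {
	switch x := v.(type) {
	case bool:
		return x
	case string:
		return x != ""
	case float64: // encoding/json decodes every JSON number as float64
		return x != 0
	default:
		return false
	}
}

// passes (hypothetical) reports whether a message survives `type: filter`.
func passes(msg []byte, condition string) bool {
	var obj map[string]any
	if err := json.Unmarshal(msg, &obj); err != nil {
		return false // assumption: invalid JSON is dropped
	}
	v, ok := lookup(obj, condition)
	return ok && isTruthy(v)
}

func main() {
	for _, m := range []string{`{"id":1,"active":true}`, `{"id":2,"active":false}`, `{"id":3}`} {
		fmt.Println(m, passes([]byte(m), "$.active")) // true, then false, then false
	}
}
```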
Configuration
```yaml
transformations:
  - type: filter
    config:
      # JSONPath to a field; the message passes if the field exists and is truthy (required)
      condition: "$.active"
```
JSONPath
Paths are resolved with the gjson library, for example $.field, $.nested.field or $.array[0].
Examples
Simple filtering
```yaml
transformations:
  - type: filter
    config:
      condition: "$.active"
```
Input messages:
```
{"id": 1, "active": true}    // ✅ Passes
{"id": 2, "active": false}   // ❌ Filtered out
{"id": 3}                    // ❌ Filtered out (no field)
```
Filtering by string value
```yaml
transformations:
  - type: filter
    config:
      condition: "$.level"
```
Input messages:
```
{"level": "error"}     // ✅ Passes (non-empty string)
{"level": "warning"}   // ✅ Passes
{"level": ""}          // ❌ Filtered out
{"level": null}        // ❌ Filtered out
```
Filtering by numeric value
```yaml
transformations:
  - type: filter
    config:
      condition: "$.amount"
```
Input messages:
```
{"amount": 100}   // ✅ Passes (non-zero)
{"amount": 0}     // ❌ Filtered out
{"amount": -5}    // ✅ Passes (non-zero)
```
Complex filtering
```yaml
transformations:
  - type: filter
    config:
      condition: "$.user.status"
```
Input message:
```json
{
  "user": {
    "status": "active"
  }
}
```
Result: Message passes if user.status exists and is non-empty.
Mask
Masks sensitive data in the specified fields. With keepLength: true, the mask keeps the original length of the value. With keepLength: false, the value is replaced by a short mask of fixed length.
Configuration
```yaml
transformations:
  - type: mask
    config:
      # List of JSONPath expressions for the fields to mask (required)
      fields:
        - password
        - email
      # Masking character (optional, default: *)
      maskChar: "*"
      # Preserve the original length (optional, default: false)
      keepLength: true
```
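The two masking modes above can be sketched in Go with the standard library. The helpers maskValue and maskFields are hypothetical. The three-character fixed mask only matches the "XXX" example further down and is an assumption, not a documented length. The sketch also leaves non-string values and missing paths untouched, which is another assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
	"unicode/utf8"
)

// maskValue (hypothetical) replaces s with maskChar. With keepLength the mask has
// one character per rune of s; otherwise a fixed three-character mask is used
// (assumed from the "XXX" example).
func maskValue(s, maskChar string, keepLength bool) string {
	if maskChar == "" {
		maskChar = "*" // documented default
	}
	n := 3
	if keepLength {
		n = utf8.RuneCountInString(s)
	}
	return strings.Repeat(maskChar, n)
}

// maskFields (hypothetical) masks string values at dot paths such as "user.password".
func maskFields(msg []byte, fields []string, maskChar string, keepLength bool) ([]byte, error) {
	var obj map[string]any
	if err := json.Unmarshal(msg, &obj); err != nil {
		return nil, err
	}
	for _, f := range fields {
		keys := strings.Split(strings.TrimPrefix(f, "$."), ".")
		parent := obj
		for _, k := range keys[:len(keys)-1] {
			next, ok := parent[k].(map[string]any)
			if !ok {
				parent = nil // the path does not exist in this message
				break
			}
			parent = next
		}
		if parent == nil {
			continue
		}
		last := keys[len(keys)-1]
		if s, ok := parent[last].(string); ok {
			parent[last] = maskValue(s, maskChar, keepLength)
		}
	}
	return json.Marshal(obj)
}

func main() {
	in := `{"user":{"password":"secret"},"payment":{"cardNumber":"1234567890123456"}}`
	out, err := maskFields([]byte(in), []string{"user.password", "payment.cardNumber"}, "*", true)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```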
Examples
Masking with length preservation
```yaml
transformations:
  - type: mask
    config:
      fields:
        - password
        - email
      keepLength: true
```
Input message:
```json
{
  "id": 1,
  "username": "john",
  "password": "secret123",
  "email": "john@example.com"
}
```
Output message:
```json
{
  "id": 1,
  "username": "john",
  "password": "*********",
  "email": "****************"
}
```
Masking with fixed length
```yaml
transformations:
  - type: mask
    config:
      fields:
        - password
      keepLength: false
      maskChar: "X"
```
Input message:
```json
{
  "password": "verylongpassword123"
}
```
Output message:
```json
{
  "password": "XXX"
}
```
Masking nested fields
```yaml
transformations:
  - type: mask
    config:
      fields:
        - user.password
        - payment.cardNumber
      keepLength: true
```
Input message:
```json
{
  "user": {
    "password": "secret"
  },
  "payment": {
    "cardNumber": "1234567890123456"
  }
}
```
Output message:
```json
{
  "user": {
    "password": "******"
  },
  "payment": {
    "cardNumber": "****************"
  }
}
```
Router
Routes messages to different sinks based on conditions. The first matching route determines the sink; if none matches, the message goes to the main sink.
Condition syntax
- Truthiness: $.field matches if the field exists and is truthy (a non-empty string, a non-zero number, or true).
- Comparison: $.field == 'value' or $.field == "value" matches when the field equals the given string.
Conditions are evaluated in order; the first match wins.
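The two condition forms and the first-match rule can be sketched in Go with the standard library. The names matches and route are hypothetical, and lookup is a simplified stand-in for gjson. Some behavior here is assumed rather than documented: a comparison only matches string fields, and invalid JSON goes to the main sink.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// lookup resolves a "$."-prefixed dot path in a decoded JSON object.
func lookup(obj map[string]any, path string) (any, bool) {
	var cur any = obj
	for _, k := range strings.Split(strings.TrimPrefix(path, "$."), ".") {
		m, ok := cur.(map[string]any)
		if !ok {
			return nil, false
		}
		if cur, ok = m[k]; !ok {
			return nil, false
		}
	}
	return cur, true
}

// isTruthy: true, a non-empty string, or a non-zero number.
func isTruthy(v any) bool {
	switch x := v.(type) {
	case bool:
		return x
	case string:
		return x != ""
	case float64:
		return x != 0
	default:
		return false
	}
}

// matches evaluates "$.field" (truthiness) or "$.field == 'value'" (string equality).
func matches(obj map[string]any, condition string) bool {
	path, want, isCmp := strings.Cut(condition, "==")
	v, ok := lookup(obj, strings.TrimSpace(path))
	if !ok {
		return false
	}
	if !isCmp {
		return isTruthy(v)
	}
	s, isStr := v.(string)                                          // assumption: only strings compare equal
	return isStr && s == strings.Trim(strings.TrimSpace(want), `'"`) // accepts '...' or "..."
}

// route (hypothetical) returns the index of the first matching route, or -1 for the main sink.
func route(msg []byte, conditions []string) int {
	var obj map[string]any
	if err := json.Unmarshal(msg, &obj); err != nil {
		return -1
	}
	for i, c := range conditions {
		if matches(obj, c) {
			return i // first match wins
		}
	}
	return -1
}

func main() {
	conds := []string{"$.level == 'error'", "$.level == 'warning'"}
	for _, m := range []string{`{"level":"error"}`, `{"level":"warning"}`, `{"level":"info"}`} {
		fmt.Println(m, "->", route([]byte(m), conds)) // 0, then 1, then -1
	}
}
```

A bare "$.level" condition would match all three messages, because each has a non-empty level. Value-based routing therefore needs the == form.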
Configuration
```yaml
transformations:
  - type: router
    config:
      routes:
        - condition: "$.level == 'error'"
          sink:
            type: kafka
            config:
              brokers: ["localhost:9092"]
              topic: error-topic
        - condition: "$.level == 'warning'"
          sink:
            type: postgresql
            config:
              connectionString: "postgres://..."
              table: warnings
```
Features
- Conditions are checked in order; the first match determines the sink
- If no condition matches, the message goes to the main sink
Examples
Routing by log level
```yaml
transformations:
  - type: router
    config:
      routes:
        - condition: "$.level == 'error'"
          sink:
            type: kafka
            config:
              brokers: ["localhost:9092"]
              topic: error-logs
```
Input messages:
```
{"level": "error", "message": "Critical error"}   // → error-logs topic
{"level": "info", "message": "Info message"}      // → main sink
{"level": "warning", "message": "Warning"}        // → main sink
```
Multiple routes
```yaml
transformations:
  - type: router
    config:
      routes:
        - condition: "$.type"
          sink:
            type: kafka
            config:
              brokers: ["localhost:9092"]
              topic: events-topic
        - condition: "$.priority"
          sink:
            type: postgresql
            config:
              connectionString: "postgres://..."
              table: high_priority_events
```
Input messages:
```
{"type": "event", "data": "..."}      // → events-topic
{"priority": "high", "data": "..."}   // → high_priority_events table
{"data": "..."}                       // → main sink
```
Combined with other transformations
```yaml
transformations:
  - type: timestamp
    config:
      fieldName: processed_at
  - type: router
    config:
      routes:
        - condition: "$.level == 'error'"
          sink:
            type: kafka
            config:
              brokers: ["localhost:9092"]
              topic: errors
```
Timestamp is added first, then the message is routed.
Select
Keeps only the specified fields; all others are dropped. Each field is taken by JSONPath; the last path segment is used as the key in the output (e.g. user.name → key name), so the result is flat.
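The last-segment rule above can be sketched in Go with the standard library. The helper selectFields is hypothetical, and lookup is a simplified stand-in for gjson. The sketch skips missing paths and lets a later path overwrite an earlier one when two paths end in the same segment. Both behaviors are assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// lookup resolves a dot path ("user.name", optionally prefixed with "$.").
func lookup(obj map[string]any, path string) (any, bool) {
	var cur any = obj
	for _, k := range strings.Split(strings.TrimPrefix(path, "$."), ".") {
		m, ok := cur.(map[string]any)
		if !ok {
			return nil, false
		}
		if cur, ok = m[k]; !ok {
			return nil, false
		}
	}
	return cur, true
}

// selectFields (hypothetical) keeps only the listed paths, keyed by their last segment.
func selectFields(msg []byte, fields []string) ([]byte, error) {
	var obj map[string]any
	if err := json.Unmarshal(msg, &obj); err != nil {
		return nil, err
	}
	out := make(map[string]any, len(fields))
	for _, f := range fields {
		if v, ok := lookup(obj, f); ok {
			segs := strings.Split(f, ".")
			out[segs[len(segs)-1]] = v // flat result: "user.name" -> "name"
		}
	}
	return json.Marshal(out)
}

func main() {
	in := `{"user":{"id":1,"name":"John","email":"john@example.com"},"metadata":{"timestamp":"2024-01-15T10:30:00Z","source":"api"}}`
	out, err := selectFields([]byte(in), []string{"user.id", "user.name", "metadata.timestamp"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"id":1,"name":"John","timestamp":"2024-01-15T10:30:00Z"}
}
```

Because keys come from the last segment only, paths such as user.id and order.id would collide on id.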
Configuration
```yaml
transformations:
  - type: select
    config:
      # List of JSONPath expressions for the fields to keep (required)
      fields:
        - id
        - name
        - email
```
Examples
Simple field selection
```yaml
transformations:
  - type: select
    config:
      fields:
        - id
        - name
        - email
```
Input message:
```json
{
  "id": 1,
  "name": "John Doe",
  "email": "john@example.com",
  "password": "secret",
  "internal_id": 999
}
```
Output message:
```json
{
  "id": 1,
  "name": "John Doe",
  "email": "john@example.com"
}
```
Selecting nested fields (result is flat)
```yaml
transformations:
  - type: select
    config:
      fields:
        - user.id
        - user.name
        - metadata.timestamp
```
Input message:
```json
{
  "user": {
    "id": 1,
    "name": "John",
    "email": "john@example.com"
  },
  "metadata": {
    "timestamp": "2024-01-15T10:30:00Z",
    "source": "api"
  }
}
```
Output message (keys are last path segments):
```json
{
  "id": 1,
  "name": "John",
  "timestamp": "2024-01-15T10:30:00Z"
}
```
Remove
Removes specified fields from a message. Useful for data cleanup before sending.
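Field removal can be sketched in Go with the standard library as a walk to the parent object followed by a delete. The helper removeFields is hypothetical and stands in for the operator's own path handling. Paths that do not exist are silently ignored, which is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// removeFields (hypothetical) deletes the listed dot paths, e.g. "user.password".
func removeFields(msg []byte, fields []string) ([]byte, error) {
	var obj map[string]any
	if err := json.Unmarshal(msg, &obj); err != nil {
		return nil, err
	}
	for _, f := range fields {
		keys := strings.Split(strings.TrimPrefix(f, "$."), ".")
		parent := obj
		for _, k := range keys[:len(keys)-1] {
			next, ok := parent[k].(map[string]any)
			if !ok {
				parent = nil // an intermediate segment is missing or not an object
				break
			}
			parent = next
		}
		if parent != nil {
			delete(parent, keys[len(keys)-1])
		}
	}
	return json.Marshal(obj)
}

func main() {
	in := `{"user":{"id":1,"name":"John","password":"secret"},"metadata":{"timestamp":"2024-01-15","internal":"secret"}}`
	out, err := removeFields([]byte(in), []string{"user.password", "metadata.internal"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
	// {"metadata":{"timestamp":"2024-01-15"},"user":{"id":1,"name":"John"}}
}
```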
Configuration
```yaml
transformations:
  - type: remove
    config:
      # List of JSONPath expressions for the fields to remove (required)
      fields:
        - password
        - internal_id
```
Examples
Removing sensitive fields
```yaml
transformations:
  - type: remove
    config:
      fields:
        - password
        - creditCard
        - ssn
```
Input message:
```json
{
  "id": 1,
  "name": "John Doe",
  "password": "secret",
  "creditCard": "1234-5678-9012-3456",
  "ssn": "123-45-6789"
}
```
Output message:
```json
{
  "id": 1,
  "name": "John Doe"
}
```
Removing nested fields
```yaml
transformations:
  - type: remove
    config:
      fields:
        - user.password
        - metadata.internal
```
Input message:
```json
{
  "user": {
    "id": 1,
    "name": "John",
    "password": "secret"
  },
  "metadata": {
    "timestamp": "2024-01-15",
    "internal": "secret"
  }
}
```
Output message:
```json
{
  "user": {
    "id": 1,
    "name": "John"
  },
  "metadata": {
    "timestamp": "2024-01-15"
  }
}
```
Order of Application
Transformations are applied sequentially in the order specified in the transformations list. Each transformation receives the result of the previous one.
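Some transformations produce several messages (Flatten) and others may produce none (Filter), so chaining them works like a flat-map over the message list. Below is a minimal Go sketch of that idea. The types Message and Step and the functions apply, explodeItems and keepActive are hypothetical toy stand-ins, not the operator's internals.

```go
package main

import "fmt"

// Message is a decoded JSON object; Step maps one message to zero or more messages.
type Message = map[string]any
type Step func(Message) []Message

// apply runs the steps in list order; each step receives every message the
// previous step produced, so fan-out and drops compound.
func apply(msg Message, steps []Step) []Message {
	current := []Message{msg}
	for _, step := range steps {
		var next []Message
		for _, m := range current {
			next = append(next, step(m)...)
		}
		current = next
	}
	return current
}

// explodeItems is a toy stand-in for flatten on "items".
func explodeItems(m Message) []Message {
	items, _ := m["items"].([]any)
	var out []Message
	for _, it := range items {
		obj, ok := it.(Message)
		if !ok {
			continue
		}
		merged := Message{}
		for k, v := range m {
			if k != "items" {
				merged[k] = v // keep the other top-level fields
			}
		}
		for k, v := range obj {
			merged[k] = v
		}
		out = append(out, merged)
	}
	return out
}

// keepActive is a toy stand-in for filter with condition "$.active".
func keepActive(m Message) []Message {
	if active, _ := m["active"].(bool); active {
		return []Message{m}
	}
	return nil
}

func main() {
	order := Message{
		"order_id": 1,
		"items": []any{
			Message{"sku": "A", "active": true},
			Message{"sku": "B", "active": false},
		},
	}
	for _, m := range apply(order, []Step{explodeItems, keepActive}) {
		fmt.Println(m) // map[active:true order_id:1 sku:A]
	}
}
```

Swapping the two steps changes the result. The unflattened order has no active field, so filtering it first drops everything.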
Example sequence
```yaml
transformations:
  # 1. Expand the array
  - type: flatten
    config:
      field: items
  # 2. Add a timestamp
  - type: timestamp
    config:
      fieldName: created_at
  # 3. Drop inactive messages
  - type: filter
    config:
      condition: "$.active"
  # 4. Remove internal fields
  - type: remove
    config:
      fields:
        - internal_id
        - debug_info
  # 5. Keep only the needed fields
  - type: select
    config:
      fields:
        - id
        - name
        - created_at
```
Recommended Order
- Flatten: place it first if you need to expand arrays.
- Filter: apply early to reduce the volume of data processed.
- SnakeCase/CamelCase: apply after Select/Remove, before the message is sent to the sink.
- Mask/Remove: apply before Select, for security.
- Select: apply near the end for final cleanup.
- Timestamp: can be applied anywhere, usually at the beginning or the end.
- Router: usually applied last, after all other transformations.
Combined examples
Order processing
```yaml
transformations:
  # Expand items into separate messages
  - type: flatten
    config:
      field: items
  # Add a timestamp
  - type: timestamp
    config:
      fieldName: processed_at
  # Keep only messages with a truthy status
  # (Filter checks truthiness only; it cannot test for a value such as "paid")
  - type: filter
    config:
      condition: "$.status"
  # Remove sensitive data
  - type: remove
    config:
      fields:
        - customer.creditCard
        - customer.cvv
```
Log processing
```yaml
transformations:
  # Add a timestamp
  - type: timestamp
    config:
      fieldName: timestamp
  # Mask IP addresses
  - type: mask
    config:
      fields:
        - ip_address
      keepLength: true
  # Route errors to a dedicated topic
  - type: router
    config:
      routes:
        - condition: "$.level == 'error'"
          sink:
            type: kafka
            config:
              brokers: ["localhost:9092"]
              topic: error-logs
```
Field name normalization
```yaml
transformations:
  # Select the needed fields
  - type: select
    config:
      fields:
        - firstName
        - lastName
        - email
        - address
  # Convert to snake_case for PostgreSQL
  - type: snakeCase
    config:
      deep: true
```
Input message:
```json
{
  "firstName": "John",
  "lastName": "Doe",
  "email": "john@example.com",
  "address": {
    "streetName": "Main St",
    "zipCode": "12345"
  }
}
```
Output message:
```json
{
  "first_name": "John",
  "last_name": "Doe",
  "email": "john@example.com",
  "address": {
    "street_name": "Main St",
    "zip_code": "12345"
  }
}
```
JSONPath support
All transformations that work with fields support JSONPath syntax:
- $.field: root field
- $.nested.field: nested field
- $.array[0]: array element by index
- $.array[*]: all array elements
- $.*: all root-level fields
Performance
- Filter — apply early to reduce data volume
- Select — reduces message size and improves performance
- Flatten — can increase message count; use with care
- Router — creates additional connections; minimize the number of routes
SnakeCase
Converts all JSON object keys to snake_case. Useful for normalizing field names when integrating with systems that use snake_case (e.g., PostgreSQL or Python APIs).
Configuration
```yaml
transformations:
  - type: snakeCase
    config:
      # Recursively convert nested objects (optional, default: false)
      deep: true
```
Examples
Simple conversion
```yaml
transformations:
  - type: snakeCase
    config:
      deep: false
```
Input message:
```json
{
  "firstName": "John",
  "lastName": "Doe",
  "userName": "johndoe",
  "isActive": true,
  "itemCount": 42
}
```
Output message:
```json
{
  "first_name": "John",
  "last_name": "Doe",
  "user_name": "johndoe",
  "is_active": true,
  "item_count": 42
}
```
Recursive conversion
```yaml
transformations:
  - type: snakeCase
    config:
      deep: true
```
Input message:
```json
{
  "firstName": "John",
  "address": {
    "streetName": "Main St",
    "houseNumber": 123,
    "zipCode": "12345"
  },
  "items": [
    {
      "itemName": "Product",
      "itemPrice": 99.99
    }
  ]
}
```
Output message:
```json
{
  "first_name": "John",
  "address": {
    "street_name": "Main St",
    "house_number": 123,
    "zip_code": "12345"
  },
  "items": [
    {
      "item_name": "Product",
      "item_price": 99.99
    }
  ]
}
```
PascalCase conversion
```yaml
transformations:
  - type: snakeCase
    config:
      deep: false
```
Input message:
```json
{
  "FirstName": "John",
  "LastName": "Doe",
  "UserID": 123
}
```
Output message:
```json
{
  "first_name": "John",
  "last_name": "Doe",
  "user_id": 123
}
```
Features
- Converts camelCase → snake_case
- Converts PascalCase → snake_case
- Handles consecutive capitals (e.g. XMLHttpRequest → xml_http_request)
- Leaves existing snake_case keys unchanged
- With deep: false, converts only top-level keys
- With deep: true, recursively converts all nested objects and arrays
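The rules listed above can be sketched in Go with the standard library. The names toSnake, convertKeys and snakeCase are hypothetical, and the operator may split words differently in edge cases. In this sketch, an underscore is inserted before an upper-case letter in two cases: when it follows a lower-case letter or digit, or when it ends an acronym. Input that is not valid JSON is returned unchanged, as the Limitations section describes.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
	"unicode"
)

// toSnake (hypothetical) converts camelCase or PascalCase to snake_case.
func toSnake(s string) string {
	r := []rune(s)
	var b strings.Builder
	for i, c := range r {
		if unicode.IsUpper(c) && i > 0 {
			prev := r[i-1]
			nextLower := i+1 < len(r) && unicode.IsLower(r[i+1])
			// "userName" -> "user_name"; "XMLHttp" -> "xml_http" (acronym boundary)
			if unicode.IsLower(prev) || unicode.IsDigit(prev) || (unicode.IsUpper(prev) && nextLower) {
				b.WriteRune('_')
			}
		}
		b.WriteRune(unicode.ToLower(c))
	}
	return b.String()
}

// convertKeys rewrites object keys with conv; with deep it also descends into
// nested objects and arrays, otherwise only top-level keys change.
func convertKeys(v any, conv func(string) string, deep bool) any {
	switch x := v.(type) {
	case map[string]any:
		out := make(map[string]any, len(x))
		for k, val := range x {
			if deep {
				val = convertKeys(val, conv, deep)
			}
			out[conv(k)] = val
		}
		return out
	case []any:
		if !deep {
			return x
		}
		out := make([]any, len(x))
		for i, val := range x {
			out[i] = convertKeys(val, conv, deep)
		}
		return out
	default:
		return v
	}
}

// snakeCase (hypothetical) applies toSnake to a message; non-JSON input is returned as-is.
func snakeCase(msg []byte, deep bool) ([]byte, error) {
	var v any
	if err := json.Unmarshal(msg, &v); err != nil {
		return msg, nil
	}
	return json.Marshal(convertKeys(v, toSnake, deep))
}

func main() {
	out, err := snakeCase([]byte(`{"firstName":"John","address":{"zipCode":"12345"}}`), true)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"address":{"zip_code":"12345"},"first_name":"John"}
}
```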
CamelCase
Converts all JSON object keys to CamelCase (PascalCase). Useful for normalizing field names when integrating with systems that use CamelCase (e.g., Java or C# APIs).
Configuration
```yaml
transformations:
  - type: camelCase
    config:
      # Recursively convert nested objects (optional, default: false)
      deep: true
```
Examples
Simple conversion
```yaml
transformations:
  - type: camelCase
    config:
      deep: false
```
Input message:
```json
{
  "first_name": "John",
  "last_name": "Doe",
  "user_name": "johndoe",
  "is_active": true,
  "item_count": 42
}
```
Output message:
```json
{
  "FirstName": "John",
  "LastName": "Doe",
  "UserName": "johndoe",
  "IsActive": true,
  "ItemCount": 42
}
```
Recursive conversion
```yaml
transformations:
  - type: camelCase
    config:
      deep: true
```
Input message:
```json
{
  "first_name": "John",
  "address": {
    "street_name": "Main St",
    "house_number": 123,
    "zip_code": "12345"
  },
  "items": [
    {
      "item_name": "Product",
      "item_price": 99.99
    }
  ]
}
```
Output message:
```json
{
  "FirstName": "John",
  "Address": {
    "StreetName": "Main St",
    "HouseNumber": 123,
    "ZipCode": "12345"
  },
  "Items": [
    {
      "ItemName": "Product",
      "ItemPrice": 99.99
    }
  ]
}
```
Single word conversion
```yaml
transformations:
  - type: camelCase
    config:
      deep: false
```
Input message:
```json
{
  "name": "John",
  "id": 123
}
```
Output message:
```json
{
  "Name": "John",
  "Id": 123
}
```
Features
- Converts snake_case → CamelCase
- Every word starts with a capital letter (PascalCase)
- Leaves existing CamelCase keys unchanged
- With deep: false, converts only top-level keys
- With deep: true, recursively converts all nested objects and arrays
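A matching Go sketch for CamelCase follows, using only the standard library. toPascal is hypothetical. It capitalizes the first letter of every underscore-separated word and leaves the rest of each word as-is, so keys that are already CamelCase stay unchanged. convertKeys works the same way as in the SnakeCase sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
	"unicode"
)

// toPascal (hypothetical) turns "first_name" into "FirstName" and "id" into "Id".
func toPascal(s string) string {
	var b strings.Builder
	for _, part := range strings.Split(s, "_") {
		if part == "" {
			continue // skip empty words from leading or doubled underscores
		}
		r := []rune(part)
		r[0] = unicode.ToUpper(r[0])
		b.WriteString(string(r))
	}
	return b.String()
}

// convertKeys rewrites object keys with conv, descending into nested values when deep.
func convertKeys(v any, conv func(string) string, deep bool) any {
	switch x := v.(type) {
	case map[string]any:
		out := make(map[string]any, len(x))
		for k, val := range x {
			if deep {
				val = convertKeys(val, conv, deep)
			}
			out[conv(k)] = val
		}
		return out
	case []any:
		if !deep {
			return x
		}
		out := make([]any, len(x))
		for i, val := range x {
			out[i] = convertKeys(val, conv, deep)
		}
		return out
	default:
		return v
	}
}

// camelCase (hypothetical) applies toPascal to a message; non-JSON input is returned as-is.
func camelCase(msg []byte, deep bool) ([]byte, error) {
	var v any
	if err := json.Unmarshal(msg, &v); err != nil {
		return msg, nil
	}
	return json.Marshal(convertKeys(v, toPascal, deep))
}

func main() {
	out, err := camelCase([]byte(`{"first_name":"John","address":{"zip_code":"12345"}}`), true)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"Address":{"ZipCode":"12345"},"FirstName":"John"}
}
```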
Planned transformations
The following are not yet available in the API (CRD):
- ReplaceField: rename fields (e.g. old.path → new.path)
- HeaderFrom: copy Kafka message headers into the message body

See the Connectors and Examples pages for current capabilities.
Limitations
- Filter: only field existence and truthiness are checked. Comparisons (e.g. ==) are not supported; use Router for value-based routing.
- Router: conditions are checked in order, and the first match determines the route. Supported formats: $.field (truthiness) and $.field == 'value'.
- Flatten: works only with arrays (including the Avro-style wrapper with an array key), not arbitrary objects.
- Select: the result is always flat; each key is the last JSONPath segment.
- SnakeCase and CamelCase: work only with valid JSON; binary data is returned unchanged.