Transformations

DataFlow Operator supports message transformations that are applied sequentially to each message in the order specified in the configuration. Transformations use gjson JSONPath for field access.

Transformation Overview

Transformation | Description                                | Input     | Output
Timestamp      | Adds a timestamp field                     | 1 message | 1 message
Flatten        | Expands an array into separate messages    | 1 message | N messages
Filter         | Keeps messages where a field is truthy     | 1 message | 0 or 1 message
Mask           | Masks sensitive fields                     | 1 message | 1 message
Router         | Sends matching messages to alternate sinks | 1 message | 0 or 1 message
Select         | Keeps only specified fields                | 1 message | 1 message
Remove         | Removes specified fields                   | 1 message | 1 message
SnakeCase      | Converts keys to snake_case                | 1 message | 1 message
CamelCase      | Converts keys to CamelCase                 | 1 message | 1 message

Timestamp

Adds a timestamp field to each message. Useful for tracking message processing time.

Configuration

transformations:
  - type: timestamp
    config:
      # Field name for timestamp (optional, default: created_at)
      fieldName: created_at
      # Timestamp format (optional, default: RFC3339)
      format: RFC3339

Format

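The format value is a Go time layout string. The default is RFC3339 (layout 2006-01-02T15:04:05Z07:00). Accepted values include the named layouts RFC3339 and RFC3339Nano, custom layouts such as 2006-01-02 15:04:05, and Unix, which writes the epoch seconds as a string (see the Unix timestamp example).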

Examples

Basic usage

transformations:
  - type: timestamp
    config:
      fieldName: processed_at

Input message:

{
  "id": 1,
  "name": "Test"
}

Output message:

{
  "id": 1,
  "name": "Test",
  "processed_at": "2024-01-15T10:30:00Z"
}

Custom format

transformations:
  - type: timestamp
    config:
      fieldName: timestamp
      format: "2006-01-02 15:04:05"

Output message:

{
  "id": 1,
  "timestamp": "2024-01-15 10:30:00"
}

Unix timestamp

transformations:
  - type: timestamp
    config:
      fieldName: unix_time
      format: Unix

Output message:

{
  "id": 1,
  "unix_time": "1705312200"
}
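
The three format variants above can be approximated outside the operator. The following is a minimal Python sketch, not the operator's implementation: the names add_timestamp and format_timestamp are hypothetical, the Go-layout translation covers only the six tokens used on this page, and the RFC3339 branch assumes a UTC time.

```python
from datetime import datetime, timezone

# Partial, hypothetical mapping from Go layout tokens to strftime codes.
_GO_TO_STRFTIME = [("2006", "%Y"), ("01", "%m"), ("02", "%d"),
                   ("15", "%H"), ("04", "%M"), ("05", "%S")]


def format_timestamp(now, fmt="RFC3339"):
    """Render `now` (an aware UTC datetime) for a documented format value."""
    if fmt == "Unix":
        return str(int(now.timestamp()))          # epoch seconds as a string
    if fmt == "RFC3339":
        return now.strftime("%Y-%m-%dT%H:%M:%SZ")  # assumes UTC, hence the Z
    pattern = fmt
    for go_token, code in _GO_TO_STRFTIME:
        pattern = pattern.replace(go_token, code)
    return now.strftime(pattern)


def add_timestamp(message, field_name="created_at", fmt="RFC3339", now=None):
    """Return a copy of `message` with the timestamp field added."""
    now = now or datetime.now(timezone.utc)
    return {**message, field_name: format_timestamp(now, fmt)}


fixed = datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc)
print(add_timestamp({"id": 1, "name": "Test"}, "processed_at", now=fixed))
print(add_timestamp({"id": 1}, "timestamp", "2006-01-02 15:04:05", now=fixed))
```

Passing a fixed time through the now parameter keeps the sketch deterministic; the operator itself presumably uses the processing time.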

Flatten

Expands an array into separate messages, preserving all other fields from the original message. Each array element is merged into the root; objects are flattened to top-level keys. If the field is not an array, the message is returned unchanged. Supports Avro-style arrays wrapped in an object with an array key.

Configuration

transformations:
  - type: flatten
    config:
      # JSONPath to the array to expand (required)
      field: items

Examples

Simple flatten

transformations:
  - type: flatten
    config:
      field: items

Input message:

{
  "order_id": 12345,
  "customer": "John Doe",
  "items": [
    {"product": "Apple", "quantity": 5},
    {"product": "Banana", "quantity": 3}
  ]
}

Output messages:

{
  "order_id": 12345,
  "customer": "John Doe",
  "product": "Apple",
  "quantity": 5
}

{
  "order_id": 12345,
  "customer": "John Doe",
  "product": "Banana",
  "quantity": 3
}

Nested arrays

transformations:
  - type: flatten
    config:
      field: orders.items

Input message:

{
  "customer_id": 100,
  "orders": {
    "items": [
      {"sku": "SKU001", "price": 10.99},
      {"sku": "SKU002", "price": 5.99}
    ]
  }
}

Output messages:

{
  "customer_id": 100,
  "orders": {},
  "sku": "SKU001",
  "price": 10.99
}

{
  "customer_id": 100,
  "orders": {},
  "sku": "SKU002",
  "price": 5.99
}

Combined with other transformations

transformations:
  - type: flatten
    config:
      field: rowsStock
  - type: timestamp
    config:
      fieldName: created_at

This creates a separate message for each array element, each with an added timestamp.
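
The expansion rule described in this section can be sketched in Python as follows. This is an illustration under stated assumptions, not the operator's code: the flatten function is hypothetical, only plain dotted paths are handled, the Avro-style wrapper is not covered, and the handling of scalar array elements is a guess because the page does not specify it.

```python
import copy


def flatten(message, field):
    """Expand the array at dotted path `field` into one message per element."""
    *parents, last = field.split(".")
    node = message
    for key in parents:
        node = node.get(key) if isinstance(node, dict) else None
    items = node.get(last) if isinstance(node, dict) else None
    if not isinstance(items, list):
        return [message]  # not an array: the message is returned unchanged

    results = []
    for item in items:
        clone = copy.deepcopy(message)
        holder = clone
        for key in parents:
            holder = holder[key]
        del holder[last]  # drop the array itself from the copy
        if isinstance(item, dict):
            clone.update(item)  # object elements become top-level keys
        else:
            clone[last] = item  # assumption: scalars keep the array's key
        results.append(clone)
    return results


order = {
    "customer_id": 100,
    "orders": {"items": [{"sku": "SKU001", "price": 10.99},
                         {"sku": "SKU002", "price": 5.99}]},
}
for out in flatten(order, "orders.items"):
    print(out)
```

Note that the emptied parent object ("orders": {}) stays in each output message, matching the nested example above.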

Filter

Keeps only messages where the field at the given JSONPath exists and is truthy (boolean true, non-empty string, non-zero number). Other messages are dropped. Comparison expressions (e.g. ==) are not supported; use Router for value-based routing.

Configuration

transformations:
  - type: filter
    config:
      # JSONPath to a field; message passes if field exists and is truthy (required)
      condition: "$.active"

JSONPath

Uses the gjson library: $.field, $.nested.field, $.array[0], etc.

Examples

Simple filtering

transformations:
  - type: filter
    config:
      condition: "$.active"

Input messages:

{"id": 1, "active": true}   // ✅ Passes
{"id": 2, "active": false}  // ❌ Filtered out
{"id": 3}                   // ❌ Filtered out (no field)

Filtering on a non-empty string

transformations:
  - type: filter
    config:
      condition: "$.level"

Input messages:

{"level": "error"}    // ✅ Passes (non-empty string)
{"level": "warning"}  // ✅ Passes
{"level": ""}         // ❌ Filtered out
{"level": null}       // ❌ Filtered out

Filtering by numeric value

transformations:
  - type: filter
    config:
      condition: "$.amount"

Input messages:

{"amount": 100}  // ✅ Passes (non-zero)
{"amount": 0}    // ❌ Filtered out
{"amount": -5}   // ✅ Passes (non-zero)

Complex filtering

transformations:
  - type: filter
    config:
      condition: "$.user.status"

Input message:

{
  "user": {
    "status": "active"
  }
}

Result: Message passes if user.status exists and is non-empty.
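
The truthiness rule used by Filter can be sketched in Python. This is a hedged illustration: lookup, is_truthy and passes are hypothetical names, only simple $.a.b paths are resolved, and treating objects and arrays as truthy is an assumption that the page does not confirm.

```python
def lookup(message, path):
    """Resolve a simple `$.a.b` path. Returns (found, value)."""
    dotted = path[2:] if path.startswith("$.") else path
    node = message
    for key in dotted.split("."):
        if not isinstance(node, dict) or key not in node:
            return False, None
        node = node[key]
    return True, node


def is_truthy(value):
    """Apply the documented rule: true, non-empty string, non-zero number."""
    if value is None:
        return False
    if isinstance(value, bool):
        return value
    if isinstance(value, (int, float)):
        return value != 0
    if isinstance(value, str):
        return value != ""
    return True  # assumption: objects and arrays count as truthy


def passes(message, condition):
    """Return True if the message would be kept by the filter."""
    found, value = lookup(message, condition)
    return found and is_truthy(value)


for msg in ({"id": 1, "active": True}, {"id": 2, "active": False}, {"id": 3}):
    print(msg, passes(msg, "$.active"))
```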

Mask

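Masks sensitive data in specified fields. With keepLength: true, every character is replaced, so the masked value has the same length as the original. With keepLength: false, the whole value is replaced by a short fixed-length mask (three mask characters in the example below).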

Configuration

transformations:
  - type: mask
    config:
      # List of JSONPath expressions to fields to mask (required)
      fields:
        - password
        - email
      # Character for masking (optional, default: *)
      maskChar: "*"
      # Preserve original length (optional, default: false)
      keepLength: true

Examples

Masking with length preservation

transformations:
  - type: mask
    config:
      fields:
        - password
        - email
      keepLength: true

Input message:

{
  "id": 1,
  "username": "john",
  "password": "secret123",
  "email": "john@example.com"
}

Output message:

{
  "id": 1,
  "username": "john",
  "password": "*********",
  "email": "****************"
}

Masking with fixed length

transformations:
  - type: mask
    config:
      fields:
        - password
      keepLength: false
      maskChar: "X"

Input message:

{
  "password": "verylongpassword123"
}

Output message:

{
  "password": "XXX"
}

Masking nested fields

transformations:
  - type: mask
    config:
      fields:
        - user.password
        - payment.cardNumber
      keepLength: true

Input message:

{
  "user": {
    "password": "secret"
  },
  "payment": {
    "cardNumber": "1234567890123456"
  }
}

Output message:

{
  "user": {
    "password": "******"
  },
  "payment": {
    "cardNumber": "****************"
  }
}
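
The two masking modes can be sketched in Python. This is an illustration only: mask and mask_value are hypothetical names, the fixed mask length of 3 is inferred from the "XXX" example rather than from a documented constant, and skipping non-string values is an assumption.

```python
import copy

FIXED_MASK_LENGTH = 3  # inferred from the "XXX" example; not documented


def mask_value(value, mask_char="*", keep_length=False):
    """Return the mask for a single string value."""
    length = len(value) if keep_length else FIXED_MASK_LENGTH
    return mask_char * length


def mask(message, fields, mask_char="*", keep_length=False):
    """Return a copy of `message` with the dotted-path fields masked."""
    result = copy.deepcopy(message)
    for path in fields:
        *parents, last = path.split(".")
        node = result
        for key in parents:
            node = node.get(key) if isinstance(node, dict) else None
        # Assumption: only existing string values are masked.
        if isinstance(node, dict) and isinstance(node.get(last), str):
            node[last] = mask_value(node[last], mask_char, keep_length)
    return result


print(mask({"user": {"password": "secret"}}, ["user.password"], keep_length=True))
print(mask({"password": "verylongpassword123"}, ["password"], mask_char="X"))
```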

Router

Routes messages to different sinks based on conditions. The first matching route determines the sink; if none matches, the message goes to the main sink.

Condition syntax

  • Truthiness: $.field — the message matches if the field exists and is truthy (non-empty string, non-zero number, true).
  • Comparison: $.field == 'value' or $.field == "value" — matches when the field equals the given string.

Conditions are evaluated in order; the first match wins.
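
The two condition forms and the first-match rule can be sketched in Python. This is a minimal illustration under assumptions: the regular expressions, the function names, and the use of plain strings as sink names are all hypothetical, and the real parser may accept more or less than this.

```python
import re

# Hypothetical patterns for the two documented condition forms.
_COMPARISON = re.compile(r"""^\$\.([\w.]+)\s*==\s*(['"])(.*)\2$""")
_TRUTHY = re.compile(r"^\$\.([\w.]+)$")


def get_path(message, dotted):
    """Resolve a dotted path; returns None when any segment is missing."""
    node = message
    for key in dotted.split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


def matches(message, condition):
    """Evaluate a comparison or truthiness condition against a message."""
    comparison = _COMPARISON.match(condition)
    if comparison:
        return get_path(message, comparison.group(1)) == comparison.group(3)
    truthy = _TRUTHY.match(condition)
    if truthy:
        return get_path(message, truthy.group(1)) not in (None, False, 0, "")
    raise ValueError(f"unsupported condition: {condition!r}")


def route(message, routes, main_sink="main"):
    """Return the sink of the first matching route, else the main sink."""
    for condition, sink in routes:
        if matches(message, condition):
            return sink
    return main_sink


routes = [("$.level == 'error'", "error-topic"),
          ("$.level == 'warning'", "warnings")]
for level in ("error", "warning", "info"):
    print(level, "->", route({"level": level}, routes))
```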

Configuration

transformations:
  - type: router
    config:
      routes:
        - condition: "$.level == 'error'"
          sink:
            type: kafka
            config:
              brokers: ["localhost:9092"]
              topic: error-topic
        - condition: "$.level == 'warning'"
          sink:
            type: postgresql
            config:
              connectionString: "postgres://..."
              table: warnings

Features

  • Conditions are checked in order; the first match determines the sink
  • If no condition matches, the message goes to the main sink

Examples

Routing by log level

transformations:
  - type: router
    config:
      routes:
        - condition: "$.level == 'error'"
          sink:
            type: kafka
            config:
              brokers: ["localhost:9092"]
              topic: error-logs

Input messages:

{"level": "error", "message": "Critical error"}     // → error-logs topic
{"level": "info", "message": "Info message"}       // → main sink
{"level": "warning", "message": "Warning"}         // → main sink

Multiple routes

transformations:
  - type: router
    config:
      routes:
        - condition: "$.type"
          sink:
            type: kafka
            config:
              brokers: ["localhost:9092"]
              topic: events-topic
        - condition: "$.priority"
          sink:
            type: postgresql
            config:
              connectionString: "postgres://..."
              table: high_priority_events

Input messages:

{"type": "event", "data": "..."}           // → events-topic
{"priority": "high", "data": "..."}        // → high_priority_events table
{"data": "..."}                            // → main sink

Combined with other transformations

transformations:
  - type: timestamp
    config:
      fieldName: processed_at
  - type: router
    config:
      routes:
        - condition: "$.level == 'error'"
          sink:
            type: kafka
            config:
              brokers: ["localhost:9092"]
              topic: errors

Timestamp is added first, then the message is routed.

Select

Keeps only the specified fields; all others are dropped. Each field is taken by JSONPath; the last path segment is used as the key in the output (e.g. user.name → key name), so the result is flat.

Configuration

transformations:
  - type: select
    config:
      # List of JSONPath expressions for fields to keep (required)
      fields:
        - id
        - name
        - email

Examples

Simple field selection

transformations:
  - type: select
    config:
      fields:
        - id
        - name
        - email

Input message:

{
  "id": 1,
  "name": "John Doe",
  "email": "john@example.com",
  "password": "secret",
  "internal_id": 999
}

Output message:

{
  "id": 1,
  "name": "John Doe",
  "email": "john@example.com"
}

Selecting nested fields (result is flat)

transformations:
  - type: select
    config:
      fields:
        - user.id
        - user.name
        - metadata.timestamp

Input message:

{
  "user": {
    "id": 1,
    "name": "John",
    "email": "john@example.com"
  },
  "metadata": {
    "timestamp": "2024-01-15T10:30:00Z",
    "source": "api"
  }
}

Output message (keys are last path segments):

{
  "id": 1,
  "name": "John",
  "timestamp": "2024-01-15T10:30:00Z"
}
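
The flattening behaviour of Select can be sketched in Python. This is an illustration, not the operator's code: the select function is hypothetical, and two details are assumptions that the page does not state, namely that missing fields are skipped and that a later field overwrites an earlier one when both share the same last path segment.

```python
def select(message, fields):
    """Keep only the listed dotted-path fields, keyed by last path segment."""
    result = {}
    for path in fields:
        node = message
        for key in path.split("."):
            if not isinstance(node, dict) or key not in node:
                break  # assumption: missing fields are silently skipped
            node = node[key]
        else:
            # Runs only when the whole path resolved without a break.
            result[path.split(".")[-1]] = node
    return result


message = {
    "user": {"id": 1, "name": "John", "email": "john@example.com"},
    "metadata": {"timestamp": "2024-01-15T10:30:00Z", "source": "api"},
}
print(select(message, ["user.id", "user.name", "metadata.timestamp"]))
```

Because keys come from the last segment only, selecting both user.id and order.id would leave a single id key in the output, so paths with colliding names are worth avoiding.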

Remove

Removes specified fields from a message. Useful for data cleanup before sending.

Configuration

transformations:
  - type: remove
    config:
      # List of JSONPath expressions to fields to remove (required)
      fields:
        - password
        - internal_id

Examples

Removing sensitive fields

transformations:
  - type: remove
    config:
      fields:
        - password
        - creditCard
        - ssn

Input message:

{
  "id": 1,
  "name": "John Doe",
  "password": "secret",
  "creditCard": "1234-5678-9012-3456",
  "ssn": "123-45-6789"
}

Output message:

{
  "id": 1,
  "name": "John Doe"
}

Removing nested fields

transformations:
  - type: remove
    config:
      fields:
        - user.password
        - metadata.internal

Input message:

{
  "user": {
    "id": 1,
    "name": "John",
    "password": "secret"
  },
  "metadata": {
    "timestamp": "2024-01-15",
    "internal": "secret"
  }
}

Output message:

{
  "user": {
    "id": 1,
    "name": "John"
  },
  "metadata": {
    "timestamp": "2024-01-15"
  }
}
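
Removal by path can be sketched in Python. The remove function below is a hypothetical illustration that handles only plain dotted paths and assumes that fields which do not exist are ignored.

```python
import copy


def remove(message, fields):
    """Return a copy of `message` without the listed dotted-path fields."""
    result = copy.deepcopy(message)
    for path in fields:
        *parents, last = path.split(".")
        node = result
        for key in parents:
            node = node.get(key) if isinstance(node, dict) else None
        if isinstance(node, dict):
            node.pop(last, None)  # assumption: absent fields are ignored
    return result


message = {
    "user": {"id": 1, "name": "John", "password": "secret"},
    "metadata": {"timestamp": "2024-01-15", "internal": "secret"},
}
print(remove(message, ["user.password", "metadata.internal"]))
```

Unlike Select, the surrounding structure is preserved: only the named leaves disappear.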

Order of Application

Transformations are applied sequentially in the order specified in the transformations list. Each transformation receives the result of the previous one.
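
Sequential application can be pictured as a chain in which every step maps one message to a list of zero or more messages. The Python sketch below illustrates that model; run_pipeline and the three stand-in steps are hypothetical and much simpler than the real transformations.

```python
def run_pipeline(message, steps):
    """Apply each step to every message produced by the previous step."""
    messages = [message]
    for step in steps:
        messages = [out for msg in messages for out in step(msg)]
    return messages


# Hypothetical stand-ins for flatten, timestamp and filter.
def expand_items(msg):
    rest = {k: v for k, v in msg.items() if k != "items"}
    return [{**rest, **item} for item in msg.get("items", [])]


def stamp(msg):
    return [{**msg, "created_at": "2024-01-15T10:30:00Z"}]


def keep_active(msg):
    return [msg] if msg.get("active") else []


order = {"order_id": 1, "items": [{"sku": "A", "active": True},
                                  {"sku": "B", "active": False}]}
print(run_pipeline(order, [expand_items, stamp, keep_active]))
```

In this run the first step yields two messages, the second stamps both, and the third drops the inactive one, so a single message reaches the sink. Moving keep_active before stamp would give the same result while stamping fewer messages.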

Example sequence

transformations:
  # 1. Expand array
  - type: flatten
    config:
      field: items

  # 2. Add timestamp
  - type: timestamp
    config:
      fieldName: created_at

  # 3. Keep only active messages
  - type: filter
    config:
      condition: "$.active"

  # 4. Remove internal fields
  - type: remove
    config:
      fields:
        - internal_id
        - debug_info

  # 5. Select only needed fields
  - type: select
    config:
      fields:
        - id
        - name
        - created_at

Recommended ordering:

  1. Flatten: apply first if you need to expand arrays.
  2. Filter: apply early to reduce the volume of processed data.
  3. SnakeCase/CamelCase: apply after Select/Remove, but before sending to the sink.
  4. Mask/Remove: apply before Select so that sensitive fields are handled first.
  5. Select: apply at the end for final cleanup.
  6. Timestamp: can be applied anywhere, but usually at the beginning or end.
  7. Router: usually applied last, after all other transformations.

Combined examples

Order processing

transformations:
  # Expand items into separate messages
  - type: flatten
    config:
      field: items

  # Add timestamp
  - type: timestamp
    config:
      fieldName: processed_at

  # Keep only messages with a truthy status (Filter cannot test for a specific value such as "paid")
  - type: filter
    config:
      condition: "$.status"

  # Remove sensitive data
  - type: remove
    config:
      fields:
        - customer.creditCard
        - customer.cvv

Log processing

transformations:
  # Add timestamp
  - type: timestamp
    config:
      fieldName: timestamp

  # Mask IP addresses
  - type: mask
    config:
      fields:
        - ip_address
      keepLength: true

  # Route errors
  - type: router
    config:
      routes:
        - condition: "$.level == 'error'"
          sink:
            type: kafka
            config:
              brokers: ["localhost:9092"]
              topic: error-logs

Field name normalization

transformations:
  # Select needed fields
  - type: select
    config:
      fields:
        - firstName
        - lastName
        - email
        - address

  # Convert to snake_case for PostgreSQL
  - type: snakeCase
    config:
      deep: true

Input message:

{
  "firstName": "John",
  "lastName": "Doe",
  "email": "john@example.com",
  "address": {
    "streetName": "Main St",
    "zipCode": "12345"
  }
}

Output message:

{
  "first_name": "John",
  "last_name": "Doe",
  "email": "john@example.com",
  "address": {
    "street_name": "Main St",
    "zip_code": "12345"
  }
}

JSONPath support

All transformations that work with fields support JSONPath syntax:

  • $.field — root field
  • $.nested.field — nested field
  • $.array[0] — array element by index
  • $.array[*] — all array elements
  • $.* — all root-level fields
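
To make the five path forms concrete, here is a small Python resolver that returns every value a path matches. It is a sketch only: the resolve function and its tokenizer are hypothetical, they cover just the forms listed above, and they are not a substitute for the gjson library the operator uses.

```python
import re

# One path step: ".name", ".*", "[0]" or "[*]".
_TOKEN = re.compile(r"\.(\*|[A-Za-z_][\w-]*)|\[(\d+|\*)\]")


def resolve(message, path):
    """Return the list of values matched by a simple `$...` path."""
    if not path.startswith("$"):
        raise ValueError("path must start with $")
    rest, pos, nodes = path[1:], 0, [message]
    while pos < len(rest):
        token = _TOKEN.match(rest, pos)
        if token is None:
            raise ValueError(f"unsupported path: {path!r}")
        name, index = token.group(1), token.group(2)
        matched = []
        for node in nodes:
            if name == "*":
                if isinstance(node, dict):
                    matched.extend(node.values())
            elif name is not None:
                if isinstance(node, dict) and name in node:
                    matched.append(node[name])
            elif index == "*":
                if isinstance(node, list):
                    matched.extend(node)
            elif isinstance(node, list) and int(index) < len(node):
                matched.append(node[int(index)])
        nodes, pos = matched, token.end()
    return nodes


doc = {"field": 1, "nested": {"field": 2}, "array": [10, 20]}
for p in ("$.field", "$.nested.field", "$.array[0]", "$.array[*]", "$.*"):
    print(p, resolve(doc, p))
```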

Performance

  • Filter — apply early to reduce data volume
  • Select — reduces message size and improves performance
  • Flatten — can increase message count; use with care
  • Router — creates additional connections; minimize the number of routes

SnakeCase

Converts all JSON object keys to snake_case format. Useful for normalizing field names when integrating with systems using snake_case (e.g., PostgreSQL, Python API).

Configuration

transformations:
  - type: snakeCase
    config:
      # Recursively convert nested objects (optional, default: false)
      deep: true

Examples

Simple conversion

transformations:
  - type: snakeCase
    config:
      deep: false

Input message:

{
  "firstName": "John",
  "lastName": "Doe",
  "userName": "johndoe",
  "isActive": true,
  "itemCount": 42
}

Output message:

{
  "first_name": "John",
  "last_name": "Doe",
  "user_name": "johndoe",
  "is_active": true,
  "item_count": 42
}

Recursive conversion

transformations:
  - type: snakeCase
    config:
      deep: true

Input message:

{
  "firstName": "John",
  "address": {
    "streetName": "Main St",
    "houseNumber": 123,
    "zipCode": "12345"
  },
  "items": [
    {
      "itemName": "Product",
      "itemPrice": 99.99
    }
  ]
}

Output message:

{
  "first_name": "John",
  "address": {
    "street_name": "Main St",
    "house_number": 123,
    "zip_code": "12345"
  },
  "items": [
    {
      "item_name": "Product",
      "item_price": 99.99
    }
  ]
}

PascalCase conversion

transformations:
  - type: snakeCase
    config:
      deep: false

Input message:

{
  "FirstName": "John",
  "LastName": "Doe",
  "UserID": 123
}

Output message:

{
  "first_name": "John",
  "last_name": "Doe",
  "user_id": 123
}

Features

  • Converts camelCase → snake_case
  • Converts PascalCase → snake_case
  • Handles consecutive capitals (e.g. XMLHttpRequest → xml_http_request)
  • Leaves existing snake_case keys unchanged
  • With deep: false converts only top-level keys
  • With deep: true recursively converts all nested objects and arrays
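
The conversion rules listed above can be reproduced with two regular-expression passes. The Python sketch below is one possible approach, not necessarily the one the operator uses; to_snake and convert_keys are hypothetical names.

```python
import re


def to_snake(key):
    """Convert a camelCase or PascalCase key to snake_case."""
    # Split an acronym from a following capitalized word: XMLHttp -> XML_Http
    key = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", key)
    # Split a lowercase letter or digit from a following capital: pR -> p_R
    key = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", key)
    return key.lower()


def convert_keys(value, convert, deep=False):
    """Rename dict keys with `convert`; descend into values only if deep."""
    if isinstance(value, dict):
        return {convert(k): (convert_keys(v, convert, deep) if deep else v)
                for k, v in value.items()}
    if isinstance(value, list):
        return [convert_keys(v, convert, deep) for v in value]
    return value


message = {"firstName": "John", "address": {"zipCode": "12345"},
           "items": [{"itemName": "Product"}]}
print(convert_keys(message, to_snake, deep=False))
print(convert_keys(message, to_snake, deep=True))
```

With deep=False the nested zipCode and itemName keys are left as they are, which mirrors the documented default.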

CamelCase

Converts all JSON object keys to CamelCase (PascalCase) format. Useful for normalizing field names when integrating with systems using CamelCase (e.g., Java, C# API).

Configuration

transformations:
  - type: camelCase
    config:
      # Recursively convert nested objects (optional, default: false)
      deep: true

Examples

Simple conversion

transformations:
  - type: camelCase
    config:
      deep: false

Input message:

{
  "first_name": "John",
  "last_name": "Doe",
  "user_name": "johndoe",
  "is_active": true,
  "item_count": 42
}

Output message:

{
  "FirstName": "John",
  "LastName": "Doe",
  "UserName": "johndoe",
  "IsActive": true,
  "ItemCount": 42
}

Recursive conversion

transformations:
  - type: camelCase
    config:
      deep: true

Input message:

{
  "first_name": "John",
  "address": {
    "street_name": "Main St",
    "house_number": 123,
    "zip_code": "12345"
  },
  "items": [
    {
      "item_name": "Product",
      "item_price": 99.99
    }
  ]
}

Output message:

{
  "FirstName": "John",
  "Address": {
    "StreetName": "Main St",
    "HouseNumber": 123,
    "ZipCode": "12345"
  },
  "Items": [
    {
      "ItemName": "Product",
      "ItemPrice": 99.99
    }
  ]
}

Single word conversion

transformations:
  - type: camelCase
    config:
      deep: false

Input message:

{
  "name": "John",
  "id": 123
}

Output message:

{
  "Name": "John",
  "Id": 123
}

Features

  • Converts snake_case → CamelCase
  • All words start with capital letter (PascalCase)
  • Leaves existing CamelCase keys unchanged
  • With deep: false converts only top-level keys
  • With deep: true recursively converts all nested objects and arrays
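
The reverse conversion can be sketched the same way. In this Python illustration, to_pascal and convert_keys are hypothetical names, and returning a key made only of underscores unchanged is an assumption about an edge case the page does not cover.

```python
def to_pascal(key):
    """Convert a snake_case key to CamelCase with a leading capital."""
    converted = "".join(part[:1].upper() + part[1:]
                        for part in key.split("_") if part)
    return converted or key  # assumption: keys of only underscores are kept


def convert_keys(value, convert, deep=False):
    """Rename dict keys with `convert`; descend into values only if deep."""
    if isinstance(value, dict):
        return {convert(k): (convert_keys(v, convert, deep) if deep else v)
                for k, v in value.items()}
    if isinstance(value, list):
        return [convert_keys(v, convert, deep) for v in value]
    return value


message = {"first_name": "John", "address": {"zip_code": "12345"},
           "items": [{"item_name": "Product"}]}
print(convert_keys(message, to_pascal, deep=False))
print(convert_keys(message, to_pascal, deep=True))
```

Only the first letter of each word is changed, so a key that is already in CamelCase passes through untouched.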

Planned transformations

The following are not yet available in the API (CRD):

  • ReplaceField — rename fields (e.g. old.path → new.path)
  • HeaderFrom — copy Kafka message headers into the message body

See the Connectors and Examples pages for current capabilities.

Limitations

  • Filter: only field existence and truthiness are checked; comparisons (e.g. ==) are not supported — use Router for value-based routing.
  • Router: conditions are checked in order; the first match determines the route. Supported formats: $.field (truthiness) and $.field == 'value'.
  • Flatten: works only with arrays (including Avro-style wrapper with array key), not arbitrary objects.
  • Select: result is always flat; the key is the last JSONPath segment.
  • SnakeCase and CamelCase: work only with valid JSON; binary data is returned unchanged.