Skip to content

Transformations

DataFlow Operator поддерживает трансформации сообщений, которые применяются последовательно к каждому сообщению в порядке, указанном в конфигурации. Для доступа к полям используется gjson JSONPath.

Обзор трансформаций

Трансформация Описание Вход Выход
Timestamp Добавляет поле с временной меткой 1 сообщение 1 сообщение
Flatten Разворачивает массив в отдельные сообщения 1 сообщение N сообщений
Filter Оставляет сообщения, где поле «истинно» 1 сообщение 0 или 1 сообщение
Mask Маскирует указанные поля 1 сообщение 1 сообщение
Router Отправляет совпадающие сообщения в альтернативные приёмники 1 сообщение 0 или 1 сообщение
Select Оставляет только указанные поля 1 сообщение 1 сообщение
Remove Удаляет указанные поля 1 сообщение 1 сообщение
SnakeCase Преобразует ключи в snake_case 1 сообщение 1 сообщение
CamelCase Преобразует ключи в CamelCase 1 сообщение 1 сообщение

Timestamp

Добавляет поле с временной меткой к каждому сообщению. Полезно для отслеживания времени обработки сообщений.

Конфигурация

transformations:
  - type: timestamp
    config:
      # Имя поля для временной метки (опционально, по умолчанию: created_at)
      fieldName: created_at

      # Формат временной метки (опционально, по умолчанию: RFC3339)
      # Поддерживаются все форматы Go time package
      format: RFC3339

Формат

Поле format задаёт формат времени Go. По умолчанию — RFC3339 (например, 2006-01-02T15:04:05Z07:00). Можно указать RFC3339, RFC3339Nano или свой шаблон, например 2006-01-02 15:04:05.

Примеры

Базовое использование

transformations:
  - type: timestamp
    config:
      fieldName: processed_at

Входное сообщение:

{
  "id": 1,
  "name": "Test"
}

Выходное сообщение:

{
  "id": 1,
  "name": "Test",
  "processed_at": "2024-01-15T10:30:00Z"
}

Кастомный формат

transformations:
  - type: timestamp
    config:
      fieldName: timestamp
      format: "2006-01-02 15:04:05"

Выходное сообщение:

{
  "id": 1,
  "timestamp": "2024-01-15 10:30:00"
}

Unix timestamp

transformations:
  - type: timestamp
    config:
      fieldName: unix_time
      format: Unix

Выходное сообщение:

{
  "id": 1,
  "unix_time": "1705312200"
}

Flatten

Разворачивает массив в отдельные сообщения, сохраняя остальные поля исходного сообщения. Каждый элемент массива подставляется в корень; объекты разворачиваются в ключи верхнего уровня. Если поле не массив — сообщение возвращается без изменений. Поддерживаются массивы в формате Avro (объект с ключом array).

Конфигурация

transformations:
  - type: flatten
    config:
      # JSONPath к массиву для развертывания (обязательно)
      field: items

Примеры

Простое развертывание

transformations:
  - type: flatten
    config:
      field: items

Входное сообщение:

{
  "order_id": 12345,
  "customer": "John Doe",
  "items": [
    {"product": "Apple", "quantity": 5},
    {"product": "Banana", "quantity": 3}
  ]
}

Выходные сообщения:

{
  "order_id": 12345,
  "customer": "John Doe",
  "product": "Apple",
  "quantity": 5
}

{
  "order_id": 12345,
  "customer": "John Doe",
  "product": "Banana",
  "quantity": 3
}

Вложенные массивы

transformations:
  - type: flatten
    config:
      field: orders.items

Входное сообщение:

{
  "customer_id": 100,
  "orders": {
    "items": [
      {"sku": "SKU001", "price": 10.99},
      {"sku": "SKU002", "price": 5.99}
    ]
  }
}

Выходные сообщения:

{
  "customer_id": 100,
  "orders": {},
  "sku": "SKU001",
  "price": 10.99
}

Комбинация с другими трансформациями

transformations:
  - type: flatten
    config:
      field: rowsStock
  - type: timestamp
    config:
      fieldName: created_at

Это создаст отдельное сообщение для каждого элемента массива, каждое с добавленной временной меткой.

Filter

Оставляет только те сообщения, у которых поле по указанному JSONPath существует и является «истинным» (булево true, непустая строка, ненулевое число). Остальные сообщения отбрасываются. Выражения сравнения (например ==) не поддерживаются; для маршрутизации по значению используйте Router.

Конфигурация

transformations:
  - type: filter
    config:
      # JSONPath к полю; сообщение проходит, если поле есть и истинно (обязательно)
      condition: "$.active"

JSONPath

Используется библиотека gjson: $.field, $.nested.field, $.array[0] и т.д.

Примеры

Простая фильтрация

transformations:
  - type: filter
    config:
      condition: "$.active"

Входные сообщения:

{"id": 1, "active": true}   // ✅ Проходит
{"id": 2, "active": false}  // ❌ Отфильтровывается
{"id": 3}                   // ❌ Отфильтровывается (нет поля)

Фильтрация по значению

transformations:
  - type: filter
    config:
      condition: "$.level"

Входные сообщения:

{"level": "error"}    // ✅ Проходит (непустая строка)
{"level": "warning"}  // ✅ Проходит
{"level": ""}        // ❌ Отфильтровывается
{"level": null}      // ❌ Отфильтровывается

Фильтрация по числовому значению

transformations:
  - type: filter
    config:
      condition: "$.amount"

Входные сообщения:

{"amount": 100}  // ✅ Проходит (не ноль)
{"amount": 0}    // ❌ Отфильтровывается
{"amount": -5}   // ✅ Проходит (не ноль)

Комплексная фильтрация

transformations:
  - type: filter
    config:
      condition: "$.user.status"

Входное сообщение:

{
  "user": {
    "status": "active"
  }
}

Результат: Сообщение проходит, если user.status существует и не пусто.

Mask

Маскирует чувствительные данные в указанных полях. Поддерживает сохранение длины или полное замещение символами.

Конфигурация

transformations:
  - type: mask
    config:
      # Список JSONPath выражений к полям для маскирования (обязательно)
      fields:
        - password
        - email
        - creditCard

      # Символ для маскирования (опционально, по умолчанию: *)
      maskChar: "*"

      # Сохранять оригинальную длину (опционально, по умолчанию: false)
      keepLength: true

Примеры

Маскирование с сохранением длины

transformations:
  - type: mask
    config:
      fields:
        - password
        - email
      keepLength: true

Входное сообщение:

{
  "id": 1,
  "username": "john",
  "password": "secret123",
  "email": "john@example.com"
}

Выходное сообщение:

{
  "id": 1,
  "username": "john",
  "password": "*********",
  "email": "****************"
}

Маскирование с фиксированной длиной

transformations:
  - type: mask
    config:
      fields:
        - password
      keepLength: false
      maskChar: "X"

Входное сообщение:

{
  "password": "verylongpassword123"
}

Выходное сообщение:

{
  "password": "XXX"
}

Маскирование вложенных полей

transformations:
  - type: mask
    config:
      fields:
        - user.password
        - payment.cardNumber
      keepLength: true

Входное сообщение:

{
  "user": {
    "password": "secret"
  },
  "payment": {
    "cardNumber": "1234567890123456"
  }
}

Выходное сообщение:

{
  "user": {
    "password": "******"
  },
  "payment": {
    "cardNumber": "****************"
  }
}

Router

Маршрутизирует сообщения в разные приёмники по условиям. Первое совпавшее условие задаёт приёмник; если ни одно не подошло — используется основной sink.

Синтаксис условий

  • Истинность: $.field — совпадение, если поле есть и «истинно» (непустая строка, ненулевое число, true).
  • Сравнение: $.field == 'value' или $.field == "value" — совпадение, когда значение поля равно заданной строке.

Условия проверяются по порядку; первое совпадение определяет маршрут.

Конфигурация

transformations:
  - type: router
    config:
      routes:
        - condition: "$.level == 'error'"
          sink:
            type: kafka
            config:
              brokers: ["localhost:9092"]
              topic: error-topic
        - condition: "$.level == 'warning'"
          sink:
            type: postgresql
            config:
              connectionString: "..."
              table: warnings

Особенности

  • Условия проверяются в порядке указания; первое совпадение определяет приёмник
  • Если ни одно условие не совпало, сообщение идёт в основной sink

Примеры

Маршрутизация по уровню логирования

transformations:
  - type: router
    config:
      routes:
        - condition: "$.level"
          sink:
            type: kafka
            config:
              brokers: ["localhost:9092"]
              topic: error-logs

Входные сообщения:

{"level": "error", "message": "Critical error"}     // → error-logs топик
{"level": "info", "message": "Info message"}       // → основной приемник
{"level": "warning", "message": "Warning"}          // → основной приемник

Множественная маршрутизация

transformations:
  - type: router
    config:
      routes:
        - condition: "$.type"
          sink:
            type: kafka
            config:
              brokers: ["localhost:9092"]
              topic: events-topic
        - condition: "$.priority"
          sink:
            type: postgresql
            config:
              connectionString: "postgres://..."
              table: high_priority_events

Входные сообщения:

{"type": "event", "data": "..."}           // → events-topic
{"priority": "high", "data": "..."}       // → high_priority_events таблица
{"data": "..."}                           // → основной приемник

Комбинация с другими трансформациями

transformations:
  - type: timestamp
    config:
      fieldName: processed_at
  - type: router
    config:
      routes:
        - condition: "$.level"
          sink:
            type: kafka
            config:
              brokers: ["localhost:9092"]
              topic: errors

Сначала добавляется временная метка, затем сообщение маршрутизируется.

Select

Оставляет только указанные поля; остальные удаляются. Для каждого поля используется JSONPath; в выводе ключом становится последний сегмент пути (например, user.name → ключ name), поэтому результат плоский.

Конфигурация

transformations:
  - type: select
    config:
      # Список JSONPath к полям, которые нужно оставить (обязательно)
      fields:
        - id
        - name
        - email

Примеры

Простой выбор полей

transformations:
  - type: select
    config:
      fields:
        - id
        - name
        - email

Входное сообщение:

{
  "id": 1,
  "name": "John Doe",
  "email": "john@example.com",
  "password": "secret",
  "internal_id": 999
}

Выходное сообщение:

{
  "id": 1,
  "name": "John Doe",
  "email": "john@example.com"
}

Выбор вложенных полей (результат — плоский)

transformations:
  - type: select
    config:
      fields:
        - user.id
        - user.name
        - metadata.timestamp

Входное сообщение:

{
  "user": {
    "id": 1,
    "name": "John",
    "email": "john@example.com"
  },
  "metadata": {
    "timestamp": "2024-01-15T10:30:00Z",
    "source": "api"
  }
}

Выходное сообщение (ключи — последние сегменты путей):

{
  "id": 1,
  "name": "John",
  "timestamp": "2024-01-15T10:30:00Z"
}

Remove

Удаляет указанные поля из сообщения. Полезно для очистки данных перед отправкой.

Конфигурация

transformations:
  - type: remove
    config:
      # Список JSONPath выражений к полям для удаления (обязательно)
      fields:
        - password
        - internal_id
        - secret_token

Примеры

Удаление чувствительных полей

transformations:
  - type: remove
    config:
      fields:
        - password
        - creditCard
        - ssn

Входное сообщение:

{
  "id": 1,
  "name": "John Doe",
  "password": "secret",
  "creditCard": "1234-5678-9012-3456",
  "ssn": "123-45-6789"
}

Выходное сообщение:

{
  "id": 1,
  "name": "John Doe"
}

Удаление вложенных полей

transformations:
  - type: remove
    config:
      fields:
        - user.password
        - metadata.internal

Входное сообщение:

{
  "user": {
    "id": 1,
    "name": "John",
    "password": "secret"
  },
  "metadata": {
    "timestamp": "2024-01-15",
    "internal": "secret"
  }
}

Выходное сообщение:

{
  "user": {
    "id": 1,
    "name": "John"
  },
  "metadata": {
    "timestamp": "2024-01-15"
  }
}

Порядок применения

Трансформации применяются последовательно в порядке, указанном в списке transformations. Каждая трансформация получает результат предыдущей.

Пример последовательности

transformations:
  # 1. Развернуть массив
  - type: flatten
    config:
      field: items

  # 2. Добавить временную метку
  - type: timestamp
    config:
      fieldName: created_at

  # 3. Отфильтровать неактивные
  - type: filter
    config:
      condition: "$.active"

  # 4. Удалить внутренние поля
  - type: remove
    config:
      fields:
        - internal_id
        - debug_info

  # 5. Выбрать только нужные поля
  - type: select
    config:
      fields:
        - id
        - name
        - created_at

Рекомендации по порядку

  1. Flatten должен быть первым, если нужно развернуть массивы
  2. Filter применяйте рано, чтобы уменьшить объем обрабатываемых данных
  3. SnakeCase/CamelCase применяйте после Select/Remove, но перед отправкой в приемник
  4. Mask/Remove применяйте перед Select для безопасности
  5. Select применяйте в конце для финальной очистки
  6. Timestamp можно применять в любом месте, но обычно в начале или конце
  7. Router обычно применяется в конце, после всех других трансформаций

Комбинированные примеры

Обработка заказов

transformations:
  # Развернуть товары в отдельные сообщения
  - type: flatten
    config:
      field: items

  # Добавить временную метку
  - type: timestamp
    config:
      fieldName: processed_at

  # Фильтровать только оплаченные заказы
  - type: filter
    config:
      condition: "$.status"

  # Удалить чувствительные данные
  - type: remove
    config:
      fields:
        - customer.creditCard
        - customer.cvv

Обработка логов

transformations:
  # Добавить временную метку
  - type: timestamp
    config:
      fieldName: timestamp

  # Маскировать IP адреса
  - type: mask
    config:
      fields:
        - ip_address
      keepLength: true

  # Маршрутизировать ошибки
  - type: router
    config:
      routes:
        - condition: "$.level"
          sink:
            type: kafka
            config:
              brokers: ["localhost:9092"]
              topic: error-logs

Нормализация имен полей

transformations:
  # Выбрать нужные поля
  - type: select
    config:
      fields:
        - firstName
        - lastName
        - email
        - address

  # Преобразовать в snake_case для PostgreSQL
  - type: snakeCase
    config:
      deep: true

Входное сообщение:

{
  "firstName": "John",
  "lastName": "Doe",
  "email": "john@example.com",
  "address": {
    "streetName": "Main St",
    "zipCode": "12345"
  }
}

Выходное сообщение:

{
  "first_name": "John",
  "last_name": "Doe",
  "email": "john@example.com",
  "address": {
    "street_name": "Main St",
    "zip_code": "12345"
  }
}

JSONPath поддержка

Все трансформации, работающие с полями, поддерживают JSONPath синтаксис:

  • $.field - корневое поле
  • $.nested.field - вложенное поле
  • $.array[0] - элемент массива по индексу
  • $.array[*] - все элементы массива
  • $.* - все поля корневого уровня

Производительность

  • Filter - применяйте рано для уменьшения объема данных
  • Select - уменьшает размер сообщений и повышает производительность
  • Flatten - может увеличить количество сообщений, используйте осторожно
  • Router - создает дополнительные подключения, минимизируйте количество маршрутов

SnakeCase

Преобразует все ключи JSON-объекта в формат snake_case. Полезно для нормализации имен полей при интеграции с системами, использующими snake_case (например, PostgreSQL, Python API).

Конфигурация

transformations:
  - type: snakeCase
    config:
      # Рекурсивно преобразовывать вложенные объекты (опционально, по умолчанию: false)
      deep: true

Примеры

Простое преобразование

transformations:
  - type: snakeCase
    config:
      deep: false

Входное сообщение:

{
  "firstName": "John",
  "lastName": "Doe",
  "userName": "johndoe",
  "isActive": true,
  "itemCount": 42
}

Выходное сообщение:

{
  "first_name": "John",
  "last_name": "Doe",
  "user_name": "johndoe",
  "is_active": true,
  "item_count": 42
}

Рекурсивное преобразование

transformations:
  - type: snakeCase
    config:
      deep: true

Входное сообщение:

{
  "firstName": "John",
  "address": {
    "streetName": "Main St",
    "houseNumber": 123,
    "zipCode": "12345"
  },
  "items": [
    {
      "itemName": "Product",
      "itemPrice": 99.99
    }
  ]
}

Выходное сообщение:

{
  "first_name": "John",
  "address": {
    "street_name": "Main St",
    "house_number": 123,
    "zip_code": "12345"
  },
  "items": [
    {
      "item_name": "Product",
      "item_price": 99.99
    }
  ]
}

Преобразование PascalCase

transformations:
  - type: snakeCase
    config:
      deep: false

Входное сообщение:

{
  "FirstName": "John",
  "LastName": "Doe",
  "UserID": 123
}

Выходное сообщение:

{
  "first_name": "John",
  "last_name": "Doe",
  "user_id": 123
}

Особенности

  • Преобразует camelCasesnake_case
  • Преобразует PascalCasesnake_case
  • Обрабатывает последовательные заглавные буквы (например, XMLHttpRequestxml_http_request)
  • Сохраняет уже существующие ключи в snake_case без изменений
  • При deep: false преобразует только ключи верхнего уровня
  • При deep: true рекурсивно преобразует все вложенные объекты и массивы

CamelCase

Преобразует все ключи JSON-объекта в формат CamelCase (PascalCase). Полезно для нормализации имен полей при интеграции с системами, использующими CamelCase (например, Java, C# API).

Конфигурация

transformations:
  - type: camelCase
    config:
      # Рекурсивно преобразовывать вложенные объекты (опционально, по умолчанию: false)
      deep: true

Примеры

Простое преобразование

transformations:
  - type: camelCase
    config:
      deep: false

Входное сообщение:

{
  "first_name": "John",
  "last_name": "Doe",
  "user_name": "johndoe",
  "is_active": true,
  "item_count": 42
}

Выходное сообщение:

{
  "FirstName": "John",
  "LastName": "Doe",
  "UserName": "johndoe",
  "IsActive": true,
  "ItemCount": 42
}

Рекурсивное преобразование

transformations:
  - type: camelCase
    config:
      deep: true

Входное сообщение:

{
  "first_name": "John",
  "address": {
    "street_name": "Main St",
    "house_number": 123,
    "zip_code": "12345"
  },
  "items": [
    {
      "item_name": "Product",
      "item_price": 99.99
    }
  ]
}

Выходное сообщение:

{
  "FirstName": "John",
  "Address": {
    "StreetName": "Main St",
    "HouseNumber": 123,
    "ZipCode": "12345"
  },
  "Items": [
    {
      "ItemName": "Product",
      "ItemPrice": 99.99
    }
  ]
}

Преобразование одиночных слов

transformations:
  - type: camelCase
    config:
      deep: false

Входное сообщение:

{
  "name": "John",
  "id": 123
}

Выходное сообщение:

{
  "Name": "John",
  "Id": 123
}

Особенности

  • Преобразует snake_caseCamelCase
  • Все слова начинаются с заглавной буквы (PascalCase)
  • Сохраняет уже существующие ключи в CamelCase без изменений
  • При deep: false преобразует только ключи верхнего уровня
  • При deep: true рекурсивно преобразует все вложенные объекты и массивы

Планируемые трансформации

В API (CRD) пока недоступны:

  • ReplaceField — переименование полей (например, старый.путьновый.путь)
  • HeaderFrom — копирование заголовков Kafka в тело сообщения

Актуальные возможности см. в Коннекторы и Примеры.

Ограничения

  • Filter: проверяется только существование поля и его «истинность»; сравнения (например ==) не поддерживаются — для маршрутизации по значению используйте Router.
  • Router: условия проверяются по порядку; первое совпадение задаёт маршрут. Поддерживаются форматы $.field (истинность) и $.field == 'value'.
  • Flatten: работает только с массивами (в т.ч. в обёртке Avro с ключом array), не с произвольными объектами.
  • Select: результат всегда плоский; ключом становится последний сегмент JSONPath.
  • SnakeCase и CamelCase: работают только с валидным JSON; бинарные данные возвращаются без изменений.