Transformations
DataFlow Operator поддерживает трансформации сообщений, которые применяются последовательно к каждому сообщению в порядке, указанном в конфигурации. Для доступа к полям используется gjson JSONPath.
Обзор трансформаций
| Трансформация | Описание | Вход | Выход |
|---|---|---|---|
| Timestamp | Добавляет поле с временной меткой | 1 сообщение | 1 сообщение |
| Flatten | Разворачивает массив в отдельные сообщения | 1 сообщение | N сообщений |
| Filter | Оставляет сообщения, где поле «истинно» | 1 сообщение | 0 или 1 сообщение |
| Mask | Маскирует указанные поля | 1 сообщение | 1 сообщение |
| Router | Отправляет совпадающие сообщения в альтернативные приёмники | 1 сообщение | 0 или 1 сообщение |
| Select | Оставляет только указанные поля | 1 сообщение | 1 сообщение |
| Remove | Удаляет указанные поля | 1 сообщение | 1 сообщение |
| SnakeCase | Преобразует ключи в snake_case | 1 сообщение | 1 сообщение |
| CamelCase | Преобразует ключи в CamelCase | 1 сообщение | 1 сообщение |
Timestamp
Добавляет поле с временной меткой к каждому сообщению. Полезно для отслеживания времени обработки сообщений.
Конфигурация
transformations:
- type: timestamp
config:
# Имя поля для временной метки (опционально, по умолчанию: created_at)
fieldName: created_at
# Формат временной метки (опционально, по умолчанию: RFC3339)
# Поддерживаются все форматы Go time package
format: RFC3339
Формат
Поле format задаёт формат времени Go. По умолчанию — RFC3339 (например, 2006-01-02T15:04:05Z07:00). Можно указать RFC3339, RFC3339Nano или свой шаблон, например 2006-01-02 15:04:05.
Примеры
Базовое использование
transformations:
- type: timestamp
config:
fieldName: processed_at
Входное сообщение:
{
"id": 1,
"name": "Test"
}
Выходное сообщение:
{
"id": 1,
"name": "Test",
"processed_at": "2024-01-15T10:30:00Z"
}
Кастомный формат
transformations:
- type: timestamp
config:
fieldName: timestamp
format: "2006-01-02 15:04:05"
Выходное сообщение:
{
"id": 1,
"timestamp": "2024-01-15 10:30:00"
}
Unix timestamp
transformations:
- type: timestamp
config:
fieldName: unix_time
format: Unix
Выходное сообщение:
{
"id": 1,
"unix_time": "1705312200"
}
Flatten
Разворачивает массив в отдельные сообщения, сохраняя остальные поля исходного сообщения. Каждый элемент массива подставляется в корень; объекты разворачиваются в ключи верхнего уровня. Если поле не массив — сообщение возвращается без изменений. Поддерживаются массивы в формате Avro (объект с ключом array).
Конфигурация
transformations:
- type: flatten
config:
# JSONPath к массиву для развертывания (обязательно)
field: items
Примеры
Простое развертывание
transformations:
- type: flatten
config:
field: items
Входное сообщение:
{
"order_id": 12345,
"customer": "John Doe",
"items": [
{"product": "Apple", "quantity": 5},
{"product": "Banana", "quantity": 3}
]
}
Выходные сообщения:
{
"order_id": 12345,
"customer": "John Doe",
"product": "Apple",
"quantity": 5
}
{
"order_id": 12345,
"customer": "John Doe",
"product": "Banana",
"quantity": 3
}
Вложенные массивы
transformations:
- type: flatten
config:
field: orders.items
Входное сообщение:
{
"customer_id": 100,
"orders": {
"items": [
{"sku": "SKU001", "price": 10.99},
{"sku": "SKU002", "price": 5.99}
]
}
}
Выходные сообщения:
{
"customer_id": 100,
"orders": {},
"sku": "SKU001",
"price": 10.99
}
Комбинация с другими трансформациями
transformations:
- type: flatten
config:
field: rowsStock
- type: timestamp
config:
fieldName: created_at
Это создаст отдельное сообщение для каждого элемента массива, каждое с добавленной временной меткой.
Filter
Оставляет только те сообщения, у которых поле по указанному JSONPath существует и является «истинным» (булево true, непустая строка, ненулевое число). Остальные сообщения отбрасываются. Выражения сравнения (например ==) не поддерживаются; для маршрутизации по значению используйте Router.
Конфигурация
transformations:
- type: filter
config:
# JSONPath к полю; сообщение проходит, если поле есть и истинно (обязательно)
condition: "$.active"
JSONPath
Используется библиотека gjson: $.field, $.nested.field, $.array[0] и т.д.
Примеры
Простая фильтрация
transformations:
- type: filter
config:
condition: "$.active"
Входные сообщения:
{"id": 1, "active": true} // ✅ Проходит
{"id": 2, "active": false} // ❌ Отфильтровывается
{"id": 3} // ❌ Отфильтровывается (нет поля)
Фильтрация по значению
transformations:
- type: filter
config:
condition: "$.level"
Входные сообщения:
{"level": "error"} // ✅ Проходит (непустая строка)
{"level": "warning"} // ✅ Проходит
{"level": ""} // ❌ Отфильтровывается
{"level": null} // ❌ Отфильтровывается
Фильтрация по числовому значению
transformations:
- type: filter
config:
condition: "$.amount"
Входные сообщения:
{"amount": 100} // ✅ Проходит (не ноль)
{"amount": 0} // ❌ Отфильтровывается
{"amount": -5} // ✅ Проходит (не ноль)
Комплексная фильтрация
transformations:
- type: filter
config:
condition: "$.user.status"
Входное сообщение:
{
"user": {
"status": "active"
}
}
Результат: Сообщение проходит, если user.status существует и не пусто.
Mask
Маскирует чувствительные данные в указанных полях. Поддерживает сохранение длины или полное замещение символами.
Конфигурация
transformations:
- type: mask
config:
# Список JSONPath выражений к полям для маскирования (обязательно)
fields:
- password
- email
- creditCard
# Символ для маскирования (опционально, по умолчанию: *)
maskChar: "*"
# Сохранять оригинальную длину (опционально, по умолчанию: false)
keepLength: true
Примеры
Маскирование с сохранением длины
transformations:
- type: mask
config:
fields:
- password
- email
keepLength: true
Входное сообщение:
{
"id": 1,
"username": "john",
"password": "secret123",
"email": "john@example.com"
}
Выходное сообщение:
{
"id": 1,
"username": "john",
"password": "*********",
"email": "****************"
}
Маскирование с фиксированной длиной
transformations:
- type: mask
config:
fields:
- password
keepLength: false
maskChar: "X"
Входное сообщение:
{
"password": "verylongpassword123"
}
Выходное сообщение:
{
"password": "XXX"
}
Маскирование вложенных полей
transformations:
- type: mask
config:
fields:
- user.password
- payment.cardNumber
keepLength: true
Входное сообщение:
{
"user": {
"password": "secret"
},
"payment": {
"cardNumber": "1234567890123456"
}
}
Выходное сообщение:
{
"user": {
"password": "******"
},
"payment": {
"cardNumber": "****************"
}
}
Router
Маршрутизирует сообщения в разные приёмники по условиям. Первое совпавшее условие задаёт приёмник; если ни одно не подошло — используется основной sink.
Синтаксис условий
- Истинность:
$.field— совпадение, если поле есть и «истинно» (непустая строка, ненулевое число,true). - Сравнение:
$.field == 'value'или$.field == "value"— совпадение, когда значение поля равно заданной строке.
Условия проверяются по порядку; первое совпадение определяет маршрут.
Конфигурация
transformations:
- type: router
config:
routes:
- condition: "$.level == 'error'"
sink:
type: kafka
config:
brokers: ["localhost:9092"]
topic: error-topic
- condition: "$.level == 'warning'"
sink:
type: postgresql
config:
connectionString: "..."
table: warnings
Особенности
- Условия проверяются в порядке указания; первое совпадение определяет приёмник
- Если ни одно условие не совпало, сообщение идёт в основной sink
Примеры
Маршрутизация по уровню логирования
transformations:
- type: router
config:
routes:
- condition: "$.level"
sink:
type: kafka
config:
brokers: ["localhost:9092"]
topic: error-logs
Входные сообщения:
{"level": "error", "message": "Critical error"} // → error-logs топик
{"level": "info", "message": "Info message"} // → основной приемник
{"level": "warning", "message": "Warning"} // → основной приемник
Множественная маршрутизация
transformations:
- type: router
config:
routes:
- condition: "$.type"
sink:
type: kafka
config:
brokers: ["localhost:9092"]
topic: events-topic
- condition: "$.priority"
sink:
type: postgresql
config:
connectionString: "postgres://..."
table: high_priority_events
Входные сообщения:
{"type": "event", "data": "..."} // → events-topic
{"priority": "high", "data": "..."} // → high_priority_events таблица
{"data": "..."} // → основной приемник
Комбинация с другими трансформациями
transformations:
- type: timestamp
config:
fieldName: processed_at
- type: router
config:
routes:
- condition: "$.level"
sink:
type: kafka
config:
brokers: ["localhost:9092"]
topic: errors
Сначала добавляется временная метка, затем сообщение маршрутизируется.
Select
Оставляет только указанные поля; остальные удаляются. Для каждого поля используется JSONPath; в выводе ключом становится последний сегмент пути (например, user.name → ключ name), поэтому результат плоский.
Конфигурация
transformations:
- type: select
config:
# Список JSONPath к полям, которые нужно оставить (обязательно)
fields:
- id
- name
- email
Примеры
Простой выбор полей
transformations:
- type: select
config:
fields:
- id
- name
- email
Входное сообщение:
{
"id": 1,
"name": "John Doe",
"email": "john@example.com",
"password": "secret",
"internal_id": 999
}
Выходное сообщение:
{
"id": 1,
"name": "John Doe",
"email": "john@example.com"
}
Выбор вложенных полей (результат — плоский)
transformations:
- type: select
config:
fields:
- user.id
- user.name
- metadata.timestamp
Входное сообщение:
{
"user": {
"id": 1,
"name": "John",
"email": "john@example.com"
},
"metadata": {
"timestamp": "2024-01-15T10:30:00Z",
"source": "api"
}
}
Выходное сообщение (ключи — последние сегменты путей):
{
"id": 1,
"name": "John",
"timestamp": "2024-01-15T10:30:00Z"
}
Remove
Удаляет указанные поля из сообщения. Полезно для очистки данных перед отправкой.
Конфигурация
transformations:
- type: remove
config:
# Список JSONPath выражений к полям для удаления (обязательно)
fields:
- password
- internal_id
- secret_token
Примеры
Удаление чувствительных полей
transformations:
- type: remove
config:
fields:
- password
- creditCard
- ssn
Входное сообщение:
{
"id": 1,
"name": "John Doe",
"password": "secret",
"creditCard": "1234-5678-9012-3456",
"ssn": "123-45-6789"
}
Выходное сообщение:
{
"id": 1,
"name": "John Doe"
}
Удаление вложенных полей
transformations:
- type: remove
config:
fields:
- user.password
- metadata.internal
Входное сообщение:
{
"user": {
"id": 1,
"name": "John",
"password": "secret"
},
"metadata": {
"timestamp": "2024-01-15",
"internal": "secret"
}
}
Выходное сообщение:
{
"user": {
"id": 1,
"name": "John"
},
"metadata": {
"timestamp": "2024-01-15"
}
}
Порядок применения
Трансформации применяются последовательно в порядке, указанном в списке transformations. Каждая трансформация получает результат предыдущей.
Пример последовательности
transformations:
# 1. Развернуть массив
- type: flatten
config:
field: items
# 2. Добавить временную метку
- type: timestamp
config:
fieldName: created_at
# 3. Отфильтровать неактивные
- type: filter
config:
condition: "$.active"
# 4. Удалить внутренние поля
- type: remove
config:
fields:
- internal_id
- debug_info
# 5. Выбрать только нужные поля
- type: select
config:
fields:
- id
- name
- created_at
Рекомендации по порядку
- Flatten должен быть первым, если нужно развернуть массивы
- Filter применяйте рано, чтобы уменьшить объем обрабатываемых данных
- SnakeCase/CamelCase применяйте после Select/Remove, но перед отправкой в приемник
- Mask/Remove применяйте перед Select для безопасности
- Select применяйте в конце для финальной очистки
- Timestamp можно применять в любом месте, но обычно в начале или конце
- Router обычно применяется в конце, после всех других трансформаций
Комбинированные примеры
Обработка заказов
transformations:
# Развернуть товары в отдельные сообщения
- type: flatten
config:
field: items
# Добавить временную метку
- type: timestamp
config:
fieldName: processed_at
# Фильтровать только оплаченные заказы
- type: filter
config:
condition: "$.status"
# Удалить чувствительные данные
- type: remove
config:
fields:
- customer.creditCard
- customer.cvv
Обработка логов
transformations:
# Добавить временную метку
- type: timestamp
config:
fieldName: timestamp
# Маскировать IP адреса
- type: mask
config:
fields:
- ip_address
keepLength: true
# Маршрутизировать ошибки
- type: router
config:
routes:
- condition: "$.level"
sink:
type: kafka
config:
brokers: ["localhost:9092"]
topic: error-logs
Нормализация имен полей
transformations:
# Выбрать нужные поля
- type: select
config:
fields:
- firstName
- lastName
- email
- address
# Преобразовать в snake_case для PostgreSQL
- type: snakeCase
config:
deep: true
Входное сообщение:
{
"firstName": "John",
"lastName": "Doe",
"email": "john@example.com",
"address": {
"streetName": "Main St",
"zipCode": "12345"
}
}
Выходное сообщение:
{
"first_name": "John",
"last_name": "Doe",
"email": "john@example.com",
"address": {
"street_name": "Main St",
"zip_code": "12345"
}
}
JSONPath поддержка
Все трансформации, работающие с полями, поддерживают JSONPath синтаксис:
$.field- корневое поле$.nested.field- вложенное поле$.array[0]- элемент массива по индексу$.array[*]- все элементы массива$.*- все поля корневого уровня
Производительность
- Filter - применяйте рано для уменьшения объема данных
- Select - уменьшает размер сообщений и повышает производительность
- Flatten - может увеличить количество сообщений, используйте осторожно
- Router - создает дополнительные подключения, минимизируйте количество маршрутов
SnakeCase
Преобразует все ключи JSON-объекта в формат snake_case. Полезно для нормализации имен полей при интеграции с системами, использующими snake_case (например, PostgreSQL, Python API).
Конфигурация
transformations:
- type: snakeCase
config:
# Рекурсивно преобразовывать вложенные объекты (опционально, по умолчанию: false)
deep: true
Примеры
Простое преобразование
transformations:
- type: snakeCase
config:
deep: false
Входное сообщение:
{
"firstName": "John",
"lastName": "Doe",
"userName": "johndoe",
"isActive": true,
"itemCount": 42
}
Выходное сообщение:
{
"first_name": "John",
"last_name": "Doe",
"user_name": "johndoe",
"is_active": true,
"item_count": 42
}
Рекурсивное преобразование
transformations:
- type: snakeCase
config:
deep: true
Входное сообщение:
{
"firstName": "John",
"address": {
"streetName": "Main St",
"houseNumber": 123,
"zipCode": "12345"
},
"items": [
{
"itemName": "Product",
"itemPrice": 99.99
}
]
}
Выходное сообщение:
{
"first_name": "John",
"address": {
"street_name": "Main St",
"house_number": 123,
"zip_code": "12345"
},
"items": [
{
"item_name": "Product",
"item_price": 99.99
}
]
}
Преобразование PascalCase
transformations:
- type: snakeCase
config:
deep: false
Входное сообщение:
{
"FirstName": "John",
"LastName": "Doe",
"UserID": 123
}
Выходное сообщение:
{
"first_name": "John",
"last_name": "Doe",
"user_id": 123
}
Особенности
- Преобразует
camelCase→snake_case - Преобразует
PascalCase→snake_case - Обрабатывает последовательные заглавные буквы (например,
XMLHttpRequest→xml_http_request) - Сохраняет уже существующие ключи в
snake_caseбез изменений - При
deep: falseпреобразует только ключи верхнего уровня - При
deep: trueрекурсивно преобразует все вложенные объекты и массивы
CamelCase
Преобразует все ключи JSON-объекта в формат CamelCase (PascalCase). Полезно для нормализации имен полей при интеграции с системами, использующими CamelCase (например, Java, C# API).
Конфигурация
transformations:
- type: camelCase
config:
# Рекурсивно преобразовывать вложенные объекты (опционально, по умолчанию: false)
deep: true
Примеры
Простое преобразование
transformations:
- type: camelCase
config:
deep: false
Входное сообщение:
{
"first_name": "John",
"last_name": "Doe",
"user_name": "johndoe",
"is_active": true,
"item_count": 42
}
Выходное сообщение:
{
"FirstName": "John",
"LastName": "Doe",
"UserName": "johndoe",
"IsActive": true,
"ItemCount": 42
}
Рекурсивное преобразование
transformations:
- type: camelCase
config:
deep: true
Входное сообщение:
{
"first_name": "John",
"address": {
"street_name": "Main St",
"house_number": 123,
"zip_code": "12345"
},
"items": [
{
"item_name": "Product",
"item_price": 99.99
}
]
}
Выходное сообщение:
{
"FirstName": "John",
"Address": {
"StreetName": "Main St",
"HouseNumber": 123,
"ZipCode": "12345"
},
"Items": [
{
"ItemName": "Product",
"ItemPrice": 99.99
}
]
}
Преобразование одиночных слов
transformations:
- type: camelCase
config:
deep: false
Входное сообщение:
{
"name": "John",
"id": 123
}
Выходное сообщение:
{
"Name": "John",
"Id": 123
}
Особенности
- Преобразует
snake_case→CamelCase - Все слова начинаются с заглавной буквы (PascalCase)
- Сохраняет уже существующие ключи в
CamelCaseбез изменений - При
deep: falseпреобразует только ключи верхнего уровня - При
deep: trueрекурсивно преобразует все вложенные объекты и массивы
Планируемые трансформации
В API (CRD) пока недоступны:
- ReplaceField — переименование полей (например,
старый.путь→новый.путь) - HeaderFrom — копирование заголовков Kafka в тело сообщения
Актуальные возможности см. в Коннекторы и Примеры.
Ограничения
- Filter: проверяется только существование поля и его «истинность»; сравнения (например
==) не поддерживаются — для маршрутизации по значению используйте Router. - Router: условия проверяются по порядку; первое совпадение задаёт маршрут. Поддерживаются форматы
$.field(истинность) и$.field == 'value'. - Flatten: работает только с массивами (в т.ч. в обёртке Avro с ключом
array), не с произвольными объектами. - Select: результат всегда плоский; ключом становится последний сегмент JSONPath.
- SnakeCase и CamelCase: работают только с валидным JSON; бинарные данные возвращаются без изменений.