Обработка ошибок
DataFlow Operator позволяет отправлять сообщения, которые не удалось записать в основной приёмник (sink), в отдельный error sink. Это не останавливает основной пайплайн и даёт место для разбора, повторной обработки или архивации неудачных сообщений.
Обзор
Секция errors в спецификации DataFlow задаёт приёмник для ошибок. Когда сообщение не удаётся записать в основной sink (например, ошибка соединения, валидации или нарушения ограничения), оно записывается в error sink. Для error sink можно использовать те же типы коннекторов, что и для основного sink (Kafka, PostgreSQL, ClickHouse, Trino, Nessie).
Когда использовать
Используйте error sink, когда нужно не терять неудачные сообщения и иметь возможность обработать или проанализировать их позже.
Конфигурация
Добавьте блок errors в спецификацию DataFlow с полем type и конфигурацией выбранного коннектора.
Kafka как приёмник ошибок
spec:
source:
type: kafka
config:
brokers:
- localhost:9092
topic: input-topic
consumerGroup: dataflow-group
sink:
type: postgresql
config:
connectionString: "postgres://..."
table: output_table
errors:
type: kafka
config:
brokers:
- localhost:9092
topic: error-topic
Для error sink можно использовать те же опции Kafka, что и для основного sink (brokersSecretRef, topicSecretRef, sasl, tls). Подробнее в Коннекторы.
PostgreSQL как приёмник ошибок
errors:
type: postgresql
config:
connectionString: "postgres://..."
table: error_messages
autoCreateTable: true
Поддерживаются те же опции, что и для основного PostgreSQL sink (connectionStringSecretRef, tableSecretRef, batchSize и др.).
Структура сообщения об ошибке
Каждая запись, отправляемая в error sink, имеет следующую структуру:
| Поле | Описание |
|---|---|
error |
Объект с данными об ошибке |
error.message |
Текст ошибки (например, connection refused, нарушение ограничения) |
error.timestamp |
Время ошибки в формате ISO 8601 |
error.original_sink |
Тип коннектора основного sink (например, postgresql, kafka) |
error.metadata |
Опциональные метаданные исходного сообщения |
original_message |
Исходное тело сообщения (объект для JSON или строка в original_data) |
Пример:
{
"error": {
"message": "failed to send message: connection refused",
"timestamp": "2026-01-24T12:34:56Z",
"original_sink": "postgresql"
},
"original_message": {
"id": 1,
"name": "test",
"value": 100
}
}
Типы ошибок
Ошибки классифицируются по типу для метрик. Метка error_type в dataflow_connector_errors_total, dataflow_transformer_errors_total и dataflow_task_stage_errors_total может принимать следующие значения:
| Тип | Описание |
|---|---|
context_canceled |
Операция отменена (context.Canceled) |
timeout |
context.DeadlineExceeded или текст сообщения содержит "timeout", "deadline exceeded", "i/o timeout" |
connection_error |
Connection refused, not connected, failed to connect или connection failure |
constraint_violation |
Нарушение ограничения целостности PostgreSQL (SQLSTATE класс 23xx) |
invalid_data |
Ошибка парсинга JSON, схемы, валидации или синтаксиса |
transient |
Временные ошибки Trino (TOO_MANY_REQUESTS_FAILED, перегрузка worker, подсказки retry) |
auth_error |
Ошибка аутентификации, SASL или авторизации |
unknown |
Ошибка не удалось классифицировать |
Метрики
Обработка ошибок отражается в метриках оператора:
- Ошибки коннекторов:
dataflow_connector_errors_total(метки:namespace,name,connector_type,connector_name,operation,error_type) - Ошибки трансформеров:
dataflow_transformer_errors_total(метки:namespace,name,transformer_type,transformer_index,error_type) - Ошибки этапов задачи:
dataflow_task_stage_errors_total(метки:namespace,name,stage,error_type) - Этапы задачи: в
dataflow_task_stage_duration_secondsприсутствует этапerror_sink_write, если настроен error sink - Доля успешных задач:
dataflow_task_success_rate(0.0–1.0) для мониторинга состояния пайплайна
Подробнее см. Метрики.
Пример манифеста
Полный пример с приёмником ошибок есть в репозитории:
kubectl apply -f dataflow/config/samples/kafka-to-postgres-with-errors.yaml
Дополнительный контекст — в разделе Примеры.