Skip to content

Development

Руководство для разработчиков, желающих внести вклад в DataFlow Operator или настроить локальную среду разработки.

Предварительные требования

  • Go 1.21 или выше
  • Docker и Docker Compose
  • kubectl настроен для работы с кластером
  • Helm 3.0+ (для тестирования установки)
  • Task (опционально, для использования Taskfile)

Настройка окружения

Клонирование репозитория

git clone <repository-url>
cd dataflow

Установка зависимостей

go mod download
go mod tidy

Установка инструментов разработки

# Установка controller-gen
task controller-gen

# Установка envtest
task envtest

Локальная разработка

Запуск зависимостей

Запустите все необходимые сервисы через docker-compose:

docker-compose up -d

Это запустит: - Kafka (порт 9092) с Kafka UI (порт 8080) - PostgreSQL (порт 5432) с pgAdmin (порт 5050)

Запуск оператора локально

# Генерация кода и манифестов
task generate
task manifests

# Установка CRD в кластер (если используете kind/minikube)
task install

# Запуск оператора
task run

Или используйте скрипт:

./scripts/run-local.sh

Настройка kind кластера

Для полноценного тестирования используйте kind:

# Создать kind кластер
./scripts/setup-kind.sh

# Установить CRD
task install

# Запустить оператор локально
task run

Структура проекта

dataflow/
├── api/v1/                    # CRD определения
│   ├── dataflow_types.go      # Типы DataFlow ресурсов
│   └── groupversion_info.go    # Версия API
├── internal/
│   ├── connectors/            # Коннекторы для источников/приемников
│   │   ├── interface.go       # Интерфейсы коннекторов
│   │   ├── factory.go         # Фабрика коннекторов
│   │   ├── kafka.go           # Kafka коннектор
│   │   ├── postgresql.go      # PostgreSQL коннектор
│   ├── transformers/          # Трансформации сообщений
│   │   ├── interface.go       # Интерфейс трансформаций
│   │   ├── factory.go         # Фабрика трансформаций
│   │   ├── timestamp.go        # Timestamp трансформация
│   │   ├── flatten.go         # Flatten трансформация
│   │   ├── filter.go          # Filter трансформация
│   │   ├── mask.go            # Mask трансформация
│   │   ├── router.go          # Router трансформация
│   │   ├── select.go          # Select трансформация
│   │   └── remove.go          # Remove трансформация
│   ├── processor/             # Процессор сообщений
│   │   └── processor.go       # Оркестрация обработки
│   ├── controller/            # Kubernetes контроллер
│   │   └── dataflow_controller.go
│   └── types/                 # Внутренние типы
│       └── message.go         # Тип Message
├── config/                    # Конфигурация Kubernetes
│   ├── crd/                   # CRD манифесты
│   ├── rbac/                  # RBAC манифесты
│   └── samples/              # Примеры DataFlow ресурсов
├── helm/                      # Helm Chart
│   └── dataflow-operator/
├── docs/                      # Документация MkDocs
├── test/                      # Тесты
│   └── fixtures/             # Тестовые данные
├── scripts/                   # Вспомогательные скрипты
├── main.go                    # Точка входа
├── Taskfile                   # Команды сборки
└── go.mod                     # Зависимости Go

Генерация кода

Генерация CRD и RBAC

task manifests

Эта команда генерирует: - CRD манифесты в dataflow/config/crd/bases/

Генерация DeepCopy методов

task generate

Генерирует методы DeepCopy для всех типов в api/v1/.

Обновление controller-gen

Если возникают проблемы с генерацией:

# Обновить controller-gen
go install sigs.k8s.io/controller-tools/cmd/controller-gen@latest

# Затем
task generate
task manifests

Логирование

Структурированные поля

В логах оператора и процессора используются единые поля для корреляции:

Поле Где Назначение
dataflow_name Оператор, процессор, коннекторы Имя ресурса DataFlow
dataflow_namespace Оператор, процессор, коннекторы Namespace DataFlow
reconcile_id Оператор Короткий идентификатор одного цикла реконсиляции (8 hex-символов)
connector_type Процессор, коннекторы Тип коннектора (например kafka-source, trino-sink)
message_id Процессор, коннекторы Идентификатор сообщения из метаданных (если есть) или partition/offset для Kafka

По этим полям можно фильтровать логи в агрегаторе (например по dataflow_name и reconcile_id) и связывать ошибки с конкретным DataFlow и сообщением.

Уровень лога (LOG_LEVEL)

Оператор и процессор читают переменную окружения LOG_LEVEL. Допустимые значения (без учёта регистра): debug, info, warn, error.

  • В проде: задайте LOG_LEVEL=info (или не задавайте — по умолчанию в Helm используется info), чтобы уменьшить объём логов.
  • При отладке: задайте LOG_LEVEL=debug для более детального вывода.

В Helm для оператора уровень задаётся через value logLevel (по умолчанию "info"); он подставляется в env LOG_LEVEL в поде оператора.

PROCESSOR_LOG_LEVEL

Оператор читает переменную окружения PROCESSOR_LOG_LEVEL и передаёт её в каждый под процессора как LOG_LEVEL. Поды процессора — это рабочие нагрузки, создаваемые для каждого DataFlow; в них выполняются сами пайплайны.

Аспект Описание
По умолчанию info (если переменная не задана, поды процессора получают LOG_LEVEL=info)
Допустимые значения Те же, что у LOG_LEVEL: debug, info, warn, error (регистр не важен)
Где задавать В Deployment оператора (не в ресурсе DataFlow). Оператор подставляет это значение в env LOG_LEVEL каждого пода процессора. При установке через Helm задаётся value processorLogLevel (см. ниже).

Через Helm: используйте value processorLogLevel (по умолчанию "info"). Пример для детальных логов процессоров:

helm upgrade dataflow-operator oci://ghcr.io/dataflow-operator/helm-charts/dataflow-operator \
  --set processorLogLevel=debug \
  --reuse-values

Или в values.yaml:

processorLogLevel: "debug"   # поды процессора получают LOG_LEVEL=debug

Без Helm: задайте переменную окружения в Deployment оператора, например:

kubectl set env deployment/dataflow-operator PROCESSOR_LOG_LEVEL=debug -n <namespace-оператора>

После этого перезапустите или пересоздайте оператор, чтобы поды процессора пересоздались с новым уровнем.

Настройка Validating Webhook

Validating Webhook проверяет spec ресурса DataFlow при создании и обновлении и отклоняет невалидные объекты до записи в кластер. Это даёт немедленную обратную связь (ошибка при kubectl apply или в GUI) и не создаёт лишние объекты и поды с заведомо нерабочей конфигурацией. Подробнее о роли webhook в архитектуре см. Admission Webhook (Validating).

Включение через Helm

Webhook опционален и по умолчанию выключен. Чтобы включить его:

  1. Включите webhook и задайте CA для TLS: в values.yaml или через --set:
  2. webhook.enabled: true
  3. webhook.caBundle — строка в base64 (PEM-сертификат CA, которым подписан сертификат оператора на порту 9443). Без неё ValidatingWebhookConfiguration не создаётся, так как API-сервер требует caBundle для вызова webhook по HTTPS.

  4. Настройте TLS для оператора: API-сервер подключается к оператору по HTTPS. Задайте:

  5. webhook.certDir — путь в контейнере, куда смонтированы сертификаты (например /tmp/k8s-webhook-server/serving-certs).
  6. webhook.secretName — имя Secret с ключами tls.crt и tls.key (и при необходимости ca.crt). Этот Secret монтируется в под оператора по пути webhook.certDir; переменная окружения WEBHOOK_CERT_DIR в поде устанавливается в это значение.

Секрет с сертификатами можно создать вручную или через cert-manager (Certificate для сервиса оператора). CA из этого сертификата (или из выдавшего его CA) нужно подставить в webhook.caBundle в base64.

Пример фрагмента values.yaml при использовании cert-manager и уже известного caBundle:

webhook:
  enabled: true
  caBundle: "LS0tLS1CRUdJTi..."   # base64 PEM CA
  certDir: /tmp/k8s-webhook-server/serving-certs
  secretName: dataflow-operator-webhook-cert

После установки/обновления чарта с этими значениями создаётся ресурс ValidatingWebhookConfiguration; при следующем create/update DataFlow API-сервер будет вызывать оператор для валидации.

Что проверяет webhook

  • Обязательные поля: spec.source, spec.sink, типы source/sink из списка (kafka, postgresql, trino), наличие соответствующей конфигурации (например source.config при source.type: kafka).
  • Для каждого типа source/sink — обязательные поля или SecretRef (например для Kafka: brokers или brokersSecretRef, topic или topicSecretRef).
  • Список трансформаций: допустимые типы и наличие конфигурации для каждого типа; для router — валидация вложенных sink.
  • Опционально: spec.errors (если задан — как SinkSpec), SecretRef (name и key), неотрицательные ресурсы.

При ошибке валидации API возвращает ответ с перечислением полей и сообщений (агрегат из field.ErrorList).

Тестирование

Unit тесты

# Запустить все unit тесты
task test-unit

# Запустить тесты с покрытием
task test

# Запустить тесты конкретного пакета
go test ./internal/connectors/... -v

# Запустить тесты с покрытием для конкретного пакета
go test ./internal/transformers/... -coverprofile=coverage.out
go tool cover -html=coverage.out

Интеграционные тесты

# Настроить kind кластер
./scripts/setup-kind.sh

# Запустить интеграционные тесты
task test-integration

Запуск тестов вручную

# Unit тесты без envtest
go test ./... -v

# Тесты с envtest (требует kubebuilder)
KUBEBUILDER_ASSETS="$(task envtest use 1.28.0 -p path)" go test ./... -coverprofile cover.out

Сборка

Локальная сборка

# Сборка бинарного файла
task build

# Бинарный файл будет в bin/manager
./bin/manager

Сборка Docker образа

# Сборка образа
task docker-build IMG=your-registry/dataflow-operator:v1.0.0

# Отправка образа
task docker-push IMG=your-registry/dataflow-operator:v1.0.0

Или вручную. Если репозиторий — монорепо (есть папки dataflow и dataflow-web), сборка из корня репозитория:

docker build -f dataflow/Dockerfile -t your-registry/dataflow-operator:v1.0.0 .
docker push your-registry/dataflow-operator:v1.0.0

Если вы находитесь в каталоге dataflow и контекст сборки — только он (без dataflow-web), используйте прежний вариант: docker build -t ... .

Добавление нового коннектора

Подробное руководство: см. Разработка коннекторов с baseConnector — пошаговая инструкция с примерами использования baseConnector и baseConnectorRWMutex.

1. Определение типов в API

Добавьте спецификацию в api/v1/dataflow_types.go:

// NewConnectorSourceSpec defines new connector source configuration
type NewConnectorSourceSpec struct {
    // Configuration fields
    Endpoint string `json:"endpoint"`
    // ...
}

// Добавьте в SourceSpec
type SourceSpec struct {
    // ...
    NewConnector *NewConnectorSourceSpec `json:"newConnector,omitempty"`
}

2. Реализация коннектора

Создайте файл internal/connectors/newconnector.go. Рекомендуется встраивать baseConnector для общей синхронизации Connect/Close:

package connectors

import (
    "context"
    "fmt"
    v1 "github.com/dataflow-operator/dataflow/api/v1"
    "github.com/dataflow-operator/dataflow/internal/types"
    "github.com/go-logr/logr"
)

type NewConnectorSourceConnector struct {
    baseConnector
    config *v1.NewConnectorSourceSpec
    conn   *Connection
    logger logr.Logger
}

func NewNewConnectorSourceConnector(config *v1.NewConnectorSourceSpec) *NewConnectorSourceConnector {
    return &NewConnectorSourceConnector{config: config, logger: logr.Discard()}
}

func (n *NewConnectorSourceConnector) Connect(ctx context.Context) error {
    if !n.guardConnect() {
        return fmt.Errorf("connector is closed")
    }
    defer n.Unlock()
    // ... connection logic
    return nil
}

func (n *NewConnectorSourceConnector) Read(ctx context.Context) (<-chan *types.Message, error) {
    // Implement read logic
    return nil, nil
}

func (n *NewConnectorSourceConnector) Close() error {
    if n.guardClose() {
        return nil
    }
    defer n.Unlock()
    // ... close logic
    return nil
}

3. Регистрация в фабрике

Добавьте в internal/connectors/factory.go:

func CreateSourceConnector(source *v1.SourceSpec) (SourceConnector, error) {
    switch source.Type {
    // ...
    case "newconnector":
        if source.NewConnector == nil {
            return nil, fmt.Errorf("newconnector source configuration is required")
        }
        return NewNewConnectorSourceConnector(source.NewConnector), nil
    // ...
    }
}

4. Генерация кода

task generate
task manifests

5. Тестирование

Создайте тесты в internal/connectors/newconnector_test.go:

func TestNewConnectorSourceConnector(t *testing.T) {
    // Test implementation
}

Добавление новой трансформации

1. Определение типов в API

Добавьте в api/v1/dataflow_types.go:

// NewTransformation defines new transformation configuration
type NewTransformation struct {
    Field string `json:"field"`
    // ...
}

// Добавьте в TransformationSpec
type TransformationSpec struct {
    // ...
    NewTransformation *NewTransformation `json:"newTransformation,omitempty"`
}

2. Реализация трансформации

Создайте файл internal/transformers/newtransformation.go:

package transformers

import (
    "context"
    v1 "github.com/dataflow-operator/dataflow/api/v1"
    "github.com/dataflow-operator/dataflow/internal/types"
)

type NewTransformer struct {
    config *v1.NewTransformation
}

func NewNewTransformer(config *v1.NewTransformation) *NewTransformer {
    return &NewTransformer{config: config}
}

func (n *NewTransformer) Transform(ctx context.Context, message *types.Message) ([]*types.Message, error) {
    // Implement transformation logic
    return []*types.Message{message}, nil
}

3. Регистрация в фабрике

Добавьте в internal/transformers/factory.go:

func CreateTransformer(transformation *v1.TransformationSpec) (Transformer, error) {
    switch transformation.Type {
    // ...
    case "newtransformation":
        if transformation.NewTransformation == nil {
            return nil, fmt.Errorf("newtransformation configuration is required")
        }
        return NewNewTransformer(transformation.NewTransformation), nil
    // ...
    }
}

4. Генерация и тестирование

task generate
task test

Отладка

Логирование

Используйте структурированное логирование:

import "github.com/go-logr/logr"

logger.Info("Processing message", "messageId", msg.ID)
logger.Error(err, "Failed to process", "messageId", msg.ID)
logger.V(1).Info("Debug information", "data", data)

Отладка контроллера

# Запустить с детальным логированием
go run ./main.go --zap-log-level=debug

Отладка коннекторов

Добавьте логирование в методы коннекторов:

func (k *KafkaSourceConnector) Read(ctx context.Context) (<-chan *types.Message, error) {
    logger.Info("Starting to read from Kafka", "topic", k.config.Topic)
    // ...
}

Форматирование и линтинг

Форматирование кода

task fmt

Или вручную:

go fmt ./...

Проверка кода

task vet

Или вручную:

go vet ./...

Линтинг (опционально)

Установите golangci-lint:

go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest

Запустите:

golangci-lint run

CI/CD

GitHub Actions

Пример workflow для CI:

name: CI

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-go@v4
        with:
          go-version: '1.21'
      - run: go mod download
      - run: task test-unit
      - run: task vet
      - run: task fmt

Сборка документации

Документация собирается с помощью MkDocs и темы Material. Диаграммы строятся на Mermaid через плагин mkdocs-mermaid2-plugin.

Установка зависимостей (из корня репозитория):

pip install -r docs/requirements.txt

Или вручную:

pip install "mkdocs-material[imaging]" pymdown-extensions mkdocs-mermaid2-plugin

Сборка и просмотр (из корня репозитория):

cd docs && mkdocs build
# или локальный сервер:
cd docs && mkdocs serve

При использовании mkdocs serve откройте в браузере http://127.0.0.1:8000.

Вклад в проект

Процесс разработки

  1. Создайте issue для обсуждения изменений
  2. Создайте feature branch: git checkout -b feature/new-feature
  3. Внесите изменения и добавьте тесты
  4. Убедитесь, что все тесты проходят: task test
  5. Отформатируйте код: task fmt
  6. Создайте Pull Request

Стандарты кода

  • Следуйте Go code review comments
  • Добавляйте комментарии для публичных функций
  • Пишите тесты для нового функционала
  • Обновляйте документацию при необходимости

Коммиты

Используйте понятные сообщения коммитов:

feat: add new connector for Redis
fix: handle connection errors in Kafka connector
docs: update getting started guide
test: add tests for filter transformation

Полезные команды

# Просмотр всех доступных команд
task help

# Очистка сгенерированных файлов
task clean

# Обновление зависимостей
go mod tidy

# Просмотр зависимостей
go list -m all

# Проверка устаревших зависимостей
go list -u -m all

Ресурсы

Получение помощи

  • Создайте issue в репозитории
  • Проверьте существующие issues и PR
  • Изучите примеры в dataflow/config/samples/