Getting Started
Это руководство поможет вам начать работу с DataFlow Operator. Вы узнаете, как установить оператор, создать первый поток данных и настроить локальную среду разработки.
Предварительные требования
Для production установки
- Kubernetes кластер (версия 1.24+)
- Helm 3.0+
- kubectl настроен для работы с кластером
- Доступ к источникам данных (Kafka, PostgreSQL)
Для локальной разработки
- Go 1.21+
- Docker и Docker Compose
- Task (опционально, для использования команд Taskfile)
- Доступ к портам: 8080, 5050, 15672, 8081, 5432, 9092, 5672
Установка
Управление CRD
CRD (Custom Resource Definition) DataFlow определяет тип ресурса DataFlow в Kubernetes.
Автоматическая установка (через Helm)
При установке через Helm (рекомендуемый способ) CRD устанавливается и обновляется автоматически. Отдельный шаг kubectl apply не нужен — чарт управляет жизненным циклом CRD при crds.install: true (значение по умолчанию).
Ручная установка
Если вы управляете CRD отдельно (например, через ArgoCD, FluxCD или с crds.install: false в Helm values), установите CRD вручную:
kubectl apply -f https://raw.githubusercontent.com/dataflow-operator/dataflow/refs/heads/main/config/crd/bases/dataflow.dataflow.io_dataflows.yaml
Или из локального файла:
kubectl apply -f dataflow/config/crd/bases/dataflow.dataflow.io_dataflows.yaml
Конфигурация CRD в Helm
| Параметр | По умолчанию | Описание |
|---|---|---|
crds.install |
true |
Устанавливать и обновлять CRD при helm install / helm upgrade |
crds.keep |
true |
Добавить аннотацию helm.sh/resource-policy: keep для защиты CRD от удаления при helm uninstall |
Поведение при обновлении: CRD обновляется при каждом helm upgrade, изменения схемы применяются автоматически.
Поведение при удалении: При crds.keep: true (по умолчанию) CRD остаётся в кластере после helm uninstall. Это защищает от случайного удаления всех ресурсов DataFlow. Чтобы отключить установку CRD через Helm:
crds:
install: false
Установка через Helm (рекомендуется)
Базовая установка
Самый простой способ установить оператор из OCI registry:
helm install dataflow-operator oci://ghcr.io/dataflow-operator/helm-charts/dataflow-operator
Эта команда установит оператор с настройками по умолчанию в namespace default.
Установка в конкретный namespace
helm install dataflow-operator oci://ghcr.io/dataflow-operator/helm-charts/dataflow-operator \
--namespace dataflow-system \
--create-namespace
Примечание: Для локальной разработки вы также можете использовать локальный chart:
helm install dataflow-operator ./helm-charts/dataflow-operator
Установка с кастомными настройками
Вы можете переопределить значения по умолчанию через флаги:
helm install dataflow-operator oci://ghcr.io/dataflow-operator/helm-charts/dataflow-operator \
--set image.repository=your-registry/controller \
--set image.tag=v1.0.0 \
--set replicaCount=2 \
--set resources.limits.memory=1Gi \
--set resources.limits.cpu=500m \
--set resources.requests.memory=256Mi \
--set resources.requests.cpu=100m
Установка с файлом values
Для более сложных конфигураций создайте файл my-values.yaml:
image:
repository: your-registry/controller
tag: v1.0.0
replicaCount: 2
resources:
limits:
cpu: 1000m
memory: 1Gi
requests:
cpu: 500m
memory: 512Mi
# Настройки для работы с Kubernetes API
serviceAccount:
create: true
annotations:
eks.amazonaws.com/role-arn: arn:aws:iam::123456789012:role/dataflow-operator
# Настройки безопасности
securityContext:
runAsNonRoot: true
runAsUser: 1000
fsGroup: 1000
# Опционально: Sentry для мониторинга ошибок и трейсинга
# sentry:
# enabled: true
# dsn: "https://xxx@o0.ingest.sentry.io/123"
# environment: production
# tracesSampleRate: 0.1
Затем установите:
helm install dataflow-operator oci://ghcr.io/dataflow-operator/helm-charts/dataflow-operator -f my-values.yaml
Проверка установки
После установки проверьте статус:
# Проверьте статус подов
kubectl get pods -l app.kubernetes.io/name=dataflow-operator
# Проверьте CRD
kubectl get crd dataflows.dataflow.dataflow.io
# Проверьте логи оператора
kubectl logs -l app.kubernetes.io/name=dataflow-operator --tail=50
# Проверьте статус deployment
kubectl get deployment dataflow-operator
Ожидаемый вывод:
NAME READY STATUS RESTARTS AGE
dataflow-operator-7d8f9c4b5d-xxxxx 1/1 Running 0 1m
Обновление
Для обновления оператора до новой версии:
helm upgrade dataflow-operator oci://ghcr.io/dataflow-operator/helm-charts/dataflow-operator
С кастомными значениями:
helm upgrade dataflow-operator oci://ghcr.io/dataflow-operator/helm-charts/dataflow-operator -f my-values.yaml
Для обновления до конкретной версии:
helm upgrade dataflow-operator oci://ghcr.io/dataflow-operator/helm-charts/dataflow-operator \
--set image.tag=v1.1.0
Удаление
Для удаления оператора:
helm uninstall dataflow-operator
Поведение CRD при удалении: При crds.keep: true (по умолчанию) CRD остаётся в кластере после helm uninstall. Существующие ресурсы DataFlow сохраняются, но перестают обрабатываться.
Для полного удаления CRD и всех ресурсов DataFlow в кластере:
# Сначала удалите все DataFlow ресурсы
kubectl delete dataflow --all --all-namespaces
# Затем удалите оператор
helm uninstall dataflow-operator
# Наконец, удалите CRD (только если crds.keep был true)
kubectl delete crd dataflows.dataflow.dataflow.io
Warning
Удаление CRD удаляет все ресурсы DataFlow во всех namespace кластера. Убедитесь, что это намеренное действие.
Первый DataFlow
Простой пример: Kafka → PostgreSQL
Создайте простой DataFlow ресурс для передачи данных из Kafka в PostgreSQL:
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
name: kafka-to-postgres
namespace: default
spec:
source:
type: kafka
config:
brokers:
- kafka-broker:9092
topic: input-topic
consumerGroup: dataflow-group
sink:
type: postgresql
config:
connectionString: "postgres://user:password@postgres-host:5432/dbname?sslmode=disable"
table: output_table
autoCreateTable: true
Примените ресурс:
kubectl apply -f dataflow/config/samples/kafka-to-postgres.yaml
Примечание: Каждый ресурс DataFlow создает отдельный под (Deployment) для обработки данных. Вы можете настроить ресурсы, выбор нод, affinity и tolerations. Подробнее см. Примеры.
Пример с Kubernetes Secrets
Для безопасного хранения credentials используйте Kubernetes Secrets. См. пример:
kubectl apply -f dataflow/config/samples/kafka-to-postgres-secrets.yaml
Этот пример демонстрирует использование SecretRef для конфигурации коннекторов. Подробнее см. раздел Использование Secrets в Kubernetes в документации по коннекторам.
Проверка статуса
Проверьте статус созданного потока данных:
# Получить информацию о DataFlow
kubectl get dataflow kafka-to-postgres
# Детальная информация
kubectl describe dataflow kafka-to-postgres
# Посмотреть статус в формате YAML
kubectl get dataflow kafka-to-postgres -o yaml
Ожидаемый статус:
status:
phase: Running
processedCount: 150
errorCount: 0
lastProcessedTime: "2024-01-15T10:30:00Z"
message: "Processing messages successfully"
Отправка тестового сообщения
Для тестирования потока данных отправьте сообщение в Kafka топик:
# Используя kafka-console-producer
kafka-console-producer --broker-list localhost:9092 --topic input-topic
# Введите JSON сообщение и нажмите Enter
{"id": 1, "name": "Test", "value": 100}
Или используйте скрипт из проекта:
./scripts/send-test-message.sh
Проверка данных в PostgreSQL
Подключитесь к PostgreSQL и проверьте данные:
psql postgres://user:password@postgres-host:5432/dbname
# Проверьте таблицу
SELECT * FROM output_table;
Локальная разработка
Запуск зависимостей
Используйте docker-compose для запуска всех зависимостей локально:
docker-compose up -d
Эта команда запустит:
- Kafka (порт 9092) с Kafka UI (порт 8080)
- PostgreSQL (порт 5432) с pgAdmin (порт 5050)
Доступ к UI интерфейсам
После запуска доступны следующие UI:
- Kafka UI: http://localhost:8080
- Просмотр топиков, сообщений, consumer groups
- pgAdmin: http://localhost:5050
- Логин:
admin@admin.com, пароль:admin - Управление PostgreSQL базами данных
Запуск оператора локально
Для разработки запустите оператор локально:
# Установите CRD в кластер (если используете kind или minikube)
task install
# Запустите оператор
task run
Или используйте скрипт:
./scripts/run-local.sh
Настройка локального кластера (опционально)
Для полноценного тестирования используйте kind (Kubernetes in Docker):
# Создать kind кластер
./scripts/setup-kind.sh
# Установить CRD
task install
# Запустить оператор локально
task run
Отладка
Для отладки используйте логи оператора:
# Если оператор запущен локально, логи выводятся в консоль
# Для оператора в кластере:
kubectl logs -l app.kubernetes.io/name=dataflow-operator -f
Проверьте события Kubernetes:
kubectl get events --sort-by='.lastTimestamp' | grep dataflow
Следующие шаги
Теперь, когда вы установили оператор и создали первый поток данных:
- Изучите Connectors для понимания всех доступных источников и приемников
- Ознакомьтесь с Transformations для работы с трансформациями сообщений
- Посмотрите Examples для практических примеров использования
- Прочитайте Development для участия в разработке
Устранение неполадок
Оператор не запускается
# Проверьте логи
kubectl logs -l app.kubernetes.io/name=dataflow-operator
# Проверьте события
kubectl describe pod -l app.kubernetes.io/name=dataflow-operator
# Проверьте CRD
kubectl get crd dataflows.dataflow.dataflow.io -o yaml
DataFlow не обрабатывает сообщения
-
Проверьте статус DataFlow:
kubectl describe dataflow <name> -
Проверьте подключение к источнику данных:
# Для Kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic <topic> # Для PostgreSQL psql <connection-string> -c "SELECT * FROM <table> LIMIT 10;" -
Проверьте логи оператора на наличие ошибок
Проблемы с подключением
- Убедитесь, что источники данных доступны из кластера
- Проверьте сетевые политики Kubernetes
- Проверьте правильность connection strings и credentials
- Для локальной разработки используйте
localhostилиhost.docker.internal