Разработка коннекторов с baseConnector
Подробное руководство по добавлению нового коннектора в DataFlow Operator с использованием общего паттерна baseConnector.
Обзор
Все коннекторы (ClickHouse, PostgreSQL, Kafka, Trino, Nessie) используют единый паттерн для синхронизации методов Connect и Close:
sync.Mutex(илиsync.RWMutex) для защиты состояния- Флаг
closedдля идемпотентностиCloseи блокировкиConnectпосле закрытия - Одинаковая последовательность:
Lock→ проверкаclosed→ логика →Unlock
Чтобы избежать дублирования, используется встраиваемый baseConnector или baseConnectorRWMutex.
baseConnector и baseConnectorRWMutex
baseConnector (sync.Mutex)
Используется для большинства коннекторов, где операции чтения не требуют отдельной блокировки.
Методы:
| Метод | Описание |
|---|---|
guardConnect() bool |
Захватывает lock. Возвращает false, если коннектор уже закрыт. При true вызывающий держит lock и должен вызвать Unlock() |
guardClose() bool |
Захватывает lock. Возвращает true, если уже закрыт (идемпотентность). При false устанавливает closed=true, вызывающий держит lock |
Unlock() |
Освобождает lock |
Lock() |
Захватывает lock вручную (для операций вроде readRows, где нужна длительная блокировка) |
baseConnectorRWMutex (sync.RWMutex)
Используется, когда коннектор выполняет длительные операции чтения (например, SQL-запросы), и Connect/Close не должны блокироваться на время этих операций.
Дополнительные методы:
| Метод | Описание |
|---|---|
RLock() |
Захватывает read lock |
RUnlock() |
Освобождает read lock |
Closed() bool |
Возвращает closed (вызывать только под RLock) |
Пошаговое руководство: добавление нового коннектора
Шаг 1. Определение типов в API
Добавьте спецификацию в api/v1/dataflow_types.go:
// MyDBSourceSpec defines MyDB source configuration
type MyDBSourceSpec struct {
ConnectionString string `json:"connectionString"`
Table string `json:"table"`
// ...
}
Пользователи указывают коннектор в манифесте DataFlow через единый формат type + config:
source:
type: mydb
config:
connectionString: "mydb://localhost:3306/db"
table: my_table
Структуры SourceSpec и SinkSpec используют Type string + Config json.RawMessage. Фабрика парсит Config в вашу spec-структуру.
Шаг 2. Выбор baseConnector или baseConnectorRWMutex
Используйте baseConnector, если:
- Коннектор не выполняет длительные операции чтения под lock
- Или операции чтения модифицируют состояние (например,
lastReadID) и требуют полной блокировки
Используйте baseConnectorRWMutex, если:
- Коннектор выполняет длительные read-операции (запросы к БД)
- Эти операции только читают
connиclosed, не модифицируют их - Нужно, чтобы
Connect/Closeне блокировались на время запроса
Шаг 3. Реализация Source-коннектора
С baseConnector (простой случай)
// internal/connectors/mydb.go
package connectors
import (
"context"
"fmt"
v1 "github.com/dataflow-operator/dataflow/api/v1"
"github.com/dataflow-operator/dataflow/internal/types"
"github.com/go-logr/logr"
)
type MyDBSourceConnector struct {
baseConnector
config *v1.MyDBSourceSpec
conn *MyDBConnection
logger logr.Logger
}
func NewMyDBSourceConnector(config *v1.MyDBSourceSpec) *MyDBSourceConnector {
return &MyDBSourceConnector{
config: config,
logger: logr.Discard(),
}
}
func (c *MyDBSourceConnector) SetLogger(logger logr.Logger) {
c.logger = logger
}
func (c *MyDBSourceConnector) Connect(ctx context.Context) error {
if !c.guardConnect() {
return fmt.Errorf("connector is closed")
}
defer c.Unlock()
c.logger.Info("Connecting to MyDB", "table", c.config.Table)
conn, err := connectToMyDB(ctx, c.config.ConnectionString)
if err != nil {
return fmt.Errorf("failed to connect: %w", err)
}
c.conn = conn
c.logger.Info("Successfully connected to MyDB", "table", c.config.Table)
return nil
}
func (c *MyDBSourceConnector) Read(ctx context.Context) (<-chan *types.Message, error) {
if c.conn == nil {
return nil, fmt.Errorf("not connected, call Connect first")
}
// ... реализация чтения
return msgChan, nil
}
func (c *MyDBSourceConnector) Close() error {
if c.guardClose() {
return nil
}
defer c.Unlock()
c.logger.Info("Closing MyDB source connection", "table", c.config.Table)
if c.conn != nil {
return c.conn.Close()
}
return nil
}
С baseConnectorRWMutex (длительные read-операции)
Если readRows выполняет долгий запрос и не должен блокировать Connect/Close:
type MyDBSourceConnector struct {
baseConnectorRWMutex
config *v1.MyDBSourceSpec
conn *MyDBConnection
logger logr.Logger
}
func (c *MyDBSourceConnector) readRows(ctx context.Context, msgChan chan *types.Message) {
// RLock — не блокирует Connect/Close
c.RLock()
if c.Closed() {
c.RUnlock()
return
}
conn := c.conn
c.RUnlock()
if conn == nil {
return
}
// Долгий запрос выполняется БЕЗ удержания lock
rows, err := conn.QueryContext(ctx, "SELECT * FROM ...")
// ...
}
Шаг 4. Реализация Sink-коннектора
type MyDBSinkConnector struct {
baseConnector
config *v1.MyDBSinkSpec
conn *MyDBConnection
logger logr.Logger
}
func (c *MyDBSinkConnector) Connect(ctx context.Context) error {
if !c.guardConnect() {
return fmt.Errorf("connector is closed")
}
defer c.Unlock()
c.logger.Info("Connecting to MyDB", "table", c.config.Table)
conn, err := connectToMyDB(ctx, c.config.ConnectionString)
if err != nil {
return fmt.Errorf("failed to connect: %w", err)
}
c.conn = conn
return nil
}
func (c *MyDBSinkConnector) Write(ctx context.Context, messages <-chan *types.Message) error {
// ... реализация записи
}
func (c *MyDBSinkConnector) Close() error {
if c.guardClose() {
return nil
}
defer c.Unlock()
c.logger.Info("Closing MyDB sink connection", "table", c.config.Table)
if c.conn != nil {
return c.conn.Close()
}
return nil
}
Шаг 5. Поддержка rawMode (опционально, только для sink-коннекторов)
Для sink-коннекторов можно добавить режим сырой записи — сохранение сообщений в формате {"value": ..., "_metadata": {...}}:
- Добавьте в sink spec поле
RawMode *boolс тегомjson:"rawMode,omitempty" - При записи сообщений проверяйте
config.RawMode != nil && *config.RawMode - При rawMode=true оборачивайте входящие plain-данные, используя
msg.Metadataдля_metadata(см. реализации PostgreSQL/ClickHouse/Trino sink)
Примечание: rawMode поддерживается только в sink-коннекторах. Источники всегда отдают plain колоночный формат.
Шаг 6. Регистрация в фабрике
В internal/connectors/factory.go:
func CreateSourceConnector(source *v1.SourceSpec) (SourceConnector, error) {
switch source.Type {
// ...
case "mydb":
var cfg MyDBSourceSpec
if err := json.Unmarshal(source.Config.Raw, &cfg); err != nil {
return nil, fmt.Errorf("invalid mydb source config: %w", err)
}
return NewMyDBSourceConnector(&cfg), nil
default:
return nil, fmt.Errorf("unknown source type: %s", source.Type)
}
}
func CreateSinkConnector(sink *v1.SinkSpec) (SinkConnector, error) {
switch sink.Type {
// ...
case "mydb":
var cfg MyDBSinkSpec
if err := json.Unmarshal(sink.Config.Raw, &cfg); err != nil {
return nil, fmt.Errorf("invalid mydb sink config: %w", err)
}
return NewMyDBSinkConnector(&cfg), nil
default:
return nil, fmt.Errorf("unknown sink type: %s", sink.Type)
}
}
Шаг 7. Генерация и тестирование
task generate
task manifests
task test-unit
Важные замечания
Порядок вызовов в Connect
if !c.guardConnect() {
return fmt.Errorf("connector is closed")
}
defer c.Unlock()
// ... логика подключения
guardConnect()возвращаетfalse→ коннектор закрыт, вызываемreturnguardConnect()возвращаетtrue→ держим lock, обязательноdefer c.Unlock()
Порядок вызовов в Close
if c.guardClose() {
return nil // уже закрыт, идемпотентность
}
defer c.Unlock()
// ... закрытие соединения
guardClose()возвращаетtrue→ уже закрыт, возвращаемnilguardClose()возвращаетfalse→ устанавливаемclosed=true, держим lock, закрываем ресурсы
Использование Lock() для readRows
Если readRows модифицирует состояние коннектора (например, lastReadID) и требует эксклюзивного доступа:
func (p *PostgreSQLSourceConnector) readRows(ctx context.Context, msgChan chan *types.Message) {
p.Lock()
defer p.Unlock()
// ... чтение и обновление lastReadID
}
Тестирование
Обязательно добавьте тесты:
Closeпри уже закрытом коннекторе (идемпотентность)ConnectпослеClose(должен возвращать ошибку)- Создание коннектора и проверка начального состояния
func TestMyDBSourceConnector_Close_WhenAlreadyClosed(t *testing.T) {
conn := NewMyDBSourceConnector(spec)
conn.SetLogger(logr.Discard())
require.NoError(t, conn.Close())
require.NoError(t, conn.Close()) // второй вызов — без ошибки
}
func TestMyDBSourceConnector_Connect_WhenClosed(t *testing.T) {
conn := NewMyDBSourceConnector(spec)
conn.SetLogger(logr.Discard())
conn.closed = true
err := conn.Connect(context.Background())
require.Error(t, err)
assert.Contains(t, err.Error(), "closed")
}
Ссылки
- Добавление нового коннектора — общий раздел в руководстве по разработке
- Интерфейсы коннекторов —
SourceConnectorиSinkConnector - baseConnector — исходный код