Справочник API
Этот документ предоставляет полный справочник API для Go Pipeline v2.
Основные интерфейсы
Pipeline[T any]
Основной интерфейс пайплайна, который объединяет всю функциональность пайплайна.
type Pipeline[T any] interface {
PipelineChannel[T]
Performer[T]
DataProcessor[T]
}
PipelineChannel[T any]
Определяет интерфейс доступа к каналу пайплайна.
type PipelineChannel[T any] interface {
// DataChan возвращает канал для записи для добавления данных в пайплайн
DataChan() chan<- T
// ErrorChan возвращает канал только для чтения для получения информации об ошибках из пайплайна
ErrorChan(size int) <-chan error
}
DataChan()
Возвращает канал ввода данных.
Возвращаемое значение: chan<- T
- Канал только для записи для добавления данных
Пример использования:
dataChan := pipeline.DataChan()
dataChan <- "some data"
close(dataChan) // Закрыть канал по завершении
ErrorChan(size int)
Возвращает канал вывода ошибок.
Параметры:
size int
- Размер буфера канала ошибок
Возвращаемое значение: <-chan error
- Канал только для чтения для получения ошибок
Пример использования:
errorChan := pipeline.ErrorChan(10)
go func() {
for err := range errorChan {
log.Printf("Ошибка пайплайна: %v", err)
}
}()
Performer[T any]
Определяет интерфейс для выполнения операций пайплайна.
type Performer[T any] interface {
// AsyncPerform выполняет операции пайплайна асинхронно
AsyncPerform(ctx context.Context) error
// SyncPerform выполняет операции пайплайна синхронно
SyncPerform(ctx context.Context) error
}
AsyncPerform(ctx context.Context)
Выполняет операции пайплайна асинхронно, не блокирует вызывающий поток.
Параметры:
ctx context.Context
- Объект контекста для контроля жизненного цикла операции
Возвращаемое значение: error
- Возвращает ошибку, если ctx отменен
Пример использования:
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
go func() {
if err := pipeline.AsyncPerform(ctx); err != nil {
log.Printf("Ошибка выполнения пайплайна: %v", err)
}
}()
SyncPerform(ctx context.Context)
Выполняет операции пайплайна синхронно, блокирует до завершения или отмены.
Параметры:
ctx context.Context
- Объект контекста
Возвращаемое значение: error
- Ошибка выполнения или ошибка отмены
Пример использования:
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
if err := pipeline.SyncPerform(ctx); err != nil {
log.Printf("Ошибка выполнения пайплайна: %v", err)
}
DataProcessor[T any]
Определяет основной интерфейс для пакетной обработки данных (в основном для внутренней реализации).
type DataProcessor[T any] interface {
initBatchData() any
addToBatch(batchData any, data T) any
flush(ctx context.Context, batchData any) error
isBatchFull(batchData any) bool
isBatchEmpty(batchData any) bool
}
Типы конфигурации
PipelineConfig
Структура конфигурации пайплайна.
type PipelineConfig struct {
BufferSize uint32 // Емкость канала буфера (по умолчанию: 100)
FlushSize uint32 // Максимальная емкость данных пакетной обработки (по умолчанию: 50)
FlushInterval time.Duration // Временной интервал для запланированного обновления (по умолчанию: 50ms)
}
Описания полей:
BufferSize
: Размер буфера внутреннего канала данныхFlushSize
: Максимальное количество данных на пакетную обработкуFlushInterval
: Временной интервал для запуска пакетной обработки
API стандартного пайплайна
Определения типов
type FlushStandardFunc[T any] func(ctx context.Context, batchData []T) error
type StandardPipeline[T any] struct {
*PipelineImpl[T]
flushFunc FlushStandardFunc[T]
}
Конструкторы
NewDefaultStandardPipeline[T any]
Создает стандартный пайплайн с конфигурацией по умолчанию.
func NewDefaultStandardPipeline[T any](
flushFunc FlushStandardFunc[T],
) *StandardPipeline[T]
Параметры:
flushFunc FlushStandardFunc[T]
- Функция пакетной обработки
Возвращаемое значение: *StandardPipeline[T]
- Экземпляр стандартного пайплайна
Пример использования:
pipeline := gopipeline.NewDefaultStandardPipeline(
func(ctx context.Context, batchData []string) error {
fmt.Printf("Обработка %d элементов: %v\n", len(batchData), batchData)
return nil
},
)
NewStandardPipeline[T any]
Создает стандартный пайплайн с пользовательской конфигурацией.
func NewStandardPipeline[T any](
config PipelineConfig,
flushFunc FlushStandardFunc[T],
) *StandardPipeline[T]
Параметры:
config PipelineConfig
- Пользовательская конфигурацияflushFunc FlushStandardFunc[T]
- Функция пакетной обработки
Возвращаемое значение: *StandardPipeline[T]
- Экземпляр стандартного пайплайна
Пример использования:
standardConfig := gopipeline.PipelineConfig{
BufferSize: 200,
FlushSize: 100,
FlushInterval: time.Millisecond * 100,
}
pipeline := gopipeline.NewStandardPipeline(standardConfig,
func(ctx context.Context, batchData []string) error {
return processData(batchData)
},
)
API пайплайна дедупликации
Определения типов
type KeyFunc[T any] func(T) string
type FlushDeduplicationFunc[T any] func(ctx context.Context, batchData []T) error
type DeduplicationPipeline[T any] struct {
*PipelineImpl[T]
keyFunc KeyFunc[T]
flushFunc FlushDeduplicationFunc[T]
}
Конструкторы
NewDefaultDeduplicationPipeline[T any]
Создает пайплайн дедупликации с конфигурацией по умолчанию.
func NewDefaultDeduplicationPipeline[T any](
keyFunc KeyFunc[T],
flushFunc FlushDeduplicationFunc[T],
) *DeduplicationPipeline[T]
Параметры:
keyFunc KeyFunc[T]
- Функция генерации уникального ключаflushFunc FlushDeduplicationFunc[T]
- Функция пакетной обработки
Возвращаемое значение: *DeduplicationPipeline[T]
- Экземпляр пайплайна дедупликации
Пример использования:
pipeline := gopipeline.NewDefaultDeduplicationPipeline(
func(user User) string {
return user.Email // Использовать email как уникальный ключ
},
func(ctx context.Context, users []User) error {
return processUsers(users)
},
)
NewDeduplicationPipeline[T any]
Создает пайплайн дедупликации с пользовательской конфигурацией.
func NewDeduplicationPipeline[T any](
config PipelineConfig,
keyFunc KeyFunc[T],
flushFunc FlushDeduplicationFunc[T],
) *DeduplicationPipeline[T]
Параметры:
config PipelineConfig
- Пользовательская конфигурацияkeyFunc KeyFunc[T]
- Функция генерации уникального ключаflushFunc FlushDeduplicationFunc[T]
- Функция пакетной обработки
Возвращаемое значение: *DeduplicationPipeline[T]
- Экземпляр пайплайна дедупликации
Пример использования:
deduplicationConfig := gopipeline.PipelineConfig{
BufferSize: 100,
FlushSize: 50,
FlushInterval: time.Millisecond * 100,
}
pipeline := gopipeline.NewDeduplicationPipeline(deduplicationConfig,
func(product Product) string {
return fmt.Sprintf("%s-%s", product.SKU, product.Version)
},
func(ctx context.Context, products []Product) error {
return updateProducts(products)
},
)
Типы ошибок
PipelineError
Базовый тип для ошибок, связанных с пайплайном.
type PipelineError struct {
Op string // Имя операции
Err error // Исходная ошибка
}
func (e *PipelineError) Error() string {
return fmt.Sprintf("pipeline %s: %v", e.Op, e.Err)
}
func (e *PipelineError) Unwrap() error {
return e.Err
}
Общие ошибки
ErrPipelineClosed
: Пайплайн закрытErrContextCanceled
: Контекст был отмененErrFlushTimeout
: Таймаут операции сброса