Справочник API
Полная документация API для Go Pipeline v2.
Основные Интерфейсы
PipelineChannel[T]
Определяет интерфейс доступа к каналу конвейера.
type PipelineChannel[T any] interface {
DataChan() chan<- T
ErrorChan(capacity uint32) <-chan error
}
Методы:
DataChan(): Возвращает канал данных только для записиErrorChan(capacity uint32): Возвращает канал ошибок только для чтения с указанной емкостью
Performer
Определяет интерфейс для выполнения операций конвейера.
type Performer interface {
AsyncPerform(ctx context.Context) error
}
Методы:
AsyncPerform(ctx context.Context): Запускает асинхронное выполнение конвейера
DataProcessor[T]
Определяет основной интерфейс для пакетной обработки данных.
type DataProcessor[T any] interface {
ProcessBatch(ctx context.Context, batchData []T) error
}
Методы:
ProcessBatch(ctx context.Context, batchData []T): Обрабатывает пакетные данные
Pipeline[T]
Объединяет всю функциональность конвейера в универсальный интерфейс.
type Pipeline[T any] interface {
PipelineChannel[T]
Performer
DataProcessor[T]
// Удобные методы API
Start(ctx context.Context) (<-chan struct{}, <-chan error)
Run(ctx context.Context, errorChanCapacity uint32) error
// Динамическая настройка параметров
SetFlushSize(n uint32)
SetFlushInterval(d time.Duration)
SetMaxConcurrentFlushes(n uint32)
}
Типы Реализации
StandardPipeline[T]
Стандартный конвейер пакетной обработки, данные обрабатываются пакетами по порядку.
type StandardPipeline[T any] struct {
// Детали внутренней реализации
}
Конструктор:
func NewStandardPipeline[T any](
config PipelineConfig,
flushFunc func(ctx context.Context, batchData []T) error,
) *StandardPipeline[T]
Конструктор по Умолчанию:
func NewDefaultStandardPipeline[T any](
flushFunc func(ctx context.Context, batchData []T) error,
) *StandardPipeline[T]
DeduplicationPipeline[T, K]
Конвейер пакетной обработки с дедупликацией, дедуплицирует на основе уникальных ключей.
type DeduplicationPipeline[T any, K comparable] struct {
// Детали внутренней реализации
}
Конструктор:
func NewDeduplicationPipeline[T any, K comparable](
config PipelineConfig,
keyFunc func(T) K,
flushFunc func(ctx context.Context, batchData []T) error,
) *DeduplicationPipeline[T, K]
Конструктор по Умолчанию:
func NewDefaultDeduplicationPipeline[T any, K comparable](
keyFunc func(T) K,
flushFunc func(ctx context.Context, batchData []T) error,
) *DeduplicationPipeline[T, K]
Конфигурация
PipelineConfig
Структура конфигурации конвейера.
type PipelineConfig struct {
BufferSize uint32 // Емкость буферного канала (по умолчанию: 100)
FlushSize uint32 // Максимальная емкость для данных пакетной обработки (по умолчанию: 50)
FlushInterval time.Duration // Временной интервал для запланированного обновления (по умолчанию: 50ms)
DrainOnCancel bool // Выполнять ли ограниченную по времени очистку при отмене (по умолчанию false)
DrainGracePeriod time.Duration // Максимальное временное окно для очистки
FinalFlushOnCloseTimeout time.Duration // Таймаут финального сброса для пути закрытия канала (0 означает отключено)
MaxConcurrentFlushes uint32 // Максимальное количество одновременных асинхронных сбросов (0 означает неограниченно)
}
Конструктор Конфигурации
func NewPipelineConfig() PipelineConfig
Возвращает новую конфигурацию с оптимизированными значениями по умолчанию.
Методы Конфигурации
func (c PipelineConfig) WithBufferSize(size uint32) PipelineConfig
func (c PipelineConfig) WithFlushSize(size uint32) PipelineConfig
func (c PipelineConfig) WithFlushInterval(interval time.Duration) PipelineConfig
func (c PipelineConfig) WithDrainOnCancel(enabled bool) PipelineConfig
func (c PipelineConfig) WithDrainGracePeriod(d time.Duration) PipelineConfig
func (c PipelineConfig) WithFinalFlushOnCloseTimeout(d time.Duration) PipelineConfig
func (c PipelineConfig) WithMaxConcurrentFlushes(n uint32) PipelineConfig
Детали Методов
Метод Start
Удобный API для асинхронного выполнения.
func (p *Pipeline[T]) Start(ctx context.Context) (<-chan struct{}, <-chan error)
Параметры:
ctx: Контекст для управления жизненным циклом конвейера
Возвращает:
<-chan struct{}: К анал завершения, закрывается при завершении конвейера<-chan error: Канал ошибок для получения ошибок обработки
Пример Использования:
pipeline := gopipeline.NewDefaultStandardPipeline(flushFunc)
done, errs := pipeline.Start(ctx)
// Прослушивание ошибок
go func() {
for err := range errs {
log.Printf("Ошибка обработки: %v", err)
}
}()
// Добавление данных
dataChan := pipeline.DataChan()
go func() {
defer close(dataChan)
for i := 0; i < 100; i++ {
dataChan <- i
}
}()
// Ожидание завершения
<-done
Метод Run
Удобный API для синхронного выполнения.
func (p *Pipeline[T]) Run(ctx context.Context, errorChanCapacity uint32) error
Параметры:
ctx: Контекст для управления жизненным циклом конвейераerrorChanCapacity: Емкость канала ошибок
Возвращает:
error: Первая ошибка, встреченная во время выполнения, или nil при успехе
Пример Использования:
pipeline := gopipeline.NewDefaultStandardPipeline(flushFunc)
// Добавление данных
dataChan := pipeline.DataChan()
go func() {
defer close(dataChan)
for i := 0; i < 100; i++ {
dataChan <- i
}
}()
// Синхронное выполнение
if err := pipeline.Run(ctx, 128); err != nil {
log.Printf("Ошибка выполнения конвейера: %v", err)
}
Метод AsyncPerform
Традиционный API асинхронного выполнения.
func (p *Pipeline[T]) AsyncPerform(ctx context.Context) error
Параметры:
ctx: Контекст для управления жизненным циклом конвейера
Возвращает:
error: Ошибка при неудачном запуске
Пример Использования:
pipeline := gopipeline.NewDefaultStandardPipeline(flushFunc)
// Запуск конвейера
if err := pipeline.AsyncPerform(ctx); err != nil {
log.Fatalf("Не удалось запустить конвейер: %v", err)
}
// Обработка ошибок
go func() {
for err := range pipeline.ErrorChan(100) {
log.Printf("Ошибка обработки: %v", err)
}
}()
// Добавление данных
dataChan := pipeline.DataChan()
for i := 0; i < 100; i++ {
dataChan <- i
}
close(dataChan)
Динамическая Настройка Параметров
SetFlushSize
Динамически настроить размер пакета во время выполнения.
func (p *Pipeline[T]) SetFlushSize(n uint32)
Параметры:
n: Новый размер пакета
Примечания:
- Изменения не влияют на пакеты, которые в данный момент строятся
- Потокобезопасная операция
SetFlushInterval
Динамически настроить интервал сброса во время выполнения.
func (p *Pipeline[T]) SetFlushInterval(d time.Duration)
Параметры:
d: Новый интервал сброса
Примечания:
- Изменения вступают в силу при следующем сбросе таймера
- Потокобезопасная операция
SetMaxConcurrentFlushes
Динамически настроить максимальное количество одновременных сбросов во время выполнения.
func (p *Pipeline[T]) SetMaxConcurrentFlushes(n uint32)
Параметры:
n: Максимальное количество одновременных сбросов (0 означает неограниченно)
Примечания:
- Изменения вступают в силу немедленно
- Потокобезопасная операция
Шаблоны Использования
Базовый Шаблон Использования
// 1. Создать конвейер
pipeline := gopipeline.NewDefaultStandardPipeline(
func(ctx context.Context, batchData []int) error {
// Обработать пакетные данные
fmt.Printf("Обработка %d элементов: %v\n", len(batchData), batchData)
return nil
},
)
// 2. Запустить конвейер
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
done, errs := pipeline.Start(ctx)
// 3. Обработать ошибки
go func() {
for err := range errs {
log.Printf("Ошибка: %v", err)
}
}()
// 4. Добавить данные
dataChan := pipeline.DataChan()
go func() {
defer close(dataChan) // Кто пишет, тот закрывает
for i := 0; i < 100; i++ {
select {
case dataChan <- i:
case <-ctx.Done():
return
}
}
}()
// 5. Ждать завершения
<-done
Шаблон Пользовательской Конфигурации
// Создать пользовательскую конфигурацию
config := gopipeline.NewPipelineConfig().
WithBufferSize(200).
WithFlushSize(100).
WithFlushInterval(time.Millisecond * 100).
WithDrainOnCancel(true).
WithDrainGracePeriod(time.Second * 5)
// Создать конвейер с пользовательской конфигурацией
pipeline := gopipeline.NewStandardPipeline(config, flushFunc)
// Использовать конвейер...
Шаблон Дедупликации
type User struct {
ID int
Name string
}
// Создать конвейер дедупликации
pipeline := gopipeline.NewDefaultDeduplicationPipeline(
func(user User) int { return user.ID }, // Функция ключа
func(ctx context.Context, users []User) error {
// Обработать дедуплицированных пользователей
fmt.Printf("Обработка %d уникальных пользователей\n", len(users))
return nil
},
)
// Использовать конвейер...
Шаблон Обработки Ошибок
pipeline := gopipeline.NewDefaultStandardPipeline(flushFunc)
// Запустить конвейер
done, errs := pipeline.Start(ctx)
// Комплексная обработка ошибок
go func() {
for err := range errs {
// Записать ошибку
log.Printf("Ошибка конвейера: %v", err)
// Реализовать логику повтора или оповещения
if isRetryableError(err) {
// Реализовать логику повтора
} else {
// Отправить оповещение
sendAlert(err)
}
}
}()
// Добавить данные и ждать завершения
// ...