Введение в Go Pipeline v2
Go Pipeline v2 - это высокопроизводительный фреймворк конвейера пакетной обработки для Go, основанный на дженериках и безопасности параллелизма, со встроенной пакетной обработкой по размеру пакета и временному окну, противодавлением и корректным завершением, ошибками и метриками, асинхронным сбросом с ограничением скорости и динамической настройкой параметров, предоставляющий стандартные режимы конвейера и дедупликации.
🚀 Основные функции
- Поддержка дженериков: Основан на дженериках Go 1.20+, типобезопасный
- Пакетная обработка: Поддерживает автоматическую пакетную обработку по размеру и временному интервалу
- Безопасность параллелизма: Встроенные механизмы безопасности горутин
- Гибкая конфигурация: Настраиваемый размер буфера, размер пакета и интервал сброса
- Обработка ошибок: Комплексные механизмы обработки и распространения ошибок
- Два режима: Стандартная пакетная обработка и пакетная обработка с дедупликацией
- Синхронный/Асинхронный: Поддерживает как синхронные, так и асинхронные режимы выполнения
- Соглашения Go: Следует принципам управления каналами "кто пишет, тот закрывает"
- Удобный API: Новые методы Start() и Run() для уменьшения шаблонного кода
- Динамическая настройка параметров: Поддерживает безопасную настройку ключевых параметров во время выполнения
- Корректное завершение: Поддерживает ограниченную по времени очистку при отмене и защиту от таймаута финального сброса
📋 Системные требования
- Go 1.20+ (поддержка дженериков)
- Поддерживает Linux, macOS, Windows
📦 Установка
go get github.com/rushairer/go-pipeline/v2@latest
🏗️ Архитектурный дизайн
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Ввод данных │───▶│ Буферный канал │───▶│ Пакетный │
│ │ │ │ │ процессор │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│ │
▼ ▼
┌──────────────────┐ ┌─────────────────┐
│ Таймер тикер │ │ Обработчик │
│ │ │ сброса │
└──────────────────┘ └─────────────────┘
│ │
└────────┬───────────────┘
▼
┌─────────────────┐
│ Канал ошибок │
└─────────────────┘
📦 Основные компоненты
Определения интерфейсов
PipelineChannel[T]: Определяет интерфейс доступа к каналу конвейераPerformer: Определяет интерфейс для выполнения операций конвейераDataProcessor[T]: Определяет основной интерфейс для пакетной обработки данныхPipeline[T]: Объединяет всю функциональность конвейера в универсальный интерфейс
Типы реализации
StandardPipeline[T]: Стандартный конвейер пакетной обработки, данные обрабатываются пакетами по порядкуDeduplicationPipeline[T]: Конвейер пакетной обработки с дедупликацией, дедуплицирует на основе уникальных ключейPipelineImpl[T]: Универсальная реализация конвейера, предоставляет базовую функциональность
💡 Быстрый старт
Использование удобного API (Рекомендуется)
package main
import (
"context"
"fmt"
"log"
"time"
gopipeline "github.com/rushairer/go-pipeline/v2"
)
func main() {
// Создать стандартный конвейер
pipeline := gopipeline.NewDefaultStandardPipeline(
func(ctx context.Context, batchData []int) error {
fmt.Printf("Обработка пакетных данных: %v\n", batchData)
return nil
},
)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
// Запуск с использование м удобного API
done, errs := pipeline.Start(ctx)
// Прослушивание ошибок
go func() {
for err := range errs {
log.Printf("Ошибка обработки: %v", err)
}
}()
// Добавление данных
dataChan := pipeline.DataChan()
go func() {
defer close(dataChan) // Кто пишет, тот закрывает
for i := 0; i < 100; i++ {
select {
case dataChan <- i:
case <-ctx.Done():
return
}
}
}()
// Ожидание завершения
<-done
}
Пример синхронного выполнения
func syncExample() {
pipeline := gopipeline.NewDefaultStandardPipeline(
func(ctx context.Context, batchData []int) error {
fmt.Printf("Обработка пакетных данных: %v\n", batchData)
return nil
},
)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
// Синхронный запуск, установить емкость канала ошибок в 128
if err := pipeline.Run(ctx, 128); err != nil {
log.Printf("Ошибка выполнения конвейера: %v", err)
}
}
📋 Параметры конфигурации
type PipelineConfig struct {
BufferSize uint32 // Емкость буферного канала (по умолчанию: 100)
FlushSize uint32 // Максимальная емкость для данных пакетной обработки (по умолчанию: 50)
FlushInterval time.Duration // Временной интервал для запланированного обновления (по умолчанию: 50ms)
DrainOnCancel bool // Выполнять ли ограниченную по времени очистку при отмене (по умолчанию false)
DrainGracePeriod time.Duration // Максимальное временное окно для очистки
FinalFlushOnCloseTimeout time.Duration // Таймаут финального сброса для пути закрытия канала (0 означает отключено)
MaxConcurrentFlushes uint32 // Максимальное количество одновременных асинхронных сбросов (0 означает неограниченно)
}
🎯 Оптимизированные для производительности значения по умолчанию
На основе бенчмарков производительности, v2.2.2 использует оптимизированную конфигурацию по умолчанию:
- BufferSize: 100 - Размер буфера, должен быть >= FlushSize * 2 для избежания блокировки
- FlushSize: 50 - Размер пакета, тесты производительности показывают, что около 50 оптимально
- FlushInterval: 50ms - Интервал сброса, балансирует задержку и пропускную способность