Перейти к основному содержимому

Введение в Go Pipeline v2

Go Pipeline v2 - это высокопроизводительный фреймворк конвейера пакетной обработки для Go, основанный на дженериках и безопасности параллелизма, со встроенной пакетной обработкой по размеру пакета и временному окну, противодавлением и корректным завершением, ошибками и метриками, асинхронным сбросом с ограничением скорости и динамической настройкой параметров, предоставляющий стандартные режимы конвейера и дедупликации.

🚀 Основные функции

  • Поддержка дженериков: Основан на дженериках Go 1.20+, типобезопасный
  • Пакетная обработка: Поддерживает автоматическую пакетную обработку по размеру и временному интервалу
  • Безопасность параллелизма: Встроенные механизмы безопасности горутин
  • Гибкая конфигурация: Настраиваемый размер буфера, размер пакета и интервал сброса
  • Обработка ошибок: Комплексные механизмы обработки и распространения ошибок
  • Два режима: Стандартная пакетная обработка и пакетная обработка с дедупликацией
  • Синхронный/Асинхронный: Поддерживает как синхронные, так и асинхронные режимы выполнения
  • Соглашения Go: Следует принципам управления каналами "кто пишет, тот закрывает"
  • Удобный API: Новые методы Start() и Run() для уменьшения шаблонного кода
  • Динамическая настройка параметров: Поддерживает безопасную настройку ключевых параметров во время выполнения
  • Корректное завершение: Поддерживает ограниченную по времени очистку при отмене и защиту от таймаута финального сброса

📋 Системные требования

  • Go 1.20+ (поддержка дженериков)
  • Поддерживает Linux, macOS, Windows

📦 Установка

go get github.com/rushairer/go-pipeline/v2@latest

🏗️ Архитектурный дизайн

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│ Ввод данных │───▶│ Буферный канал │───▶│ Пакетный │
│ │ │ │ │ процессор │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│ │
▼ ▼
┌──────────────────┐ ┌─────────────────┐
│ Таймер тикер │ │ Обработчик │
│ │ │ сброса │
└──────────────────┘ └─────────────────┘
│ │
└────────┬───────────────┘

┌─────────────────┐
│ Канал ошибок │
└─────────────────┘

📦 Основные компоненты

Определения интерфейсов

  • PipelineChannel[T]: Определяет интерфейс доступа к каналу конвейера
  • Performer: Определяет интерфейс для выполнения операций конвейера
  • DataProcessor[T]: Определяет основной интерфейс для пакетной обработки данных
  • Pipeline[T]: Объединяет всю функциональность конвейера в универсальный интерфейс

Типы реализации

  • StandardPipeline[T]: Стандартный конвейер пакетной обработки, данные обрабатываются пакетами по порядку
  • DeduplicationPipeline[T]: Конвейер пакетной обработки с дедупликацией, дедуплицирует на основе уникальных ключей
  • PipelineImpl[T]: Универсальная реализация конвейера, предоставляет базовую функциональность

💡 Быстрый старт

Использование удобного API (Рекомендуется)

package main

import (
"context"
"fmt"
"log"
"time"

gopipeline "github.com/rushairer/go-pipeline/v2"
)

func main() {
// Создать стандартный конвейер
pipeline := gopipeline.NewDefaultStandardPipeline(
func(ctx context.Context, batchData []int) error {
fmt.Printf("Обработка пакетных данных: %v\n", batchData)
return nil
},
)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

// Запуск с использованием удобного API
done, errs := pipeline.Start(ctx)

// Прослушивание ошибок
go func() {
for err := range errs {
log.Printf("Ошибка обработки: %v", err)
}
}()

// Добавление данных
dataChan := pipeline.DataChan()
go func() {
defer close(dataChan) // Кто пишет, тот закрывает
for i := 0; i < 100; i++ {
select {
case dataChan <- i:
case <-ctx.Done():
return
}
}
}()

// Ожидание завершения
<-done
}

Пример синхронного выполнения

func syncExample() {
pipeline := gopipeline.NewDefaultStandardPipeline(
func(ctx context.Context, batchData []int) error {
fmt.Printf("Обработка пакетных данных: %v\n", batchData)
return nil
},
)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

// Синхронный запуск, установить емкость канала ошибок в 128
if err := pipeline.Run(ctx, 128); err != nil {
log.Printf("Ошибка выполнения конвейера: %v", err)
}
}

📋 Параметры конфигурации

type PipelineConfig struct {
BufferSize uint32 // Емкость буферного канала (по умолчанию: 100)
FlushSize uint32 // Максимальная емкость для данных пакетной обработки (по умолчанию: 50)
FlushInterval time.Duration // Временной интервал для запланированного обновления (по умолчанию: 50ms)
DrainOnCancel bool // Выполнять ли ограниченную по времени очистку при отмене (по умолчанию false)
DrainGracePeriod time.Duration // Максимальное временное окно для очистки
FinalFlushOnCloseTimeout time.Duration // Таймаут финального сброса для пути закрытия канала (0 означает отключено)
MaxConcurrentFlushes uint32 // Максимальное количество одновременных асинхронных сбросов (0 означает неограниченно)
}

🎯 Оптимизированные для производительности значения по умолчанию

На основе бенчмарков производительности, v2.2.2 использует оптимизированную конфигурацию по умолчанию:

  • BufferSize: 100 - Размер буфера, должен быть >= FlushSize * 2 для избежания блокировки
  • FlushSize: 50 - Размер пакета, тесты производительности показывают, что около 50 оптимально
  • FlushInterval: 50ms - Интервал сброса, балансирует задержку и пропускную способность

🔧 Удобные методы конфигурации

// Создать конфигурацию с использованием цепочечных методов
config := gopipeline.NewPipelineConfig().
WithFlushInterval(time.Millisecond * 10).
WithBufferSize(200).
WithDrainOnCancel(true).
WithDrainGracePeriod(150 * time.Millisecond)

pipeline := gopipeline.NewStandardPipeline(config, flushFunc)

Следующие шаги