Перейти к основному содержимому

Введение в Go Pipeline v2

Go Pipeline v2 — это высокопроизводительный фреймворк для пакетной обработки данных на Go, который поддерживает дженерики, безопасность конкурентности и предоставляет два режима: стандартная пакетная обработка и пакетная обработка с дедупликацией.

🚀 Основные функции

  • Поддержка дженериков: Основан на дженериках Go 1.18+, типобезопасный
  • Механизм пакетной обработки: Поддерживает автоматическую пакетную обработку по размеру и временным интервалам
  • Безопасность конкурентности: Встроенный механизм безопасности горутин
  • Гибкая конфигурация: Настраиваемый размер буфера, размер пакета и интервал сброса
  • Обработка ошибок: Комплексный механизм обработки и распространения ошибок
  • Два режима: Стандартная пакетная обработка и пакетная обработка с дедупликацией
  • Синхронный/Асинхронный: Поддерживает синхронный и асинхронный режимы выполнения
  • Соглашения Go: Принимает принцип управления каналами "писатель закрывает"

📋 Системные требования

  • Go 1.18+ (поддержка дженериков)
  • Поддерживает Linux, macOS, Windows

📦 Установка

go get github.com/rushairer/go-pipeline/v2@latest

🏗️ Архитектурный дизайн

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│ Ввод данных │───▶│ Канал буфера │───▶│ Пакетный │
└─────────────────┘ └──────────────────┘ │ процессор │
│ └─────────────────┘
▼ │
┌──────────────────┐ ▼
│ Timer Ticker │ ┌─────────────────┐
└──────────────────┘ │ Обработчик │
│ │ сброса │
└────────┬─────└─────────────────┘

┌─────────────────┐
│ Канал ошибок │
└─────────────────┘

📦 Основные компоненты

Определения интерфейсов

  • PipelineChannel[T]: Определяет интерфейс доступа к каналу пайплайна
  • Performer: Определяет интерфейс для выполнения операций пайплайна
  • DataProcessor[T]: Определяет основной интерфейс для пакетной обработки данных
  • Pipeline[T]: Объединяет всю функциональность пайплайна в универсальный интерфейс

Типы реализации

  • StandardPipeline[T]: Стандартный пайплайн пакетной обработки, данные обрабатываются пакетами по порядку
  • DeduplicationPipeline[T]: Пайплайн пакетной обработки с дедупликацией, дедуплицирует на основе уникальных ключей
  • PipelineImpl[T]: Общая реализация пайплайна, предоставляет базовую функциональность

💡 Быстрый старт

Пример стандартного пайплайна

package main

import (
"context"
"fmt"
"log"
"time"

gopipeline "github.com/rushairer/go-pipeline/v2"
)

func main() {
// Создать стандартный пайплайн
pipeline := gopipeline.NewDefaultStandardPipeline(
func(ctx context.Context, batchData []int) error {
fmt.Printf("Обработка пакетных данных: %v\n", batchData)
return nil
},
)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

// Запустить асинхронную обработку
go func() {
if err := pipeline.AsyncPerform(ctx); err != nil {
log.Printf("Ошибка выполнения пайплайна: %v", err)
}
}()

// Слушать ошибки
errorChan := pipeline.ErrorChan(10)
go func() {
for err := range errorChan {
log.Printf("Ошибка обработки: %v", err)
}
}()

// Добавить данные
dataChan := pipeline.DataChan()
for i := 0; i < 100; i++ {
dataChan <- i
}

// Закрыть канал данных
close(dataChan)

// Дождаться завершения обработки
time.Sleep(time.Second * 2)
}

📋 Параметры конфигурации

type PipelineConfig struct {
BufferSize uint32 // Емкость канала буфера (по умолчанию: 100)
FlushSize uint32 // Максимальная емкость данных пакетной обработки (по умолчанию: 50)
FlushInterval time.Duration // Временной интервал для запланированного обновления (по умолчанию: 50ms)
}

🎯 Оптимизированные для производительности значения по умолчанию

На основе бенчмарков производительности, версия v2 принимает оптимизированную конфигурацию по умолчанию:

  • BufferSize: 100 - Размер буфера, должен быть >= FlushSize * 2 для избежания блокировки
  • FlushSize: 50 - Размер пакета, тесты производительности показывают, что около 50 оптимально
  • FlushInterval: 50ms - Интервал сброса, балансирует задержку и пропускную способность

Следующие шаги