Стандартный пайплайн
StandardPipeline является одним из основных компонентов Go Pipeline v2, предоставляющим функциональность последовательной пакетной обработки данных.
Обзор
Стандартный пайплайн обрабатывает входные данные пакетами согласно настроенному размеру пакета и временным интервалам, подходит для сценариев, которые требуют сохранения порядка данных.
Основные функции
- Последовательная обработка: Данные обрабатываются пакетами в том порядке, в котором они были добавлены
- Автоматическая пакетная обработка: Поддерживает автоматическую пакетную обработку, запускаемую размером и временными интервалами
- Безопасность конкурентности: Встроенный механизм безопасности горутин
- Обработка ошибок: Комплексный сбор и распространение ошибок
Поток данных
Создание стандартного пайплайна
Использование конфигурации по умолчанию
pipeline := gopipeline.NewDefaultStandardPipeline(
func(ctx context.Context, batchData []string) error {
// Обработать пакетные данные
fmt.Printf("Обработка %d элементов: %v\n", len(batchData), batchData)
return nil
},
)
Использование пользовательской конфигурации
customConfig := gopipeline.PipelineConfig{
BufferSize: 200, // Размер буфера
FlushSize: 100, // Размер пакета
FlushInterval: time.Millisecond * 100, // Интервал сброса
}
pipeline := gopipeline.NewStandardPipeline(customConfig,
func(ctx context.Context, batchData []string) error {
// Обработать пакетные данные
return processData(batchData)
},
)
Примеры использования
Базовое использование
package main
import (
"context"
"fmt"
"log"
"time"
gopipeline "github.com/rushairer/go-pipeline/v2"
)
func main() {
// Создать пайплайн
pipeline := gopipeline.NewDefaultStandardPipeline(
func(ctx context.Context, batchData []string) error {
fmt.Printf("Пакетная обработка %d элементов: %v\n", len(batchData), batchData)
// Симулировать время обработки
time.Sleep(time.Millisecond * 10)
return nil
},
)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
// Запустить асинхронную обработку
go func() {
if err := pipeline.AsyncPerform(ctx); err != nil {
log.Printf("Ошибка выполнения пайплайна: %v", err)
}
}()
// Слушать ошибки
errorChan := pipeline.ErrorChan(10)
go func() {
for err := range errorChan {
log.Printf("Ошибка обработки: %v", err)
}
}()
// Добавить данные
dataChan := pipeline.DataChan()
for i := 0; i < 200; i++ {
dataChan <- fmt.Sprintf("data-%d", i)
}
// Закрыть канал данных
close(dataChan)
// Дождаться завершения обработки
time.Sleep(time.Second * 2)
}
Пример пакетной вставки в базу данных
func batchInsertExample() {
// Создать пайплайн пакетной вставки в базу данных
pipeline := gopipeline.NewDefaultStandardPipeline(
func(ctx context.Context, users []User) error {
// Пакетная вставка в базу данных
return db.CreateInBatches(users, len(users)).Error
},
)
ctx := context.Background()
// Запустить пайплайн
go pipeline.AsyncPerform(ctx)
// Обработка ошибок
go func() {
for err := range pipeline.ErrorChan(10) {
log.Printf("Ошибка вставки в базу данных: %v", err)
}
}()
// Добавить пользовательские данные
dataChan := pipeline.DataChan()
for i := 0; i < 1000; i++ {
user := User{
Name: fmt.Sprintf("user-%d", i),
Email: fmt.Sprintf("user%d@example.com", i),
}
dataChan <- user
}
close(dataChan)
}
Пример пакетной обработки API-вызовов
func apiCallExample() {
pipeline := gopipeline.NewStandardPipeline(
gopipeline.PipelineConfig{
FlushSize: 20, // 20 элементов на вызов
FlushInterval: time.Millisecond * 200, // Интервал 200ms
},
func(ctx context.Context, requests []APIRequest) error {
// Пакетный вызов API
return batchCallAPI(requests)
},
)
// Использовать пайплайн...
}
Синхронное vs Асинхронное выполнение
Асинхронное выполнение (рекомендуется)
// Асинхронное выполнение, не блокирует основной поток
go func() {
if err := pipeline.AsyncPerform(ctx); err != nil {
log.Printf("Ошибка выполнения пайплайна: %v", err)
}
}()
Синхронное выполнение
// Синхронное выполнение, блокирует до завершения или отмены
if err := pipeline.SyncPerform(ctx); err != nil {
log.Printf("Ошибка выполнения пайплайна: %v", err)
}
Обработка ошибок
Стандартный пайплайн предоставляет комплексный механизм обработки ошибок:
// Создать канал ошибок
errorChan := pipeline.ErrorChan(100) // Размер буфера 100
// Слушать ошибки
go func() {
for err := range errorChan {
// Обработать ошибки
log.Printf("Ошибка пакетной обработки: %v", err)
// Можно обрабатывать разные типы ошибок по-разному
switch e := err.(type) {
case *DatabaseError:
// Обработка ошибки базы данных
case *NetworkError:
// Обработка сетевой ошибки
default:
// Обработка других ошибок
}
}
}()