Go Pipeline v2 介绍
Go Pipeline v2 是一个高性能的Go语言批处理管道 框架,支持泛型、并发安全,提供标准批处理和去重批处理两种模式。
🚀 核心特性
- 泛型支持: 基于Go 1.18+泛型,类型安全
- 批处理机制: 支持按大小和时间间隔自动批处理
- 并发安全: 内置goroutine安全机制
- 灵活配置: 可自定义缓冲区大小、批处理大小和刷新间隔
- 错误处理: 完善的错误处理和传播机制
- 两种模式: 标准批处理和去重批处理
- 同步/异步: 支持同步和异步执行模式
- 遵循Go惯例: 采用"谁写谁关闭"的通道管理原则
📋 系统要求
- Go 1.18+ (支持泛型)
- 支持 Linux、macOS、Windows
📦 安装
go get github.com/rushairer/go-pipeline/v2@latest
🏗️ 架构设计
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Data Input │───▶│ Buffer Channel │───▶│ Batch Processor│
└─────────────────┘ └──────────────────┘ └─────────────────┘
│ │
▼ ▼
┌──────────────────┐ ┌─────────────────┐
│ Timer Ticker │ │ Flush Handler │
└──────────────────┘ └─────────────────┘
│ │
└────────┬───────────────┘
▼
┌─────────────────┐
│ Error Channel │
└─────────────────┘
📦 核心组件
接口定义
PipelineChannel[T]
: 定义管道通道访问接口Performer
: 定义执行管道操作的接口DataProcessor[T]
: 定义批处理数据的核心接口Pipeline[T]
: 组合所有管道功能的通用接口
实 现类型
StandardPipeline[T]
: 标准批处理管道,数据按顺序批处理DeduplicationPipeline[T]
: 去重批处理管道,基于唯一键去重PipelineImpl[T]
: 通用管道实现,提供基础功能
💡 快速开始
标准管道示例
package main
import (
"context"
"fmt"
"log"
"time"
gopipeline "github.com/rushairer/go-pipeline/v2"
)
func main() {
// 创建标准管道
pipeline := gopipeline.NewDefaultStandardPipeline(
func(ctx context.Context, batchData []int) error {
fmt.Printf("处理批次数据: %v\n", batchData)
return nil
},
)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
// 启动异步处理
go func() {
if err := pipeline.AsyncPerform(ctx); err != nil {
log.Printf("管道执行错误: %v", err)
}
}()
// 监听错误
errorChan := pipeline.ErrorChan(10)
go func() {
for err := range errorChan {
log.Printf("处理错误: %v", err)
}
}()
// 添加数据
dataChan := pipeline.DataChan()
for i := 0; i < 100; i++ {
dataChan <- i
}
// 关闭数据通道
close(dataChan)
// 等待处理完成
time.Sleep(time.Second * 2)
}
📋 配置参数
type PipelineConfig struct {
BufferSize uint32 // 缓冲通 道的容量 (默认: 100)
FlushSize uint32 // 批处理数据的最大容量 (默认: 50)
FlushInterval time.Duration // 定时刷新的时间间隔 (默认: 50ms)
}
🎯 性能优化的默认值
基于性能基准测试,v2 版本采用了优化的默认配置:
- BufferSize: 100 - 缓冲区大小,应该 >= FlushSize * 2 以避免阻塞
- FlushSize: 50 - 批处理大小,性能测试显示 50 左右为最优
- FlushInterval: 50ms - 刷新间隔,平衡延迟和吞吐量