跳到主要内容

Go Pipeline v2 介绍

Go Pipeline v2 是一个高性能的Go语言批处理管道框架,支持泛型、并发安全,提供标准批处理和去重批处理两种模式。

🚀 核心特性

  • 泛型支持: 基于Go 1.18+泛型,类型安全
  • 批处理机制: 支持按大小和时间间隔自动批处理
  • 并发安全: 内置goroutine安全机制
  • 灵活配置: 可自定义缓冲区大小、批处理大小和刷新间隔
  • 错误处理: 完善的错误处理和传播机制
  • 两种模式: 标准批处理和去重批处理
  • 同步/异步: 支持同步和异步执行模式
  • 遵循Go惯例: 采用"谁写谁关闭"的通道管理原则

📋 系统要求

  • Go 1.18+ (支持泛型)
  • 支持 Linux、macOS、Windows

📦 安装

go get github.com/rushairer/go-pipeline/v2@latest

🏗️ 架构设计

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│ Data Input │───▶│ Buffer Channel │───▶│ Batch Processor│
└─────────────────┘ └──────────────────┘ └─────────────────┘
│ │
▼ ▼
┌──────────────────┐ ┌─────────────────┐
│ Timer Ticker │ │ Flush Handler │
└──────────────────┘ └─────────────────┘
│ │
└────────┬───────────────┘

┌─────────────────┐
│ Error Channel │
└─────────────────┘

📦 核心组件

接口定义

  • PipelineChannel[T]: 定义管道通道访问接口
  • Performer: 定义执行管道操作的接口
  • DataProcessor[T]: 定义批处理数据的核心接口
  • Pipeline[T]: 组合所有管道功能的通用接口

实现类型

  • StandardPipeline[T]: 标准批处理管道,数据按顺序批处理
  • DeduplicationPipeline[T]: 去重批处理管道,基于唯一键去重
  • PipelineImpl[T]: 通用管道实现,提供基础功能

💡 快速开始

标准管道示例

package main

import (
"context"
"fmt"
"log"
"time"

gopipeline "github.com/rushairer/go-pipeline/v2"
)

func main() {
// 创建标准管道
pipeline := gopipeline.NewDefaultStandardPipeline(
func(ctx context.Context, batchData []int) error {
fmt.Printf("处理批次数据: %v\n", batchData)
return nil
},
)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

// 启动异步处理
go func() {
if err := pipeline.AsyncPerform(ctx); err != nil {
log.Printf("管道执行错误: %v", err)
}
}()

// 监听错误
errorChan := pipeline.ErrorChan(10)
go func() {
for err := range errorChan {
log.Printf("处理错误: %v", err)
}
}()

// 添加数据
dataChan := pipeline.DataChan()
for i := 0; i < 100; i++ {
dataChan <- i
}

// 关闭数据通道
close(dataChan)

// 等待处理完成
time.Sleep(time.Second * 2)
}

📋 配置参数

type PipelineConfig struct {
BufferSize uint32 // 缓冲通道的容量 (默认: 100)
FlushSize uint32 // 批处理数据的最大容量 (默认: 50)
FlushInterval time.Duration // 定时刷新的时间间隔 (默认: 50ms)
}

🎯 性能优化的默认值

基于性能基准测试,v2 版本采用了优化的默认配置:

  • BufferSize: 100 - 缓冲区大小,应该 >= FlushSize * 2 以避免阻塞
  • FlushSize: 50 - 批处理大小,性能测试显示 50 左右为最优
  • FlushInterval: 50ms - 刷新间隔,平衡延迟和吞吐量

下一步