v2.2.2 新功能
Go Pipeline v2.2.2 专注于开发者体验和关闭稳健性的提升,引入了多项重要功能。
🚀 主要亮点
便捷API
新增 Start() 和 Run() 方法,大幅减少样板代码:
// 异步启动 - 新方式
done, errs := pipeline.Start(ctx)
go func() {
for err := range errs {
log.Printf("错误: %v", err)
}
}()
<-done
// 同步运行 - 新方式
if err := pipeline.Run(ctx, 128); err != nil {
log.Printf("执行错误: %v", err)
}
动态参数调整
支持运行时安全调整关键参数:
// 根据系统负载动态调整
if systemLoad > 0.8 {
pipeline.UpdateFlushSize(25) // 减小批次大小
pipeline.UpdateFlushInterval(25 * time.Millisecond) // 增加刷新频率
} else {
pipeline.UpdateFlushSize(50) // 标准批次大小
pipeline.UpdateFlushInterval(50 * time.Millisecond) // 标准刷新间隔
}
优雅关闭增强
取消时限时收尾
config := gopipeline.NewPipelineConfig().
WithDrainOnCancel(true). // 启用取消收尾
WithDrainGracePeriod(150 * time.Millisecond) // 收尾时间窗口
pipeline := gopipeline.NewStandardPipeline(config, flushFunc)
// 当上下文取消时,管道会在限定时间内尽力flush当前批次
最终flush超时保护
config := gopipeline.NewPipelineConfig().
WithFinalFlushOnCloseTimeout(500 * time.Millisecond) // 最终flush超时
// 当数据通道关闭时,最终flush会受到超时保护
并发控制
限制异步flush的并发数:
config := gopipeline.NewPipelineConfig().
WithMaxConcurrentFlushes(uint32(runtime.NumCPU())) // 限制并发数
// 防止过多并发flush导致资源耗尽