API 参考
本文档提供 Go Pipeline v2 的完整 API 参考。
核心接口
Pipeline[T any]
主要的管道接口,组合了所有管道功能。
type Pipeline[T any] interface {
PipelineChannel[T]
Performer[T]
DataProcessor[T]
}
PipelineChannel[T any]
定义管道通道访问接口。
type PipelineChannel[T any] interface {
// DataChan 返回一个可写的通道,用于将数据添加到管道中
DataChan() chan<- T
// ErrorChan 返回一个只读的通道,用于接收管道中的错误信息
ErrorChan(size int) <-chan error
}
DataChan()
返回数据输入通道。
返回值: chan<- T
- 只写通道,用于添加数据
使用示例:
dataChan := pipeline.DataChan()
dataChan <- "some data"
close(dataChan) // 完成后关闭通道
ErrorChan(size int)
返回错误输出通道。
参数:
size int
- 错误通道的缓冲区大小
返回值: <-chan error
- 只读通道,用于接收错误
使用示例:
errorChan := pipeline.ErrorChan(10)
go func() {
for err := range errorChan {
log.Printf("Pipeline error: %v", err)
}
}()
Performer[T any]
定义执行管道操作的接口。
type Performer[T any] interface {
// AsyncPerform 异步执行管道操作
AsyncPerform(ctx context.Context) error
// SyncPerform 同步执行管道操作
SyncPerform(ctx context.Context) error
}
AsyncPerform(ctx context.Context)
异步执行管道操作,不阻塞调用线程。
参数:
ctx context.Context
- 上下文对象,用于控制操作生命周期
返回值: error
- 如果ctx被取消则返回error
使用示例:
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
go func() {
if err := pipeline.AsyncPerform(ctx); err != nil {
log.Printf("Pipeline execution error: %v", err)
}
}()
SyncPerform(ctx context.Context)
同步执行管道操作,阻塞直到完成或取消。
参数:
ctx context.Context
- 上下文对象
返回值: error
- 执行错误或取消错误
使用示例:
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
if err := pipeline.SyncPerform(ctx); err != nil {
log.Printf("Pipeline execution error: %v", err)
}
DataProcessor[T any]
定义批处理数据的核心接口(主要用于内部实现)。
type DataProcessor[T any] interface {
initBatchData() any
addToBatch(batchData any, data T) any
flush(ctx context.Context, batchData any) error
isBatchFull(batchData any) bool
isBatchEmpty(batchData any) bool
}
配置类型
PipelineConfig
管道配置结构体。
type PipelineConfig struct {
BufferSize uint32 // 缓冲通道的容量 (默认: 100)
FlushSize uint32 // 批处理数据的最大容量 (默认: 50)
FlushInterval time.Duration // 定时刷新的时间间隔 (默认: 50ms)
}
字段说明:
BufferSize
: 内部数据通道的缓冲区大小FlushSize
: 每次批处理的最大数据量FlushInterval
: 定时触发批处理的时间间隔
标准管道 API
类型定义
type FlushStandardFunc[T any] func(ctx context.Context, batchData []T) error
type StandardPipeline[T any] struct {
*PipelineImpl[T]
flushFunc FlushStandardFunc[T]
}
构造函数
NewDefaultStandardPipeline[T any]
使用默认配置创建标准管道。
func NewDefaultStandardPipeline[T any](
flushFunc FlushStandardFunc[T],
) *StandardPipeline[T]
参数:
flushFunc FlushStandardFunc[T]
- 批处理函数
返回值: *StandardPipeline[T]
- 标准管道实例
使用示例:
pipeline := gopipeline.NewDefaultStandardPipeline(
func(ctx context.Context, batchData []string) error {
fmt.Printf("Processing %d items: %v\n", len(batchData), batchData)
return nil
},
)
NewStandardPipeline[T any]
使用自定义配置创建标准管道。
func NewStandardPipeline[T any](
config PipelineConfig,
flushFunc FlushStandardFunc[T],
) *StandardPipeline[T]
参数:
config PipelineConfig
- 自定义配置flushFunc FlushStandardFunc[T]
- 批处理函数
返回值: *StandardPipeline[T]
- 标准管道实例
使用示例:
standardConfig := gopipeline.PipelineConfig{
BufferSize: 200,
FlushSize: 100,
FlushInterval: time.Millisecond * 100,
}
pipeline := gopipeline.NewStandardPipeline(standardConfig,
func(ctx context.Context, batchData []string) error {
return processData(batchData)
},
)
去重管道 API
类型定义
type KeyFunc[T any] func(T) string
type FlushDeduplicationFunc[T any] func(ctx context.Context, batchData []T) error
type DeduplicationPipeline[T any] struct {
*PipelineImpl[T]
keyFunc KeyFunc[T]
flushFunc FlushDeduplicationFunc[T]
}
构造函数
NewDefaultDeduplicationPipeline[T any]
使用默认配置创建去重管道。
func NewDefaultDeduplicationPipeline[T any](
keyFunc KeyFunc[T],
flushFunc FlushDeduplicationFunc[T],
) *DeduplicationPipeline[T]
参数:
keyFunc KeyFunc[T]
- 唯一键生成函数flushFunc FlushDeduplicationFunc[T]
- 批处理函数
返回值: *DeduplicationPipeline[T]
- 去重管道实例
使用示例:
pipeline := gopipeline.NewDefaultDeduplicationPipeline(
func(user User) string {
return user.Email // 使用邮箱作为唯一键
},
func(ctx context.Context, users []User) error {
return processUsers(users)
},
)
NewDeduplicationPipeline[T any]
使用自定义配置创建去重管道。
func NewDeduplicationPipeline[T any](
config PipelineConfig,
keyFunc KeyFunc[T],
flushFunc FlushDeduplicationFunc[T],
) *DeduplicationPipeline[T]
参数:
config PipelineConfig
- 自定义配置keyFunc KeyFunc[T]
- 唯一键生成函数flushFunc FlushDeduplicationFunc[T]
- 批处理函数
返回值: *DeduplicationPipeline[T]
- 去重管道实例
使用示例:
deduplicationConfig := gopipeline.PipelineConfig{
BufferSize: 100,
FlushSize: 50,
FlushInterval: time.Millisecond * 100,
}
pipeline := gopipeline.NewDeduplicationPipeline(deduplicationConfig,
func(product Product) string {
return fmt.Sprintf("%s-%s", product.SKU, product.Version)
},
func(ctx context.Context, products []Product) error {
return updateProducts(products)
},
)