Configuration Guide
This document provides detailed information about Go Pipeline v2 configuration parameters and best practices.
Configuration Structure
type PipelineConfig struct {
BufferSize uint32 // Capacity of the internal buffer channel
FlushSize uint32 // Maximum number of items per flushed batch
FlushInterval time.Duration // Interval for time-based (periodic) flushes
}
Default Configuration
Based on performance benchmarks, Go Pipeline v2 ships with an optimized default configuration:
const (
defaultBufferSize = 100 // Buffer size
defaultFlushSize = 50 // Batch size
defaultFlushInterval = time.Millisecond * 50 // Flush interval
)
Using Default Configuration
You can use the NewPipelineConfig() function to create a configuration with default values, then customize specific parameters:
// Create configuration with default values
config := gopipeline.NewPipelineConfig()
// Use default values directly
pipeline := gopipeline.NewStandardPipeline(config, flushFunc)
// Or use chaining methods to customize specific parameters
config = gopipeline.NewPipelineConfig().
WithFlushInterval(time.Millisecond * 10).
WithBufferSize(200)
pipeline = gopipeline.NewStandardPipeline(config, flushFunc)
Available configuration methods (combined in the example below):
- NewPipelineConfig() - Create a configuration with default values
- WithBufferSize(size uint32) - Set the buffer size
- WithFlushSize(size uint32) - Set the batch size
- WithFlushInterval(interval time.Duration) - Set the flush interval
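As a minimal sketch, all three setters can be chained from the defaults (flushFunc stands in for your batch handler):
customConfig := gopipeline.NewPipelineConfig().
    WithBufferSize(400).
    WithFlushSize(200).
    WithFlushInterval(time.Millisecond * 100)
pipeline := gopipeline.NewStandardPipeline(customConfig, flushFunc)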
Configuration Parameters Details
BufferSize (Buffer Size)
Purpose: Controls the capacity of the internal data channel
Default Value: 100
Recommended Values:
- Should be >= FlushSize * 2 to avoid blocking
- Can be appropriately increased for high concurrency scenarios
standardConfig := gopipeline.PipelineConfig{
BufferSize: 200, // Recommended 2-4 times FlushSize
FlushSize: 50, // Standard batch size
FlushInterval: time.Millisecond * 50, // Standard flush interval
}
Impact:
- Too small: May cause write blocking
- Too large: Increases memory usage and delays shutdown
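If you choose FlushSize first, a small helper can derive a BufferSize that satisfies the ">= FlushSize * 2" recommendation above. bufferSizeFor is a hypothetical convenience, not part of the library:
// bufferSizeFor returns a buffer size at least twice the batch size,
// following the recommendation above to avoid write blocking.
func bufferSizeFor(flushSize uint32) uint32 {
    return flushSize * 2
}

config := gopipeline.PipelineConfig{
    FlushSize:     50,
    BufferSize:    bufferSizeFor(50), // 100
    FlushInterval: time.Millisecond * 50,
}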
FlushSize (Batch Size)
Purpose: Controls the number of items processed in each batch
Default Value: 50
Recommended Values:
- General scenarios: 20-100
- High throughput scenarios: 100-500
- Low latency scenarios: 10-50
// Configuration examples for different scenarios
// High throughput scenario
highThroughputConfig := gopipeline.PipelineConfig{
BufferSize: 400, // Buffer size 2x FlushSize
FlushSize: 200, // Large batch processing
FlushInterval: time.Millisecond * 100, // Moderate interval
}
// Low latency scenario
lowLatencyConfig := gopipeline.PipelineConfig{
BufferSize: 50, // Small buffer
FlushSize: 20, // Small batch processing
FlushInterval: time.Millisecond * 10, // Short interval
}
Impact:
- Too small: Increases processing frequency, reduces throughput
- Too large: Increases latency and memory usage
FlushInterval (Flush Interval)
Purpose: Controls how often buffered data is flushed, even if the current batch is not yet full
Default Value: 50ms
Recommended Values:
- Low latency scenarios: 10-50ms
- Balanced scenarios: 50-200ms
- High throughput scenarios: 200ms-1s
// Configuration examples for different scenarios
// Low latency scenario
lowLatencyConfig := gopipeline.PipelineConfig{
BufferSize: 50, // Small buffer
FlushSize: 10, // Small batch
FlushInterval: time.Millisecond * 10, // Very short interval
}
// High throughput scenario
highThroughputConfig := gopipeline.PipelineConfig{
BufferSize: 1000, // Large buffer
FlushSize: 500, // Large batch
FlushInterval: time.Second, // Long interval
}
Impact:
- Too small: Increases CPU usage, may cause frequent small batch processing
- Too large: Increases data processing latency
Scenario-Based Configuration
Database Batch Insert
// Database batch insert optimization configuration
dbConfig := gopipeline.PipelineConfig{
BufferSize: 500, // Larger buffer
FlushSize: 100, // Moderate batch size
FlushInterval: time.Millisecond * 200, // Moderate latency
}
pipeline := gopipeline.NewStandardPipeline(dbConfig,
func(ctx context.Context, records []Record) error {
return db.CreateInBatches(records, len(records)).Error
},
)
API Call Batch Processing
// API call batch processing configuration
apiConfig := gopipeline.PipelineConfig{
BufferSize: 100, // Moderate buffer
FlushSize: 20, // Smaller batch (avoid API limits)
FlushInterval: time.Millisecond * 50, // Low latency
}
pipeline := gopipeline.NewStandardPipeline(apiConfig,
func(ctx context.Context, requests []APIRequest) error {
return batchCallAPI(requests)
},
)
Log Batch Writing
// Log batch writing configuration
logConfig := gopipeline.PipelineConfig{
BufferSize: 1000, // Large buffer (high log volume)
FlushSize: 200, // Large batch
FlushInterval: time.Millisecond * 100, // Moderate latency
}
pipeline := gopipeline.NewStandardPipeline(logConfig,
func(ctx context.Context, logs []LogEntry) error {
return writeLogsToFile(logs)
},
)
Real-time Data Processing
// Real-time data processing configuration
realtimeConfig := gopipeline.PipelineConfig{
BufferSize: 50, // Small buffer
FlushSize: 10, // Small batch
FlushInterval: time.Millisecond * 10, // Very low latency
}
pipeline := gopipeline.NewStandardPipeline(realtimeConfig,
func(ctx context.Context, events []Event) error {
return processRealTimeEvents(events)
},
)
Performance Tuning Guide
1. Determine Performance Goals
First, clarify your performance goals; the sketch after this list shows one way to express each goal with the builder methods:
- Throughput Priority: Increase FlushSize and FlushInterval
- Latency Priority: Decrease FlushSize and FlushInterval
- Memory Priority: Decrease BufferSize and FlushSize
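The exact values below are starting points taken from the scenario examples above, not prescriptions:
// Throughput priority: larger batches, longer interval
throughputConfig := gopipeline.NewPipelineConfig().
    WithFlushSize(200).
    WithBufferSize(400).
    WithFlushInterval(time.Millisecond * 200)

// Latency priority: smaller batches, shorter interval
latencyConfig := gopipeline.NewPipelineConfig().
    WithFlushSize(10).
    WithBufferSize(50).
    WithFlushInterval(time.Millisecond * 10)

// Memory priority: smaller buffer and batches
memoryConfig := gopipeline.NewPipelineConfig().
    WithFlushSize(25).
    WithBufferSize(50).
    WithFlushInterval(time.Millisecond * 50)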
2. Benchmarking
Use benchmarks to verify configuration effectiveness:
func BenchmarkPipelineConfig(b *testing.B) {
configs := []gopipeline.PipelineConfig{
{BufferSize: 50, FlushSize: 25, FlushInterval: time.Millisecond * 25},
{BufferSize: 100, FlushSize: 50, FlushInterval: time.Millisecond * 50},
{BufferSize: 200, FlushSize: 100, FlushInterval: time.Millisecond * 100},
}
for i, config := range configs {
b.Run(fmt.Sprintf("Config%d", i), func(b *testing.B) {
pipeline := gopipeline.NewStandardPipeline(config,
func(ctx context.Context, data []int) error {
// Simulate processing
time.Sleep(time.Microsecond * 100)
return nil
})
_ = pipeline // Benchmark logic... (see the sketch below for one possible body)
})
}
}
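The benchmark body is elided above because it depends on how items are submitted to the pipeline. Assuming a hypothetical Add(ctx, item) submission method (substitute the actual method of your gopipeline version, and start the pipeline's background worker if it requires one), the body might look like:
ctx := context.Background()
b.ResetTimer()
for n := 0; n < b.N; n++ {
    // Add is a placeholder name for the library's submission method.
    if err := pipeline.Add(ctx, n); err != nil {
        b.Fatal(err)
    }
}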
3. Monitor Metrics
Monitor key metrics to optimize configuration:
type PipelineMetrics struct {
TotalProcessed int64
BatchCount int64
AverageLatency time.Duration
ErrorCount int64
MemoryUsage int64
}
func monitorPipeline(pipeline Pipeline[Data]) {
ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
for range ticker.C {
// Collect and record metrics
metrics := collectMetrics(pipeline)
log.Printf("Pipeline Metrics: %+v", metrics)
// Adjust configuration based on metrics
if metrics.AverageLatency > time.Millisecond*100 {
// Consider reducing batch size or interval
}
}
}
Configuration Validation
Configuration Reasonableness Check
func validateConfig(config gopipeline.PipelineConfig) error {
if config.BufferSize < config.FlushSize*2 {
return fmt.Errorf("BufferSize (%d) should be at least 2x FlushSize (%d)",
config.BufferSize, config.FlushSize)
}
if config.FlushSize == 0 {
return fmt.Errorf("FlushSize cannot be zero")
}
if config.FlushInterval <= 0 {
return fmt.Errorf("FlushInterval must be positive")
}
return nil
}
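A typical use is to fail fast before constructing the pipeline (flushFunc again stands in for your batch handler):
config := gopipeline.PipelineConfig{
    BufferSize:    200,
    FlushSize:     50,
    FlushInterval: time.Millisecond * 50,
}
if err := validateConfig(config); err != nil {
    log.Fatalf("invalid pipeline configuration: %v", err)
}
pipeline := gopipeline.NewStandardPipeline(config, flushFunc)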
Dynamic Configuration Adjustment
type DynamicPipeline struct {
pipeline Pipeline[Data]
config gopipeline.PipelineConfig
mutex sync.RWMutex
}
func (dp *DynamicPipeline) UpdateConfig(newConfig gopipeline.PipelineConfig) error {
if err := validateConfig(newConfig); err != nil {
return err
}
dp.mutex.Lock()
defer dp.mutex.Unlock()
// Recreate pipeline (actual implementation may need more complex logic)
dp.config = newConfig
// dp.pipeline = recreatePipeline(newConfig)
return nil
}
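For illustration, a config update could be triggered from the monitoring loop shown earlier; this sketch only exercises the wrapper above:
dp := &DynamicPipeline{
    config: gopipeline.PipelineConfig{
        BufferSize:    100,
        FlushSize:     50,
        FlushInterval: time.Millisecond * 50,
    },
}
// React to sustained high latency by shrinking batches and the interval.
newConfig := gopipeline.PipelineConfig{
    BufferSize:    50,
    FlushSize:     20,
    FlushInterval: time.Millisecond * 10,
}
if err := dp.UpdateConfig(newConfig); err != nil {
    log.Printf("config update rejected: %v", err)
}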
Common Issues and Solutions
Issue 1: High Data Processing Latency
Symptoms: Time from data addition to processing completion is too long
Possible Causes:
- FlushInterval set too large
- FlushSize set too large
- Processing function execution time too long
Solutions:
// Reduce flush interval and batch size
lowLatencyConfig := gopipeline.PipelineConfig{
BufferSize: 50, // Buffer adapted to small batches
FlushSize: 20, // Reduce batch size
FlushInterval: time.Millisecond * 10, // Reduce interval
}
Issue 2: High Memory Usage
Symptoms: Program memory usage continues to grow
Possible Causes:
- BufferSize set too large
- FlushSize set too large (especially for the deduplication pipeline)
- Error channel not being consumed
Solutions:
// Reduce buffer and batch size
memoryOptimizedConfig := gopipeline.PipelineConfig{
BufferSize: 50, // Reduce buffer
FlushSize: 25, // Reduce batch size
FlushInterval: time.Millisecond * 50, // Keep moderate interval
}
// Ensure error channel consumption
errorChan := pipeline.ErrorChan(10)
go func() {
for {
select {
case err, ok := <-errorChan:
if !ok {
return
}
log.Printf("Error: %v", err)
case <-ctx.Done():
return
}
}
}()
Issue 3: Insufficient Throughput
Symptoms: Data processing speed cannot keep up with data generation speed
Possible Causes:
- FlushSize set too small
- FlushInterval set too small
- BufferSize set too small causing blocking
Solutions:
// Increase batch size and buffer
highThroughputConfig := gopipeline.PipelineConfig{
BufferSize: 500, // Increase buffer
FlushSize: 100, // Increase batch size
FlushInterval: time.Millisecond * 100, // Moderate interval
}
Best Practices Summary
- Start with the Default Configuration: The defaults suit most scenarios
- Adjust Based on Actual Needs: Tune for your latency, throughput, and memory requirements
- Perform Benchmarking: Use actual data for performance testing
- Monitor Key Metrics: Continuously monitor performance metrics
- Validate Configuration: Check that parameter values are reasonable before use
- Document Configuration: Record the reasons for configuration choices and test results
Next Steps
- API Reference - Complete API documentation
- Standard Pipeline - Standard pipeline usage guide
- Deduplication Pipeline - Deduplication pipeline usage guide