Go Concurrency Patterns Series: ← Select Statement | Series Overview | Fan-Out/Fan-In →
What is the Pipeline Pattern?
The Pipeline pattern structures concurrent data processing by breaking work into stages connected by channels. Each stage runs in its own goroutine, receives data from an input channel, processes it, and sends results to an output channel. This creates a chain of stages that run concurrently, improving throughput because each stage can be working on a different item at the same time.
Think of it like an assembly line in a factory - each worker (stage) performs a specific task and passes the work to the next worker. While one worker processes item N, the previous worker can already be working on item N+1, creating parallel processing across the entire pipeline.
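Concretely, a stage is just a function that takes an input channel and returns an output channel that it owns and closes. Here is a minimal, purely illustrative sketch of that shape (the square stage is not part of the examples that follow):

// A minimal, illustrative stage: receive from an input channel,
// process each value, and send results on an output channel that
// the stage itself owns and closes when the input is drained.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // signal downstream that no more values are coming
		for n := range in {
			out <- n * n
		}
	}()
	return out
}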
The Problem: Sequential Processing Bottlenecks
Without pipelines, data processing is often sequential and inefficient:
// ❌ SEQUENTIAL: Inefficient processing
func sequentialProcessing(data []string) []string {
var results []string
for _, item := range data {
// Stage 1: Validate
if !validate(item) {
continue
}
// Stage 2: Transform
transformed := transform(item)
// Stage 3: Enrich
enriched := enrich(transformed)
// Stage 4: Format
formatted := format(enriched)
results = append(results, formatted)
}
return results
}
This approach processes each item to completion before starting the next, so total latency is the sum of every stage's cost multiplied by the number of items. Using the stage timings from the example below (50 ms validation, 100 ms transformation, 75 ms formatting), each item costs roughly 225 ms, and while a slow stage runs, nothing else makes progress.
Basic Pipeline Implementation
Here’s how to implement a concurrent pipeline:
package main
import (
"fmt"
"strings"
"time"
)
// Stage 1: Generate data
func generator(data []string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for _, item := range data {
out <- item
}
}()
return out
}
// Stage 2: Validate data
func validator(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for item := range in {
// Simulate validation work
time.Sleep(50 * time.Millisecond)
if len(item) > 0 && !strings.Contains(item, "invalid") {
out <- item
}
}
}()
return out
}
// Stage 3: Transform data
func transformer(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for item := range in {
// Simulate transformation work
time.Sleep(100 * time.Millisecond)
transformed := strings.ToUpper(item)
out <- transformed
}
}()
return out
}
// Stage 4: Format data
func formatter(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for item := range in {
// Simulate formatting work
time.Sleep(75 * time.Millisecond)
formatted := fmt.Sprintf("[PROCESSED: %s]", item)
out <- formatted
}
}()
return out
}
func basicPipeline() {
// Input data
data := []string{
"hello", "world", "invalid", "golang",
"pipeline", "concurrent", "processing",
}
fmt.Println("🚀 Starting pipeline processing...")
start := time.Now()
// Build the pipeline
stage1 := generator(data)
stage2 := validator(stage1)
stage3 := transformer(stage2)
stage4 := formatter(stage3)
// Consume results
var results []string
for result := range stage4 {
results = append(results, result)
fmt.Printf("✅ %s\n", result)
}
duration := time.Since(start)
fmt.Printf("\n📊 Processed %d items in %v\n", len(results), duration)
fmt.Printf("📈 Average time per item: %v\n", duration/time.Duration(len(results)))
}
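Because every stage above shares the same func(<-chan string) <-chan string signature, the wiring in basicPipeline can also be expressed generically. The compose helper below is an illustrative sketch, not part of the example above or of the standard library:

// compose chains stages left to right. Each stage consumes the
// previous stage's output channel, so the pipeline can be rebuilt
// or reordered without changing the stages themselves.
func compose(src <-chan string, stages ...func(<-chan string) <-chan string) <-chan string {
	out := src
	for _, stage := range stages {
		out = stage(out)
	}
	return out
}

// Equivalent to the explicit wiring above:
// stage4 := compose(generator(data), validator, transformer, formatter)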
Real-World Example: Image Processing Pipeline
Let’s build a more sophisticated pipeline for processing images:
package main
import (
	"crypto/md5"
	"fmt"
	"image"
	_ "image/gif"  // register decoders so image.Decode can handle these formats
	_ "image/jpeg"
	_ "image/png"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"
)
type ImageJob struct {
ID int
Filename string
Path string
}
type ProcessedImage struct {
Job ImageJob
Image image.Image
Metadata map[string]interface{}
Error error
Duration time.Duration
}
type ImagePipeline struct {
maxWorkers int
bufferSize int
}
func NewImagePipeline(maxWorkers, bufferSize int) *ImagePipeline {
return &ImagePipeline{
maxWorkers: maxWorkers,
bufferSize: bufferSize,
}
}
// Stage 1: File Discovery
func (ip *ImagePipeline) fileDiscovery(directories []string) <-chan ImageJob {
jobs := make(chan ImageJob, ip.bufferSize)
go func() {
defer close(jobs)
jobID := 1
for _, dir := range directories {
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return nil // Skip errors
}
if !info.IsDir() && isImageFile(path) {
jobs <- ImageJob{
ID: jobID,
Filename: info.Name(),
Path: path,
}
jobID++
}
return nil
})
if err != nil {
fmt.Printf("Error walking directory %s: %v\n", dir, err)
}
}
}()
return jobs
}
// Stage 2: Image Loading
func (ip *ImagePipeline) imageLoader(jobs <-chan ImageJob) <-chan ProcessedImage {
results := make(chan ProcessedImage, ip.bufferSize)
var wg sync.WaitGroup
for i := 0; i < ip.maxWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for job := range jobs {
start := time.Now()
result := ProcessedImage{
Job: job,
Metadata: make(map[string]interface{}),
Duration: 0,
}
// Load image
file, err := os.Open(job.Path)
if err != nil {
result.Error = fmt.Errorf("failed to open file: %v", err)
result.Duration = time.Since(start)
results <- result
continue
}
img, format, err := image.Decode(file)
file.Close()
if err != nil {
result.Error = fmt.Errorf("failed to decode image: %v", err)
} else {
result.Image = img
result.Metadata["format"] = format
result.Metadata["dimensions"] = fmt.Sprintf("%dx%d",
img.Bounds().Dx(), img.Bounds().Dy())
}
result.Duration = time.Since(start)
results <- result
}
}(i)
}
go func() {
wg.Wait()
close(results)
}()
return results
}
// Stage 3: Image Analysis
func (ip *ImagePipeline) imageAnalyzer(images <-chan ProcessedImage) <-chan ProcessedImage {
results := make(chan ProcessedImage, ip.bufferSize)
var wg sync.WaitGroup
for i := 0; i < ip.maxWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for processed := range images {
if processed.Error != nil {
results <- processed
continue
}
start := time.Now()
// Analyze image properties
bounds := processed.Image.Bounds()
width := bounds.Dx()
height := bounds.Dy()
processed.Metadata["width"] = width
processed.Metadata["height"] = height
processed.Metadata["aspect_ratio"] = float64(width) / float64(height)
processed.Metadata["pixels"] = width * height
// Calculate average color (simplified)
processed.Metadata["avg_color"] = calculateAverageColor(processed.Image)
// Generate hash for duplicate detection
processed.Metadata["hash"] = generateImageHash(processed.Image)
processed.Duration += time.Since(start)
results <- processed
}
}(i)
}
go func() {
wg.Wait()
close(results)
}()
return results
}
// Stage 4: Result Aggregation
func (ip *ImagePipeline) resultAggregator(processed <-chan ProcessedImage) <-chan []ProcessedImage {
results := make(chan []ProcessedImage, 1)
go func() {
defer close(results)
var allResults []ProcessedImage
duplicates := make(map[string][]ProcessedImage)
for result := range processed {
allResults = append(allResults, result)
if result.Error == nil {
if hash, ok := result.Metadata["hash"].(string); ok {
duplicates[hash] = append(duplicates[hash], result)
}
}
}
// Mark duplicates
for hash, images := range duplicates {
if len(images) > 1 {
for i := range images {
for j := range allResults {
if allResults[j].Job.ID == images[i].Job.ID {
allResults[j].Metadata["is_duplicate"] = true
allResults[j].Metadata["duplicate_group"] = hash
break
}
}
}
}
}
results <- allResults
}()
return results
}
func (ip *ImagePipeline) Process(directories []string) []ProcessedImage {
// Build the pipeline
jobs := ip.fileDiscovery(directories)
loaded := ip.imageLoader(jobs)
analyzed := ip.imageAnalyzer(loaded)
aggregated := ip.resultAggregator(analyzed)
// Get final results
results := <-aggregated
return results
}
// Helper functions
func isImageFile(filename string) bool {
ext := strings.ToLower(filepath.Ext(filename))
return ext == ".jpg" || ext == ".jpeg" || ext == ".png" || ext == ".gif"
}
func calculateAverageColor(img image.Image) string {
bounds := img.Bounds()
var r, g, b uint32
pixelCount := 0
// Sample every 10th pixel for performance
for y := bounds.Min.Y; y < bounds.Max.Y; y += 10 {
for x := bounds.Min.X; x < bounds.Max.X; x += 10 {
pr, pg, pb, _ := img.At(x, y).RGBA()
r += pr >> 8
g += pg >> 8
b += pb >> 8
pixelCount++
}
}
if pixelCount == 0 {
return "#000000"
}
return fmt.Sprintf("#%02x%02x%02x",
r/uint32(pixelCount),
g/uint32(pixelCount),
b/uint32(pixelCount))
}
func generateImageHash(img image.Image) string {
// Simple hash based on image dimensions and sample pixels
bounds := img.Bounds()
hash := md5.New()
// Include dimensions
hash.Write([]byte(fmt.Sprintf("%dx%d", bounds.Dx(), bounds.Dy())))
	// Sample a grid of pixels; guard the step so small images can't produce a zero step
	stepX, stepY := bounds.Dx()/8, bounds.Dy()/8
	if stepX < 1 {
		stepX = 1
	}
	if stepY < 1 {
		stepY = 1
	}
	for y := bounds.Min.Y; y < bounds.Max.Y; y += stepY {
		for x := bounds.Min.X; x < bounds.Max.X; x += stepX {
			r, g, b, a := img.At(x, y).RGBA()
			hash.Write([]byte{byte(r >> 8), byte(g >> 8), byte(b >> 8), byte(a >> 8)})
		}
	}
return fmt.Sprintf("%x", hash.Sum(nil))[:16]
}
func createSampleImages() {
// Create sample directory and images for demonstration
os.MkdirAll("sample_images", 0755)
	// This would normally create actual image files. For demo purposes we
	// create empty files with image extensions; they fail to decode, which
	// exercises the pipeline's error-handling path.
sampleFiles := []string{
"sample_images/photo1.jpg",
"sample_images/photo2.png",
"sample_images/photo3.jpg",
"sample_images/duplicate.jpg",
}
for _, filename := range sampleFiles {
file, _ := os.Create(filename)
file.Close()
}
}
func cleanupSampleImages() {
os.RemoveAll("sample_images")
}
func main() {
// Create sample images for demonstration
createSampleImages()
defer cleanupSampleImages()
pipeline := NewImagePipeline(3, 10)
fmt.Println("🖼️ Starting image processing pipeline...")
start := time.Now()
results := pipeline.Process([]string{"sample_images"})
totalDuration := time.Since(start)
// Display results
fmt.Printf("\n📊 Pipeline Results (completed in %v):\n", totalDuration)
fmt.Println(strings.Repeat("-", 60))
successCount := 0
errorCount := 0
duplicateCount := 0
for _, result := range results {
if result.Error != nil {
fmt.Printf("❌ %s: %v (processed in %v)\n",
result.Job.Filename, result.Error, result.Duration)
errorCount++
} else {
isDuplicate := result.Metadata["is_duplicate"] == true
if isDuplicate {
duplicateCount++
}
fmt.Printf("✅ %s: %s, %s, avg_color: %s%s (processed in %v)\n",
result.Job.Filename,
result.Metadata["format"],
result.Metadata["dimensions"],
result.Metadata["avg_color"],
func() string {
if isDuplicate {
return " [DUPLICATE]"
}
return ""
}(),
result.Duration)
successCount++
}
}
fmt.Println(strings.Repeat("-", 60))
fmt.Printf("📈 Summary: %d successful, %d errors, %d duplicates\n",
successCount, errorCount, duplicateCount)
	if len(results) > 0 {
		fmt.Printf("⚡ Average processing time: %v per image\n",
			totalDuration/time.Duration(len(results)))
	}
}
Pipeline Patterns and Variations
1. Buffered Pipeline for Throughput
func bufferedPipeline() {
// Create pipeline with different buffer sizes for optimization
stage1 := func(input []int) <-chan int {
out := make(chan int, 10) // Large buffer for fast producer
go func() {
defer close(out)
for _, val := range input {
out <- val
}
}()
return out
}
stage2 := func(in <-chan int) <-chan int {
out := make(chan int, 5) // Medium buffer
go func() {
defer close(out)
for val := range in {
time.Sleep(10 * time.Millisecond) // Simulate work
out <- val * 2
}
}()
return out
}
stage3 := func(in <-chan int) <-chan int {
out := make(chan int, 1) // Small buffer for final stage
go func() {
defer close(out)
for val := range in {
time.Sleep(5 * time.Millisecond) // Simulate work
out <- val + 1
}
}()
return out
}
// Run pipeline
input := make([]int, 20)
for i := range input {
input[i] = i + 1
}
start := time.Now()
results := stage3(stage2(stage1(input)))
var output []int
for result := range results {
output = append(output, result)
}
fmt.Printf("Buffered pipeline processed %d items in %v\n",
len(output), time.Since(start))
}
2. Error Handling in Pipelines
type Result struct {
Value int
Error error
}
func errorHandlingPipeline() {
// Stage that might fail
riskyStage := func(in <-chan int) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for val := range in {
time.Sleep(10 * time.Millisecond)
if val%7 == 0 {
out <- Result{Error: fmt.Errorf("unlucky number: %d", val)}
} else {
out <- Result{Value: val * 2}
}
}
}()
return out
}
// Error filtering stage
errorFilter := func(in <-chan Result) (<-chan int, <-chan error) {
values := make(chan int)
errors := make(chan error)
go func() {
defer close(values)
defer close(errors)
for result := range in {
if result.Error != nil {
errors <- result.Error
} else {
values <- result.Value
}
}
}()
return values, errors
}
// Input generator
input := make(chan int)
go func() {
defer close(input)
for i := 1; i <= 20; i++ {
input <- i
}
}()
// Build pipeline
results := riskyStage(input)
values, errors := errorFilter(results)
// Collect results and errors
var successValues []int
var errorList []error
done := make(chan bool, 2)
// Collect successful values
go func() {
for val := range values {
successValues = append(successValues, val)
}
done <- true
}()
// Collect errors
go func() {
for err := range errors {
errorList = append(errorList, err)
}
done <- true
}()
// Wait for both collectors
<-done
<-done
fmt.Printf("Pipeline completed: %d successes, %d errors\n",
len(successValues), len(errorList))
for _, err := range errorList {
fmt.Printf("Error: %v\n", err)
}
}
3. Cancellable Pipeline with Context
func cancellablePipeline(ctx context.Context) {
// Context-aware stage
contextStage := func(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case val, ok := <-in:
if !ok {
return
}
// Simulate work
select {
case <-time.After(100 * time.Millisecond):
select {
case out <- val * 2:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return out
}
// Input generator
input := make(chan int)
go func() {
defer close(input)
for i := 1; i <= 100; i++ {
select {
case input <- i:
case <-ctx.Done():
return
}
}
}()
// Build pipeline
stage1 := contextStage(ctx, input)
stage2 := contextStage(ctx, stage1)
// Consume with cancellation
count := 0
for {
select {
case val, ok := <-stage2:
if !ok {
fmt.Printf("Pipeline completed normally, processed %d items\n", count)
return
}
count++
fmt.Printf("Processed: %d\n", val)
case <-ctx.Done():
fmt.Printf("Pipeline cancelled after processing %d items\n", count)
return
}
}
}
func demonstrateCancellation() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
fmt.Println("Starting cancellable pipeline with 2-second timeout...")
cancellablePipeline(ctx)
}
Pipeline Performance Optimization
1. Benchmarking Pipeline Stages
func benchmarkPipeline() {
measureStage := func(name string, stage func(<-chan int) <-chan int, input <-chan int) <-chan int {
start := time.Now()
output := stage(input)
// Consume all output to measure complete stage time
var results []int
for val := range output {
results = append(results, val)
}
duration := time.Since(start)
fmt.Printf("Stage %s: processed %d items in %v (%.2f items/sec)\n",
name, len(results), duration,
float64(len(results))/duration.Seconds())
// Return results as new channel for next stage
out := make(chan int, len(results))
go func() {
defer close(out)
for _, val := range results {
out <- val
}
}()
return out
}
// Define stages
fastStage := func(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for val := range in {
out <- val * 2 // Fast operation
}
}()
return out
}
slowStage := func(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for val := range in {
time.Sleep(1 * time.Millisecond) // Slow operation
out <- val + 1
}
}()
return out
}
// Create input
input := make(chan int)
go func() {
defer close(input)
for i := 1; i <= 1000; i++ {
input <- i
}
}()
// Benchmark each stage
fmt.Println("Pipeline stage benchmarks:")
stage1Out := measureStage("Fast", fastStage, input)
stage2Out := measureStage("Slow", slowStage, stage1Out)
// Final consumption
count := 0
for range stage2Out {
count++
}
fmt.Printf("Final output: %d items\n", count)
}
2. Dynamic Pipeline Scaling
type ScalablePipeline struct {
workerCount int
bufferSize int
}
func (sp *ScalablePipeline) adaptiveStage(in <-chan int) <-chan int {
out := make(chan int, sp.bufferSize)
var wg sync.WaitGroup
for i := 0; i < sp.workerCount; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for val := range in {
// Simulate variable processing time
processingTime := time.Duration(val%10) * time.Millisecond
time.Sleep(processingTime)
out <- val * 2
}
}(i)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func scalablePipelineDemo() {
// Test different scaling configurations
configs := []struct {
workers int
buffer int
}{
{1, 10},
{2, 10},
{4, 10},
{8, 10},
}
for _, config := range configs {
pipeline := &ScalablePipeline{
workerCount: config.workers,
bufferSize: config.buffer,
}
// Create input
input := make(chan int, 100)
go func() {
defer close(input)
for i := 1; i <= 100; i++ {
input <- i
}
}()
start := time.Now()
output := pipeline.adaptiveStage(input)
count := 0
for range output {
count++
}
duration := time.Since(start)
fmt.Printf("Workers: %d, Buffer: %d -> %d items in %v (%.2f items/sec)\n",
config.workers, config.buffer, count, duration,
float64(count)/duration.Seconds())
}
}
Best Practices
1. Pipeline Design Guidelines
// ✅ GOOD: Well-designed pipeline stage
func goodPipelineStage(in <-chan Input) <-chan Output {
out := make(chan Output, bufferSize) // Appropriate buffer
go func() {
defer close(out) // Always close output channel
for input := range in { // Use range for clean channel reading
// Process input
result := processInput(input)
			// Handle backpressure: if the consumer is slow, log a warning,
			// then block until it catches up so no result is dropped
			select {
			case out <- result:
				// Sent without delay
			case <-time.After(timeout):
				log.Printf("Warning: slow consumer detected")
				out <- result
			}
}
}()
return out
}
2. Resource Management
func resourceManagedPipeline() {
// Use sync.Pool for expensive objects
var bufferPool = sync.Pool{
New: func() interface{} {
return make([]byte, 1024)
},
}
processStage := func(in <-chan []byte) <-chan []byte {
out := make(chan []byte)
go func() {
defer close(out)
			for data := range in {
				// Get a scratch buffer from the pool
				buffer := bufferPool.Get().([]byte)
				// Process data; the result is copied out of the buffer so
				// it is safe to return the buffer to the pool afterwards
				processed := processWithBuffer(data, buffer)
				bufferPool.Put(buffer)
				out <- processed
			}
}()
return out
}
// Use the stage...
}
func processWithBuffer(data, buffer []byte) []byte {
	// Simulate processing that uses the buffer as scratch space, then
	// copy the result out so the pooled buffer can be reused safely
	copy(buffer, data)
	result := make([]byte, len(data))
	copy(result, buffer[:len(data)])
	return result
}
Testing Pipeline Code
func TestPipelineStage(t *testing.T) {
// Create test input
input := make(chan int, 5)
expected := []int{2, 4, 6, 8, 10}
go func() {
defer close(input)
for i := 1; i <= 5; i++ {
input <- i
}
}()
// Test the stage
doubler := func(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for val := range in {
out <- val * 2
}
}()
return out
}
output := doubler(input)
// Collect results
var results []int
for val := range output {
results = append(results, val)
}
// Verify results
if !reflect.DeepEqual(results, expected) {
t.Errorf("Expected %v, got %v", expected, results)
}
}
func TestPipelineTimeout(t *testing.T) {
input := make(chan int)
slowStage := func(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for val := range in {
time.Sleep(100 * time.Millisecond) // Slow processing
out <- val
}
}()
return out
}
go func() {
input <- 1
close(input)
}()
output := slowStage(input)
// Test with timeout
select {
case result := <-output:
if result != 1 {
t.Errorf("Expected 1, got %d", result)
}
	case <-time.After(200 * time.Millisecond):
		t.Error("stage did not produce a result within 200ms")
}
}
Conclusion
The Pipeline pattern provides:
- Concurrent processing across multiple stages
- Improved throughput through parallelism
- Modular design with composable stages
- Efficient resource utilization with proper buffering
Key Takeaways
- Break work into stages - each stage should have a single responsibility
- Use appropriate buffer sizes - balance memory usage with throughput
- Handle errors gracefully - don’t let one failure stop the entire pipeline
- Support cancellation - use context for clean shutdown
- Monitor performance - identify and optimize bottleneck stages (a short sketch combining these takeaways follows below)
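As a recap, here is a minimal sketch of a stage that applies several of these takeaways at once. The stage name and buffer size are illustrative; it reuses the Result type from the error-handling example above and assumes the context package is imported:

// Sketch of a stage applying the takeaways: a single responsibility,
// a modest output buffer, errors carried inside the Result value, and
// context-based cancellation for clean shutdown.
func takeawayStage(ctx context.Context, in <-chan int) <-chan Result {
	out := make(chan Result, 8) // small buffer to smooth out bursts
	go func() {
		defer close(out)
		for v := range in {
			r := Result{Value: v * 2} // the stage's single piece of work
			select {
			case out <- r:
			case <-ctx.Done():
				return // stop promptly when the pipeline is cancelled
			}
		}
	}()
	return out
}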
Pipeline Use Cases
- Data processing: ETL operations, log analysis, stream processing
- Image/video processing: Multi-stage transformations and filters
- Network operations: Request processing, data validation, response formatting
- File operations: Reading, transforming, and writing large datasets
What’s Next?
Now that you understand pipelines, the next pattern builds upon this foundation: Fan-Out/Fan-In. This pattern allows you to distribute work across multiple workers and then collect results, providing even more parallelism and scalability.
In the next post, we’ll explore Fan-Out/Fan-In and learn how to scale pipeline stages horizontally for maximum performance.
This post is part of the Go Concurrency Patterns series. The Pipeline pattern is fundamental to many data processing architectures and forms the basis for more advanced patterns.