Go Concurrency Patterns Series: ← Pipeline Pattern | Series Overview | Pub/Sub Pattern →


What is the Fan-Out/Fan-In Pattern?

The Fan-Out/Fan-In pattern distributes work across multiple goroutines (fan-out) and then collects the results back into a single channel (fan-in). It is well suited to CPU-intensive tasks or I/O operations that can be processed independently and in parallel.

Fan-Out: Distribute work from one source to multiple workers
Fan-In: Collect results from multiple workers into a single destination

Real-World Use Cases

  • Image Processing: Resize multiple images in parallel
  • Data Processing: Process large datasets across multiple workers
  • API Calls: Make multiple HTTP requests concurrently (see the sketch after this list)
  • File Operations: Process multiple files simultaneously
  • Database Queries: Execute multiple queries in parallel
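
One concrete way to picture the pattern is the API-call use case mentioned above: each request is independent, so fan-out is simply one goroutine per URL and fan-in is a single channel they all report into. The sketch below is illustrative only; the fetchStatus helper and the URLs are placeholders, and production code would add timeouts and context cancellation, both of which are covered later in this post.

package main

import (
    "fmt"
    "net/http"
    "sync"
)

// fetchStatus fans out one HTTP GET per URL and fans the status lines back
// into a single channel. The URLs passed in are placeholders for this sketch.
func fetchStatus(urls []string) <-chan string {
    results := make(chan string)
    var wg sync.WaitGroup

    for _, url := range urls {
        wg.Add(1)
        go func(u string) { // fan-out: one goroutine per request
            defer wg.Done()
            resp, err := http.Get(u)
            if err != nil {
                results <- fmt.Sprintf("%s: %v", u, err)
                return
            }
            resp.Body.Close()
            results <- fmt.Sprintf("%s: %s", u, resp.Status)
        }(url)
    }

    go func() { // fan-in: close the channel once every request has reported
        wg.Wait()
        close(results)
    }()

    return results
}

func main() {
    urls := []string{"https://example.com", "https://example.org", "https://example.net"}
    for line := range fetchStatus(urls) {
        fmt.Println(line)
    }
}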

Basic Fan-Out/Fan-In Implementation

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Fan-out: distribute work to multiple workers
func fanOut(input <-chan int, workers int) []<-chan int {
    outputs := make([]<-chan int, workers)
    
    for i := 0; i < workers; i++ {
        output := make(chan int)
        outputs[i] = output
        
        go func(out chan<- int) {
            defer close(out)
            for n := range input {
                // Simulate work
                time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
                out <- n * n // Square the number
            }
        }(output)
    }
    
    return outputs
}

// Fan-in: collect results from multiple workers
func fanIn(inputs ...<-chan int) <-chan int {
    output := make(chan int)
    var wg sync.WaitGroup
    
    // Start a goroutine for each input channel
    for _, input := range inputs {
        wg.Add(1)
        go func(in <-chan int) {
            defer wg.Done()
            for value := range in {
                output <- value
            }
        }(input)
    }
    
    // Close output channel when all inputs are done
    go func() {
        wg.Wait()
        close(output)
    }()
    
    return output
}

func main() {
    // Create input channel
    input := make(chan int)
    
    // Start the pipeline
    go func() {
        defer close(input)
        for i := 1; i <= 10; i++ {
            input <- i
        }
    }()
    
    // Fan-out to 3 workers
    workers := fanOut(input, 3)
    
    // Fan-in results
    results := fanIn(workers...)
    
    // Collect results
    fmt.Println("Results:")
    for result := range results {
        fmt.Printf("Processed: %d\n", result)
    }
}

Advanced Fan-Out/Fan-In with Error Handling

package main

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Result struct {
    Value int
    Error error
}

type Job struct {
    ID   int
    Data string
}

// Worker function that can fail
func processJob(ctx context.Context, job Job) Result {
    // Simulate work that might fail
    time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
    
    // Simulate random failures
    if rand.Float32() < 0.2 {
        return Result{Error: fmt.Errorf("failed to process job %d", job.ID)}
    }
    
    return Result{Value: job.ID * 10}
}

// Fan-out with context and error handling
func fanOutWithErrors(ctx context.Context, jobs <-chan Job, workers int) []<-chan Result {
    outputs := make([]<-chan Result, workers)
    
    for i := 0; i < workers; i++ {
        output := make(chan Result)
        outputs[i] = output
        
        go func(out chan<- Result, workerID int) {
            defer close(out)
            for {
                select {
                case job, ok := <-jobs:
                    if !ok {
                        return
                    }
                    result := processJob(ctx, job)
                    
                    select {
                    case out <- result:
                    case <-ctx.Done():
                        return
                    }
                case <-ctx.Done():
                    return
                }
            }
        }(output, i)
    }
    
    return outputs
}

// Fan-in with error collection
func fanInWithErrors(ctx context.Context, inputs ...<-chan Result) (<-chan Result, <-chan error) {
    output := make(chan Result)
    errors := make(chan error)
    var wg sync.WaitGroup
    
    for _, input := range inputs {
        wg.Add(1)
        go func(in <-chan Result) {
            defer wg.Done()
            for {
                select {
                case result, ok := <-in:
                    if !ok {
                        return
                    }
                    if result.Error != nil {
                        select {
                        case errors <- result.Error:
                        case <-ctx.Done():
                            return
                        }
                    } else {
                        select {
                        case output <- result:
                        case <-ctx.Done():
                            return
                        }
                    }
                case <-ctx.Done():
                    return
                }
            }
        }(input)
    }
    
    go func() {
        wg.Wait()
        close(output)
        close(errors)
    }()
    
    return output, errors
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    // Create job channel
    jobs := make(chan Job)
    
    // Generate jobs
    go func() {
        defer close(jobs)
        for i := 1; i <= 20; i++ {
            select {
            case jobs <- Job{ID: i, Data: fmt.Sprintf("data-%d", i)}:
            case <-ctx.Done():
                return
            }
        }
    }()
    
    // Fan-out to 4 workers
    workers := fanOutWithErrors(ctx, jobs, 4)
    
    // Fan-in results and errors
    results, errors := fanInWithErrors(ctx, workers...)
    
    // Process results and errors
    var successCount, errorCount int
    
    done := make(chan bool)
    
    // Handle results
    go func() {
        for result := range results {
            successCount++
            fmt.Printf("Success: Job %d -> %d\n", result.Value/10, result.Value)
        }
        done <- true
    }()
    
    // Handle errors
    go func() {
        for err := range errors {
            errorCount++
            fmt.Printf("Error: %v\n", err)
        }
        done <- true
    }()
    
    // Wait for both goroutines to finish
    <-done
    <-done
    
    fmt.Printf("\nSummary: %d successful, %d failed\n", successCount, errorCount)
}

Bounded Fan-Out Pattern

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// BoundedFanOut limits the number of concurrent workers
type BoundedFanOut struct {
    maxWorkers int
    semaphore  chan struct{}
}

func NewBoundedFanOut(maxWorkers int) *BoundedFanOut {
    return &BoundedFanOut{
        maxWorkers: maxWorkers,
        semaphore:  make(chan struct{}, maxWorkers),
    }
}

func (b *BoundedFanOut) Process(ctx context.Context, jobs <-chan int, processor func(int) int) <-chan int {
    output := make(chan int)
    var wg sync.WaitGroup
    
    go func() {
        // Deferred calls run LIFO: wait for any in-flight workers first, then
        // close output, so cancellation cannot race a send on a closed channel.
        defer close(output)
        defer wg.Wait()
        
        for job := range jobs {
            // Acquire semaphore
            select {
            case b.semaphore <- struct{}{}:
            case <-ctx.Done():
                return
            }
            
            wg.Add(1)
            go func(j int) {
                defer wg.Done()
                defer func() { <-b.semaphore }() // Release semaphore
                
                result := processor(j)
                select {
                case output <- result:
                case <-ctx.Done():
                }
            }(job)
        }
    }()
    
    return output
}

func main() {
    ctx := context.Background()
    
    // Create bounded fan-out with max 3 workers
    fanOut := NewBoundedFanOut(3)
    
    // Create job channel
    jobs := make(chan int)
    
    // Generate jobs
    go func() {
        defer close(jobs)
        for i := 1; i <= 10; i++ {
            jobs <- i
        }
    }()
    
    // Process with bounded concurrency
    results := fanOut.Process(ctx, jobs, func(n int) int {
        // Simulate work
        time.Sleep(500 * time.Millisecond)
        return n * n
    })
    
    // Collect results
    fmt.Println("Bounded Fan-Out Results:")
    for result := range results {
        fmt.Printf("Result: %d\n", result)
    }
}

Performance Considerations

Optimal Worker Count

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func benchmarkWorkers(jobs int, workers int) time.Duration {
    input := make(chan int, jobs)
    
    // Fill input channel
    go func() {
        defer close(input)
        for i := 0; i < jobs; i++ {
            input <- i
        }
    }()
    
    start := time.Now()
    
    // Fan-out
    outputs := make([]<-chan int, workers)
    for i := 0; i < workers; i++ {
        output := make(chan int)
        outputs[i] = output
        
        go func(out chan<- int) {
            defer close(out)
            for n := range input {
                // Simulate a fixed per-job cost (time.Sleep models waiting, not CPU load)
                time.Sleep(10 * time.Millisecond)
                out <- n * n
            }
        }(output)
    }
    
    // Fan-in
    result := make(chan int)
    var wg sync.WaitGroup
    
    for _, output := range outputs {
        wg.Add(1)
        go func(in <-chan int) {
            defer wg.Done()
            for value := range in {
                result <- value
            }
        }(output)
    }
    
    go func() {
        wg.Wait()
        close(result)
    }()
    
    // Consume results
    count := 0
    for range result {
        count++
    }
    
    return time.Since(start)
}

func main() {
    jobs := 100
    maxWorkers := runtime.NumCPU() * 2
    
    fmt.Printf("Benchmarking with %d jobs:\n", jobs)
    
    for workers := 1; workers <= maxWorkers; workers++ {
        duration := benchmarkWorkers(jobs, workers)
        fmt.Printf("Workers: %2d, Time: %v\n", workers, duration)
    }
}

Common Pitfalls and Solutions

1. Goroutine Leaks

// ❌ Bad: ctx is accepted but never used, so cancellation has no effect and the
// goroutine leaks if the consumer stops reading from output
func badFanOut(ctx context.Context, input <-chan int) <-chan int {
    output := make(chan int)
    
    go func() {
        for n := range input {
            output <- n * 2 // May block forever if no reader
        }
        close(output)
    }()
    
    return output
}

// ✅ Good: Proper context handling
func goodFanOut(ctx context.Context, input <-chan int) <-chan int {
    output := make(chan int)
    
    go func() {
        defer close(output)
        for {
            select {
            case n, ok := <-input:
                if !ok {
                    return
                }
                select {
                case output <- n * 2:
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }()
    
    return output
}

2. Unbalanced Work Distribution

// ✅ Round-robin distribution gives each worker its own channel and an equal
// share of jobs (a shared input channel, as in fanOut above, also balances load
// by letting idle workers pull the next job)
func balancedFanOut(input <-chan int, workers int) []<-chan int {
    outputs := make([]chan int, workers)
    for i := range outputs {
        outputs[i] = make(chan int)
    }
    
    go func() {
        defer func() {
            for _, output := range outputs {
                close(output)
            }
        }()
        
        workerIndex := 0
        for n := range input {
            outputs[workerIndex] <- n
            workerIndex = (workerIndex + 1) % workers
        }
    }()
    
    // Convert to read-only channels
    result := make([]<-chan int, workers)
    for i, output := range outputs {
        result[i] = output
    }
    
    return result
}

Best Practices

  1. Use Context: Always support cancellation with context
  2. Handle Errors: Separate error handling from success cases
  3. Bound Concurrency: Limit workers to prevent resource exhaustion (see the errgroup sketch after this list)
  4. Monitor Performance: Benchmark different worker counts
  5. Graceful Shutdown: Ensure all goroutines can exit cleanly
  6. Resource Cleanup: Use defer statements for cleanup
  7. Avoid Blocking: Use select statements for non-blocking operations
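
The first three practices do not have to be hand-rolled every time. The golang.org/x/sync/errgroup package bundles context propagation, error collection, and a concurrency limit (via SetLimit) behind a small API. The following is a minimal sketch of that approach, assuming the x/sync module is available; squareAll is just an illustrative helper, and unlike the channel-based versions above it collects results into a preallocated slice rather than streaming them.

package main

import (
    "context"
    "fmt"

    "golang.org/x/sync/errgroup"
)

// squareAll processes every input with at most three concurrent workers and
// returns the first error encountered, if any.
func squareAll(ctx context.Context, inputs []int) ([]int, error) {
    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(3) // bound concurrency: at most 3 goroutines run at once

    results := make([]int, len(inputs))
    for i, n := range inputs {
        i, n := i, n // capture loop variables (needed before Go 1.22)
        g.Go(func() error {
            select {
            case <-ctx.Done(): // stop early if another worker already failed
                return ctx.Err()
            default:
            }
            results[i] = n * n // each worker writes only to its own slot
            return nil
        })
    }

    if err := g.Wait(); err != nil { // waits for all workers, returns first error
        return nil, err
    }
    return results, nil
}

func main() {
    results, err := squareAll(context.Background(), []int{1, 2, 3, 4, 5})
    if err != nil {
        fmt.Println("error:", err)
        return
    }
    fmt.Println(results) // [1 4 9 16 25]
}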

When to Use Fan-Out/Fan-In

Use When:

  • Tasks can be processed independently
  • You have CPU-intensive or I/O-bound work
  • You want to parallelize processing
  • Order of results doesn't matter (or can be restored afterwards, e.g. by tagging each result with its input index, as sketched below)
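
When downstream code does need the original order, a common trick is to tag each result with the index of the input that produced it and reorder after the fan-in. Here is a minimal sketch of that idea; the indexed struct and its field names are just illustrative.

package main

import (
    "fmt"
    "sync"
)

// indexed pairs a result with the position of its input, so the caller can
// restore the original order after an unordered fan-in.
type indexed struct {
    pos   int
    value int
}

func main() {
    inputs := []int{3, 1, 4, 1, 5, 9}

    results := make(chan indexed)
    var wg sync.WaitGroup

    // Fan-out: one goroutine per input, each remembering its position.
    for i, n := range inputs {
        wg.Add(1)
        go func(pos, n int) {
            defer wg.Done()
            results <- indexed{pos: pos, value: n * n}
        }(i, n)
    }

    // Fan-in: close the channel once every worker has reported.
    go func() {
        wg.Wait()
        close(results)
    }()

    // Restore order by writing each result into its original slot.
    ordered := make([]int, len(inputs))
    for r := range results {
        ordered[r.pos] = r.value
    }
    fmt.Println(ordered) // [9 1 16 1 25 81]
}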

Avoid When:

  • Tasks have dependencies between them
  • Sequential processing is required
  • Memory usage is a concern with many goroutines
  • The overhead of coordination exceeds the benefits

Testing Fan-Out/Fan-In

package main

import (
    "testing"
)

// Note: fanOut and fanIn from the basic example do not take a context, so this
// test relies on go test's default timeout to bound its runtime.
func TestFanOutFanIn(t *testing.T) {
    
    // Create input
    input := make(chan int)
    go func() {
        defer close(input)
        for i := 1; i <= 10; i++ {
            input <- i
        }
    }()
    
    // Fan-out and fan-in
    workers := fanOut(input, 3)
    results := fanIn(workers...)
    
    // Collect and verify results
    seen := make(map[int]bool)
    count := 0
    
    for result := range results {
        seen[result] = true
        count++
    }
    
    if count != 10 {
        t.Errorf("Expected 10 results, got %d", count)
    }
    
    // Verify all expected results are present
    for i := 1; i <= 10; i++ {
        expected := i * i
        if !seen[expected] {
            t.Errorf("Missing result: %d", expected)
        }
    }
}

The Fan-Out/Fan-In pattern is essential for building scalable concurrent applications in Go. It allows you to leverage multiple CPU cores and handle I/O-bound operations efficiently while maintaining clean, composable code.


Next: Learn about Pub/Sub Pattern for event-driven communication between components.