Go Concurrency Patterns Series: ← Pipeline Pattern | Series Overview | Pub/Sub Pattern →
What is the Fan-Out/Fan-In Pattern?
The Fan-Out/Fan-In pattern is a powerful concurrency pattern that distributes work across multiple goroutines (fan-out) and then collects the results back into a single channel (fan-in). This pattern is perfect for parallelizing CPU-intensive tasks or I/O operations that can be processed independently.
Fan-Out: Distribute work from one source to multiple workers. Fan-In: Collect results from multiple workers into a single destination.
Real-World Use Cases
- Image Processing: Resize multiple images in parallel
- Data Processing: Process large datasets across multiple workers
- API Calls: Make multiple HTTP requests concurrently
- File Operations: Process multiple files simultaneously
- Database Queries: Execute multiple queries in parallel
Basic Fan-Out/Fan-In Implementation
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// fanOut starts `workers` goroutines that all receive from the shared input
// channel and returns one output channel per goroutine. Each worker squares
// the numbers it receives (after a simulated random delay of up to 100ms) and
// closes its own output channel once input is closed and drained.
func fanOut(input <-chan int, workers int) []<-chan int {
	// square is the loop every worker runs against its private output channel.
	square := func(dst chan<- int) {
		defer close(dst)
		for v := range input {
			// Simulate work
			pause := time.Duration(rand.Intn(100)) * time.Millisecond
			time.Sleep(pause)
			dst <- v * v
		}
	}

	results := make([]<-chan int, workers)
	for idx := range results {
		ch := make(chan int)
		results[idx] = ch
		go square(ch)
	}
	return results
}
// fanIn merges any number of input channels into a single output channel.
// One forwarding goroutine is started per input; the returned channel is
// closed once every input channel has been closed and drained. Called with no
// inputs, the returned channel is closed immediately.
func fanIn(inputs ...<-chan int) <-chan int {
	output := make(chan int)
	var wg sync.WaitGroup

	// Start a goroutine for each input channel.
	for _, input := range inputs {
		wg.Add(1)
		// The channel is passed as an argument so each goroutine forwards from
		// its own input regardless of loop-variable semantics.
		go func(in <-chan int) {
			defer wg.Done()
			for value := range in {
				output <- value
			}
		}(input)
	}

	// Close the output channel when all forwarders are done; this runs in its
	// own goroutine so fanIn can return before any value is consumed.
	go func() {
		wg.Wait()
		close(output)
	}()

	return output
}
func main() {
// Create input channel
input := make(chan int)
// Start the pipeline
go func() {
defer close(input)
for i := 1; i <= 10; i++ {
input <- i
}
}()
// Fan-out to 3 workers
workers := fanOut(input, 3)
// Fan-in results
results := fanIn(workers...)
// Collect results
fmt.Println("Results:")
for result := range results {
fmt.Printf("Processed: %d\n", result)
}
}
Advanced Fan-Out/Fan-In with Error Handling
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
// Result is the outcome of processing one Job: either Value is set, or Error
// is non-nil.
type Result struct {
	Value int
	Error error
}

// Job is a unit of work handed to a worker.
type Job struct {
	ID   int
	Data string
}

// processJob simulates a unit of work that takes up to 200ms and fails
// randomly about 20% of the time.
//
// On success the Result's Value is job.ID * 10. If ctx is cancelled before or
// during the simulated work, processJob returns promptly with a Result whose
// Error wraps ctx.Err(), so callers can detect it with errors.Is.
func processJob(ctx context.Context, job Job) Result {
	// Bail out early so an already-cancelled context never starts the work.
	if err := ctx.Err(); err != nil {
		return Result{Error: fmt.Errorf("job %d: %w", job.ID, err)}
	}

	// Simulate work that might fail; a timer (rather than time.Sleep) lets the
	// wait be interrupted by cancellation.
	delay := time.NewTimer(time.Duration(rand.Intn(200)) * time.Millisecond)
	defer delay.Stop()
	select {
	case <-delay.C:
	case <-ctx.Done():
		return Result{Error: fmt.Errorf("job %d: %w", job.ID, ctx.Err())}
	}

	// Simulate random failures
	if rand.Float32() < 0.2 {
		return Result{Error: fmt.Errorf("failed to process job %d", job.ID)}
	}
	return Result{Value: job.ID * 10}
}
// fanOutWithErrors starts `workers` goroutines that pull jobs from the shared
// jobs channel, run processJob on each, and publish the Result on a per-worker
// output channel. A worker stops, closing its output channel, when jobs is
// closed or ctx is cancelled.
func fanOutWithErrors(ctx context.Context, jobs <-chan Job, workers int) []<-chan Result {
	// run is a single worker's receive/process/send loop.
	run := func(results chan<- Result) {
		defer close(results)
		for {
			// Wait for the next job unless cancelled first.
			var job Job
			select {
			case <-ctx.Done():
				return
			case next, open := <-jobs:
				if !open {
					return
				}
				job = next
			}

			// Deliver the result unless cancelled while the reader is busy.
			res := processJob(ctx, job)
			select {
			case <-ctx.Done():
				return
			case results <- res:
			}
		}
	}

	outputs := make([]<-chan Result, workers)
	for idx := range outputs {
		ch := make(chan Result)
		outputs[idx] = ch
		go run(ch)
	}
	return outputs
}
// fanInWithErrors merges the given Result channels and splits the stream in
// two: Results with a nil Error go to the first returned channel, and the
// Error of every failed Result goes to the second. Both returned channels are
// closed once every forwarder has exited, which happens when its input is
// closed or ctx is cancelled.
func fanInWithErrors(ctx context.Context, inputs ...<-chan Result) (<-chan Result, <-chan error) {
	var (
		successes = make(chan Result)
		failures  = make(chan error)
		pending   sync.WaitGroup
	)

	// forward drains one input, routing each Result to the matching channel.
	forward := func(src <-chan Result) {
		defer pending.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case res, open := <-src:
				if !open {
					return
				}
				if res.Error != nil {
					select {
					case failures <- res.Error:
					case <-ctx.Done():
						return
					}
					continue
				}
				select {
				case successes <- res:
				case <-ctx.Done():
					return
				}
			}
		}
	}

	for _, src := range inputs {
		pending.Add(1)
		go forward(src)
	}

	// Close both outputs only after every forwarder has stopped sending.
	go func() {
		pending.Wait()
		close(successes)
		close(failures)
	}()

	return successes, failures
}
// main pushes 20 jobs through 4 workers under a 5-second timeout, prints each
// success and failure as it arrives, and finishes with a summary count.
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Producer: emits jobs 1..20, giving up early if the context ends.
	jobs := make(chan Job)
	go func() {
		defer close(jobs)
		for id := 1; id <= 20; id++ {
			select {
			case <-ctx.Done():
				return
			case jobs <- Job{ID: id, Data: fmt.Sprintf("data-%d", id)}:
			}
		}
	}()

	// Fan out to 4 workers, then split their output into results and errors.
	results, failures := fanInWithErrors(ctx, fanOutWithErrors(ctx, jobs, 4)...)

	var (
		succeeded, failed int
		consumers         sync.WaitGroup
	)
	consumers.Add(2)

	// Each counter is written by exactly one goroutine and read only after
	// consumers.Wait returns.
	go func() {
		defer consumers.Done()
		for res := range results {
			succeeded++
			fmt.Printf("Success: Job %d -> %d\n", res.Value/10, res.Value)
		}
	}()
	go func() {
		defer consumers.Done()
		for err := range failures {
			failed++
			fmt.Printf("Error: %v\n", err)
		}
	}()

	consumers.Wait()
	fmt.Printf("\nSummary: %d successful, %d failed\n", succeeded, failed)
}
Bounded Fan-Out Pattern
package main
import (
"context"
"fmt"
"sync"
"time"
)
// BoundedFanOut runs a processor over a stream of jobs while limiting how many
// processor calls are in flight at the same time.
type BoundedFanOut struct {
	maxWorkers int
	semaphore  chan struct{} // buffered channel used as a counting semaphore
}

// NewBoundedFanOut returns a BoundedFanOut that allows at most maxWorkers
// concurrent processor calls. Values below 1 are treated as 1: a zero-capacity
// semaphore could never be acquired and a negative capacity would panic.
func NewBoundedFanOut(maxWorkers int) *BoundedFanOut {
	if maxWorkers < 1 {
		maxWorkers = 1
	}
	return &BoundedFanOut{
		maxWorkers: maxWorkers,
		semaphore:  make(chan struct{}, maxWorkers),
	}
}

// Process applies processor to every value received from jobs, running at
// most maxWorkers calls concurrently, and sends each result on the returned
// channel (in completion order, not input order).
//
// The returned channel is closed after jobs is closed or ctx is cancelled and
// every worker that was already started has finished. Results produced after
// cancellation may be dropped.
func (b *BoundedFanOut) Process(ctx context.Context, jobs <-chan int, processor func(int) int) <-chan int {
	output := make(chan int)

	go func() {
		var wg sync.WaitGroup
		// Deferred calls run last-in-first-out: first wait for every worker,
		// then close output. Closing before the workers are done would let a
		// worker send on a closed channel and panic.
		defer close(output)
		defer wg.Wait()

		for {
			// Wait for the next job, but stop dispatching once cancelled.
			var job int
			select {
			case <-ctx.Done():
				return
			case j, ok := <-jobs:
				if !ok {
					return
				}
				job = j
			}

			// Acquire semaphore
			select {
			case b.semaphore <- struct{}{}:
			case <-ctx.Done():
				return
			}

			wg.Add(1)
			go func(j int) {
				defer wg.Done()
				defer func() { <-b.semaphore }() // Release semaphore

				result := processor(j)
				select {
				case output <- result:
				case <-ctx.Done():
				}
			}(job)
		}
	}()

	return output
}
// main squares the numbers 1 through 10 with at most three jobs running at
// once, each taking 500ms, and prints the results as they complete.
func main() {
	// Create bounded fan-out with max 3 workers
	pool := NewBoundedFanOut(3)

	// Producer: emits 1..10 and closes the channel when finished.
	numbers := make(chan int)
	go func() {
		defer close(numbers)
		for n := 1; n <= 10; n++ {
			numbers <- n
		}
	}()

	// slowSquare stands in for real work.
	slowSquare := func(n int) int {
		time.Sleep(500 * time.Millisecond)
		return n * n
	}

	squares := pool.Process(context.Background(), numbers, slowSquare)

	fmt.Println("Bounded Fan-Out Results:")
	for sq := range squares {
		fmt.Printf("Result: %d\n", sq)
	}
}
Performance Considerations
Optimal Worker Count
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
// benchmarkWorkers pushes `jobs` integers through a fan-out/fan-in pipeline
// with `workers` goroutines and returns the elapsed wall-clock time from just
// after the producer is started until every result has been consumed. Each
// job is simulated with a 10ms sleep.
func benchmarkWorkers(jobs int, workers int) time.Duration {
	// Buffered to hold every job so the producer never blocks on the workers.
	input := make(chan int, jobs)

	// Fill input channel
	go func() {
		defer close(input)
		for i := 0; i < jobs; i++ {
			input <- i
		}
	}()

	start := time.Now()

	// Fan-out
	outputs := make([]<-chan int, workers)
	for i := 0; i < workers; i++ {
		output := make(chan int)
		outputs[i] = output
		go func(out chan<- int) {
			defer close(out)
			for n := range input {
				// Simulate work with a fixed delay
				time.Sleep(10 * time.Millisecond)
				out <- n * n
			}
		}(output)
	}

	// Fan-in
	result := make(chan int)
	var wg sync.WaitGroup
	for _, output := range outputs {
		wg.Add(1)
		go func(in <-chan int) {
			defer wg.Done()
			for value := range in {
				result <- value
			}
		}(output)
	}
	go func() {
		wg.Wait()
		close(result)
	}()

	// Drain the results; only the elapsed time matters here.
	for range result {
	}

	return time.Since(start)
}
// main times the pipeline for 100 jobs at every worker count from 1 up to
// twice the number of CPUs and prints one line per measurement.
func main() {
	const jobs = 100
	limit := 2 * runtime.NumCPU()

	fmt.Printf("Benchmarking with %d jobs:\n", jobs)
	for w := 1; w <= limit; w++ {
		elapsed := benchmarkWorkers(jobs, w)
		fmt.Printf("Workers: %2d, Time: %v\n", w, elapsed)
	}
}
Common Pitfalls and Solutions
1. Goroutine Leaks
// ❌ Bad: the goroutine ignores ctx entirely, so it leaks if the consumer
// stops reading (for example after the context is cancelled).
//
// badFanOut doubles every value received from input and sends it on the
// returned channel, which is closed once input is closed and drained.
func badFanOut(ctx context.Context, input <-chan int) <-chan int {
	doubled := make(chan int)
	go func() {
		for {
			v, open := <-input
			if !open {
				break
			}
			doubled <- v * 2 // May block forever if no reader
		}
		close(doubled)
	}()
	return doubled
}
// ✅ Good: every blocking channel operation is paired with ctx.Done().
//
// goodFanOut doubles every value received from input and sends it on the
// returned channel. The goroutine exits, closing the returned channel, when
// input is closed or ctx is cancelled, whichever comes first.
func goodFanOut(ctx context.Context, input <-chan int) <-chan int {
	doubled := make(chan int)
	go func() {
		defer close(doubled)
		cancelled := ctx.Done()
		for {
			// Receive the next value unless cancelled first.
			var v int
			select {
			case <-cancelled:
				return
			case next, open := <-input:
				if !open {
					return
				}
				v = next
			}

			// Hand the value on unless cancelled while waiting for a reader.
			select {
			case <-cancelled:
				return
			case doubled <- v * 2:
			}
		}
	}()
	return doubled
}
2. Unbalanced Work Distribution
// ✅ Round-robin distribution for balanced load
//
// balancedFanOut hands the values from input to `workers` output channels in
// strict rotation: the first value goes to channel 0, the next to channel 1,
// and so on, wrapping around. All output channels are closed once input is
// closed and drained. Sends are unbuffered, so the distributor waits for the
// current channel's reader before moving on.
func balancedFanOut(input <-chan int, workers int) []<-chan int {
	lanes := make([]chan int, workers)
	// The same channels exposed as receive-only for the caller.
	readOnly := make([]<-chan int, workers)
	for i := 0; i < workers; i++ {
		ch := make(chan int)
		lanes[i] = ch
		readOnly[i] = ch
	}

	go func() {
		defer func() {
			for i := range lanes {
				close(lanes[i])
			}
		}()

		next := 0
		for v := range input {
			lanes[next] <- v
			next++
			if next == workers {
				next = 0
			}
		}
	}()

	return readOnly
}
Best Practices
- Use Context: Always support cancellation with context
- Handle Errors: Separate error handling from success cases
- Bound Concurrency: Limit workers to prevent resource exhaustion
- Monitor Performance: Benchmark different worker counts
- Graceful Shutdown: Ensure all goroutines can exit cleanly
- Resource Cleanup: Use defer statements for cleanup
- Avoid Blocking: Use select statements so that channel operations can be cancelled (via ctx.Done()) or made non-blocking (via a default case)
When to Use Fan-Out/Fan-In
Use When:
- Tasks can be processed independently
- You have CPU-intensive or I/O-bound work
- You want to parallelize processing
- Order of results doesn’t matter (or can be handled separately)
Avoid When:
- Tasks have dependencies between them
- Sequential processing is required
- Memory usage is a concern with many goroutines
- The overhead of coordination exceeds the benefits
Testing Fan-Out/Fan-In
package main
import (
"context"
"testing"
"time"
)
// TestFanOutFanIn sends 1 through 10 through fanOut (3 workers) and fanIn and
// checks that exactly the ten squares come out, in any order. The whole test
// is bounded by a 5-second timeout so a stalled pipeline fails instead of
// hanging.
func TestFanOutFanIn(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Create input
	input := make(chan int)
	go func() {
		defer close(input)
		for i := 1; i <= 10; i++ {
			select {
			case input <- i:
			case <-ctx.Done():
				return
			}
		}
	}()

	// Fan-out and fan-in
	workers := fanOut(input, 3)
	results := fanIn(workers...)

	// Collect results until the merged channel closes or the timeout fires.
	seen := make(map[int]bool)
	count := 0
collect:
	for {
		select {
		case result, ok := <-results:
			if !ok {
				break collect
			}
			seen[result] = true
			count++
		case <-ctx.Done():
			t.Fatalf("timed out after %d results: %v", count, ctx.Err())
		}
	}

	if count != 10 {
		t.Errorf("Expected 10 results, got %d", count)
	}

	// Verify all expected results are present
	for i := 1; i <= 10; i++ {
		expected := i * i
		if !seen[expected] {
			t.Errorf("Missing result: %d", expected)
		}
	}
}
The Fan-Out/Fan-In pattern is essential for building scalable concurrent applications in Go. It allows you to leverage multiple CPU cores and handle I/O-bound operations efficiently while maintaining clean, composable code.
Next: Learn about Pub/Sub Pattern for event-driven communication between components.