Scaling to Multiple Workers

We’ve explored unbounded and bounded buffers with a single worker or a handful of workers. Now let’s scale to many producers and many consumers - the pattern behind most production systems!

This pattern combines:

  • Fan-out: Multiple producers generating work
  • Fan-in: Multiple consumers processing work
  • Load balancing: Work distributed across consumers
  • Result aggregation: Collecting results from all consumers

Real-World Applications

This is THE pattern for scalable systems:

  • Web servers: Multiple request handlers, multiple worker threads
  • Message queues: Multiple publishers, multiple subscribers
  • MapReduce: Multiple mappers, multiple reducers
  • Microservices: Multiple API instances processing requests
  • Data pipelines: Parallel ETL stages
  • Video encoding: Multiple encoders processing jobs

The Architecture

Producers       Shared Channel       Consumers
                   [Buffer]

P1 ─┐                              ┌─→ C1
P2 ─┼─→ [═════Queue═════] ─────────┼─→ C2
P3 ─┤                              ├─→ C3
P4 ─┘                              └─→ C4

All producers send to same channel
All consumers receive from same channel
Go's scheduler load balances automatically!

Basic Implementation

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

// Job represents work to be done
type Job struct {
	ID         int
	ProducerID int
	Data       int
	CreatedAt  time.Time
}

// Result represents processed work
type Result struct {
	JobID      int
	ConsumerID int
	Output     int
	Duration   time.Duration
}

// Stats tracks system metrics
type Stats struct {
	jobsCreated   atomic.Int64
	jobsProcessed atomic.Int64
	totalDuration atomic.Int64 // nanoseconds
}

func (s *Stats) Report() {
	created := s.jobsCreated.Load()
	processed := s.jobsProcessed.Load()
	avgDuration := time.Duration(0)
	if processed > 0 {
		avgDuration = time.Duration(s.totalDuration.Load() / processed)
	}

	fmt.Printf(`
📊 Final Statistics:
   Jobs created:   %d
   Jobs processed: %d
   Avg duration:   %v
   Jobs in flight: %d
`, created, processed, avgDuration, created-processed)
}

// Producer generates jobs
func Producer(id int, jobs chan<- Job, duration time.Duration, stats *Stats, wg *sync.WaitGroup) {
	defer wg.Done()

	jobNum := 0
	deadline := time.Now().Add(duration)

	for time.Now().Before(deadline) {
		job := Job{
			ID:         id*1000 + jobNum,
			ProducerID: id,
			Data:       rand.Intn(100),
			CreatedAt:  time.Now(),
		}

		jobs <- job
		stats.jobsCreated.Add(1)
		jobNum++

		// Variable production rate
		time.Sleep(time.Duration(50+rand.Intn(100)) * time.Millisecond)
	}

	fmt.Printf("[Producer %d] Finished, created %d jobs\n", id, jobNum)
}

// Consumer processes jobs and sends results
func Consumer(id int, jobs <-chan Job, results chan<- Result, stats *Stats, wg *sync.WaitGroup) {
	defer wg.Done()

	processed := 0

	for job := range jobs {
		start := time.Now()

		// Simulate work
		processingTime := time.Duration(80+rand.Intn(120)) * time.Millisecond
		time.Sleep(processingTime)

		// Compute result
		result := Result{
			JobID:      job.ID,
			ConsumerID: id,
			Output:     job.Data * 2,
			Duration:   time.Since(start),
		}

		results <- result

		stats.jobsProcessed.Add(1)
		stats.totalDuration.Add(int64(result.Duration))
		processed++

		queueTime := start.Sub(job.CreatedAt)
		if queueTime > 500*time.Millisecond {
			fmt.Printf("[Consumer %d] ⚠️  Job %d queued for %v\n",
				id, job.ID, queueTime)
		}
	}

	fmt.Printf("[Consumer %d] Finished, processed %d jobs\n", id, processed)
}

// ResultAggregator collects all results
func ResultAggregator(results <-chan Result, done chan<- struct{}) {
	resultCount := 0
	totalDuration := time.Duration(0)

	for result := range results {
		resultCount++
		totalDuration += result.Duration

		if resultCount%10 == 0 {
			avgDuration := totalDuration / time.Duration(resultCount)
			fmt.Printf("📦 Aggregator: %d results (avg: %v)\n",
				resultCount, avgDuration)
		}
	}

	fmt.Printf("📦 Aggregator: Finished with %d total results\n", resultCount)
	close(done)
}

func main() {
	fmt.Println("=== Multiple Producers, Multiple Consumers ===\n")

	const (
		numProducers = 4
		numConsumers = 6
		bufferSize   = 20
		duration     = 3 * time.Second
	)

	jobs := make(chan Job, bufferSize)
	results := make(chan Result, bufferSize)
	stats := &Stats{}

	fmt.Printf("Configuration:\n")
	fmt.Printf("  Producers: %d\n", numProducers)
	fmt.Printf("  Consumers: %d\n", numConsumers)
	fmt.Printf("  Buffer size: %d\n", bufferSize)
	fmt.Printf("  Duration: %v\n\n", duration)

	var producerWg, consumerWg sync.WaitGroup
	aggregatorDone := make(chan struct{})

	// Start result aggregator
	go ResultAggregator(results, aggregatorDone)

	// Start consumers
	fmt.Println("Starting consumers...")
	for i := 0; i < numConsumers; i++ {
		consumerWg.Add(1)
		go Consumer(i, jobs, results, stats, &consumerWg)
	}

	// Start producers
	fmt.Println("Starting producers...\n")
	for i := 0; i < numProducers; i++ {
		producerWg.Add(1)
		go Producer(i, jobs, duration, stats, &producerWg)
	}

	// Wait for producers to finish
	producerWg.Wait()
	fmt.Println("\n✓ All producers finished")
	close(jobs) // Signal consumers no more jobs

	// Wait for consumers to finish
	consumerWg.Wait()
	fmt.Println("✓ All consumers finished")
	close(results) // Signal aggregator no more results

	// Wait for aggregator
	<-aggregatorDone
	fmt.Println("✓ Result aggregator finished")

	stats.Report()
	fmt.Println("\n✓ Pipeline complete!")
}

Load Balancing with Channels

One of Go’s superpowers: channels provide automatic load balancing!

// All consumers read from same channel
for job := range jobs {
    // Go runtime automatically distributes jobs
    // to available consumers
}

This gives you natural load balancing - each idle consumer picks up the next available job, with no manual coordination needed. (The runtime doesn’t guarantee strict fairness among waiting receivers, but in practice the distribution evens out.)
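
A minimal sketch that makes the distribution visible (the counts and sleep durations are illustrative): three consumers range over one channel, and each counts the jobs it handled.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	jobs := make(chan int)
	counts := make([]int, 3) // jobs per consumer; each goroutine writes only its own slot

	var wg sync.WaitGroup
	for c := 0; c < 3; c++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for range jobs { // each idle consumer grabs the next available job
				time.Sleep(time.Millisecond) // simulate work
				counts[id]++
			}
		}(c)
	}

	for i := 0; i < 30; i++ {
		jobs <- i
	}
	close(jobs)
	wg.Wait()

	fmt.Println("jobs per consumer:", counts) // typically close to even, e.g. [11 9 10]
}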

Advanced: Work Distribution Patterns

Let’s explore different distribution strategies:

package main

import (
	"hash/fnv"
)

// Job here is the same type defined in the basic implementation above.

// Partition-based distribution
type PartitionedQueue struct {
	queues        []chan Job
	numPartitions int
}

func NewPartitionedQueue(numPartitions, bufferSize int) *PartitionedQueue {
	pq := &PartitionedQueue{
		queues:        make([]chan Job, numPartitions),
		numPartitions: numPartitions,
	}
	for i := 0; i < numPartitions; i++ {
		pq.queues[i] = make(chan Job, bufferSize)
	}
	return pq
}

// Send to partition based on job key
func (pq *PartitionedQueue) Send(job Job, key string) {
	partition := pq.partition(key)
	pq.queues[partition] <- job
}

func (pq *PartitionedQueue) partition(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	// Take the modulo in uint32 so the result is never negative,
	// even on platforms where int is 32 bits.
	return int(h.Sum32() % uint32(pq.numPartitions))
}

// Start one consumer per partition. If numConsumers exceeds numPartitions,
// consumers share partitions and per-key ordering is lost.
func (pq *PartitionedQueue) StartConsumers(numConsumers int, process func(Job)) {
	for i := 0; i < numConsumers; i++ {
		go func(partition int) {
			for job := range pq.queues[partition] {
				process(job)
			}
		}(i % pq.numPartitions)
	}
}
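
A hypothetical usage sketch (reusing the Job type from the basic implementation): keying by user ID pins each user’s jobs to one partition, so per-user ordering survives even with many consumers - something the shared-channel approach cannot guarantee.

pq := NewPartitionedQueue(4, 16)
pq.StartConsumers(4, func(j Job) {
	fmt.Printf("processing job %d\n", j.ID)
})

for i := 0; i < 8; i++ {
	key := fmt.Sprintf("user-%d", i%2) // two distinct keys
	pq.Send(Job{ID: i}, key)          // same key → same partition, order preserved
}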

Priority-Based Processing

Sometimes jobs have different priorities:

package main

import (
	"container/heap"
	"fmt"
	"sync"
	"time"
)

// PriorityJob wraps a job with priority
type PriorityJob struct {
	Job      Job
	Priority int
	Index    int
}

// PriorityQueue implements heap.Interface
type PriorityQueue []*PriorityJob

func (pq PriorityQueue) Len() int { return len(pq) }
func (pq PriorityQueue) Less(i, j int) bool {
	return pq[i].Priority > pq[j].Priority // Higher priority first
}
func (pq PriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].Index = i
	pq[j].Index = j
}
func (pq *PriorityQueue) Push(x interface{}) {
	item := x.(*PriorityJob)
	item.Index = len(*pq)
	*pq = append(*pq, item)
}
func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	item := old[n-1]
	old[n-1] = nil
	*pq = old[0 : n-1]
	return item
}

// PriorityJobQueue coordinates priority-based processing
type PriorityJobQueue struct {
	pq      PriorityQueue
	mu      sync.Mutex
	cond    *sync.Cond
	closed  bool
}

func NewPriorityJobQueue() *PriorityJobQueue {
	pjq := &PriorityJobQueue{
		pq: make(PriorityQueue, 0),
	}
	pjq.cond = sync.NewCond(&pjq.mu)
	heap.Init(&pjq.pq)
	return pjq
}

func (pjq *PriorityJobQueue) Enqueue(job Job, priority int) {
	pjq.mu.Lock()
	defer pjq.mu.Unlock()

	heap.Push(&pjq.pq, &PriorityJob{
		Job:      job,
		Priority: priority,
	})
	pjq.cond.Signal()
}

func (pjq *PriorityJobQueue) Dequeue() (Job, bool) {
	pjq.mu.Lock()
	defer pjq.mu.Unlock()

	for pjq.pq.Len() == 0 && !pjq.closed {
		pjq.cond.Wait()
	}

	if pjq.pq.Len() == 0 {
		return Job{}, false
	}

	item := heap.Pop(&pjq.pq).(*PriorityJob)
	return item.Job, true
}

func (pjq *PriorityJobQueue) Close() {
	pjq.mu.Lock()
	pjq.closed = true
	pjq.mu.Unlock()
	pjq.cond.Broadcast()
}

// Consumer that processes priority jobs
func PriorityConsumer(id int, queue *PriorityJobQueue, wg *sync.WaitGroup) {
	defer wg.Done()

	for {
		job, ok := queue.Dequeue()
		if !ok {
			break
		}

		fmt.Printf("[Consumer %d] Processing high-priority job %d\n", id, job.ID)
		time.Sleep(100 * time.Millisecond)
	}
}
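
A minimal wiring sketch (again reusing Job; the counts are arbitrary): because Dequeue keeps returning jobs until the heap is empty even after Close, we can close immediately after enqueueing and the remaining jobs still drain in priority order.

func main() {
	queue := NewPriorityJobQueue()

	var wg sync.WaitGroup
	wg.Add(2)
	go PriorityConsumer(1, queue, &wg)
	go PriorityConsumer(2, queue, &wg)

	for i := 0; i < 10; i++ {
		queue.Enqueue(Job{ID: i}, i%3) // priorities 0-2
	}
	queue.Enqueue(Job{ID: 99}, 10) // dequeued ahead of anything still waiting

	queue.Close() // consumers drain the heap, then exit
	wg.Wait()
}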

Real-World Example: Image Processing Pipeline

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type Image struct {
	ID       int
	Filename string
	Size     int // bytes
}

type ProcessedImage struct {
	ID          int
	Filename    string
	Thumbnail   []byte
	Compressed  []byte
	ProcessTime time.Duration
}

// ImageLoader (Producer) loads images from disk
func ImageLoader(id int, images chan<- Image, count int, wg *sync.WaitGroup) {
	defer wg.Done()

	for i := 0; i < count; i++ {
		img := Image{
			ID:       id*1000 + i,
			Filename: fmt.Sprintf("image_%d_%d.jpg", id, i),
			Size:     1024 * 1024 * (2 + i%3), // 2-4 MB
		}

		images <- img
		time.Sleep(50 * time.Millisecond)
	}
}

// ImageProcessor (Consumer) processes images
func ImageProcessor(id int, images <-chan Image, results chan<- ProcessedImage,
	stats *ProcessingStats, wg *sync.WaitGroup) {
	defer wg.Done()

	for img := range images {
		start := time.Now()

		// Simulate processing
		processTime := time.Duration(img.Size/10000) * time.Millisecond

		// Generate thumbnail (simulated)
		time.Sleep(processTime / 2)

		// Compress (simulated)
		time.Sleep(processTime / 2)

		result := ProcessedImage{
			ID:          img.ID,
			Filename:    img.Filename,
			Thumbnail:   make([]byte, 100*1024), // 100 KB thumbnail
			Compressed:  make([]byte, img.Size/2), // 50% compression
			ProcessTime: time.Since(start),
		}

		results <- result
		stats.recordProcessing(result.ProcessTime)

		fmt.Printf("[Processor %d] Processed %s in %v\n",
			id, img.Filename, result.ProcessTime)
	}
}

type ProcessingStats struct {
	totalProcessed atomic.Int64
	totalTime      atomic.Int64 // nanoseconds
}

func (s *ProcessingStats) recordProcessing(duration time.Duration) {
	s.totalProcessed.Add(1)
	s.totalTime.Add(int64(duration))
}

func (s *ProcessingStats) Report() {
	total := s.totalProcessed.Load()
	avg := time.Duration(0)
	if total > 0 {
		avg = time.Duration(s.totalTime.Load() / total)
	}
	fmt.Printf("\n📊 Processed %d images, avg time: %v\n", total, avg)
}

// ImageSaver (Aggregator) saves processed images
func ImageSaver(results <-chan ProcessedImage, done chan<- struct{}) {
	saved := 0
	for range results {
		// Simulate saving to disk
		time.Sleep(20 * time.Millisecond)
		saved++

		if saved%5 == 0 {
			fmt.Printf("💾 Saved %d images so far\n", saved)
		}
	}

	fmt.Printf("💾 Saved %d total images\n", saved)
	close(done)
}

func RunImagePipeline() {
	fmt.Println("=== Image Processing Pipeline ===\n")

	const (
		numLoaders    = 2
		numProcessors = 4
		imagesPerLoader = 10
	)

	images := make(chan Image, 50)
	results := make(chan ProcessedImage, 50)
	stats := &ProcessingStats{}
	done := make(chan struct{})

	var loaderWg, processorWg sync.WaitGroup

	// Start saver
	go ImageSaver(results, done)

	// Start processors
	for i := 0; i < numProcessors; i++ {
		processorWg.Add(1)
		go ImageProcessor(i, images, results, stats, &processorWg)
	}

	// Start loaders
	for i := 0; i < numLoaders; i++ {
		loaderWg.Add(1)
		go ImageLoader(i, images, imagesPerLoader, &loaderWg)
	}

	loaderWg.Wait()
	close(images)

	processorWg.Wait()
	close(results)

	<-done

	stats.Report()
	fmt.Println("\n✓ Pipeline complete!")
}

func main() {
	RunImagePipeline()
}

Key Patterns for Multiple Workers

1. Fan-Out Pattern

// Multiple goroutines read from same channel
for i := 0; i < numWorkers; i++ {
    go worker(jobs)
}

2. Fan-In Pattern

// Multiple goroutines write to same channel
results := make(chan Result, bufferSize)
for i := 0; i < numProducers; i++ {
    go producer(results)
}

3. Result Aggregation

go func() {
    for result := range results {
        aggregate(result)
    }
    close(done)
}()
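
These three pieces compose into one reusable helper. Here’s a generic sketch (RunPool is an illustrative name, not a standard API; assumes Go 1.18+ and the sync package):

// RunPool fans jobs out to `workers` goroutines and fans their
// results back into a single channel, closing it when all workers finish.
func RunPool[T, R any](jobs <-chan T, workers int, process func(T) R) <-chan R {
	results := make(chan R)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobs { // fan-out: workers share one input channel
				results <- process(job) // fan-in: workers share one output channel
			}
		}()
	}

	go func() {
		wg.Wait()
		close(results) // closed exactly once, after every writer has stopped
	}()

	return results
}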

Performance Considerations

Optimal Worker Count

  • CPU-bound: Workers = Number of CPUs
  • I/O-bound: Workers = 10-100× Number of CPUs
  • Mixed: Benchmark to find the sweet spot

numWorkers := runtime.NumCPU()     // CPU-bound
numWorkers = runtime.NumCPU() * 10 // I/O-bound
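
For the mixed case, a subbenchmark sweep is a reasonable starting point. A sketch using the standard testing package, where runWorkload is a hypothetical function that processes a fixed batch of jobs with n workers:

func BenchmarkWorkerCount(b *testing.B) {
	for _, n := range []int{1, 2, 4, 8, 16, 32} {
		b.Run(fmt.Sprintf("workers=%d", n), func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				runWorkload(n) // hypothetical: fixed workload, n workers
			}
		})
	}
}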

Buffer Sizing

  • Too small: Contention, blocking
  • Too large: Memory waste
  • Sweet spot: 2-5× number of workers

bufferSize := numWorkers * 3

Common Pitfalls

1. Forgetting to Close Channels

// ✗ Wrong: Consumers wait forever
go producer(jobs)
// jobs never closed!

// ✓ Correct: Close after all producers finish
producerWg.Wait()
close(jobs)

2. Closing Channel Multiple Times

// ✗ Wrong: Panic!
close(jobs)
close(jobs) // Panic: close of closed channel

// ✓ Correct: Use sync.Once
var closeOnce sync.Once
closeOnce.Do(func() { close(jobs) })

3. Not Draining Results Channel

// ✗ Wrong: Results channel fills up, consumers block
results := make(chan Result, 10)

// ✓ Correct: Always drain results
go func() {
    for result := range results {
        handleResult(result)
    }
}()

Monitoring and Metrics

Track these key metrics (a sampling sketch follows the list):

  • Throughput: Jobs processed per second
  • Latency: Time from creation to completion
  • Queue depth: Current buffer fill
  • Worker utilization: % time workers are busy
  • Error rate: Failed jobs per second
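
Most of these fall out of what the basic implementation already tracks. A sampling sketch that reuses its Stats counters and jobs channel (time.Tick leaks its ticker, which is acceptable in a demo; prefer time.NewTicker in real code):

go func() {
	var last int64
	for range time.Tick(time.Second) {
		processed := stats.jobsProcessed.Load()
		fmt.Printf("queue depth: %d, throughput: %d jobs/s\n",
			len(jobs), processed-last) // len() on a channel is approximate, fine for metrics
		last = processed
	}
}()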

When to Use This Pattern

Use when:

  • You need high throughput
  • Work can be parallelized
  • Workers are independent
  • Order doesn’t matter (or you handle it separately)

Avoid when:

  • Work must be ordered
  • State is shared between jobs
  • Coordination overhead is high
  • Single worker is fast enough

Next Up: Readers-Writers Problems

We’ve mastered producer-consumer! Next, we’ll tackle the Readers-Writers problem, where multiple readers can share access but writers need exclusive access.

Try It Yourself

  1. Benchmark worker count - Find optimal number
  2. Add work stealing - Busy workers help idle ones
  3. Implement batching - Process jobs in batches
  4. Add circuit breakers - Stop on too many errors
  5. Track per-worker metrics - Identify slow workers

This is part 6 of “Golang Experiments: Classic Concurrency Problems”