Scaling to Multiple Workers
We’ve explored unbounded and bounded buffers with a single worker or just a few. Now let’s scale to many producers and many consumers - the pattern behind most production systems!
This pattern combines:
- Fan-out: Multiple producers generating work
- Fan-in: Multiple consumers processing work
- Load balancing: Work distributed across consumers
- Result aggregation: Collecting results from all consumers
Real-World Applications
This is THE pattern for scalable systems:
- Web servers: Multiple request handlers, multiple worker threads
- Message queues: Multiple publishers, multiple subscribers
- MapReduce: Multiple mappers, multiple reducers
- Microservices: Multiple API instances processing requests
- Data pipelines: Parallel ETL stages
- Video encoding: Multiple encoders processing jobs
The Architecture
                Shared Channel
                   [Buffer]

Producers                              Consumers

  P1 ─┐                                ┌─→ C1
  P2 ─┼─→ [═════════ Queue ═════════] ─┼─→ C2
  P3 ─┤                                ├─→ C3
  P4 ─┘                                └─→ C4
All producers send to same channel
All consumers receive from same channel
Go's scheduler load balances automatically!
Basic Implementation
package main
import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
)
// Job represents work to be done
type Job struct {
ID int
ProducerID int
Data int
CreatedAt time.Time
}
// Result represents processed work
type Result struct {
JobID int
ConsumerID int
Output int
Duration time.Duration
}
// Stats tracks system metrics
type Stats struct {
jobsCreated atomic.Int64
jobsProcessed atomic.Int64
totalDuration atomic.Int64 // nanoseconds
}
func (s *Stats) Report() {
created := s.jobsCreated.Load()
processed := s.jobsProcessed.Load()
avgDuration := time.Duration(0)
if processed > 0 {
avgDuration = time.Duration(s.totalDuration.Load() / processed)
}
fmt.Printf(`
📊 Final Statistics:
Jobs created: %d
Jobs processed: %d
Avg duration: %v
Jobs in flight: %d
`, created, processed, avgDuration, created-processed)
}
// Producer generates jobs
func Producer(id int, jobs chan<- Job, duration time.Duration, stats *Stats, wg *sync.WaitGroup) {
defer wg.Done()
jobNum := 0
deadline := time.Now().Add(duration)
for time.Now().Before(deadline) {
job := Job{
ID: id*1000 + jobNum,
ProducerID: id,
Data: rand.Intn(100),
CreatedAt: time.Now(),
}
jobs <- job
stats.jobsCreated.Add(1)
jobNum++
// Variable production rate
time.Sleep(time.Duration(50+rand.Intn(100)) * time.Millisecond)
}
fmt.Printf("[Producer %d] Finished, created %d jobs\n", id, jobNum)
}
// Consumer processes jobs and sends results
func Consumer(id int, jobs <-chan Job, results chan<- Result, stats *Stats, wg *sync.WaitGroup) {
defer wg.Done()
processed := 0
for job := range jobs {
start := time.Now()
// Simulate work
processingTime := time.Duration(80+rand.Intn(120)) * time.Millisecond
time.Sleep(processingTime)
// Compute result
result := Result{
JobID: job.ID,
ConsumerID: id,
Output: job.Data * 2,
Duration: time.Since(start),
}
results <- result
stats.jobsProcessed.Add(1)
stats.totalDuration.Add(int64(result.Duration))
processed++
queueTime := start.Sub(job.CreatedAt)
if queueTime > 500*time.Millisecond {
fmt.Printf("[Consumer %d] ⚠️ Job %d queued for %v\n",
id, job.ID, queueTime)
}
}
fmt.Printf("[Consumer %d] Finished, processed %d jobs\n", id, processed)
}
// ResultAggregator collects all results
func ResultAggregator(results <-chan Result, done chan<- struct{}) {
resultCount := 0
totalDuration := time.Duration(0)
for result := range results {
resultCount++
totalDuration += result.Duration
if resultCount%10 == 0 {
avgDuration := totalDuration / time.Duration(resultCount)
fmt.Printf("📦 Aggregator: %d results (avg: %v)\n",
resultCount, avgDuration)
}
}
fmt.Printf("📦 Aggregator: Finished with %d total results\n", resultCount)
close(done)
}
func main() {
fmt.Println("=== Multiple Producers, Multiple Consumers ===\n")
const (
numProducers = 4
numConsumers = 6
bufferSize = 20
duration = 3 * time.Second
)
jobs := make(chan Job, bufferSize)
results := make(chan Result, bufferSize)
stats := &Stats{}
fmt.Printf("Configuration:\n")
fmt.Printf(" Producers: %d\n", numProducers)
fmt.Printf(" Consumers: %d\n", numConsumers)
fmt.Printf(" Buffer size: %d\n", bufferSize)
fmt.Printf(" Duration: %v\n\n", duration)
var producerWg, consumerWg sync.WaitGroup
aggregatorDone := make(chan struct{})
// Start result aggregator
go ResultAggregator(results, aggregatorDone)
// Start consumers
fmt.Println("Starting consumers...")
for i := 0; i < numConsumers; i++ {
consumerWg.Add(1)
go Consumer(i, jobs, results, stats, &consumerWg)
}
// Start producers
fmt.Println("Starting producers...\n")
for i := 0; i < numProducers; i++ {
producerWg.Add(1)
go Producer(i, jobs, duration, stats, &producerWg)
}
// Wait for producers to finish
producerWg.Wait()
fmt.Println("\n✓ All producers finished")
close(jobs) // Signal consumers no more jobs
// Wait for consumers to finish
consumerWg.Wait()
fmt.Println("✓ All consumers finished")
close(results) // Signal aggregator no more results
// Wait for aggregator
<-aggregatorDone
fmt.Println("✓ Result aggregator finished")
stats.Report()
fmt.Println("\n✓ Pipeline complete!")
}
Load Balancing with Channels
One of Go’s superpowers: channels provide automatic load balancing!
// All consumers read from same channel
for job := range jobs {
// Go runtime automatically distributes jobs
// to available consumers
}
This gives natural load balancing - whichever consumer is idle picks up the next available job, so faster workers automatically handle more jobs. No manual coordination needed!
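Here’s a tiny self-contained demo of that behavior (the worker count and sleep duration are arbitrary): three workers drain one channel, and the per-worker job counts come out roughly even.
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	jobs := make(chan int)
	counts := make([]int, 3) // one slot per worker; distinct indexes, no race
	var wg sync.WaitGroup

	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for range jobs {
				counts[id]++
				time.Sleep(10 * time.Millisecond) // simulate work
			}
		}(w)
	}

	for i := 0; i < 30; i++ {
		jobs <- i
	}
	close(jobs)
	wg.Wait()
	fmt.Println("jobs per worker:", counts) // roughly even, e.g. [10 11 9]
}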
Advanced: Work Distribution Patterns
Let’s explore different distribution strategies:
package main
import (
	"fmt"
	"hash/fnv"
	"time"
)
// Partition-based distribution
type PartitionedQueue struct {
queues []chan Job
numPartitions int
}
func NewPartitionedQueue(numPartitions, bufferSize int) *PartitionedQueue {
pq := &PartitionedQueue{
queues: make([]chan Job, numPartitions),
numPartitions: numPartitions,
}
for i := 0; i < numPartitions; i++ {
pq.queues[i] = make(chan Job, bufferSize)
}
return pq
}
// Send to partition based on job key
func (pq *PartitionedQueue) Send(job Job, key string) {
partition := pq.partition(key)
pq.queues[partition] <- job
}
func (pq *PartitionedQueue) partition(key string) int {
h := fnv.New32a()
h.Write([]byte(key))
return int(h.Sum32()) % pq.numPartitions
}
// Each consumer gets its own partition
func (pq *PartitionedQueue) StartConsumers(numConsumers int, process func(Job)) {
for i := 0; i < numConsumers; i++ {
go func(partition int) {
for job := range pq.queues[partition] {
process(job)
}
}(i % pq.numPartitions)
}
}
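A minimal usage sketch (it assumes the Job type from the basic implementation; the user-ID key is illustrative). Because the hash is stable, every job with the same key lands in the same partition, so per-key ordering is preserved - the trade-off versus the shared channel is that you give up automatic balancing:
func runPartitionedDemo() {
	pq := NewPartitionedQueue(4, 10)
	pq.StartConsumers(4, func(j Job) {
		fmt.Printf("processed job %d\n", j.ID)
	})

	for i := 0; i < 8; i++ {
		key := fmt.Sprintf("user-%d", i%3) // same user → same partition
		pq.Send(Job{ID: i}, key)
	}

	// Crude wait so the demo's consumers can drain; a production version
	// would add a Close method and a WaitGroup instead.
	time.Sleep(500 * time.Millisecond)
}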
Priority-Based Processing
Sometimes jobs have different priorities:
package main
import (
	"container/heap"
	"fmt"
	"sync"
	"time"
)
// PriorityJob wraps a job with priority
type PriorityJob struct {
Job Job
Priority int
Index int
}
// PriorityQueue implements heap.Interface
type PriorityQueue []*PriorityJob
func (pq PriorityQueue) Len() int { return len(pq) }
func (pq PriorityQueue) Less(i, j int) bool {
return pq[i].Priority > pq[j].Priority // Higher priority first
}
func (pq PriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].Index = i
pq[j].Index = j
}
func (pq *PriorityQueue) Push(x interface{}) {
item := x.(*PriorityJob)
item.Index = len(*pq)
*pq = append(*pq, item)
}
func (pq *PriorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil
*pq = old[0 : n-1]
return item
}
// PriorityJobQueue coordinates priority-based processing
type PriorityJobQueue struct {
pq PriorityQueue
mu sync.Mutex
cond *sync.Cond
closed bool
}
func NewPriorityJobQueue() *PriorityJobQueue {
pjq := &PriorityJobQueue{
pq: make(PriorityQueue, 0),
}
pjq.cond = sync.NewCond(&pjq.mu)
heap.Init(&pjq.pq)
return pjq
}
func (pjq *PriorityJobQueue) Enqueue(job Job, priority int) {
pjq.mu.Lock()
defer pjq.mu.Unlock()
heap.Push(&pjq.pq, &PriorityJob{
Job: job,
Priority: priority,
})
pjq.cond.Signal()
}
func (pjq *PriorityJobQueue) Dequeue() (Job, bool) {
pjq.mu.Lock()
defer pjq.mu.Unlock()
for pjq.pq.Len() == 0 && !pjq.closed {
pjq.cond.Wait()
}
if pjq.pq.Len() == 0 {
return Job{}, false
}
item := heap.Pop(&pjq.pq).(*PriorityJob)
return item.Job, true
}
func (pjq *PriorityJobQueue) Close() {
pjq.mu.Lock()
pjq.closed = true
pjq.mu.Unlock()
pjq.cond.Broadcast()
}
// Consumer that processes priority jobs
func PriorityConsumer(id int, queue *PriorityJobQueue, wg *sync.WaitGroup) {
defer wg.Done()
for {
job, ok := queue.Dequeue()
if !ok {
break
}
fmt.Printf("[Consumer %d] Processing high-priority job %d\n", id, job.ID)
time.Sleep(100 * time.Millisecond)
}
}
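A usage sketch tying the pieces together (job count and priorities are arbitrary). Note that priority order only matters among jobs waiting in the queue at the same moment; a consumer already blocked in Dequeue takes whatever arrives first.
func runPriorityDemo() {
	queue := NewPriorityJobQueue()

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go PriorityConsumer(i, queue, &wg)
	}

	for i := 0; i < 6; i++ {
		queue.Enqueue(Job{ID: i}, i%3) // priorities 0, 1, 2
	}

	queue.Close() // consumers drain the remaining jobs, then exit
	wg.Wait()
}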
Real-World Example: Image Processing Pipeline
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
type Image struct {
ID int
Filename string
Size int // bytes
}
type ProcessedImage struct {
ID int
Filename string
Thumbnail []byte
Compressed []byte
ProcessTime time.Duration
}
// ImageLoader (Producer) loads images from disk
func ImageLoader(id int, images chan<- Image, count int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < count; i++ {
img := Image{
ID: id*1000 + i,
Filename: fmt.Sprintf("image_%d_%d.jpg", id, i),
Size: 1024 * 1024 * (2 + i%3), // 2-4 MB
}
images <- img
time.Sleep(50 * time.Millisecond)
}
}
// ImageProcessor (Consumer) processes images
func ImageProcessor(id int, images <-chan Image, results chan<- ProcessedImage,
stats *ProcessingStats, wg *sync.WaitGroup) {
defer wg.Done()
for img := range images {
start := time.Now()
// Simulate processing
processTime := time.Duration(img.Size/10000) * time.Millisecond
// Generate thumbnail (simulated)
time.Sleep(processTime / 2)
// Compress (simulated)
time.Sleep(processTime / 2)
result := ProcessedImage{
ID: img.ID,
Filename: img.Filename,
Thumbnail: make([]byte, 100*1024), // 100 KB thumbnail
Compressed: make([]byte, img.Size/2), // 50% compression
ProcessTime: time.Since(start),
}
results <- result
stats.recordProcessing(result.ProcessTime)
fmt.Printf("[Processor %d] Processed %s in %v\n",
id, img.Filename, result.ProcessTime)
}
}
type ProcessingStats struct {
totalProcessed atomic.Int64
totalTime atomic.Int64 // nanoseconds
}
func (s *ProcessingStats) recordProcessing(duration time.Duration) {
s.totalProcessed.Add(1)
s.totalTime.Add(int64(duration))
}
func (s *ProcessingStats) Report() {
total := s.totalProcessed.Load()
avg := time.Duration(0)
if total > 0 {
avg = time.Duration(s.totalTime.Load() / total)
}
fmt.Printf("\n📊 Processed %d images, avg time: %v\n", total, avg)
}
// ImageSaver (Aggregator) saves processed images
func ImageSaver(results <-chan ProcessedImage, stats *ProcessingStats, done chan<- struct{}) {
saved := 0
	for range results {
		// Simulate saving to disk
		time.Sleep(20 * time.Millisecond)
		saved++
		if saved%5 == 0 {
			fmt.Printf("💾 Saved %d images so far\n", saved)
		}
	}
fmt.Printf("💾 Saved %d total images\n", saved)
close(done)
}
func RunImagePipeline() {
fmt.Println("=== Image Processing Pipeline ===\n")
const (
numLoaders = 2
numProcessors = 4
imagesPerLoader = 10
)
images := make(chan Image, 50)
results := make(chan ProcessedImage, 50)
stats := &ProcessingStats{}
done := make(chan struct{})
var loaderWg, processorWg sync.WaitGroup
// Start saver
go ImageSaver(results, stats, done)
// Start processors
for i := 0; i < numProcessors; i++ {
processorWg.Add(1)
go ImageProcessor(i, images, results, stats, &processorWg)
}
// Start loaders
for i := 0; i < numLoaders; i++ {
loaderWg.Add(1)
go ImageLoader(i, images, imagesPerLoader, &loaderWg)
}
loaderWg.Wait()
close(images)
processorWg.Wait()
close(results)
<-done
stats.Report()
fmt.Println("\n✓ Pipeline complete!")
}
func main() {
RunImagePipeline()
}
Key Patterns for Multiple Workers
1. Fan-Out Pattern
// Multiple goroutines read from same channel
for i := 0; i < numWorkers; i++ {
go worker(jobs)
}
2. Fan-In Pattern
// Multiple goroutines write to same channel
results := make(chan Result, bufferSize)
for i := 0; i < numProducers; i++ {
go producer(results)
}
3. Result Aggregation
go func() {
for result := range results {
aggregate(result)
}
close(done)
}()
Performance Considerations
Optimal Worker Count
- CPU-bound: Workers = Number of CPUs
- I/O-bound: Workers = 10-100× Number of CPUs
- Mixed: Benchmark to find sweet spot
numWorkers := runtime.NumCPU() // CPU-bound
numWorkers := runtime.NumCPU() * 10 // I/O-bound
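For the benchmark case, here’s a hedged sketch using the standard testing package (put it in a _test.go file; the millisecond sleep is a stand-in for your real work):
package main

import (
	"fmt"
	"sync"
	"testing"
	"time"
)

// Stand-in for real work: a short sleep approximates a mixed job.
func processJob(_ int) { time.Sleep(time.Millisecond) }

func benchmarkWorkers(b *testing.B, numWorkers int) {
	jobs := make(chan int, numWorkers*3)
	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				processJob(j)
			}
		}()
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		jobs <- i
	}
	close(jobs)
	wg.Wait()
}

func BenchmarkWorkerCounts(b *testing.B) {
	for _, n := range []int{1, 2, 4, 8, 16, 32} {
		b.Run(fmt.Sprintf("workers=%d", n), func(b *testing.B) {
			benchmarkWorkers(b, n)
		})
	}
}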
Buffer Sizing
- Too small: Contention, blocking
- Too large: Memory waste
- Sweet spot: 2-5× number of workers
bufferSize := numWorkers * 3
Common Pitfalls
1. Forgetting to Close Channels
// ✗ Wrong: Consumers wait forever
go producer(jobs)
// jobs never closed!
// ✓ Correct: Close after all producers finish
producerWg.Wait()
close(jobs)
2. Closing Channel Multiple Times
// ✗ Wrong: Panic!
close(jobs)
close(jobs) // Panic: close of closed channel
// ✓ Correct: Use sync.Once
var closeOnce sync.Once
closeOnce.Do(func() { close(jobs) })
3. Not Draining Results Channel
// ✗ Wrong: Results channel fills up, consumers block
results := make(chan Result, 10)
// ✓ Correct: Always drain results
go func() {
for result := range results {
handleResult(result)
}
}()
Monitoring and Metrics
Track these key metrics:
- Throughput: Jobs processed per second
- Latency: Time from creation to completion
- Queue depth: Current buffer fill
- Worker utilization: % time workers are busy
- Error rate: Failed jobs per second
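A minimal sampling sketch, assuming the jobs channel and Stats type from the basic implementation above (the interval and log format are illustrative). len and cap on a buffered channel give an instantaneous, approximate fill level:
// Monitor samples queue depth, throughput, and jobs in flight once per
// second until the stop channel is closed.
func Monitor(jobs chan Job, stats *Stats, stop <-chan struct{}) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	var lastProcessed int64
	for {
		select {
		case <-ticker.C:
			processed := stats.jobsProcessed.Load()
			fmt.Printf("depth %d/%d | %d jobs/s | %d in flight\n",
				len(jobs), cap(jobs),
				processed-lastProcessed,
				stats.jobsCreated.Load()-processed)
			lastProcessed = processed
		case <-stop:
			return
		}
	}
}
Start it with go Monitor(jobs, stats, stop) alongside the producers, and close(stop) once the pipeline drains.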
When to Use This Pattern
✓ Use when:
- You need high throughput
- Work can be parallelized
- Workers are independent
- Order doesn’t matter (or you handle it separately)
✗ Avoid when:
- Work must be ordered
- State is shared between jobs
- Coordination overhead is high
- Single worker is fast enough
Next Up: Readers-Writers Problems
We’ve mastered producer-consumer! Next, we’ll tackle the Readers-Writers problem, where multiple readers can share access but writers need exclusive access.
Try It Yourself
- Benchmark worker count - Find optimal number
- Add work stealing - Busy workers help idle ones
- Implement batching - Process jobs in batches
- Add circuit breakers - Stop on too many errors
- Track per-worker metrics - Identify slow workers
This is part 6 of “Golang Experiments: Classic Concurrency Problems”