Scaling to Multiple Workers

We’ve explored unbounded and bounded buffers with single or few workers. Now let’s scale to many producers and many consumers - the pattern behind most production systems!
This pattern combines:
- Fan-out: Multiple producers generating work
- Fan-in: Multiple consumers processing work
- Load balancing: Work distributed across consumers
- Result aggregation: Collecting results from all consumers

Real-World Applications

This is THE pattern for scalable systems:
// Web servers: Multiple request handlers, multiple worker threads
// Message queues: Multiple publishers, multiple subscribers
// MapReduce: Multiple mappers, multiple reducers
// Microservices: Multiple API instances processing requests
// Data pipelines: Parallel ETL stages
// Video encoding: Multiple encoders processing jobs
//
// The Architecture
//
//	               Shared Channel
//	Producers  →                       Consumers
//	                  [Buffer]
//
//	  P1 ─┐                          ┌─→ C1
//	  P2 ─┤→ [═════Queue═════] →     ├─→ C2
//	  P3 ─┤                          ├─→ C3
//	  P4 ─┘                          └─→ C4
//
//	All producers send to same channel
//	All consumers receive from same channel
//	Go's scheduler load balances automatically!
//
// Basic Implementation

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

// Job represents work to be done.
type Job struct {
	ID         int       // id*1000 + per-producer sequence number (see Producer)
	ProducerID int       // id of the producer that created the job
	Data       int       // payload; Consumer doubles it to form the result
	CreatedAt  time.Time // creation instant, used to measure time spent queued
}

// Result represents processed work.
type Result struct {
	JobID      int           // ID of the job this result belongs to
	ConsumerID int           // id of the consumer that processed the job
	Output     int           // job.Data * 2
	Duration   time.Duration // time the consumer spent on the job
}

// Stats tracks system metrics. All fields are atomics so producers and
// consumers can update them concurrently without a mutex. Share a Stats by
// pointer; it must not be copied after first use.
type Stats struct {
	jobsCreated   atomic.Int64
	jobsProcessed atomic.Int64
	totalDuration atomic.Int64 // nanoseconds
}

// Report prints the final counters and the average processing duration.
// The average is reported as zero when no job has been processed, which
// avoids a division by zero.
func (s *Stats) Report() {
	created := s.jobsCreated.Load()
	processed := s.jobsProcessed.Load()

	avgDuration := time.Duration(0)
	if processed > 0 {
		avgDuration = time.Duration(s.totalDuration.Load() / processed)
	}

	fmt.Printf(`
📊 Final Statistics:
  Jobs created: %d
  Jobs processed: %d
  Avg duration: %v
  Jobs in flight: %d
`, created, processed, avgDuration, created-processed)
}

// Producer generates jobs and sends them on jobs until duration has elapsed,
// pausing 50–149ms between jobs. It increments stats.jobsCreated after each
// successful send and calls wg.Done when it returns.
//
// The deadline is only checked between sends, so a send that blocks on a full
// channel can finish after the deadline has passed.
func Producer(id int, jobs chan<- Job, duration time.Duration, stats *Stats, wg *sync.WaitGroup) {
	defer wg.Done()

	jobNum := 0
	deadline := time.Now().Add(duration)

	for time.Now().Before(deadline) {
		job := Job{
			ID:         id*1000 + jobNum,
			ProducerID: id,
			Data:       rand.Intn(100),
			CreatedAt:  time.Now(),
		}

		jobs <- job
		stats.jobsCreated.Add(1)
		jobNum++

		// Variable production rate
		time.Sleep(time.Duration(50+rand.Intn(100)) * time.Millisecond)
	}

	fmt.Printf("[Producer %d] Finished, created %d jobs\n", id, jobNum)
}

// Consumer processes jobs and sends results. It receives from jobs until the
// channel is closed and drained, simulates 80–199ms of work per job, sends a
// Result on results, and updates stats. It calls wg.Done when it returns.
func Consumer(id int, jobs <-chan Job, results chan<- Result, stats *Stats, wg *sync.WaitGroup) {
	defer wg.Done()

	processed := 0

	for job := range jobs {
		start := time.Now()

		// Simulate work
		processingTime := time.Duration(80+rand.Intn(120)) * time.Millisecond
		time.Sleep(processingTime)

		// Compute result
		result := Result{
			JobID:      job.ID,
			ConsumerID: id,
			Output:     job.Data * 2,
			Duration:   time.Since(start),
		}

		results <- result
		stats.jobsProcessed.Add(1)
		stats.totalDuration.Add(int64(result.Duration))
		processed++

		// Warn when a job waited a long time between creation and pickup;
		// this indicates consumers are not keeping up with producers.
		queueTime := start.Sub(job.CreatedAt)
		if queueTime > 500*time.Millisecond {
			fmt.Printf("[Consumer %d] ⚠️ Job %d queued for %v\n", id, job.ID, queueTime)
		}
	}

	fmt.Printf("[Consumer %d] Finished, processed %d jobs\n", id, processed)
}

// ResultAggregator collects all results. It reads from results until the
// channel is closed, prints a running average every 10 results, and closes
// done once it has finished.
func ResultAggregator(results <-chan Result, done chan<- struct{}) {
	resultCount := 0
	totalDuration := time.Duration(0)

	for result := range results {
		resultCount++
		totalDuration += result.Duration

		if resultCount%10 == 0 {
			avgDuration := totalDuration / time.Duration(resultCount)
			fmt.Printf("📦 Aggregator: %d results (avg: %v)\n", resultCount, avgDuration)
		}
	}

	fmt.Printf("📦 Aggregator: Finished with %d total results\n", resultCount)
	close(done)
}

// main wires producers, consumers and the aggregator together and shuts the
// pipeline down stage by stage: a channel is closed only after every
// goroutine that sends on it has finished.
func main() {
	// Println already appends a newline; a separate empty Println produces the
	// blank line without tripping go vet's "redundant newline" check.
	fmt.Println("=== Multiple Producers, Multiple Consumers ===")
	fmt.Println()

	const (
		numProducers = 4
		numConsumers = 6
		bufferSize   = 20
		duration     = 3 * time.Second
	)

	jobs := make(chan Job, bufferSize)
	results := make(chan Result, bufferSize)
	stats := &Stats{}

	fmt.Printf("Configuration:\n")
	fmt.Printf(" Producers: %d\n", numProducers)
	fmt.Printf(" Consumers: %d\n", numConsumers)
	fmt.Printf(" Buffer size: %d\n", bufferSize)
	fmt.Printf(" Duration: %v\n\n", duration)

	var producerWg, consumerWg sync.WaitGroup
	aggregatorDone := make(chan struct{})

	// Start result aggregator
	go ResultAggregator(results, aggregatorDone)

	// Start consumers
	fmt.Println("Starting consumers...")
	for i := 0; i < numConsumers; i++ {
		consumerWg.Add(1)
		go Consumer(i, jobs, results, stats, &consumerWg)
	}

	// Start producers
	fmt.Println("Starting producers...")
	fmt.Println()
	for i := 0; i < numProducers; i++ {
		producerWg.Add(1)
		go Producer(i, jobs, duration, stats, &producerWg)
	}

	// Wait for producers to finish
	producerWg.Wait()
	fmt.Println("\n✓ All producers finished")
	close(jobs) // Signal consumers no more jobs

	// Wait for consumers to finish
	consumerWg.Wait()
	fmt.Println("✓ All consumers finished")
	close(results) // Signal aggregator no more results

	// Wait for aggregator
	<-aggregatorDone
	fmt.Println("✓ Result aggregator finished")

	stats.Report()
	fmt.Println("\n✓ Pipeline complete!")
}

// Load Balancing with Channels
//
// One of Go’s superpowers: channels provide automatic load balancing!
...