Complete Guide to Go Concurrency Patterns: Visual Patterns & Code Examples

    Concurrency is one of Go’s most powerful features, built into the language from the ground up. This comprehensive guide covers all essential concurrency patterns with visual diagrams and practical code examples.

    Table of Contents:
    - Goroutines - Basic Concurrency
    - Channels - Communication
    - Select Statement - Multiplexing
    - Worker Pool Pattern
    - Fan-In Pattern
    - Fan-Out Pattern
    - Pipeline Pattern
    - Semaphore Pattern
    - Barrier Pattern
    - Future/Promise Pattern
    - Rate Limiting Pattern
    - Circuit Breaker Pattern
    - Context Pattern
    - Mutex Pattern
    - WaitGroup Pattern
    - ErrGroup Pattern

    Goroutines - Basic Concurrency: Goroutines are lightweight threads managed by the Go runtime. They enable concurrent execution with minimal overhead, as in the sketch below. ...
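    A minimal illustration of that idea (my own sketch, not code from the guide): two goroutines run concurrently and report completion over a channel.

```go
package main

import (
	"fmt"
	"time"
)

// say prints a message a few times, simulating concurrent work.
func say(msg string, done chan<- string) {
	for i := 0; i < 3; i++ {
		fmt.Println(msg)
		time.Sleep(10 * time.Millisecond)
	}
	done <- msg // signal completion over the channel
}

func main() {
	done := make(chan string)
	go say("hello from goroutine A", done) // runs concurrently with main
	go say("hello from goroutine B", done)

	// Receive twice so main doesn't exit before both goroutines finish.
	for i := 0; i < 2; i++ {
		fmt.Println("finished:", <-done)
	}
}
```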

    November 18, 2025 · 17 min · Rafiul Alam

    Producer-Consumer: The Unbounded Buffer

    The Producer-Consumer Problem: the Producer-Consumer pattern is one of the most fundamental concurrency patterns. It appears everywhere in modern software:
    - Message queues (RabbitMQ, Kafka, SQS)
    - Task processing (background jobs, worker pools)
    - Data pipelines (ETL, streaming analytics)
    - Event systems (event buses, pub/sub)
    - Buffering (I/O buffers, network buffers)

    The Setup: producers generate data, consumers process it. They run concurrently and need to coordinate through a shared buffer.

    The Unbounded Buffer Variant: in this first variant, we use an unbounded buffer - the queue can grow infinitely (until we run out of memory). This is the simplest version and showcases Go’s beautiful channel abstraction; see the sketch below. ...
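    The post's own implementation is not shown in this excerpt. As a hedged sketch (the function name and slice-backed queue are my assumptions), one way to get a truly unbounded buffer in Go is a goroutine that shuttles items between an input and an output channel, spilling backlog into a slice so producers never wait on consumers:

```go
package main

import "fmt"

// unboundedQueue forwards items from in to out, buffering any backlog
// in a slice - the buffer grows without limit, i.e. an unbounded buffer.
func unboundedQueue(in <-chan int, out chan<- int) {
	var backlog []int
	for in != nil || len(backlog) > 0 {
		// Enable the send case only when there is something to send;
		// a send on a nil channel blocks forever, so select skips it.
		var send chan<- int
		var next int
		if len(backlog) > 0 {
			send = out
			next = backlog[0]
		}
		select {
		case v, ok := <-in:
			if !ok {
				in = nil // producer closed the channel
				continue
			}
			backlog = append(backlog, v)
		case send <- next:
			backlog = backlog[1:]
		}
	}
	close(out)
}

func main() {
	in := make(chan int)
	out := make(chan int)
	go unboundedQueue(in, out)

	// Producer: sends a burst and is never blocked by the slow consumer.
	go func() {
		for i := 1; i <= 5; i++ {
			in <- i
		}
		close(in)
	}()

	// Consumer: drains the queue at its own pace.
	for v := range out {
		fmt.Println("consumed:", v)
	}
}
```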

    November 15, 2025 · 7 min · Rafiul Alam

    Producer-Consumer: Multiple Producers, Multiple Consumers

    Scaling to Multiple Workers: We’ve explored unbounded and bounded buffers with single or few workers. Now let’s scale to many producers and many consumers - the pattern behind most production systems! This pattern combines:
    - Fan-out: multiple producers generating work
    - Fan-in: multiple consumers processing work
    - Load balancing: work distributed across consumers
    - Result aggregation: collecting results from all consumers

    Real-World Applications - this is THE pattern for scalable systems:
    - Web servers: multiple request handlers, multiple worker threads
    - Message queues: multiple publishers, multiple subscribers
    - MapReduce: multiple mappers, multiple reducers
    - Microservices: multiple API instances processing requests
    - Data pipelines: parallel ETL stages
    - Video encoding: multiple encoders processing jobs

    The Architecture:

```
              Shared Channel
Producers  →     [Buffer]     →  Consumers

P1 ─┐                         ┌─→ C1
P2 ─┤→  [═════Queue═════]  →  ├─→ C2
P3 ─┤                         ├─→ C3
P4 ─┘                         └─→ C4
```

    All producers send to the same channel, all consumers receive from the same channel, and Go's scheduler load-balances automatically!

    Basic Implementation:

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

// Job represents work to be done
type Job struct {
	ID         int
	ProducerID int
	Data       int
	CreatedAt  time.Time
}

// Result represents processed work
type Result struct {
	JobID      int
	ConsumerID int
	Output     int
	Duration   time.Duration
}

// Stats tracks system metrics
type Stats struct {
	jobsCreated   atomic.Int64
	jobsProcessed atomic.Int64
	totalDuration atomic.Int64 // nanoseconds
}

func (s *Stats) Report() {
	created := s.jobsCreated.Load()
	processed := s.jobsProcessed.Load()
	avgDuration := time.Duration(0)
	if processed > 0 {
		avgDuration = time.Duration(s.totalDuration.Load() / processed)
	}
	fmt.Printf(`
📊 Final Statistics:
   Jobs created:   %d
   Jobs processed: %d
   Avg duration:   %v
   Jobs in flight: %d
`, created, processed, avgDuration, created-processed)
}

// Producer generates jobs
func Producer(id int, jobs chan<- Job, duration time.Duration, stats *Stats, wg *sync.WaitGroup) {
	defer wg.Done()
	jobNum := 0
	deadline := time.Now().Add(duration)

	for time.Now().Before(deadline) {
		job := Job{
			ID:         id*1000 + jobNum,
			ProducerID: id,
			Data:       rand.Intn(100),
			CreatedAt:  time.Now(),
		}
		jobs <- job
		stats.jobsCreated.Add(1)
		jobNum++

		// Variable production rate
		time.Sleep(time.Duration(50+rand.Intn(100)) * time.Millisecond)
	}
	fmt.Printf("[Producer %d] Finished, created %d jobs\n", id, jobNum)
}

// Consumer processes jobs and sends results
func Consumer(id int, jobs <-chan Job, results chan<- Result, stats *Stats, wg *sync.WaitGroup) {
	defer wg.Done()
	processed := 0

	for job := range jobs {
		start := time.Now()

		// Simulate work
		processingTime := time.Duration(80+rand.Intn(120)) * time.Millisecond
		time.Sleep(processingTime)

		// Compute result
		result := Result{
			JobID:      job.ID,
			ConsumerID: id,
			Output:     job.Data * 2,
			Duration:   time.Since(start),
		}
		results <- result

		stats.jobsProcessed.Add(1)
		stats.totalDuration.Add(int64(result.Duration))
		processed++

		queueTime := start.Sub(job.CreatedAt)
		if queueTime > 500*time.Millisecond {
			fmt.Printf("[Consumer %d] ⚠️ Job %d queued for %v\n", id, job.ID, queueTime)
		}
	}
	fmt.Printf("[Consumer %d] Finished, processed %d jobs\n", id, processed)
}

// ResultAggregator collects all results
func ResultAggregator(results <-chan Result, done chan<- struct{}) {
	resultCount := 0
	totalDuration := time.Duration(0)

	for result := range results {
		resultCount++
		totalDuration += result.Duration
		if resultCount%10 == 0 {
			avgDuration := totalDuration / time.Duration(resultCount)
			fmt.Printf("📦 Aggregator: %d results (avg: %v)\n", resultCount, avgDuration)
		}
	}
	fmt.Printf("📦 Aggregator: Finished with %d total results\n", resultCount)
	close(done)
}

func main() {
	fmt.Println("=== Multiple Producers, Multiple Consumers ===")

	const (
		numProducers = 4
		numConsumers = 6
		bufferSize   = 20
		duration     = 3 * time.Second
	)

	jobs := make(chan Job, bufferSize)
	results := make(chan Result, bufferSize)
	stats := &Stats{}

	fmt.Printf("Configuration:\n")
	fmt.Printf("  Producers:   %d\n", numProducers)
	fmt.Printf("  Consumers:   %d\n", numConsumers)
	fmt.Printf("  Buffer size: %d\n", bufferSize)
	fmt.Printf("  Duration:    %v\n\n", duration)

	var producerWg, consumerWg sync.WaitGroup
	aggregatorDone := make(chan struct{})

	// Start result aggregator
	go ResultAggregator(results, aggregatorDone)

	// Start consumers
	fmt.Println("Starting consumers...")
	for i := 0; i < numConsumers; i++ {
		consumerWg.Add(1)
		go Consumer(i, jobs, results, stats, &consumerWg)
	}

	// Start producers
	fmt.Println("Starting producers...")
	for i := 0; i < numProducers; i++ {
		producerWg.Add(1)
		go Producer(i, jobs, duration, stats, &producerWg)
	}

	// Wait for producers to finish
	producerWg.Wait()
	fmt.Println("\n✓ All producers finished")
	close(jobs) // Signal consumers no more jobs

	// Wait for consumers to finish
	consumerWg.Wait()
	fmt.Println("✓ All consumers finished")
	close(results) // Signal aggregator no more results

	// Wait for aggregator
	<-aggregatorDone
	fmt.Println("✓ Result aggregator finished")

	stats.Report()
	fmt.Println("\n✓ Pipeline complete!")
}
```

    Load Balancing with Channels: one of Go’s superpowers - channels provide automatic load balancing! ...

    October 25, 2025 · 9 min · Rafiul Alam

    Golang Experiments: Classic Concurrency Problems

    Welcome to Golang Experiments! This series explores classic concurrency problems through the lens of Go’s powerful concurrency primitives. Each problem is a timeless synchronization challenge that teaches fundamental concepts you’ll use in production systems every day.

    What you’ll learn:
    - Go’s concurrency features (goroutines, channels, sync primitives)
    - How to recognize and solve common synchronization problems
    - Patterns that appear in real-world distributed systems
    - When to use different synchronization strategies

    Why Study Classic Problems? These aren’t just academic exercises! Each problem represents a pattern you’ll encounter in production: ...

    October 20, 2025 · 11 min · Rafiul Alam

    Building a Real-Time File Monitor with Goroutines and Channels

    File monitoring is a common requirement in many applications: watching for configuration changes, processing uploads, syncing directories, or building development tools. In this post, we’ll build a production-ready file monitoring service that tracks word counts across all files in a directory using Go’s powerful concurrency primitives.

    The Challenge - build a service that:
    - Monitors a directory and all its subdirectories
    - Counts words in all files in real-time
    - Updates counts when files are modified, created, or deleted
    - Processes multiple files concurrently
    - Provides current statistics on demand
    - Handles errors gracefully

    Why This Matters: real-time file monitoring powers many production systems (see the sketch below for a common starting point): ...
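    The excerpt doesn't show the post's implementation. A common starting point for the watching layer - an assumption on my part, since the article may instead poll with its own goroutines - is the third-party fsnotify package, with one goroutine multiplexing events and errors:

```go
package main

import (
	"fmt"
	"log"

	"github.com/fsnotify/fsnotify" // third-party watcher; assumed for this sketch
)

func main() {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		log.Fatal(err)
	}
	defer watcher.Close()

	// Watch the current directory (a full service would walk subdirectories too).
	if err := watcher.Add("."); err != nil {
		log.Fatal(err)
	}

	// One goroutine multiplexes events and errors; worker goroutines
	// could fan out from here to recount words concurrently.
	for {
		select {
		case event, ok := <-watcher.Events:
			if !ok {
				return
			}
			fmt.Println("change detected:", event.Name, event.Op)
		case err, ok := <-watcher.Errors:
			if !ok {
				return
			}
			log.Println("watch error:", err)
		}
	}
}
```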

    October 3, 2025 · 10 min · Rafiul Alam

    Producer-Consumer: The Bounded Buffer

    From Unbounded to Bounded: in the previous article, we explored the unbounded buffer pattern, where the queue could grow infinitely. This works until you run out of memory! The bounded buffer adds a crucial constraint: a maximum queue size. This introduces backpressure - when the buffer is full, producers must wait for consumers to catch up.

    Why Bounded Buffers Matter - bounded buffers appear everywhere in production systems:
    - TCP sliding windows (flow control)
    - HTTP/2 stream flow control (prevents overwhelming receivers)
    - Message queue limits (RabbitMQ, Kafka partition limits)
    - Thread pool queues (bounded task queues)
    - Rate limiters (token buckets with finite capacity)
    - Circuit breakers (limit concurrent requests)

    The key benefit: bounded buffers provide natural backpressure and prevent resource exhaustion, as the sketch below shows. ...
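    In Go, a buffered channel is the canonical bounded buffer. This minimal sketch (my illustration, not the article's code) shows the backpressure described above: the producer blocks as soon as the buffer fills, and is released only as the consumer drains items.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// A buffered channel IS a bounded buffer: capacity 3 here.
	buffer := make(chan int, 3)

	// Producer: the send blocks once the buffer is full (backpressure).
	go func() {
		for i := 1; i <= 6; i++ {
			buffer <- i
			fmt.Println("produced:", i)
		}
		close(buffer)
	}()

	// Slow consumer: drains one item every 100ms, releasing the producer.
	for v := range buffer {
		time.Sleep(100 * time.Millisecond)
		fmt.Println("consumed:", v)
	}
}
```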

    September 19, 2025 · 8 min · Rafiul Alam

    Go Concurrency Pattern: The Sieve of Eratosthenes Pipeline

    ← Monte Carlo Pi | Series Overview | Mandelbrot Set →

    The Problem: Finding Primes with Filters. The Sieve of Eratosthenes is an ancient algorithm for finding prime numbers. The concurrent version creates a pipeline of filters: each prime spawns a goroutine that filters out its multiples.

    The Algorithm:
    1. Generate the sequence: 2, 3, 4, 5, 6, 7, 8, 9, 10, …
    2. Take the first number (2); it’s prime. Filter all multiples of 2.
    3. Take the next number (3); it’s prime. Filter all multiples of 3.
    4. Take the next number (5); it’s prime. Filter all multiples of 5.
    5. Repeat until the desired count is reached.

    The Beauty: each prime creates its own filter, and numbers flow through a pipeline of increasingly selective filters. What passes through all filters must be prime (a compact sketch follows below). ...
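    This is essentially the classic pipeline example shipped with early Go distributions; the article's version may differ in details such as cancellation. A compact sketch:

```go
package main

import "fmt"

// generate feeds the natural numbers 2, 3, 4, ... into ch.
func generate(ch chan<- int) {
	for i := 2; ; i++ {
		ch <- i
	}
}

// filter copies values from in to out, dropping multiples of prime.
func filter(in <-chan int, out chan<- int, prime int) {
	for {
		n := <-in
		if n%prime != 0 {
			out <- n
		}
	}
}

func main() {
	ch := make(chan int)
	go generate(ch)
	// Each prime found spawns a new filter stage in the pipeline.
	for i := 0; i < 10; i++ {
		prime := <-ch
		fmt.Println(prime)
		next := make(chan int)
		go filter(ch, next, prime)
		ch = next
	}
	// Note: the pipeline goroutines are torn down when main exits;
	// a production version would add context-based cancellation.
}
```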

    February 2, 2025 · 11 min · Rafiul Alam

    Mastering Go Concurrency: The Coffee Shop Guide to Goroutines

    Go Concurrency Patterns Series: Series Overview | Goroutine Basics | Channel Fundamentals

    Introduction: Welcome to Go Coffee Shop. Imagine running a busy coffee shop. You have customers placing orders, baristas making drinks, shared equipment like espresso machines and milk steamers, and the constant challenge of managing it all efficiently. This is exactly what concurrent programming in Go is like - and goroutines are your baristas!

    In this comprehensive guide, we’ll explore Go’s concurrency patterns through the lens of running a coffee shop. By the end, you’ll understand not just how to write concurrent Go code, but why these patterns work and when to use them. ...
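    To make the metaphor concrete, here is a tiny sketch (my illustration, not the guide's code) with baristas as goroutines pulling orders from a shared channel:

```go
package main

import (
	"fmt"
	"sync"
)

// barista is a goroutine that pulls orders from the shared order queue.
func barista(name string, orders <-chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	for order := range orders {
		fmt.Printf("%s is making a %s\n", name, order)
	}
}

func main() {
	orders := make(chan string, 5) // the order counter: a buffered channel
	var wg sync.WaitGroup

	for _, name := range []string{"Alice", "Bob"} {
		wg.Add(1)
		go barista(name, orders, &wg)
	}

	for _, o := range []string{"latte", "espresso", "mocha", "flat white"} {
		orders <- o
	}
	close(orders) // the shop stops taking orders
	wg.Wait()     // wait for the baristas to finish
}
```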

    December 15, 2024 · 30 min · Rafiul Alam

    Worker Pool Pattern in Go

    Go Concurrency Patterns Series: ← Request/Response | Series Overview | Mutex Patterns →

    What is the Worker Pool Pattern? The Worker Pool pattern manages a fixed number of worker goroutines that process jobs from a shared queue. This pattern is essential for controlling resource usage, preventing system overload, and ensuring predictable performance under varying loads.

    Key Components:
    - Job Queue: channel containing work to be processed
    - Worker Pool: fixed number of worker goroutines
    - Result Channel: optional channel for collecting results
    - Dispatcher: coordinates job distribution to workers

    Real-World Use Cases:
    - Image Processing: resize/compress images with limited CPU cores
    - Database Operations: limit concurrent database connections
    - API Rate Limiting: control outbound API call rates
    - File Processing: process files with bounded I/O operations
    - Web Scraping: limit concurrent HTTP requests
    - Background Jobs: process queued tasks with resource limits

    Basic Worker Pool Implementation:

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Job represents work to be processed
type Job struct {
	ID   int
	Data interface{}
}

// Result represents the outcome of processing a job
type Result struct {
	JobID  int
	Output interface{}
	Error  error
}

// WorkerPool manages a pool of workers
type WorkerPool struct {
	workerCount int
	jobQueue    chan Job
	resultQueue chan Result
	quit        chan bool
	wg          sync.WaitGroup
}

// NewWorkerPool creates a new worker pool
func NewWorkerPool(workerCount, jobQueueSize int) *WorkerPool {
	return &WorkerPool{
		workerCount: workerCount,
		jobQueue:    make(chan Job, jobQueueSize),
		resultQueue: make(chan Result, jobQueueSize),
		quit:        make(chan bool),
	}
}

// Start initializes and starts all workers
func (wp *WorkerPool) Start() {
	for i := 0; i < wp.workerCount; i++ {
		wp.wg.Add(1)
		go wp.worker(i)
	}
}

// worker processes jobs from the job queue
func (wp *WorkerPool) worker(id int) {
	defer wp.wg.Done()
	for {
		select {
		case job := <-wp.jobQueue:
			fmt.Printf("Worker %d processing job %d\n", id, job.ID)
			result := wp.processJob(job)
			wp.resultQueue <- result
		case <-wp.quit:
			fmt.Printf("Worker %d stopping\n", id)
			return
		}
	}
}

// processJob simulates job processing
func (wp *WorkerPool) processJob(job Job) Result {
	// Simulate work
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

	// Process the job (example: square the number)
	if num, ok := job.Data.(int); ok {
		return Result{
			JobID:  job.ID,
			Output: num * num,
		}
	}
	return Result{
		JobID: job.ID,
		Error: fmt.Errorf("invalid job data"),
	}
}

// Submit adds a job to the queue
func (wp *WorkerPool) Submit(job Job) {
	wp.jobQueue <- job
}

// Results returns the result channel
func (wp *WorkerPool) Results() <-chan Result {
	return wp.resultQueue
}

// Stop gracefully shuts down the worker pool
func (wp *WorkerPool) Stop() {
	close(wp.quit)
	wp.wg.Wait()
	close(wp.jobQueue)
	close(wp.resultQueue)
}

func main() {
	// Create worker pool with 3 workers
	pool := NewWorkerPool(3, 10)
	pool.Start()
	defer pool.Stop()

	// Submit jobs
	go func() {
		for i := 1; i <= 10; i++ {
			job := Job{
				ID:   i,
				Data: i * 10,
			}
			pool.Submit(job)
		}
	}()

	// Collect results
	for i := 0; i < 10; i++ {
		result := <-pool.Results()
		if result.Error != nil {
			fmt.Printf("Job %d failed: %v\n", result.JobID, result.Error)
		} else {
			fmt.Printf("Job %d result: %v\n", result.JobID, result.Output)
		}
	}
}
```

    Advanced Worker Pool with Context:

```go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// ContextJob includes context for cancellation
type ContextJob struct {
	ID      string
	Data    interface{}
	Context context.Context
}

// ContextResult includes timing and context information
type ContextResult struct {
	JobID    string
	Output   interface{}
	Error    error
	Duration time.Duration
	WorkerID int
}

// AdvancedWorkerPool supports context cancellation and monitoring
type AdvancedWorkerPool struct {
	workerCount int
	jobQueue    chan ContextJob
	resultQueue chan ContextResult
	ctx         context.Context
	cancel      context.CancelFunc
	wg          sync.WaitGroup
	metrics     *PoolMetrics
}

// PoolMetrics tracks worker pool performance
type PoolMetrics struct {
	mu            sync.RWMutex
	jobsProcessed int64
	jobsFailed    int64
	totalDuration time.Duration
	activeWorkers int
}

func (pm *PoolMetrics) RecordJob(duration time.Duration, success bool) {
	pm.mu.Lock()
	defer pm.mu.Unlock()
	if success {
		pm.jobsProcessed++
	} else {
		pm.jobsFailed++
	}
	pm.totalDuration += duration
}

func (pm *PoolMetrics) SetActiveWorkers(count int) {
	pm.mu.Lock()
	defer pm.mu.Unlock()
	pm.activeWorkers = count
}

func (pm *PoolMetrics) GetStats() (processed, failed int64, avgDuration time.Duration, active int) {
	pm.mu.RLock()
	defer pm.mu.RUnlock()
	processed = pm.jobsProcessed
	failed = pm.jobsFailed
	active = pm.activeWorkers
	if pm.jobsProcessed > 0 {
		avgDuration = pm.totalDuration / time.Duration(pm.jobsProcessed)
	}
	return
}

// NewAdvancedWorkerPool creates a new advanced worker pool
func NewAdvancedWorkerPool(ctx context.Context, workerCount, queueSize int) *AdvancedWorkerPool {
	poolCtx, cancel := context.WithCancel(ctx)
	return &AdvancedWorkerPool{
		workerCount: workerCount,
		jobQueue:    make(chan ContextJob, queueSize),
		resultQueue: make(chan ContextResult, queueSize),
		ctx:         poolCtx,
		cancel:      cancel,
		metrics:     &PoolMetrics{},
	}
}

// Start begins processing with all workers
func (awp *AdvancedWorkerPool) Start() {
	awp.metrics.SetActiveWorkers(awp.workerCount)
	for i := 0; i < awp.workerCount; i++ {
		awp.wg.Add(1)
		go awp.worker(i)
	}
	// Start metrics reporter
	go awp.reportMetrics()
}

// worker processes jobs with context support
func (awp *AdvancedWorkerPool) worker(id int) {
	defer awp.wg.Done()
	for {
		select {
		case job := <-awp.jobQueue:
			start := time.Now()
			result := awp.processContextJob(job, id)
			duration := time.Since(start)
			awp.metrics.RecordJob(duration, result.Error == nil)

			select {
			case awp.resultQueue <- result:
			case <-awp.ctx.Done():
				return
			}
		case <-awp.ctx.Done():
			fmt.Printf("Worker %d shutting down\n", id)
			return
		}
	}
}

// processContextJob handles job processing with context
func (awp *AdvancedWorkerPool) processContextJob(job ContextJob, workerID int) ContextResult {
	start := time.Now()

	// Check if job context is already cancelled
	select {
	case <-job.Context.Done():
		return ContextResult{
			JobID:    job.ID,
			Error:    job.Context.Err(),
			Duration: time.Since(start),
			WorkerID: workerID,
		}
	default:
	}

	// Simulate work that respects context cancellation
	workDone := make(chan interface{}, 1)
	workErr := make(chan error, 1)

	go func() {
		// Simulate processing time
		time.Sleep(time.Duration(50+rand.Intn(100)) * time.Millisecond)
		if num, ok := job.Data.(int); ok {
			workDone <- num * num
		} else {
			workErr <- fmt.Errorf("invalid data type")
		}
	}()

	select {
	case result := <-workDone:
		return ContextResult{
			JobID:    job.ID,
			Output:   result,
			Duration: time.Since(start),
			WorkerID: workerID,
		}
	case err := <-workErr:
		return ContextResult{
			JobID:    job.ID,
			Error:    err,
			Duration: time.Since(start),
			WorkerID: workerID,
		}
	case <-job.Context.Done():
		return ContextResult{
			JobID:    job.ID,
			Error:    job.Context.Err(),
			Duration: time.Since(start),
			WorkerID: workerID,
		}
	case <-awp.ctx.Done():
		return ContextResult{
			JobID:    job.ID,
			Error:    awp.ctx.Err(),
			Duration: time.Since(start),
			WorkerID: workerID,
		}
	}
}

// Submit adds a job to the queue
func (awp *AdvancedWorkerPool) Submit(job ContextJob) error {
	select {
	case awp.jobQueue <- job:
		return nil
	case <-awp.ctx.Done():
		return awp.ctx.Err()
	}
}

// Results returns the result channel
func (awp *AdvancedWorkerPool) Results() <-chan ContextResult {
	return awp.resultQueue
}

// reportMetrics periodically reports pool statistics
func (awp *AdvancedWorkerPool) reportMetrics() {
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			processed, failed, avgDuration, active := awp.metrics.GetStats()
			fmt.Printf("Pool Stats - Processed: %d, Failed: %d, Avg Duration: %v, Active Workers: %d\n",
				processed, failed, avgDuration, active)
		case <-awp.ctx.Done():
			return
		}
	}
}

// Stop gracefully shuts down the worker pool
func (awp *AdvancedWorkerPool) Stop() {
	awp.cancel()
	awp.wg.Wait()
	close(awp.jobQueue)
	close(awp.resultQueue)
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	pool := NewAdvancedWorkerPool(ctx, 4, 20)
	pool.Start()
	defer pool.Stop()

	// Submit jobs with individual timeouts
	go func() {
		for i := 1; i <= 15; i++ {
			jobCtx, jobCancel := context.WithTimeout(ctx, 200*time.Millisecond)
			job := ContextJob{
				ID:      fmt.Sprintf("job-%d", i),
				Data:    i * 5,
				Context: jobCtx,
			}
			if err := pool.Submit(job); err != nil {
				fmt.Printf("Failed to submit job %d: %v\n", i, err)
				jobCancel()
				break
			}
			// Cancel some jobs early to demonstrate cancellation
			if i%5 == 0 {
				go func() {
					time.Sleep(50 * time.Millisecond)
					jobCancel()
				}()
			} else {
				defer jobCancel()
			}
		}
	}()

	// Collect results
	resultCount := 0
	for result := range pool.Results() {
		resultCount++
		if result.Error != nil {
			fmt.Printf("Job %s failed (worker %d): %v (took %v)\n",
				result.JobID, result.WorkerID, result.Error, result.Duration)
		} else {
			fmt.Printf("Job %s completed (worker %d): %v (took %v)\n",
				result.JobID, result.WorkerID, result.Output, result.Duration)
		}
		if resultCount >= 15 {
			break
		}
	}
}
```

    Dynamic Worker Pool:

```go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

// Job and Result mirror the types from the basic implementation.
type Job struct {
	ID   int
	Data interface{}
}

type Result struct {
	JobID  int
	Output interface{}
	Error  error
}

// DynamicWorkerPool can scale workers up and down based on load
type DynamicWorkerPool struct {
	minWorkers     int
	maxWorkers     int
	currentWorkers int64
	jobQueue       chan Job
	resultQueue    chan Result
	ctx            context.Context
	cancel         context.CancelFunc
	wg             sync.WaitGroup
	workerControl  chan int // +1 to add worker, -1 to remove worker
	metrics        *DynamicMetrics
}

// DynamicMetrics tracks load and performance for scaling decisions
type DynamicMetrics struct {
	mu                 sync.RWMutex
	queueLength        int64
	avgProcessingTime  time.Duration
	lastScaleTime      time.Time
	scaleUpThreshold   int
	scaleDownThreshold int
}

func (dm *DynamicMetrics) UpdateQueueLength(length int) {
	atomic.StoreInt64(&dm.queueLength, int64(length))
}

func (dm *DynamicMetrics) GetQueueLength() int {
	return int(atomic.LoadInt64(&dm.queueLength))
}

func (dm *DynamicMetrics) ShouldScaleUp(currentWorkers, maxWorkers int) bool {
	dm.mu.RLock()
	defer dm.mu.RUnlock()
	return currentWorkers < maxWorkers &&
		dm.GetQueueLength() > dm.scaleUpThreshold &&
		time.Since(dm.lastScaleTime) > 5*time.Second
}

func (dm *DynamicMetrics) ShouldScaleDown(currentWorkers, minWorkers int) bool {
	dm.mu.RLock()
	defer dm.mu.RUnlock()
	return currentWorkers > minWorkers &&
		dm.GetQueueLength() < dm.scaleDownThreshold &&
		time.Since(dm.lastScaleTime) > 10*time.Second
}

func (dm *DynamicMetrics) RecordScale() {
	dm.mu.Lock()
	defer dm.mu.Unlock()
	dm.lastScaleTime = time.Now()
}

// NewDynamicWorkerPool creates a new dynamic worker pool
func NewDynamicWorkerPool(ctx context.Context, minWorkers, maxWorkers, queueSize int) *DynamicWorkerPool {
	poolCtx, cancel := context.WithCancel(ctx)
	return &DynamicWorkerPool{
		minWorkers:     minWorkers,
		maxWorkers:     maxWorkers,
		currentWorkers: 0,
		jobQueue:       make(chan Job, queueSize),
		resultQueue:    make(chan Result, queueSize),
		ctx:            poolCtx,
		cancel:         cancel,
		workerControl:  make(chan int, maxWorkers),
		metrics: &DynamicMetrics{
			scaleUpThreshold:   queueSize / 2,
			scaleDownThreshold: queueSize / 4,
		},
	}
}

// Start initializes the pool with minimum workers
func (dwp *DynamicWorkerPool) Start() {
	// Start with minimum workers
	for i := 0; i < dwp.minWorkers; i++ {
		dwp.addWorker()
	}
	// Start the scaler
	go dwp.scaler()
	// Start queue monitor
	go dwp.queueMonitor()
}

// addWorker creates and starts a new worker
func (dwp *DynamicWorkerPool) addWorker() {
	workerID := atomic.AddInt64(&dwp.currentWorkers, 1)
	dwp.wg.Add(1)
	go func(id int64) {
		defer dwp.wg.Done()
		defer atomic.AddInt64(&dwp.currentWorkers, -1)
		fmt.Printf("Worker %d started\n", id)
		for {
			select {
			case job := <-dwp.jobQueue:
				start := time.Now()
				result := dwp.processJob(job)
				duration := time.Since(start)
				fmt.Printf("Worker %d processed job %d in %v\n", id, job.ID, duration)

				select {
				case dwp.resultQueue <- result:
				case <-dwp.ctx.Done():
					return
				}
			case <-dwp.ctx.Done():
				fmt.Printf("Worker %d stopping\n", id)
				return
			}
		}
	}(workerID)
}

// processJob simulates job processing
func (dwp *DynamicWorkerPool) processJob(job Job) Result {
	// Simulate variable processing time
	time.Sleep(time.Duration(50+rand.Intn(200)) * time.Millisecond)
	if num, ok := job.Data.(int); ok {
		return Result{
			JobID:  job.ID,
			Output: num * 2,
		}
	}
	return Result{
		JobID: job.ID,
		Error: fmt.Errorf("invalid job data"),
	}
}

// scaler monitors load and adjusts worker count
func (dwp *DynamicWorkerPool) scaler() {
	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			currentWorkers := int(atomic.LoadInt64(&dwp.currentWorkers))
			queueLength := dwp.metrics.GetQueueLength()
			fmt.Printf("Scaler check - Workers: %d, Queue: %d\n", currentWorkers, queueLength)

			if dwp.metrics.ShouldScaleUp(currentWorkers, dwp.maxWorkers) {
				fmt.Printf("Scaling up: adding worker (current: %d)\n", currentWorkers)
				dwp.addWorker()
				dwp.metrics.RecordScale()
			} else if dwp.metrics.ShouldScaleDown(currentWorkers, dwp.minWorkers) {
				fmt.Printf("Scaling down: removing worker (current: %d)\n", currentWorkers)
				// Signal one worker to stop by closing context
				// In a real implementation, you might use a more sophisticated approach
				dwp.metrics.RecordScale()
			}
		case <-dwp.ctx.Done():
			return
		}
	}
}

// queueMonitor tracks queue length for scaling decisions
func (dwp *DynamicWorkerPool) queueMonitor() {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			dwp.metrics.UpdateQueueLength(len(dwp.jobQueue))
		case <-dwp.ctx.Done():
			return
		}
	}
}

// Submit adds a job to the queue
func (dwp *DynamicWorkerPool) Submit(job Job) error {
	select {
	case dwp.jobQueue <- job:
		return nil
	case <-dwp.ctx.Done():
		return dwp.ctx.Err()
	}
}

// Results returns the result channel
func (dwp *DynamicWorkerPool) Results() <-chan Result {
	return dwp.resultQueue
}

// Stop gracefully shuts down the pool
func (dwp *DynamicWorkerPool) Stop() {
	dwp.cancel()
	dwp.wg.Wait()
	close(dwp.jobQueue)
	close(dwp.resultQueue)
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	pool := NewDynamicWorkerPool(ctx, 2, 6, 20)
	pool.Start()
	defer pool.Stop()

	// Submit jobs in bursts to trigger scaling
	go func() {
		// Initial burst
		for i := 1; i <= 10; i++ {
			job := Job{ID: i, Data: i * 10}
			if err := pool.Submit(job); err != nil {
				fmt.Printf("Failed to submit job %d: %v\n", i, err)
				break
			}
		}
		time.Sleep(8 * time.Second)

		// Second burst
		for i := 11; i <= 25; i++ {
			job := Job{ID: i, Data: i * 10}
			if err := pool.Submit(job); err != nil {
				fmt.Printf("Failed to submit job %d: %v\n", i, err)
				break
			}
		}
		time.Sleep(5 * time.Second)

		// Final smaller batch
		for i := 26; i <= 30; i++ {
			job := Job{ID: i, Data: i * 10}
			if err := pool.Submit(job); err != nil {
				fmt.Printf("Failed to submit job %d: %v\n", i, err)
				break
			}
		}
	}()

	// Collect results
	resultCount := 0
	for result := range pool.Results() {
		resultCount++
		if result.Error != nil {
			fmt.Printf("Job %d failed: %v\n", result.JobID, result.Error)
		} else {
			fmt.Printf("Job %d completed: %v\n", result.JobID, result.Output)
		}
		if resultCount >= 30 {
			break
		}
	}
}
```

    Best Practices:
    1. Right-Size the Pool: match worker count to available resources
    2. Monitor Performance: track queue length, processing times, and throughput
    3. Handle Backpressure: implement proper queue management
    4. Graceful Shutdown: ensure all workers complete their current jobs
    5. Error Handling: isolate worker failures from the pool
    6. Resource Cleanup: properly close channels and cancel contexts
    7. Load Balancing: distribute work evenly across workers

    Common Pitfalls:
    - Too Many Workers: creating more workers than CPU cores for CPU-bound tasks
    - Unbounded Queues: memory issues with unlimited job queues
    - Worker Leaks: not properly shutting down workers
    - Blocking Operations: long-running jobs blocking other work
    - No Backpressure: not handling queue overflow situations

    Testing Worker Pools:

```go
package main

import (
	"context"
	"fmt"
	"testing"
	"time"
)

func TestWorkerPool(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	pool := NewAdvancedWorkerPool(ctx, 2, 5)
	pool.Start()
	defer pool.Stop()

	// Submit test jobs
	jobCount := 5
	for i := 1; i <= jobCount; i++ {
		job := ContextJob{
			ID:      fmt.Sprintf("test-%d", i),
			Data:    i,
			Context: ctx,
		}
		if err := pool.Submit(job); err != nil {
			t.Fatalf("Failed to submit job: %v", err)
		}
	}

	// Collect results
	results := make(map[string]ContextResult)
	for i := 0; i < jobCount; i++ {
		select {
		case result := <-pool.Results():
			results[result.JobID] = result
		case <-time.After(2 * time.Second):
			t.Fatal("Timeout waiting for results")
		}
	}

	// Verify all jobs completed
	if len(results) != jobCount {
		t.Errorf("Expected %d results, got %d", jobCount, len(results))
	}

	// Verify results are correct
	for i := 1; i <= jobCount; i++ {
		jobID := fmt.Sprintf("test-%d", i)
		result, exists := results[jobID]
		if !exists {
			t.Errorf("Missing result for job %s", jobID)
			continue
		}
		if result.Error != nil {
			t.Errorf("Job %s failed: %v", jobID, result.Error)
			continue
		}
		expected := i * i
		if result.Output != expected {
			t.Errorf("Job %s: expected %d, got %v", jobID, expected, result.Output)
		}
	}
}
```

    The Worker Pool pattern is essential for building scalable, resource-efficient concurrent applications in Go. It provides controlled concurrency, predictable resource usage, and excellent performance characteristics for both CPU-bound and I/O-bound workloads. ...

    August 21, 2024 · 12 min · Rafiul Alam

    Request/Response Pattern in Go

    Go Concurrency Patterns Series: ← Pub/Sub Pattern | Series Overview | Worker Pool →

    What is the Request/Response Pattern? The Request/Response pattern enables synchronous communication between goroutines, where a sender waits for a response from a receiver. This pattern is essential for RPC-style communication, database queries, API calls, and any scenario where you need to get a result back from an operation.

    Key Components:
    - Request: contains data and a response channel
    - Response: contains result data and/or error information
    - Requester: sends the request and waits for a response
    - Responder: processes the request and sends a response

    Real-World Use Cases:
    - Database Operations: query execution with results
    - API Gateways: forwarding requests to microservices
    - Cache Systems: get/set operations with confirmation
    - File Operations: read/write with status feedback
    - Validation Services: input validation with results
    - Authentication: login requests with tokens

    Basic Request/Response Implementation:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Request represents a request with a response channel
type Request struct {
	ID       string
	Data     interface{}
	Response chan Response
}

// Response represents the response to a request
type Response struct {
	ID     string
	Result interface{}
	Error  error
}

// Server processes requests
type Server struct {
	requests chan Request
	quit     chan bool
}

// NewServer creates a new server
func NewServer() *Server {
	return &Server{
		requests: make(chan Request),
		quit:     make(chan bool),
	}
}

// Start begins processing requests
func (s *Server) Start() {
	go func() {
		for {
			select {
			case req := <-s.requests:
				s.processRequest(req)
			case <-s.quit:
				return
			}
		}
	}()
}

// processRequest handles a single request
func (s *Server) processRequest(req Request) {
	// Simulate processing time
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

	// Process the request (example: double the number)
	var response Response
	response.ID = req.ID
	if num, ok := req.Data.(int); ok {
		response.Result = num * 2
	} else {
		response.Error = fmt.Errorf("invalid data type")
	}

	// Send response back
	req.Response <- response
}

// SendRequest sends a request and waits for response
func (s *Server) SendRequest(id string, data interface{}) (interface{}, error) {
	responseChan := make(chan Response, 1)
	request := Request{
		ID:       id,
		Data:     data,
		Response: responseChan,
	}
	s.requests <- request

	// Wait for response
	response := <-responseChan
	return response.Result, response.Error
}

// Stop shuts down the server
func (s *Server) Stop() {
	close(s.quit)
}

func main() {
	server := NewServer()
	server.Start()
	defer server.Stop()

	// Send multiple requests
	for i := 1; i <= 5; i++ {
		result, err := server.SendRequest(fmt.Sprintf("req-%d", i), i*10)
		if err != nil {
			fmt.Printf("Request %d failed: %v\n", i, err)
		} else {
			fmt.Printf("Request %d result: %v\n", i, result)
		}
	}
}
```

    Request/Response with Timeout:

```go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"
)

// TimedRequest includes context for timeout handling
type TimedRequest struct {
	ID       string
	Data     interface{}
	Response chan TimedResponse
	Context  context.Context
}

// TimedResponse includes timing information
type TimedResponse struct {
	ID        string
	Result    interface{}
	Error     error
	Duration  time.Duration
	Timestamp time.Time
}

// TimedServer processes requests with timeout support
type TimedServer struct {
	requests chan TimedRequest
	quit     chan bool
}

func NewTimedServer() *TimedServer {
	return &TimedServer{
		requests: make(chan TimedRequest, 10),
		quit:     make(chan bool),
	}
}

func (ts *TimedServer) Start() {
	go func() {
		for {
			select {
			case req := <-ts.requests:
				go ts.processTimedRequest(req)
			case <-ts.quit:
				return
			}
		}
	}()
}

func (ts *TimedServer) processTimedRequest(req TimedRequest) {
	start := time.Now()

	// Check if context is already cancelled
	select {
	case <-req.Context.Done():
		ts.sendResponse(req, nil, req.Context.Err(), start)
		return
	default:
	}

	// Simulate work with random duration
	workDuration := time.Duration(rand.Intn(200)) * time.Millisecond
	select {
	case <-time.After(workDuration):
		// Work completed
		if num, ok := req.Data.(int); ok {
			ts.sendResponse(req, num*2, nil, start)
		} else {
			ts.sendResponse(req, nil, fmt.Errorf("invalid data type"), start)
		}
	case <-req.Context.Done():
		// Context cancelled during work
		ts.sendResponse(req, nil, req.Context.Err(), start)
	}
}

func (ts *TimedServer) sendResponse(req TimedRequest, result interface{}, err error, start time.Time) {
	response := TimedResponse{
		ID:        req.ID,
		Result:    result,
		Error:     err,
		Duration:  time.Since(start),
		Timestamp: time.Now(),
	}
	select {
	case req.Response <- response:
	case <-req.Context.Done():
		// Client no longer waiting
	}
}

// SendRequestWithTimeout sends a request with a timeout
func (ts *TimedServer) SendRequestWithTimeout(id string, data interface{}, timeout time.Duration) (interface{}, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	responseChan := make(chan TimedResponse, 1)
	request := TimedRequest{
		ID:       id,
		Data:     data,
		Response: responseChan,
		Context:  ctx,
	}

	select {
	case ts.requests <- request:
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	select {
	case response := <-responseChan:
		fmt.Printf("Request %s completed in %v\n", response.ID, response.Duration)
		return response.Result, response.Error
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func (ts *TimedServer) Stop() {
	close(ts.quit)
}

func main() {
	server := NewTimedServer()
	server.Start()
	defer server.Stop()

	// Send requests with different timeouts
	requests := []struct {
		id      string
		data    int
		timeout time.Duration
	}{
		{"fast", 10, 300 * time.Millisecond},
		{"medium", 20, 150 * time.Millisecond},
		{"slow", 30, 50 * time.Millisecond}, // This might timeout
	}

	for _, req := range requests {
		result, err := server.SendRequestWithTimeout(req.id, req.data, req.timeout)
		if err != nil {
			fmt.Printf("Request %s failed: %v\n", req.id, err)
		} else {
			fmt.Printf("Request %s result: %v\n", req.id, result)
		}
	}
}
```

    Future/Promise Pattern:

```go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Future represents a value that will be available in the future
type Future struct {
	mu       sync.Mutex
	done     chan struct{}
	result   interface{}
	err      error
	computed bool
}

// NewFuture creates a new future
func NewFuture() *Future {
	return &Future{
		done: make(chan struct{}),
	}
}

// Set sets the future's value
func (f *Future) Set(result interface{}, err error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.computed {
		return // Already set
	}
	f.result = result
	f.err = err
	f.computed = true
	close(f.done)
}

// Get waits for and returns the future's value
func (f *Future) Get() (interface{}, error) {
	<-f.done
	return f.result, f.err
}

// GetWithTimeout waits for the value with a timeout
func (f *Future) GetWithTimeout(timeout time.Duration) (interface{}, error) {
	select {
	case <-f.done:
		return f.result, f.err
	case <-time.After(timeout):
		return nil, fmt.Errorf("timeout waiting for future")
	}
}

// GetWithContext waits for the value with context cancellation
func (f *Future) GetWithContext(ctx context.Context) (interface{}, error) {
	select {
	case <-f.done:
		return f.result, f.err
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// IsReady returns true if the future has been computed
func (f *Future) IsReady() bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.computed
}

// AsyncService demonstrates async operations with futures
type AsyncService struct {
	workers chan struct{}
}

func NewAsyncService(maxWorkers int) *AsyncService {
	return &AsyncService{
		workers: make(chan struct{}, maxWorkers),
	}
}

// ProcessAsync starts async processing and returns a future
func (as *AsyncService) ProcessAsync(data interface{}) *Future {
	future := NewFuture()
	go func() {
		// Acquire worker slot
		as.workers <- struct{}{}
		defer func() { <-as.workers }()

		// Simulate processing
		time.Sleep(time.Duration(100+rand.Intn(200)) * time.Millisecond)

		// Process data
		if num, ok := data.(int); ok {
			future.Set(num*num, nil)
		} else {
			future.Set(nil, fmt.Errorf("invalid data type"))
		}
	}()
	return future
}

func main() {
	service := NewAsyncService(3)

	// Start multiple async operations
	futures := make([]*Future, 5)
	for i := 0; i < 5; i++ {
		fmt.Printf("Starting async operation %d\n", i+1)
		futures[i] = service.ProcessAsync((i + 1) * 10)
	}

	// Wait for all results
	fmt.Println("\nWaiting for results...")
	for i, future := range futures {
		result, err := future.Get()
		if err != nil {
			fmt.Printf("Operation %d failed: %v\n", i+1, err)
		} else {
			fmt.Printf("Operation %d result: %v\n", i+1, result)
		}
	}

	// Example with timeout
	fmt.Println("\nTesting timeout...")
	timeoutFuture := service.ProcessAsync(100)
	result, err := timeoutFuture.GetWithTimeout(50 * time.Millisecond)
	if err != nil {
		fmt.Printf("Timeout example failed: %v\n", err)
	} else {
		fmt.Printf("Timeout example result: %v\n", result)
	}
}
```

    Batch Request/Response:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// BatchRequest represents multiple requests processed together
type BatchRequest struct {
	ID       string
	Items    []interface{}
	Response chan BatchResponse
}

// BatchResponse contains results for all items in a batch
type BatchResponse struct {
	ID      string
	Results []BatchResult
	Error   error
}

// BatchResult represents the result of processing one item
type BatchResult struct {
	Index  int
	Result interface{}
	Error  error
}

// BatchProcessor processes requests in batches for efficiency
type BatchProcessor struct {
	requests    chan BatchRequest
	batchSize   int
	batchWindow time.Duration
	quit        chan bool
}

func NewBatchProcessor(batchSize int, batchWindow time.Duration) *BatchProcessor {
	return &BatchProcessor{
		requests:    make(chan BatchRequest, 100),
		batchSize:   batchSize,
		batchWindow: batchWindow,
		quit:        make(chan bool),
	}
}

func (bp *BatchProcessor) Start() {
	go func() {
		batch := make([]BatchRequest, 0, bp.batchSize)
		timer := time.NewTimer(bp.batchWindow)
		timer.Stop()

		for {
			select {
			case req := <-bp.requests:
				batch = append(batch, req)
				if len(batch) == 1 {
					timer.Reset(bp.batchWindow)
				}
				if len(batch) >= bp.batchSize {
					bp.processBatch(batch)
					batch = batch[:0]
					timer.Stop()
				}
			case <-timer.C:
				if len(batch) > 0 {
					bp.processBatch(batch)
					batch = batch[:0]
				}
			case <-bp.quit:
				if len(batch) > 0 {
					bp.processBatch(batch)
				}
				return
			}
		}
	}()
}

func (bp *BatchProcessor) processBatch(batch []BatchRequest) {
	fmt.Printf("Processing batch of %d requests\n", len(batch))
	var wg sync.WaitGroup
	for _, req := range batch {
		wg.Add(1)
		go func(r BatchRequest) {
			defer wg.Done()
			bp.processRequest(r)
		}(req)
	}
	wg.Wait()
}

func (bp *BatchProcessor) processRequest(req BatchRequest) {
	results := make([]BatchResult, len(req.Items))
	for i, item := range req.Items {
		// Simulate processing each item
		time.Sleep(10 * time.Millisecond)
		if num, ok := item.(int); ok {
			results[i] = BatchResult{
				Index:  i,
				Result: num * 3,
			}
		} else {
			results[i] = BatchResult{
				Index: i,
				Error: fmt.Errorf("invalid item type at index %d", i),
			}
		}
	}

	response := BatchResponse{
		ID:      req.ID,
		Results: results,
	}
	req.Response <- response
}

// SendBatchRequest sends a batch request and waits for response
func (bp *BatchProcessor) SendBatchRequest(id string, items []interface{}) ([]BatchResult, error) {
	responseChan := make(chan BatchResponse, 1)
	request := BatchRequest{
		ID:       id,
		Items:    items,
		Response: responseChan,
	}
	bp.requests <- request
	response := <-responseChan
	return response.Results, response.Error
}

func (bp *BatchProcessor) Stop() {
	close(bp.quit)
}

func main() {
	processor := NewBatchProcessor(3, 100*time.Millisecond)
	processor.Start()
	defer processor.Stop()

	// Send individual batch requests
	go func() {
		results, err := processor.SendBatchRequest("batch1", []interface{}{1, 2, 3, 4, 5})
		if err != nil {
			fmt.Printf("Batch 1 failed: %v\n", err)
			return
		}
		fmt.Println("Batch 1 results:")
		for _, result := range results {
			if result.Error != nil {
				fmt.Printf("  Item %d error: %v\n", result.Index, result.Error)
			} else {
				fmt.Printf("  Item %d result: %v\n", result.Index, result.Result)
			}
		}
	}()

	go func() {
		results, err := processor.SendBatchRequest("batch2", []interface{}{10, 20, 30})
		if err != nil {
			fmt.Printf("Batch 2 failed: %v\n", err)
			return
		}
		fmt.Println("Batch 2 results:")
		for _, result := range results {
			if result.Error != nil {
				fmt.Printf("  Item %d error: %v\n", result.Index, result.Error)
			} else {
				fmt.Printf("  Item %d result: %v\n", result.Index, result.Result)
			}
		}
	}()

	// Wait for processing
	time.Sleep(500 * time.Millisecond)
}
```

    Best Practices:
    1. Always Use Timeouts: prevent indefinite blocking
    2. Handle Context Cancellation: support graceful cancellation
    3. Buffer Response Channels: avoid blocking responders
    4. Error Handling: always include error information in responses
    5. Resource Cleanup: ensure channels and goroutines are cleaned up
    6. Monitoring: track request/response times and success rates
    7. Backpressure: handle situations when responders are overwhelmed

    Common Pitfalls:
    - Deadlocks: not buffering response channels
    - Goroutine Leaks: not handling context cancellation
    - Memory Leaks: not closing channels properly
    - Blocking Operations: long-running operations without timeouts
    - Lost Responses: not handling channel closure

    Testing Request/Response:

```go
package main

import (
	"testing"
	"time"
)

func TestRequestResponse(t *testing.T) {
	server := NewTimedServer()
	server.Start()
	defer server.Stop()

	// Test successful request
	result, err := server.SendRequestWithTimeout("test1", 42, 200*time.Millisecond)
	if err != nil {
		t.Fatalf("Request failed: %v", err)
	}
	if result != 84 {
		t.Errorf("Expected 84, got %v", result)
	}

	// Test timeout
	_, err = server.SendRequestWithTimeout("test2", 42, 10*time.Millisecond)
	if err == nil {
		t.Error("Expected timeout error")
	}
}

func TestFuture(t *testing.T) {
	future := NewFuture()

	// Test that the future is not ready initially
	if future.IsReady() {
		t.Error("Future should not be ready initially")
	}

	// Set value in a goroutine
	go func() {
		time.Sleep(50 * time.Millisecond)
		future.Set("test result", nil)
	}()

	// Get value
	result, err := future.Get()
	if err != nil {
		t.Fatalf("Future failed: %v", err)
	}
	if result != "test result" {
		t.Errorf("Expected 'test result', got %v", result)
	}

	// Test that the future is ready after setting
	if !future.IsReady() {
		t.Error("Future should be ready after setting")
	}
}
```

    The Request/Response pattern is essential for building synchronous communication systems in Go. It provides the foundation for RPC systems, database operations, and any scenario where you need to wait for a result from an asynchronous operation. ...

    July 31, 2024 · 10 min · Rafiul Alam