# Worker Pool Pattern in Go

Go Concurrency Patterns Series: ← Request/Response | Series Overview | Mutex Patterns →

## What is the Worker Pool Pattern?

The Worker Pool pattern manages a fixed number of worker goroutines that process jobs from a shared queue. It is essential for controlling resource usage, preventing system overload, and ensuring predictable performance under varying loads.

Key components:

- **Job Queue**: channel containing work to be processed
- **Worker Pool**: fixed number of worker goroutines
- **Result Channel**: optional channel for collecting results
- **Dispatcher**: coordinates job distribution to workers

## Real-World Use Cases

- **Image Processing**: resize/compress images with limited CPU cores
- **Database Operations**: limit concurrent database connections
- **API Rate Limiting**: control outbound API call rates
- **File Processing**: process files with bounded I/O operations
- **Web Scraping**: limit concurrent HTTP requests
- **Background Jobs**: process queued tasks with resource limits
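Before the full implementation below, here is a minimal sketch of the core idea (my baseline example, not part of the original listings): N goroutines draining a shared channel, where the channel itself is the queue and closing it signals shutdown.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	jobs := make(chan int)
	var wg sync.WaitGroup

	// Three workers drain the same channel; the channel is the queue.
	for w := 1; w <= 3; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for j := range jobs { // exits when jobs is closed and drained
				fmt.Printf("worker %d got job %d\n", id, j)
			}
		}(w)
	}

	for j := 1; j <= 9; j++ {
		jobs <- j
	}
	close(jobs) // signals workers to finish
	wg.Wait()
}
```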
## Basic Worker Pool Implementation

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Job represents work to be processed
type Job struct {
	ID   int
	Data interface{}
}

// Result represents the outcome of processing a job
type Result struct {
	JobID  int
	Output interface{}
	Error  error
}

// WorkerPool manages a pool of workers
type WorkerPool struct {
	workerCount int
	jobQueue    chan Job
	resultQueue chan Result
	quit        chan bool
	wg          sync.WaitGroup
}

// NewWorkerPool creates a new worker pool
func NewWorkerPool(workerCount, jobQueueSize int) *WorkerPool {
	return &WorkerPool{
		workerCount: workerCount,
		jobQueue:    make(chan Job, jobQueueSize),
		resultQueue: make(chan Result, jobQueueSize),
		quit:        make(chan bool),
	}
}

// Start initializes and starts all workers
func (wp *WorkerPool) Start() {
	for i := 0; i < wp.workerCount; i++ {
		wp.wg.Add(1)
		go wp.worker(i)
	}
}

// worker processes jobs from the job queue
func (wp *WorkerPool) worker(id int) {
	defer wp.wg.Done()
	for {
		select {
		case job := <-wp.jobQueue:
			fmt.Printf("Worker %d processing job %d\n", id, job.ID)
			result := wp.processJob(job)
			wp.resultQueue <- result
		case <-wp.quit:
			fmt.Printf("Worker %d stopping\n", id)
			return
		}
	}
}

// processJob simulates job processing
func (wp *WorkerPool) processJob(job Job) Result {
	// Simulate work
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

	// Process the job (example: square the number)
	if num, ok := job.Data.(int); ok {
		return Result{
			JobID:  job.ID,
			Output: num * num,
		}
	}
	return Result{
		JobID: job.ID,
		Error: fmt.Errorf("invalid job data"),
	}
}

// Submit adds a job to the queue
func (wp *WorkerPool) Submit(job Job) {
	wp.jobQueue <- job
}

// Results returns the result channel
func (wp *WorkerPool) Results() <-chan Result {
	return wp.resultQueue
}

// Stop gracefully shuts down the worker pool
func (wp *WorkerPool) Stop() {
	close(wp.quit)
	wp.wg.Wait()
	close(wp.jobQueue)
	close(wp.resultQueue)
}

func main() {
	// Create worker pool with 3 workers
	pool := NewWorkerPool(3, 10)
	pool.Start()
	defer pool.Stop()

	// Submit jobs
	go func() {
		for i := 1; i <= 10; i++ {
			job := Job{
				ID:   i,
				Data: i * 10,
			}
			pool.Submit(job)
		}
	}()

	// Collect results
	for i := 0; i < 10; i++ {
		result := <-pool.Results()
		if result.Error != nil {
			fmt.Printf("Job %d failed: %v\n", result.JobID, result.Error)
		} else {
			fmt.Printf("Job %d result: %v\n", result.JobID, result.Output)
		}
	}
}
```
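One caveat with the select-based worker above: when `quit` is closed, jobs still buffered in `jobQueue` may be abandoned, since the select picks randomly among ready cases. If you need to drain the queue on shutdown, a common alternative (sketched here against the `WorkerPool` type above; `drainingWorker` and `StopAndDrain` are my additions, not part of the original example) is to close the job channel and let workers range over it:

```go
// Sketch: drain-on-shutdown variant. Assumes Submit is no longer
// called once StopAndDrain begins (sending on a closed channel panics).
func (wp *WorkerPool) drainingWorker(id int) {
	defer wp.wg.Done()
	for job := range wp.jobQueue { // runs until jobQueue is closed and empty
		wp.resultQueue <- wp.processJob(job)
	}
}

func (wp *WorkerPool) StopAndDrain() {
	close(wp.jobQueue) // no more submissions; workers finish remaining jobs
	wp.wg.Wait()
	close(wp.resultQueue)
}
```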
## Advanced Worker Pool with Context

```go
package main

import (
	"context"
	"fmt"
	"math/rand" // needed for the simulated work below (missing from the original listing)
	"sync"
	"time"
)

// ContextJob includes context for cancellation
type ContextJob struct {
	ID      string
	Data    interface{}
	Context context.Context
}

// ContextResult includes timing and context information
type ContextResult struct {
	JobID    string
	Output   interface{}
	Error    error
	Duration time.Duration
	WorkerID int
}

// AdvancedWorkerPool supports context cancellation and monitoring
type AdvancedWorkerPool struct {
	workerCount int
	jobQueue    chan ContextJob
	resultQueue chan ContextResult
	ctx         context.Context
	cancel      context.CancelFunc
	wg          sync.WaitGroup
	metrics     *PoolMetrics
}

// PoolMetrics tracks worker pool performance
type PoolMetrics struct {
	mu            sync.RWMutex
	jobsProcessed int64
	jobsFailed    int64
	totalDuration time.Duration
	activeWorkers int
}

func (pm *PoolMetrics) RecordJob(duration time.Duration, success bool) {
	pm.mu.Lock()
	defer pm.mu.Unlock()
	if success {
		pm.jobsProcessed++
	} else {
		pm.jobsFailed++
	}
	pm.totalDuration += duration
}

func (pm *PoolMetrics) SetActiveWorkers(count int) {
	pm.mu.Lock()
	defer pm.mu.Unlock()
	pm.activeWorkers = count
}

func (pm *PoolMetrics) GetStats() (processed, failed int64, avgDuration time.Duration, active int) {
	pm.mu.RLock()
	defer pm.mu.RUnlock()
	processed = pm.jobsProcessed
	failed = pm.jobsFailed
	active = pm.activeWorkers
	if pm.jobsProcessed > 0 {
		avgDuration = pm.totalDuration / time.Duration(pm.jobsProcessed)
	}
	return
}

// NewAdvancedWorkerPool creates a new advanced worker pool
func NewAdvancedWorkerPool(ctx context.Context, workerCount, queueSize int) *AdvancedWorkerPool {
	poolCtx, cancel := context.WithCancel(ctx)
	return &AdvancedWorkerPool{
		workerCount: workerCount,
		jobQueue:    make(chan ContextJob, queueSize),
		resultQueue: make(chan ContextResult, queueSize),
		ctx:         poolCtx,
		cancel:      cancel,
		metrics:     &PoolMetrics{},
	}
}

// Start begins processing with all workers
func (awp *AdvancedWorkerPool) Start() {
	awp.metrics.SetActiveWorkers(awp.workerCount)
	for i := 0; i < awp.workerCount; i++ {
		awp.wg.Add(1)
		go awp.worker(i)
	}
	// Start metrics reporter
	go awp.reportMetrics()
}

// worker processes jobs with context support
func (awp *AdvancedWorkerPool) worker(id int) {
	defer awp.wg.Done()
	for {
		select {
		case job := <-awp.jobQueue:
			start := time.Now()
			result := awp.processContextJob(job, id)
			duration := time.Since(start)
			awp.metrics.RecordJob(duration, result.Error == nil)

			select {
			case awp.resultQueue <- result:
			case <-awp.ctx.Done():
				return
			}
		case <-awp.ctx.Done():
			fmt.Printf("Worker %d shutting down\n", id)
			return
		}
	}
}

// processContextJob handles job processing with context
func (awp *AdvancedWorkerPool) processContextJob(job ContextJob, workerID int) ContextResult {
	start := time.Now()

	// Check if the job context is already cancelled
	select {
	case <-job.Context.Done():
		return ContextResult{
			JobID:    job.ID,
			Error:    job.Context.Err(),
			Duration: time.Since(start),
			WorkerID: workerID,
		}
	default:
	}

	// Simulate work that respects context cancellation
	workDone := make(chan interface{}, 1)
	workErr := make(chan error, 1)

	go func() {
		// Simulate processing time
		time.Sleep(time.Duration(50+rand.Intn(100)) * time.Millisecond)
		if num, ok := job.Data.(int); ok {
			workDone <- num * num
		} else {
			workErr <- fmt.Errorf("invalid data type")
		}
	}()

	select {
	case result := <-workDone:
		return ContextResult{
			JobID:    job.ID,
			Output:   result,
			Duration: time.Since(start),
			WorkerID: workerID,
		}
	case err := <-workErr:
		return ContextResult{
			JobID:    job.ID,
			Error:    err,
			Duration: time.Since(start),
			WorkerID: workerID,
		}
	case <-job.Context.Done():
		return ContextResult{
			JobID:    job.ID,
			Error:    job.Context.Err(),
			Duration: time.Since(start),
			WorkerID: workerID,
		}
	case <-awp.ctx.Done():
		return ContextResult{
			JobID:    job.ID,
			Error:    awp.ctx.Err(),
			Duration: time.Since(start),
			WorkerID: workerID,
		}
	}
}

// Submit adds a job to the queue
func (awp *AdvancedWorkerPool) Submit(job ContextJob) error {
	select {
	case awp.jobQueue <- job:
		return nil
	case <-awp.ctx.Done():
		return awp.ctx.Err()
	}
}

// Results returns the result channel
func (awp *AdvancedWorkerPool) Results() <-chan ContextResult {
	return awp.resultQueue
}

// reportMetrics periodically reports pool statistics
func (awp *AdvancedWorkerPool) reportMetrics() {
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			processed, failed, avgDuration, active := awp.metrics.GetStats()
			fmt.Printf("Pool Stats - Processed: %d, Failed: %d, Avg Duration: %v, Active Workers: %d\n",
				processed, failed, avgDuration, active)
		case <-awp.ctx.Done():
			return
		}
	}
}

// Stop gracefully shuts down the worker pool
func (awp *AdvancedWorkerPool) Stop() {
	awp.cancel()
	awp.wg.Wait()
	close(awp.jobQueue)
	close(awp.resultQueue)
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	pool := NewAdvancedWorkerPool(ctx, 4, 20)
	pool.Start()
	defer pool.Stop()

	// Submit jobs with individual timeouts
	go func() {
		for i := 1; i <= 15; i++ {
			jobCtx, jobCancel := context.WithTimeout(ctx, 200*time.Millisecond)
			job := ContextJob{
				ID:      fmt.Sprintf("job-%d", i),
				Data:    i * 5,
				Context: jobCtx,
			}
			if err := pool.Submit(job); err != nil {
				fmt.Printf("Failed to submit job %d: %v\n", i, err)
				jobCancel()
				break
			}
			// Cancel some jobs early to demonstrate cancellation
			if i%5 == 0 {
				go func() {
					time.Sleep(50 * time.Millisecond)
					jobCancel()
				}()
			} else {
				defer jobCancel()
			}
		}
	}()

	// Collect results
	resultCount := 0
	for result := range pool.Results() {
		resultCount++
		if result.Error != nil {
			fmt.Printf("Job %s failed (worker %d): %v (took %v)\n",
				result.JobID, result.WorkerID, result.Error, result.Duration)
		} else {
			fmt.Printf("Job %s completed (worker %d): %v (took %v)\n",
				result.JobID, result.WorkerID, result.Output, result.Duration)
		}
		if resultCount >= 15 {
			break
		}
	}
}
```
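If all you need is bounded concurrency with error propagation, you often don't have to hand-roll a pool at all: the `golang.org/x/sync/errgroup` package's `SetLimit` caps how many goroutines run at once, and `Go` blocks when the limit is reached. A rough sketch of the same idea (my example, not from the original article):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	g, ctx := errgroup.WithContext(context.Background())
	g.SetLimit(4) // at most 4 jobs run concurrently

	for i := 1; i <= 15; i++ {
		i := i // capture loop variable (pre-Go 1.22)
		g.Go(func() error {
			// Bail out early if a sibling job already failed.
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}
			fmt.Printf("processed job %d -> %d\n", i, i*i)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println("pool error:", err)
	}
}
```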
## Dynamic Worker Pool

```go
package main

import (
	"context"
	"fmt"
	"math/rand" // needed for the simulated work below (missing from the original listing)
	"sync"
	"sync/atomic"
	"time"
)

// Job and Result are the same types used in the basic example,
// repeated here so this listing is self-contained.
type Job struct {
	ID   int
	Data interface{}
}

type Result struct {
	JobID  int
	Output interface{}
	Error  error
}

// DynamicWorkerPool can scale workers up and down based on load
type DynamicWorkerPool struct {
	minWorkers     int
	maxWorkers     int
	currentWorkers int64
	jobQueue       chan Job
	resultQueue    chan Result
	ctx            context.Context
	cancel         context.CancelFunc
	wg             sync.WaitGroup
	workerControl  chan int // +1 to add worker, -1 to remove worker
	metrics        *DynamicMetrics
}

// DynamicMetrics tracks load and performance for scaling decisions
type DynamicMetrics struct {
	mu                 sync.RWMutex
	queueLength        int64
	avgProcessingTime  time.Duration
	lastScaleTime      time.Time
	scaleUpThreshold   int
	scaleDownThreshold int
}

func (dm *DynamicMetrics) UpdateQueueLength(length int) {
	atomic.StoreInt64(&dm.queueLength, int64(length))
}

func (dm *DynamicMetrics) GetQueueLength() int {
	return int(atomic.LoadInt64(&dm.queueLength))
}

func (dm *DynamicMetrics) ShouldScaleUp(currentWorkers int, maxWorkers int) bool {
	dm.mu.RLock()
	defer dm.mu.RUnlock()
	return currentWorkers < maxWorkers &&
		dm.GetQueueLength() > dm.scaleUpThreshold &&
		time.Since(dm.lastScaleTime) > 5*time.Second
}

func (dm *DynamicMetrics) ShouldScaleDown(currentWorkers int, minWorkers int) bool {
	dm.mu.RLock()
	defer dm.mu.RUnlock()
	return currentWorkers > minWorkers &&
		dm.GetQueueLength() < dm.scaleDownThreshold &&
		time.Since(dm.lastScaleTime) > 10*time.Second
}

func (dm *DynamicMetrics) RecordScale() {
	dm.mu.Lock()
	defer dm.mu.Unlock()
	dm.lastScaleTime = time.Now()
}

// NewDynamicWorkerPool creates a new dynamic worker pool
func NewDynamicWorkerPool(ctx context.Context, minWorkers, maxWorkers, queueSize int) *DynamicWorkerPool {
	poolCtx, cancel := context.WithCancel(ctx)
	return &DynamicWorkerPool{
		minWorkers:     minWorkers,
		maxWorkers:     maxWorkers,
		currentWorkers: 0,
		jobQueue:       make(chan Job, queueSize),
		resultQueue:    make(chan Result, queueSize),
		ctx:            poolCtx,
		cancel:         cancel,
		workerControl:  make(chan int, maxWorkers),
		metrics: &DynamicMetrics{
			scaleUpThreshold:   queueSize / 2,
			scaleDownThreshold: queueSize / 4,
		},
	}
}

// Start initializes the pool with minimum workers
func (dwp *DynamicWorkerPool) Start() {
	// Start with minimum workers
	for i := 0; i < dwp.minWorkers; i++ {
		dwp.addWorker()
	}
	// Start the scaler
	go dwp.scaler()
	// Start the queue monitor
	go dwp.queueMonitor()
}

// addWorker creates and starts a new worker
func (dwp *DynamicWorkerPool) addWorker() {
	workerID := atomic.AddInt64(&dwp.currentWorkers, 1)
	dwp.wg.Add(1)
	go func(id int64) {
		defer dwp.wg.Done()
		defer atomic.AddInt64(&dwp.currentWorkers, -1)

		fmt.Printf("Worker %d started\n", id)
		for {
			select {
			case job := <-dwp.jobQueue:
				start := time.Now()
				result := dwp.processJob(job)
				duration := time.Since(start)
				fmt.Printf("Worker %d processed job %d in %v\n", id, job.ID, duration)

				select {
				case dwp.resultQueue <- result:
				case <-dwp.ctx.Done():
					return
				}
			case <-dwp.ctx.Done():
				fmt.Printf("Worker %d stopping\n", id)
				return
			}
		}
	}(workerID)
}

// processJob simulates job processing
func (dwp *DynamicWorkerPool) processJob(job Job) Result {
	// Simulate variable processing time
	time.Sleep(time.Duration(50+rand.Intn(200)) * time.Millisecond)

	if num, ok := job.Data.(int); ok {
		return Result{
			JobID:  job.ID,
			Output: num * 2,
		}
	}
	return Result{
		JobID: job.ID,
		Error: fmt.Errorf("invalid job data"),
	}
}

// scaler monitors load and adjusts worker count
func (dwp *DynamicWorkerPool) scaler() {
	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			currentWorkers := int(atomic.LoadInt64(&dwp.currentWorkers))
			queueLength := dwp.metrics.GetQueueLength()
			fmt.Printf("Scaler check - Workers: %d, Queue: %d\n", currentWorkers, queueLength)

			if dwp.metrics.ShouldScaleUp(currentWorkers, dwp.maxWorkers) {
				fmt.Printf("Scaling up: adding worker (current: %d)\n", currentWorkers)
				dwp.addWorker()
				dwp.metrics.RecordScale()
			} else if dwp.metrics.ShouldScaleDown(currentWorkers, dwp.minWorkers) {
				fmt.Printf("Scaling down: removing worker (current: %d)\n", currentWorkers)
				// Signal one worker to stop.
				// In a real implementation, you might use a more sophisticated approach.
				dwp.metrics.RecordScale()
			}
		case <-dwp.ctx.Done():
			return
		}
	}
}

// queueMonitor tracks queue length for scaling decisions
func (dwp *DynamicWorkerPool) queueMonitor() {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			queueLength := len(dwp.jobQueue)
			dwp.metrics.UpdateQueueLength(queueLength)
		case <-dwp.ctx.Done():
			return
		}
	}
}

// Submit adds a job to the queue
func (dwp *DynamicWorkerPool) Submit(job Job) error {
	select {
	case dwp.jobQueue <- job:
		return nil
	case <-dwp.ctx.Done():
		return dwp.ctx.Err()
	}
}

// Results returns the result channel
func (dwp *DynamicWorkerPool) Results() <-chan Result {
	return dwp.resultQueue
}

// Stop gracefully shuts down the pool
func (dwp *DynamicWorkerPool) Stop() {
	dwp.cancel()
	dwp.wg.Wait()
	close(dwp.jobQueue)
	close(dwp.resultQueue)
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	pool := NewDynamicWorkerPool(ctx, 2, 6, 20)
	pool.Start()
	defer pool.Stop()

	// Submit jobs in bursts to trigger scaling
	go func() {
		// Initial burst
		for i := 1; i <= 10; i++ {
			job := Job{ID: i, Data: i * 10}
			if err := pool.Submit(job); err != nil {
				fmt.Printf("Failed to submit job %d: %v\n", i, err)
				break
			}
		}

		time.Sleep(8 * time.Second)

		// Second burst
		for i := 11; i <= 25; i++ {
			job := Job{ID: i, Data: i * 10}
			if err := pool.Submit(job); err != nil {
				fmt.Printf("Failed to submit job %d: %v\n", i, err)
				break
			}
		}

		time.Sleep(5 * time.Second)

		// Final smaller batch
		for i := 26; i <= 30; i++ {
			job := Job{ID: i, Data: i * 10}
			if err := pool.Submit(job); err != nil {
				fmt.Printf("Failed to submit job %d: %v\n", i, err)
				break
			}
		}
	}()

	// Collect results
	resultCount := 0
	for result := range pool.Results() {
		resultCount++
		if result.Error != nil {
			fmt.Printf("Job %d failed: %v\n", result.JobID, result.Error)
		} else {
			fmt.Printf("Job %d completed: %v\n", result.JobID, result.Output)
		}
		if resultCount >= 30 {
			break
		}
	}
}
```
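The scaler above deliberately leaves scale-down unimplemented. One way it could work, sketched here under stated assumptions (the `stoppableWorker` type and `removeWorker` helper are hypothetical, not part of the article): give each worker its own stop channel and close one to retire exactly one worker.

```go
// Hypothetical scale-down support: track a per-worker stop channel.
type stoppableWorker struct {
	stop chan struct{}
}

// removeWorker signals one worker to exit after its current job.
// The worker's select loop would need an extra case:
//
//	case <-w.stop:
//	    return
func removeWorker(workers []*stoppableWorker) []*stoppableWorker {
	if len(workers) == 0 {
		return workers
	}
	last := workers[len(workers)-1]
	close(last.stop) // the worker sees this in its select and returns
	return workers[:len(workers)-1]
}
```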
## Best Practices

- **Right-size the pool**: match worker count to available resources
- **Monitor performance**: track queue length, processing times, and throughput
- **Handle backpressure**: implement proper queue management
- **Graceful shutdown**: ensure all workers complete their current jobs
- **Error handling**: isolate worker failures from the pool
- **Resource cleanup**: properly close channels and cancel contexts
- **Load balancing**: distribute work evenly across workers

## Common Pitfalls

- **Too many workers**: creating more workers than CPU cores for CPU-bound tasks
- **Unbounded queues**: memory issues with unlimited job queues
- **Worker leaks**: not properly shutting down workers
- **Blocking operations**: long-running jobs blocking other work
- **No backpressure**: not handling queue overflow situations (see the sketch after this list)
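To make the backpressure pitfall concrete: a non-blocking submit lets callers react when the queue is full instead of blocking forever. A minimal sketch against the basic `WorkerPool` type (`TrySubmit` is my addition, not part of the original example):

```go
// TrySubmit enqueues a job without blocking. When the queue is full,
// callers can shed load, retry with backoff, or surface the error.
func (wp *WorkerPool) TrySubmit(job Job) error {
	select {
	case wp.jobQueue <- job:
		return nil
	default:
		return fmt.Errorf("job queue full (capacity %d)", cap(wp.jobQueue))
	}
}
```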
## Testing Worker Pools

```go
package main

import (
	"context"
	"fmt" // needed for fmt.Sprintf (missing from the original listing)
	"testing"
	"time"
)

func TestWorkerPool(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	pool := NewAdvancedWorkerPool(ctx, 2, 5)
	pool.Start()
	defer pool.Stop()

	// Submit test jobs
	jobCount := 5
	for i := 1; i <= jobCount; i++ {
		job := ContextJob{
			ID:      fmt.Sprintf("test-%d", i),
			Data:    i,
			Context: ctx,
		}
		if err := pool.Submit(job); err != nil {
			t.Fatalf("Failed to submit job: %v", err)
		}
	}

	// Collect results
	results := make(map[string]ContextResult)
	for i := 0; i < jobCount; i++ {
		select {
		case result := <-pool.Results():
			results[result.JobID] = result
		case <-time.After(2 * time.Second):
			t.Fatal("Timeout waiting for results")
		}
	}

	// Verify all jobs completed
	if len(results) != jobCount {
		t.Errorf("Expected %d results, got %d", jobCount, len(results))
	}

	// Verify results are correct (the pool squares each job's input)
	for i := 1; i <= jobCount; i++ {
		jobID := fmt.Sprintf("test-%d", i)
		result, exists := results[jobID]
		if !exists {
			t.Errorf("Missing result for job %s", jobID)
			continue
		}
		if result.Error != nil {
			t.Errorf("Job %s failed: %v", jobID, result.Error)
			continue
		}
		expected := i * i
		if result.Output != expected {
			t.Errorf("Job %s: expected %d, got %v", jobID, expected, result.Output)
		}
	}
}
```

The Worker Pool pattern is essential for building scalable, resource-efficient concurrent applications in Go. It provides controlled concurrency, predictable resource usage, and excellent performance characteristics for both CPU-bound and I/O-bound workloads.
August 21, 2024 · 12 min · Rafiul Alam

# Semaphore Pattern in Go

Go Concurrency Patterns Series: ← Rate Limiter | Series Overview | Actor Model →

## What is the Semaphore Pattern?

A semaphore is a synchronization primitive that maintains a count of available resources and controls access to them. It allows a specified number of goroutines to access a resource concurrently while blocking others until resources become available.

Types:

- **Binary Semaphore**: acts like a mutex (0 or 1)
- **Counting Semaphore**: allows N concurrent accesses
- **Weighted Semaphore**: resources have different weights/costs

## Real-World Use Cases

- **Connection Pools**: limit database/HTTP connections
- **Resource Management**: control access to limited resources
- **Download Managers**: limit concurrent downloads
- **API Rate Limiting**: control concurrent API calls
- **Worker Pools**: limit concurrent workers
- **Memory Management**: control memory-intensive operations
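Before hand-rolling one, note that the `golang.org/x/sync/semaphore` package already provides a context-aware weighted semaphore. A brief example (the API shown is the real package; the workload is invented):

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

func main() {
	sem := semaphore.NewWeighted(3) // at most 3 units held at once
	ctx := context.Background()

	var wg sync.WaitGroup
	for i := 1; i <= 6; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if err := sem.Acquire(ctx, 1); err != nil {
				fmt.Printf("worker %d: %v\n", id, err)
				return
			}
			defer sem.Release(1)
			fmt.Printf("worker %d holds a slot\n", id)
		}(i)
	}
	wg.Wait()
}
```

The implementations below build the same machinery from scratch to show how it works.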
## Basic Semaphore Implementation

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Semaphore implements a counting semaphore
type Semaphore struct {
	ch chan struct{}
}

// NewSemaphore creates a new semaphore with the given capacity
func NewSemaphore(capacity int) *Semaphore {
	return &Semaphore{
		ch: make(chan struct{}, capacity),
	}
}

// Acquire acquires a resource from the semaphore
func (s *Semaphore) Acquire() {
	s.ch <- struct{}{}
}

// TryAcquire tries to acquire a resource without blocking
func (s *Semaphore) TryAcquire() bool {
	select {
	case s.ch <- struct{}{}:
		return true
	default:
		return false
	}
}

// AcquireWithContext acquires a resource with context cancellation
func (s *Semaphore) AcquireWithContext(ctx context.Context) error {
	select {
	case s.ch <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Release releases a resource back to the semaphore
func (s *Semaphore) Release() {
	<-s.ch
}

// Available returns the number of available resources
func (s *Semaphore) Available() int {
	return cap(s.ch) - len(s.ch)
}

// Used returns the number of used resources
func (s *Semaphore) Used() int {
	return len(s.ch)
}

// Capacity returns the total capacity
func (s *Semaphore) Capacity() int {
	return cap(s.ch)
}

// simulateWork simulates work that requires a resource
func simulateWork(id int, duration time.Duration, sem *Semaphore) {
	fmt.Printf("Worker %d: Requesting resource...\n", id)
	sem.Acquire()
	fmt.Printf("Worker %d: Acquired resource (available: %d/%d)\n",
		id, sem.Available(), sem.Capacity())

	// Simulate work
	time.Sleep(duration)

	sem.Release()
	fmt.Printf("Worker %d: Released resource (available: %d/%d)\n",
		id, sem.Available(), sem.Capacity())
}

func main() {
	// Create a semaphore with capacity of 3
	sem := NewSemaphore(3)

	fmt.Println("=== Basic Semaphore Demo ===")
	fmt.Printf("Semaphore capacity: %d\n\n", sem.Capacity())

	var wg sync.WaitGroup

	// Start 6 workers, but only 3 can work concurrently
	for i := 1; i <= 6; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			simulateWork(id, time.Duration(1+id%3)*time.Second, sem)
		}(i)
		time.Sleep(200 * time.Millisecond) // Stagger starts
	}

	wg.Wait()
	fmt.Printf("\nFinal state - Available: %d/%d\n", sem.Available(), sem.Capacity())
}
```
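As the types list above notes, a binary semaphore is just this counting semaphore with capacity 1, which behaves like a mutex. A tiny usage sketch (reusing `NewSemaphore` from the example above):

```go
// Capacity 1 turns the counting semaphore into a binary one:
// at most one goroutine may hold it at a time, just like a mutex.
mutex := NewSemaphore(1)

mutex.Acquire() // "lock"
// ...critical section...
mutex.Release() // "unlock"
```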
## Advanced Semaphore with Timeout and Context

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// AdvancedSemaphore provides additional features like metrics and timeouts
type AdvancedSemaphore struct {
	ch       chan struct{}
	capacity int

	// Metrics
	totalAcquires int64
	totalReleases int64
	timeouts      int64
	cancellations int64

	// Monitoring
	mu                sync.RWMutex
	waitingGoroutines int
}

// NewAdvancedSemaphore creates a new advanced semaphore
func NewAdvancedSemaphore(capacity int) *AdvancedSemaphore {
	return &AdvancedSemaphore{
		ch:       make(chan struct{}, capacity),
		capacity: capacity,
	}
}

// Acquire acquires a resource (blocking)
func (as *AdvancedSemaphore) Acquire() {
	as.incrementWaiting()
	defer as.decrementWaiting()
	as.ch <- struct{}{}
	atomic.AddInt64(&as.totalAcquires, 1)
}

// TryAcquire tries to acquire without blocking
func (as *AdvancedSemaphore) TryAcquire() bool {
	select {
	case as.ch <- struct{}{}:
		atomic.AddInt64(&as.totalAcquires, 1)
		return true
	default:
		return false
	}
}

// AcquireWithTimeout acquires with a timeout
func (as *AdvancedSemaphore) AcquireWithTimeout(timeout time.Duration) error {
	as.incrementWaiting()
	defer as.decrementWaiting()
	select {
	case as.ch <- struct{}{}:
		atomic.AddInt64(&as.totalAcquires, 1)
		return nil
	case <-time.After(timeout):
		atomic.AddInt64(&as.timeouts, 1)
		return fmt.Errorf("timeout after %v", timeout)
	}
}

// AcquireWithContext acquires with context cancellation
func (as *AdvancedSemaphore) AcquireWithContext(ctx context.Context) error {
	as.incrementWaiting()
	defer as.decrementWaiting()
	select {
	case as.ch <- struct{}{}:
		atomic.AddInt64(&as.totalAcquires, 1)
		return nil
	case <-ctx.Done():
		atomic.AddInt64(&as.cancellations, 1)
		return ctx.Err()
	}
}

// Release releases a resource
func (as *AdvancedSemaphore) Release() {
	<-as.ch
	atomic.AddInt64(&as.totalReleases, 1)
}

// incrementWaiting increments the waiting-goroutines counter
func (as *AdvancedSemaphore) incrementWaiting() {
	as.mu.Lock()
	as.waitingGoroutines++
	as.mu.Unlock()
}

// decrementWaiting decrements the waiting-goroutines counter
func (as *AdvancedSemaphore) decrementWaiting() {
	as.mu.Lock()
	as.waitingGoroutines--
	as.mu.Unlock()
}

// GetStats returns semaphore statistics
func (as *AdvancedSemaphore) GetStats() map[string]interface{} {
	as.mu.RLock()
	waiting := as.waitingGoroutines
	as.mu.RUnlock()

	return map[string]interface{}{
		"capacity":       as.capacity,
		"available":      as.Available(),
		"used":           as.Used(),
		"waiting":        waiting,
		"total_acquires": atomic.LoadInt64(&as.totalAcquires),
		"total_releases": atomic.LoadInt64(&as.totalReleases),
		"timeouts":       atomic.LoadInt64(&as.timeouts),
		"cancellations":  atomic.LoadInt64(&as.cancellations),
	}
}

// Available returns available resources
func (as *AdvancedSemaphore) Available() int {
	return cap(as.ch) - len(as.ch)
}

// Used returns used resources
func (as *AdvancedSemaphore) Used() int {
	return len(as.ch)
}

// Capacity returns total capacity
func (as *AdvancedSemaphore) Capacity() int {
	return as.capacity
}

// ResourceManager demonstrates semaphore usage for resource management
type ResourceManager struct {
	semaphore *AdvancedSemaphore
	resources []string
}

// NewResourceManager creates a new resource manager
func NewResourceManager(resources []string) *ResourceManager {
	return &ResourceManager{
		semaphore: NewAdvancedSemaphore(len(resources)),
		resources: resources,
	}
}

// UseResource uses a resource with a timeout
func (rm *ResourceManager) UseResource(ctx context.Context, userID string, timeout time.Duration) error {
	fmt.Printf("User %s: Requesting resource...\n", userID)

	// Try to acquire with timeout
	if err := rm.semaphore.AcquireWithTimeout(timeout); err != nil {
		fmt.Printf("User %s: Failed to acquire resource: %v\n", userID, err)
		return err
	}
	defer rm.semaphore.Release()

	// Note: deriving an index from Used() is only good enough for a demo;
	// under concurrency two holders can compute the same index. A real pool
	// tracks which resource is free (see the connection pool example below).
	resourceIndex := rm.semaphore.Used() - 1
	resourceName := rm.resources[resourceIndex]
	fmt.Printf("User %s: Using resource '%s'\n", userID, resourceName)

	// Simulate resource usage
	select {
	case <-time.After(time.Duration(1+len(userID)%3) * time.Second):
		fmt.Printf("User %s: Finished using resource '%s'\n", userID, resourceName)
		return nil
	case <-ctx.Done():
		fmt.Printf("User %s: Resource usage cancelled\n", userID)
		return ctx.Err()
	}
}

// GetStats returns resource manager statistics
func (rm *ResourceManager) GetStats() map[string]interface{} {
	return rm.semaphore.GetStats()
}

func main() {
	resources := []string{"Database-1", "Database-2", "API-Gateway"}
	manager := NewResourceManager(resources)

	fmt.Println("=== Advanced Semaphore Demo ===")
	fmt.Printf("Available resources: %v\n\n", resources)

	// Start monitoring
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		for range ticker.C {
			stats := manager.GetStats()
			fmt.Printf(" Stats: Used=%d/%d, Waiting=%d, Timeouts=%d\n",
				stats["used"], stats["capacity"], stats["waiting"], stats["timeouts"])
		}
	}()

	var wg sync.WaitGroup

	// Simulate users requesting resources
	users := []string{"Alice", "Bob", "Charlie", "Diana", "Eve", "Frank"}
	for i, user := range users {
		wg.Add(1)
		go func(userID string, delay time.Duration) {
			defer wg.Done()
			time.Sleep(delay) // Stagger requests

			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
			defer cancel()

			// Some users have shorter timeouts
			timeout := 3 * time.Second
			if len(userID)%2 == 0 {
				timeout = 1 * time.Second
			}

			err := manager.UseResource(ctx, userID, timeout)
			if err != nil {
				fmt.Printf(" User %s failed: %v\n", userID, err)
			}
		}(user, time.Duration(i*300)*time.Millisecond)
	}

	wg.Wait()

	// Final statistics
	fmt.Println("\n=== Final Statistics ===")
	stats := manager.GetStats()
	for key, value := range stats {
		fmt.Printf("%s: %v\n", key, value)
	}
}
```
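A small design note: `AcquireWithTimeout` duplicates what `AcquireWithContext` already offers; wrapping a context with a deadline gives the same behavior, so one code path can serve both. The only behavioral difference in the type above is which metric counter gets incremented. A sketch (the `acquireTimeout` helper is my addition):

```go
// Equivalent to AcquireWithTimeout, built on the context variant.
func acquireTimeout(as *AdvancedSemaphore, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return as.AcquireWithContext(ctx)
}
```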
## Weighted Semaphore Implementation

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// WeightedSemaphore allows acquiring resources with different weights
type WeightedSemaphore struct {
	mu       sync.Mutex
	capacity int64
	current  int64
	waiters  []waiter
}

// waiter represents a goroutine waiting for resources
type waiter struct {
	weight int64
	ready  chan struct{}
}

// NewWeightedSemaphore creates a new weighted semaphore
func NewWeightedSemaphore(capacity int64) *WeightedSemaphore {
	return &WeightedSemaphore{
		capacity: capacity,
		waiters:  make([]waiter, 0),
	}
}

// Acquire acquires resources with the given weight
func (ws *WeightedSemaphore) Acquire(weight int64) {
	ws.mu.Lock()
	if ws.current+weight <= ws.capacity && len(ws.waiters) == 0 {
		// Can acquire immediately
		ws.current += weight
		ws.mu.Unlock()
		return
	}

	// Need to wait
	ready := make(chan struct{})
	ws.waiters = append(ws.waiters, waiter{weight: weight, ready: ready})
	ws.mu.Unlock()

	<-ready
}

// TryAcquire tries to acquire resources without blocking
func (ws *WeightedSemaphore) TryAcquire(weight int64) bool {
	ws.mu.Lock()
	defer ws.mu.Unlock()

	if ws.current+weight <= ws.capacity && len(ws.waiters) == 0 {
		ws.current += weight
		return true
	}
	return false
}

// AcquireWithContext acquires resources with context cancellation
func (ws *WeightedSemaphore) AcquireWithContext(ctx context.Context, weight int64) error {
	ws.mu.Lock()
	if ws.current+weight <= ws.capacity && len(ws.waiters) == 0 {
		// Can acquire immediately
		ws.current += weight
		ws.mu.Unlock()
		return nil
	}

	// Need to wait
	ready := make(chan struct{})
	ws.waiters = append(ws.waiters, waiter{weight: weight, ready: ready})
	ws.mu.Unlock()

	select {
	case <-ready:
		return nil
	case <-ctx.Done():
		// Remove ourselves from the waiters list
		ws.mu.Lock()
		for i, w := range ws.waiters {
			if w.ready == ready {
				ws.waiters = append(ws.waiters[:i], ws.waiters[i+1:]...)
				break
			}
		}
		ws.mu.Unlock()
		return ctx.Err()
	}
}

// Release releases resources with the given weight
func (ws *WeightedSemaphore) Release(weight int64) {
	ws.mu.Lock()
	defer ws.mu.Unlock()

	ws.current -= weight
	ws.notifyWaiters()
}

// notifyWaiters notifies waiting goroutines that can now proceed
func (ws *WeightedSemaphore) notifyWaiters() {
	for i := 0; i < len(ws.waiters); {
		w := ws.waiters[i]
		if ws.current+w.weight <= ws.capacity {
			// This waiter can proceed
			ws.current += w.weight
			close(w.ready)
			// Remove from waiters
			ws.waiters = append(ws.waiters[:i], ws.waiters[i+1:]...)
		} else {
			i++
		}
	}
}

// GetStats returns current statistics
func (ws *WeightedSemaphore) GetStats() map[string]interface{} {
	ws.mu.Lock()
	defer ws.mu.Unlock()

	return map[string]interface{}{
		"capacity":  ws.capacity,
		"current":   ws.current,
		"available": ws.capacity - ws.current,
		"waiters":   len(ws.waiters),
	}
}

// Task represents a task with resource requirements
type Task struct {
	ID       string
	Weight   int64
	Duration time.Duration
}

// TaskProcessor processes tasks using a weighted semaphore
type TaskProcessor struct {
	semaphore *WeightedSemaphore
}

// NewTaskProcessor creates a new task processor
func NewTaskProcessor(capacity int64) *TaskProcessor {
	return &TaskProcessor{
		semaphore: NewWeightedSemaphore(capacity),
	}
}

// ProcessTask processes a task
func (tp *TaskProcessor) ProcessTask(ctx context.Context, task Task) error {
	fmt.Printf("Task %s: Requesting %d units of resource...\n", task.ID, task.Weight)

	if err := tp.semaphore.AcquireWithContext(ctx, task.Weight); err != nil {
		fmt.Printf("Task %s: Failed to acquire resources: %v\n", task.ID, err)
		return err
	}
	defer tp.semaphore.Release(task.Weight)

	stats := tp.semaphore.GetStats()
	fmt.Printf("Task %s: Acquired %d units (available: %d/%d)\n",
		task.ID, task.Weight, stats["available"], stats["capacity"])

	// Simulate task processing
	select {
	case <-time.After(task.Duration):
		fmt.Printf("Task %s: Completed\n", task.ID)
		return nil
	case <-ctx.Done():
		fmt.Printf("Task %s: Cancelled\n", task.ID)
		return ctx.Err()
	}
}

// GetStats returns processor statistics
func (tp *TaskProcessor) GetStats() map[string]interface{} {
	return tp.semaphore.GetStats()
}

func main() {
	// Create a weighted semaphore with capacity of 10 units
	processor := NewTaskProcessor(10)

	fmt.Println("=== Weighted Semaphore Demo ===")
	fmt.Println("Total capacity: 10 units")

	// Define tasks with different resource requirements
	tasks := []Task{
		{"Small-1", 2, 2 * time.Second},
		{"Medium-1", 4, 3 * time.Second},
		{"Large-1", 6, 4 * time.Second},
		{"Small-2", 1, 1 * time.Second},
		{"Small-3", 2, 2 * time.Second},
		{"Medium-2", 5, 3 * time.Second},
		{"Large-2", 8, 5 * time.Second},
	}

	// Start monitoring
	go func() {
		ticker := time.NewTicker(500 * time.Millisecond)
		defer ticker.Stop()
		for range ticker.C {
			stats := processor.GetStats()
			fmt.Printf(" Resources: %d/%d used, %d waiters\n",
				stats["current"], stats["capacity"], stats["waiters"])
		}
	}()

	var wg sync.WaitGroup

	// Process tasks concurrently
	for i, task := range tasks {
		wg.Add(1)
		go func(t Task, delay time.Duration) {
			defer wg.Done()
			time.Sleep(delay) // Stagger task starts

			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
			defer cancel()

			err := processor.ProcessTask(ctx, t)
			if err != nil {
				fmt.Printf(" Task %s failed: %v\n", t.ID, err)
			}
		}(task, time.Duration(i*200)*time.Millisecond)
	}

	wg.Wait()

	// Final statistics
	fmt.Println("\n=== Final Statistics ===")
	stats := processor.GetStats()
	for key, value := range stats {
		fmt.Printf("%s: %v\n", key, value)
	}
}
```
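One fairness caveat in `notifyWaiters` above: because the loop skips waiters that don't fit and keeps scanning, a stream of small requests can keep overtaking a large one and starve it (the starvation pitfall listed later). A FIFO variant stops at the first waiter that doesn't fit; this is a sketch of the alternative policy, not part of the original article:

```go
// FIFO variant: never let later waiters overtake the head of the queue.
// Trades some utilization for fairness to large requests.
func (ws *WeightedSemaphore) notifyWaitersFIFO() {
	for len(ws.waiters) > 0 {
		w := ws.waiters[0]
		if ws.current+w.weight > ws.capacity {
			break // the head doesn't fit yet; don't scan past it
		}
		ws.current += w.weight
		close(w.ready)
		ws.waiters = ws.waiters[1:]
	}
}
```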
## Semaphore-based Connection Pool

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Connection represents a database connection
type Connection struct {
	ID       int
	InUse    bool
	LastUsed time.Time
}

// ConnectionPool manages database connections using a semaphore.
// It reuses the AdvancedSemaphore type from the earlier example.
type ConnectionPool struct {
	connections []*Connection
	semaphore   *AdvancedSemaphore
	mu          sync.Mutex
}

// NewConnectionPool creates a new connection pool
func NewConnectionPool(size int) *ConnectionPool {
	connections := make([]*Connection, size)
	for i := 0; i < size; i++ {
		connections[i] = &Connection{
			ID:       i + 1,
			InUse:    false,
			LastUsed: time.Now(),
		}
	}

	return &ConnectionPool{
		connections: connections,
		semaphore:   NewAdvancedSemaphore(size),
	}
}

// GetConnection acquires a connection from the pool
func (cp *ConnectionPool) GetConnection(ctx context.Context) (*Connection, error) {
	if err := cp.semaphore.AcquireWithContext(ctx); err != nil {
		return nil, err
	}

	cp.mu.Lock()
	defer cp.mu.Unlock()

	// Find an available connection
	for _, conn := range cp.connections {
		if !conn.InUse {
			conn.InUse = true
			conn.LastUsed = time.Now()
			return conn, nil
		}
	}

	// This shouldn't happen if the semaphore is working correctly
	cp.semaphore.Release()
	return nil, fmt.Errorf("no available connections")
}

// ReturnConnection returns a connection to the pool
func (cp *ConnectionPool) ReturnConnection(conn *Connection) {
	cp.mu.Lock()
	conn.InUse = false
	conn.LastUsed = time.Now()
	cp.mu.Unlock()

	cp.semaphore.Release()
}

// GetStats returns pool statistics
func (cp *ConnectionPool) GetStats() map[string]interface{} {
	cp.mu.Lock()
	defer cp.mu.Unlock()

	inUse := 0
	for _, conn := range cp.connections {
		if conn.InUse {
			inUse++
		}
	}

	semStats := cp.semaphore.GetStats()
	return map[string]interface{}{
		"total_connections": len(cp.connections),
		"in_use":            inUse,
		"available":         len(cp.connections) - inUse,
		"semaphore_stats":   semStats,
	}
}

// DatabaseService simulates a service using the connection pool
type DatabaseService struct {
	pool *ConnectionPool
}

// NewDatabaseService creates a new database service
func NewDatabaseService(poolSize int) *DatabaseService {
	return &DatabaseService{
		pool: NewConnectionPool(poolSize),
	}
}

// ExecuteQuery simulates executing a database query
func (ds *DatabaseService) ExecuteQuery(ctx context.Context, userID string, query string) error {
	fmt.Printf("User %s: Requesting database connection for query: %s\n", userID, query)

	conn, err := ds.pool.GetConnection(ctx)
	if err != nil {
		fmt.Printf("User %s: Failed to get connection: %v\n", userID, err)
		return err
	}
	defer ds.pool.ReturnConnection(conn)

	fmt.Printf("User %s: Using connection %d\n", userID, conn.ID)

	// Simulate query execution
	queryDuration := time.Duration(500+len(query)*10) * time.Millisecond
	select {
	case <-time.After(queryDuration):
		fmt.Printf("User %s: Query completed on connection %d\n", userID, conn.ID)
		return nil
	case <-ctx.Done():
		fmt.Printf("User %s: Query cancelled on connection %d\n", userID, conn.ID)
		return ctx.Err()
	}
}

// GetStats returns service statistics
func (ds *DatabaseService) GetStats() map[string]interface{} {
	return ds.pool.GetStats()
}

func main() {
	// Create a database service with 3 connections
	service := NewDatabaseService(3)

	fmt.Println("=== Connection Pool Demo ===")
	fmt.Println("Pool size: 3 connections")

	// Start monitoring
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		for range ticker.C {
			stats := service.GetStats()
			fmt.Printf(" Pool: %d/%d in use, %d available\n",
				stats["in_use"], stats["total_connections"], stats["available"])
		}
	}()

	var wg sync.WaitGroup

	// Simulate multiple users making database queries
	users := []struct {
		id    string
		query string
	}{
		{"Alice", "SELECT * FROM users"},
		{"Bob", "SELECT * FROM orders WHERE user_id = 123"},
		{"Charlie", "UPDATE users SET last_login = NOW()"},
		{"Diana", "SELECT COUNT(*) FROM products"},
		{"Eve", "INSERT INTO logs (message) VALUES ('test')"},
		{"Frank", "SELECT * FROM analytics WHERE date > '2024-01-01'"},
	}

	for i, user := range users {
		wg.Add(1)
		go func(userID, query string, delay time.Duration) {
			defer wg.Done()
			time.Sleep(delay) // Stagger requests

			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()

			err := service.ExecuteQuery(ctx, userID, query)
			if err != nil {
				fmt.Printf(" User %s query failed: %v\n", userID, err)
			}
		}(user.id, user.query, time.Duration(i*300)*time.Millisecond)
	}

	wg.Wait()

	// Final statistics
	fmt.Println("\n=== Final Statistics ===")
	stats := service.GetStats()
	for key, value := range stats {
		if key == "semaphore_stats" {
			fmt.Printf("%s:\n", key)
			semStats := value.(map[string]interface{})
			for k, v := range semStats {
				fmt.Printf(" %s: %v\n", k, v)
			}
		} else {
			fmt.Printf("%s: %v\n", key, value)
		}
	}
}
```
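Worth noting before rolling your own: for real database connections, Go's standard `database/sql` package already maintains an internal connection pool, and `SetMaxOpenConns` plays the role of the semaphore capacity above. A brief sketch (`configurePool` is my helper name; the `db.Set*` methods are the real standard-library API):

```go
import (
	"database/sql"
	"time"
)

func configurePool(db *sql.DB) {
	db.SetMaxOpenConns(3)                   // like the semaphore capacity above
	db.SetMaxIdleConns(3)                   // keep idle connections around for reuse
	db.SetConnMaxLifetime(30 * time.Minute) // recycle stale connections
}
```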
## Best Practices

- **Choose the right capacity**: set semaphore capacity based on available resources
- **Always release**: use defer to ensure resources are released
- **Handle context**: support cancellation in long-running operations
- **Monitor usage**: track semaphore statistics and resource utilization
- **Avoid deadlocks**: don't acquire multiple semaphores in different orders
- **Use timeouts**: prevent indefinite blocking with timeouts
- **Consider weighted semaphores**: use them for resources with different costs

## Common Pitfalls

- **Resource leaks**: forgetting to release acquired resources
- **Deadlocks**: circular dependencies between semaphores
- **Starvation**: large requests starved indefinitely by a stream of smaller ones
- **Over-allocation**: setting capacity higher than actual resources
- **Under-utilization**: setting capacity too low for available resources

The Semaphore pattern is essential for managing limited resources in concurrent applications. It provides controlled access to resources, prevents overload, and ensures fair resource distribution among competing goroutines.

August 7, 2024 · 12 min · Rafiul Alam