Semaphore Pattern in Go

    Go Concurrency Patterns Series: ← Rate Limiter | Series Overview | Actor Model → What is the Semaphore Pattern? A semaphore is a synchronization primitive that maintains a count of available resources and controls access to them. It allows a specified number of goroutines to access a resource concurrently while blocking others until resources become available. Types: Binary Semaphore: Acts like a mutex (0 or 1) Counting Semaphore: Allows N concurrent accesses Weighted Semaphore: Resources have different weights/costs Real-World Use Cases Connection Pools: Limit database/HTTP connections Resource Management: Control access to limited resources Download Managers: Limit concurrent downloads API Rate Limiting: Control concurrent API calls Worker Pools: Limit concurrent workers Memory Management: Control memory-intensive operations Basic Semaphore Implementation package main import ( "context" "fmt" "sync" "time" ) // Semaphore implements a counting semaphore type Semaphore struct { ch chan struct{} } // NewSemaphore creates a new semaphore with given capacity func NewSemaphore(capacity int) *Semaphore { return &Semaphore{ ch: make(chan struct{}, capacity), } } // Acquire acquires a resource from the semaphore func (s *Semaphore) Acquire() { s.ch <- struct{}{} } // TryAcquire tries to acquire a resource without blocking func (s *Semaphore) TryAcquire() bool { select { case s.ch <- struct{}{}: return true default: return false } } // AcquireWithContext acquires a resource with context cancellation func (s *Semaphore) AcquireWithContext(ctx context.Context) error { select { case s.ch <- struct{}{}: return nil case <-ctx.Done(): return ctx.Err() } } // Release releases a resource back to the semaphore func (s *Semaphore) Release() { <-s.ch } // Available returns the number of available resources func (s *Semaphore) Available() int { return cap(s.ch) - len(s.ch) } // Used returns the number of used resources func (s *Semaphore) Used() int { return len(s.ch) } // Capacity returns the total capacity func (s *Semaphore) Capacity() int { return cap(s.ch) } // simulateWork simulates work that requires a resource func simulateWork(id int, duration time.Duration, sem *Semaphore) { fmt.Printf("Worker %d: Requesting resource...\n", id) sem.Acquire() fmt.Printf("Worker %d: Acquired resource (available: %d/%d)\n", id, sem.Available(), sem.Capacity()) // Simulate work time.Sleep(duration) sem.Release() fmt.Printf("Worker %d: Released resource (available: %d/%d)\n", id, sem.Available(), sem.Capacity()) } func main() { // Create semaphore with capacity of 3 sem := NewSemaphore(3) fmt.Println("=== Basic Semaphore Demo ===") fmt.Printf("Semaphore capacity: %d\n\n", sem.Capacity()) var wg sync.WaitGroup // Start 6 workers, but only 3 can work concurrently for i := 1; i <= 6; i++ { wg.Add(1) go func(id int) { defer wg.Done() simulateWork(id, time.Duration(1+id%3)*time.Second, sem) }(i) time.Sleep(200 * time.Millisecond) // Stagger starts } wg.Wait() fmt.Printf("\nFinal state - Available: %d/%d\n", sem.Available(), sem.Capacity()) } Advanced Semaphore with Timeout and Context package main import ( "context" "fmt" "sync" "sync/atomic" "time" ) // AdvancedSemaphore provides additional features like metrics and timeouts type AdvancedSemaphore struct { ch chan struct{} capacity int // Metrics totalAcquires int64 totalReleases int64 timeouts int64 cancellations int64 // Monitoring mu sync.RWMutex waitingGoroutines int } // NewAdvancedSemaphore creates a new advanced semaphore func NewAdvancedSemaphore(capacity int) *AdvancedSemaphore { return &AdvancedSemaphore{ ch: make(chan struct{}, capacity), capacity: capacity, } } // Acquire acquires a resource (blocking) func (as *AdvancedSemaphore) Acquire() { as.incrementWaiting() defer as.decrementWaiting() as.ch <- struct{}{} atomic.AddInt64(&as.totalAcquires, 1) } // TryAcquire tries to acquire without blocking func (as *AdvancedSemaphore) TryAcquire() bool { select { case as.ch <- struct{}{}: atomic.AddInt64(&as.totalAcquires, 1) return true default: return false } } // AcquireWithTimeout acquires with a timeout func (as *AdvancedSemaphore) AcquireWithTimeout(timeout time.Duration) error { as.incrementWaiting() defer as.decrementWaiting() select { case as.ch <- struct{}{}: atomic.AddInt64(&as.totalAcquires, 1) return nil case <-time.After(timeout): atomic.AddInt64(&as.timeouts, 1) return fmt.Errorf("timeout after %v", timeout) } } // AcquireWithContext acquires with context cancellation func (as *AdvancedSemaphore) AcquireWithContext(ctx context.Context) error { as.incrementWaiting() defer as.decrementWaiting() select { case as.ch <- struct{}{}: atomic.AddInt64(&as.totalAcquires, 1) return nil case <-ctx.Done(): atomic.AddInt64(&as.cancellations, 1) return ctx.Err() } } // Release releases a resource func (as *AdvancedSemaphore) Release() { <-as.ch atomic.AddInt64(&as.totalReleases, 1) } // incrementWaiting increments waiting goroutines counter func (as *AdvancedSemaphore) incrementWaiting() { as.mu.Lock() as.waitingGoroutines++ as.mu.Unlock() } // decrementWaiting decrements waiting goroutines counter func (as *AdvancedSemaphore) decrementWaiting() { as.mu.Lock() as.waitingGoroutines-- as.mu.Unlock() } // GetStats returns semaphore statistics func (as *AdvancedSemaphore) GetStats() map[string]interface{} { as.mu.RLock() waiting := as.waitingGoroutines as.mu.RUnlock() return map[string]interface{}{ "capacity": as.capacity, "available": as.Available(), "used": as.Used(), "waiting": waiting, "total_acquires": atomic.LoadInt64(&as.totalAcquires), "total_releases": atomic.LoadInt64(&as.totalReleases), "timeouts": atomic.LoadInt64(&as.timeouts), "cancellations": atomic.LoadInt64(&as.cancellations), } } // Available returns available resources func (as *AdvancedSemaphore) Available() int { return cap(as.ch) - len(as.ch) } // Used returns used resources func (as *AdvancedSemaphore) Used() int { return len(as.ch) } // Capacity returns total capacity func (as *AdvancedSemaphore) Capacity() int { return as.capacity } // ResourceManager demonstrates semaphore usage for resource management type ResourceManager struct { semaphore *AdvancedSemaphore resources []string } // NewResourceManager creates a new resource manager func NewResourceManager(resources []string) *ResourceManager { return &ResourceManager{ semaphore: NewAdvancedSemaphore(len(resources)), resources: resources, } } // UseResource uses a resource with timeout func (rm *ResourceManager) UseResource(ctx context.Context, userID string, timeout time.Duration) error { fmt.Printf("User %s: Requesting resource...\n", userID) // Try to acquire with timeout if err := rm.semaphore.AcquireWithTimeout(timeout); err != nil { fmt.Printf("User %s: Failed to acquire resource: %v\n", userID, err) return err } defer rm.semaphore.Release() resourceIndex := rm.semaphore.Used() - 1 resourceName := rm.resources[resourceIndex] fmt.Printf("User %s: Using resource '%s'\n", userID, resourceName) // Simulate resource usage select { case <-time.After(time.Duration(1+len(userID)%3) * time.Second): fmt.Printf("User %s: Finished using resource '%s'\n", userID, resourceName) return nil case <-ctx.Done(): fmt.Printf("User %s: Resource usage cancelled\n", userID) return ctx.Err() } } // GetStats returns resource manager statistics func (rm *ResourceManager) GetStats() map[string]interface{} { return rm.semaphore.GetStats() } func main() { resources := []string{"Database-1", "Database-2", "API-Gateway"} manager := NewResourceManager(resources) fmt.Println("=== Advanced Semaphore Demo ===") fmt.Printf("Available resources: %v\n\n", resources) // Start monitoring go func() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for range ticker.C { stats := manager.GetStats() fmt.Printf(" Stats: Used=%d/%d, Waiting=%d, Timeouts=%d\n", stats["used"], stats["capacity"], stats["waiting"], stats["timeouts"]) } }() var wg sync.WaitGroup // Simulate users requesting resources users := []string{"Alice", "Bob", "Charlie", "Diana", "Eve", "Frank"} for i, user := range users { wg.Add(1) go func(userID string, delay time.Duration) { defer wg.Done() time.Sleep(delay) // Stagger requests ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() // Some users have shorter timeouts timeout := 3 * time.Second if len(userID)%2 == 0 { timeout = 1 * time.Second } err := manager.UseResource(ctx, userID, timeout) if err != nil { fmt.Printf(" User %s failed: %v\n", userID, err) } }(user, time.Duration(i*300)*time.Millisecond) } wg.Wait() // Final statistics fmt.Println("\n=== Final Statistics ===") stats := manager.GetStats() for key, value := range stats { fmt.Printf("%s: %v\n", key, value) } } Weighted Semaphore Implementation package main import ( "context" "fmt" "sync" "time" ) // WeightedSemaphore allows acquiring resources with different weights type WeightedSemaphore struct { mu sync.Mutex capacity int64 current int64 waiters []waiter } // waiter represents a goroutine waiting for resources type waiter struct { weight int64 ready chan struct{} } // NewWeightedSemaphore creates a new weighted semaphore func NewWeightedSemaphore(capacity int64) *WeightedSemaphore { return &WeightedSemaphore{ capacity: capacity, waiters: make([]waiter, 0), } } // Acquire acquires resources with given weight func (ws *WeightedSemaphore) Acquire(weight int64) { ws.mu.Lock() if ws.current+weight <= ws.capacity && len(ws.waiters) == 0 { // Can acquire immediately ws.current += weight ws.mu.Unlock() return } // Need to wait ready := make(chan struct{}) ws.waiters = append(ws.waiters, waiter{weight: weight, ready: ready}) ws.mu.Unlock() <-ready } // TryAcquire tries to acquire resources without blocking func (ws *WeightedSemaphore) TryAcquire(weight int64) bool { ws.mu.Lock() defer ws.mu.Unlock() if ws.current+weight <= ws.capacity && len(ws.waiters) == 0 { ws.current += weight return true } return false } // AcquireWithContext acquires resources with context cancellation func (ws *WeightedSemaphore) AcquireWithContext(ctx context.Context, weight int64) error { ws.mu.Lock() if ws.current+weight <= ws.capacity && len(ws.waiters) == 0 { // Can acquire immediately ws.current += weight ws.mu.Unlock() return nil } // Need to wait ready := make(chan struct{}) ws.waiters = append(ws.waiters, waiter{weight: weight, ready: ready}) ws.mu.Unlock() select { case <-ready: return nil case <-ctx.Done(): // Remove from waiters list ws.mu.Lock() for i, w := range ws.waiters { if w.ready == ready { ws.waiters = append(ws.waiters[:i], ws.waiters[i+1:]...) break } } ws.mu.Unlock() return ctx.Err() } } // Release releases resources with given weight func (ws *WeightedSemaphore) Release(weight int64) { ws.mu.Lock() defer ws.mu.Unlock() ws.current -= weight ws.notifyWaiters() } // notifyWaiters notifies waiting goroutines that can now proceed func (ws *WeightedSemaphore) notifyWaiters() { for i := 0; i < len(ws.waiters); { w := ws.waiters[i] if ws.current+w.weight <= ws.capacity { // This waiter can proceed ws.current += w.weight close(w.ready) // Remove from waiters ws.waiters = append(ws.waiters[:i], ws.waiters[i+1:]...) } else { i++ } } } // GetStats returns current statistics func (ws *WeightedSemaphore) GetStats() map[string]interface{} { ws.mu.Lock() defer ws.mu.Unlock() return map[string]interface{}{ "capacity": ws.capacity, "current": ws.current, "available": ws.capacity - ws.current, "waiters": len(ws.waiters), } } // Task represents a task with resource requirements type Task struct { ID string Weight int64 Duration time.Duration } // TaskProcessor processes tasks using weighted semaphore type TaskProcessor struct { semaphore *WeightedSemaphore } // NewTaskProcessor creates a new task processor func NewTaskProcessor(capacity int64) *TaskProcessor { return &TaskProcessor{ semaphore: NewWeightedSemaphore(capacity), } } // ProcessTask processes a task func (tp *TaskProcessor) ProcessTask(ctx context.Context, task Task) error { fmt.Printf("Task %s: Requesting %d units of resource...\n", task.ID, task.Weight) if err := tp.semaphore.AcquireWithContext(ctx, task.Weight); err != nil { fmt.Printf("Task %s: Failed to acquire resources: %v\n", task.ID, err) return err } defer tp.semaphore.Release(task.Weight) stats := tp.semaphore.GetStats() fmt.Printf("Task %s: Acquired %d units (available: %d/%d)\n", task.ID, task.Weight, stats["available"], stats["capacity"]) // Simulate task processing select { case <-time.After(task.Duration): fmt.Printf("Task %s: Completed\n", task.ID) return nil case <-ctx.Done(): fmt.Printf("Task %s: Cancelled\n", task.ID) return ctx.Err() } } // GetStats returns processor statistics func (tp *TaskProcessor) GetStats() map[string]interface{} { return tp.semaphore.GetStats() } func main() { // Create weighted semaphore with capacity of 10 units processor := NewTaskProcessor(10) fmt.Println("=== Weighted Semaphore Demo ===") fmt.Println("Total capacity: 10 units") // Define tasks with different resource requirements tasks := []Task{ {"Small-1", 2, 2 * time.Second}, {"Medium-1", 4, 3 * time.Second}, {"Large-1", 6, 4 * time.Second}, {"Small-2", 1, 1 * time.Second}, {"Small-3", 2, 2 * time.Second}, {"Medium-2", 5, 3 * time.Second}, {"Large-2", 8, 5 * time.Second}, } // Start monitoring go func() { ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() for range ticker.C { stats := processor.GetStats() fmt.Printf(" Resources: %d/%d used, %d waiters\n", stats["current"], stats["capacity"], stats["waiters"]) } }() var wg sync.WaitGroup // Process tasks concurrently for i, task := range tasks { wg.Add(1) go func(t Task, delay time.Duration) { defer wg.Done() time.Sleep(delay) // Stagger task starts ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() err := processor.ProcessTask(ctx, t) if err != nil { fmt.Printf(" Task %s failed: %v\n", t.ID, err) } }(task, time.Duration(i*200)*time.Millisecond) } wg.Wait() // Final statistics fmt.Println("\n=== Final Statistics ===") stats := processor.GetStats() for key, value := range stats { fmt.Printf("%s: %v\n", key, value) } } Semaphore-based Connection Pool package main import ( "context" "fmt" "sync" "time" ) // Connection represents a database connection type Connection struct { ID int InUse bool LastUsed time.Time } // ConnectionPool manages database connections using semaphore type ConnectionPool struct { connections []*Connection semaphore *AdvancedSemaphore mu sync.Mutex } // NewConnectionPool creates a new connection pool func NewConnectionPool(size int) *ConnectionPool { connections := make([]*Connection, size) for i := 0; i < size; i++ { connections[i] = &Connection{ ID: i + 1, InUse: false, LastUsed: time.Now(), } } return &ConnectionPool{ connections: connections, semaphore: NewAdvancedSemaphore(size), } } // GetConnection acquires a connection from the pool func (cp *ConnectionPool) GetConnection(ctx context.Context) (*Connection, error) { if err := cp.semaphore.AcquireWithContext(ctx); err != nil { return nil, err } cp.mu.Lock() defer cp.mu.Unlock() // Find an available connection for _, conn := range cp.connections { if !conn.InUse { conn.InUse = true conn.LastUsed = time.Now() return conn, nil } } // This shouldn't happen if semaphore is working correctly cp.semaphore.Release() return nil, fmt.Errorf("no available connections") } // ReturnConnection returns a connection to the pool func (cp *ConnectionPool) ReturnConnection(conn *Connection) { cp.mu.Lock() conn.InUse = false conn.LastUsed = time.Now() cp.mu.Unlock() cp.semaphore.Release() } // GetStats returns pool statistics func (cp *ConnectionPool) GetStats() map[string]interface{} { cp.mu.Lock() defer cp.mu.Unlock() inUse := 0 for _, conn := range cp.connections { if conn.InUse { inUse++ } } semStats := cp.semaphore.GetStats() return map[string]interface{}{ "total_connections": len(cp.connections), "in_use": inUse, "available": len(cp.connections) - inUse, "semaphore_stats": semStats, } } // DatabaseService simulates a service using the connection pool type DatabaseService struct { pool *ConnectionPool } // NewDatabaseService creates a new database service func NewDatabaseService(poolSize int) *DatabaseService { return &DatabaseService{ pool: NewConnectionPool(poolSize), } } // ExecuteQuery simulates executing a database query func (ds *DatabaseService) ExecuteQuery(ctx context.Context, userID string, query string) error { fmt.Printf("User %s: Requesting database connection for query: %s\n", userID, query) conn, err := ds.pool.GetConnection(ctx) if err != nil { fmt.Printf("User %s: Failed to get connection: %v\n", userID, err) return err } defer ds.pool.ReturnConnection(conn) fmt.Printf("User %s: Using connection %d\n", userID, conn.ID) // Simulate query execution queryDuration := time.Duration(500+len(query)*10) * time.Millisecond select { case <-time.After(queryDuration): fmt.Printf("User %s: Query completed on connection %d\n", userID, conn.ID) return nil case <-ctx.Done(): fmt.Printf("User %s: Query cancelled on connection %d\n", userID, conn.ID) return ctx.Err() } } // GetStats returns service statistics func (ds *DatabaseService) GetStats() map[string]interface{} { return ds.pool.GetStats() } func main() { // Create database service with 3 connections service := NewDatabaseService(3) fmt.Println("=== Connection Pool Demo ===") fmt.Println("Pool size: 3 connections") // Start monitoring go func() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for range ticker.C { stats := service.GetStats() fmt.Printf(" Pool: %d/%d in use, %d available\n", stats["in_use"], stats["total_connections"], stats["available"]) } }() var wg sync.WaitGroup // Simulate multiple users making database queries users := []struct { id string query string }{ {"Alice", "SELECT * FROM users"}, {"Bob", "SELECT * FROM orders WHERE user_id = 123"}, {"Charlie", "UPDATE users SET last_login = NOW()"}, {"Diana", "SELECT COUNT(*) FROM products"}, {"Eve", "INSERT INTO logs (message) VALUES ('test')"}, {"Frank", "SELECT * FROM analytics WHERE date > '2024-01-01'"}, } for i, user := range users { wg.Add(1) go func(userID, query string, delay time.Duration) { defer wg.Done() time.Sleep(delay) // Stagger requests ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() err := service.ExecuteQuery(ctx, userID, query) if err != nil { fmt.Printf(" User %s query failed: %v\n", userID, err) } }(user.id, user.query, time.Duration(i*300)*time.Millisecond) } wg.Wait() // Final statistics fmt.Println("\n=== Final Statistics ===") stats := service.GetStats() for key, value := range stats { if key == "semaphore_stats" { fmt.Printf("%s:\n", key) semStats := value.(map[string]interface{}) for k, v := range semStats { fmt.Printf(" %s: %v\n", k, v) } } else { fmt.Printf("%s: %v\n", key, value) } } } Best Practices Choose Right Capacity: Set semaphore capacity based on available resources Always Release: Use defer to ensure resources are released Handle Context: Support cancellation in long-running operations Monitor Usage: Track semaphore statistics and resource utilization Avoid Deadlocks: Don’t acquire multiple semaphores in different orders Use Timeouts: Prevent indefinite blocking with timeouts Consider Weighted: Use weighted semaphores for resources with different costs Common Pitfalls Resource Leaks: Forgetting to release acquired resources Deadlocks: Circular dependencies between semaphores Starvation: Large requests blocking smaller ones indefinitely Over-allocation: Setting capacity higher than actual resources Under-utilization: Setting capacity too low for available resources The Semaphore pattern is essential for managing limited resources in concurrent applications. It provides controlled access to resources, prevents overload, and ensures fair resource distribution among competing goroutines. ...

    August 7, 2024 · 12 min · Rafiul Alam