Go Concurrency Patterns Series: ← Rate Limiter | Series Overview | Actor Model β†’


What is the Semaphore Pattern?

A semaphore is a synchronization primitive that maintains a count of available resources and controls access to them. It allows a specified number of goroutines to access a resource concurrently while blocking others until resources become available.

Types:

  • Binary Semaphore: Acts like a mutex (0 or 1)
  • Counting Semaphore: Allows N concurrent accesses
  • Weighted Semaphore: Resources have different weights/costs

Real-World Use Cases

  • Connection Pools: Limit database/HTTP connections
  • Resource Management: Control access to limited resources
  • Download Managers: Limit concurrent downloads
  • API Rate Limiting: Control concurrent API calls
  • Worker Pools: Limit concurrent workers
  • Memory Management: Control memory-intensive operations

Basic Semaphore Implementation

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Semaphore implements a counting semaphore
type Semaphore struct {
    ch chan struct{}
}

// NewSemaphore creates a new semaphore with given capacity
func NewSemaphore(capacity int) *Semaphore {
    return &Semaphore{
        ch: make(chan struct{}, capacity),
    }
}

// Acquire acquires a resource from the semaphore
func (s *Semaphore) Acquire() {
    s.ch <- struct{}{}
}

// TryAcquire tries to acquire a resource without blocking
func (s *Semaphore) TryAcquire() bool {
    select {
    case s.ch <- struct{}{}:
        return true
    default:
        return false
    }
}

// AcquireWithContext acquires a resource with context cancellation
func (s *Semaphore) AcquireWithContext(ctx context.Context) error {
    select {
    case s.ch <- struct{}{}:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

// Release releases a resource back to the semaphore
func (s *Semaphore) Release() {
    <-s.ch
}

// Available returns the number of available resources
func (s *Semaphore) Available() int {
    return cap(s.ch) - len(s.ch)
}

// Used returns the number of used resources
func (s *Semaphore) Used() int {
    return len(s.ch)
}

// Capacity returns the total capacity
func (s *Semaphore) Capacity() int {
    return cap(s.ch)
}

// simulateWork simulates work that requires a resource
func simulateWork(id int, duration time.Duration, sem *Semaphore) {
    fmt.Printf("Worker %d: Requesting resource...\n", id)
    
    sem.Acquire()
    fmt.Printf("Worker %d: Acquired resource (available: %d/%d)\n", 
        id, sem.Available(), sem.Capacity())
    
    // Simulate work
    time.Sleep(duration)
    
    sem.Release()
    fmt.Printf("Worker %d: Released resource (available: %d/%d)\n", 
        id, sem.Available(), sem.Capacity())
}

func main() {
    // Create semaphore with capacity of 3
    sem := NewSemaphore(3)
    
    fmt.Println("=== Basic Semaphore Demo ===")
    fmt.Printf("Semaphore capacity: %d\n\n", sem.Capacity())
    
    var wg sync.WaitGroup
    
    // Start 6 workers, but only 3 can work concurrently
    for i := 1; i <= 6; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            simulateWork(id, time.Duration(1+id%3)*time.Second, sem)
        }(i)
        
        time.Sleep(200 * time.Millisecond) // Stagger starts
    }
    
    wg.Wait()
    fmt.Printf("\nFinal state - Available: %d/%d\n", sem.Available(), sem.Capacity())
}

Advanced Semaphore with Timeout and Context

package main

import (
    "context"
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// AdvancedSemaphore provides additional features like metrics and timeouts
type AdvancedSemaphore struct {
    ch          chan struct{}
    capacity    int
    
    // Metrics
    totalAcquires   int64
    totalReleases   int64
    timeouts        int64
    cancellations   int64
    
    // Monitoring
    mu              sync.RWMutex
    waitingGoroutines int
}

// NewAdvancedSemaphore creates a new advanced semaphore
func NewAdvancedSemaphore(capacity int) *AdvancedSemaphore {
    return &AdvancedSemaphore{
        ch:       make(chan struct{}, capacity),
        capacity: capacity,
    }
}

// Acquire acquires a resource (blocking)
func (as *AdvancedSemaphore) Acquire() {
    as.incrementWaiting()
    defer as.decrementWaiting()
    
    as.ch <- struct{}{}
    atomic.AddInt64(&as.totalAcquires, 1)
}

// TryAcquire tries to acquire without blocking
func (as *AdvancedSemaphore) TryAcquire() bool {
    select {
    case as.ch <- struct{}{}:
        atomic.AddInt64(&as.totalAcquires, 1)
        return true
    default:
        return false
    }
}

// AcquireWithTimeout acquires with a timeout
func (as *AdvancedSemaphore) AcquireWithTimeout(timeout time.Duration) error {
    as.incrementWaiting()
    defer as.decrementWaiting()
    
    select {
    case as.ch <- struct{}{}:
        atomic.AddInt64(&as.totalAcquires, 1)
        return nil
    case <-time.After(timeout):
        atomic.AddInt64(&as.timeouts, 1)
        return fmt.Errorf("timeout after %v", timeout)
    }
}

// AcquireWithContext acquires with context cancellation
func (as *AdvancedSemaphore) AcquireWithContext(ctx context.Context) error {
    as.incrementWaiting()
    defer as.decrementWaiting()
    
    select {
    case as.ch <- struct{}{}:
        atomic.AddInt64(&as.totalAcquires, 1)
        return nil
    case <-ctx.Done():
        atomic.AddInt64(&as.cancellations, 1)
        return ctx.Err()
    }
}

// Release releases a resource
func (as *AdvancedSemaphore) Release() {
    <-as.ch
    atomic.AddInt64(&as.totalReleases, 1)
}

// incrementWaiting increments waiting goroutines counter
func (as *AdvancedSemaphore) incrementWaiting() {
    as.mu.Lock()
    as.waitingGoroutines++
    as.mu.Unlock()
}

// decrementWaiting decrements waiting goroutines counter
func (as *AdvancedSemaphore) decrementWaiting() {
    as.mu.Lock()
    as.waitingGoroutines--
    as.mu.Unlock()
}

// GetStats returns semaphore statistics
func (as *AdvancedSemaphore) GetStats() map[string]interface{} {
    as.mu.RLock()
    waiting := as.waitingGoroutines
    as.mu.RUnlock()
    
    return map[string]interface{}{
        "capacity":           as.capacity,
        "available":          as.Available(),
        "used":              as.Used(),
        "waiting":           waiting,
        "total_acquires":    atomic.LoadInt64(&as.totalAcquires),
        "total_releases":    atomic.LoadInt64(&as.totalReleases),
        "timeouts":          atomic.LoadInt64(&as.timeouts),
        "cancellations":     atomic.LoadInt64(&as.cancellations),
    }
}

// Available returns available resources
func (as *AdvancedSemaphore) Available() int {
    return cap(as.ch) - len(as.ch)
}

// Used returns used resources
func (as *AdvancedSemaphore) Used() int {
    return len(as.ch)
}

// Capacity returns total capacity
func (as *AdvancedSemaphore) Capacity() int {
    return as.capacity
}

// ResourceManager demonstrates semaphore usage for resource management
type ResourceManager struct {
    semaphore *AdvancedSemaphore
    resources []string
}

// NewResourceManager creates a new resource manager
func NewResourceManager(resources []string) *ResourceManager {
    return &ResourceManager{
        semaphore: NewAdvancedSemaphore(len(resources)),
        resources: resources,
    }
}

// UseResource uses a resource with timeout
func (rm *ResourceManager) UseResource(ctx context.Context, userID string, timeout time.Duration) error {
    fmt.Printf("User %s: Requesting resource...\n", userID)
    
    // Try to acquire with timeout
    if err := rm.semaphore.AcquireWithTimeout(timeout); err != nil {
        fmt.Printf("User %s: Failed to acquire resource: %v\n", userID, err)
        return err
    }
    
    defer rm.semaphore.Release()
    
    resourceIndex := rm.semaphore.Used() - 1
    resourceName := rm.resources[resourceIndex]
    
    fmt.Printf("User %s: Using resource '%s'\n", userID, resourceName)
    
    // Simulate resource usage
    select {
    case <-time.After(time.Duration(1+len(userID)%3) * time.Second):
        fmt.Printf("User %s: Finished using resource '%s'\n", userID, resourceName)
        return nil
    case <-ctx.Done():
        fmt.Printf("User %s: Resource usage cancelled\n", userID)
        return ctx.Err()
    }
}

// GetStats returns resource manager statistics
func (rm *ResourceManager) GetStats() map[string]interface{} {
    return rm.semaphore.GetStats()
}

func main() {
    resources := []string{"Database-1", "Database-2", "API-Gateway"}
    manager := NewResourceManager(resources)
    
    fmt.Println("=== Advanced Semaphore Demo ===")
    fmt.Printf("Available resources: %v\n\n", resources)
    
    // Start monitoring
    go func() {
        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()
        
        for range ticker.C {
            stats := manager.GetStats()
            fmt.Printf("πŸ“Š Stats: Used=%d/%d, Waiting=%d, Timeouts=%d\n",
                stats["used"], stats["capacity"], stats["waiting"], stats["timeouts"])
        }
    }()
    
    var wg sync.WaitGroup
    
    // Simulate users requesting resources
    users := []string{"Alice", "Bob", "Charlie", "Diana", "Eve", "Frank"}
    
    for i, user := range users {
        wg.Add(1)
        go func(userID string, delay time.Duration) {
            defer wg.Done()
            
            time.Sleep(delay) // Stagger requests
            
            ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
            defer cancel()
            
            // Some users have shorter timeouts
            timeout := 3 * time.Second
            if len(userID)%2 == 0 {
                timeout = 1 * time.Second
            }
            
            err := manager.UseResource(ctx, userID, timeout)
            if err != nil {
                fmt.Printf("❌ User %s failed: %v\n", userID, err)
            }
        }(user, time.Duration(i*300)*time.Millisecond)
    }
    
    wg.Wait()
    
    // Final statistics
    fmt.Println("\n=== Final Statistics ===")
    stats := manager.GetStats()
    for key, value := range stats {
        fmt.Printf("%s: %v\n", key, value)
    }
}

Weighted Semaphore Implementation

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// WeightedSemaphore allows acquiring resources with different weights
type WeightedSemaphore struct {
    mu       sync.Mutex
    capacity int64
    current  int64
    waiters  []waiter
}

// waiter represents a goroutine waiting for resources
type waiter struct {
    weight int64
    ready  chan struct{}
}

// NewWeightedSemaphore creates a new weighted semaphore
func NewWeightedSemaphore(capacity int64) *WeightedSemaphore {
    return &WeightedSemaphore{
        capacity: capacity,
        waiters:  make([]waiter, 0),
    }
}

// Acquire acquires resources with given weight
func (ws *WeightedSemaphore) Acquire(weight int64) {
    ws.mu.Lock()
    
    if ws.current+weight <= ws.capacity && len(ws.waiters) == 0 {
        // Can acquire immediately
        ws.current += weight
        ws.mu.Unlock()
        return
    }
    
    // Need to wait
    ready := make(chan struct{})
    ws.waiters = append(ws.waiters, waiter{weight: weight, ready: ready})
    ws.mu.Unlock()
    
    <-ready
}

// TryAcquire tries to acquire resources without blocking
func (ws *WeightedSemaphore) TryAcquire(weight int64) bool {
    ws.mu.Lock()
    defer ws.mu.Unlock()
    
    if ws.current+weight <= ws.capacity && len(ws.waiters) == 0 {
        ws.current += weight
        return true
    }
    
    return false
}

// AcquireWithContext acquires resources with context cancellation
func (ws *WeightedSemaphore) AcquireWithContext(ctx context.Context, weight int64) error {
    ws.mu.Lock()
    
    if ws.current+weight <= ws.capacity && len(ws.waiters) == 0 {
        // Can acquire immediately
        ws.current += weight
        ws.mu.Unlock()
        return nil
    }
    
    // Need to wait
    ready := make(chan struct{})
    ws.waiters = append(ws.waiters, waiter{weight: weight, ready: ready})
    ws.mu.Unlock()
    
    select {
    case <-ready:
        return nil
    case <-ctx.Done():
        // Remove from waiters list
        ws.mu.Lock()
        for i, w := range ws.waiters {
            if w.ready == ready {
                ws.waiters = append(ws.waiters[:i], ws.waiters[i+1:]...)
                break
            }
        }
        ws.mu.Unlock()
        return ctx.Err()
    }
}

// Release releases resources with given weight
func (ws *WeightedSemaphore) Release(weight int64) {
    ws.mu.Lock()
    defer ws.mu.Unlock()
    
    ws.current -= weight
    ws.notifyWaiters()
}

// notifyWaiters notifies waiting goroutines that can now proceed
func (ws *WeightedSemaphore) notifyWaiters() {
    for i := 0; i < len(ws.waiters); {
        w := ws.waiters[i]
        
        if ws.current+w.weight <= ws.capacity {
            // This waiter can proceed
            ws.current += w.weight
            close(w.ready)
            
            // Remove from waiters
            ws.waiters = append(ws.waiters[:i], ws.waiters[i+1:]...)
        } else {
            i++
        }
    }
}

// GetStats returns current statistics
func (ws *WeightedSemaphore) GetStats() map[string]interface{} {
    ws.mu.Lock()
    defer ws.mu.Unlock()
    
    return map[string]interface{}{
        "capacity": ws.capacity,
        "current":  ws.current,
        "available": ws.capacity - ws.current,
        "waiters":  len(ws.waiters),
    }
}

// Task represents a task with resource requirements
type Task struct {
    ID     string
    Weight int64
    Duration time.Duration
}

// TaskProcessor processes tasks using weighted semaphore
type TaskProcessor struct {
    semaphore *WeightedSemaphore
}

// NewTaskProcessor creates a new task processor
func NewTaskProcessor(capacity int64) *TaskProcessor {
    return &TaskProcessor{
        semaphore: NewWeightedSemaphore(capacity),
    }
}

// ProcessTask processes a task
func (tp *TaskProcessor) ProcessTask(ctx context.Context, task Task) error {
    fmt.Printf("Task %s: Requesting %d units of resource...\n", task.ID, task.Weight)
    
    if err := tp.semaphore.AcquireWithContext(ctx, task.Weight); err != nil {
        fmt.Printf("Task %s: Failed to acquire resources: %v\n", task.ID, err)
        return err
    }
    
    defer tp.semaphore.Release(task.Weight)
    
    stats := tp.semaphore.GetStats()
    fmt.Printf("Task %s: Acquired %d units (available: %d/%d)\n", 
        task.ID, task.Weight, stats["available"], stats["capacity"])
    
    // Simulate task processing
    select {
    case <-time.After(task.Duration):
        fmt.Printf("Task %s: Completed\n", task.ID)
        return nil
    case <-ctx.Done():
        fmt.Printf("Task %s: Cancelled\n", task.ID)
        return ctx.Err()
    }
}

// GetStats returns processor statistics
func (tp *TaskProcessor) GetStats() map[string]interface{} {
    return tp.semaphore.GetStats()
}

func main() {
    // Create weighted semaphore with capacity of 10 units
    processor := NewTaskProcessor(10)
    
    fmt.Println("=== Weighted Semaphore Demo ===")
    fmt.Println("Total capacity: 10 units")
    
    // Define tasks with different resource requirements
    tasks := []Task{
        {"Small-1", 2, 2 * time.Second},
        {"Medium-1", 4, 3 * time.Second},
        {"Large-1", 6, 4 * time.Second},
        {"Small-2", 1, 1 * time.Second},
        {"Small-3", 2, 2 * time.Second},
        {"Medium-2", 5, 3 * time.Second},
        {"Large-2", 8, 5 * time.Second},
    }
    
    // Start monitoring
    go func() {
        ticker := time.NewTicker(500 * time.Millisecond)
        defer ticker.Stop()
        
        for range ticker.C {
            stats := processor.GetStats()
            fmt.Printf("πŸ“Š Resources: %d/%d used, %d waiters\n",
                stats["current"], stats["capacity"], stats["waiters"])
        }
    }()
    
    var wg sync.WaitGroup
    
    // Process tasks concurrently
    for i, task := range tasks {
        wg.Add(1)
        go func(t Task, delay time.Duration) {
            defer wg.Done()
            
            time.Sleep(delay) // Stagger task starts
            
            ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
            defer cancel()
            
            err := processor.ProcessTask(ctx, t)
            if err != nil {
                fmt.Printf("❌ Task %s failed: %v\n", t.ID, err)
            }
        }(task, time.Duration(i*200)*time.Millisecond)
    }
    
    wg.Wait()
    
    // Final statistics
    fmt.Println("\n=== Final Statistics ===")
    stats := processor.GetStats()
    for key, value := range stats {
        fmt.Printf("%s: %v\n", key, value)
    }
}

Semaphore-based Connection Pool

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Connection represents a database connection
type Connection struct {
    ID       int
    InUse    bool
    LastUsed time.Time
}

// ConnectionPool manages database connections using semaphore
type ConnectionPool struct {
    connections []*Connection
    semaphore   *AdvancedSemaphore
    mu          sync.Mutex
}

// NewConnectionPool creates a new connection pool
func NewConnectionPool(size int) *ConnectionPool {
    connections := make([]*Connection, size)
    for i := 0; i < size; i++ {
        connections[i] = &Connection{
            ID:       i + 1,
            InUse:    false,
            LastUsed: time.Now(),
        }
    }
    
    return &ConnectionPool{
        connections: connections,
        semaphore:   NewAdvancedSemaphore(size),
    }
}

// GetConnection acquires a connection from the pool
func (cp *ConnectionPool) GetConnection(ctx context.Context) (*Connection, error) {
    if err := cp.semaphore.AcquireWithContext(ctx); err != nil {
        return nil, err
    }
    
    cp.mu.Lock()
    defer cp.mu.Unlock()
    
    // Find an available connection
    for _, conn := range cp.connections {
        if !conn.InUse {
            conn.InUse = true
            conn.LastUsed = time.Now()
            return conn, nil
        }
    }
    
    // This shouldn't happen if semaphore is working correctly
    cp.semaphore.Release()
    return nil, fmt.Errorf("no available connections")
}

// ReturnConnection returns a connection to the pool
func (cp *ConnectionPool) ReturnConnection(conn *Connection) {
    cp.mu.Lock()
    conn.InUse = false
    conn.LastUsed = time.Now()
    cp.mu.Unlock()
    
    cp.semaphore.Release()
}

// GetStats returns pool statistics
func (cp *ConnectionPool) GetStats() map[string]interface{} {
    cp.mu.Lock()
    defer cp.mu.Unlock()
    
    inUse := 0
    for _, conn := range cp.connections {
        if conn.InUse {
            inUse++
        }
    }
    
    semStats := cp.semaphore.GetStats()
    
    return map[string]interface{}{
        "total_connections": len(cp.connections),
        "in_use":           inUse,
        "available":        len(cp.connections) - inUse,
        "semaphore_stats":  semStats,
    }
}

// DatabaseService simulates a service using the connection pool
type DatabaseService struct {
    pool *ConnectionPool
}

// NewDatabaseService creates a new database service
func NewDatabaseService(poolSize int) *DatabaseService {
    return &DatabaseService{
        pool: NewConnectionPool(poolSize),
    }
}

// ExecuteQuery simulates executing a database query
func (ds *DatabaseService) ExecuteQuery(ctx context.Context, userID string, query string) error {
    fmt.Printf("User %s: Requesting database connection for query: %s\n", userID, query)
    
    conn, err := ds.pool.GetConnection(ctx)
    if err != nil {
        fmt.Printf("User %s: Failed to get connection: %v\n", userID, err)
        return err
    }
    
    defer ds.pool.ReturnConnection(conn)
    
    fmt.Printf("User %s: Using connection %d\n", userID, conn.ID)
    
    // Simulate query execution
    queryDuration := time.Duration(500+len(query)*10) * time.Millisecond
    select {
    case <-time.After(queryDuration):
        fmt.Printf("User %s: Query completed on connection %d\n", userID, conn.ID)
        return nil
    case <-ctx.Done():
        fmt.Printf("User %s: Query cancelled on connection %d\n", userID, conn.ID)
        return ctx.Err()
    }
}

// GetStats returns service statistics
func (ds *DatabaseService) GetStats() map[string]interface{} {
    return ds.pool.GetStats()
}

func main() {
    // Create database service with 3 connections
    service := NewDatabaseService(3)
    
    fmt.Println("=== Connection Pool Demo ===")
    fmt.Println("Pool size: 3 connections")
    
    // Start monitoring
    go func() {
        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()
        
        for range ticker.C {
            stats := service.GetStats()
            fmt.Printf("πŸ“Š Pool: %d/%d in use, %d available\n",
                stats["in_use"], stats["total_connections"], stats["available"])
        }
    }()
    
    var wg sync.WaitGroup
    
    // Simulate multiple users making database queries
    users := []struct {
        id    string
        query string
    }{
        {"Alice", "SELECT * FROM users"},
        {"Bob", "SELECT * FROM orders WHERE user_id = 123"},
        {"Charlie", "UPDATE users SET last_login = NOW()"},
        {"Diana", "SELECT COUNT(*) FROM products"},
        {"Eve", "INSERT INTO logs (message) VALUES ('test')"},
        {"Frank", "SELECT * FROM analytics WHERE date > '2024-01-01'"},
    }
    
    for i, user := range users {
        wg.Add(1)
        go func(userID, query string, delay time.Duration) {
            defer wg.Done()
            
            time.Sleep(delay) // Stagger requests
            
            ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
            defer cancel()
            
            err := service.ExecuteQuery(ctx, userID, query)
            if err != nil {
                fmt.Printf("❌ User %s query failed: %v\n", userID, err)
            }
        }(user.id, user.query, time.Duration(i*300)*time.Millisecond)
    }
    
    wg.Wait()
    
    // Final statistics
    fmt.Println("\n=== Final Statistics ===")
    stats := service.GetStats()
    for key, value := range stats {
        if key == "semaphore_stats" {
            fmt.Printf("%s:\n", key)
            semStats := value.(map[string]interface{})
            for k, v := range semStats {
                fmt.Printf("  %s: %v\n", k, v)
            }
        } else {
            fmt.Printf("%s: %v\n", key, value)
        }
    }
}

Best Practices

  1. Choose Right Capacity: Set semaphore capacity based on available resources
  2. Always Release: Use defer to ensure resources are released
  3. Handle Context: Support cancellation in long-running operations
  4. Monitor Usage: Track semaphore statistics and resource utilization
  5. Avoid Deadlocks: Don’t acquire multiple semaphores in different orders
  6. Use Timeouts: Prevent indefinite blocking with timeouts
  7. Consider Weighted: Use weighted semaphores for resources with different costs

Common Pitfalls

  1. Resource Leaks: Forgetting to release acquired resources
  2. Deadlocks: Circular dependencies between semaphores
  3. Starvation: Large requests blocking smaller ones indefinitely
  4. Over-allocation: Setting capacity higher than actual resources
  5. Under-utilization: Setting capacity too low for available resources

The Semaphore pattern is essential for managing limited resources in concurrent applications. It provides controlled access to resources, prevents overload, and ensures fair resource distribution among competing goroutines.


Next: Learn about Actor Model for message-passing concurrency and isolated state management.