Go Concurrency Patterns Series: ← Rate Limiter | Series Overview | Actor Model →
What is the Semaphore Pattern?
A semaphore is a synchronization primitive that maintains a count of available resources and controls access to them. It allows a specified number of goroutines to access a resource concurrently while blocking others until resources become available.
Types:
- Binary Semaphore: Acts like a mutex (0 or 1)
- Counting Semaphore: Allows N concurrent accesses
- Weighted Semaphore: Resources have different weights/costs
Real-World Use Cases
- Connection Pools: Limit database/HTTP connections
- Resource Management: Control access to limited resources
- Download Managers: Limit concurrent downloads
- API Rate Limiting: Control concurrent API calls
- Worker Pools: Limit concurrent workers
- Memory Management: Control memory-intensive operations
Basic Semaphore Implementation
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Semaphore is a counting semaphore backed by a buffered channel. Every held
// permit is one element sitting in the channel buffer, so the buffer size
// bounds the number of concurrent holders.
type Semaphore struct {
	ch chan struct{}
}

// NewSemaphore creates a semaphore that admits up to capacity concurrent
// holders. capacity is handed straight to make, which panics on a negative
// size.
func NewSemaphore(capacity int) *Semaphore {
	return &Semaphore{
		ch: make(chan struct{}, capacity),
	}
}

// Acquire takes one permit, blocking until one is free.
func (s *Semaphore) Acquire() {
	s.ch <- struct{}{}
}

// TryAcquire takes one permit if one is free right now and reports whether it
// did. It never blocks.
func (s *Semaphore) TryAcquire() bool {
	select {
	case s.ch <- struct{}{}:
		return true
	default:
		return false
	}
}

// AcquireWithContext takes one permit, blocking until one is free or ctx is
// done. It returns nil when the permit was taken and ctx.Err() otherwise.
func (s *Semaphore) AcquireWithContext(ctx context.Context) error {
	// select chooses at random among ready cases, so without this check a
	// context that is already done could still hand out a permit.
	if err := ctx.Err(); err != nil {
		return err
	}
	select {
	case s.ch <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Release gives one permit back. It panics if no permit is currently held:
// a blocking receive here would park the caller forever instead of surfacing
// the unbalanced Release.
func (s *Semaphore) Release() {
	select {
	case <-s.ch:
	default:
		panic("semaphore: released more than held")
	}
}

// Available returns the number of permits that are currently free.
func (s *Semaphore) Available() int {
	return cap(s.ch) - len(s.ch)
}

// Used returns the number of permits that are currently held.
func (s *Semaphore) Used() int {
	return len(s.ch)
}

// Capacity returns the total number of permits.
func (s *Semaphore) Capacity() int {
	return cap(s.ch)
}
// simulateWork models one worker: it takes a permit from sem, holds it for
// duration, gives it back, and logs each step. The availability figures in the
// log lines are read after the fact, so other workers may already have changed
// them.
func simulateWork(id int, duration time.Duration, sem *Semaphore) {
	fmt.Printf("Worker %d: Requesting resource...\n", id)

	sem.Acquire()
	free, total := sem.Available(), sem.Capacity()
	fmt.Printf("Worker %d: Acquired resource (available: %d/%d)\n", id, free, total)

	// Hold the permit for the requested time to stand in for real work.
	time.Sleep(duration)

	sem.Release()
	free, total = sem.Available(), sem.Capacity()
	fmt.Printf("Worker %d: Released resource (available: %d/%d)\n", id, free, total)
}
// main runs the basic demo: six staggered workers compete for a semaphore with
// three permits, so at most three of them work at the same time.
func main() {
	const (
		permits     = 3
		workerCount = 6
		stagger     = 200 * time.Millisecond
	)

	sem := NewSemaphore(permits)
	fmt.Println("=== Basic Semaphore Demo ===")
	fmt.Printf("Semaphore capacity: %d\n\n", sem.Capacity())

	var workers sync.WaitGroup
	for id := 1; id <= workerCount; id++ {
		// Each worker holds its permit for 1-3 seconds depending on its id.
		busyFor := time.Duration(1+id%3) * time.Second

		workers.Add(1)
		go func(workerID int, d time.Duration) {
			defer workers.Done()
			simulateWork(workerID, d, sem)
		}(id, busyFor)

		// Space the launches out so the log shows the queueing clearly.
		time.Sleep(stagger)
	}
	workers.Wait()

	fmt.Printf("\nFinal state - Available: %d/%d\n", sem.Available(), sem.Capacity())
}
Advanced Semaphore with Timeout and Context
package main
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
)
// AdvancedSemaphore is a channel-backed counting semaphore that also records
// usage metrics and the number of goroutines currently inside a blocking
// acquire call.
type AdvancedSemaphore struct {
	ch       chan struct{}
	capacity int

	// Metrics, only touched through sync/atomic.
	totalAcquires int64
	totalReleases int64
	timeouts      int64
	cancellations int64

	// Monitoring: mu guards waitingGoroutines.
	mu                sync.RWMutex
	waitingGoroutines int
}

// NewAdvancedSemaphore creates a semaphore that admits up to capacity
// concurrent holders. capacity is handed to make, which panics on a negative
// size.
func NewAdvancedSemaphore(capacity int) *AdvancedSemaphore {
	return &AdvancedSemaphore{
		ch:       make(chan struct{}, capacity),
		capacity: capacity,
	}
}

// Acquire takes one permit, blocking until one is free.
func (as *AdvancedSemaphore) Acquire() {
	as.incrementWaiting()
	defer as.decrementWaiting()

	as.ch <- struct{}{}
	atomic.AddInt64(&as.totalAcquires, 1)
}

// TryAcquire takes one permit if one is free right now and reports whether it
// did. It never blocks.
func (as *AdvancedSemaphore) TryAcquire() bool {
	select {
	case as.ch <- struct{}{}:
		atomic.AddInt64(&as.totalAcquires, 1)
		return true
	default:
		return false
	}
}

// AcquireWithTimeout takes one permit, waiting at most timeout for one to
// become free. It returns nil on success and a "timeout after ..." error, also
// counted in the timeouts metric, when the wait expires.
func (as *AdvancedSemaphore) AcquireWithTimeout(timeout time.Duration) error {
	as.incrementWaiting()
	defer as.decrementWaiting()

	// An explicit timer can be stopped as soon as the permit is obtained;
	// time.After would keep its timer pending until the full timeout elapses.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case as.ch <- struct{}{}:
		atomic.AddInt64(&as.totalAcquires, 1)
		return nil
	case <-timer.C:
		atomic.AddInt64(&as.timeouts, 1)
		return fmt.Errorf("timeout after %v", timeout)
	}
}

// AcquireWithContext takes one permit, blocking until one is free or ctx is
// done. It returns nil on success and ctx.Err(), also counted in the
// cancellations metric, otherwise.
func (as *AdvancedSemaphore) AcquireWithContext(ctx context.Context) error {
	// select chooses at random among ready cases, so without this check a
	// context that is already done could still hand out a permit.
	if err := ctx.Err(); err != nil {
		atomic.AddInt64(&as.cancellations, 1)
		return err
	}

	as.incrementWaiting()
	defer as.decrementWaiting()

	select {
	case as.ch <- struct{}{}:
		atomic.AddInt64(&as.totalAcquires, 1)
		return nil
	case <-ctx.Done():
		atomic.AddInt64(&as.cancellations, 1)
		return ctx.Err()
	}
}

// Release gives one permit back. It panics if no permit is currently held:
// a blocking receive here would park the caller forever instead of surfacing
// the unbalanced Release.
func (as *AdvancedSemaphore) Release() {
	select {
	case <-as.ch:
		atomic.AddInt64(&as.totalReleases, 1)
	default:
		panic("semaphore: released more than held")
	}
}

// incrementWaiting records that one more goroutine entered a blocking acquire.
func (as *AdvancedSemaphore) incrementWaiting() {
	as.mu.Lock()
	as.waitingGoroutines++
	as.mu.Unlock()
}

// decrementWaiting records that a goroutine left a blocking acquire.
func (as *AdvancedSemaphore) decrementWaiting() {
	as.mu.Lock()
	as.waitingGoroutines--
	as.mu.Unlock()
}

// GetStats returns a snapshot of the semaphore's state and metrics. The keys
// are "capacity", "available", "used" and "waiting" (int values) and
// "total_acquires", "total_releases", "timeouts" and "cancellations" (int64
// values). The individual values are read one after another, not atomically
// as a group.
func (as *AdvancedSemaphore) GetStats() map[string]interface{} {
	as.mu.RLock()
	waiting := as.waitingGoroutines
	as.mu.RUnlock()

	return map[string]interface{}{
		"capacity":       as.capacity,
		"available":      as.Available(),
		"used":           as.Used(),
		"waiting":        waiting,
		"total_acquires": atomic.LoadInt64(&as.totalAcquires),
		"total_releases": atomic.LoadInt64(&as.totalReleases),
		"timeouts":       atomic.LoadInt64(&as.timeouts),
		"cancellations":  atomic.LoadInt64(&as.cancellations),
	}
}

// Available returns the number of permits that are currently free.
func (as *AdvancedSemaphore) Available() int {
	return cap(as.ch) - len(as.ch)
}

// Used returns the number of permits that are currently held.
func (as *AdvancedSemaphore) Used() int {
	return len(as.ch)
}

// Capacity returns the total number of permits.
func (as *AdvancedSemaphore) Capacity() int {
	return as.capacity
}
// ResourceManager hands out a fixed set of named resources, using a semaphore
// to bound how many are in use at once and a free list to decide which
// concrete resource each user gets.
type ResourceManager struct {
	semaphore *AdvancedSemaphore
	resources []string

	// free holds the names that are not checked out. The semaphore count alone
	// cannot identify a resource: deriving an index from Used() lets two
	// concurrent users end up with the same name once acquisitions and
	// releases interleave.
	free chan string
}

// NewResourceManager creates a manager for the given resource names. The
// semaphore capacity equals the number of resources and every name starts out
// free.
func NewResourceManager(resources []string) *ResourceManager {
	free := make(chan string, len(resources))
	for _, name := range resources {
		free <- name
	}
	return &ResourceManager{
		semaphore: NewAdvancedSemaphore(len(resources)),
		resources: resources,
		free:      free,
	}
}

// UseResource waits up to timeout for a free resource, holds it for a
// simulated 1-3 seconds (derived from the length of userID) or until ctx is
// done, and then gives it back.
//
// It returns the acquisition error when no resource became free in time,
// ctx.Err() when ctx ended the usage early, and nil otherwise. ctx is not
// consulted while waiting for the resource; only timeout bounds that wait.
func (rm *ResourceManager) UseResource(ctx context.Context, userID string, timeout time.Duration) error {
	fmt.Printf("User %s: Requesting resource...\n", userID)

	// Try to acquire with timeout
	if err := rm.semaphore.AcquireWithTimeout(timeout); err != nil {
		fmt.Printf("User %s: Failed to acquire resource: %v\n", userID, err)
		return err
	}

	// Holding a permit guarantees at least one name is free, so this receive
	// does not block.
	resourceName := <-rm.free
	defer func() {
		// Put the name back before freeing the permit; the next holder of
		// that permit must be able to find a name in the free list.
		rm.free <- resourceName
		rm.semaphore.Release()
	}()

	fmt.Printf("User %s: Using resource '%s'\n", userID, resourceName)

	// Simulate resource usage
	usage := time.NewTimer(time.Duration(1+len(userID)%3) * time.Second)
	defer usage.Stop()

	select {
	case <-usage.C:
		fmt.Printf("User %s: Finished using resource '%s'\n", userID, resourceName)
		return nil
	case <-ctx.Done():
		fmt.Printf("User %s: Resource usage cancelled\n", userID)
		return ctx.Err()
	}
}

// GetStats returns the statistics of the underlying semaphore.
func (rm *ResourceManager) GetStats() map[string]interface{} {
	return rm.semaphore.GetStats()
}
// main runs the advanced demo: six users request one of three resources with
// different acquisition timeouts while a background goroutine prints the
// semaphore statistics once per second.
func main() {
	resources := []string{"Database-1", "Database-2", "API-Gateway"}
	manager := NewResourceManager(resources)

	fmt.Println("=== Advanced Semaphore Demo ===")
	fmt.Printf("Available resources: %v\n\n", resources)

	// Background monitor; it keeps running until the process exits.
	go func() {
		tick := time.NewTicker(time.Second)
		defer tick.Stop()
		for {
			<-tick.C
			snapshot := manager.GetStats()
			fmt.Printf("π Stats: Used=%d/%d, Waiting=%d, Timeouts=%d\n",
				snapshot["used"], snapshot["capacity"], snapshot["waiting"], snapshot["timeouts"])
		}
	}()

	// Simulated users, each started 300ms after the previous one.
	users := []string{"Alice", "Bob", "Charlie", "Diana", "Eve", "Frank"}

	var pending sync.WaitGroup
	for idx, name := range users {
		startAfter := time.Duration(idx*300) * time.Millisecond

		pending.Add(1)
		go func(userID string, delay time.Duration) {
			defer pending.Done()

			time.Sleep(delay)

			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
			defer cancel()

			// Users with an even-length name are less patient.
			var timeout time.Duration
			switch len(userID) % 2 {
			case 0:
				timeout = 1 * time.Second
			default:
				timeout = 3 * time.Second
			}

			if err := manager.UseResource(ctx, userID, timeout); err != nil {
				fmt.Printf("β User %s failed: %v\n", userID, err)
			}
		}(name, startAfter)
	}
	pending.Wait()

	// Final statistics
	fmt.Println("\n=== Final Statistics ===")
	for key, value := range manager.GetStats() {
		fmt.Printf("%s: %v\n", key, value)
	}
}
Weighted Semaphore Implementation
package main
import (
"context"
"fmt"
"sync"
"time"
)
// WeightedSemaphore hands out units of a shared capacity. Callers acquire and
// release an arbitrary weight; requests that cannot be served immediately wait
// in a queue.
type WeightedSemaphore struct {
	mu       sync.Mutex
	capacity int64
	current  int64 // units currently held
	waiters  []waiter
}

// waiter is one queued request. ready is closed, under mu, once the waiter's
// weight has been added to current.
type waiter struct {
	weight int64
	ready  chan struct{}
}

// NewWeightedSemaphore creates a weighted semaphore with the given total
// capacity.
func NewWeightedSemaphore(capacity int64) *WeightedSemaphore {
	return &WeightedSemaphore{capacity: capacity}
}

// Acquire takes weight units, blocking until they are available. A weight
// larger than the capacity can never be satisfied, so such a call blocks
// forever; it does not hold up other callers while doing so.
func (ws *WeightedSemaphore) Acquire(weight int64) {
	// context.Background is never done, so the call only returns after a
	// successful acquisition and the error is always nil.
	_ = ws.AcquireWithContext(context.Background(), weight)
}

// TryAcquire takes weight units if they are available right now and nobody is
// queued, and reports whether it did. It never blocks.
func (ws *WeightedSemaphore) TryAcquire(weight int64) bool {
	ws.mu.Lock()
	defer ws.mu.Unlock()

	if ws.current+weight <= ws.capacity && len(ws.waiters) == 0 {
		ws.current += weight
		return true
	}
	return false
}

// AcquireWithContext takes weight units, blocking until they are available or
// ctx is done. It returns nil when the units were acquired and ctx.Err()
// otherwise; on error no units are held.
func (ws *WeightedSemaphore) AcquireWithContext(ctx context.Context, weight int64) error {
	// Fail fast on a context that is already done, even if units are free.
	if err := ctx.Err(); err != nil {
		return err
	}

	ws.mu.Lock()
	if ws.current+weight <= ws.capacity && len(ws.waiters) == 0 {
		// Can acquire immediately
		ws.current += weight
		ws.mu.Unlock()
		return nil
	}

	if weight > ws.capacity {
		// The request can never fit. Queueing it would leave a permanent
		// waiter behind that forces every later caller to queue as well, so
		// only wait for the context to end.
		ws.mu.Unlock()
		<-ctx.Done()
		return ctx.Err()
	}

	// Need to wait
	ready := make(chan struct{})
	ws.waiters = append(ws.waiters, waiter{weight: weight, ready: ready})
	ws.mu.Unlock()

	select {
	case <-ready:
		return nil
	case <-ctx.Done():
		ws.mu.Lock()
		defer ws.mu.Unlock()

		select {
		case <-ready:
			// The units were granted while the cancellation was being
			// processed. The caller is told it failed, so hand them back
			// instead of leaking them.
			ws.current -= weight
		default:
			ws.removeWaiter(ready)
		}
		// Either path may have made room for requests that were queued
		// behind this one.
		ws.notifyWaiters()
		return ctx.Err()
	}
}

// Release gives weight units back and wakes queued requests that now fit. It
// panics when more units are released than are currently held.
func (ws *WeightedSemaphore) Release(weight int64) {
	ws.mu.Lock()
	defer ws.mu.Unlock()

	if weight > ws.current {
		panic("semaphore: released more than held")
	}
	ws.current -= weight
	ws.notifyWaiters()
}

// removeWaiter drops the queued request identified by its ready channel, if it
// is still queued. ws.mu must be held.
func (ws *WeightedSemaphore) removeWaiter(ready chan struct{}) {
	for i, w := range ws.waiters {
		if w.ready == ready {
			ws.waiters = append(ws.waiters[:i], ws.waiters[i+1:]...)
			return
		}
	}
}

// notifyWaiters walks the queue front to back and grants every request that
// fits into the remaining capacity, keeping the others queued in their
// original order. ws.mu must be held.
func (ws *WeightedSemaphore) notifyWaiters() {
	kept := ws.waiters[:0]
	for _, w := range ws.waiters {
		if ws.current+w.weight <= ws.capacity {
			// This waiter can proceed
			ws.current += w.weight
			close(w.ready)
			continue
		}
		kept = append(kept, w)
	}
	// Clear the vacated tail so granted waiters' channels are not retained.
	for i := len(kept); i < len(ws.waiters); i++ {
		ws.waiters[i] = waiter{}
	}
	ws.waiters = kept
}

// GetStats returns a snapshot with the keys "capacity", "current" and
// "available" (int64 values) and "waiters" (int value).
func (ws *WeightedSemaphore) GetStats() map[string]interface{} {
	ws.mu.Lock()
	defer ws.mu.Unlock()

	return map[string]interface{}{
		"capacity":  ws.capacity,
		"current":   ws.current,
		"available": ws.capacity - ws.current,
		"waiters":   len(ws.waiters),
	}
}
// Task describes one unit of work for the demo. The field order is part of the
// contract because tasks are also written as positional literals.
type Task struct {
	ID       string        // label used in log output
	Weight   int64         // units of semaphore capacity the task needs
	Duration time.Duration // simulated processing time
}
// TaskProcessor runs tasks while keeping their combined weight within the
// capacity of a weighted semaphore.
type TaskProcessor struct {
	semaphore *WeightedSemaphore
}

// NewTaskProcessor returns a processor whose semaphore has the given capacity.
func NewTaskProcessor(capacity int64) *TaskProcessor {
	tp := &TaskProcessor{}
	tp.semaphore = NewWeightedSemaphore(capacity)
	return tp
}

// ProcessTask acquires task.Weight units, simulates the work for task.Duration
// or until ctx is done, and releases the units again. It returns the
// acquisition error or ctx.Err() on failure and nil on completion.
func (tp *TaskProcessor) ProcessTask(ctx context.Context, task Task) error {
	fmt.Printf("Task %s: Requesting %d units of resource...\n", task.ID, task.Weight)

	acquireErr := tp.semaphore.AcquireWithContext(ctx, task.Weight)
	if acquireErr != nil {
		fmt.Printf("Task %s: Failed to acquire resources: %v\n", task.ID, acquireErr)
		return acquireErr
	}
	defer tp.semaphore.Release(task.Weight)

	snapshot := tp.semaphore.GetStats()
	fmt.Printf("Task %s: Acquired %d units (available: %d/%d)\n",
		task.ID, task.Weight, snapshot["available"], snapshot["capacity"])

	// Stand-in for the real work: wait out the duration unless cancelled.
	select {
	case <-ctx.Done():
		fmt.Printf("Task %s: Cancelled\n", task.ID)
		return ctx.Err()
	case <-time.After(task.Duration):
		fmt.Printf("Task %s: Completed\n", task.ID)
		return nil
	}
}

// GetStats exposes the statistics of the underlying semaphore.
func (tp *TaskProcessor) GetStats() map[string]interface{} {
	return tp.semaphore.GetStats()
}
// main runs the weighted demo: seven tasks of different weights share ten
// units of capacity while a background goroutine prints the usage twice per
// second.
func main() {
	// Ten units of capacity are shared by all tasks.
	processor := NewTaskProcessor(10)

	fmt.Println("=== Weighted Semaphore Demo ===")
	fmt.Println("Total capacity: 10 units")

	// Workload with mixed resource requirements.
	workload := []Task{
		{ID: "Small-1", Weight: 2, Duration: 2 * time.Second},
		{ID: "Medium-1", Weight: 4, Duration: 3 * time.Second},
		{ID: "Large-1", Weight: 6, Duration: 4 * time.Second},
		{ID: "Small-2", Weight: 1, Duration: 1 * time.Second},
		{ID: "Small-3", Weight: 2, Duration: 2 * time.Second},
		{ID: "Medium-2", Weight: 5, Duration: 3 * time.Second},
		{ID: "Large-2", Weight: 8, Duration: 5 * time.Second},
	}

	// Background monitor; it keeps running until the process exits.
	go func() {
		tick := time.NewTicker(500 * time.Millisecond)
		defer tick.Stop()
		for {
			<-tick.C
			snapshot := processor.GetStats()
			fmt.Printf("π Resources: %d/%d used, %d waiters\n",
				snapshot["current"], snapshot["capacity"], snapshot["waiters"])
		}
	}()

	// Launch every task in its own goroutine, 200ms apart.
	var running sync.WaitGroup
	for idx, item := range workload {
		startAfter := time.Duration(idx*200) * time.Millisecond

		running.Add(1)
		go func(t Task, delay time.Duration) {
			defer running.Done()

			time.Sleep(delay)

			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
			defer cancel()

			if err := processor.ProcessTask(ctx, t); err != nil {
				fmt.Printf("β Task %s failed: %v\n", t.ID, err)
			}
		}(item, startAfter)
	}
	running.Wait()

	// Final statistics
	fmt.Println("\n=== Final Statistics ===")
	for key, value := range processor.GetStats() {
		fmt.Printf("%s: %v\n", key, value)
	}
}
Semaphore-based Connection Pool
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Connection is a stand-in for a pooled database connection.
type Connection struct {
	ID       int       // identifier of the connection
	InUse    bool      // true while the connection is checked out
	LastUsed time.Time // time of the most recent check-out or return
}
// ConnectionPool manages a fixed set of connections. The semaphore bounds how
// many are checked out at once; mu guards the InUse and LastUsed fields of the
// pooled connections.
type ConnectionPool struct {
	connections []*Connection
	semaphore   *AdvancedSemaphore
	mu          sync.Mutex
}

// NewConnectionPool creates a pool of size connections with IDs 1..size and a
// semaphore of the same capacity.
func NewConnectionPool(size int) *ConnectionPool {
	connections := make([]*Connection, size)
	for i := 0; i < size; i++ {
		connections[i] = &Connection{
			ID:       i + 1,
			InUse:    false,
			LastUsed: time.Now(),
		}
	}

	return &ConnectionPool{
		connections: connections,
		semaphore:   NewAdvancedSemaphore(size),
	}
}

// GetConnection checks a connection out of the pool, waiting until one is free
// or ctx is done. It returns the semaphore's error when the wait is aborted.
// Every successful call must be paired with exactly one ReturnConnection.
func (cp *ConnectionPool) GetConnection(ctx context.Context) (*Connection, error) {
	if err := cp.semaphore.AcquireWithContext(ctx); err != nil {
		return nil, err
	}

	cp.mu.Lock()
	defer cp.mu.Unlock()

	// Find an available connection
	for _, conn := range cp.connections {
		if !conn.InUse {
			conn.InUse = true
			conn.LastUsed = time.Now()
			return conn, nil
		}
	}

	// This shouldn't happen if semaphore is working correctly
	cp.semaphore.Release()
	return nil, fmt.Errorf("no available connections")
}

// ReturnConnection hands a connection back to the pool and frees its permit.
//
// Calls that do not correspond to a checked-out connection of this pool (a nil
// connection, one that was already returned, or one the pool does not own) are
// ignored. Releasing a permit for them would either block on an empty
// semaphore or free a slot that another holder is still using.
func (cp *ConnectionPool) ReturnConnection(conn *Connection) {
	if conn == nil {
		return
	}

	cp.mu.Lock()
	if !cp.owns(conn) || !conn.InUse {
		cp.mu.Unlock()
		return
	}
	conn.InUse = false
	conn.LastUsed = time.Now()
	cp.mu.Unlock()

	cp.semaphore.Release()
}

// owns reports whether conn is one of the pool's own connections.
func (cp *ConnectionPool) owns(conn *Connection) bool {
	for _, candidate := range cp.connections {
		if candidate == conn {
			return true
		}
	}
	return false
}

// GetStats returns a snapshot with the keys "total_connections", "in_use" and
// "available" (int values) and "semaphore_stats" (the semaphore's own
// statistics map).
func (cp *ConnectionPool) GetStats() map[string]interface{} {
	cp.mu.Lock()
	defer cp.mu.Unlock()

	inUse := 0
	for _, conn := range cp.connections {
		if conn.InUse {
			inUse++
		}
	}

	semStats := cp.semaphore.GetStats()

	return map[string]interface{}{
		"total_connections": len(cp.connections),
		"in_use":            inUse,
		"available":         len(cp.connections) - inUse,
		"semaphore_stats":   semStats,
	}
}
// DatabaseService is a demo service that runs simulated queries on connections
// borrowed from a pool.
type DatabaseService struct {
	pool *ConnectionPool
}

// NewDatabaseService returns a service backed by a pool of poolSize
// connections.
func NewDatabaseService(poolSize int) *DatabaseService {
	svc := &DatabaseService{}
	svc.pool = NewConnectionPool(poolSize)
	return svc
}

// ExecuteQuery borrows a connection, simulates running query on it and returns
// the connection to the pool. The simulated run takes 500ms plus 10ms per byte
// of query text. It returns the pool's error when no connection could be
// obtained, ctx.Err() when ctx ends the query early, and nil on completion.
func (ds *DatabaseService) ExecuteQuery(ctx context.Context, userID, query string) error {
	fmt.Printf("User %s: Requesting database connection for query: %s\n", userID, query)

	conn, getErr := ds.pool.GetConnection(ctx)
	if getErr != nil {
		fmt.Printf("User %s: Failed to get connection: %v\n", userID, getErr)
		return getErr
	}
	defer ds.pool.ReturnConnection(conn)

	fmt.Printf("User %s: Using connection %d\n", userID, conn.ID)

	// Longer query text means a longer simulated execution.
	simulatedMillis := 500 + len(query)*10
	runFor := time.Duration(simulatedMillis) * time.Millisecond

	select {
	case <-ctx.Done():
		fmt.Printf("User %s: Query cancelled on connection %d\n", userID, conn.ID)
		return ctx.Err()
	case <-time.After(runFor):
		fmt.Printf("User %s: Query completed on connection %d\n", userID, conn.ID)
		return nil
	}
}

// GetStats exposes the statistics of the underlying connection pool.
func (ds *DatabaseService) GetStats() map[string]interface{} {
	return ds.pool.GetStats()
}
// main runs the connection pool demo: six users issue queries against a pool
// of three connections while a background goroutine prints the pool usage once
// per second.
func main() {
	// Three connections are shared by all users.
	service := NewDatabaseService(3)

	fmt.Println("=== Connection Pool Demo ===")
	fmt.Println("Pool size: 3 connections")

	// Background monitor; it keeps running until the process exits.
	go func() {
		tick := time.NewTicker(time.Second)
		defer tick.Stop()
		for {
			<-tick.C
			snapshot := service.GetStats()
			fmt.Printf("π Pool: %d/%d in use, %d available\n",
				snapshot["in_use"], snapshot["total_connections"], snapshot["available"])
		}
	}()

	// Simulated users and the query each of them runs.
	type request struct {
		id    string
		query string
	}
	requests := []request{
		{id: "Alice", query: "SELECT * FROM users"},
		{id: "Bob", query: "SELECT * FROM orders WHERE user_id = 123"},
		{id: "Charlie", query: "UPDATE users SET last_login = NOW()"},
		{id: "Diana", query: "SELECT COUNT(*) FROM products"},
		{id: "Eve", query: "INSERT INTO logs (message) VALUES ('test')"},
		{id: "Frank", query: "SELECT * FROM analytics WHERE date > '2024-01-01'"},
	}

	var inFlight sync.WaitGroup
	for idx, req := range requests {
		startAfter := time.Duration(idx*300) * time.Millisecond

		inFlight.Add(1)
		go func(userID, query string, delay time.Duration) {
			defer inFlight.Done()

			time.Sleep(delay)

			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()

			if err := service.ExecuteQuery(ctx, userID, query); err != nil {
				fmt.Printf("β User %s query failed: %v\n", userID, err)
			}
		}(req.id, req.query, startAfter)
	}
	inFlight.Wait()

	// Final statistics; the nested semaphore map is printed indented.
	fmt.Println("\n=== Final Statistics ===")
	for key, value := range service.GetStats() {
		if key != "semaphore_stats" {
			fmt.Printf("%s: %v\n", key, value)
			continue
		}
		fmt.Printf("%s:\n", key)
		for k, v := range value.(map[string]interface{}) {
			fmt.Printf(" %s: %v\n", k, v)
		}
	}
}
Best Practices
- Choose Right Capacity: Set semaphore capacity based on available resources
- Always Release: Use defer to ensure resources are released
- Handle Context: Support cancellation in long-running operations
- Monitor Usage: Track semaphore statistics and resource utilization
- Avoid Deadlocks: Don’t acquire multiple semaphores in different orders
- Use Timeouts: Prevent indefinite blocking with timeouts
- Consider Weighted: Use weighted semaphores for resources with different costs
Common Pitfalls
- Resource Leaks: Forgetting to release acquired resources
- Deadlocks: Circular dependencies between semaphores
- Starvation: Large requests blocking smaller ones indefinitely
- Over-allocation: Setting capacity higher than actual resources
- Under-utilization: Setting capacity too low for available resources
The Semaphore pattern is essential for managing limited resources in concurrent applications. It provides controlled access to resources, prevents overload, and ensures fair resource distribution among competing goroutines.
Next: Learn about the Actor Model for message-passing concurrency and isolated state management.