Semaphore Pattern in Go

    Go Concurrency Patterns Series: ← Rate Limiter | Series Overview | Actor Model → What is the Semaphore Pattern? A semaphore is a synchronization primitive that maintains a count of available resources and controls access to them. It allows a specified number of goroutines to access a resource concurrently while blocking others until resources become available. Types: Binary Semaphore: Acts like a mutex (0 or 1) Counting Semaphore: Allows N concurrent accesses Weighted Semaphore: Resources have different weights/costs Real-World Use Cases Connection Pools: Limit database/HTTP connections Resource Management: Control access to limited resources Download Managers: Limit concurrent downloads API Rate Limiting: Control concurrent API calls Worker Pools: Limit concurrent workers Memory Management: Control memory-intensive operations Basic Semaphore Implementation package main import ( "context" "fmt" "sync" "time" ) // Semaphore implements a counting semaphore type Semaphore struct { ch chan struct{} } // NewSemaphore creates a new semaphore with given capacity func NewSemaphore(capacity int) *Semaphore { return &Semaphore{ ch: make(chan struct{}, capacity), } } // Acquire acquires a resource from the semaphore func (s *Semaphore) Acquire() { s.ch <- struct{}{} } // TryAcquire tries to acquire a resource without blocking func (s *Semaphore) TryAcquire() bool { select { case s.ch <- struct{}{}: return true default: return false } } // AcquireWithContext acquires a resource with context cancellation func (s *Semaphore) AcquireWithContext(ctx context.Context) error { select { case s.ch <- struct{}{}: return nil case <-ctx.Done(): return ctx.Err() } } // Release releases a resource back to the semaphore func (s *Semaphore) Release() { <-s.ch } // Available returns the number of available resources func (s *Semaphore) Available() int { return cap(s.ch) - len(s.ch) } // Used returns the number of used resources func (s *Semaphore) Used() int { return len(s.ch) } // Capacity returns the total capacity func (s *Semaphore) Capacity() int { return cap(s.ch) } // simulateWork simulates work that requires a resource func simulateWork(id int, duration time.Duration, sem *Semaphore) { fmt.Printf("Worker %d: Requesting resource...\n", id) sem.Acquire() fmt.Printf("Worker %d: Acquired resource (available: %d/%d)\n", id, sem.Available(), sem.Capacity()) // Simulate work time.Sleep(duration) sem.Release() fmt.Printf("Worker %d: Released resource (available: %d/%d)\n", id, sem.Available(), sem.Capacity()) } func main() { // Create semaphore with capacity of 3 sem := NewSemaphore(3) fmt.Println("=== Basic Semaphore Demo ===") fmt.Printf("Semaphore capacity: %d\n\n", sem.Capacity()) var wg sync.WaitGroup // Start 6 workers, but only 3 can work concurrently for i := 1; i <= 6; i++ { wg.Add(1) go func(id int) { defer wg.Done() simulateWork(id, time.Duration(1+id%3)*time.Second, sem) }(i) time.Sleep(200 * time.Millisecond) // Stagger starts } wg.Wait() fmt.Printf("\nFinal state - Available: %d/%d\n", sem.Available(), sem.Capacity()) } Advanced Semaphore with Timeout and Context package main import ( "context" "fmt" "sync" "sync/atomic" "time" ) // AdvancedSemaphore provides additional features like metrics and timeouts type AdvancedSemaphore struct { ch chan struct{} capacity int // Metrics totalAcquires int64 totalReleases int64 timeouts int64 cancellations int64 // Monitoring mu sync.RWMutex waitingGoroutines int } // NewAdvancedSemaphore creates a new advanced semaphore func NewAdvancedSemaphore(capacity int) *AdvancedSemaphore { return &AdvancedSemaphore{ ch: make(chan struct{}, capacity), capacity: capacity, } } // Acquire acquires a resource (blocking) func (as *AdvancedSemaphore) Acquire() { as.incrementWaiting() defer as.decrementWaiting() as.ch <- struct{}{} atomic.AddInt64(&as.totalAcquires, 1) } // TryAcquire tries to acquire without blocking func (as *AdvancedSemaphore) TryAcquire() bool { select { case as.ch <- struct{}{}: atomic.AddInt64(&as.totalAcquires, 1) return true default: return false } } // AcquireWithTimeout acquires with a timeout func (as *AdvancedSemaphore) AcquireWithTimeout(timeout time.Duration) error { as.incrementWaiting() defer as.decrementWaiting() select { case as.ch <- struct{}{}: atomic.AddInt64(&as.totalAcquires, 1) return nil case <-time.After(timeout): atomic.AddInt64(&as.timeouts, 1) return fmt.Errorf("timeout after %v", timeout) } } // AcquireWithContext acquires with context cancellation func (as *AdvancedSemaphore) AcquireWithContext(ctx context.Context) error { as.incrementWaiting() defer as.decrementWaiting() select { case as.ch <- struct{}{}: atomic.AddInt64(&as.totalAcquires, 1) return nil case <-ctx.Done(): atomic.AddInt64(&as.cancellations, 1) return ctx.Err() } } // Release releases a resource func (as *AdvancedSemaphore) Release() { <-as.ch atomic.AddInt64(&as.totalReleases, 1) } // incrementWaiting increments waiting goroutines counter func (as *AdvancedSemaphore) incrementWaiting() { as.mu.Lock() as.waitingGoroutines++ as.mu.Unlock() } // decrementWaiting decrements waiting goroutines counter func (as *AdvancedSemaphore) decrementWaiting() { as.mu.Lock() as.waitingGoroutines-- as.mu.Unlock() } // GetStats returns semaphore statistics func (as *AdvancedSemaphore) GetStats() map[string]interface{} { as.mu.RLock() waiting := as.waitingGoroutines as.mu.RUnlock() return map[string]interface{}{ "capacity": as.capacity, "available": as.Available(), "used": as.Used(), "waiting": waiting, "total_acquires": atomic.LoadInt64(&as.totalAcquires), "total_releases": atomic.LoadInt64(&as.totalReleases), "timeouts": atomic.LoadInt64(&as.timeouts), "cancellations": atomic.LoadInt64(&as.cancellations), } } // Available returns available resources func (as *AdvancedSemaphore) Available() int { return cap(as.ch) - len(as.ch) } // Used returns used resources func (as *AdvancedSemaphore) Used() int { return len(as.ch) } // Capacity returns total capacity func (as *AdvancedSemaphore) Capacity() int { return as.capacity } // ResourceManager demonstrates semaphore usage for resource management type ResourceManager struct { semaphore *AdvancedSemaphore resources []string } // NewResourceManager creates a new resource manager func NewResourceManager(resources []string) *ResourceManager { return &ResourceManager{ semaphore: NewAdvancedSemaphore(len(resources)), resources: resources, } } // UseResource uses a resource with timeout func (rm *ResourceManager) UseResource(ctx context.Context, userID string, timeout time.Duration) error { fmt.Printf("User %s: Requesting resource...\n", userID) // Try to acquire with timeout if err := rm.semaphore.AcquireWithTimeout(timeout); err != nil { fmt.Printf("User %s: Failed to acquire resource: %v\n", userID, err) return err } defer rm.semaphore.Release() resourceIndex := rm.semaphore.Used() - 1 resourceName := rm.resources[resourceIndex] fmt.Printf("User %s: Using resource '%s'\n", userID, resourceName) // Simulate resource usage select { case <-time.After(time.Duration(1+len(userID)%3) * time.Second): fmt.Printf("User %s: Finished using resource '%s'\n", userID, resourceName) return nil case <-ctx.Done(): fmt.Printf("User %s: Resource usage cancelled\n", userID) return ctx.Err() } } // GetStats returns resource manager statistics func (rm *ResourceManager) GetStats() map[string]interface{} { return rm.semaphore.GetStats() } func main() { resources := []string{"Database-1", "Database-2", "API-Gateway"} manager := NewResourceManager(resources) fmt.Println("=== Advanced Semaphore Demo ===") fmt.Printf("Available resources: %v\n\n", resources) // Start monitoring go func() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for range ticker.C { stats := manager.GetStats() fmt.Printf(" Stats: Used=%d/%d, Waiting=%d, Timeouts=%d\n", stats["used"], stats["capacity"], stats["waiting"], stats["timeouts"]) } }() var wg sync.WaitGroup // Simulate users requesting resources users := []string{"Alice", "Bob", "Charlie", "Diana", "Eve", "Frank"} for i, user := range users { wg.Add(1) go func(userID string, delay time.Duration) { defer wg.Done() time.Sleep(delay) // Stagger requests ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() // Some users have shorter timeouts timeout := 3 * time.Second if len(userID)%2 == 0 { timeout = 1 * time.Second } err := manager.UseResource(ctx, userID, timeout) if err != nil { fmt.Printf(" User %s failed: %v\n", userID, err) } }(user, time.Duration(i*300)*time.Millisecond) } wg.Wait() // Final statistics fmt.Println("\n=== Final Statistics ===") stats := manager.GetStats() for key, value := range stats { fmt.Printf("%s: %v\n", key, value) } } Weighted Semaphore Implementation package main import ( "context" "fmt" "sync" "time" ) // WeightedSemaphore allows acquiring resources with different weights type WeightedSemaphore struct { mu sync.Mutex capacity int64 current int64 waiters []waiter } // waiter represents a goroutine waiting for resources type waiter struct { weight int64 ready chan struct{} } // NewWeightedSemaphore creates a new weighted semaphore func NewWeightedSemaphore(capacity int64) *WeightedSemaphore { return &WeightedSemaphore{ capacity: capacity, waiters: make([]waiter, 0), } } // Acquire acquires resources with given weight func (ws *WeightedSemaphore) Acquire(weight int64) { ws.mu.Lock() if ws.current+weight <= ws.capacity && len(ws.waiters) == 0 { // Can acquire immediately ws.current += weight ws.mu.Unlock() return } // Need to wait ready := make(chan struct{}) ws.waiters = append(ws.waiters, waiter{weight: weight, ready: ready}) ws.mu.Unlock() <-ready } // TryAcquire tries to acquire resources without blocking func (ws *WeightedSemaphore) TryAcquire(weight int64) bool { ws.mu.Lock() defer ws.mu.Unlock() if ws.current+weight <= ws.capacity && len(ws.waiters) == 0 { ws.current += weight return true } return false } // AcquireWithContext acquires resources with context cancellation func (ws *WeightedSemaphore) AcquireWithContext(ctx context.Context, weight int64) error { ws.mu.Lock() if ws.current+weight <= ws.capacity && len(ws.waiters) == 0 { // Can acquire immediately ws.current += weight ws.mu.Unlock() return nil } // Need to wait ready := make(chan struct{}) ws.waiters = append(ws.waiters, waiter{weight: weight, ready: ready}) ws.mu.Unlock() select { case <-ready: return nil case <-ctx.Done(): // Remove from waiters list ws.mu.Lock() for i, w := range ws.waiters { if w.ready == ready { ws.waiters = append(ws.waiters[:i], ws.waiters[i+1:]...) break } } ws.mu.Unlock() return ctx.Err() } } // Release releases resources with given weight func (ws *WeightedSemaphore) Release(weight int64) { ws.mu.Lock() defer ws.mu.Unlock() ws.current -= weight ws.notifyWaiters() } // notifyWaiters notifies waiting goroutines that can now proceed func (ws *WeightedSemaphore) notifyWaiters() { for i := 0; i < len(ws.waiters); { w := ws.waiters[i] if ws.current+w.weight <= ws.capacity { // This waiter can proceed ws.current += w.weight close(w.ready) // Remove from waiters ws.waiters = append(ws.waiters[:i], ws.waiters[i+1:]...) } else { i++ } } } // GetStats returns current statistics func (ws *WeightedSemaphore) GetStats() map[string]interface{} { ws.mu.Lock() defer ws.mu.Unlock() return map[string]interface{}{ "capacity": ws.capacity, "current": ws.current, "available": ws.capacity - ws.current, "waiters": len(ws.waiters), } } // Task represents a task with resource requirements type Task struct { ID string Weight int64 Duration time.Duration } // TaskProcessor processes tasks using weighted semaphore type TaskProcessor struct { semaphore *WeightedSemaphore } // NewTaskProcessor creates a new task processor func NewTaskProcessor(capacity int64) *TaskProcessor { return &TaskProcessor{ semaphore: NewWeightedSemaphore(capacity), } } // ProcessTask processes a task func (tp *TaskProcessor) ProcessTask(ctx context.Context, task Task) error { fmt.Printf("Task %s: Requesting %d units of resource...\n", task.ID, task.Weight) if err := tp.semaphore.AcquireWithContext(ctx, task.Weight); err != nil { fmt.Printf("Task %s: Failed to acquire resources: %v\n", task.ID, err) return err } defer tp.semaphore.Release(task.Weight) stats := tp.semaphore.GetStats() fmt.Printf("Task %s: Acquired %d units (available: %d/%d)\n", task.ID, task.Weight, stats["available"], stats["capacity"]) // Simulate task processing select { case <-time.After(task.Duration): fmt.Printf("Task %s: Completed\n", task.ID) return nil case <-ctx.Done(): fmt.Printf("Task %s: Cancelled\n", task.ID) return ctx.Err() } } // GetStats returns processor statistics func (tp *TaskProcessor) GetStats() map[string]interface{} { return tp.semaphore.GetStats() } func main() { // Create weighted semaphore with capacity of 10 units processor := NewTaskProcessor(10) fmt.Println("=== Weighted Semaphore Demo ===") fmt.Println("Total capacity: 10 units") // Define tasks with different resource requirements tasks := []Task{ {"Small-1", 2, 2 * time.Second}, {"Medium-1", 4, 3 * time.Second}, {"Large-1", 6, 4 * time.Second}, {"Small-2", 1, 1 * time.Second}, {"Small-3", 2, 2 * time.Second}, {"Medium-2", 5, 3 * time.Second}, {"Large-2", 8, 5 * time.Second}, } // Start monitoring go func() { ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() for range ticker.C { stats := processor.GetStats() fmt.Printf(" Resources: %d/%d used, %d waiters\n", stats["current"], stats["capacity"], stats["waiters"]) } }() var wg sync.WaitGroup // Process tasks concurrently for i, task := range tasks { wg.Add(1) go func(t Task, delay time.Duration) { defer wg.Done() time.Sleep(delay) // Stagger task starts ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() err := processor.ProcessTask(ctx, t) if err != nil { fmt.Printf(" Task %s failed: %v\n", t.ID, err) } }(task, time.Duration(i*200)*time.Millisecond) } wg.Wait() // Final statistics fmt.Println("\n=== Final Statistics ===") stats := processor.GetStats() for key, value := range stats { fmt.Printf("%s: %v\n", key, value) } } Semaphore-based Connection Pool package main import ( "context" "fmt" "sync" "time" ) // Connection represents a database connection type Connection struct { ID int InUse bool LastUsed time.Time } // ConnectionPool manages database connections using semaphore type ConnectionPool struct { connections []*Connection semaphore *AdvancedSemaphore mu sync.Mutex } // NewConnectionPool creates a new connection pool func NewConnectionPool(size int) *ConnectionPool { connections := make([]*Connection, size) for i := 0; i < size; i++ { connections[i] = &Connection{ ID: i + 1, InUse: false, LastUsed: time.Now(), } } return &ConnectionPool{ connections: connections, semaphore: NewAdvancedSemaphore(size), } } // GetConnection acquires a connection from the pool func (cp *ConnectionPool) GetConnection(ctx context.Context) (*Connection, error) { if err := cp.semaphore.AcquireWithContext(ctx); err != nil { return nil, err } cp.mu.Lock() defer cp.mu.Unlock() // Find an available connection for _, conn := range cp.connections { if !conn.InUse { conn.InUse = true conn.LastUsed = time.Now() return conn, nil } } // This shouldn't happen if semaphore is working correctly cp.semaphore.Release() return nil, fmt.Errorf("no available connections") } // ReturnConnection returns a connection to the pool func (cp *ConnectionPool) ReturnConnection(conn *Connection) { cp.mu.Lock() conn.InUse = false conn.LastUsed = time.Now() cp.mu.Unlock() cp.semaphore.Release() } // GetStats returns pool statistics func (cp *ConnectionPool) GetStats() map[string]interface{} { cp.mu.Lock() defer cp.mu.Unlock() inUse := 0 for _, conn := range cp.connections { if conn.InUse { inUse++ } } semStats := cp.semaphore.GetStats() return map[string]interface{}{ "total_connections": len(cp.connections), "in_use": inUse, "available": len(cp.connections) - inUse, "semaphore_stats": semStats, } } // DatabaseService simulates a service using the connection pool type DatabaseService struct { pool *ConnectionPool } // NewDatabaseService creates a new database service func NewDatabaseService(poolSize int) *DatabaseService { return &DatabaseService{ pool: NewConnectionPool(poolSize), } } // ExecuteQuery simulates executing a database query func (ds *DatabaseService) ExecuteQuery(ctx context.Context, userID string, query string) error { fmt.Printf("User %s: Requesting database connection for query: %s\n", userID, query) conn, err := ds.pool.GetConnection(ctx) if err != nil { fmt.Printf("User %s: Failed to get connection: %v\n", userID, err) return err } defer ds.pool.ReturnConnection(conn) fmt.Printf("User %s: Using connection %d\n", userID, conn.ID) // Simulate query execution queryDuration := time.Duration(500+len(query)*10) * time.Millisecond select { case <-time.After(queryDuration): fmt.Printf("User %s: Query completed on connection %d\n", userID, conn.ID) return nil case <-ctx.Done(): fmt.Printf("User %s: Query cancelled on connection %d\n", userID, conn.ID) return ctx.Err() } } // GetStats returns service statistics func (ds *DatabaseService) GetStats() map[string]interface{} { return ds.pool.GetStats() } func main() { // Create database service with 3 connections service := NewDatabaseService(3) fmt.Println("=== Connection Pool Demo ===") fmt.Println("Pool size: 3 connections") // Start monitoring go func() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for range ticker.C { stats := service.GetStats() fmt.Printf(" Pool: %d/%d in use, %d available\n", stats["in_use"], stats["total_connections"], stats["available"]) } }() var wg sync.WaitGroup // Simulate multiple users making database queries users := []struct { id string query string }{ {"Alice", "SELECT * FROM users"}, {"Bob", "SELECT * FROM orders WHERE user_id = 123"}, {"Charlie", "UPDATE users SET last_login = NOW()"}, {"Diana", "SELECT COUNT(*) FROM products"}, {"Eve", "INSERT INTO logs (message) VALUES ('test')"}, {"Frank", "SELECT * FROM analytics WHERE date > '2024-01-01'"}, } for i, user := range users { wg.Add(1) go func(userID, query string, delay time.Duration) { defer wg.Done() time.Sleep(delay) // Stagger requests ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() err := service.ExecuteQuery(ctx, userID, query) if err != nil { fmt.Printf(" User %s query failed: %v\n", userID, err) } }(user.id, user.query, time.Duration(i*300)*time.Millisecond) } wg.Wait() // Final statistics fmt.Println("\n=== Final Statistics ===") stats := service.GetStats() for key, value := range stats { if key == "semaphore_stats" { fmt.Printf("%s:\n", key) semStats := value.(map[string]interface{}) for k, v := range semStats { fmt.Printf(" %s: %v\n", k, v) } } else { fmt.Printf("%s: %v\n", key, value) } } } Best Practices Choose Right Capacity: Set semaphore capacity based on available resources Always Release: Use defer to ensure resources are released Handle Context: Support cancellation in long-running operations Monitor Usage: Track semaphore statistics and resource utilization Avoid Deadlocks: Don’t acquire multiple semaphores in different orders Use Timeouts: Prevent indefinite blocking with timeouts Consider Weighted: Use weighted semaphores for resources with different costs Common Pitfalls Resource Leaks: Forgetting to release acquired resources Deadlocks: Circular dependencies between semaphores Starvation: Large requests blocking smaller ones indefinitely Over-allocation: Setting capacity higher than actual resources Under-utilization: Setting capacity too low for available resources The Semaphore pattern is essential for managing limited resources in concurrent applications. It provides controlled access to resources, prevents overload, and ensures fair resource distribution among competing goroutines. ...

    August 7, 2024 · 12 min · Rafiul Alam

    Request/Response Pattern in Go

    Go Concurrency Patterns Series: ← Pub/Sub Pattern | Series Overview | Worker Pool → What is the Request/Response Pattern? The Request/Response pattern enables synchronous communication between goroutines, where a sender waits for a response from a receiver. This pattern is essential for RPC-style communication, database queries, API calls, and any scenario where you need to get a result back from an operation. Key Components: Request: Contains data and a response channel Response: Contains result data and/or error information Requester: Sends request and waits for response Responder: Processes request and sends response Real-World Use Cases Database Operations: Query execution with results API Gateways: Forwarding requests to microservices Cache Systems: Get/Set operations with confirmation File Operations: Read/Write with status feedback Validation Services: Input validation with results Authentication: Login requests with tokens Basic Request/Response Implementation package main import ( "fmt" "math/rand" "time" ) // Request represents a request with a response channel type Request struct { ID string Data interface{} Response chan Response } // Response represents the response to a request type Response struct { ID string Result interface{} Error error } // Server processes requests type Server struct { requests chan Request quit chan bool } // NewServer creates a new server func NewServer() *Server { return &Server{ requests: make(chan Request), quit: make(chan bool), } } // Start begins processing requests func (s *Server) Start() { go func() { for { select { case req := <-s.requests: s.processRequest(req) case <-s.quit: return } } }() } // processRequest handles a single request func (s *Server) processRequest(req Request) { // Simulate processing time time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) // Process the request (example: double the number) var response Response response.ID = req.ID if num, ok := req.Data.(int); ok { response.Result = num * 2 } else { response.Error = fmt.Errorf("invalid data type") } // Send response back req.Response <- response } // SendRequest sends a request and waits for response func (s *Server) SendRequest(id string, data interface{}) (interface{}, error) { responseChan := make(chan Response, 1) request := Request{ ID: id, Data: data, Response: responseChan, } s.requests <- request // Wait for response response := <-responseChan return response.Result, response.Error } // Stop shuts down the server func (s *Server) Stop() { close(s.quit) } func main() { server := NewServer() server.Start() defer server.Stop() // Send multiple requests for i := 1; i <= 5; i++ { result, err := server.SendRequest(fmt.Sprintf("req-%d", i), i*10) if err != nil { fmt.Printf("Request %d failed: %v\n", i, err) } else { fmt.Printf("Request %d result: %v\n", i, result) } } } Request/Response with Timeout package main import ( "context" "fmt" "math/rand" "time" ) // TimedRequest includes context for timeout handling type TimedRequest struct { ID string Data interface{} Response chan TimedResponse Context context.Context } // TimedResponse includes timing information type TimedResponse struct { ID string Result interface{} Error error Duration time.Duration Timestamp time.Time } // TimedServer processes requests with timeout support type TimedServer struct { requests chan TimedRequest quit chan bool } func NewTimedServer() *TimedServer { return &TimedServer{ requests: make(chan TimedRequest, 10), quit: make(chan bool), } } func (ts *TimedServer) Start() { go func() { for { select { case req := <-ts.requests: go ts.processTimedRequest(req) case <-ts.quit: return } } }() } func (ts *TimedServer) processTimedRequest(req TimedRequest) { start := time.Now() // Check if context is already cancelled select { case <-req.Context.Done(): ts.sendResponse(req, nil, req.Context.Err(), start) return default: } // Simulate work with random duration workDuration := time.Duration(rand.Intn(200)) * time.Millisecond select { case <-time.After(workDuration): // Work completed if num, ok := req.Data.(int); ok { ts.sendResponse(req, num*2, nil, start) } else { ts.sendResponse(req, nil, fmt.Errorf("invalid data type"), start) } case <-req.Context.Done(): // Context cancelled during work ts.sendResponse(req, nil, req.Context.Err(), start) } } func (ts *TimedServer) sendResponse(req TimedRequest, result interface{}, err error, start time.Time) { response := TimedResponse{ ID: req.ID, Result: result, Error: err, Duration: time.Since(start), Timestamp: time.Now(), } select { case req.Response <- response: case <-req.Context.Done(): // Client no longer waiting } } // SendRequestWithTimeout sends a request with a timeout func (ts *TimedServer) SendRequestWithTimeout(id string, data interface{}, timeout time.Duration) (interface{}, error) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() responseChan := make(chan TimedResponse, 1) request := TimedRequest{ ID: id, Data: data, Response: responseChan, Context: ctx, } select { case ts.requests <- request: case <-ctx.Done(): return nil, ctx.Err() } select { case response := <-responseChan: fmt.Printf("Request %s completed in %v\n", response.ID, response.Duration) return response.Result, response.Error case <-ctx.Done(): return nil, ctx.Err() } } func (ts *TimedServer) Stop() { close(ts.quit) } func main() { server := NewTimedServer() server.Start() defer server.Stop() // Send requests with different timeouts requests := []struct { id string data int timeout time.Duration }{ {"fast", 10, 300 * time.Millisecond}, {"medium", 20, 150 * time.Millisecond}, {"slow", 30, 50 * time.Millisecond}, // This might timeout } for _, req := range requests { result, err := server.SendRequestWithTimeout(req.id, req.data, req.timeout) if err != nil { fmt.Printf("Request %s failed: %v\n", req.id, err) } else { fmt.Printf("Request %s result: %v\n", req.id, result) } } } Future/Promise Pattern package main import ( "context" "fmt" "sync" "time" ) // Future represents a value that will be available in the future type Future struct { mu sync.Mutex done chan struct{} result interface{} err error computed bool } // NewFuture creates a new future func NewFuture() *Future { return &Future{ done: make(chan struct{}), } } // Set sets the future's value func (f *Future) Set(result interface{}, err error) { f.mu.Lock() defer f.mu.Unlock() if f.computed { return // Already set } f.result = result f.err = err f.computed = true close(f.done) } // Get waits for and returns the future's value func (f *Future) Get() (interface{}, error) { <-f.done return f.result, f.err } // GetWithTimeout waits for the value with a timeout func (f *Future) GetWithTimeout(timeout time.Duration) (interface{}, error) { select { case <-f.done: return f.result, f.err case <-time.After(timeout): return nil, fmt.Errorf("timeout waiting for future") } } // GetWithContext waits for the value with context cancellation func (f *Future) GetWithContext(ctx context.Context) (interface{}, error) { select { case <-f.done: return f.result, f.err case <-ctx.Done(): return nil, ctx.Err() } } // IsReady returns true if the future has been computed func (f *Future) IsReady() bool { f.mu.Lock() defer f.mu.Unlock() return f.computed } // AsyncService demonstrates async operations with futures type AsyncService struct { workers chan struct{} } func NewAsyncService(maxWorkers int) *AsyncService { return &AsyncService{ workers: make(chan struct{}, maxWorkers), } } // ProcessAsync starts async processing and returns a future func (as *AsyncService) ProcessAsync(data interface{}) *Future { future := NewFuture() go func() { // Acquire worker slot as.workers <- struct{}{} defer func() { <-as.workers }() // Simulate processing time.Sleep(time.Duration(100+rand.Intn(200)) * time.Millisecond) // Process data if num, ok := data.(int); ok { future.Set(num*num, nil) } else { future.Set(nil, fmt.Errorf("invalid data type")) } }() return future } func main() { service := NewAsyncService(3) // Start multiple async operations futures := make([]*Future, 5) for i := 0; i < 5; i++ { fmt.Printf("Starting async operation %d\n", i+1) futures[i] = service.ProcessAsync((i + 1) * 10) } // Wait for all results fmt.Println("\nWaiting for results...") for i, future := range futures { result, err := future.Get() if err != nil { fmt.Printf("Operation %d failed: %v\n", i+1, err) } else { fmt.Printf("Operation %d result: %v\n", i+1, result) } } // Example with timeout fmt.Println("\nTesting timeout...") timeoutFuture := service.ProcessAsync(100) result, err := timeoutFuture.GetWithTimeout(50 * time.Millisecond) if err != nil { fmt.Printf("Timeout example failed: %v\n", err) } else { fmt.Printf("Timeout example result: %v\n", result) } } Batch Request/Response package main import ( "fmt" "sync" "time" ) // BatchRequest represents multiple requests processed together type BatchRequest struct { ID string Items []interface{} Response chan BatchResponse } // BatchResponse contains results for all items in a batch type BatchResponse struct { ID string Results []BatchResult Error error } // BatchResult represents the result of processing one item type BatchResult struct { Index int Result interface{} Error error } // BatchProcessor processes requests in batches for efficiency type BatchProcessor struct { requests chan BatchRequest batchSize int batchWindow time.Duration quit chan bool } func NewBatchProcessor(batchSize int, batchWindow time.Duration) *BatchProcessor { return &BatchProcessor{ requests: make(chan BatchRequest, 100), batchSize: batchSize, batchWindow: batchWindow, quit: make(chan bool), } } func (bp *BatchProcessor) Start() { go func() { batch := make([]BatchRequest, 0, bp.batchSize) timer := time.NewTimer(bp.batchWindow) timer.Stop() for { select { case req := <-bp.requests: batch = append(batch, req) if len(batch) == 1 { timer.Reset(bp.batchWindow) } if len(batch) >= bp.batchSize { bp.processBatch(batch) batch = batch[:0] timer.Stop() } case <-timer.C: if len(batch) > 0 { bp.processBatch(batch) batch = batch[:0] } case <-bp.quit: if len(batch) > 0 { bp.processBatch(batch) } return } } }() } func (bp *BatchProcessor) processBatch(batch []BatchRequest) { fmt.Printf("Processing batch of %d requests\n", len(batch)) var wg sync.WaitGroup for _, req := range batch { wg.Add(1) go func(r BatchRequest) { defer wg.Done() bp.processRequest(r) }(req) } wg.Wait() } func (bp *BatchProcessor) processRequest(req BatchRequest) { results := make([]BatchResult, len(req.Items)) for i, item := range req.Items { // Simulate processing each item time.Sleep(10 * time.Millisecond) if num, ok := item.(int); ok { results[i] = BatchResult{ Index: i, Result: num * 3, } } else { results[i] = BatchResult{ Index: i, Error: fmt.Errorf("invalid item type at index %d", i), } } } response := BatchResponse{ ID: req.ID, Results: results, } req.Response <- response } // SendBatchRequest sends a batch request and waits for response func (bp *BatchProcessor) SendBatchRequest(id string, items []interface{}) ([]BatchResult, error) { responseChan := make(chan BatchResponse, 1) request := BatchRequest{ ID: id, Items: items, Response: responseChan, } bp.requests <- request response := <-responseChan return response.Results, response.Error } func (bp *BatchProcessor) Stop() { close(bp.quit) } func main() { processor := NewBatchProcessor(3, 100*time.Millisecond) processor.Start() defer processor.Stop() // Send individual batch requests go func() { results, err := processor.SendBatchRequest("batch1", []interface{}{1, 2, 3, 4, 5}) if err != nil { fmt.Printf("Batch 1 failed: %v\n", err) return } fmt.Println("Batch 1 results:") for _, result := range results { if result.Error != nil { fmt.Printf(" Item %d error: %v\n", result.Index, result.Error) } else { fmt.Printf(" Item %d result: %v\n", result.Index, result.Result) } } }() go func() { results, err := processor.SendBatchRequest("batch2", []interface{}{10, 20, 30}) if err != nil { fmt.Printf("Batch 2 failed: %v\n", err) return } fmt.Println("Batch 2 results:") for _, result := range results { if result.Error != nil { fmt.Printf(" Item %d error: %v\n", result.Index, result.Error) } else { fmt.Printf(" Item %d result: %v\n", result.Index, result.Result) } } }() // Wait for processing time.Sleep(500 * time.Millisecond) } Best Practices Always Use Timeouts: Prevent indefinite blocking Handle Context Cancellation: Support graceful cancellation Buffer Response Channels: Avoid blocking responders Error Handling: Always include error information in responses Resource Cleanup: Ensure channels and goroutines are cleaned up Monitoring: Track request/response times and success rates Backpressure: Handle situations when responders are overwhelmed Common Pitfalls Deadlocks: Not buffering response channels Goroutine Leaks: Not handling context cancellation Memory Leaks: Not closing channels properly Blocking Operations: Long-running operations without timeouts Lost Responses: Not handling channel closure Testing Request/Response package main import ( "context" "testing" "time" ) func TestRequestResponse(t *testing.T) { server := NewTimedServer() server.Start() defer server.Stop() // Test successful request result, err := server.SendRequestWithTimeout("test1", 42, 200*time.Millisecond) if err != nil { t.Fatalf("Request failed: %v", err) } if result != 84 { t.Errorf("Expected 84, got %v", result) } // Test timeout _, err = server.SendRequestWithTimeout("test2", 42, 10*time.Millisecond) if err == nil { t.Error("Expected timeout error") } } func TestFuture(t *testing.T) { future := NewFuture() // Test that future is not ready initially if future.IsReady() { t.Error("Future should not be ready initially") } // Set value in goroutine go func() { time.Sleep(50 * time.Millisecond) future.Set("test result", nil) }() // Get value result, err := future.Get() if err != nil { t.Fatalf("Future failed: %v", err) } if result != "test result" { t.Errorf("Expected 'test result', got %v", result) } // Test that future is ready after setting if !future.IsReady() { t.Error("Future should be ready after setting") } } The Request/Response pattern is essential for building synchronous communication systems in Go. It provides the foundation for RPC systems, database operations, and any scenario where you need to wait for a result from an asynchronous operation. ...

    July 31, 2024 · 10 min · Rafiul Alam

    Rate Limiter Pattern in Go

    Go Concurrency Patterns Series: ← Circuit Breaker | Series Overview | Semaphore Pattern → What is the Rate Limiter Pattern? Rate limiting controls the rate at which operations are performed, preventing system overload and ensuring fair resource usage. It’s essential for protecting services from abuse, managing resource consumption, and maintaining system stability under load. Common Algorithms: Token Bucket: Allows bursts up to bucket capacity Fixed Window: Fixed number of requests per time window Sliding Window: Smooth rate limiting over time Leaky Bucket: Constant output rate regardless of input Real-World Use Cases API Rate Limiting: Prevent API abuse and ensure fair usage Database Throttling: Control database query rates File Processing: Limit file processing rate Network Operations: Control bandwidth usage Background Jobs: Throttle job processing User Actions: Prevent spam and abuse Token Bucket Rate Limiter package main import ( "context" "fmt" "sync" "time" ) // TokenBucket implements the token bucket rate limiting algorithm type TokenBucket struct { mu sync.Mutex capacity int // Maximum number of tokens tokens int // Current number of tokens refillRate int // Tokens added per second lastRefill time.Time // Last refill time } // NewTokenBucket creates a new token bucket rate limiter func NewTokenBucket(capacity, refillRate int) *TokenBucket { return &TokenBucket{ capacity: capacity, tokens: capacity, // Start with full bucket refillRate: refillRate, lastRefill: time.Now(), } } // Allow checks if a request should be allowed func (tb *TokenBucket) Allow() bool { tb.mu.Lock() defer tb.mu.Unlock() tb.refill() if tb.tokens > 0 { tb.tokens-- return true } return false } // AllowN checks if n requests should be allowed func (tb *TokenBucket) AllowN(n int) bool { tb.mu.Lock() defer tb.mu.Unlock() tb.refill() if tb.tokens >= n { tb.tokens -= n return true } return false } // Wait waits until a token is available func (tb *TokenBucket) Wait(ctx context.Context) error { for { if tb.Allow() { return nil } select { case <-time.After(time.Millisecond * 10): continue case <-ctx.Done(): return ctx.Err() } } } // refill adds tokens based on elapsed time func (tb *TokenBucket) refill() { now := time.Now() elapsed := now.Sub(tb.lastRefill) tokensToAdd := int(elapsed.Seconds() * float64(tb.refillRate)) if tokensToAdd > 0 { tb.tokens += tokensToAdd if tb.tokens > tb.capacity { tb.tokens = tb.capacity } tb.lastRefill = now } } // GetStats returns current bucket statistics func (tb *TokenBucket) GetStats() (tokens, capacity int) { tb.mu.Lock() defer tb.mu.Unlock() tb.refill() return tb.tokens, tb.capacity } func main() { // Create a token bucket: 5 tokens capacity, 2 tokens per second refill rate limiter := NewTokenBucket(5, 2) fmt.Println("=== Token Bucket Rate Limiter Demo ===") // Test burst capability fmt.Println("\n--- Testing Burst Capability ---") for i := 1; i <= 7; i++ { allowed := limiter.Allow() tokens, capacity := limiter.GetStats() fmt.Printf("Request %d: %s (tokens: %d/%d)\n", i, allowedStatus(allowed), tokens, capacity) } // Wait for refill fmt.Println("\n--- Waiting 3 seconds for refill ---") time.Sleep(3 * time.Second) // Test after refill fmt.Println("\n--- Testing After Refill ---") for i := 1; i <= 4; i++ { allowed := limiter.Allow() tokens, capacity := limiter.GetStats() fmt.Printf("Request %d: %s (tokens: %d/%d)\n", i, allowedStatus(allowed), tokens, capacity) } // Test AllowN fmt.Println("\n--- Testing AllowN (requesting 3 tokens) ---") allowed := limiter.AllowN(3) tokens, capacity := limiter.GetStats() fmt.Printf("Bulk request: %s (tokens: %d/%d)\n", allowedStatus(allowed), tokens, capacity) } func allowedStatus(allowed bool) string { if allowed { return " ALLOWED" } return " DENIED" } Sliding Window Rate Limiter package main import ( "fmt" "sync" "time" ) // SlidingWindow implements sliding window rate limiting type SlidingWindow struct { mu sync.Mutex requests []time.Time limit int // Maximum requests per window window time.Duration // Time window duration } // NewSlidingWindow creates a new sliding window rate limiter func NewSlidingWindow(limit int, window time.Duration) *SlidingWindow { return &SlidingWindow{ requests: make([]time.Time, 0), limit: limit, window: window, } } // Allow checks if a request should be allowed func (sw *SlidingWindow) Allow() bool { sw.mu.Lock() defer sw.mu.Unlock() now := time.Now() sw.cleanOldRequests(now) if len(sw.requests) < sw.limit { sw.requests = append(sw.requests, now) return true } return false } // cleanOldRequests removes requests outside the current window func (sw *SlidingWindow) cleanOldRequests(now time.Time) { cutoff := now.Add(-sw.window) // Find first request within window start := 0 for i, req := range sw.requests { if req.After(cutoff) { start = i break } start = len(sw.requests) // All requests are old } // Keep only recent requests if start > 0 { copy(sw.requests, sw.requests[start:]) sw.requests = sw.requests[:len(sw.requests)-start] } } // GetStats returns current window statistics func (sw *SlidingWindow) GetStats() (current, limit int, window time.Duration) { sw.mu.Lock() defer sw.mu.Unlock() sw.cleanOldRequests(time.Now()) return len(sw.requests), sw.limit, sw.window } // GetRequestTimes returns timestamps of requests in current window func (sw *SlidingWindow) GetRequestTimes() []time.Time { sw.mu.Lock() defer sw.mu.Unlock() sw.cleanOldRequests(time.Now()) result := make([]time.Time, len(sw.requests)) copy(result, sw.requests) return result } func main() { // Create sliding window: 3 requests per 2 seconds limiter := NewSlidingWindow(3, 2*time.Second) fmt.Println("=== Sliding Window Rate Limiter Demo ===") fmt.Println("Limit: 3 requests per 2 seconds") // Test requests over time for i := 1; i <= 8; i++ { allowed := limiter.Allow() current, limit, window := limiter.GetStats() fmt.Printf("Request %d: %s (current: %d/%d in %v window)\n", i, allowedStatus(allowed), current, limit, window) if i == 4 { fmt.Println("--- Waiting 1 second ---") time.Sleep(1 * time.Second) } else if i == 6 { fmt.Println("--- Waiting 1.5 seconds ---") time.Sleep(1500 * time.Millisecond) } else { time.Sleep(200 * time.Millisecond) } } // Show request timeline fmt.Println("\n--- Request Timeline ---") requests := limiter.GetRequestTimes() now := time.Now() for i, req := range requests { age := now.Sub(req) fmt.Printf("Request %d: %v ago\n", i+1, age.Round(time.Millisecond)) } } Fixed Window Rate Limiter package main import ( "fmt" "sync" "time" ) // FixedWindow implements fixed window rate limiting type FixedWindow struct { mu sync.Mutex limit int // Maximum requests per window window time.Duration // Window duration currentCount int // Current window request count windowStart time.Time // Current window start time } // NewFixedWindow creates a new fixed window rate limiter func NewFixedWindow(limit int, window time.Duration) *FixedWindow { return &FixedWindow{ limit: limit, window: window, windowStart: time.Now(), } } // Allow checks if a request should be allowed func (fw *FixedWindow) Allow() bool { fw.mu.Lock() defer fw.mu.Unlock() now := time.Now() // Check if we need to start a new window if now.Sub(fw.windowStart) >= fw.window { fw.currentCount = 0 fw.windowStart = now } if fw.currentCount < fw.limit { fw.currentCount++ return true } return false } // GetStats returns current window statistics func (fw *FixedWindow) GetStats() (current, limit int, windowRemaining time.Duration) { fw.mu.Lock() defer fw.mu.Unlock() now := time.Now() elapsed := now.Sub(fw.windowStart) if elapsed >= fw.window { return 0, fw.limit, fw.window } return fw.currentCount, fw.limit, fw.window - elapsed } func main() { // Create fixed window: 3 requests per 2 seconds limiter := NewFixedWindow(3, 2*time.Second) fmt.Println("=== Fixed Window Rate Limiter Demo ===") fmt.Println("Limit: 3 requests per 2 seconds") // Test requests over time for i := 1; i <= 10; i++ { allowed := limiter.Allow() current, limit, remaining := limiter.GetStats() fmt.Printf("Request %d: %s (current: %d/%d, window resets in: %v)\n", i, allowedStatus(allowed), current, limit, remaining.Round(time.Millisecond)) time.Sleep(400 * time.Millisecond) } } Advanced Rate Limiter with Multiple Algorithms package main import ( "context" "fmt" "sync" "time" ) // RateLimiterType represents different rate limiting algorithms type RateLimiterType int const ( TokenBucketType RateLimiterType = iota SlidingWindowType FixedWindowType ) // RateLimiter interface for different rate limiting algorithms type RateLimiter interface { Allow() bool Wait(ctx context.Context) error GetStats() map[string]interface{} } // MultiRateLimiter combines multiple rate limiters type MultiRateLimiter struct { limiters []RateLimiter names []string } // NewMultiRateLimiter creates a new multi-algorithm rate limiter func NewMultiRateLimiter() *MultiRateLimiter { return &MultiRateLimiter{ limiters: make([]RateLimiter, 0), names: make([]string, 0), } } // AddLimiter adds a rate limiter with a name func (mrl *MultiRateLimiter) AddLimiter(name string, limiter RateLimiter) { mrl.limiters = append(mrl.limiters, limiter) mrl.names = append(mrl.names, name) } // Allow checks if request is allowed by all limiters func (mrl *MultiRateLimiter) Allow() bool { for _, limiter := range mrl.limiters { if !limiter.Allow() { return false } } return true } // Wait waits until all limiters allow the request func (mrl *MultiRateLimiter) Wait(ctx context.Context) error { for _, limiter := range mrl.limiters { if err := limiter.Wait(ctx); err != nil { return err } } return nil } // GetStats returns stats from all limiters func (mrl *MultiRateLimiter) GetStats() map[string]interface{} { stats := make(map[string]interface{}) for i, limiter := range mrl.limiters { stats[mrl.names[i]] = limiter.GetStats() } return stats } // Enhanced TokenBucket with RateLimiter interface type EnhancedTokenBucket struct { *TokenBucket } func (etb *EnhancedTokenBucket) GetStats() map[string]interface{} { tokens, capacity := etb.TokenBucket.GetStats() return map[string]interface{}{ "type": "token_bucket", "tokens": tokens, "capacity": capacity, "rate": etb.refillRate, } } // Enhanced SlidingWindow with RateLimiter interface type EnhancedSlidingWindow struct { *SlidingWindow } func (esw *EnhancedSlidingWindow) Wait(ctx context.Context) error { for { if esw.Allow() { return nil } select { case <-time.After(time.Millisecond * 10): continue case <-ctx.Done(): return ctx.Err() } } } func (esw *EnhancedSlidingWindow) GetStats() map[string]interface{} { current, limit, window := esw.SlidingWindow.GetStats() return map[string]interface{}{ "type": "sliding_window", "current": current, "limit": limit, "window": window.String(), } } // Enhanced FixedWindow with RateLimiter interface type EnhancedFixedWindow struct { *FixedWindow } func (efw *EnhancedFixedWindow) Wait(ctx context.Context) error { for { if efw.Allow() { return nil } select { case <-time.After(time.Millisecond * 10): continue case <-ctx.Done(): return ctx.Err() } } } func (efw *EnhancedFixedWindow) GetStats() map[string]interface{} { current, limit, remaining := efw.FixedWindow.GetStats() return map[string]interface{}{ "type": "fixed_window", "current": current, "limit": limit, "remaining": remaining.String(), } } // RateLimitedService demonstrates rate limiting in a service type RateLimitedService struct { limiter RateLimiter mu sync.Mutex stats struct { totalRequests int allowedRequests int deniedRequests int } } // NewRateLimitedService creates a new rate limited service func NewRateLimitedService(limiter RateLimiter) *RateLimitedService { return &RateLimitedService{ limiter: limiter, } } // ProcessRequest processes a request with rate limiting func (rls *RateLimitedService) ProcessRequest(ctx context.Context, requestID string) error { rls.mu.Lock() rls.stats.totalRequests++ rls.mu.Unlock() if !rls.limiter.Allow() { rls.mu.Lock() rls.stats.deniedRequests++ rls.mu.Unlock() return fmt.Errorf("request %s denied by rate limiter", requestID) } rls.mu.Lock() rls.stats.allowedRequests++ rls.mu.Unlock() // Simulate processing time.Sleep(50 * time.Millisecond) fmt.Printf(" Processed request %s\n", requestID) return nil } // GetServiceStats returns service statistics func (rls *RateLimitedService) GetServiceStats() map[string]interface{} { rls.mu.Lock() defer rls.mu.Unlock() return map[string]interface{}{ "total_requests": rls.stats.totalRequests, "allowed_requests": rls.stats.allowedRequests, "denied_requests": rls.stats.deniedRequests, "rate_limiter": rls.limiter.GetStats(), } } func main() { // Create multi-algorithm rate limiter multiLimiter := NewMultiRateLimiter() // Add different rate limiters multiLimiter.AddLimiter("token_bucket", &EnhancedTokenBucket{ TokenBucket: NewTokenBucket(5, 2), // 5 tokens, 2 per second }) multiLimiter.AddLimiter("sliding_window", &EnhancedSlidingWindow{ SlidingWindow: NewSlidingWindow(3, 2*time.Second), // 3 requests per 2 seconds }) multiLimiter.AddLimiter("fixed_window", &EnhancedFixedWindow{ FixedWindow: NewFixedWindow(4, 3*time.Second), // 4 requests per 3 seconds }) service := NewRateLimitedService(multiLimiter) fmt.Println("=== Multi-Algorithm Rate Limiter Demo ===") fmt.Println("Using Token Bucket (5 tokens, 2/sec) + Sliding Window (3/2sec) + Fixed Window (4/3sec)") // Simulate concurrent requests var wg sync.WaitGroup for i := 1; i <= 15; i++ { wg.Add(1) go func(id int) { defer wg.Done() ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() requestID := fmt.Sprintf("req-%d", id) err := service.ProcessRequest(ctx, requestID) if err != nil { fmt.Printf(" %v\n", err) } }(i) time.Sleep(200 * time.Millisecond) } wg.Wait() // Print final statistics fmt.Println("\n=== Final Statistics ===") stats := service.GetServiceStats() fmt.Printf("Total Requests: %d\n", stats["total_requests"]) fmt.Printf("Allowed Requests: %d\n", stats["allowed_requests"]) fmt.Printf("Denied Requests: %d\n", stats["denied_requests"]) fmt.Println("\nRate Limiter Details:") rateLimiterStats := stats["rate_limiter"].(map[string]interface{}) for name, limiterStats := range rateLimiterStats { fmt.Printf(" %s: %+v\n", name, limiterStats) } } Best Practices Choose Right Algorithm: Select based on your specific requirements Token Bucket: Allow bursts, good for APIs Sliding Window: Smooth rate limiting Fixed Window: Simple, memory efficient Configure Appropriately: Set limits based on system capacity Handle Rejections Gracefully: Provide meaningful error messages Monitor Metrics: Track allowed/denied requests and adjust limits Use Context: Support cancellation in Wait operations Consider Distributed Systems: Use Redis or similar for distributed rate limiting Implement Backoff: Add exponential backoff for denied requests Common Pitfalls Too Restrictive: Setting limits too low affects user experience Too Permissive: High limits don’t protect against abuse Memory Leaks: Not cleaning old requests in sliding window Race Conditions: Not properly synchronizing access to counters Ignoring Bursts: Fixed windows can allow double the limit at boundaries Rate limiting is essential for protecting services from overload and ensuring fair resource usage. Choose the right algorithm based on your requirements and always monitor the effectiveness of your rate limiting strategy. ...

    July 24, 2024 · 10 min · Rafiul Alam

    Pub/Sub Pattern in Go

    Go Concurrency Patterns Series: ← Fan-Out/Fan-In | Series Overview | Request/Response → What is the Pub/Sub Pattern? The Publisher/Subscriber (Pub/Sub) pattern is a messaging pattern where publishers send messages without knowing who will receive them, and subscribers receive messages without knowing who sent them. This creates a loosely coupled system where components can communicate through events without direct dependencies. Key Components: Publisher: Sends messages/events Subscriber: Receives and processes messages/events Message Broker: Routes messages from publishers to subscribers Topics/Channels: Categories for organizing messages Real-World Use Cases Event-Driven Architecture: Microservices communication Real-Time Notifications: User activity feeds, alerts Data Streaming: Log aggregation, metrics collection UI Updates: React to state changes across components Workflow Orchestration: Trigger actions based on events Cache Invalidation: Notify when data changes Basic Pub/Sub Implementation package main import ( "fmt" "sync" "time" ) // Message represents a pub/sub message type Message struct { Topic string Payload interface{} } // Subscriber represents a message handler type Subscriber func(Message) // PubSub is a simple in-memory pub/sub system type PubSub struct { mu sync.RWMutex subscribers map[string][]Subscriber closed bool } // NewPubSub creates a new pub/sub instance func NewPubSub() *PubSub { return &PubSub{ subscribers: make(map[string][]Subscriber), } } // Subscribe adds a subscriber to a topic func (ps *PubSub) Subscribe(topic string, subscriber Subscriber) { ps.mu.Lock() defer ps.mu.Unlock() if ps.closed { return } ps.subscribers[topic] = append(ps.subscribers[topic], subscriber) } // Publish sends a message to all subscribers of a topic func (ps *PubSub) Publish(topic string, payload interface{}) { ps.mu.RLock() defer ps.mu.RUnlock() if ps.closed { return } message := Message{ Topic: topic, Payload: payload, } // Send to all subscribers asynchronously for _, subscriber := range ps.subscribers[topic] { go subscriber(message) } } // Close shuts down the pub/sub system func (ps *PubSub) Close() { ps.mu.Lock() defer ps.mu.Unlock() ps.closed = true } func main() { pubsub := NewPubSub() defer pubsub.Close() // Subscribe to user events pubsub.Subscribe("user.created", func(msg Message) { fmt.Printf("Email service: Welcome %v!\n", msg.Payload) }) pubsub.Subscribe("user.created", func(msg Message) { fmt.Printf("Analytics: New user registered: %v\n", msg.Payload) }) pubsub.Subscribe("user.deleted", func(msg Message) { fmt.Printf("Cleanup service: Remove user data for %v\n", msg.Payload) }) // Publish events pubsub.Publish("user.created", "[email protected]") pubsub.Publish("user.created", "[email protected]") pubsub.Publish("user.deleted", "[email protected]") // Wait for async processing time.Sleep(100 * time.Millisecond) } Advanced Pub/Sub with Channels package main import ( "context" "fmt" "sync" "time" ) // Event represents a structured event type Event struct { ID string Type string Timestamp time.Time Data interface{} } // Subscription represents an active subscription type Subscription struct { ID string Topic string Channel chan Event Filter func(Event) bool cancel context.CancelFunc } // Close cancels the subscription func (s *Subscription) Close() { if s.cancel != nil { s.cancel() } } // EventBus is a channel-based pub/sub system type EventBus struct { mu sync.RWMutex subscriptions map[string][]*Subscription buffer int closed bool } // NewEventBus creates a new event bus func NewEventBus(bufferSize int) *EventBus { return &EventBus{ subscriptions: make(map[string][]*Subscription), buffer: bufferSize, } } // Subscribe creates a new subscription with optional filtering func (eb *EventBus) Subscribe(ctx context.Context, topic string, filter func(Event) bool) *Subscription { eb.mu.Lock() defer eb.mu.Unlock() if eb.closed { return nil } subCtx, cancel := context.WithCancel(ctx) subscription := &Subscription{ ID: fmt.Sprintf("sub-%d", time.Now().UnixNano()), Topic: topic, Channel: make(chan Event, eb.buffer), Filter: filter, cancel: cancel, } eb.subscriptions[topic] = append(eb.subscriptions[topic], subscription) // Clean up subscription when context is cancelled go func() { <-subCtx.Done() eb.unsubscribe(subscription) close(subscription.Channel) }() return subscription } // unsubscribe removes a subscription func (eb *EventBus) unsubscribe(sub *Subscription) { eb.mu.Lock() defer eb.mu.Unlock() subs := eb.subscriptions[sub.Topic] for i, s := range subs { if s.ID == sub.ID { eb.subscriptions[sub.Topic] = append(subs[:i], subs[i+1:]...) break } } } // Publish sends an event to all matching subscribers func (eb *EventBus) Publish(event Event) { eb.mu.RLock() defer eb.mu.RUnlock() if eb.closed { return } event.Timestamp = time.Now() for _, subscription := range eb.subscriptions[event.Type] { // Apply filter if present if subscription.Filter != nil && !subscription.Filter(event) { continue } // Non-blocking send select { case subscription.Channel <- event: default: // Channel is full, could log this fmt.Printf("Warning: Subscription %s channel is full\n", subscription.ID) } } } // Close shuts down the event bus func (eb *EventBus) Close() { eb.mu.Lock() defer eb.mu.Unlock() eb.closed = true // Close all subscriptions for _, subs := range eb.subscriptions { for _, sub := range subs { sub.Close() } } } func main() { ctx := context.Background() eventBus := NewEventBus(10) defer eventBus.Close() // Subscribe to all user events userSub := eventBus.Subscribe(ctx, "user", nil) // Subscribe to only high-priority events prioritySub := eventBus.Subscribe(ctx, "user", func(e Event) bool { if data, ok := e.Data.(map[string]interface{}); ok { return data["priority"] == "high" } return false }) // Start event processors go func() { for event := range userSub.Channel { fmt.Printf("User processor: %s - %v\n", event.Type, event.Data) } }() go func() { for event := range prioritySub.Channel { fmt.Printf("Priority processor: %s - %v\n", event.Type, event.Data) } }() // Publish events eventBus.Publish(Event{ ID: "1", Type: "user", Data: map[string]interface{}{ "action": "login", "user": "john", "priority": "low", }, }) eventBus.Publish(Event{ ID: "2", Type: "user", Data: map[string]interface{}{ "action": "payment", "user": "jane", "priority": "high", }, }) time.Sleep(100 * time.Millisecond) } Persistent Pub/Sub with Replay package main import ( "context" "fmt" "sync" "time" ) // StoredEvent represents an event with storage metadata type StoredEvent struct { Event Sequence int64 Stored time.Time } // PersistentEventBus stores events and supports replay type PersistentEventBus struct { mu sync.RWMutex events []StoredEvent sequence int64 subs map[string][]*PersistentSubscription closed bool } // PersistentSubscription supports replay from a specific point type PersistentSubscription struct { ID string Topic string Channel chan StoredEvent FromSeq int64 cancel context.CancelFunc } func (s *PersistentSubscription) Close() { if s.cancel != nil { s.cancel() } } // NewPersistentEventBus creates a new persistent event bus func NewPersistentEventBus() *PersistentEventBus { return &PersistentEventBus{ events: make([]StoredEvent, 0), subs: make(map[string][]*PersistentSubscription), } } // Subscribe creates a subscription with optional replay func (peb *PersistentEventBus) Subscribe(ctx context.Context, topic string, fromSequence int64) *PersistentSubscription { peb.mu.Lock() defer peb.mu.Unlock() if peb.closed { return nil } subCtx, cancel := context.WithCancel(ctx) sub := &PersistentSubscription{ ID: fmt.Sprintf("psub-%d", time.Now().UnixNano()), Topic: topic, Channel: make(chan StoredEvent, 100), FromSeq: fromSequence, cancel: cancel, } peb.subs[topic] = append(peb.subs[topic], sub) // Replay historical events if requested if fromSequence >= 0 { go peb.replayEvents(sub) } // Clean up on context cancellation go func() { <-subCtx.Done() peb.unsubscribe(sub) close(sub.Channel) }() return sub } // replayEvents sends historical events to a subscription func (peb *PersistentEventBus) replayEvents(sub *PersistentSubscription) { peb.mu.RLock() defer peb.mu.RUnlock() for _, storedEvent := range peb.events { if storedEvent.Sequence >= sub.FromSeq && storedEvent.Type == sub.Topic { select { case sub.Channel <- storedEvent: default: // Channel full, skip } } } } // unsubscribe removes a subscription func (peb *PersistentEventBus) unsubscribe(sub *PersistentSubscription) { peb.mu.Lock() defer peb.mu.Unlock() subs := peb.subs[sub.Topic] for i, s := range subs { if s.ID == sub.ID { peb.subs[sub.Topic] = append(subs[:i], subs[i+1:]...) break } } } // Publish stores and distributes an event func (peb *PersistentEventBus) Publish(event Event) int64 { peb.mu.Lock() defer peb.mu.Unlock() if peb.closed { return -1 } peb.sequence++ storedEvent := StoredEvent{ Event: event, Sequence: peb.sequence, Stored: time.Now(), } // Store event peb.events = append(peb.events, storedEvent) // Distribute to current subscribers for _, sub := range peb.subs[event.Type] { select { case sub.Channel <- storedEvent: default: // Channel full } } return peb.sequence } // GetLastSequence returns the last event sequence number func (peb *PersistentEventBus) GetLastSequence() int64 { peb.mu.RLock() defer peb.mu.RUnlock() return peb.sequence } func main() { ctx := context.Background() eventBus := NewPersistentEventBus() // Publish some initial events eventBus.Publish(Event{ID: "1", Type: "order", Data: "Order created"}) eventBus.Publish(Event{ID: "2", Type: "order", Data: "Order paid"}) eventBus.Publish(Event{ID: "3", Type: "order", Data: "Order shipped"}) fmt.Printf("Published 3 events, last sequence: %d\n", eventBus.GetLastSequence()) // Subscribe from the beginning (replay all events) replaySub := eventBus.Subscribe(ctx, "order", 0) // Subscribe from current point (no replay) liveSub := eventBus.Subscribe(ctx, "order", -1) // Process replayed events go func() { fmt.Println("Replay subscription:") for event := range replaySub.Channel { fmt.Printf(" Replayed: seq=%d, %v\n", event.Sequence, event.Data) } }() // Process live events go func() { fmt.Println("Live subscription:") for event := range liveSub.Channel { fmt.Printf(" Live: seq=%d, %v\n", event.Sequence, event.Data) } }() time.Sleep(100 * time.Millisecond) // Publish new events eventBus.Publish(Event{ID: "4", Type: "order", Data: "Order delivered"}) eventBus.Publish(Event{ID: "5", Type: "order", Data: "Order completed"}) time.Sleep(100 * time.Millisecond) replaySub.Close() liveSub.Close() } Typed Pub/Sub System package main import ( "context" "fmt" "reflect" "sync" ) // TypedEventBus provides type-safe pub/sub type TypedEventBus struct { mu sync.RWMutex handlers map[reflect.Type][]reflect.Value closed bool } // NewTypedEventBus creates a new typed event bus func NewTypedEventBus() *TypedEventBus { return &TypedEventBus{ handlers: make(map[reflect.Type][]reflect.Value), } } // Subscribe registers a handler for a specific event type func (teb *TypedEventBus) Subscribe(handler interface{}) { teb.mu.Lock() defer teb.mu.Unlock() if teb.closed { return } handlerValue := reflect.ValueOf(handler) handlerType := handlerValue.Type() // Validate handler signature: func(EventType) if handlerType.Kind() != reflect.Func || handlerType.NumIn() != 1 || handlerType.NumOut() != 0 { panic("Handler must be func(EventType)") } eventType := handlerType.In(0) teb.handlers[eventType] = append(teb.handlers[eventType], handlerValue) } // Publish sends an event to all registered handlers func (teb *TypedEventBus) Publish(event interface{}) { teb.mu.RLock() defer teb.mu.RUnlock() if teb.closed { return } eventType := reflect.TypeOf(event) eventValue := reflect.ValueOf(event) for _, handler := range teb.handlers[eventType] { go handler.Call([]reflect.Value{eventValue}) } } // Event types type UserCreated struct { UserID string Email string } type OrderPlaced struct { OrderID string UserID string Amount float64 } type PaymentProcessed struct { PaymentID string OrderID string Success bool } func main() { eventBus := NewTypedEventBus() // Subscribe to different event types eventBus.Subscribe(func(event UserCreated) { fmt.Printf("Email service: Send welcome email to %s\n", event.Email) }) eventBus.Subscribe(func(event UserCreated) { fmt.Printf("Analytics: Track user registration %s\n", event.UserID) }) eventBus.Subscribe(func(event OrderPlaced) { fmt.Printf("Inventory: Reserve items for order %s\n", event.OrderID) }) eventBus.Subscribe(func(event OrderPlaced) { fmt.Printf("Payment: Process payment for order %s, amount $%.2f\n", event.OrderID, event.Amount) }) eventBus.Subscribe(func(event PaymentProcessed) { if event.Success { fmt.Printf("Fulfillment: Ship order %s\n", event.OrderID) } else { fmt.Printf("Orders: Cancel order %s due to payment failure\n", event.OrderID) } }) // Publish events eventBus.Publish(UserCreated{ UserID: "user123", Email: "[email protected]", }) eventBus.Publish(OrderPlaced{ OrderID: "order456", UserID: "user123", Amount: 99.99, }) eventBus.Publish(PaymentProcessed{ PaymentID: "pay789", OrderID: "order456", Success: true, }) // Wait for async processing time.Sleep(100 * time.Millisecond) } Best Practices Async Processing: Handle events asynchronously to avoid blocking publishers Error Handling: Implement proper error handling in subscribers Buffering: Use buffered channels to handle bursts of events Graceful Shutdown: Ensure clean shutdown of all subscribers Dead Letter Queues: Handle failed message processing Monitoring: Track message rates, processing times, and failures Type Safety: Use typed events when possible Idempotency: Design subscribers to handle duplicate messages Common Pitfalls Memory Leaks: Not closing subscriptions properly Blocking Publishers: Slow subscribers blocking the entire system Lost Messages: Not handling channel buffer overflows Circular Dependencies: Events triggering other events in loops No Error Handling: Panics in subscribers affecting the system Testing Pub/Sub Systems package main import ( "context" "testing" "time" ) func TestEventBus(t *testing.T) { eventBus := NewEventBus(10) defer eventBus.Close() ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() // Subscribe to events sub := eventBus.Subscribe(ctx, "test", nil) // Publish event testEvent := Event{ ID: "test1", Type: "test", Data: "test data", } eventBus.Publish(testEvent) // Verify event received select { case received := <-sub.Channel: if received.ID != testEvent.ID { t.Errorf("Expected event ID %s, got %s", testEvent.ID, received.ID) } case <-time.After(100 * time.Millisecond): t.Error("Event not received within timeout") } } The Pub/Sub pattern is fundamental for building scalable, event-driven systems in Go. It enables loose coupling between components and supports complex workflows through simple event-based communication. ...

    July 17, 2024 · 9 min · Rafiul Alam

    Once Pattern in Go

    Go Concurrency Patterns Series: ← WaitGroup Pattern | Series Overview | Context Pattern → What is the Once Pattern? The Once pattern uses sync.Once to ensure that a piece of code executes exactly once, regardless of how many goroutines call it. This is essential for thread-safe initialization, singleton patterns, and one-time setup operations in concurrent programs. Key Characteristics: Thread-safe: Multiple goroutines can call it safely Exactly once: Code executes only on the first call Blocking: Subsequent calls wait for the first execution to complete No return values: The function passed to Do() cannot return values Real-World Use Cases Singleton Initialization: Create single instances of objects Configuration Loading: Load config files once at startup Database Connections: Initialize connection pools Logger Setup: Configure logging systems Resource Initialization: Set up expensive resources Feature Flags: Initialize feature flag systems Basic Once Usage package main import ( "fmt" "sync" "time" ) var ( instance *Database once sync.Once ) // Database represents a database connection type Database struct { ConnectionString string IsConnected bool } // Connect simulates database connection func (db *Database) Connect() { fmt.Println("Connecting to database...") time.Sleep(100 * time.Millisecond) // Simulate connection time db.IsConnected = true fmt.Println("Database connected!") } // GetDatabase returns the singleton database instance func GetDatabase() *Database { once.Do(func() { fmt.Println("Initializing database instance...") instance = &Database{ ConnectionString: "localhost:5432", } instance.Connect() }) return instance } func main() { var wg sync.WaitGroup // Multiple goroutines trying to get database instance for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() fmt.Printf("Goroutine %d requesting database\n", id) db := GetDatabase() fmt.Printf("Goroutine %d got database: %+v\n", id, db) }(i) } wg.Wait() // Verify all goroutines got the same instance fmt.Printf("Final instance: %p\n", GetDatabase()) } Configuration Manager with Once package main import ( "encoding/json" "fmt" "os" "sync" ) // Config represents application configuration type Config struct { DatabaseURL string `json:"database_url"` APIKey string `json:"api_key"` Debug bool `json:"debug"` Port int `json:"port"` } // ConfigManager manages application configuration type ConfigManager struct { config *Config once sync.Once err error } // NewConfigManager creates a new config manager func NewConfigManager() *ConfigManager { return &ConfigManager{} } // loadConfig loads configuration from file func (cm *ConfigManager) loadConfig() { fmt.Println("Loading configuration...") // Simulate config file reading configData := `{ "database_url": "postgres://localhost:5432/myapp", "api_key": "secret-api-key-123", "debug": true, "port": 8080 }` var config Config if err := json.Unmarshal([]byte(configData), &config); err != nil { cm.err = fmt.Errorf("failed to parse config: %w", err) return } cm.config = &config fmt.Println("Configuration loaded successfully!") } // GetConfig returns the configuration, loading it once if needed func (cm *ConfigManager) GetConfig() (*Config, error) { cm.once.Do(cm.loadConfig) return cm.config, cm.err } func main() { configManager := NewConfigManager() var wg sync.WaitGroup // Multiple goroutines accessing configuration for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() config, err := configManager.GetConfig() if err != nil { fmt.Printf("Goroutine %d: Error loading config: %v\n", id, err) return } fmt.Printf("Goroutine %d: Port=%d, Debug=%v\n", id, config.Port, config.Debug) }(i) } wg.Wait() } Logger Initialization with Once package main import ( "fmt" "log" "os" "sync" ) // Logger wraps the standard logger with additional functionality type Logger struct { *log.Logger level string } var ( logger *Logger loggerOnce sync.Once ) // initLogger initializes the global logger func initLogger() { fmt.Println("Initializing logger...") // Create log file file, err := os.OpenFile("app.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) if err != nil { log.Fatalln("Failed to open log file:", err) } logger = &Logger{ Logger: log.New(file, "APP: ", log.Ldate|log.Ltime|log.Lshortfile), level: "INFO", } logger.Println("Logger initialized") fmt.Println("Logger setup complete!") } // GetLogger returns the singleton logger instance func GetLogger() *Logger { loggerOnce.Do(initLogger) return logger } // Info logs an info message func (l *Logger) Info(msg string) { l.Printf("[INFO] %s", msg) } // Error logs an error message func (l *Logger) Error(msg string) { l.Printf("[ERROR] %s", msg) } func main() { var wg sync.WaitGroup // Multiple goroutines using the logger for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() logger := GetLogger() logger.Info(fmt.Sprintf("Message from goroutine %d", id)) if id%2 == 0 { logger.Error(fmt.Sprintf("Error from goroutine %d", id)) } }(i) } wg.Wait() // Clean up if logger != nil { logger.Info("Application shutting down") } } Resource Pool Initialization package main import ( "fmt" "sync" "time" ) // Connection represents a database connection type Connection struct { ID int Connected bool } // Connect simulates connecting to database func (c *Connection) Connect() error { time.Sleep(50 * time.Millisecond) // Simulate connection time c.Connected = true return nil } // Close simulates closing the connection func (c *Connection) Close() error { c.Connected = false return nil } // ConnectionPool manages a pool of database connections type ConnectionPool struct { connections []*Connection available chan *Connection once sync.Once initErr error } // NewConnectionPool creates a new connection pool func NewConnectionPool(size int) *ConnectionPool { return &ConnectionPool{ available: make(chan *Connection, size), } } // initialize sets up the connection pool func (cp *ConnectionPool) initialize() { fmt.Println("Initializing connection pool...") poolSize := cap(cp.available) cp.connections = make([]*Connection, poolSize) // Create and connect all connections for i := 0; i < poolSize; i++ { conn := &Connection{ID: i + 1} if err := conn.Connect(); err != nil { cp.initErr = fmt.Errorf("failed to connect connection %d: %w", i+1, err) return } cp.connections[i] = conn cp.available <- conn } fmt.Printf("Connection pool initialized with %d connections\n", poolSize) } // GetConnection gets a connection from the pool func (cp *ConnectionPool) GetConnection() (*Connection, error) { cp.once.Do(cp.initialize) if cp.initErr != nil { return nil, cp.initErr } select { case conn := <-cp.available: return conn, nil case <-time.After(5 * time.Second): return nil, fmt.Errorf("timeout waiting for connection") } } // ReturnConnection returns a connection to the pool func (cp *ConnectionPool) ReturnConnection(conn *Connection) { select { case cp.available <- conn: default: // Pool is full, close the connection conn.Close() } } // Close closes all connections in the pool func (cp *ConnectionPool) Close() error { close(cp.available) for _, conn := range cp.connections { if conn != nil { conn.Close() } } return nil } func main() { pool := NewConnectionPool(3) defer pool.Close() var wg sync.WaitGroup // Multiple goroutines using the connection pool for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() conn, err := pool.GetConnection() if err != nil { fmt.Printf("Worker %d: Failed to get connection: %v\n", id, err) return } fmt.Printf("Worker %d: Got connection %d\n", id, conn.ID) // Simulate work time.Sleep(200 * time.Millisecond) pool.ReturnConnection(conn) fmt.Printf("Worker %d: Returned connection %d\n", id, conn.ID) }(i) } wg.Wait() } Advanced Once Patterns 1. Once with Error Handling package main import ( "fmt" "sync" ) // OnceWithError provides Once functionality with error handling type OnceWithError struct { once sync.Once err error } // Do executes the function once and stores any error func (o *OnceWithError) Do(f func() error) error { o.once.Do(func() { o.err = f() }) return o.err } // ExpensiveResource represents a resource that's expensive to initialize type ExpensiveResource struct { Data string } var ( resource *ExpensiveResource resourceOnce OnceWithError ) // initResource initializes the expensive resource func initResource() error { fmt.Println("Initializing expensive resource...") // Simulate potential failure if false { // Change to true to simulate error return fmt.Errorf("failed to initialize resource") } resource = &ExpensiveResource{ Data: "Important data", } fmt.Println("Resource initialized successfully!") return nil } // GetResource returns the resource, initializing it once if needed func GetResource() (*ExpensiveResource, error) { err := resourceOnce.Do(initResource) if err != nil { return nil, err } return resource, nil } func main() { var wg sync.WaitGroup for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() resource, err := GetResource() if err != nil { fmt.Printf("Goroutine %d: Error: %v\n", id, err) return } fmt.Printf("Goroutine %d: Got resource: %s\n", id, resource.Data) }(i) } wg.Wait() } 2. Resettable Once package main import ( "fmt" "sync" "sync/atomic" ) // ResettableOnce allows resetting the once behavior type ResettableOnce struct { mu sync.Mutex done uint32 } // Do executes the function once func (ro *ResettableOnce) Do(f func()) { if atomic.LoadUint32(&ro.done) == 0 { ro.doSlow(f) } } func (ro *ResettableOnce) doSlow(f func()) { ro.mu.Lock() defer ro.mu.Unlock() if ro.done == 0 { defer atomic.StoreUint32(&ro.done, 1) f() } } // Reset allows the once to be used again func (ro *ResettableOnce) Reset() { ro.mu.Lock() defer ro.mu.Unlock() atomic.StoreUint32(&ro.done, 0) } // IsDone returns true if the function has been executed func (ro *ResettableOnce) IsDone() bool { return atomic.LoadUint32(&ro.done) == 1 } func main() { var once ResettableOnce counter := 0 task := func() { counter++ fmt.Printf("Task executed, counter: %d\n", counter) } // First round fmt.Println("First round:") for i := 0; i < 3; i++ { once.Do(task) } fmt.Printf("Done: %v\n", once.IsDone()) // Reset and second round fmt.Println("\nAfter reset:") once.Reset() fmt.Printf("Done: %v\n", once.IsDone()) for i := 0; i < 3; i++ { once.Do(task) } } Best Practices Use for Initialization: Perfect for one-time setup operations Keep Functions Simple: The function passed to Do() should be straightforward Handle Errors Separately: Use wrapper types for error handling Avoid Side Effects: Be careful with functions that have external side effects Don’t Nest Once Calls: Avoid calling Do() from within another Do() Consider Alternatives: Use init() for package-level initialization when appropriate Common Pitfalls 1. Expecting Return Values // Bad: Once.Do doesn't support return values var once sync.Once var result string func badExample() string { once.Do(func() { // Can't return from here result = "computed value" }) return result // This works but is not ideal } // Good: Use a wrapper or store results in accessible variables type OnceResult struct { once sync.Once result string err error } func (or *OnceResult) Get() (string, error) { or.once.Do(func() { or.result, or.err = computeValue() }) return or.result, or.err } 2. Panic in Once Function // Bad: Panic prevents future calls var once sync.Once func badOnceFunc() { once.Do(func() { panic("something went wrong") // Once will never execute again }) } // Good: Handle panics appropriately func goodOnceFunc() { once.Do(func() { defer func() { if r := recover(); r != nil { // Handle panic appropriately fmt.Printf("Recovered from panic: %v\n", r) } }() // risky operation }) } Testing Once Patterns package main import ( "sync" "testing" ) func TestOnceExecution(t *testing.T) { var once sync.Once counter := 0 var wg sync.WaitGroup // Start multiple goroutines for i := 0; i < 10; i++ { wg.Add(1) go func() { defer wg.Done() once.Do(func() { counter++ }) }() } wg.Wait() if counter != 1 { t.Errorf("Expected counter to be 1, got %d", counter) } } func TestOnceWithError(t *testing.T) { var onceErr OnceWithError callCount := 0 // First call with error err1 := onceErr.Do(func() error { callCount++ return fmt.Errorf("test error") }) // Second call should return same error without executing function err2 := onceErr.Do(func() error { callCount++ return nil }) if callCount != 1 { t.Errorf("Expected function to be called once, got %d", callCount) } if err1 == nil || err2 == nil { t.Error("Expected both calls to return error") } if err1.Error() != err2.Error() { t.Error("Expected same error from both calls") } } The Once pattern is essential for thread-safe initialization in Go. It ensures that expensive or critical setup operations happen exactly once, making it perfect for singletons, configuration loading, and resource initialization in concurrent applications. ...

    July 10, 2024 · 9 min · Rafiul Alam

    Mutex Patterns in Go

    Go Concurrency Patterns Series: ← Worker Pool | Series Overview | WaitGroup Pattern → What are Mutex Patterns? Mutex (mutual exclusion) patterns are essential for protecting shared resources in concurrent programs. Go provides sync.Mutex and sync.RWMutex for controlling access to critical sections, ensuring data consistency and preventing race conditions. Key Types: Mutex: Exclusive access (one goroutine at a time) RWMutex: Reader-writer locks (multiple readers OR one writer) Lock-free: Atomic operations without explicit locks Real-World Use Cases Shared Counters: Statistics, metrics, rate limiting Cache Systems: Thread-safe caching with read/write operations Configuration Management: Safe updates to application config Connection Pools: Managing database/HTTP connection pools Resource Allocation: Tracking and managing limited resources State Machines: Protecting state transitions Basic Mutex Usage package main import ( "fmt" "sync" "time" ) // Counter demonstrates basic mutex usage type Counter struct { mu sync.Mutex value int } // Increment safely increments the counter func (c *Counter) Increment() { c.mu.Lock() defer c.mu.Unlock() c.value++ } // Decrement safely decrements the counter func (c *Counter) Decrement() { c.mu.Lock() defer c.mu.Unlock() c.value-- } // Value safely returns the current value func (c *Counter) Value() int { c.mu.Lock() defer c.mu.Unlock() return c.value } // Add safely adds a value to the counter func (c *Counter) Add(delta int) { c.mu.Lock() defer c.mu.Unlock() c.value += delta } func main() { counter := &Counter{} var wg sync.WaitGroup // Start multiple goroutines incrementing the counter for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 100; j++ { counter.Increment() } fmt.Printf("Goroutine %d finished\n", id) }(i) } // Start some goroutines decrementing for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 50; j++ { counter.Decrement() } fmt.Printf("Decrementer %d finished\n", id) }(i) } wg.Wait() fmt.Printf("Final counter value: %d\n", counter.Value()) // Expected: (10 * 100) - (3 * 50) = 1000 - 150 = 850 } RWMutex for Read-Heavy Workloads package main import ( "fmt" "math/rand" "sync" "time" ) // Cache demonstrates RWMutex usage for read-heavy scenarios type Cache struct { mu sync.RWMutex data map[string]interface{} } // NewCache creates a new cache func NewCache() *Cache { return &Cache{ data: make(map[string]interface{}), } } // Get retrieves a value (read operation) func (c *Cache) Get(key string) (interface{}, bool) { c.mu.RLock() defer c.mu.RUnlock() value, exists := c.data[key] return value, exists } // Set stores a value (write operation) func (c *Cache) Set(key string, value interface{}) { c.mu.Lock() defer c.mu.Unlock() c.data[key] = value } // Delete removes a value (write operation) func (c *Cache) Delete(key string) { c.mu.Lock() defer c.mu.Unlock() delete(c.data, key) } // Keys returns all keys (read operation) func (c *Cache) Keys() []string { c.mu.RLock() defer c.mu.RUnlock() keys := make([]string, 0, len(c.data)) for key := range c.data { keys = append(keys, key) } return keys } // Size returns the number of items (read operation) func (c *Cache) Size() int { c.mu.RLock() defer c.mu.RUnlock() return len(c.data) } // Clear removes all items (write operation) func (c *Cache) Clear() { c.mu.Lock() defer c.mu.Unlock() c.data = make(map[string]interface{}) } func main() { cache := NewCache() var wg sync.WaitGroup // Writers (fewer, less frequent) for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 50; j++ { key := fmt.Sprintf("key-%d-%d", id, j) cache.Set(key, fmt.Sprintf("value-%d-%d", id, j)) time.Sleep(10 * time.Millisecond) } }(i) } // Readers (many, frequent) for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 100; j++ { // Try to read random keys key := fmt.Sprintf("key-%d-%d", rand.Intn(3), rand.Intn(50)) if value, exists := cache.Get(key); exists { fmt.Printf("Reader %d found %s: %v\n", id, key, value) } time.Sleep(5 * time.Millisecond) } }(i) } // Size checker wg.Add(1) go func() { defer wg.Done() for i := 0; i < 20; i++ { size := cache.Size() fmt.Printf("Cache size: %d\n", size) time.Sleep(100 * time.Millisecond) } }() wg.Wait() fmt.Printf("Final cache size: %d\n", cache.Size()) } Advanced Mutex Patterns 1. Conditional Variables Pattern package main import ( "fmt" "sync" "time" ) // Buffer demonstrates conditional variables with mutex type Buffer struct { mu sync.Mutex notEmpty *sync.Cond notFull *sync.Cond items []interface{} capacity int } // NewBuffer creates a new bounded buffer func NewBuffer(capacity int) *Buffer { b := &Buffer{ items: make([]interface{}, 0, capacity), capacity: capacity, } b.notEmpty = sync.NewCond(&b.mu) b.notFull = sync.NewCond(&b.mu) return b } // Put adds an item to the buffer (blocks if full) func (b *Buffer) Put(item interface{}) { b.mu.Lock() defer b.mu.Unlock() // Wait while buffer is full for len(b.items) == b.capacity { b.notFull.Wait() } b.items = append(b.items, item) fmt.Printf("Put item: %v (buffer size: %d)\n", item, len(b.items)) // Signal that buffer is not empty b.notEmpty.Signal() } // Get removes an item from the buffer (blocks if empty) func (b *Buffer) Get() interface{} { b.mu.Lock() defer b.mu.Unlock() // Wait while buffer is empty for len(b.items) == 0 { b.notEmpty.Wait() } item := b.items[0] b.items = b.items[1:] fmt.Printf("Got item: %v (buffer size: %d)\n", item, len(b.items)) // Signal that buffer is not full b.notFull.Signal() return item } // Size returns current buffer size func (b *Buffer) Size() int { b.mu.Lock() defer b.mu.Unlock() return len(b.items) } func main() { buffer := NewBuffer(3) var wg sync.WaitGroup // Producers for i := 0; i < 2; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 5; j++ { item := fmt.Sprintf("item-%d-%d", id, j) buffer.Put(item) time.Sleep(200 * time.Millisecond) } }(i) } // Consumers for i := 0; i < 2; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 5; j++ { item := buffer.Get() fmt.Printf("Consumer %d processed: %v\n", id, item) time.Sleep(300 * time.Millisecond) } }(i) } wg.Wait() } 2. Lock-Free Patterns with Atomic Operations package main import ( "fmt" "sync" "sync/atomic" "time" ) // AtomicCounter demonstrates lock-free counter type AtomicCounter struct { value int64 } // Increment atomically increments the counter func (ac *AtomicCounter) Increment() int64 { return atomic.AddInt64(&ac.value, 1) } // Decrement atomically decrements the counter func (ac *AtomicCounter) Decrement() int64 { return atomic.AddInt64(&ac.value, -1) } // Value atomically reads the current value func (ac *AtomicCounter) Value() int64 { return atomic.LoadInt64(&ac.value) } // CompareAndSwap atomically compares and swaps func (ac *AtomicCounter) CompareAndSwap(old, new int64) bool { return atomic.CompareAndSwapInt64(&ac.value, old, new) } // AtomicFlag demonstrates atomic boolean operations type AtomicFlag struct { flag int32 } // Set atomically sets the flag to true func (af *AtomicFlag) Set() { atomic.StoreInt32(&af.flag, 1) } // Clear atomically sets the flag to false func (af *AtomicFlag) Clear() { atomic.StoreInt32(&af.flag, 0) } // IsSet atomically checks if flag is set func (af *AtomicFlag) IsSet() bool { return atomic.LoadInt32(&af.flag) == 1 } // TestAndSet atomically tests and sets the flag func (af *AtomicFlag) TestAndSet() bool { return atomic.SwapInt32(&af.flag, 1) == 1 } func main() { counter := &AtomicCounter{} flag := &AtomicFlag{} var wg sync.WaitGroup // Test atomic counter fmt.Println("Testing atomic counter...") for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 1000; j++ { counter.Increment() } }(i) } wg.Wait() fmt.Printf("Final counter value: %d\n", counter.Value()) // Test atomic flag fmt.Println("\nTesting atomic flag...") // Multiple goroutines trying to set flag for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() if !flag.TestAndSet() { fmt.Printf("Goroutine %d acquired the flag\n", id) time.Sleep(100 * time.Millisecond) flag.Clear() fmt.Printf("Goroutine %d released the flag\n", id) } else { fmt.Printf("Goroutine %d failed to acquire flag\n", id) } }(i) } wg.Wait() } 3. Resource Pool Pattern package main import ( "errors" "fmt" "sync" "time" ) // Resource represents a limited resource type Resource struct { ID int Data string } // ResourcePool manages a pool of limited resources type ResourcePool struct { mu sync.Mutex resources []*Resource available chan *Resource maxSize int created int } // NewResourcePool creates a new resource pool func NewResourcePool(maxSize int) *ResourcePool { return &ResourcePool{ resources: make([]*Resource, 0, maxSize), available: make(chan *Resource, maxSize), maxSize: maxSize, } } // createResource creates a new resource func (rp *ResourcePool) createResource() *Resource { rp.mu.Lock() defer rp.mu.Unlock() if rp.created >= rp.maxSize { return nil } rp.created++ resource := &Resource{ ID: rp.created, Data: fmt.Sprintf("Resource-%d", rp.created), } fmt.Printf("Created resource %d\n", resource.ID) return resource } // Acquire gets a resource from the pool func (rp *ResourcePool) Acquire() (*Resource, error) { select { case resource := <-rp.available: fmt.Printf("Acquired existing resource %d\n", resource.ID) return resource, nil default: // No available resource, try to create one if resource := rp.createResource(); resource != nil { return resource, nil } // Pool is full, wait for available resource fmt.Println("Pool full, waiting for available resource...") select { case resource := <-rp.available: fmt.Printf("Acquired resource %d after waiting\n", resource.ID) return resource, nil case <-time.After(5 * time.Second): return nil, errors.New("timeout waiting for resource") } } } // Release returns a resource to the pool func (rp *ResourcePool) Release(resource *Resource) { select { case rp.available <- resource: fmt.Printf("Released resource %d\n", resource.ID) default: // Channel full, resource will be garbage collected fmt.Printf("Pool full, discarding resource %d\n", resource.ID) } } // Size returns current pool statistics func (rp *ResourcePool) Size() (available, created int) { rp.mu.Lock() defer rp.mu.Unlock() return len(rp.available), rp.created } func main() { pool := NewResourcePool(3) var wg sync.WaitGroup // Multiple goroutines using resources for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() resource, err := pool.Acquire() if err != nil { fmt.Printf("Worker %d failed to acquire resource: %v\n", id, err) return } fmt.Printf("Worker %d using resource %d\n", id, resource.ID) // Simulate work time.Sleep(time.Duration(200+id*100) * time.Millisecond) pool.Release(resource) fmt.Printf("Worker %d finished\n", id) }(i) } wg.Wait() available, created := pool.Size() fmt.Printf("Final pool state - Available: %d, Created: %d\n", available, created) } Best Practices Always Use defer: Ensure locks are released even if panic occurs Keep Critical Sections Small: Minimize time holding locks Avoid Nested Locks: Prevent deadlocks by avoiding lock hierarchies Use RWMutex for Read-Heavy: Better performance for read-heavy workloads Consider Lock-Free: Use atomic operations when possible Document Lock Order: If multiple locks needed, establish clear ordering Prefer Channels: Use channels for communication, locks for shared state Common Pitfalls 1. Deadlocks // Bad: Potential deadlock with nested locks type BadAccount struct { mu sync.Mutex balance int } func (a *BadAccount) Transfer(to *BadAccount, amount int) { a.mu.Lock() defer a.mu.Unlock() to.mu.Lock() // Potential deadlock if called concurrently defer to.mu.Unlock() a.balance -= amount to.balance += amount } // Good: Ordered locking to prevent deadlock func (a *BadAccount) SafeTransfer(to *BadAccount, amount int) { // Always acquire locks in consistent order first, second := a, to if a.ID > to.ID { first, second = to, a } first.mu.Lock() defer first.mu.Unlock() second.mu.Lock() defer second.mu.Unlock() a.balance -= amount to.balance += amount } 2. Race Conditions // Bad: Race condition type BadCounter struct { mu sync.Mutex value int } func (c *BadCounter) IncrementIfEven() { if c.value%2 == 0 { // Race: value might change between check and increment c.mu.Lock() c.value++ c.mu.Unlock() } } // Good: Atomic check and update func (c *BadCounter) SafeIncrementIfEven() { c.mu.Lock() defer c.mu.Unlock() if c.value%2 == 0 { c.value++ } } Testing Concurrent Code package main import ( "sync" "testing" ) func TestCounter(t *testing.T) { counter := &Counter{} var wg sync.WaitGroup goroutines := 100 increments := 1000 for i := 0; i < goroutines; i++ { wg.Add(1) go func() { defer wg.Done() for j := 0; j < increments; j++ { counter.Increment() } }() } wg.Wait() expected := goroutines * increments if counter.Value() != expected { t.Errorf("Expected %d, got %d", expected, counter.Value()) } } // Run with: go test -race func TestCounterRace(t *testing.T) { counter := &Counter{} go func() { for i := 0; i < 1000; i++ { counter.Increment() } }() go func() { for i := 0; i < 1000; i++ { _ = counter.Value() } }() } Mutex patterns are fundamental for building safe concurrent applications in Go. Choose the right synchronization primitive based on your access patterns: use sync.Mutex for exclusive access, sync.RWMutex for read-heavy workloads, and atomic operations for simple lock-free scenarios. ...

    July 3, 2024 · 10 min · Rafiul Alam

    Distributed Tracing in Go

    Go Concurrency Patterns Series: ← Go Generics Patterns | Series Overview What is Distributed Tracing? Distributed tracing tracks requests as they flow through microservices, providing visibility into performance bottlenecks, service dependencies, and error propagation in distributed systems. Key Concepts: Trace: End-to-end journey of a request across services Span: Single unit of work within a trace Context Propagation: Carrying trace information across boundaries Sampling: Controlling which traces to collect Why OpenTelemetry? OpenTelemetry (OTel) is the industry standard for observability: ...

    June 29, 2024 · 10 min · Rafiul Alam

    Go Generics Design Patterns

    Go Concurrency Patterns Series: ← Graceful Shutdown | Series Overview | Distributed Tracing → What Are Go Generics? Go 1.18 introduced generics (type parameters), enabling type-safe, reusable code without sacrificing performance. This opens up new possibilities for implementing classic design patterns with compile-time type safety. Key Features: Type Parameters: Functions and types that work with any type Constraints: Restrict type parameters to specific interfaces Type Inference: Compiler deduces type arguments automatically Zero Runtime Cost: No boxing/unboxing like interface{} Real-World Use Cases Collections: Type-safe lists, maps, sets without reflection Algorithms: Generic sort, filter, map operations Data Structures: Stacks, queues, trees with any element type Caching: Generic cache implementations Functional Patterns: Map, filter, reduce with type safety Concurrent Patterns: Type-safe worker pools and pipelines Generic Data Structures Generic Stack package main import ( "fmt" "sync" ) // Stack is a generic LIFO data structure type Stack[T any] struct { items []T mu sync.RWMutex } func NewStack[T any]() *Stack[T] { return &Stack[T]{ items: make([]T, 0), } } func (s *Stack[T]) Push(item T) { s.mu.Lock() defer s.mu.Unlock() s.items = append(s.items, item) } func (s *Stack[T]) Pop() (T, bool) { s.mu.Lock() defer s.mu.Unlock() if len(s.items) == 0 { var zero T return zero, false } item := s.items[len(s.items)-1] s.items = s.items[:len(s.items)-1] return item, true } func (s *Stack[T]) Peek() (T, bool) { s.mu.RLock() defer s.mu.RUnlock() if len(s.items) == 0 { var zero T return zero, false } return s.items[len(s.items)-1], true } func (s *Stack[T]) Size() int { s.mu.RLock() defer s.mu.RUnlock() return len(s.items) } func main() { // Integer stack intStack := NewStack[int]() intStack.Push(1) intStack.Push(2) intStack.Push(3) if val, ok := intStack.Pop(); ok { fmt.Printf("Popped: %d\n", val) // 3 } // String stack strStack := NewStack[string]() strStack.Push("hello") strStack.Push("world") if val, ok := strStack.Peek(); ok { fmt.Printf("Peek: %s\n", val) // world } } Generic Queue package main import ( "fmt" "sync" ) // Queue is a generic FIFO data structure type Queue[T any] struct { items []T mu sync.RWMutex } func NewQueue[T any]() *Queue[T] { return &Queue[T]{ items: make([]T, 0), } } func (q *Queue[T]) Enqueue(item T) { q.mu.Lock() defer q.mu.Unlock() q.items = append(q.items, item) } func (q *Queue[T]) Dequeue() (T, bool) { q.mu.Lock() defer q.mu.Unlock() if len(q.items) == 0 { var zero T return zero, false } item := q.items[0] q.items = q.items[1:] return item, true } func (q *Queue[T]) IsEmpty() bool { q.mu.RLock() defer q.mu.RUnlock() return len(q.items) == 0 } func main() { queue := NewQueue[string]() queue.Enqueue("first") queue.Enqueue("second") if item, ok := queue.Dequeue(); ok { fmt.Println(item) // first } } Generic Set package main import ( "fmt" "sync" ) // Set is a generic collection of unique elements type Set[T comparable] struct { items map[T]struct{} mu sync.RWMutex } func NewSet[T comparable]() *Set[T] { return &Set[T]{ items: make(map[T]struct{}), } } func (s *Set[T]) Add(item T) { s.mu.Lock() defer s.mu.Unlock() s.items[item] = struct{}{} } func (s *Set[T]) Remove(item T) { s.mu.Lock() defer s.mu.Unlock() delete(s.items, item) } func (s *Set[T]) Contains(item T) bool { s.mu.RLock() defer s.mu.RUnlock() _, exists := s.items[item] return exists } func (s *Set[T]) Size() int { s.mu.RLock() defer s.mu.RUnlock() return len(s.items) } func (s *Set[T]) Items() []T { s.mu.RLock() defer s.mu.RUnlock() items := make([]T, 0, len(s.items)) for item := range s.items { items = append(items, item) } return items } // Union returns a new set with elements from both sets func (s *Set[T]) Union(other *Set[T]) *Set[T] { result := NewSet[T]() s.mu.RLock() for item := range s.items { result.Add(item) } s.mu.RUnlock() other.mu.RLock() for item := range other.items { result.Add(item) } other.mu.RUnlock() return result } // Intersection returns a new set with common elements func (s *Set[T]) Intersection(other *Set[T]) *Set[T] { result := NewSet[T]() s.mu.RLock() defer s.mu.RUnlock() other.mu.RLock() defer other.mu.RUnlock() for item := range s.items { if _, exists := other.items[item]; exists { result.Add(item) } } return result } func main() { set1 := NewSet[int]() set1.Add(1) set1.Add(2) set1.Add(3) set2 := NewSet[int]() set2.Add(2) set2.Add(3) set2.Add(4) union := set1.Union(set2) fmt.Println("Union:", union.Items()) // [1 2 3 4] intersection := set1.Intersection(set2) fmt.Println("Intersection:", intersection.Items()) // [2 3] } Generic Cache Pattern package main import ( "fmt" "sync" "time" ) // CacheItem holds cached value with expiration type CacheItem[V any] struct { Value V Expiration time.Time } // Cache is a generic thread-safe cache with expiration type Cache[K comparable, V any] struct { items map[K]CacheItem[V] mu sync.RWMutex ttl time.Duration } func NewCache[K comparable, V any](ttl time.Duration) *Cache[K, V] { cache := &Cache[K, V]{ items: make(map[K]CacheItem[V]), ttl: ttl, } // Start cleanup goroutine go cache.cleanup() return cache } func (c *Cache[K, V]) Set(key K, value V) { c.mu.Lock() defer c.mu.Unlock() c.items[key] = CacheItem[V]{ Value: value, Expiration: time.Now().Add(c.ttl), } } func (c *Cache[K, V]) Get(key K) (V, bool) { c.mu.RLock() defer c.mu.RUnlock() item, exists := c.items[key] if !exists { var zero V return zero, false } // Check expiration if time.Now().After(item.Expiration) { var zero V return zero, false } return item.Value, true } func (c *Cache[K, V]) Delete(key K) { c.mu.Lock() defer c.mu.Unlock() delete(c.items, key) } func (c *Cache[K, V]) cleanup() { ticker := time.NewTicker(c.ttl) defer ticker.Stop() for range ticker.C { c.mu.Lock() now := time.Now() for key, item := range c.items { if now.After(item.Expiration) { delete(c.items, key) } } c.mu.Unlock() } } func main() { // String -> User cache type User struct { ID int Name string } cache := NewCache[string, User](5 * time.Second) cache.Set("user1", User{ID: 1, Name: "Alice"}) cache.Set("user2", User{ID: 2, Name: "Bob"}) if user, ok := cache.Get("user1"); ok { fmt.Printf("Found: %+v\n", user) } // Wait for expiration time.Sleep(6 * time.Second) if _, ok := cache.Get("user1"); !ok { fmt.Println("Cache expired") } } Generic Repository Pattern package main import ( "errors" "fmt" "sync" ) // Entity is a constraint for types with an ID type Entity interface { GetID() string } // User implements Entity type User struct { ID string Name string Email string } func (u User) GetID() string { return u.ID } // Product implements Entity type Product struct { ID string Name string Price float64 } func (p Product) GetID() string { return p.ID } // Repository is a generic CRUD interface type Repository[T Entity] interface { Create(item T) error Read(id string) (T, error) Update(item T) error Delete(id string) error List() []T } // InMemoryRepository is a generic in-memory implementation type InMemoryRepository[T Entity] struct { items map[string]T mu sync.RWMutex } func NewInMemoryRepository[T Entity]() *InMemoryRepository[T] { return &InMemoryRepository[T]{ items: make(map[string]T), } } func (r *InMemoryRepository[T]) Create(item T) error { r.mu.Lock() defer r.mu.Unlock() id := item.GetID() if _, exists := r.items[id]; exists { return errors.New("item already exists") } r.items[id] = item return nil } func (r *InMemoryRepository[T]) Read(id string) (T, error) { r.mu.RLock() defer r.mu.RUnlock() item, exists := r.items[id] if !exists { var zero T return zero, errors.New("item not found") } return item, nil } func (r *InMemoryRepository[T]) Update(item T) error { r.mu.Lock() defer r.mu.Unlock() id := item.GetID() if _, exists := r.items[id]; !exists { return errors.New("item not found") } r.items[id] = item return nil } func (r *InMemoryRepository[T]) Delete(id string) error { r.mu.Lock() defer r.mu.Unlock() if _, exists := r.items[id]; !exists { return errors.New("item not found") } delete(r.items, id) return nil } func (r *InMemoryRepository[T]) List() []T { r.mu.RLock() defer r.mu.RUnlock() items := make([]T, 0, len(r.items)) for _, item := range r.items { items = append(items, item) } return items } func main() { // User repository userRepo := NewInMemoryRepository[User]() userRepo.Create(User{ID: "1", Name: "Alice", Email: "[email protected]"}) userRepo.Create(User{ID: "2", Name: "Bob", Email: "[email protected]"}) if user, err := userRepo.Read("1"); err == nil { fmt.Printf("User: %+v\n", user) } // Product repository productRepo := NewInMemoryRepository[Product]() productRepo.Create(Product{ID: "p1", Name: "Laptop", Price: 999.99}) productRepo.Create(Product{ID: "p2", Name: "Mouse", Price: 29.99}) products := productRepo.List() fmt.Printf("Products: %+v\n", products) } Generic Builder Pattern package main import "fmt" // Builder is a generic builder pattern type Builder[T any] struct { build func() T } func NewBuilder[T any](buildFunc func() T) *Builder[T] { return &Builder[T]{build: buildFunc} } func (b *Builder[T]) Build() T { return b.build() } // Fluent builder for complex types type HTTPRequest struct { Method string URL string Headers map[string]string Body string } type HTTPRequestBuilder struct { req HTTPRequest } func NewHTTPRequestBuilder() *HTTPRequestBuilder { return &HTTPRequestBuilder{ req: HTTPRequest{ Headers: make(map[string]string), }, } } func (b *HTTPRequestBuilder) Method(method string) *HTTPRequestBuilder { b.req.Method = method return b } func (b *HTTPRequestBuilder) URL(url string) *HTTPRequestBuilder { b.req.URL = url return b } func (b *HTTPRequestBuilder) Header(key, value string) *HTTPRequestBuilder { b.req.Headers[key] = value return b } func (b *HTTPRequestBuilder) Body(body string) *HTTPRequestBuilder { b.req.Body = body return b } func (b *HTTPRequestBuilder) Build() HTTPRequest { return b.req } // Generic fluent builder type FluentBuilder[T any] struct { value T } func NewFluentBuilder[T any](initial T) *FluentBuilder[T] { return &FluentBuilder[T]{value: initial} } func (b *FluentBuilder[T]) Apply(fn func(T) T) *FluentBuilder[T] { b.value = fn(b.value) return b } func (b *FluentBuilder[T]) Build() T { return b.value } func main() { // HTTP Request builder req := NewHTTPRequestBuilder(). Method("POST"). URL("https://api.example.com/users"). Header("Content-Type", "application/json"). Body(`{"name": "Alice"}`). Build() fmt.Printf("Request: %+v\n", req) // Generic fluent builder result := NewFluentBuilder(0). Apply(func(n int) int { return n + 10 }). Apply(func(n int) int { return n * 2 }). Apply(func(n int) int { return n - 5 }). Build() fmt.Printf("Result: %d\n", result) // 15 } Generic Option Pattern package main import ( "fmt" "time" ) // Option is a generic option function type Option[T any] func(*T) // Server configuration type ServerConfig struct { Host string Port int Timeout time.Duration MaxConns int ReadTimeout time.Duration WriteTimeout time.Duration } // Option functions func WithHost[T interface{ Host string }](host string) Option[T] { return func(c *T) { c.Host = host } } func WithPort[T interface{ Port int }](port int) Option[T] { return func(c *T) { c.Port = port } } func WithTimeout[T interface{ Timeout time.Duration }](timeout time.Duration) Option[T] { return func(c *T) { c.Timeout = timeout } } // Generic constructor with options func NewWithOptions[T any](initial T, opts ...Option[T]) T { for _, opt := range opts { opt(&initial) } return initial } // Server-specific options func ServerWithHost(host string) Option[ServerConfig] { return func(c *ServerConfig) { c.Host = host } } func ServerWithPort(port int) Option[ServerConfig] { return func(c *ServerConfig) { c.Port = port } } func ServerWithTimeout(timeout time.Duration) Option[ServerConfig] { return func(c *ServerConfig) { c.Timeout = timeout } } func NewServer(opts ...Option[ServerConfig]) ServerConfig { config := ServerConfig{ Host: "localhost", Port: 8080, Timeout: 30 * time.Second, MaxConns: 100, ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, } for _, opt := range opts { opt(&config) } return config } func main() { server := NewServer( ServerWithHost("0.0.0.0"), ServerWithPort(9000), ServerWithTimeout(60*time.Second), ) fmt.Printf("Server config: %+v\n", server) } Generic Result Type package main import ( "errors" "fmt" ) // Result represents either a value or an error type Result[T any] struct { value T err error } func Ok[T any](value T) Result[T] { return Result[T]{value: value} } func Err[T any](err error) Result[T] { var zero T return Result[T]{value: zero, err: err} } func (r Result[T]) IsOk() bool { return r.err == nil } func (r Result[T]) IsErr() bool { return r.err != nil } func (r Result[T]) Unwrap() (T, error) { return r.value, r.err } func (r Result[T]) UnwrapOr(defaultValue T) T { if r.IsErr() { return defaultValue } return r.value } // Map transforms the value if Ok func (r Result[T]) Map(fn func(T) T) Result[T] { if r.IsErr() { return r } return Ok(fn(r.value)) } // FlatMap chains operations func FlatMap[T any, U any](r Result[T], fn func(T) Result[U]) Result[U] { if r.IsErr() { return Err[U](r.err) } return fn(r.value) } // Example usage func divide(a, b int) Result[int] { if b == 0 { return Err[int](errors.New("division by zero")) } return Ok(a / b) } func main() { // Success case result1 := divide(10, 2) if value, err := result1.Unwrap(); err == nil { fmt.Printf("Result: %d\n", value) // 5 } // Error case result2 := divide(10, 0) value := result2.UnwrapOr(-1) fmt.Printf("Result with default: %d\n", value) // -1 // Chaining operations result3 := divide(20, 2). Map(func(n int) int { return n * 2 }). Map(func(n int) int { return n + 5 }) if value, err := result3.Unwrap(); err == nil { fmt.Printf("Chained result: %d\n", value) // 25 } } Generic Pipeline Pattern package main import ( "fmt" ) // Pipeline represents a chain of transformations type Pipeline[T any] struct { stages []func(T) T } func NewPipeline[T any]() *Pipeline[T] { return &Pipeline[T]{ stages: make([]func(T) T, 0), } } func (p *Pipeline[T]) Add(stage func(T) T) *Pipeline[T] { p.stages = append(p.stages, stage) return p } func (p *Pipeline[T]) Execute(input T) T { result := input for _, stage := range p.stages { result = stage(result) } return result } // Generic filter, map, reduce func Filter[T any](items []T, predicate func(T) bool) []T { result := make([]T, 0) for _, item := range items { if predicate(item) { result = append(result, item) } } return result } func Map[T any, U any](items []T, mapper func(T) U) []U { result := make([]U, len(items)) for i, item := range items { result[i] = mapper(item) } return result } func Reduce[T any, U any](items []T, initial U, reducer func(U, T) U) U { result := initial for _, item := range items { result = reducer(result, item) } return result } func main() { // Pipeline example pipeline := NewPipeline[int](). Add(func(n int) int { return n * 2 }). Add(func(n int) int { return n + 10 }). Add(func(n int) int { return n / 2 }) result := pipeline.Execute(5) fmt.Printf("Pipeline result: %d\n", result) // 10 // Filter, map, reduce numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} // Filter even numbers evens := Filter(numbers, func(n int) bool { return n%2 == 0 }) fmt.Printf("Evens: %v\n", evens) // Map to squares squares := Map(evens, func(n int) int { return n * n }) fmt.Printf("Squares: %v\n", squares) // Reduce to sum sum := Reduce(squares, 0, func(acc, n int) int { return acc + n }) fmt.Printf("Sum: %d\n", sum) } Generic Worker Pool package main import ( "context" "fmt" "sync" ) // Task represents a generic task type Task[T any, R any] struct { Input T Result chan R } // WorkerPool is a generic worker pool type WorkerPool[T any, R any] struct { workers int tasks chan Task[T, R] process func(T) R wg sync.WaitGroup } func NewWorkerPool[T any, R any](workers int, process func(T) R) *WorkerPool[T, R] { return &WorkerPool[T, R]{ workers: workers, tasks: make(chan Task[T, R], workers*2), process: process, } } func (wp *WorkerPool[T, R]) Start(ctx context.Context) { for i := 0; i < wp.workers; i++ { wp.wg.Add(1) go wp.worker(ctx) } } func (wp *WorkerPool[T, R]) worker(ctx context.Context) { defer wp.wg.Done() for { select { case task, ok := <-wp.tasks: if !ok { return } result := wp.process(task.Input) task.Result <- result close(task.Result) case <-ctx.Done(): return } } } func (wp *WorkerPool[T, R]) Submit(input T) <-chan R { resultChan := make(chan R, 1) task := Task[T, R]{ Input: input, Result: resultChan, } wp.tasks <- task return resultChan } func (wp *WorkerPool[T, R]) Shutdown() { close(wp.tasks) wp.wg.Wait() } func main() { // Integer -> String worker pool pool := NewWorkerPool(3, func(n int) string { return fmt.Sprintf("Result: %d", n*n) }) ctx := context.Background() pool.Start(ctx) // Submit tasks results := make([]<-chan string, 0) for i := 1; i <= 10; i++ { results = append(results, pool.Submit(i)) } // Collect results for i, resultChan := range results { result := <-resultChan fmt.Printf("Task %d: %s\n", i+1, result) } pool.Shutdown() } Best Practices 1. Use Constraints Wisely // Too restrictive func Sum[T int | int64](values []T) T { ... } // Better: Use constraints package import "golang.org/x/exp/constraints" func Sum[T constraints.Ordered](values []T) T { ... } 2. Prefer Type Inference // Explicit type arguments result := Map[int, string](numbers, toString) // Better: Let compiler infer result := Map(numbers, toString) 3. Keep It Simple // Overly complex func Process[T any, U any, V any](a T, fn1 func(T) U, fn2 func(U) V) V { ... } // Better: Multiple simpler functions func Step1[T, U any](a T, fn func(T) U) U { ... } func Step2[U, V any](u U, fn func(U) V) V { ... } Performance Considerations Compile Time: Generics increase compile time slightly Runtime: Zero runtime cost - monomorphization at compile time Code Size: Can increase binary size due to type specialization Type Inference: Reduces verbosity but can slow compilation Conclusion Go generics enable type-safe, reusable patterns while maintaining Go’s simplicity and performance. ...

    June 28, 2024 · 13 min · Rafiul Alam

    Graceful Shutdown Patterns in Go

    Go Concurrency Patterns Series: ← Go Memory Model | Series Overview | Generics Patterns → What is Graceful Shutdown? Graceful shutdown is the process of cleanly stopping a running application by: Receiving shutdown signals (SIGTERM, SIGINT) Stopping acceptance of new requests Finishing in-flight requests Closing database connections and other resources Flushing logs and metrics Exiting with appropriate status code Why It Matters: Zero-downtime deployments: No dropped requests during rollouts Data integrity: Complete ongoing transactions Resource cleanup: Prevent leaks and corruption Observability: Flush pending logs and metrics Container orchestration: Proper Kubernetes pod termination Real-World Use Cases HTTP/gRPC servers: Drain active connections before shutdown Background workers: Complete current jobs, reject new ones Message consumers: Finish processing messages, commit offsets Database connections: Close pools cleanly Caching layers: Persist in-memory state Kubernetes deployments: Respect termination grace period Basic Signal Handling Simple Shutdown Handler package main import ( "context" "fmt" "os" "os/signal" "syscall" "time" ) func main() { // Create signal channel sigChan := make(chan os.Signal, 1) // Register for SIGINT and SIGTERM signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) // Simulate application work done := make(chan bool) go func() { fmt.Println("Application running...") time.Sleep(30 * time.Second) done <- true }() // Wait for signal or completion select { case sig := <-sigChan: fmt.Printf("\nReceived signal: %v\n", sig) fmt.Println("Initiating graceful shutdown...") // Perform cleanup cleanup() fmt.Println("Shutdown complete") os.Exit(0) case <-done: fmt.Println("Application completed normally") } } func cleanup() { fmt.Println("Cleaning up resources...") time.Sleep(2 * time.Second) fmt.Println("Cleanup complete") } HTTP Server Graceful Shutdown Basic HTTP Server Shutdown package main import ( "context" "fmt" "log" "net/http" "os" "os/signal" "syscall" "time" ) func main() { // Create HTTP server server := &http.Server{ Addr: ":8080", Handler: setupRoutes(), } // Channel to listen for errors from the server serverErrors := make(chan error, 1) // Start HTTP server in goroutine go func() { log.Printf("Server starting on %s", server.Addr) serverErrors <- server.ListenAndServe() }() // Channel to listen for interrupt signals shutdown := make(chan os.Signal, 1) signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM) // Block until we receive a signal or server error select { case err := <-serverErrors: log.Fatalf("Server error: %v", err) case sig := <-shutdown: log.Printf("Received signal: %v. Starting graceful shutdown...", sig) // Create context with timeout for shutdown ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() // Attempt graceful shutdown if err := server.Shutdown(ctx); err != nil { log.Printf("Graceful shutdown failed: %v", err) // Force close if graceful shutdown fails if err := server.Close(); err != nil { log.Fatalf("Force close failed: %v", err) } } log.Println("Server shutdown complete") } } func setupRoutes() http.Handler { mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "Hello, World!") }) mux.HandleFunc("/slow", func(w http.ResponseWriter, r *http.Request) { // Simulate slow endpoint time.Sleep(5 * time.Second) fmt.Fprintf(w, "Slow response complete") }) return mux } Advanced HTTP Server with Multiple Resources package main import ( "context" "database/sql" "fmt" "log" "net/http" "os" "os/signal" "sync" "syscall" "time" _ "github.com/lib/pq" ) type Application struct { server *http.Server db *sql.DB logger *log.Logger shutdown chan struct{} wg sync.WaitGroup } func NewApplication() (*Application, error) { // Initialize database db, err := sql.Open("postgres", "postgres://localhost/mydb") if err != nil { return nil, fmt.Errorf("database connection failed: %w", err) } app := &Application{ db: db, logger: log.New(os.Stdout, "APP: ", log.LstdFlags), shutdown: make(chan struct{}), } // Setup HTTP server app.server = &http.Server{ Addr: ":8080", Handler: app.routes(), ReadTimeout: 10 * time.Second, WriteTimeout: 30 * time.Second, IdleTimeout: 60 * time.Second, } return app, nil } func (app *Application) routes() http.Handler { mux := http.NewServeMux() mux.HandleFunc("/health", app.healthHandler) mux.HandleFunc("/api/data", app.dataHandler) return mux } func (app *Application) healthHandler(w http.ResponseWriter, r *http.Request) { select { case <-app.shutdown: // Signal shutdown in progress w.WriteHeader(http.StatusServiceUnavailable) fmt.Fprintf(w, "Shutting down") default: w.WriteHeader(http.StatusOK) fmt.Fprintf(w, "OK") } } func (app *Application) dataHandler(w http.ResponseWriter, r *http.Request) { // Check if shutdown initiated select { case <-app.shutdown: http.Error(w, "Service shutting down", http.StatusServiceUnavailable) return default: } // Simulate database query ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() var count int err := app.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM items").Scan(&count) if err != nil { http.Error(w, "Database error", http.StatusInternalServerError) return } fmt.Fprintf(w, "Count: %d", count) } func (app *Application) Run() error { // Start HTTP server app.wg.Add(1) go func() { defer app.wg.Done() app.logger.Printf("Starting server on %s", app.server.Addr) if err := app.server.ListenAndServe(); err != http.ErrServerClosed { app.logger.Printf("Server error: %v", err) } }() // Start background worker app.wg.Add(1) go func() { defer app.wg.Done() app.backgroundWorker() }() return nil } func (app *Application) backgroundWorker() { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: app.logger.Println("Background job executing...") // Do work case <-app.shutdown: app.logger.Println("Background worker shutting down...") return } } } func (app *Application) Shutdown(ctx context.Context) error { app.logger.Println("Starting graceful shutdown...") // Signal all components to stop close(app.shutdown) // Shutdown HTTP server app.logger.Println("Shutting down HTTP server...") if err := app.server.Shutdown(ctx); err != nil { return fmt.Errorf("HTTP server shutdown failed: %w", err) } // Wait for background workers to finish app.logger.Println("Waiting for background workers...") done := make(chan struct{}) go func() { app.wg.Wait() close(done) }() select { case <-done: app.logger.Println("All workers stopped") case <-ctx.Done(): return fmt.Errorf("shutdown timeout exceeded") } // Close database connections app.logger.Println("Closing database connections...") if err := app.db.Close(); err != nil { return fmt.Errorf("database close failed: %w", err) } app.logger.Println("Graceful shutdown complete") return nil } func main() { app, err := NewApplication() if err != nil { log.Fatalf("Application initialization failed: %v", err) } if err := app.Run(); err != nil { log.Fatalf("Application run failed: %v", err) } // Wait for shutdown signal sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) sig := <-sigChan log.Printf("Received signal: %v", sig) // Graceful shutdown with 30 second timeout ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := app.Shutdown(ctx); err != nil { log.Fatalf("Shutdown failed: %v", err) } log.Println("Application stopped") } Worker Pool Graceful Shutdown package main import ( "context" "fmt" "log" "sync" "time" ) type Job struct { ID int Data string } type WorkerPool struct { jobs chan Job results chan int numWorkers int wg sync.WaitGroup shutdown chan struct{} } func NewWorkerPool(numWorkers, queueSize int) *WorkerPool { return &WorkerPool{ jobs: make(chan Job, queueSize), results: make(chan int, queueSize), numWorkers: numWorkers, shutdown: make(chan struct{}), } } func (wp *WorkerPool) Start() { for i := 0; i < wp.numWorkers; i++ { wp.wg.Add(1) go wp.worker(i) } log.Printf("Started %d workers", wp.numWorkers) } func (wp *WorkerPool) worker(id int) { defer wp.wg.Done() log.Printf("Worker %d started", id) for { select { case job, ok := <-wp.jobs: if !ok { log.Printf("Worker %d: job channel closed, exiting", id) return } // Process job log.Printf("Worker %d processing job %d", id, job.ID) result := wp.processJob(job) wp.results <- result case <-wp.shutdown: // Drain remaining jobs before shutdown log.Printf("Worker %d: shutdown signal received, draining jobs...", id) for job := range wp.jobs { log.Printf("Worker %d processing remaining job %d", id, job.ID) result := wp.processJob(job) wp.results <- result } log.Printf("Worker %d: shutdown complete", id) return } } } func (wp *WorkerPool) processJob(job Job) int { // Simulate work time.Sleep(1 * time.Second) return job.ID * 2 } func (wp *WorkerPool) Submit(job Job) bool { select { case <-wp.shutdown: return false // Pool is shutting down case wp.jobs <- job: return true } } func (wp *WorkerPool) Shutdown(ctx context.Context) error { log.Println("WorkerPool: initiating shutdown...") // Signal workers to start draining close(wp.shutdown) // Close job channel to signal no more jobs close(wp.jobs) // Wait for workers with timeout done := make(chan struct{}) go func() { wp.wg.Wait() close(done) close(wp.results) }() select { case <-done: log.Println("WorkerPool: all workers completed") return nil case <-ctx.Done(): return fmt.Errorf("shutdown timeout: %w", ctx.Err()) } } func main() { pool := NewWorkerPool(3, 10) pool.Start() // Submit jobs go func() { for i := 1; i <= 20; i++ { job := Job{ID: i, Data: fmt.Sprintf("Job %d", i)} if !pool.Submit(job) { log.Printf("Failed to submit job %d (pool shutting down)", i) return } time.Sleep(200 * time.Millisecond) } }() // Collect results go func() { for result := range pool.results { log.Printf("Result: %d", result) } log.Println("All results collected") }() // Wait a bit then shutdown time.Sleep(5 * time.Second) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := pool.Shutdown(ctx); err != nil { log.Fatalf("Shutdown failed: %v", err) } log.Println("Main: shutdown complete") } Kubernetes-Aware Graceful Shutdown package main import ( "context" "fmt" "log" "net/http" "os" "os/signal" "syscall" "time" ) type KubernetesServer struct { server *http.Server shutdownDelay time.Duration terminationPeriod time.Duration } func NewKubernetesServer() *KubernetesServer { ks := &KubernetesServer{ // Delay before starting shutdown to allow load balancer de-registration shutdownDelay: 5 * time.Second, // Total Kubernetes termination grace period terminationPeriod: 30 * time.Second, } mux := http.NewServeMux() mux.HandleFunc("/health", ks.healthHandler) mux.HandleFunc("/readiness", ks.readinessHandler) mux.HandleFunc("/", ks.requestHandler) ks.server = &http.Server{ Addr: ":8080", Handler: mux, } return ks } var ( isHealthy = true isReady = true ) func (ks *KubernetesServer) healthHandler(w http.ResponseWriter, r *http.Request) { if isHealthy { w.WriteHeader(http.StatusOK) fmt.Fprintf(w, "healthy") } else { w.WriteHeader(http.StatusServiceUnavailable) fmt.Fprintf(w, "unhealthy") } } func (ks *KubernetesServer) readinessHandler(w http.ResponseWriter, r *http.Request) { if isReady { w.WriteHeader(http.StatusOK) fmt.Fprintf(w, "ready") } else { w.WriteHeader(http.StatusServiceUnavailable) fmt.Fprintf(w, "not ready") } } func (ks *KubernetesServer) requestHandler(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "Request processed at %s", time.Now().Format(time.RFC3339)) } func (ks *KubernetesServer) Run() error { go func() { log.Printf("Server starting on %s", ks.server.Addr) if err := ks.server.ListenAndServe(); err != http.ErrServerClosed { log.Fatalf("Server error: %v", err) } }() return nil } func (ks *KubernetesServer) GracefulShutdown() error { // Step 1: Mark as not ready (stop receiving new traffic from load balancer) log.Println("Step 1: Marking pod as not ready...") isReady = false // Step 2: Wait for load balancer to de-register log.Printf("Step 2: Waiting %v for load balancer de-registration...", ks.shutdownDelay) time.Sleep(ks.shutdownDelay) // Step 3: Stop accepting new connections and drain existing ones log.Println("Step 3: Shutting down HTTP server...") // Calculate remaining time for shutdown shutdownTimeout := ks.terminationPeriod - ks.shutdownDelay - (2 * time.Second) ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout) defer cancel() if err := ks.server.Shutdown(ctx); err != nil { log.Printf("Server shutdown error: %v", err) return err } log.Println("Step 4: Graceful shutdown complete") return nil } func main() { server := NewKubernetesServer() if err := server.Run(); err != nil { log.Fatalf("Server run failed: %v", err) } // Wait for termination signal sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) sig := <-sigChan log.Printf("Received signal: %v", sig) if err := server.GracefulShutdown(); err != nil { log.Fatalf("Graceful shutdown failed: %v", err) os.Exit(1) } log.Println("Server stopped successfully") } Database Connection Cleanup package main import ( "context" "database/sql" "log" "time" _ "github.com/lib/pq" ) type DatabaseManager struct { db *sql.DB } func NewDatabaseManager(connString string) (*DatabaseManager, error) { db, err := sql.Open("postgres", connString) if err != nil { return nil, err } // Configure connection pool db.SetMaxOpenConns(25) db.SetMaxIdleConns(5) db.SetConnMaxLifetime(5 * time.Minute) db.SetConnMaxIdleTime(10 * time.Minute) return &DatabaseManager{db: db}, nil } func (dm *DatabaseManager) Shutdown(ctx context.Context) error { log.Println("Closing database connections...") // Wait for active queries to complete or timeout done := make(chan error, 1) go func() { // Close will wait for all connections to be returned to pool done <- dm.db.Close() }() select { case err := <-done: if err != nil { return err } log.Println("Database connections closed successfully") return nil case <-ctx.Done(): log.Println("Database close timeout exceeded") return ctx.Err() } } // QueryWithShutdown performs query with shutdown awareness func (dm *DatabaseManager) QueryWithShutdown(ctx context.Context, query string) error { // Check if context is already cancelled (shutdown initiated) if ctx.Err() != nil { return ctx.Err() } rows, err := dm.db.QueryContext(ctx, query) if err != nil { return err } defer rows.Close() // Process rows for rows.Next() { // Check for shutdown during processing if ctx.Err() != nil { return ctx.Err() } // Process row... } return rows.Err() } Message Queue Consumer Shutdown package main import ( "context" "fmt" "log" "sync" "time" ) type Message struct { ID string Payload string } type Consumer struct { messages chan Message shutdown chan struct{} wg sync.WaitGroup } func NewConsumer() *Consumer { return &Consumer{ messages: make(chan Message, 100), shutdown: make(chan struct{}), } } func (c *Consumer) Start() { c.wg.Add(1) go c.consume() log.Println("Consumer started") } func (c *Consumer) consume() { defer c.wg.Done() for { select { case msg := <-c.messages: // Process message if err := c.processMessage(msg); err != nil { log.Printf("Error processing message %s: %v", msg.ID, err) // In production: send to dead letter queue } case <-c.shutdown: log.Println("Consumer: shutdown initiated, processing remaining messages...") // Drain remaining messages for msg := range c.messages { log.Printf("Consumer: processing remaining message %s", msg.ID) if err := c.processMessage(msg); err != nil { log.Printf("Error processing remaining message %s: %v", msg.ID, err) } } log.Println("Consumer: all messages processed") return } } } func (c *Consumer) processMessage(msg Message) error { log.Printf("Processing message: %s", msg.ID) // Simulate processing time.Sleep(500 * time.Millisecond) // Acknowledge message log.Printf("Message %s processed successfully", msg.ID) return nil } func (c *Consumer) Shutdown(ctx context.Context) error { log.Println("Consumer: initiating shutdown...") // Stop accepting new messages close(c.shutdown) close(c.messages) // Wait for processing to complete done := make(chan struct{}) go func() { c.wg.Wait() close(done) }() select { case <-done: log.Println("Consumer: shutdown complete") return nil case <-ctx.Done(): return fmt.Errorf("consumer shutdown timeout: %w", ctx.Err()) } } func main() { consumer := NewConsumer() consumer.Start() // Simulate receiving messages go func() { for i := 1; i <= 10; i++ { msg := Message{ ID: fmt.Sprintf("msg-%d", i), Payload: fmt.Sprintf("Payload %d", i), } consumer.messages <- msg time.Sleep(300 * time.Millisecond) } }() // Wait then shutdown time.Sleep(3 * time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if err := consumer.Shutdown(ctx); err != nil { log.Fatalf("Shutdown failed: %v", err) } log.Println("Main: shutdown complete") } Best Practices 1. Use Context for Timeouts // Set realistic shutdown timeout ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := server.Shutdown(ctx); err != nil { // Handle timeout } 2. Implement Shutdown Sequence func (app *Application) Shutdown(ctx context.Context) error { // 1. Stop health checks (remove from load balancer) app.setUnhealthy() // 2. Wait for de-registration time.Sleep(5 * time.Second) // 3. Stop accepting new requests app.server.Shutdown(ctx) // 4. Finish in-flight requests app.wg.Wait() // 5. Close resources app.db.Close() return nil } 3. Test Graceful Shutdown func TestGracefulShutdown(t *testing.T) { app := NewApplication() app.Run() // Start long-running request go func() { resp, err := http.Get("http://localhost:8080/slow") if err != nil { t.Errorf("Request failed: %v", err) } defer resp.Body.Close() }() // Wait for request to start time.Sleep(100 * time.Millisecond) // Initiate shutdown ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := app.Shutdown(ctx); err != nil { t.Fatalf("Shutdown failed: %v", err) } } Kubernetes Configuration Pod Termination Lifecycle apiVersion: apps/v1 kind: Deployment metadata: name: myapp spec: template: spec: containers: - name: myapp image: myapp:latest ports: - containerPort: 8080 # Liveness probe (restart if unhealthy) livenessProbe: httpGet: path: /health port: 8080 initialDelaySeconds: 30 periodSeconds: 10 # Readiness probe (remove from service if not ready) readinessProbe: httpGet: path: /readiness port: 8080 initialDelaySeconds: 5 periodSeconds: 5 # Graceful shutdown configuration lifecycle: preStop: exec: command: ["/bin/sh", "-c", "sleep 5"] # Termination grace period (must be > shutdown delay + max request time) terminationGracePeriodSeconds: 30 Common Pitfalls Not Handling SIGTERM: Container orchestrators send SIGTERM for graceful shutdown Insufficient Timeout: Set timeout longer than longest request duration Ignoring In-Flight Requests: Always wait for active requests to complete Not Closing Resources: Explicitly close databases, files, connections Immediate Exit: Don’t call os.Exit(0) without cleanup Performance Considerations Shutdown Delay: Balance between deployment speed and zero downtime Timeout Values: Consider 95th percentile request duration Resource Cleanup: Close in order of dependency Logging: Flush logs before exit Metrics: Report shutdown metrics for monitoring Conclusion Graceful shutdown is essential for production Go services to ensure zero-downtime deployments and data integrity. ...

    June 27, 2024 · 13 min · Rafiul Alam

    Fan-Out/Fan-In Pattern in Go

    Go Concurrency Patterns Series: ← Pipeline Pattern | Series Overview | Pub/Sub Pattern → What is the Fan-Out/Fan-In Pattern? The Fan-Out/Fan-In pattern is a powerful concurrency pattern that distributes work across multiple goroutines (fan-out) and then collects the results back into a single channel (fan-in). This pattern is perfect for parallelizing CPU-intensive tasks or I/O operations that can be processed independently. Fan-Out: Distribute work from one source to multiple workers Fan-In: Collect results from multiple workers into a single destination ...

    June 26, 2024 · 8 min · Rafiul Alam