Complete Guide to Go Concurrency Patterns: Visual Patterns & Code Examples

    Concurrency is one of Go’s most powerful features, built into the language from the ground up. This comprehensive guide covers all essential concurrency patterns with visual diagrams and practical code examples. Table of Contents: Goroutines - Basic Concurrency; Channels - Communication; Select Statement - Multiplexing; Worker Pool Pattern; Fan-In Pattern; Fan-Out Pattern; Pipeline Pattern; Semaphore Pattern; Barrier Pattern; Future/Promise Pattern; Rate Limiting Pattern; Circuit Breaker Pattern; Context Pattern; Mutex Pattern; WaitGroup Pattern; ErrGroup Pattern. Goroutines - Basic Concurrency: Goroutines are lightweight threads managed by the Go runtime. They enable concurrent execution with minimal overhead. ...

    November 18, 2025 · 17 min · Rafiul Alam

    WaitGroup Pattern in Go

    Go Concurrency Patterns Series: ← Mutex Patterns | Series Overview | Once Pattern → What is the WaitGroup Pattern? The WaitGroup pattern uses sync.WaitGroup to coordinate the completion of multiple goroutines. It acts as a counter that blocks until all registered goroutines have finished executing, making it perfect for implementing barriers and waiting for parallel tasks to complete. Key Operations: Add(n): Increment the counter by n Done(): Decrement the counter by 1 (usually called with defer) Wait(): Block until counter reaches zero Real-World Use Cases Parallel Processing: Wait for all workers to complete Batch Operations: Process multiple items concurrently Service Initialization: Wait for all services to start Data Collection: Gather results from multiple sources Cleanup Operations: Ensure all cleanup tasks finish Testing: Coordinate test goroutines Basic WaitGroup Usage package main import ( "fmt" "math/rand" "sync" "time" ) // Task represents work to be done type Task struct { ID int Name string } // processTask simulates processing a task func processTask(task Task, wg *sync.WaitGroup) { defer wg.Done() // Always call Done when goroutine finishes fmt.Printf("Starting task %d: %s\n", task.ID, task.Name) // Simulate work duration := time.Duration(rand.Intn(1000)) * time.Millisecond time.Sleep(duration) fmt.Printf("Completed task %d: %s (took %v)\n", task.ID, task.Name, duration) } func main() { tasks := []Task{ {1, "Process images"}, {2, "Send emails"}, {3, "Update database"}, {4, "Generate reports"}, {5, "Backup files"}, } var wg sync.WaitGroup fmt.Println("Starting parallel task processing...") // Start all tasks for _, task := range tasks { wg.Add(1) // Increment counter for each goroutine go processTask(task, &wg) } // Wait for all tasks to complete wg.Wait() fmt.Println("All tasks completed!") } WaitGroup with Error Handling package main import ( "fmt" "math/rand" "sync" "time" ) // Result represents the outcome of a task type Result struct { 
TaskID int Data interface{} Error error } // TaskProcessor handles tasks with error collection type TaskProcessor struct { wg sync.WaitGroup results chan Result errors []error mu sync.Mutex } // NewTaskProcessor creates a new task processor func NewTaskProcessor(bufferSize int) *TaskProcessor { return &TaskProcessor{ results: make(chan Result, bufferSize), } } // processTaskWithError simulates task processing that might fail func (tp *TaskProcessor) processTaskWithError(taskID int, data interface{}) { defer tp.wg.Done() fmt.Printf("Processing task %d\n", taskID) // Simulate work time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond) // Simulate random failures if rand.Float32() < 0.3 { err := fmt.Errorf("task %d failed", taskID) tp.results <- Result{TaskID: taskID, Error: err} // Collect error tp.mu.Lock() tp.errors = append(tp.errors, err) tp.mu.Unlock() fmt.Printf("Task %d failed\n", taskID) return } // Success result := fmt.Sprintf("Result from task %d", taskID) tp.results <- Result{TaskID: taskID, Data: result} fmt.Printf("Task %d completed successfully\n", taskID) } // ProcessTasks processes multiple tasks and collects results func (tp *TaskProcessor) ProcessTasks(taskCount int) ([]Result, []error) { // Start all tasks for i := 1; i <= taskCount; i++ { tp.wg.Add(1) go tp.processTaskWithError(i, fmt.Sprintf("data-%d", i)) } // Close results channel when all tasks complete go func() { tp.wg.Wait() close(tp.results) }() // Collect results var results []Result for result := range tp.results { results = append(results, result) } tp.mu.Lock() errors := make([]error, len(tp.errors)) copy(errors, tp.errors) tp.mu.Unlock() return results, errors } func main() { processor := NewTaskProcessor(10) fmt.Println("Starting task processing with error handling...") results, errors := processor.ProcessTasks(8) fmt.Printf("\nProcessing complete!\n") fmt.Printf("Successful tasks: %d\n", len(results)-len(errors)) fmt.Printf("Failed tasks: %d\n", len(errors)) if len(errors) > 
0 { fmt.Println("\nErrors:") for _, err := range errors { fmt.Printf(" - %v\n", err) } } fmt.Println("\nResults:") for _, result := range results { if result.Error == nil { fmt.Printf(" Task %d: %v\n", result.TaskID, result.Data) } } } Nested WaitGroups for Hierarchical Tasks package main import ( "fmt" "math/rand" "sync" "time" ) // Department represents a department with multiple teams type Department struct { Name string Teams []Team } // Team represents a team with multiple workers type Team struct { Name string Workers []string } // processDepartment processes all teams in a department func processDepartment(dept Department, wg *sync.WaitGroup) { defer wg.Done() fmt.Printf("Department %s starting work\n", dept.Name) var teamWG sync.WaitGroup // Process all teams in parallel for _, team := range dept.Teams { teamWG.Add(1) go processTeam(team, &teamWG) } // Wait for all teams to complete teamWG.Wait() fmt.Printf("Department %s completed all work\n", dept.Name) } // processTeam processes all workers in a team func processTeam(team Team, wg *sync.WaitGroup) { defer wg.Done() fmt.Printf(" Team %s starting work\n", team.Name) var workerWG sync.WaitGroup // Process all workers in parallel for _, worker := range team.Workers { workerWG.Add(1) go processWorker(worker, &workerWG) } // Wait for all workers to complete workerWG.Wait() fmt.Printf(" Team %s completed all work\n", team.Name) } // processWorker simulates worker processing func processWorker(worker string, wg *sync.WaitGroup) { defer wg.Done() fmt.Printf(" Worker %s working...\n", worker) time.Sleep(time.Duration(100+rand.Intn(200)) * time.Millisecond) fmt.Printf(" Worker %s finished\n", worker) } func main() { departments := []Department{ { Name: "Engineering", Teams: []Team{ { Name: "Backend", Workers: []string{"Alice", "Bob", "Charlie"}, }, { Name: "Frontend", Workers: []string{"Diana", "Eve"}, }, }, }, { Name: "Marketing", Teams: []Team{ { Name: "Digital", Workers: []string{"Frank", "Grace"}, }, { Name: "Content", 
Workers: []string{"Henry", "Ivy", "Jack"}, }, }, }, } var deptWG sync.WaitGroup fmt.Println("Starting company-wide project...") // Process all departments in parallel for _, dept := range departments { deptWG.Add(1) go processDepartment(dept, &deptWG) } // Wait for all departments to complete deptWG.Wait() fmt.Println("Company-wide project completed!") } WaitGroup with Timeout package main import ( "context" "fmt" "sync" "time" ) // TimedTaskRunner runs tasks with timeout support type TimedTaskRunner struct { timeout time.Duration } // NewTimedTaskRunner creates a new timed task runner func NewTimedTaskRunner(timeout time.Duration) *TimedTaskRunner { return &TimedTaskRunner{timeout: timeout} } // RunWithTimeout runs tasks with a timeout func (ttr *TimedTaskRunner) RunWithTimeout(tasks []func()) error { ctx, cancel := context.WithTimeout(context.Background(), ttr.timeout) defer cancel() var wg sync.WaitGroup done := make(chan struct{}) // Start all tasks for i, task := range tasks { wg.Add(1) go func(taskID int, taskFunc func()) { defer wg.Done() fmt.Printf("Starting task %d\n", taskID) taskFunc() fmt.Printf("Completed task %d\n", taskID) }(i+1, task) } // Wait for completion in separate goroutine go func() { wg.Wait() close(done) }() // Wait for either completion or timeout select { case <-done: fmt.Println("All tasks completed successfully") return nil case <-ctx.Done(): fmt.Println("Tasks timed out") return ctx.Err() } } // simulateTask creates a task that takes a specific duration func simulateTask(duration time.Duration, name string) func() { return func() { fmt.Printf(" %s working for %v\n", name, duration) time.Sleep(duration) fmt.Printf(" %s finished\n", name) } } func main() { runner := NewTimedTaskRunner(2 * time.Second) // Test with tasks that complete within timeout fmt.Println("Test 1: Tasks that complete within timeout") tasks1 := []func(){ simulateTask(300*time.Millisecond, "Quick task 1"), simulateTask(500*time.Millisecond, "Quick task 2"), 
simulateTask(400*time.Millisecond, "Quick task 3"), } if err := runner.RunWithTimeout(tasks1); err != nil { fmt.Printf("Error: %v\n", err) } fmt.Println("\nTest 2: Tasks that exceed timeout") tasks2 := []func(){ simulateTask(800*time.Millisecond, "Slow task 1"), simulateTask(1500*time.Millisecond, "Slow task 2"), simulateTask(2000*time.Millisecond, "Very slow task"), } if err := runner.RunWithTimeout(tasks2); err != nil { fmt.Printf("Error: %v\n", err) } } Dynamic WaitGroup Management package main import ( "fmt" "sync" "time" ) // DynamicTaskManager manages tasks that can spawn other tasks type DynamicTaskManager struct { wg sync.WaitGroup taskChan chan func() quit chan struct{} active sync.WaitGroup } // NewDynamicTaskManager creates a new dynamic task manager func NewDynamicTaskManager() *DynamicTaskManager { return &DynamicTaskManager{ taskChan: make(chan func(), 100), quit: make(chan struct{}), } } // Start begins processing tasks func (dtm *DynamicTaskManager) Start() { go dtm.taskProcessor() } // taskProcessor processes tasks from the channel func (dtm *DynamicTaskManager) taskProcessor() { for { select { case task := <-dtm.taskChan: dtm.active.Add(1) go func() { defer dtm.active.Done() task() }() case <-dtm.quit: return } } } // AddTask adds a task to be processed func (dtm *DynamicTaskManager) AddTask(task func()) { select { case dtm.taskChan <- task: case <-dtm.quit: } } // Wait waits for all active tasks to complete func (dtm *DynamicTaskManager) Wait() { dtm.active.Wait() } // Stop stops the task manager func (dtm *DynamicTaskManager) Stop() { close(dtm.quit) dtm.Wait() } // recursiveTask demonstrates a task that spawns other tasks func recursiveTask(manager *DynamicTaskManager, depth int, maxDepth int, id string) func() { return func() { fmt.Printf("Task %s (depth %d) starting\n", id, depth) time.Sleep(100 * time.Millisecond) if depth < maxDepth { // Spawn child tasks for i := 0; i < 2; i++ { childID := fmt.Sprintf("%s.%d", id, i+1) 
manager.AddTask(recursiveTask(manager, depth+1, maxDepth, childID)) } } fmt.Printf("Task %s (depth %d) completed\n", id, depth) } } func main() { manager := NewDynamicTaskManager() manager.Start() defer manager.Stop() fmt.Println("Starting dynamic task processing...") // Add initial tasks that will spawn more tasks for i := 0; i < 3; i++ { taskID := fmt.Sprintf("root-%d", i+1) manager.AddTask(recursiveTask(manager, 0, 2, taskID)) } // Wait for all tasks (including dynamically created ones) to complete manager.Wait() fmt.Println("All tasks completed!") } Best Practices: Always Use defer: Call Done() with defer to ensure it’s called even if a panic occurs. Add Before Starting: Call Add() before starting goroutines to avoid race conditions. Don’t Reuse WaitGroups: Create a new WaitGroup for each batch of operations. Handle Panics: Use recover in goroutines to prevent a panic from affecting the WaitGroup. Avoid Negative Counters: Don’t call Done() more times than Add(). Use Timeouts: Combine with context for timeout handling. Consider Alternatives: Use channels for complex coordination scenarios. Common Pitfalls: 1. Race Condition with Add/Done // Bad: Race condition func badExample() { var wg sync.WaitGroup for i := 0; i < 5; i++ { go func() { wg.Add(1) // Race: might be called after Wait() defer wg.Done() // do work }() } wg.Wait() // Might not wait for all goroutines } // Good: Add before starting goroutines func goodExample() { var wg sync.WaitGroup for i := 0; i < 5; i++ { wg.Add(1) // Add before starting goroutine go func() { defer wg.Done() // do work }() } wg.Wait() } 2. Forgetting to Call Done // Bad: Missing Done() call func badTask(wg *sync.WaitGroup) { // do work if someCondition { return // Forgot to call Done()! 
} wg.Done() } // Good: Always use defer func goodTask(wg *sync.WaitGroup) { defer wg.Done() // Always called // do work if someCondition { return // Done() still called } } Testing WaitGroup Patterns package main import ( "sync" "testing" "time" ) func TestWaitGroupCompletion(t *testing.T) { var wg sync.WaitGroup completed := make([]bool, 5) for i := 0; i < 5; i++ { wg.Add(1) go func(index int) { defer wg.Done() time.Sleep(10 * time.Millisecond) completed[index] = true }(i) } wg.Wait() // Verify all tasks completed for i, done := range completed { if !done { t.Errorf("Task %d did not complete", i) } } } func TestWaitGroupWithTimeout(t *testing.T) { var wg sync.WaitGroup done := make(chan struct{}) wg.Add(1) go func() { defer wg.Done() time.Sleep(50 * time.Millisecond) }() go func() { wg.Wait() close(done) }() select { case <-done: // Success case <-time.After(100 * time.Millisecond): t.Error("WaitGroup did not complete within timeout") } } The WaitGroup pattern is essential for coordinating goroutines in Go. It provides a simple yet powerful way to wait for multiple concurrent operations to complete, making it perfect for parallel processing, batch operations, and synchronization barriers. ...

    August 14, 2024 · 9 min · Rafiul Alam

    Request/Response Pattern in Go

    Go Concurrency Patterns Series: ← Pub/Sub Pattern | Series Overview | Worker Pool → What is the Request/Response Pattern? The Request/Response pattern enables synchronous communication between goroutines, where a sender waits for a response from a receiver. This pattern is essential for RPC-style communication, database queries, API calls, and any scenario where you need to get a result back from an operation. Key Components: Request: Contains data and a response channel Response: Contains result data and/or error information Requester: Sends request and waits for response Responder: Processes request and sends response Real-World Use Cases Database Operations: Query execution with results API Gateways: Forwarding requests to microservices Cache Systems: Get/Set operations with confirmation File Operations: Read/Write with status feedback Validation Services: Input validation with results Authentication: Login requests with tokens Basic Request/Response Implementation package main import ( "fmt" "math/rand" "time" ) // Request represents a request with a response channel type Request struct { ID string Data interface{} Response chan Response } // Response represents the response to a request type Response struct { ID string Result interface{} Error error } // Server processes requests type Server struct { requests chan Request quit chan bool } // NewServer creates a new server func NewServer() *Server { return &Server{ requests: make(chan Request), quit: make(chan bool), } } // Start begins processing requests func (s *Server) Start() { go func() { for { select { case req := <-s.requests: s.processRequest(req) case <-s.quit: return } } }() } // processRequest handles a single request func (s *Server) processRequest(req Request) { // Simulate processing time time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) // Process the request (example: double the number) var response Response response.ID = req.ID if num, ok := req.Data.(int); ok { response.Result 
= num * 2 } else { response.Error = fmt.Errorf("invalid data type") } // Send response back req.Response <- response } // SendRequest sends a request and waits for response func (s *Server) SendRequest(id string, data interface{}) (interface{}, error) { responseChan := make(chan Response, 1) request := Request{ ID: id, Data: data, Response: responseChan, } s.requests <- request // Wait for response response := <-responseChan return response.Result, response.Error } // Stop shuts down the server func (s *Server) Stop() { close(s.quit) } func main() { server := NewServer() server.Start() defer server.Stop() // Send multiple requests for i := 1; i <= 5; i++ { result, err := server.SendRequest(fmt.Sprintf("req-%d", i), i*10) if err != nil { fmt.Printf("Request %d failed: %v\n", i, err) } else { fmt.Printf("Request %d result: %v\n", i, result) } } } Request/Response with Timeout package main import ( "context" "fmt" "math/rand" "time" ) // TimedRequest includes context for timeout handling type TimedRequest struct { ID string Data interface{} Response chan TimedResponse Context context.Context } // TimedResponse includes timing information type TimedResponse struct { ID string Result interface{} Error error Duration time.Duration Timestamp time.Time } // TimedServer processes requests with timeout support type TimedServer struct { requests chan TimedRequest quit chan bool } func NewTimedServer() *TimedServer { return &TimedServer{ requests: make(chan TimedRequest, 10), quit: make(chan bool), } } func (ts *TimedServer) Start() { go func() { for { select { case req := <-ts.requests: go ts.processTimedRequest(req) case <-ts.quit: return } } }() } func (ts *TimedServer) processTimedRequest(req TimedRequest) { start := time.Now() // Check if context is already cancelled select { case <-req.Context.Done(): ts.sendResponse(req, nil, req.Context.Err(), start) return default: } // Simulate work with random duration workDuration := time.Duration(rand.Intn(200)) * time.Millisecond 
select { case <-time.After(workDuration): // Work completed if num, ok := req.Data.(int); ok { ts.sendResponse(req, num*2, nil, start) } else { ts.sendResponse(req, nil, fmt.Errorf("invalid data type"), start) } case <-req.Context.Done(): // Context cancelled during work ts.sendResponse(req, nil, req.Context.Err(), start) } } func (ts *TimedServer) sendResponse(req TimedRequest, result interface{}, err error, start time.Time) { response := TimedResponse{ ID: req.ID, Result: result, Error: err, Duration: time.Since(start), Timestamp: time.Now(), } select { case req.Response <- response: case <-req.Context.Done(): // Client no longer waiting } } // SendRequestWithTimeout sends a request with a timeout func (ts *TimedServer) SendRequestWithTimeout(id string, data interface{}, timeout time.Duration) (interface{}, error) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() responseChan := make(chan TimedResponse, 1) request := TimedRequest{ ID: id, Data: data, Response: responseChan, Context: ctx, } select { case ts.requests <- request: case <-ctx.Done(): return nil, ctx.Err() } select { case response := <-responseChan: fmt.Printf("Request %s completed in %v\n", response.ID, response.Duration) return response.Result, response.Error case <-ctx.Done(): return nil, ctx.Err() } } func (ts *TimedServer) Stop() { close(ts.quit) } func main() { server := NewTimedServer() server.Start() defer server.Stop() // Send requests with different timeouts requests := []struct { id string data int timeout time.Duration }{ {"fast", 10, 300 * time.Millisecond}, {"medium", 20, 150 * time.Millisecond}, {"slow", 30, 50 * time.Millisecond}, // This might timeout } for _, req := range requests { result, err := server.SendRequestWithTimeout(req.id, req.data, req.timeout) if err != nil { fmt.Printf("Request %s failed: %v\n", req.id, err) } else { fmt.Printf("Request %s result: %v\n", req.id, result) } } } Future/Promise Pattern package main import ( "context" "fmt" "math/rand" 
"sync" "time" ) // Future represents a value that will be available in the future type Future struct { mu sync.Mutex done chan struct{} result interface{} err error computed bool } // NewFuture creates a new future func NewFuture() *Future { return &Future{ done: make(chan struct{}), } } // Set sets the future's value func (f *Future) Set(result interface{}, err error) { f.mu.Lock() defer f.mu.Unlock() if f.computed { return // Already set } f.result = result f.err = err f.computed = true close(f.done) } // Get waits for and returns the future's value func (f *Future) Get() (interface{}, error) { <-f.done return f.result, f.err } // GetWithTimeout waits for the value with a timeout func (f *Future) GetWithTimeout(timeout time.Duration) (interface{}, error) { select { case <-f.done: return f.result, f.err case <-time.After(timeout): return nil, fmt.Errorf("timeout waiting for future") } } // GetWithContext waits for the value with context cancellation func (f *Future) GetWithContext(ctx context.Context) (interface{}, error) { select { case <-f.done: return f.result, f.err case <-ctx.Done(): return nil, ctx.Err() } } // IsReady returns true if the future has been computed func (f *Future) IsReady() bool { f.mu.Lock() defer f.mu.Unlock() return f.computed } // AsyncService demonstrates async operations with futures type AsyncService struct { workers chan struct{} } func NewAsyncService(maxWorkers int) *AsyncService { return &AsyncService{ workers: make(chan struct{}, maxWorkers), } } // ProcessAsync starts async processing and returns a future func (as *AsyncService) ProcessAsync(data interface{}) *Future { future := NewFuture() go func() { // Acquire worker slot as.workers <- struct{}{} defer func() { <-as.workers }() // Simulate processing time.Sleep(time.Duration(100+rand.Intn(200)) * time.Millisecond) // Process data if num, ok := data.(int); ok { future.Set(num*num, nil) } else { future.Set(nil, fmt.Errorf("invalid data type")) } }() return future } func main() { 
service := NewAsyncService(3) // Start multiple async operations futures := make([]*Future, 5) for i := 0; i < 5; i++ { fmt.Printf("Starting async operation %d\n", i+1) futures[i] = service.ProcessAsync((i + 1) * 10) } // Wait for all results fmt.Println("\nWaiting for results...") for i, future := range futures { result, err := future.Get() if err != nil { fmt.Printf("Operation %d failed: %v\n", i+1, err) } else { fmt.Printf("Operation %d result: %v\n", i+1, result) } } // Example with timeout fmt.Println("\nTesting timeout...") timeoutFuture := service.ProcessAsync(100) result, err := timeoutFuture.GetWithTimeout(50 * time.Millisecond) if err != nil { fmt.Printf("Timeout example failed: %v\n", err) } else { fmt.Printf("Timeout example result: %v\n", result) } } Batch Request/Response package main import ( "fmt" "sync" "time" ) // BatchRequest represents multiple requests processed together type BatchRequest struct { ID string Items []interface{} Response chan BatchResponse } // BatchResponse contains results for all items in a batch type BatchResponse struct { ID string Results []BatchResult Error error } // BatchResult represents the result of processing one item type BatchResult struct { Index int Result interface{} Error error } // BatchProcessor processes requests in batches for efficiency type BatchProcessor struct { requests chan BatchRequest batchSize int batchWindow time.Duration quit chan bool } func NewBatchProcessor(batchSize int, batchWindow time.Duration) *BatchProcessor { return &BatchProcessor{ requests: make(chan BatchRequest, 100), batchSize: batchSize, batchWindow: batchWindow, quit: make(chan bool), } } func (bp *BatchProcessor) Start() { go func() { batch := make([]BatchRequest, 0, bp.batchSize) timer := time.NewTimer(bp.batchWindow) timer.Stop() for { select { case req := <-bp.requests: batch = append(batch, req) if len(batch) == 1 { timer.Reset(bp.batchWindow) } if len(batch) >= bp.batchSize { bp.processBatch(batch) batch = batch[:0] 
timer.Stop() } case <-timer.C: if len(batch) > 0 { bp.processBatch(batch) batch = batch[:0] } case <-bp.quit: if len(batch) > 0 { bp.processBatch(batch) } return } } }() } func (bp *BatchProcessor) processBatch(batch []BatchRequest) { fmt.Printf("Processing batch of %d requests\n", len(batch)) var wg sync.WaitGroup for _, req := range batch { wg.Add(1) go func(r BatchRequest) { defer wg.Done() bp.processRequest(r) }(req) } wg.Wait() } func (bp *BatchProcessor) processRequest(req BatchRequest) { results := make([]BatchResult, len(req.Items)) for i, item := range req.Items { // Simulate processing each item time.Sleep(10 * time.Millisecond) if num, ok := item.(int); ok { results[i] = BatchResult{ Index: i, Result: num * 3, } } else { results[i] = BatchResult{ Index: i, Error: fmt.Errorf("invalid item type at index %d", i), } } } response := BatchResponse{ ID: req.ID, Results: results, } req.Response <- response } // SendBatchRequest sends a batch request and waits for response func (bp *BatchProcessor) SendBatchRequest(id string, items []interface{}) ([]BatchResult, error) { responseChan := make(chan BatchResponse, 1) request := BatchRequest{ ID: id, Items: items, Response: responseChan, } bp.requests <- request response := <-responseChan return response.Results, response.Error } func (bp *BatchProcessor) Stop() { close(bp.quit) } func main() { processor := NewBatchProcessor(3, 100*time.Millisecond) processor.Start() defer processor.Stop() // Send individual batch requests go func() { results, err := processor.SendBatchRequest("batch1", []interface{}{1, 2, 3, 4, 5}) if err != nil { fmt.Printf("Batch 1 failed: %v\n", err) return } fmt.Println("Batch 1 results:") for _, result := range results { if result.Error != nil { fmt.Printf(" Item %d error: %v\n", result.Index, result.Error) } else { fmt.Printf(" Item %d result: %v\n", result.Index, result.Result) } } }() go func() { results, err := processor.SendBatchRequest("batch2", []interface{}{10, 20, 30}) if err != nil { 
fmt.Printf("Batch 2 failed: %v\n", err) return } fmt.Println("Batch 2 results:") for _, result := range results { if result.Error != nil { fmt.Printf(" Item %d error: %v\n", result.Index, result.Error) } else { fmt.Printf(" Item %d result: %v\n", result.Index, result.Result) } } }() // Wait for processing time.Sleep(500 * time.Millisecond) } Best Practices Always Use Timeouts: Prevent indefinite blocking Handle Context Cancellation: Support graceful cancellation Buffer Response Channels: Avoid blocking responders Error Handling: Always include error information in responses Resource Cleanup: Ensure channels and goroutines are cleaned up Monitoring: Track request/response times and success rates Backpressure: Handle situations when responders are overwhelmed Common Pitfalls Deadlocks: Not buffering response channels Goroutine Leaks: Not handling context cancellation Memory Leaks: Not closing channels properly Blocking Operations: Long-running operations without timeouts Lost Responses: Not handling channel closure Testing Request/Response package main import ( "context" "testing" "time" ) func TestRequestResponse(t *testing.T) { server := NewTimedServer() server.Start() defer server.Stop() // Test successful request result, err := server.SendRequestWithTimeout("test1", 42, 200*time.Millisecond) if err != nil { t.Fatalf("Request failed: %v", err) } if result != 84 { t.Errorf("Expected 84, got %v", result) } // Test timeout _, err = server.SendRequestWithTimeout("test2", 42, 10*time.Millisecond) if err == nil { t.Error("Expected timeout error") } } func TestFuture(t *testing.T) { future := NewFuture() // Test that future is not ready initially if future.IsReady() { t.Error("Future should not be ready initially") } // Set value in goroutine go func() { time.Sleep(50 * time.Millisecond) future.Set("test result", nil) }() // Get value result, err := future.Get() if err != nil { t.Fatalf("Future failed: %v", err) } if result != "test result" { t.Errorf("Expected 'test 
result', got %v", result) } // Test that future is ready after setting if !future.IsReady() { t.Error("Future should be ready after setting") } } The Request/Response pattern is essential for building synchronous communication systems in Go. It provides the foundation for RPC systems, database operations, and any scenario where you need to wait for a result from an asynchronous operation. ...

    July 31, 2024 · 10 min · Rafiul Alam

    Once Pattern in Go

    Go Concurrency Patterns Series: ← WaitGroup Pattern | Series Overview | Context Pattern → What is the Once Pattern? The Once pattern uses sync.Once to ensure that a piece of code executes exactly once, regardless of how many goroutines call it. This is essential for thread-safe initialization, singleton patterns, and one-time setup operations in concurrent programs. Key Characteristics: Thread-safe: Multiple goroutines can call it safely Exactly once: Code executes only on the first call Blocking: Subsequent calls wait for the first execution to complete No return values: The function passed to Do() cannot return values Real-World Use Cases Singleton Initialization: Create single instances of objects Configuration Loading: Load config files once at startup Database Connections: Initialize connection pools Logger Setup: Configure logging systems Resource Initialization: Set up expensive resources Feature Flags: Initialize feature flag systems Basic Once Usage package main import ( "fmt" "sync" "time" ) var ( instance *Database once sync.Once ) // Database represents a database connection type Database struct { ConnectionString string IsConnected bool } // Connect simulates database connection func (db *Database) Connect() { fmt.Println("Connecting to database...") time.Sleep(100 * time.Millisecond) // Simulate connection time db.IsConnected = true fmt.Println("Database connected!") } // GetDatabase returns the singleton database instance func GetDatabase() *Database { once.Do(func() { fmt.Println("Initializing database instance...") instance = &Database{ ConnectionString: "localhost:5432", } instance.Connect() }) return instance } func main() { var wg sync.WaitGroup // Multiple goroutines trying to get database instance for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() fmt.Printf("Goroutine %d requesting database\n", id) db := GetDatabase() fmt.Printf("Goroutine %d got database: %+v\n", id, db) }(i) } wg.Wait() // Verify all goroutines got the 
same instance fmt.Printf("Final instance: %p\n", GetDatabase()) } Configuration Manager with Once package main import ( "encoding/json" "fmt" "os" "sync" ) // Config represents application configuration type Config struct { DatabaseURL string `json:"database_url"` APIKey string `json:"api_key"` Debug bool `json:"debug"` Port int `json:"port"` } // ConfigManager manages application configuration type ConfigManager struct { config *Config once sync.Once err error } // NewConfigManager creates a new config manager func NewConfigManager() *ConfigManager { return &ConfigManager{} } // loadConfig loads configuration from file func (cm *ConfigManager) loadConfig() { fmt.Println("Loading configuration...") // Simulate config file reading configData := `{ "database_url": "postgres://localhost:5432/myapp", "api_key": "secret-api-key-123", "debug": true, "port": 8080 }` var config Config if err := json.Unmarshal([]byte(configData), &config); err != nil { cm.err = fmt.Errorf("failed to parse config: %w", err) return } cm.config = &config fmt.Println("Configuration loaded successfully!") } // GetConfig returns the configuration, loading it once if needed func (cm *ConfigManager) GetConfig() (*Config, error) { cm.once.Do(cm.loadConfig) return cm.config, cm.err } func main() { configManager := NewConfigManager() var wg sync.WaitGroup // Multiple goroutines accessing configuration for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() config, err := configManager.GetConfig() if err != nil { fmt.Printf("Goroutine %d: Error loading config: %v\n", id, err) return } fmt.Printf("Goroutine %d: Port=%d, Debug=%v\n", id, config.Port, config.Debug) }(i) } wg.Wait() } Logger Initialization with Once package main import ( "fmt" "log" "os" "sync" ) // Logger wraps the standard logger with additional functionality type Logger struct { *log.Logger level string } var ( logger *Logger loggerOnce sync.Once ) // initLogger initializes the global logger func initLogger() { 
fmt.Println("Initializing logger...") // Create log file file, err := os.OpenFile("app.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) if err != nil { log.Fatalln("Failed to open log file:", err) } logger = &Logger{ Logger: log.New(file, "APP: ", log.Ldate|log.Ltime|log.Lshortfile), level: "INFO", } logger.Println("Logger initialized") fmt.Println("Logger setup complete!") } // GetLogger returns the singleton logger instance func GetLogger() *Logger { loggerOnce.Do(initLogger) return logger } // Info logs an info message func (l *Logger) Info(msg string) { l.Printf("[INFO] %s", msg) } // Error logs an error message func (l *Logger) Error(msg string) { l.Printf("[ERROR] %s", msg) } func main() { var wg sync.WaitGroup // Multiple goroutines using the logger for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() logger := GetLogger() logger.Info(fmt.Sprintf("Message from goroutine %d", id)) if id%2 == 0 { logger.Error(fmt.Sprintf("Error from goroutine %d", id)) } }(i) } wg.Wait() // Clean up if logger != nil { logger.Info("Application shutting down") } } Resource Pool Initialization package main import ( "fmt" "sync" "time" ) // Connection represents a database connection type Connection struct { ID int Connected bool } // Connect simulates connecting to database func (c *Connection) Connect() error { time.Sleep(50 * time.Millisecond) // Simulate connection time c.Connected = true return nil } // Close simulates closing the connection func (c *Connection) Close() error { c.Connected = false return nil } // ConnectionPool manages a pool of database connections type ConnectionPool struct { connections []*Connection available chan *Connection once sync.Once initErr error } // NewConnectionPool creates a new connection pool func NewConnectionPool(size int) *ConnectionPool { return &ConnectionPool{ available: make(chan *Connection, size), } } // initialize sets up the connection pool func (cp *ConnectionPool) initialize() { fmt.Println("Initializing 
connection pool...") poolSize := cap(cp.available) cp.connections = make([]*Connection, poolSize) // Create and connect all connections for i := 0; i < poolSize; i++ { conn := &Connection{ID: i + 1} if err := conn.Connect(); err != nil { cp.initErr = fmt.Errorf("failed to connect connection %d: %w", i+1, err) return } cp.connections[i] = conn cp.available <- conn } fmt.Printf("Connection pool initialized with %d connections\n", poolSize) } // GetConnection gets a connection from the pool func (cp *ConnectionPool) GetConnection() (*Connection, error) { cp.once.Do(cp.initialize) if cp.initErr != nil { return nil, cp.initErr } select { case conn := <-cp.available: return conn, nil case <-time.After(5 * time.Second): return nil, fmt.Errorf("timeout waiting for connection") } } // ReturnConnection returns a connection to the pool func (cp *ConnectionPool) ReturnConnection(conn *Connection) { select { case cp.available <- conn: default: // Pool is full, close the connection conn.Close() } } // Close closes all connections in the pool func (cp *ConnectionPool) Close() error { close(cp.available) for _, conn := range cp.connections { if conn != nil { conn.Close() } } return nil } func main() { pool := NewConnectionPool(3) defer pool.Close() var wg sync.WaitGroup // Multiple goroutines using the connection pool for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() conn, err := pool.GetConnection() if err != nil { fmt.Printf("Worker %d: Failed to get connection: %v\n", id, err) return } fmt.Printf("Worker %d: Got connection %d\n", id, conn.ID) // Simulate work time.Sleep(200 * time.Millisecond) pool.ReturnConnection(conn) fmt.Printf("Worker %d: Returned connection %d\n", id, conn.ID) }(i) } wg.Wait() } Advanced Once Patterns 1. 
Once with Error Handling package main import ( "fmt" "sync" ) // OnceWithError provides Once functionality with error handling type OnceWithError struct { once sync.Once err error } // Do executes the function once and stores any error func (o *OnceWithError) Do(f func() error) error { o.once.Do(func() { o.err = f() }) return o.err } // ExpensiveResource represents a resource that's expensive to initialize type ExpensiveResource struct { Data string } var ( resource *ExpensiveResource resourceOnce OnceWithError ) // initResource initializes the expensive resource func initResource() error { fmt.Println("Initializing expensive resource...") // Simulate potential failure if false { // Change to true to simulate error return fmt.Errorf("failed to initialize resource") } resource = &ExpensiveResource{ Data: "Important data", } fmt.Println("Resource initialized successfully!") return nil } // GetResource returns the resource, initializing it once if needed func GetResource() (*ExpensiveResource, error) { err := resourceOnce.Do(initResource) if err != nil { return nil, err } return resource, nil } func main() { var wg sync.WaitGroup for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() resource, err := GetResource() if err != nil { fmt.Printf("Goroutine %d: Error: %v\n", id, err) return } fmt.Printf("Goroutine %d: Got resource: %s\n", id, resource.Data) }(i) } wg.Wait() } 2. 
Resettable Once package main import ( "fmt" "sync" "sync/atomic" ) // ResettableOnce allows resetting the once behavior type ResettableOnce struct { mu sync.Mutex done uint32 } // Do executes the function once func (ro *ResettableOnce) Do(f func()) { if atomic.LoadUint32(&ro.done) == 0 { ro.doSlow(f) } } func (ro *ResettableOnce) doSlow(f func()) { ro.mu.Lock() defer ro.mu.Unlock() if ro.done == 0 { defer atomic.StoreUint32(&ro.done, 1) f() } } // Reset allows the once to be used again func (ro *ResettableOnce) Reset() { ro.mu.Lock() defer ro.mu.Unlock() atomic.StoreUint32(&ro.done, 0) } // IsDone returns true if the function has been executed func (ro *ResettableOnce) IsDone() bool { return atomic.LoadUint32(&ro.done) == 1 } func main() { var once ResettableOnce counter := 0 task := func() { counter++ fmt.Printf("Task executed, counter: %d\n", counter) } // First round fmt.Println("First round:") for i := 0; i < 3; i++ { once.Do(task) } fmt.Printf("Done: %v\n", once.IsDone()) // Reset and second round fmt.Println("\nAfter reset:") once.Reset() fmt.Printf("Done: %v\n", once.IsDone()) for i := 0; i < 3; i++ { once.Do(task) } } Best Practices Use for Initialization: Perfect for one-time setup operations Keep Functions Simple: The function passed to Do() should be straightforward Handle Errors Separately: Use wrapper types for error handling Avoid Side Effects: Be careful with functions that have external side effects Don’t Nest Once Calls: Calling Do() on the same Once from inside the function it is running deadlocks Consider Alternatives: Use init() for package-level initialization when appropriate Common Pitfalls 1. 
Expecting Return Values // Bad: Once.Do doesn't support return values var once sync.Once var result string func badExample() string { once.Do(func() { // Can't return from here result = "computed value" }) return result // This works but is not ideal } // Good: Use a wrapper or store results in accessible variables type OnceResult struct { once sync.Once result string err error } func (or *OnceResult) Get() (string, error) { or.once.Do(func() { or.result, or.err = computeValue() }) return or.result, or.err } 2. Panic in Once Function // Bad: Panic prevents future calls var once sync.Once func badOnceFunc() { once.Do(func() { panic("something went wrong") // Once will never execute again }) } // Good: Handle panics appropriately func goodOnceFunc() { once.Do(func() { defer func() { if r := recover(); r != nil { // Handle panic appropriately fmt.Printf("Recovered from panic: %v\n", r) } }() // risky operation }) } Testing Once Patterns package main import ( "sync" "testing" ) func TestOnceExecution(t *testing.T) { var once sync.Once counter := 0 var wg sync.WaitGroup // Start multiple goroutines for i := 0; i < 10; i++ { wg.Add(1) go func() { defer wg.Done() once.Do(func() { counter++ }) }() } wg.Wait() if counter != 1 { t.Errorf("Expected counter to be 1, got %d", counter) } } func TestOnceWithError(t *testing.T) { var onceErr OnceWithError callCount := 0 // First call with error err1 := onceErr.Do(func() error { callCount++ return fmt.Errorf("test error") }) // Second call should return same error without executing function err2 := onceErr.Do(func() error { callCount++ return nil }) if callCount != 1 { t.Errorf("Expected function to be called once, got %d", callCount) } if err1 == nil || err2 == nil { t.Error("Expected both calls to return error") } if err1.Error() != err2.Error() { t.Error("Expected same error from both calls") } } The Once pattern is essential for thread-safe initialization in Go. 
It ensures that expensive or critical setup operations happen exactly once, making it perfect for singletons, configuration loading, and resource initialization in concurrent applications. ...

    July 10, 2024 · 9 min · Rafiul Alam

    Mutex Patterns in Go

    Go Concurrency Patterns Series: ← Worker Pool | Series Overview | WaitGroup Pattern → What are Mutex Patterns? Mutex (mutual exclusion) patterns are essential for protecting shared resources in concurrent programs. Go provides sync.Mutex and sync.RWMutex for controlling access to critical sections, ensuring data consistency and preventing race conditions. Key Types: Mutex: Exclusive access (one goroutine at a time) RWMutex: Reader-writer locks (multiple readers OR one writer) Lock-free: Atomic operations without explicit locks Real-World Use Cases Shared Counters: Statistics, metrics, rate limiting Cache Systems: Thread-safe caching with read/write operations Configuration Management: Safe updates to application config Connection Pools: Managing database/HTTP connection pools Resource Allocation: Tracking and managing limited resources State Machines: Protecting state transitions Basic Mutex Usage package main import ( "fmt" "sync" "time" ) // Counter demonstrates basic mutex usage type Counter struct { mu sync.Mutex value int } // Increment safely increments the counter func (c *Counter) Increment() { c.mu.Lock() defer c.mu.Unlock() c.value++ } // Decrement safely decrements the counter func (c *Counter) Decrement() { c.mu.Lock() defer c.mu.Unlock() c.value-- } // Value safely returns the current value func (c *Counter) Value() int { c.mu.Lock() defer c.mu.Unlock() return c.value } // Add safely adds a value to the counter func (c *Counter) Add(delta int) { c.mu.Lock() defer c.mu.Unlock() c.value += delta } func main() { counter := &Counter{} var wg sync.WaitGroup // Start multiple goroutines incrementing the counter for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 100; j++ { counter.Increment() } fmt.Printf("Goroutine %d finished\n", id) }(i) } // Start some goroutines decrementing for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 50; j++ { counter.Decrement() } fmt.Printf("Decrementer %d 
finished\n", id) }(i) } wg.Wait() fmt.Printf("Final counter value: %d\n", counter.Value()) // Expected: (10 * 100) - (3 * 50) = 1000 - 150 = 850 } RWMutex for Read-Heavy Workloads package main import ( "fmt" "math/rand" "sync" "time" ) // Cache demonstrates RWMutex usage for read-heavy scenarios type Cache struct { mu sync.RWMutex data map[string]interface{} } // NewCache creates a new cache func NewCache() *Cache { return &Cache{ data: make(map[string]interface{}), } } // Get retrieves a value (read operation) func (c *Cache) Get(key string) (interface{}, bool) { c.mu.RLock() defer c.mu.RUnlock() value, exists := c.data[key] return value, exists } // Set stores a value (write operation) func (c *Cache) Set(key string, value interface{}) { c.mu.Lock() defer c.mu.Unlock() c.data[key] = value } // Delete removes a value (write operation) func (c *Cache) Delete(key string) { c.mu.Lock() defer c.mu.Unlock() delete(c.data, key) } // Keys returns all keys (read operation) func (c *Cache) Keys() []string { c.mu.RLock() defer c.mu.RUnlock() keys := make([]string, 0, len(c.data)) for key := range c.data { keys = append(keys, key) } return keys } // Size returns the number of items (read operation) func (c *Cache) Size() int { c.mu.RLock() defer c.mu.RUnlock() return len(c.data) } // Clear removes all items (write operation) func (c *Cache) Clear() { c.mu.Lock() defer c.mu.Unlock() c.data = make(map[string]interface{}) } func main() { cache := NewCache() var wg sync.WaitGroup // Writers (fewer, less frequent) for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 50; j++ { key := fmt.Sprintf("key-%d-%d", id, j) cache.Set(key, fmt.Sprintf("value-%d-%d", id, j)) time.Sleep(10 * time.Millisecond) } }(i) } // Readers (many, frequent) for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 100; j++ { // Try to read random keys key := fmt.Sprintf("key-%d-%d", rand.Intn(3), rand.Intn(50)) if value, exists := 
cache.Get(key); exists { fmt.Printf("Reader %d found %s: %v\n", id, key, value) } time.Sleep(5 * time.Millisecond) } }(i) } // Size checker wg.Add(1) go func() { defer wg.Done() for i := 0; i < 20; i++ { size := cache.Size() fmt.Printf("Cache size: %d\n", size) time.Sleep(100 * time.Millisecond) } }() wg.Wait() fmt.Printf("Final cache size: %d\n", cache.Size()) } Advanced Mutex Patterns 1. Conditional Variables Pattern package main import ( "fmt" "sync" "time" ) // Buffer demonstrates conditional variables with mutex type Buffer struct { mu sync.Mutex notEmpty *sync.Cond notFull *sync.Cond items []interface{} capacity int } // NewBuffer creates a new bounded buffer func NewBuffer(capacity int) *Buffer { b := &Buffer{ items: make([]interface{}, 0, capacity), capacity: capacity, } b.notEmpty = sync.NewCond(&b.mu) b.notFull = sync.NewCond(&b.mu) return b } // Put adds an item to the buffer (blocks if full) func (b *Buffer) Put(item interface{}) { b.mu.Lock() defer b.mu.Unlock() // Wait while buffer is full for len(b.items) == b.capacity { b.notFull.Wait() } b.items = append(b.items, item) fmt.Printf("Put item: %v (buffer size: %d)\n", item, len(b.items)) // Signal that buffer is not empty b.notEmpty.Signal() } // Get removes an item from the buffer (blocks if empty) func (b *Buffer) Get() interface{} { b.mu.Lock() defer b.mu.Unlock() // Wait while buffer is empty for len(b.items) == 0 { b.notEmpty.Wait() } item := b.items[0] b.items = b.items[1:] fmt.Printf("Got item: %v (buffer size: %d)\n", item, len(b.items)) // Signal that buffer is not full b.notFull.Signal() return item } // Size returns current buffer size func (b *Buffer) Size() int { b.mu.Lock() defer b.mu.Unlock() return len(b.items) } func main() { buffer := NewBuffer(3) var wg sync.WaitGroup // Producers for i := 0; i < 2; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 5; j++ { item := fmt.Sprintf("item-%d-%d", id, j) buffer.Put(item) time.Sleep(200 * time.Millisecond) } }(i) } // 
Consumers for i := 0; i < 2; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 5; j++ { item := buffer.Get() fmt.Printf("Consumer %d processed: %v\n", id, item) time.Sleep(300 * time.Millisecond) } }(i) } wg.Wait() } 2. Lock-Free Patterns with Atomic Operations package main import ( "fmt" "sync" "sync/atomic" "time" ) // AtomicCounter demonstrates lock-free counter type AtomicCounter struct { value int64 } // Increment atomically increments the counter func (ac *AtomicCounter) Increment() int64 { return atomic.AddInt64(&ac.value, 1) } // Decrement atomically decrements the counter func (ac *AtomicCounter) Decrement() int64 { return atomic.AddInt64(&ac.value, -1) } // Value atomically reads the current value func (ac *AtomicCounter) Value() int64 { return atomic.LoadInt64(&ac.value) } // CompareAndSwap atomically compares and swaps func (ac *AtomicCounter) CompareAndSwap(old, new int64) bool { return atomic.CompareAndSwapInt64(&ac.value, old, new) } // AtomicFlag demonstrates atomic boolean operations type AtomicFlag struct { flag int32 } // Set atomically sets the flag to true func (af *AtomicFlag) Set() { atomic.StoreInt32(&af.flag, 1) } // Clear atomically sets the flag to false func (af *AtomicFlag) Clear() { atomic.StoreInt32(&af.flag, 0) } // IsSet atomically checks if flag is set func (af *AtomicFlag) IsSet() bool { return atomic.LoadInt32(&af.flag) == 1 } // TestAndSet atomically tests and sets the flag func (af *AtomicFlag) TestAndSet() bool { return atomic.SwapInt32(&af.flag, 1) == 1 } func main() { counter := &AtomicCounter{} flag := &AtomicFlag{} var wg sync.WaitGroup // Test atomic counter fmt.Println("Testing atomic counter...") for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 1000; j++ { counter.Increment() } }(i) } wg.Wait() fmt.Printf("Final counter value: %d\n", counter.Value()) // Test atomic flag fmt.Println("\nTesting atomic flag...") // Multiple goroutines trying to set flag for i := 0; i < 
5; i++ { wg.Add(1) go func(id int) { defer wg.Done() if !flag.TestAndSet() { fmt.Printf("Goroutine %d acquired the flag\n", id) time.Sleep(100 * time.Millisecond) flag.Clear() fmt.Printf("Goroutine %d released the flag\n", id) } else { fmt.Printf("Goroutine %d failed to acquire flag\n", id) } }(i) } wg.Wait() } 3. Resource Pool Pattern package main import ( "errors" "fmt" "sync" "time" ) // Resource represents a limited resource type Resource struct { ID int Data string } // ResourcePool manages a pool of limited resources type ResourcePool struct { mu sync.Mutex resources []*Resource available chan *Resource maxSize int created int } // NewResourcePool creates a new resource pool func NewResourcePool(maxSize int) *ResourcePool { return &ResourcePool{ resources: make([]*Resource, 0, maxSize), available: make(chan *Resource, maxSize), maxSize: maxSize, } } // createResource creates a new resource func (rp *ResourcePool) createResource() *Resource { rp.mu.Lock() defer rp.mu.Unlock() if rp.created >= rp.maxSize { return nil } rp.created++ resource := &Resource{ ID: rp.created, Data: fmt.Sprintf("Resource-%d", rp.created), } fmt.Printf("Created resource %d\n", resource.ID) return resource } // Acquire gets a resource from the pool func (rp *ResourcePool) Acquire() (*Resource, error) { select { case resource := <-rp.available: fmt.Printf("Acquired existing resource %d\n", resource.ID) return resource, nil default: // No available resource, try to create one if resource := rp.createResource(); resource != nil { return resource, nil } // Pool is full, wait for available resource fmt.Println("Pool full, waiting for available resource...") select { case resource := <-rp.available: fmt.Printf("Acquired resource %d after waiting\n", resource.ID) return resource, nil case <-time.After(5 * time.Second): return nil, errors.New("timeout waiting for resource") } } } // Release returns a resource to the pool func (rp *ResourcePool) Release(resource *Resource) { select { case 
rp.available <- resource: fmt.Printf("Released resource %d\n", resource.ID) default: // Channel full, resource will be garbage collected fmt.Printf("Pool full, discarding resource %d\n", resource.ID) } } // Size returns current pool statistics func (rp *ResourcePool) Size() (available, created int) { rp.mu.Lock() defer rp.mu.Unlock() return len(rp.available), rp.created } func main() { pool := NewResourcePool(3) var wg sync.WaitGroup // Multiple goroutines using resources for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() resource, err := pool.Acquire() if err != nil { fmt.Printf("Worker %d failed to acquire resource: %v\n", id, err) return } fmt.Printf("Worker %d using resource %d\n", id, resource.ID) // Simulate work time.Sleep(time.Duration(200+id*100) * time.Millisecond) pool.Release(resource) fmt.Printf("Worker %d finished\n", id) }(i) } wg.Wait() available, created := pool.Size() fmt.Printf("Final pool state - Available: %d, Created: %d\n", available, created) } Best Practices Always Use defer: Ensure locks are released even if a panic occurs Keep Critical Sections Small: Minimize time holding locks Avoid Nested Locks: Prevent deadlocks by not acquiring one lock while holding another Use RWMutex for Read-Heavy: Better performance for read-heavy workloads Consider Lock-Free: Use atomic operations when possible Document Lock Order: If multiple locks are needed, establish clear ordering Prefer Channels: Use channels for communication, locks for shared state Common Pitfalls 1. 
Deadlocks // Bad: Potential deadlock with nested locks type BadAccount struct { mu sync.Mutex balance int } func (a *BadAccount) Transfer(to *BadAccount, amount int) { a.mu.Lock() defer a.mu.Unlock() to.mu.Lock() // Potential deadlock if called concurrently defer to.mu.Unlock() a.balance -= amount to.balance += amount } // Good: Ordered locking to prevent deadlock func (a *BadAccount) SafeTransfer(to *BadAccount, amount int) { // Always acquire locks in consistent order first, second := a, to if a.ID > to.ID { first, second = to, a } first.mu.Lock() defer first.mu.Unlock() second.mu.Lock() defer second.mu.Unlock() a.balance -= amount to.balance += amount } 2. Race Conditions // Bad: Race condition type BadCounter struct { mu sync.Mutex value int } func (c *BadCounter) IncrementIfEven() { if c.value%2 == 0 { // Race: value might change between check and increment c.mu.Lock() c.value++ c.mu.Unlock() } } // Good: Atomic check and update func (c *BadCounter) SafeIncrementIfEven() { c.mu.Lock() defer c.mu.Unlock() if c.value%2 == 0 { c.value++ } } Testing Concurrent Code package main import ( "sync" "testing" ) func TestCounter(t *testing.T) { counter := &Counter{} var wg sync.WaitGroup goroutines := 100 increments := 1000 for i := 0; i < goroutines; i++ { wg.Add(1) go func() { defer wg.Done() for j := 0; j < increments; j++ { counter.Increment() } }() } wg.Wait() expected := goroutines * increments if counter.Value() != expected { t.Errorf("Expected %d, got %d", expected, counter.Value()) } } // Run with: go test -race func TestCounterRace(t *testing.T) { counter := &Counter{} go func() { for i := 0; i < 1000; i++ { counter.Increment() } }() go func() { for i := 0; i < 1000; i++ { _ = counter.Value() } }() } Mutex patterns are fundamental for building safe concurrent applications in Go. 
Choose the right synchronization primitive based on your access patterns: use sync.Mutex for exclusive access, sync.RWMutex for read-heavy workloads, and atomic operations for simple lock-free scenarios. ...

    July 3, 2024 · 10 min · Rafiul Alam