Go Memory Model Explained

    Go Concurrency Patterns Series: ← Context Propagation | Series Overview | Graceful Shutdown → What is the Go Memory Model? The Go Memory Model specifies the conditions under which reads of a variable in one goroutine can be guaranteed to observe values produced by writes to the same variable in a different goroutine. Understanding this model is crucial for writing correct concurrent code without data races. Core Concepts: Happens-Before: Ordering guarantees between memory operations Memory Visibility: When writes in one goroutine are visible to reads in another Synchronization: Mechanisms that establish happens-before relationships Data Races: Concurrent accesses to the same memory, at least one of which is a write, without proper synchronization Real-World Impact Correctness: Prevent subtle bugs in concurrent code Performance: Understand when synchronization is necessary Debugging: Diagnose race conditions and memory visibility issues Optimization: Make informed decisions about lock-free algorithms Code Review: Identify potential concurrency bugs The Happens-Before Relationship Definition A happens-before relationship guarantees that one memory operation is ordered before another and that the effects of the first are visible to the second; within a single goroutine this ordering follows program order, while across goroutines it must be established through synchronization such as channels, mutexes, or sync/atomic. ...
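
    To make the happens-before idea concrete, here is a minimal illustrative sketch (not code from the article): a channel send happens before the corresponding receive completes, so a write made before the send is guaranteed to be visible after the receive. Without the channel, reading msg from main while the goroutine writes it would be a data race.

    ```go
    package main

    import "fmt"

    var msg string

    func main() {
        done := make(chan struct{})

        go func() {
            msg = "hello, memory model" // (1) write
            done <- struct{}{}          // (2) send: happens before the receive below completes
        }()

        <-done           // (3) receive: establishes the happens-before edge with the send
        fmt.Println(msg) // (4) guaranteed to observe the write from step (1)
    }
    ```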

    June 25, 2024 · 12 min · Rafiul Alam

    Context Propagation Patterns in Go

    Go Concurrency Patterns Series: ← Circuit Breaker | Series Overview | Memory Model → What is Context Propagation? Context propagation is the practice of threading context.Context through your application to carry cancellation signals, deadlines, and request-scoped values across API boundaries, goroutines, and service boundaries. This is critical for building observable, responsive distributed systems. Key Capabilities: Distributed Tracing: Propagate trace IDs across services Cancellation Cascades: Cancel entire request trees Deadline Enforcement: Ensure requests complete within time budgets Request-Scoped Values: Carry metadata without polluting function signatures Real-World Use Cases Microservices: Trace requests across multiple services API Gateways: Propagate timeouts and user context Database Layers: Cancel queries when requests are abandoned Message Queues: Propagate processing deadlines HTTP Middleware: Extract and inject trace headers gRPC Services: Automatic context propagation Basic Context Propagation Propagating Through Function Calls package main import ( "context" "fmt" "time" ) // ServiceA calls ServiceB which calls ServiceC // Context propagates through all layers func ServiceA(ctx context.Context, userID string) error { // Add request-scoped value ctx = context.WithValue(ctx, "user_id", userID) ctx = context.WithValue(ctx, "request_id", generateRequestID()) fmt.Printf("[ServiceA] Processing request for user: %s\n", userID) // Propagate context to next service return ServiceB(ctx) } func ServiceB(ctx context.Context) error { // Retrieve values from context userID := ctx.Value("user_id").(string) requestID := ctx.Value("request_id").(string) fmt.Printf("[ServiceB] User: %s, Request: %s\n", userID, requestID) // Add timeout for downstream call ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() return ServiceC(ctx) } func ServiceC(ctx context.Context) error { userID := ctx.Value("user_id").(string) requestID := ctx.Value("request_id").(string) fmt.Printf("[ServiceC] Processing for User: %s, Request: %s\n", userID, requestID) // Simulate work select { case <-time.After(1 * time.Second): fmt.Println("[ServiceC] Work completed") return nil case <-ctx.Done(): fmt.Printf("[ServiceC] Cancelled: %v\n", ctx.Err()) return ctx.Err() } } func generateRequestID() string { return fmt.Sprintf("req-%d", time.Now().UnixNano()) } func main() { ctx := context.Background() err := ServiceA(ctx, "user-123") if err != nil { fmt.Printf("Error: %v\n", err) } } Output: ...
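
    The excerpt above shows propagation within a single process; for the HTTP middleware and distributed-tracing cases it lists, the usual approach is to lift a trace ID out of an incoming header into the context and inject it into outgoing requests. A minimal sketch under assumed names (the X-Trace-ID header, ctxKey type, and handler wiring are illustrative, not from the article):

    ```go
    package main

    import (
        "context"
        "net/http"
    )

    type ctxKey string

    const traceIDKey ctxKey = "traceID"

    // TraceMiddleware extracts the trace ID from the incoming request header and
    // stores it in the request context for downstream handlers.
    func TraceMiddleware(next http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            traceID := r.Header.Get("X-Trace-ID")
            if traceID == "" {
                traceID = "trace-local" // in practice, generate a unique ID here
            }
            ctx := context.WithValue(r.Context(), traceIDKey, traceID)
            next.ServeHTTP(w, r.WithContext(ctx))
        })
    }

    // InjectTraceID copies the trace ID from ctx onto an outgoing request so the
    // next service in the chain can continue the same trace.
    func InjectTraceID(ctx context.Context, req *http.Request) {
        if traceID, ok := ctx.Value(traceIDKey).(string); ok {
            req.Header.Set("X-Trace-ID", traceID)
        }
    }

    func main() {
        handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            // Build a downstream request that carries the same trace ID onward.
            out, _ := http.NewRequestWithContext(r.Context(), http.MethodGet, "http://downstream.example/api", nil)
            InjectTraceID(r.Context(), out)
            w.Write([]byte("forwarding trace ID: " + out.Header.Get("X-Trace-ID")))
        })
        http.ListenAndServe(":8080", TraceMiddleware(handler))
    }
    ```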

    June 24, 2024 · 11 min · Rafiul Alam

    Context Pattern in Go

    Go Concurrency Patterns Series: ← Once Pattern | Series Overview | Circuit Breaker → What is the Context Pattern? The Context pattern uses Go’s context package to carry cancellation signals, deadlines, timeouts, and request-scoped values across API boundaries and between goroutines. It’s essential for building responsive, cancellable operations and managing request lifecycles. Key Features: Cancellation: Signal when operations should stop Timeouts: Automatically cancel after a duration Deadlines: Cancel at a specific time Values: Carry request-scoped data Real-World Use Cases HTTP Servers: Request cancellation and timeouts Database Operations: Query timeouts and cancellation API Calls: External service timeouts Background Jobs: Graceful shutdown Microservices: Request tracing and correlation IDs File Operations: Long-running I/O with cancellation Basic Context Usage package main import ( "context" "fmt" "math/rand" "time" ) // simulateWork simulates a long-running operation func simulateWork(ctx context.Context, name string, duration time.Duration) error { fmt.Printf("%s: Starting work (expected duration: %v)\n", name, duration) select { case <-time.After(duration): fmt.Printf("%s: Work completed successfully\n", name) return nil case <-ctx.Done(): fmt.Printf("%s: Work cancelled: %v\n", name, ctx.Err()) return ctx.Err() } } func main() { // Example 1: Context with timeout fmt.Println("=== Context with Timeout ===") ctx1, cancel1 := context.WithTimeout(context.Background(), 2*time.Second) defer cancel1() err := simulateWork(ctx1, "Task1", 1*time.Second) // Should complete if err != nil { fmt.Printf("Task1 error: %v\n", err) } err = simulateWork(ctx1, "Task2", 3*time.Second) // Should timeout if err != nil { fmt.Printf("Task2 error: %v\n", err) } // Example 2: Manual cancellation fmt.Println("\n=== Manual Cancellation ===") ctx2, cancel2 := context.WithCancel(context.Background()) go func() { time.Sleep(1 * time.Second) fmt.Println("Cancelling context...") cancel2() }() err = simulateWork(ctx2, "Task3", 3*time.Second) // Should be cancelled if err != nil { fmt.Printf("Task3 error: %v\n", err) } // Example 3: Context with deadline fmt.Println("\n=== Context with Deadline ===") deadline := time.Now().Add(1500 * time.Millisecond) ctx3, cancel3 := context.WithDeadline(context.Background(), deadline) defer cancel3() err = simulateWork(ctx3, "Task4", 2*time.Second) // Should hit deadline if err != nil { fmt.Printf("Task4 error: %v\n", err) } } Context with Values package main import ( "context" "fmt" "log" "net/http" "time" ) // Key types for context values type contextKey string const ( RequestIDKey contextKey = "requestID" UserIDKey contextKey = "userID" TraceIDKey contextKey = "traceID" ) // RequestInfo holds request-scoped information type RequestInfo struct { RequestID string UserID string TraceID string StartTime time.Time } // withRequestInfo adds request information to context func withRequestInfo(ctx context.Context, info RequestInfo) context.Context { ctx = context.WithValue(ctx, RequestIDKey, info.RequestID) ctx = context.WithValue(ctx, UserIDKey, info.UserID) ctx = context.WithValue(ctx, TraceIDKey, info.TraceID) return ctx } // getRequestID extracts request ID from context func getRequestID(ctx context.Context) string { if id, ok := ctx.Value(RequestIDKey).(string); ok { return id } return "unknown" } // getUserID extracts user ID from context func getUserID(ctx context.Context) string { if id, ok := ctx.Value(UserIDKey).(string); ok { return id } return "anonymous" } // 
getTraceID extracts trace ID from context func getTraceID(ctx context.Context) string { if id, ok := ctx.Value(TraceIDKey).(string); ok { return id } return "no-trace" } // logWithContext logs with context information func logWithContext(ctx context.Context, message string) { requestID := getRequestID(ctx) userID := getUserID(ctx) traceID := getTraceID(ctx) fmt.Printf("[%s][%s][%s] %s\n", requestID, userID, traceID, message) } // businessLogic simulates business logic that uses context func businessLogic(ctx context.Context) error { logWithContext(ctx, "Starting business logic") // Simulate some work select { case <-time.After(500 * time.Millisecond): logWithContext(ctx, "Business logic completed") return nil case <-ctx.Done(): logWithContext(ctx, "Business logic cancelled") return ctx.Err() } } // databaseOperation simulates a database operation func databaseOperation(ctx context.Context, query string) error { logWithContext(ctx, fmt.Sprintf("Executing query: %s", query)) select { case <-time.After(200 * time.Millisecond): logWithContext(ctx, "Database operation completed") return nil case <-ctx.Done(): logWithContext(ctx, "Database operation cancelled") return ctx.Err() } } // externalAPICall simulates calling an external API func externalAPICall(ctx context.Context, endpoint string) error { logWithContext(ctx, fmt.Sprintf("Calling external API: %s", endpoint)) select { case <-time.After(300 * time.Millisecond): logWithContext(ctx, "External API call completed") return nil case <-ctx.Done(): logWithContext(ctx, "External API call cancelled") return ctx.Err() } } // handleRequest simulates handling an HTTP request func handleRequest(ctx context.Context) error { logWithContext(ctx, "Handling request") // Perform multiple operations if err := databaseOperation(ctx, "SELECT * FROM users"); err != nil { return err } if err := externalAPICall(ctx, "/api/v1/data"); err != nil { return err } if err := businessLogic(ctx); err != nil { return err } logWithContext(ctx, "Request handled successfully") return nil } func main() { // Simulate incoming request requestInfo := RequestInfo{ RequestID: "req-12345", UserID: "user-67890", TraceID: "trace-abcdef", StartTime: time.Now(), } // Create context with timeout and request info ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() ctx = withRequestInfo(ctx, requestInfo) // Handle the request if err := handleRequest(ctx); err != nil { logWithContext(ctx, fmt.Sprintf("Request failed: %v", err)) } // Example with early cancellation fmt.Println("\n=== Early Cancellation Example ===") ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second) requestInfo2 := RequestInfo{ RequestID: "req-54321", UserID: "user-09876", TraceID: "trace-fedcba", StartTime: time.Now(), } ctx2 = withRequestInfo(ctx2, requestInfo2) // Cancel after 800ms go func() { time.Sleep(800 * time.Millisecond) logWithContext(ctx2, "Cancelling request early") cancel2() }() if err := handleRequest(ctx2); err != nil { logWithContext(ctx2, fmt.Sprintf("Request failed: %v", err)) } } HTTP Server with Context package main import ( "context" "encoding/json" "fmt" "log" "math/rand" "net/http" "strconv" "time" ) // Response represents an API response type Response struct { Message string `json:"message"` RequestID string `json:"request_id"` Duration time.Duration `json:"duration"` Data interface{} `json:"data,omitempty"` } // middleware adds request ID and timeout to context func middleware(next http.HandlerFunc) http.HandlerFunc { return func(w 
http.ResponseWriter, r *http.Request) { // Generate request ID requestID := fmt.Sprintf("req-%d", time.Now().UnixNano()) // Get timeout from query parameter (default 5 seconds) timeoutStr := r.URL.Query().Get("timeout") timeout := 5 * time.Second if timeoutStr != "" { if t, err := time.ParseDuration(timeoutStr); err == nil { timeout = t } } // Create context with timeout ctx, cancel := context.WithTimeout(r.Context(), timeout) defer cancel() // Add request ID to context ctx = context.WithValue(ctx, RequestIDKey, requestID) // Create new request with updated context r = r.WithContext(ctx) // Add request ID to response headers w.Header().Set("X-Request-ID", requestID) next(w, r) } } // simulateSlowOperation simulates a slow operation that respects context func simulateSlowOperation(ctx context.Context, duration time.Duration) (string, error) { select { case <-time.After(duration): return fmt.Sprintf("Operation completed after %v", duration), nil case <-ctx.Done(): return "", ctx.Err() } } // fastHandler handles requests quickly func fastHandler(w http.ResponseWriter, r *http.Request) { start := time.Now() ctx := r.Context() requestID := getRequestID(ctx) result, err := simulateSlowOperation(ctx, 100*time.Millisecond) duration := time.Since(start) response := Response{ RequestID: requestID, Duration: duration, } if err != nil { response.Message = "Request failed" w.WriteHeader(http.StatusRequestTimeout) } else { response.Message = "Success" response.Data = result } json.NewEncoder(w).Encode(response) } // slowHandler handles requests that might timeout func slowHandler(w http.ResponseWriter, r *http.Request) { start := time.Now() ctx := r.Context() requestID := getRequestID(ctx) // Random duration between 1-10 seconds duration := time.Duration(1+rand.Intn(10)) * time.Second result, err := simulateSlowOperation(ctx, duration) elapsed := time.Since(start) response := Response{ RequestID: requestID, Duration: elapsed, } if err != nil { response.Message = "Request timed out or cancelled" w.WriteHeader(http.StatusRequestTimeout) } else { response.Message = "Success" response.Data = result } json.NewEncoder(w).Encode(response) } // batchHandler processes multiple operations func batchHandler(w http.ResponseWriter, r *http.Request) { start := time.Now() ctx := r.Context() requestID := getRequestID(ctx) // Get batch size from query parameter batchSizeStr := r.URL.Query().Get("size") batchSize := 3 if batchSizeStr != "" { if size, err := strconv.Atoi(batchSizeStr); err == nil && size > 0 { batchSize = size } } results := make([]string, 0, batchSize) // Process operations sequentially, checking context each time for i := 0; i < batchSize; i++ { select { case <-ctx.Done(): // Context cancelled, return partial results response := Response{ RequestID: requestID, Duration: time.Since(start), Message: fmt.Sprintf("Batch cancelled after %d/%d operations", i, batchSize), Data: results, } w.WriteHeader(http.StatusRequestTimeout) json.NewEncoder(w).Encode(response) return default: } result, err := simulateSlowOperation(ctx, 200*time.Millisecond) if err != nil { response := Response{ RequestID: requestID, Duration: time.Since(start), Message: fmt.Sprintf("Batch failed at operation %d: %v", i+1, err), Data: results, } w.WriteHeader(http.StatusRequestTimeout) json.NewEncoder(w).Encode(response) return } results = append(results, fmt.Sprintf("Op%d: %s", i+1, result)) } response := Response{ RequestID: requestID, Duration: time.Since(start), Message: "Batch completed successfully", Data: results, } 
json.NewEncoder(w).Encode(response) } func main() { http.HandleFunc("/fast", middleware(fastHandler)) http.HandleFunc("/slow", middleware(slowHandler)) http.HandleFunc("/batch", middleware(batchHandler)) fmt.Println("Server starting on :8080") fmt.Println("Endpoints:") fmt.Println(" GET /fast - Fast operation (100ms)") fmt.Println(" GET /slow - Slow operation (1-10s random)") fmt.Println(" GET /batch?size=N - Batch operations") fmt.Println(" Add ?timeout=5s to set custom timeout") log.Fatal(http.ListenAndServe(":8080", nil)) } Context Propagation in Goroutines package main import ( "context" "fmt" "sync" "time" ) // Worker represents a worker that processes tasks type Worker struct { ID int Name string } // ProcessTask processes a task with context func (w *Worker) ProcessTask(ctx context.Context, taskID int) error { requestID := getRequestID(ctx) fmt.Printf("Worker %d (%s) [%s]: Starting task %d\n", w.ID, w.Name, requestID, taskID) // Simulate work with multiple steps for step := 1; step <= 3; step++ { select { case <-time.After(200 * time.Millisecond): fmt.Printf("Worker %d (%s) [%s]: Task %d step %d completed\n", w.ID, w.Name, requestID, taskID, step) case <-ctx.Done(): fmt.Printf("Worker %d (%s) [%s]: Task %d cancelled at step %d: %v\n", w.ID, w.Name, requestID, taskID, step, ctx.Err()) return ctx.Err() } } fmt.Printf("Worker %d (%s) [%s]: Task %d completed successfully\n", w.ID, w.Name, requestID, taskID) return nil } // TaskManager manages task distribution type TaskManager struct { workers []Worker } // NewTaskManager creates a new task manager func NewTaskManager() *TaskManager { return &TaskManager{ workers: []Worker{ {ID: 1, Name: "Alice"}, {ID: 2, Name: "Bob"}, {ID: 3, Name: "Charlie"}, }, } } // ProcessTasksConcurrently processes tasks using multiple workers func (tm *TaskManager) ProcessTasksConcurrently(ctx context.Context, taskCount int) error { var wg sync.WaitGroup taskChan := make(chan int, taskCount) errorChan := make(chan error, len(tm.workers)) // Send tasks to channel go func() { defer close(taskChan) for i := 1; i <= taskCount; i++ { select { case taskChan <- i: case <-ctx.Done(): return } } }() // Start workers for _, worker := range tm.workers { wg.Add(1) go func(w Worker) { defer wg.Done() for { select { case taskID, ok := <-taskChan: if !ok { return // No more tasks } if err := w.ProcessTask(ctx, taskID); err != nil { select { case errorChan <- err: case <-ctx.Done(): } return } case <-ctx.Done(): return } } }(worker) } // Wait for completion or cancellation done := make(chan struct{}) go func() { wg.Wait() close(done) }() select { case <-done: close(errorChan) // Check for errors for err := range errorChan { if err != nil { return err } } return nil case <-ctx.Done(): return ctx.Err() } } func main() { manager := NewTaskManager() // Example 1: Normal completion fmt.Println("=== Normal Completion ===") ctx1, cancel1 := context.WithTimeout(context.Background(), 5*time.Second) ctx1 = context.WithValue(ctx1, RequestIDKey, "batch-001") defer cancel1() err := manager.ProcessTasksConcurrently(ctx1, 6) if err != nil { fmt.Printf("Batch processing failed: %v\n", err) } else { fmt.Println("Batch processing completed successfully") } time.Sleep(1 * time.Second) // Example 2: Timeout scenario fmt.Println("\n=== Timeout Scenario ===") ctx2, cancel2 := context.WithTimeout(context.Background(), 1*time.Second) ctx2 = context.WithValue(ctx2, RequestIDKey, "batch-002") defer cancel2() err = manager.ProcessTasksConcurrently(ctx2, 10) if err != nil { fmt.Printf("Batch processing 
failed: %v\n", err) } else { fmt.Println("Batch processing completed successfully") } time.Sleep(1 * time.Second) // Example 3: Manual cancellation fmt.Println("\n=== Manual Cancellation ===") ctx3, cancel3 := context.WithCancel(context.Background()) ctx3 = context.WithValue(ctx3, RequestIDKey, "batch-003") // Cancel after 800ms go func() { time.Sleep(800 * time.Millisecond) fmt.Println("Manually cancelling batch...") cancel3() }() err = manager.ProcessTasksConcurrently(ctx3, 8) if err != nil { fmt.Printf("Batch processing failed: %v\n", err) } else { fmt.Println("Batch processing completed successfully") } } Best Practices Always Accept Context: Functions that might block should accept context as first parameter Don’t Store Context: Pass context as parameter, don’t store in structs Use context.TODO(): When you don’t have context but need one Derive Contexts: Create child contexts from parent contexts Handle Cancellation: Always check ctx.Done() in long-running operations Limit Context Values: Use sparingly and for request-scoped data only Use Typed Keys: Define custom types for context keys to avoid collisions Common Pitfalls 1. Ignoring Context Cancellation // Bad: Ignoring context cancellation func badOperation(ctx context.Context) error { for i := 0; i < 1000; i++ { // Long operation without checking context time.Sleep(10 * time.Millisecond) // Process item i } return nil } // Good: Checking context regularly func goodOperation(ctx context.Context) error { for i := 0; i < 1000; i++ { select { case <-ctx.Done(): return ctx.Err() default: } time.Sleep(10 * time.Millisecond) // Process item i } return nil } 2. Using Context for Optional Parameters // Bad: Using context for optional parameters func badFunction(ctx context.Context) { if timeout, ok := ctx.Value("timeout").(time.Duration); ok { // Use timeout } } // Good: Use function parameters for optional values func goodFunction(ctx context.Context, timeout time.Duration) { // Use timeout parameter } The Context pattern is fundamental for building robust, cancellable operations in Go. It enables graceful handling of timeouts, cancellations, and request-scoped data, making your applications more responsive and resource-efficient. ...
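
    The "Don't Store Context" practice above is easy to get wrong, so here is a small hedged sketch (the client types are invented for illustration) contrasting a context captured in a struct with one passed per call:

    ```go
    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // Bad: the context is captured at construction time, so every later call is
    // tied to a request that may already be cancelled or long finished.
    type badClient struct {
        ctx context.Context
    }

    func (c *badClient) Fetch(key string) (string, error) {
        select {
        case <-time.After(50 * time.Millisecond):
            return "value-for-" + key, nil
        case <-c.ctx.Done():
            return "", c.ctx.Err()
        }
    }

    // Good: each call receives the caller's context, so cancellation and deadlines
    // follow the current request rather than whatever was live at construction.
    type goodClient struct{}

    func (c *goodClient) Fetch(ctx context.Context, key string) (string, error) {
        select {
        case <-time.After(50 * time.Millisecond):
            return "value-for-" + key, nil
        case <-ctx.Done():
            return "", ctx.Err()
        }
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
        defer cancel()

        c := &goodClient{}
        value, err := c.Fetch(ctx, "user-42")
        fmt.Println(value, err)
    }
    ```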

    June 19, 2024 · 10 min · Rafiul Alam

    Circuit Breaker Pattern in Go

    Go Concurrency Patterns Series: ← Context Pattern | Series Overview | Rate Limiter → What is the Circuit Breaker Pattern? The Circuit Breaker pattern prevents cascading failures in distributed systems by monitoring for failures and temporarily stopping calls to failing services. Like an electrical circuit breaker, it “trips” when failures exceed a threshold, giving the failing service time to recover. States: Closed: Normal operation, requests pass through Open: Failing fast, requests are rejected immediately Half-Open: Testing if service has recovered Real-World Use Cases Microservices: Prevent cascade failures between services Database Connections: Handle database outages gracefully External APIs: Deal with third-party service failures Payment Processing: Handle payment gateway issues File Systems: Manage disk I/O failures Network Operations: Handle network partitions Basic Circuit Breaker Implementation package main import ( "context" "errors" "fmt" "sync" "time" ) // State represents the circuit breaker state type State int const ( StateClosed State = iota StateOpen StateHalfOpen ) func (s State) String() string { switch s { case StateClosed: return "CLOSED" case StateOpen: return "OPEN" case StateHalfOpen: return "HALF_OPEN" default: return "UNKNOWN" } } // CircuitBreaker implements the circuit breaker pattern type CircuitBreaker struct { mu sync.RWMutex state State failureCount int successCount int lastFailureTime time.Time nextAttemptTime time.Time // Configuration maxFailures int resetTimeout time.Duration halfOpenMaxCalls int } // Config holds circuit breaker configuration type Config struct { MaxFailures int // Number of failures before opening ResetTimeout time.Duration // Time to wait before trying half-open HalfOpenMaxCalls int // Max calls allowed in half-open state } // NewCircuitBreaker creates a new circuit breaker func NewCircuitBreaker(config Config) *CircuitBreaker { return &CircuitBreaker{ state: StateClosed, maxFailures: config.MaxFailures, resetTimeout: config.ResetTimeout, halfOpenMaxCalls: config.HalfOpenMaxCalls, } } // Execute runs the given function with circuit breaker protection func (cb *CircuitBreaker) Execute(fn func() error) error { if !cb.allowRequest() { return errors.New("circuit breaker is open") } err := fn() cb.recordResult(err) return err } // allowRequest determines if a request should be allowed func (cb *CircuitBreaker) allowRequest() bool { cb.mu.Lock() defer cb.mu.Unlock() now := time.Now() switch cb.state { case StateClosed: return true case StateOpen: if now.After(cb.nextAttemptTime) { cb.state = StateHalfOpen cb.successCount = 0 cb.failureCount = 0 fmt.Printf("Circuit breaker transitioning to HALF_OPEN\n") return true } return false case StateHalfOpen: return cb.successCount + cb.failureCount < cb.halfOpenMaxCalls default: return false } } // recordResult records the result of a function call func (cb *CircuitBreaker) recordResult(err error) { cb.mu.Lock() defer cb.mu.Unlock() if err != nil { cb.onFailure() } else { cb.onSuccess() } } // onFailure handles a failure func (cb *CircuitBreaker) onFailure() { cb.failureCount++ cb.lastFailureTime = time.Now() switch cb.state { case StateClosed: if cb.failureCount >= cb.maxFailures { cb.state = StateOpen cb.nextAttemptTime = time.Now().Add(cb.resetTimeout) fmt.Printf("Circuit breaker OPENED after %d failures\n", cb.failureCount) } case StateHalfOpen: cb.state = StateOpen cb.nextAttemptTime = time.Now().Add(cb.resetTimeout) fmt.Printf("Circuit breaker returned to OPEN from HALF_OPEN\n") } } // 
onSuccess handles a success func (cb *CircuitBreaker) onSuccess() { switch cb.state { case StateClosed: cb.failureCount = 0 case StateHalfOpen: cb.successCount++ if cb.successCount >= cb.halfOpenMaxCalls { cb.state = StateClosed cb.failureCount = 0 fmt.Printf("Circuit breaker CLOSED after successful recovery\n") } } } // GetState returns the current state func (cb *CircuitBreaker) GetState() State { cb.mu.RLock() defer cb.mu.RUnlock() return cb.state } // GetStats returns current statistics func (cb *CircuitBreaker) GetStats() (State, int, int) { cb.mu.RLock() defer cb.mu.RUnlock() return cb.state, cb.failureCount, cb.successCount } // simulateService simulates a service that might fail func simulateService(shouldFail bool, delay time.Duration) func() error { return func() error { time.Sleep(delay) if shouldFail { return errors.New("service failure") } return nil } } func main() { config := Config{ MaxFailures: 3, ResetTimeout: 2 * time.Second, HalfOpenMaxCalls: 2, } cb := NewCircuitBreaker(config) // Test scenario: failures followed by recovery scenarios := []struct { name string shouldFail bool delay time.Duration }{ {"Success 1", false, 100 * time.Millisecond}, {"Success 2", false, 100 * time.Millisecond}, {"Failure 1", true, 100 * time.Millisecond}, {"Failure 2", true, 100 * time.Millisecond}, {"Failure 3", true, 100 * time.Millisecond}, // Should open circuit {"Blocked 1", false, 100 * time.Millisecond}, // Should be blocked {"Blocked 2", false, 100 * time.Millisecond}, // Should be blocked } for i, scenario := range scenarios { fmt.Printf("\n--- Test %d: %s ---\n", i+1, scenario.name) err := cb.Execute(simulateService(scenario.shouldFail, scenario.delay)) state, failures, successes := cb.GetStats() if err != nil { fmt.Printf("Result: ERROR - %v\n", err) } else { fmt.Printf("Result: SUCCESS\n") } fmt.Printf("State: %s, Failures: %d, Successes: %d\n", state, failures, successes) time.Sleep(100 * time.Millisecond) } // Wait for reset timeout and test recovery fmt.Printf("\n--- Waiting for reset timeout (%v) ---\n", config.ResetTimeout) time.Sleep(config.ResetTimeout + 100*time.Millisecond) // Test recovery recoveryTests := []struct { name string shouldFail bool }{ {"Recovery 1", false}, // Should succeed and move to half-open {"Recovery 2", false}, // Should succeed and close circuit {"Success after recovery", false}, } for i, test := range recoveryTests { fmt.Printf("\n--- Recovery Test %d: %s ---\n", i+1, test.name) err := cb.Execute(simulateService(test.shouldFail, 100*time.Millisecond)) state, failures, successes := cb.GetStats() if err != nil { fmt.Printf("Result: ERROR - %v\n", err) } else { fmt.Printf("Result: SUCCESS\n") } fmt.Printf("State: %s, Failures: %d, Successes: %d\n", state, failures, successes) } } Advanced Circuit Breaker with Metrics package main import ( "context" "fmt" "sync" "sync/atomic" "time" ) // Metrics tracks circuit breaker statistics type Metrics struct { totalRequests int64 successfulCalls int64 failedCalls int64 rejectedCalls int64 timeouts int64 stateChanges int64 } // AdvancedCircuitBreaker with comprehensive metrics and monitoring type AdvancedCircuitBreaker struct { mu sync.RWMutex state State failureCount int successCount int lastFailureTime time.Time nextAttemptTime time.Time stateChangeTime time.Time // Configuration maxFailures int resetTimeout time.Duration halfOpenMaxCalls int callTimeout time.Duration // Metrics metrics *Metrics // Monitoring onStateChange func(from, to State) } // AdvancedConfig holds advanced circuit breaker configuration 
type AdvancedConfig struct { MaxFailures int ResetTimeout time.Duration HalfOpenMaxCalls int CallTimeout time.Duration OnStateChange func(from, to State) } // NewAdvancedCircuitBreaker creates a new advanced circuit breaker func NewAdvancedCircuitBreaker(config AdvancedConfig) *AdvancedCircuitBreaker { return &AdvancedCircuitBreaker{ state: StateClosed, maxFailures: config.MaxFailures, resetTimeout: config.ResetTimeout, halfOpenMaxCalls: config.HalfOpenMaxCalls, callTimeout: config.CallTimeout, metrics: &Metrics{}, onStateChange: config.OnStateChange, stateChangeTime: time.Now(), } } // ExecuteWithContext runs function with context and timeout func (acb *AdvancedCircuitBreaker) ExecuteWithContext(ctx context.Context, fn func(context.Context) error) error { atomic.AddInt64(&acb.metrics.totalRequests, 1) if !acb.allowRequest() { atomic.AddInt64(&acb.metrics.rejectedCalls, 1) return fmt.Errorf("circuit breaker is %s", acb.GetState()) } // Create context with timeout if specified if acb.callTimeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, acb.callTimeout) defer cancel() } // Execute with timeout monitoring done := make(chan error, 1) go func() { done <- fn(ctx) }() select { case err := <-done: acb.recordResult(err) return err case <-ctx.Done(): atomic.AddInt64(&acb.metrics.timeouts, 1) acb.recordResult(ctx.Err()) return ctx.Err() } } // allowRequest determines if a request should be allowed func (acb *AdvancedCircuitBreaker) allowRequest() bool { acb.mu.Lock() defer acb.mu.Unlock() now := time.Now() switch acb.state { case StateClosed: return true case StateOpen: if now.After(acb.nextAttemptTime) { acb.changeState(StateHalfOpen) acb.successCount = 0 acb.failureCount = 0 return true } return false case StateHalfOpen: return acb.successCount + acb.failureCount < acb.halfOpenMaxCalls default: return false } } // recordResult records the result of a function call func (acb *AdvancedCircuitBreaker) recordResult(err error) { acb.mu.Lock() defer acb.mu.Unlock() if err != nil { atomic.AddInt64(&acb.metrics.failedCalls, 1) acb.onFailure() } else { atomic.AddInt64(&acb.metrics.successfulCalls, 1) acb.onSuccess() } } // changeState changes the circuit breaker state func (acb *AdvancedCircuitBreaker) changeState(newState State) { if acb.state != newState { oldState := acb.state acb.state = newState acb.stateChangeTime = time.Now() atomic.AddInt64(&acb.metrics.stateChanges, 1) if acb.onStateChange != nil { go acb.onStateChange(oldState, newState) } } } // onFailure handles a failure func (acb *AdvancedCircuitBreaker) onFailure() { acb.failureCount++ acb.lastFailureTime = time.Now() switch acb.state { case StateClosed: if acb.failureCount >= acb.maxFailures { acb.changeState(StateOpen) acb.nextAttemptTime = time.Now().Add(acb.resetTimeout) } case StateHalfOpen: acb.changeState(StateOpen) acb.nextAttemptTime = time.Now().Add(acb.resetTimeout) } } // onSuccess handles a success func (acb *AdvancedCircuitBreaker) onSuccess() { switch acb.state { case StateClosed: acb.failureCount = 0 case StateHalfOpen: acb.successCount++ if acb.successCount >= acb.halfOpenMaxCalls { acb.changeState(StateClosed) acb.failureCount = 0 } } } // GetMetrics returns current metrics func (acb *AdvancedCircuitBreaker) GetMetrics() Metrics { return Metrics{ totalRequests: atomic.LoadInt64(&acb.metrics.totalRequests), successfulCalls: atomic.LoadInt64(&acb.metrics.successfulCalls), failedCalls: atomic.LoadInt64(&acb.metrics.failedCalls), rejectedCalls: atomic.LoadInt64(&acb.metrics.rejectedCalls), 
timeouts: atomic.LoadInt64(&acb.metrics.timeouts), stateChanges: atomic.LoadInt64(&acb.metrics.stateChanges), } } // GetState returns current state func (acb *AdvancedCircuitBreaker) GetState() State { acb.mu.RLock() defer acb.mu.RUnlock() return acb.state } // HealthCheck returns health information func (acb *AdvancedCircuitBreaker) HealthCheck() map[string]interface{} { acb.mu.RLock() defer acb.mu.RUnlock() metrics := acb.GetMetrics() var successRate float64 if metrics.totalRequests > 0 { successRate = float64(metrics.successfulCalls) / float64(metrics.totalRequests) * 100 } return map[string]interface{}{ "state": acb.state.String(), "failure_count": acb.failureCount, "success_count": acb.successCount, "last_failure_time": acb.lastFailureTime, "state_change_time": acb.stateChangeTime, "next_attempt_time": acb.nextAttemptTime, "total_requests": metrics.totalRequests, "successful_calls": metrics.successfulCalls, "failed_calls": metrics.failedCalls, "rejected_calls": metrics.rejectedCalls, "timeouts": metrics.timeouts, "state_changes": metrics.stateChanges, "success_rate": fmt.Sprintf("%.2f%%", successRate), } } // Service simulation type ExternalService struct { failureRate float64 latency time.Duration } func (es *ExternalService) Call(ctx context.Context, data string) error { // Simulate latency select { case <-time.After(es.latency): case <-ctx.Done(): return ctx.Err() } // Simulate random failures if time.Now().UnixNano()%100 < int64(es.failureRate*100) { return fmt.Errorf("service failure for data: %s", data) } return nil } func main() { // Create service that fails 30% of the time service := &ExternalService{ failureRate: 0.3, latency: 100 * time.Millisecond, } config := AdvancedConfig{ MaxFailures: 3, ResetTimeout: 2 * time.Second, HalfOpenMaxCalls: 2, CallTimeout: 500 * time.Millisecond, OnStateChange: func(from, to State) { fmt.Printf(" Circuit breaker state changed: %s -> %s\n", from, to) }, } cb := NewAdvancedCircuitBreaker(config) // Monitor circuit breaker health go func() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for range ticker.C { health := cb.HealthCheck() fmt.Printf(" Health: State=%s, Success Rate=%s, Total=%d, Failed=%d, Rejected=%d\n", health["state"], health["success_rate"], health["total_requests"], health["failed_calls"], health["rejected_calls"]) } }() // Simulate load var wg sync.WaitGroup for i := 0; i < 50; i++ { wg.Add(1) go func(id int) { defer wg.Done() ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() err := cb.ExecuteWithContext(ctx, func(ctx context.Context) error { return service.Call(ctx, fmt.Sprintf("request-%d", id)) }) if err != nil { fmt.Printf(" Request %d failed: %v\n", id, err) } else { fmt.Printf(" Request %d succeeded\n", id) } time.Sleep(200 * time.Millisecond) }(i) } wg.Wait() // Final health report fmt.Println("\n Final Health Report:") health := cb.HealthCheck() for key, value := range health { fmt.Printf(" %s: %v\n", key, value) } } HTTP Client with Circuit Breaker package main import ( "context" "encoding/json" "fmt" "io" "net/http" "time" ) // HTTPClient wraps http.Client with circuit breaker type HTTPClient struct { client *http.Client circuitBreaker *AdvancedCircuitBreaker } // NewHTTPClient creates a new HTTP client with circuit breaker func NewHTTPClient(timeout time.Duration, cbConfig AdvancedConfig) *HTTPClient { return &HTTPClient{ client: &http.Client{ Timeout: timeout, }, circuitBreaker: NewAdvancedCircuitBreaker(cbConfig), } } // Get performs a GET request with circuit 
breaker protection func (hc *HTTPClient) Get(ctx context.Context, url string) (*http.Response, error) { var resp *http.Response err := hc.circuitBreaker.ExecuteWithContext(ctx, func(ctx context.Context) error { req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return err } var httpErr error resp, httpErr = hc.client.Do(req) if httpErr != nil { return httpErr } // Consider 5xx status codes as failures if resp.StatusCode >= 500 { resp.Body.Close() return fmt.Errorf("server error: %d", resp.StatusCode) } return nil }) return resp, err } // GetJSON performs a GET request and unmarshals JSON response func (hc *HTTPClient) GetJSON(ctx context.Context, url string, target interface{}) error { resp, err := hc.Get(ctx, url) if err != nil { return err } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return err } return json.Unmarshal(body, target) } // GetHealth returns circuit breaker health func (hc *HTTPClient) GetHealth() map[string]interface{} { return hc.circuitBreaker.HealthCheck() } // Example usage func main() { config := AdvancedConfig{ MaxFailures: 3, ResetTimeout: 5 * time.Second, HalfOpenMaxCalls: 2, CallTimeout: 2 * time.Second, OnStateChange: func(from, to State) { fmt.Printf(" HTTP Client circuit breaker: %s -> %s\n", from, to) }, } client := NewHTTPClient(3*time.Second, config) // Test URLs (some will fail) urls := []string{ "https://httpbin.org/status/200", // Success "https://httpbin.org/status/500", // Server error "https://httpbin.org/delay/1", // Success with delay "https://httpbin.org/status/503", // Server error "https://httpbin.org/status/500", // Server error "https://httpbin.org/status/502", // Server error (should open circuit) "https://httpbin.org/status/200", // Should be rejected "https://httpbin.org/status/200", // Should be rejected } for i, url := range urls { fmt.Printf("\n--- Request %d: %s ---\n", i+1, url) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) resp, err := client.Get(ctx, url) if err != nil { fmt.Printf(" Error: %v\n", err) } else { fmt.Printf(" Success: %d %s\n", resp.StatusCode, resp.Status) resp.Body.Close() } cancel() // Show current health health := client.GetHealth() fmt.Printf("State: %s, Success Rate: %s\n", health["state"], health["success_rate"]) time.Sleep(1 * time.Second) } // Wait for circuit to potentially reset fmt.Println("\n--- Waiting for potential reset ---") time.Sleep(6 * time.Second) // Try again fmt.Println("\n--- Testing recovery ---") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() resp, err := client.Get(ctx, "https://httpbin.org/status/200") if err != nil { fmt.Printf(" Recovery test failed: %v\n", err) } else { fmt.Printf(" Recovery test succeeded: %d %s\n", resp.StatusCode, resp.Status) resp.Body.Close() } // Final health report fmt.Println("\n Final Health Report:") health := client.GetHealth() for key, value := range health { fmt.Printf(" %s: %v\n", key, value) } } Best Practices Choose Appropriate Thresholds: Set failure thresholds based on service characteristics Monitor State Changes: Log state transitions for debugging Implement Fallbacks: Provide alternative responses when circuit is open Use Timeouts: Combine with timeouts to handle slow responses Gradual Recovery: Use half-open state to test service recovery Metrics Collection: Track success rates, response times, and state changes Configuration: Make thresholds configurable for different environments Common Pitfalls Too Sensitive: Setting thresholds 
too low causes unnecessary trips Too Tolerant: High thresholds don’t protect against cascading failures No Fallbacks: Not providing alternative responses when circuit is open Ignoring Context: Not respecting context cancellation in protected functions Poor Monitoring: Not tracking circuit breaker metrics and health The Circuit Breaker pattern is essential for building resilient distributed systems. It prevents cascading failures, provides fast failure responses, and allows services time to recover, making your applications more robust and reliable. ...
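
    The best practices above call for a fallback when the circuit is open, which the excerpt does not show. A minimal sketch, assuming the CircuitBreaker type and Execute method from the basic implementation earlier in this post; the fallback function itself is invented for illustration:

    ```go
    // ExecuteWithFallback runs fn through the circuit breaker and, when the call
    // is rejected (circuit open) or fails, serves a degraded response instead of
    // surfacing the error to the caller.
    func ExecuteWithFallback(cb *CircuitBreaker, fn func() (string, error), fallback func() string) string {
        var result string
        err := cb.Execute(func() error {
            r, err := fn()
            if err != nil {
                return err
            }
            result = r
            return nil
        })
        if err != nil {
            return fallback() // e.g. a cached value or a static default
        }
        return result
    }
    ```

    Typical usage would look like ExecuteWithFallback(cb, fetchFromService, func() string { return "cached-default" }), so callers keep getting a usable (if stale) answer while the failing dependency recovers.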

    June 12, 2024 · 11 min · Rafiul Alam

    Actor Model Pattern in Go

    Go Concurrency Patterns Series: ← Semaphore Pattern | Series Overview What is the Actor Model? The Actor Model is a conceptual model for concurrent computation where “actors” are the fundamental units of computation. Each actor has its own isolated state, processes messages sequentially, and can create other actors, send messages, or change its behavior in response to messages. Key Principles: Isolated State: Each actor maintains its own private state Message Passing: Actors communicate only through asynchronous messages Sequential Processing: Each actor processes one message at a time Location Transparency: Actors can be local or remote Fault Tolerance: Actor failures are isolated and recoverable Real-World Use Cases Distributed Systems: Microservices communication Game Servers: Player state management IoT Systems: Device state and communication Financial Systems: Transaction processing Chat Applications: User session management Workflow Engines: Task orchestration Basic Actor Implementation package main import ( "context" "fmt" "sync" "time" ) // Message represents a message sent to an actor type Message interface{} // Actor represents the basic actor interface type Actor interface { Receive(message Message) Start(ctx context.Context) Stop() Send(message Message) GetID() string } // BaseActor provides basic actor functionality type BaseActor struct { id string mailbox chan Message quit chan struct{} wg sync.WaitGroup behavior func(Message) } // NewBaseActor creates a new base actor func NewBaseActor(id string, behavior func(Message)) *BaseActor { return &BaseActor{ id: id, mailbox: make(chan Message, 100), quit: make(chan struct{}), behavior: behavior, } } // GetID returns the actor ID func (ba *BaseActor) GetID() string { return ba.id } // Send sends a message to the actor func (ba *BaseActor) Send(message Message) { select { case ba.mailbox <- message: case <-ba.quit: fmt.Printf("Actor %s: Cannot send message, actor is stopped\n", ba.id) } } // Start starts the actor's message processing loop func (ba *BaseActor) Start(ctx context.Context) { ba.wg.Add(1) go func() { defer ba.wg.Done() fmt.Printf("Actor %s: Started\n", ba.id) for { select { case message := <-ba.mailbox: ba.Receive(message) case <-ba.quit: fmt.Printf("Actor %s: Stopped\n", ba.id) return case <-ctx.Done(): fmt.Printf("Actor %s: Context cancelled\n", ba.id) return } } }() } // Receive processes a received message func (ba *BaseActor) Receive(message Message) { if ba.behavior != nil { ba.behavior(message) } } // Stop stops the actor func (ba *BaseActor) Stop() { close(ba.quit) ba.wg.Wait() } // Common message types type StartMessage struct{} type StopMessage struct{} type PingMessage struct { Sender Actor } type PongMessage struct { Sender Actor } // CounterActor demonstrates a stateful actor type CounterActor struct { *BaseActor count int } // CounterMessage types type IncrementMessage struct{} type DecrementMessage struct{} type GetCountMessage struct { ResponseChan chan int } // NewCounterActor creates a new counter actor func NewCounterActor(id string) *CounterActor { ca := &CounterActor{ count: 0, } ca.BaseActor = NewBaseActor(id, ca.handleMessage) return ca } // handleMessage handles counter-specific messages func (ca *CounterActor) handleMessage(message Message) { switch msg := message.(type) { case IncrementMessage: ca.count++ fmt.Printf("Counter %s: Incremented to %d\n", ca.id, ca.count) case DecrementMessage: ca.count-- fmt.Printf("Counter %s: Decremented to %d\n", ca.id, ca.count) case GetCountMessage: 
fmt.Printf("Counter %s: Current count is %d\n", ca.id, ca.count) msg.ResponseChan <- ca.count case PingMessage: fmt.Printf("Counter %s: Received ping from %s\n", ca.id, msg.Sender.GetID()) msg.Sender.Send(PongMessage{Sender: ca}) default: fmt.Printf("Counter %s: Unknown message type: %T\n", ca.id, message) } } func main() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() fmt.Println("=== Basic Actor Model Demo ===") // Create counter actors counter1 := NewCounterActor("counter-1") counter2 := NewCounterActor("counter-2") // Start actors counter1.Start(ctx) counter2.Start(ctx) defer counter1.Stop() defer counter2.Stop() // Send messages to actors counter1.Send(IncrementMessage{}) counter1.Send(IncrementMessage{}) counter1.Send(IncrementMessage{}) counter2.Send(IncrementMessage{}) counter2.Send(DecrementMessage{}) // Ping-pong between actors counter1.Send(PingMessage{Sender: counter2}) // Get current counts responseChan1 := make(chan int) responseChan2 := make(chan int) counter1.Send(GetCountMessage{ResponseChan: responseChan1}) counter2.Send(GetCountMessage{ResponseChan: responseChan2}) count1 := <-responseChan1 count2 := <-responseChan2 fmt.Printf("Final counts - Counter1: %d, Counter2: %d\n", count1, count2) time.Sleep(1 * time.Second) } Advanced Actor System with Supervision package main import ( "context" "fmt" "sync" "time" ) // ActorRef represents a reference to an actor type ActorRef struct { id string mailbox chan Message } // Send sends a message to the actor func (ref *ActorRef) Send(message Message) { select { case ref.mailbox <- message: default: fmt.Printf("ActorRef %s: Mailbox full, dropping message\n", ref.id) } } // GetID returns the actor ID func (ref *ActorRef) GetID() string { return ref.id } // ActorSystem manages actors and provides supervision type ActorSystem struct { mu sync.RWMutex actors map[string]*ManagedActor ctx context.Context cancel context.CancelFunc } // ManagedActor wraps an actor with management capabilities type ManagedActor struct { ref *ActorRef behavior ActorBehavior supervisor *ActorRef children map[string]*ActorRef mu sync.RWMutex restarts int maxRestarts int } // ActorBehavior defines how an actor processes messages type ActorBehavior func(ctx ActorContext, message Message) // ActorContext provides context for actor operations type ActorContext struct { Self *ActorRef System *ActorSystem Sender *ActorRef } // NewActorSystem creates a new actor system func NewActorSystem() *ActorSystem { ctx, cancel := context.WithCancel(context.Background()) return &ActorSystem{ actors: make(map[string]*ManagedActor), ctx: ctx, cancel: cancel, } } // ActorOf creates a new actor func (as *ActorSystem) ActorOf(id string, behavior ActorBehavior, supervisor *ActorRef) *ActorRef { as.mu.Lock() defer as.mu.Unlock() ref := &ActorRef{ id: id, mailbox: make(chan Message, 100), } actor := &ManagedActor{ ref: ref, behavior: behavior, supervisor: supervisor, children: make(map[string]*ActorRef), maxRestarts: 3, } as.actors[id] = actor // Start actor goroutine go as.runActor(actor) fmt.Printf("ActorSystem: Created actor %s\n", id) return ref } // runActor runs the actor's message processing loop func (as *ActorSystem) runActor(actor *ManagedActor) { defer func() { if r := recover(); r != nil { fmt.Printf("Actor %s: Panic recovered: %v\n", actor.ref.id, r) as.handleActorFailure(actor, fmt.Errorf("panic: %v", r)) } }() for { select { case message := <-actor.ref.mailbox: ctx := ActorContext{ Self: actor.ref, System: as, } actor.behavior(ctx, 
message) case <-as.ctx.Done(): fmt.Printf("Actor %s: System shutdown\n", actor.ref.id) return } } } // handleActorFailure handles actor failures and restarts func (as *ActorSystem) handleActorFailure(actor *ManagedActor, err error) { actor.mu.Lock() defer actor.mu.Unlock() actor.restarts++ fmt.Printf("Actor %s: Failed with error: %v (restart %d/%d)\n", actor.ref.id, err, actor.restarts, actor.maxRestarts) if actor.restarts <= actor.maxRestarts { // Restart the actor go as.runActor(actor) fmt.Printf("Actor %s: Restarted\n", actor.ref.id) } else { // Stop the actor and notify supervisor fmt.Printf("Actor %s: Max restarts exceeded, stopping\n", actor.ref.id) if actor.supervisor != nil { actor.supervisor.Send(ActorFailedMessage{ FailedActor: actor.ref, Error: err, }) } } } // Stop stops the actor system func (as *ActorSystem) Stop() { as.cancel() // Wait a bit for actors to stop gracefully time.Sleep(100 * time.Millisecond) fmt.Println("ActorSystem: Stopped") } // Message types for actor system type ActorFailedMessage struct { FailedActor *ActorRef Error error } type CreateChildMessage struct { ChildID string Behavior ActorBehavior ResponseChan chan *ActorRef } type WorkMessage struct { Data string } type ResultMessage struct { Result string From *ActorRef } // WorkerActor demonstrates a worker that can fail func WorkerBehavior(ctx ActorContext, message Message) { switch msg := message.(type) { case WorkMessage: fmt.Printf("Worker %s: Processing work: %s\n", ctx.Self.id, msg.Data) // Simulate work time.Sleep(100 * time.Millisecond) // Simulate random failures if len(msg.Data)%7 == 0 { panic("simulated worker failure") } result := fmt.Sprintf("processed-%s", msg.Data) fmt.Printf("Worker %s: Work completed: %s\n", ctx.Self.id, result) default: fmt.Printf("Worker %s: Unknown message: %T\n", ctx.Self.id, message) } } // SupervisorActor manages worker actors func SupervisorBehavior(ctx ActorContext, message Message) { switch msg := message.(type) { case CreateChildMessage: childRef := ctx.System.ActorOf(msg.ChildID, msg.Behavior, ctx.Self) msg.ResponseChan <- childRef case ActorFailedMessage: fmt.Printf("Supervisor %s: Child actor %s failed: %v\n", ctx.Self.id, msg.FailedActor.id, msg.Error) // Create replacement worker newWorkerID := fmt.Sprintf("%s-replacement", msg.FailedActor.id) ctx.System.ActorOf(newWorkerID, WorkerBehavior, ctx.Self) fmt.Printf("Supervisor %s: Created replacement worker %s\n", ctx.Self.id, newWorkerID) case WorkMessage: // Delegate work to children (simplified) fmt.Printf("Supervisor %s: Delegating work: %s\n", ctx.Self.id, msg.Data) default: fmt.Printf("Supervisor %s: Unknown message: %T\n", ctx.Self.id, message) } } func main() { fmt.Println("=== Advanced Actor System Demo ===") system := NewActorSystem() defer system.Stop() // Create supervisor supervisor := system.ActorOf("supervisor", SupervisorBehavior, nil) // Create workers through supervisor responseChan := make(chan *ActorRef) supervisor.Send(CreateChildMessage{ ChildID: "worker-1", Behavior: WorkerBehavior, ResponseChan: responseChan, }) worker1 := <-responseChan supervisor.Send(CreateChildMessage{ ChildID: "worker-2", Behavior: WorkerBehavior, ResponseChan: responseChan, }) worker2 := <-responseChan // Send work to workers workItems := []string{ "task-1", "task-2", "task-3", "failure", // "failure" will cause panic "task-4", "task-5", "another", "task-6", } for i, work := range workItems { var worker *ActorRef if i%2 == 0 { worker = worker1 } else { worker = worker2 } worker.Send(WorkMessage{Data: work}) 
time.Sleep(200 * time.Millisecond) } // Wait for processing time.Sleep(3 * time.Second) } Distributed Actor System package main import ( "context" "encoding/json" "fmt" "net" "sync" "time" ) // RemoteMessage represents a message that can be sent over network type RemoteMessage struct { Type string `json:"type"` Payload interface{} `json:"payload"` From string `json:"from"` To string `json:"to"` } // DistributedActorSystem extends ActorSystem with network capabilities type DistributedActorSystem struct { *ActorSystem nodeID string address string listener net.Listener peers map[string]net.Conn peersMu sync.RWMutex } // NewDistributedActorSystem creates a new distributed actor system func NewDistributedActorSystem(nodeID, address string) *DistributedActorSystem { return &DistributedActorSystem{ ActorSystem: NewActorSystem(), nodeID: nodeID, address: address, peers: make(map[string]net.Conn), } } // Start starts the distributed actor system func (das *DistributedActorSystem) Start() error { listener, err := net.Listen("tcp", das.address) if err != nil { return err } das.listener = listener // Accept incoming connections go func() { for { conn, err := listener.Accept() if err != nil { return } go das.handleConnection(conn) } }() fmt.Printf("DistributedActorSystem %s: Started on %s\n", das.nodeID, das.address) return nil } // ConnectToPeer connects to a peer node func (das *DistributedActorSystem) ConnectToPeer(peerID, peerAddress string) error { conn, err := net.Dial("tcp", peerAddress) if err != nil { return err } das.peersMu.Lock() das.peers[peerID] = conn das.peersMu.Unlock() go das.handleConnection(conn) fmt.Printf("DistributedActorSystem %s: Connected to peer %s\n", das.nodeID, peerID) return nil } // handleConnection handles incoming network connections func (das *DistributedActorSystem) handleConnection(conn net.Conn) { defer conn.Close() decoder := json.NewDecoder(conn) for { var msg RemoteMessage if err := decoder.Decode(&msg); err != nil { return } das.handleRemoteMessage(msg) } } // handleRemoteMessage processes remote messages func (das *DistributedActorSystem) handleRemoteMessage(msg RemoteMessage) { fmt.Printf("DistributedActorSystem %s: Received remote message from %s to %s\n", das.nodeID, msg.From, msg.To) // Find local actor and deliver message das.ActorSystem.mu.RLock() actor, exists := das.ActorSystem.actors[msg.To] das.ActorSystem.mu.RUnlock() if exists { // Convert payload back to proper message type switch msg.Type { case "work": if data, ok := msg.Payload.(string); ok { actor.ref.Send(WorkMessage{Data: data}) } case "result": if result, ok := msg.Payload.(string); ok { actor.ref.Send(ResultMessage{Result: result}) } } } } // SendRemoteMessage sends a message to a remote actor func (das *DistributedActorSystem) SendRemoteMessage(peerID, actorID string, message Message) error { das.peersMu.RLock() conn, exists := das.peers[peerID] das.peersMu.RUnlock() if !exists { return fmt.Errorf("peer %s not connected", peerID) } var msgType string var payload interface{} switch msg := message.(type) { case WorkMessage: msgType = "work" payload = msg.Data case ResultMessage: msgType = "result" payload = msg.Result default: return fmt.Errorf("unsupported message type: %T", message) } remoteMsg := RemoteMessage{ Type: msgType, Payload: payload, From: das.nodeID, To: actorID, } encoder := json.NewEncoder(conn) return encoder.Encode(remoteMsg) } // Stop stops the distributed actor system func (das *DistributedActorSystem) Stop() { if das.listener != nil { das.listener.Close() } 
das.peersMu.Lock() for _, conn := range das.peers { conn.Close() } das.peersMu.Unlock() das.ActorSystem.Stop() } // DistributedWorkerBehavior for distributed workers func DistributedWorkerBehavior(system *DistributedActorSystem) ActorBehavior { return func(ctx ActorContext, message Message) { switch msg := message.(type) { case WorkMessage: fmt.Printf("DistributedWorker %s: Processing work: %s\n", ctx.Self.id, msg.Data) // Simulate work time.Sleep(500 * time.Millisecond) result := fmt.Sprintf("processed-%s-by-%s", msg.Data, system.nodeID) fmt.Printf("DistributedWorker %s: Work completed: %s\n", ctx.Self.id, result) // Send result back (in real system, would send to requester) default: fmt.Printf("DistributedWorker %s: Unknown message: %T\n", ctx.Self.id, message) } } } func main() { fmt.Println("=== Distributed Actor System Demo ===") // Create two distributed actor systems system1 := NewDistributedActorSystem("node1", "localhost:8001") system2 := NewDistributedActorSystem("node2", "localhost:8002") // Start systems if err := system1.Start(); err != nil { panic(err) } defer system1.Stop() if err := system2.Start(); err != nil { panic(err) } defer system2.Stop() // Wait for systems to start time.Sleep(100 * time.Millisecond) // Connect systems if err := system1.ConnectToPeer("node2", "localhost:8002"); err != nil { panic(err) } if err := system2.ConnectToPeer("node1", "localhost:8001"); err != nil { panic(err) } // Create distributed workers worker1 := system1.ActorOf("worker1", DistributedWorkerBehavior(system1), nil) worker2 := system2.ActorOf("worker2", DistributedWorkerBehavior(system2), nil) // Send local work worker1.Send(WorkMessage{Data: "local-task-1"}) worker2.Send(WorkMessage{Data: "local-task-2"}) // Send remote work system1.SendRemoteMessage("node2", "worker2", WorkMessage{Data: "remote-task-from-node1"}) system2.SendRemoteMessage("node1", "worker1", WorkMessage{Data: "remote-task-from-node2"}) // Wait for processing time.Sleep(2 * time.Second) } Best Practices Keep Actors Small: Each actor should have a single responsibility Immutable Messages: Use immutable data structures for messages Avoid Blocking: Don’t block in actor message handlers Handle Failures: Implement proper supervision and error handling Message Ordering: Design for message reordering in distributed systems Backpressure: Handle mailbox overflow gracefully Testing: Test actors in isolation with mock messages Common Pitfalls Shared State: Accidentally sharing mutable state between actors Blocking Operations: Performing blocking I/O in message handlers Large Messages: Sending large objects instead of references Circular Dependencies: Creating circular message dependencies Resource Leaks: Not properly cleaning up actor resources Synchronous Communication: Trying to make actor communication synchronous When to Use Actor Model Use When: ...
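
    One way to make the backpressure advice above concrete (names are illustrative, not the article's BaseActor API): bound the mailbox and give the send a deadline, so a slow actor pushes back on producers instead of blocking them forever or silently dropping messages.

    ```go
    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    var ErrMailboxFull = errors.New("mailbox full")

    // TrySend attempts to enqueue a message, giving up after timeout so producers
    // learn that the actor is overloaded and can retry, shed load, or slow down.
    func TrySend(mailbox chan<- string, msg string, timeout time.Duration) error {
        select {
        case mailbox <- msg:
            return nil
        case <-time.After(timeout):
            return ErrMailboxFull
        }
    }

    func main() {
        mailbox := make(chan string, 2) // small bounded mailbox, no consumer draining it

        for i := 1; i <= 3; i++ {
            err := TrySend(mailbox, fmt.Sprintf("msg-%d", i), 100*time.Millisecond)
            fmt.Printf("send %d: %v\n", i, err) // the third send reports ErrMailboxFull
        }
    }
    ```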

    June 5, 2024 · 11 min · Rafiul Alam