Two Generals' Problem: The Impossibility of Perfect Consensus

The Two Generals’ Problem

The Two Generals’ Problem, also known as the Two Armies Problem, is a classic thought experiment that demonstrates the impossibility of achieving perfect consensus over an unreliable communication channel. It was first formulated by E. A. Akkoyunlu, K. Ekanadham, and R. V. Huber in 1975.

The Scenario

Two armies need to coordinate an attack:

- General A and General B surround an enemy
- They must attack simultaneously to win
- If only one attacks → defeat
- They communicate via messengers through enemy territory
- Messages can be lost or intercepted

Question: Can they guarantee coordinated attack?

The Impossible Dilemma

```mermaid
%%{init: {'theme':'dark', 'themeVariables': {'primaryTextColor':'#e5e7eb','secondaryTextColor':'#e5e7eb','tertiaryTextColor':'#e5e7eb','textColor':'#e5e7eb','nodeTextColor':'#e5e7eb','edgeLabelText':'#e5e7eb','clusterTextColor':'#e5e7eb','actorTextColor':'#e5e7eb'}}}%%
sequenceDiagram
    participant GA as General A
    participant Enemy as Enemy Territory
    participant GB as General B
    Note over GA: Wants to attack at dawn
    GA->>Enemy: "Attack at dawn"
    Enemy->>GB: Message delivered
    Note over GB: Received message, but A doesn't know!
    GB->>Enemy: "Acknowledged"
    Enemy->>GA: ACK delivered?
    Note over GA: Received ACK, but B doesn't know!
    GA->>Enemy: "ACK of ACK"
    Enemy->>GB: Delivered?
    Note over GA,GB: This never ends!
```

The Core Problem

The infinite regress: ...
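The regress can be made concrete with a small simulation. This is only an illustrative sketch: `exchange` and its `delivered` parameter are hypothetical names, not taken from the article. It plays out an alternating exchange (order, ACK, ACK of ACK, and so on) and reports which general sent the most recent message. That general has no confirmation for it, however many earlier messages got through.

```go
package main

import "fmt"

// exchange simulates generals A and B alternately sending messages
// (order, ACK, ACK of ACK, ...). delivered[i] says whether message i arrives.
// It returns how many messages arrived and which general sent the most
// recent message. That sender has received no confirmation for it.
func exchange(delivered []bool) (arrived int, uncertain string) {
	sender := "A"
	for _, ok := range delivered {
		uncertain = sender
		if !ok {
			// The messenger was captured; the exchange stops here.
			return arrived, uncertain
		}
		arrived++
		// The receiver now replies, so the roles swap.
		if sender == "A" {
			sender = "B"
		} else {
			sender = "A"
		}
	}
	return arrived, uncertain
}

func main() {
	runs := [][]bool{
		{true},
		{true, true},
		{true, true, true},
		{true, false},
	}
	for _, run := range runs {
		n, who := exchange(run)
		fmt.Printf("%d message(s) arrived, General %s is still unsure\n", n, who)
	}
}
```

Adding more acknowledgements only changes which general is left unsure; from the sender's side, a lost final message and a delivered one look the same.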

    October 14, 2025 · 9 min · Rafiul Alam

    The Bully Election: Leader Election in Distributed Systems

The Bully Election Algorithm

The Bully Algorithm, proposed by Hector Garcia-Molina in 1982, is a classic leader election algorithm for distributed systems. It’s called “bully” because the highest-numbered process always wins and “bullies” the others into accepting it as leader.

The Scenario

A distributed system needs a coordinator:

- N nodes in a network
- Each node has a unique ID (priority)
- One node must be elected as leader
- When the leader fails, a new leader must be elected

Rule: The node with the highest ID wins

The protocol: ...
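The rule "the node with the highest ID wins" can be sketched as a single-process simulation. This is a simplified illustration, not the article's implementation: `elect`, `ids`, and `alive` are hypothetical names, and real message passing and timeouts are replaced by a lookup in a map.

```go
package main

import "fmt"

// elect simulates one Bully election. ids lists every node ID and alive
// reports which nodes currently respond. The initiator challenges every
// higher ID; if one of them answers, that node takes over the election.
// If no higher node answers, the initiator declares itself leader.
func elect(initiator int, ids []int, alive map[int]bool) int {
	for _, id := range ids {
		if id > initiator && alive[id] {
			// A higher node answered, so it runs its own election.
			return elect(id, ids, alive)
		}
	}
	return initiator
}

func main() {
	ids := []int{1, 2, 3, 4, 5}
	// Node 5 was the leader and has just failed.
	alive := map[int]bool{1: true, 2: true, 3: true, 4: true, 5: false}

	leader := elect(2, ids, alive)
	fmt.Println("new leader:", leader) // prints "new leader: 4"
}
```

The recursion always terminates because each handover moves to a strictly higher ID, and the result is the highest ID among the nodes that still respond.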

    August 20, 2025 · 12 min · Rafiul Alam

    The Byzantine Generals: Achieving Consensus with Traitors

The Byzantine Generals Problem

The Byzantine Generals Problem, proposed by Leslie Lamport, Robert Shostak, and Marshall Pease in 1982, is one of the most important problems in distributed systems. It addresses the challenge of achieving consensus when some participants may be faulty or malicious.

The Scenario

Byzantine army divisions surround a city:

- N generals command their divisions
- They must coordinate: attack or retreat
- They communicate via messengers
- Some generals are traitors who send conflicting messages

Goal: All loyal generals must agree on the same plan

The challenge: ...
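A well-known result from the Lamport, Shostak, and Pease paper is that, with unsigned (oral) messages, loyal generals can only agree when more than two thirds of all generals are loyal, that is, n ≥ 3f + 1 for f traitors. The helpers below simply encode that bound; `maxTraitors` and `canAgree` are hypothetical names used for illustration.

```go
package main

import "fmt"

// maxTraitors returns the largest number of traitors f that n generals can
// tolerate with oral (unsigned) messages, using the bound n >= 3f + 1.
func maxTraitors(n int) int {
	if n < 1 {
		return 0
	}
	return (n - 1) / 3
}

// canAgree reports whether n generals can still reach agreement
// when f of them are traitors, under the same bound.
func canAgree(n, f int) bool {
	return f >= 0 && n >= 3*f+1
}

func main() {
	for _, n := range []int{3, 4, 7, 10} {
		fmt.Printf("n=%d tolerates up to %d traitor(s)\n", n, maxTraitors(n))
	}
	fmt.Println("3 generals, 1 traitor:", canAgree(3, 1))
	fmt.Println("4 generals, 1 traitor:", canAgree(4, 1))
}
```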

    August 14, 2025 · 11 min · Rafiul Alam

    Building Payment Gateway Integrations in Go: A Complete Guide

Introduction

A robust payment gateway integration is one of the most critical components of any e-commerce or financial application. Payment systems must handle multiple providers, ensure transactional integrity, implement retry mechanisms, support scheduled payments, and maintain comprehensive audit trails.

In this guide, we’ll explore how to build a production-ready payment gateway integration system in Go that handles:

- Multiple Payment Providers: Stripe, PayPal, Square, and custom gateways
- Transaction Management: Atomic operations with proper rollback
- Retry Logic: Exponential backoff and idempotency
- Scheduled Payments: Recurring billing and delayed charges
- Data Persistence: Both SQL and NoSQL approaches
- Security: PCI compliance and sensitive data handling

Architecture Overview

Our payment system follows the Strategy pattern to support multiple payment gateways while maintaining a consistent interface. ...

    February 17, 2025 · 28 min · Rafiul Alam

    From Choreography to Durable Execution: Why Temporal Changes Everything

The Choreography Problem: Hope-Driven Development

For years, we’ve built distributed systems using choreography: Service A fires an event into the void, hoping Service B hears it. Service B processes it and fires another event, hoping Service C is listening. When something fails (and it will), we’re left scrambling through logs across multiple services, trying to piece together what happened. This is hope-driven development, and it’s fundamentally broken.

Enter Temporal and the concept of Durable Execution, a paradigm shift that replaces hope with guarantees. ...

    February 8, 2025 · 11 min · Rafiul Alam

    Microservices Architecture in Go: Building Distributed Systems

What is Microservices Architecture?

Microservices Architecture is an approach where an application is composed of small, independent services that communicate over a network. Each service is self-contained, owns its data, and can be deployed, scaled, and updated independently.

Key Principles:

- Service Independence: Each service is deployed and scaled independently
- Single Responsibility: Each service handles a specific business capability
- Decentralized Data: Each service owns its database
- API-First Design: Services communicate through well-defined APIs
- Resilience: Services handle failures gracefully
- Technology Diversity: Services can use different technologies

Architecture Overview

```mermaid
%%{init: {'theme':'dark', 'themeVariables': {'primaryTextColor':'#e5e7eb','secondaryTextColor':'#e5e7eb','tertiaryTextColor':'#e5e7eb','textColor':'#e5e7eb','nodeTextColor':'#e5e7eb','edgeLabelText':'#e5e7eb','clusterTextColor':'#e5e7eb','actorTextColor':'#e5e7eb'}}}%%
graph TB
    subgraph "Client Layer"
        Client[Web/Mobile Client]
    end
    subgraph "API Gateway"
        Gateway[API Gateway / Load Balancer]
    end
    subgraph "Service Mesh"
        UserService[User Service]
        ProductService[Product Service]
        OrderService[Order Service]
        PaymentService[Payment Service]
        NotificationService[Notification Service]
    end
    subgraph "Data Layer"
        UserDB[(User DB)]
        ProductDB[(Product DB)]
        OrderDB[(Order DB)]
        PaymentDB[(Payment DB)]
    end
    subgraph "Infrastructure"
        MessageBroker[Message Broker]
        ServiceRegistry[Service Discovery]
        ConfigServer[Config Server]
    end
    Client --> Gateway
    Gateway --> UserService
    Gateway --> ProductService
    Gateway --> OrderService
    Gateway --> PaymentService
    UserService --> UserDB
    ProductService --> ProductDB
    OrderService --> OrderDB
    PaymentService --> PaymentDB
    OrderService -.->|HTTP/gRPC| UserService
    OrderService -.->|HTTP/gRPC| ProductService
    OrderService -.->|HTTP/gRPC| PaymentService
    OrderService -.->|Async| MessageBroker
    PaymentService -.->|Async| MessageBroker
    NotificationService -.->|Subscribe| MessageBroker
    UserService -.-> ServiceRegistry
    ProductService -.-> ServiceRegistry
    OrderService -.-> ServiceRegistry
    style UserService fill:#1e3a5f,color:#fff
    style ProductService fill:#78350f,color:#fff
    style OrderService fill:#134e4a,color:#fff
    style PaymentService fill:#4c1d95,color:#fff
    style NotificationService fill:#4a1e3a,color:#fff
```

Service Communication Patterns

```mermaid
%%{init: {'theme':'dark', 'themeVariables': {'primaryTextColor':'#e5e7eb','secondaryTextColor':'#e5e7eb','tertiaryTextColor':'#e5e7eb','textColor':'#e5e7eb','nodeTextColor':'#e5e7eb','edgeLabelText':'#e5e7eb','clusterTextColor':'#e5e7eb','actorTextColor':'#e5e7eb'}}}%%
sequenceDiagram
    participant Client
    participant Gateway
    participant OrderSvc as Order Service
    participant UserSvc as User Service
    participant ProductSvc as Product Service
    participant PaymentSvc as Payment Service
    participant Queue as Message Queue
    participant NotifySvc as Notification Service
    Client->>Gateway: Create Order Request
    Gateway->>OrderSvc: POST /orders
    OrderSvc->>UserSvc: GET /users/{id}
    UserSvc-->>OrderSvc: User Data
    OrderSvc->>ProductSvc: GET /products/{id}
    ProductSvc-->>OrderSvc: Product Data
    OrderSvc->>ProductSvc: POST /products/reserve
    ProductSvc-->>OrderSvc: Stock Reserved
    OrderSvc->>PaymentSvc: POST /payments
    PaymentSvc-->>OrderSvc: Payment Success
    OrderSvc->>Queue: Publish OrderCreated Event
    Queue->>NotifySvc: OrderCreated Event
    NotifySvc->>NotifySvc: Send Email/SMS
    OrderSvc-->>Gateway: Order Created
    Gateway-->>Client: Response
```

Real-World Use Cases

- E-commerce Platforms: Amazon, eBay with separate services for products, orders, payments
- Streaming Services: Netflix with services for recommendations, playback, billing
- Ride-Sharing Apps: Uber with services for riders, drivers, payments, routing
- Financial Systems: Banking apps with separate services for accounts, transactions, loans
- Social Media: Twitter with services for posts, timelines, notifications, messages
- Cloud Platforms: AWS-like platforms with independent service offerings

Microservices Implementation

Project Structure (Multi-Repository)

```
microservices/
├── user-service/
│   ├── cmd/
│   │   └── server/
│   │       └── main.go
│   ├── internal/
│   │   ├── domain/
│   │   ├── handlers/
│   │   ├── repository/
│   │   └── service/
│   ├── proto/
│   │   └── user.proto
│   └── go.mod
├── product-service/
│   ├── cmd/
│   │   └── server/
│   │       └── main.go
│   ├── internal/
│   │   ├── domain/
│   │   ├── handlers/
│   │   ├── repository/
│   │   └── service/
│   └── go.mod
├── order-service/
│   ├── cmd/
│   │   └── server/
│   │       └── main.go
│   ├── internal/
│   │   ├── domain/
│   │   ├── handlers/
│   │   ├── repository/
│   │   ├── service/
│   │   └── clients/
│   └── go.mod
└── api-gateway/
    ├── cmd/
    │   └── server/
    │       └── main.go
    └── go.mod
```

Service 1: User Service

```go
// user-service/internal/domain/user.go
package domain

import (
	"context"
	"errors"
	"time"
)

type User struct {
	ID        string    `json:"id"`
	Email     string    `json:"email"`
	Name      string    `json:"name"`
	Active    bool      `json:"active"`
	CreatedAt time.Time `json:"created_at"`
}

var (
	ErrUserNotFound = errors.New("user not found")
	ErrUserExists   = errors.New("user already exists")
)

type Repository interface {
	Create(ctx context.Context, user *User) error
	GetByID(ctx context.Context, id string) (*User, error)
	GetByEmail(ctx context.Context, email string) (*User, error)
	Update(ctx context.Context, user *User) error
}
```

```go
// user-service/internal/service/user_service.go
package service

import (
	"context"
	"fmt"
	"time"

	"user-service/internal/domain"
)

type UserService struct {
	repo domain.Repository
}

func NewUserService(repo domain.Repository) *UserService {
	return &UserService{repo: repo}
}

func (s *UserService) CreateUser(ctx context.Context, email, name string) (*domain.User, error) {
	existing, _ := s.repo.GetByEmail(ctx, email)
	if existing != nil {
		return nil, domain.ErrUserExists
	}

	user := &domain.User{
		ID:        generateID(),
		Email:     email,
		Name:      name,
		Active:    true,
		CreatedAt: time.Now(),
	}

	if err := s.repo.Create(ctx, user); err != nil {
		return nil, fmt.Errorf("failed to create user: %w", err)
	}

	return user, nil
}

func (s *UserService) GetUser(ctx context.Context, id string) (*domain.User, error) {
	return s.repo.GetByID(ctx, id)
}

func (s *UserService) ValidateUser(ctx context.Context, id string) (bool, error) {
	user, err := s.repo.GetByID(ctx, id)
	if err != nil {
		return false, err
	}
	return user.Active, nil
}

func generateID() string {
	return fmt.Sprintf("user_%d", time.Now().UnixNano())
}
```

```go
// user-service/internal/handlers/http_handler.go
package handlers

import (
	"encoding/json"
	"net/http"

	"github.com/gorilla/mux"

	"user-service/internal/service"
)

type HTTPHandler struct {
	service *service.UserService
}

func NewHTTPHandler(service *service.UserService) *HTTPHandler {
	return &HTTPHandler{service: service}
}

type CreateUserRequest struct {
	Email string `json:"email"`
	Name  string `json:"name"`
}

func (h *HTTPHandler) CreateUser(w http.ResponseWriter, r *http.Request) {
	var req CreateUserRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		respondError(w, http.StatusBadRequest, "invalid request")
		return
	}

	user, err := h.service.CreateUser(r.Context(), req.Email, req.Name)
	if err != nil {
		respondError(w, http.StatusBadRequest, err.Error())
		return
	}

	respondJSON(w, http.StatusCreated, user)
}

func (h *HTTPHandler) GetUser(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	id := vars["id"]

	user, err := h.service.GetUser(r.Context(), id)
	if err != nil {
		respondError(w, http.StatusNotFound, "user not found")
		return
	}

	respondJSON(w, http.StatusOK, user)
}

func (h *HTTPHandler) ValidateUser(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	id := vars["id"]

	valid, err := h.service.ValidateUser(r.Context(), id)
	if err != nil {
		respondError(w, http.StatusNotFound, "user not found")
		return
	}

	respondJSON(w, http.StatusOK, map[string]bool{"valid": valid})
}

func respondJSON(w http.ResponseWriter, status int, data interface{}) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	json.NewEncoder(w).Encode(data)
}

func respondError(w http.ResponseWriter, status int, message string) {
	respondJSON(w, status, map[string]string{"error": message})
}
```

```go
// user-service/cmd/server/main.go
package main

import (
	"database/sql"
	"log"
	"net/http"
	"os"

	"github.com/gorilla/mux"
	_ "github.com/lib/pq"

	"user-service/internal/handlers"
	"user-service/internal/repository"
	"user-service/internal/service"
)

func main() {
	dbURL := os.Getenv("DATABASE_URL")
	if dbURL == "" {
		dbURL = "postgres://user:pass@localhost/users?sslmode=disable"
	}

	db, err := sql.Open("postgres", dbURL)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	repo := repository.NewPostgresRepository(db)
	svc := service.NewUserService(repo)
	handler := handlers.NewHTTPHandler(svc)

	router := mux.NewRouter()
	router.HandleFunc("/users", handler.CreateUser).Methods("POST")
	router.HandleFunc("/users/{id}", handler.GetUser).Methods("GET")
	router.HandleFunc("/users/{id}/validate", handler.ValidateUser).Methods("GET")
	router.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("OK"))
	})

	port := os.Getenv("PORT")
	if port == "" {
		port = "8081"
	}

	log.Printf("User service starting on port %s", port)
	if err := http.ListenAndServe(":"+port, router); err != nil {
		log.Fatal(err)
	}
}
```

Service 2: Product Service

```go
// product-service/internal/domain/product.go
package domain

import (
	"context"
	"errors"
	"time"
)

type Product struct {
	ID          string    `json:"id"`
	Name        string    `json:"name"`
	Description string    `json:"description"`
	Price       float64   `json:"price"`
	Stock       int       `json:"stock"`
	CreatedAt   time.Time `json:"created_at"`
}

var (
	ErrProductNotFound   = errors.New("product not found")
	ErrInsufficientStock = errors.New("insufficient stock")
)

type Repository interface {
	Create(ctx context.Context, product *Product) error
	GetByID(ctx context.Context, id string) (*Product, error)
	Update(ctx context.Context, product *Product) error
	ReserveStock(ctx context.Context, id string, quantity int) error
}
```

```go
// product-service/internal/service/product_service.go
package service

import (
	"context"
	"fmt"
	"time"

	"product-service/internal/domain"
)

type ProductService struct {
	repo domain.Repository
}

func NewProductService(repo domain.Repository) *ProductService {
	return &ProductService{repo: repo}
}

func (s *ProductService) CreateProduct(ctx context.Context, name, desc string, price float64, stock int) (*domain.Product, error) {
	product := &domain.Product{
		ID:          generateID(),
		Name:        name,
		Description: desc,
		Price:       price,
		Stock:       stock,
		CreatedAt:   time.Now(),
	}

	if err := s.repo.Create(ctx, product); err != nil {
		return nil, fmt.Errorf("failed to create product: %w", err)
	}

	return product, nil
}

func (s *ProductService) GetProduct(ctx context.Context, id string) (*domain.Product, error) {
	return s.repo.GetByID(ctx, id)
}

func (s *ProductService) ReserveStock(ctx context.Context, id string, quantity int) error {
	product, err := s.repo.GetByID(ctx, id)
	if err != nil {
		return err
	}

	if product.Stock < quantity {
		return domain.ErrInsufficientStock
	}

	return s.repo.ReserveStock(ctx, id, quantity)
}

func generateID() string {
	return fmt.Sprintf("product_%d", time.Now().UnixNano())
}
```

```go
// product-service/cmd/server/main.go
package main

import (
	"database/sql"
	"encoding/json"
	"log"
	"net/http"
	"os"

	"github.com/gorilla/mux"
	_ "github.com/lib/pq"

	"product-service/internal/repository"
	"product-service/internal/service"
)

func main() {
	dbURL := os.Getenv("DATABASE_URL")
	if dbURL == "" {
		dbURL = "postgres://user:pass@localhost/products?sslmode=disable"
	}

	db, err := sql.Open("postgres", dbURL)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	repo := repository.NewPostgresRepository(db)
	svc := service.NewProductService(repo)

	router := mux.NewRouter()

	router.HandleFunc("/products/{id}", func(w http.ResponseWriter, r *http.Request) {
		vars := mux.Vars(r)
		product, err := svc.GetProduct(r.Context(), vars["id"])
		if err != nil {
			http.Error(w, err.Error(), http.StatusNotFound)
			return
		}
		json.NewEncoder(w).Encode(product)
	}).Methods("GET")

	router.HandleFunc("/products/reserve", func(w http.ResponseWriter, r *http.Request) {
		var req struct {
			ProductID string `json:"product_id"`
			Quantity  int    `json:"quantity"`
		}
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		if err := svc.ReserveStock(r.Context(), req.ProductID, req.Quantity); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		w.WriteHeader(http.StatusOK)
		json.NewEncoder(w).Encode(map[string]string{"status": "reserved"})
	}).Methods("POST")

	router.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})

	port := os.Getenv("PORT")
	if port == "" {
		port = "8082"
	}

	log.Printf("Product service starting on port %s", port)
	if err := http.ListenAndServe(":"+port, router); err != nil {
		log.Fatal(err)
	}
}
```

Service 3: Order Service (Orchestrator)

```go
// order-service/internal/clients/user_client.go
package clients

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

type UserClient struct {
	baseURL    string
	httpClient *http.Client
}

func NewUserClient(baseURL string) *UserClient {
	return &UserClient{
		baseURL: baseURL,
		httpClient: &http.Client{
			Timeout: 5 * time.Second,
		},
	}
}

func (c *UserClient) ValidateUser(ctx context.Context, userID string) (bool, error) {
	url := fmt.Sprintf("%s/users/%s/validate", c.baseURL, userID)

	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
	if err != nil {
		return false, err
	}

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return false, fmt.Errorf("failed to call user service: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return false, fmt.Errorf("user service returned status %d", resp.StatusCode)
	}

	var result struct {
		Valid bool `json:"valid"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return false, err
	}

	return result.Valid, nil
}
```

```go
// order-service/internal/clients/product_client.go
package clients

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

type Product struct {
	ID    string  `json:"id"`
	Name  string  `json:"name"`
	Price float64 `json:"price"`
	Stock int     `json:"stock"`
}

type ProductClient struct {
	baseURL    string
	httpClient *http.Client
}

func NewProductClient(baseURL string) *ProductClient {
	return &ProductClient{
		baseURL: baseURL,
		httpClient: &http.Client{
			Timeout: 5 * time.Second,
		},
	}
}

func (c *ProductClient) GetProduct(ctx context.Context, productID string) (*Product, error) {
	url := fmt.Sprintf("%s/products/%s", c.baseURL, productID)

	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
	if err != nil {
		return nil, err
	}

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to call product service: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("product not found")
	}

	var product Product
	if err := json.NewDecoder(resp.Body).Decode(&product); err != nil {
		return nil, err
	}

	return &product, nil
}

func (c *ProductClient) ReserveStock(ctx context.Context, productID string, quantity int) error {
	url := fmt.Sprintf("%s/products/reserve", c.baseURL)

	reqBody := map[string]interface{}{
		"product_id": productID,
		"quantity":   quantity,
	}

	body, err := json.Marshal(reqBody)
	if err != nil {
		return err
	}

	req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("failed to reserve stock: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("failed to reserve stock: status %d", resp.StatusCode)
	}

	return nil
}
```

```go
// order-service/internal/service/order_service.go
package service

import (
	"context"
	"fmt"
	"time"

	"order-service/internal/clients"
	"order-service/internal/domain"
)

type OrderService struct {
	repo          domain.Repository
	userClient    *clients.UserClient
	productClient *clients.ProductClient
}

func NewOrderService(
	repo domain.Repository,
	userClient *clients.UserClient,
	productClient *clients.ProductClient,
) *OrderService {
	return &OrderService{
		repo:          repo,
		userClient:    userClient,
		productClient: productClient,
	}
}

func (s *OrderService) CreateOrder(ctx context.Context, userID string, items []domain.OrderItem) (*domain.Order, error) {
	// Validate user via User Service
	valid, err := s.userClient.ValidateUser(ctx, userID)
	if err != nil {
		return nil, fmt.Errorf("failed to validate user: %w", err)
	}
	if !valid {
		return nil, fmt.Errorf("user is not valid")
	}

	// Validate products and calculate total
	var total float64
	for i, item := range items {
		product, err := s.productClient.GetProduct(ctx, item.ProductID)
		if err != nil {
			return nil, fmt.Errorf("failed to get product: %w", err)
		}

		items[i].Price = product.Price
		total += product.Price * float64(item.Quantity)
	}

	// Reserve stock via Product Service
	for _, item := range items {
		if err := s.productClient.ReserveStock(ctx, item.ProductID, item.Quantity); err != nil {
			return nil, fmt.Errorf("failed to reserve stock: %w", err)
		}
	}

	order := &domain.Order{
		ID:        generateID(),
		UserID:    userID,
		Items:     items,
		Total:     total,
		Status:    "pending",
		CreatedAt: time.Now(),
	}

	if err := s.repo.Create(ctx, order); err != nil {
		return nil, fmt.Errorf("failed to create order: %w", err)
	}

	return order, nil
}

func generateID() string {
	return fmt.Sprintf("order_%d", time.Now().UnixNano())
}
```

```go
// order-service/cmd/server/main.go
package main

import (
	"database/sql"
	"encoding/json"
	"log"
	"net/http"
	"os"

	"github.com/gorilla/mux"
	_ "github.com/lib/pq"

	"order-service/internal/clients"
	"order-service/internal/domain"
	"order-service/internal/repository"
	"order-service/internal/service"
)

func main() {
	dbURL := os.Getenv("DATABASE_URL")
	if dbURL == "" {
		dbURL = "postgres://user:pass@localhost/orders?sslmode=disable"
	}

	userServiceURL := os.Getenv("USER_SERVICE_URL")
	if userServiceURL == "" {
		userServiceURL = "http://localhost:8081"
	}

	productServiceURL := os.Getenv("PRODUCT_SERVICE_URL")
	if productServiceURL == "" {
		productServiceURL = "http://localhost:8082"
	}

	db, err := sql.Open("postgres", dbURL)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	repo := repository.NewPostgresRepository(db)
	userClient := clients.NewUserClient(userServiceURL)
	productClient := clients.NewProductClient(productServiceURL)
	svc := service.NewOrderService(repo, userClient, productClient)

	router := mux.NewRouter()

	router.HandleFunc("/orders", func(w http.ResponseWriter, r *http.Request) {
		var req struct {
			UserID string             `json:"user_id"`
			Items  []domain.OrderItem `json:"items"`
		}
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		order, err := svc.CreateOrder(r.Context(), req.UserID, req.Items)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusCreated)
		json.NewEncoder(w).Encode(order)
	}).Methods("POST")

	router.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})

	port := os.Getenv("PORT")
	if port == "" {
		port = "8083"
	}

	log.Printf("Order service starting on port %s", port)
	if err := http.ListenAndServe(":"+port, router); err != nil {
		log.Fatal(err)
	}
}
```

Docker Compose Setup

```yaml
version: '3.8'

services:
  user-service:
    build: ./user-service
    ports:
      - "8081:8081"
    environment:
      - DATABASE_URL=postgres://user:pass@user-db:5432/users?sslmode=disable
      - PORT=8081
    depends_on:
      - user-db

  product-service:
    build: ./product-service
    ports:
      - "8082:8082"
    environment:
      - DATABASE_URL=postgres://user:pass@product-db:5432/products?sslmode=disable
      - PORT=8082
    depends_on:
      - product-db

  order-service:
    build: ./order-service
    ports:
      - "8083:8083"
    environment:
      - DATABASE_URL=postgres://user:pass@order-db:5432/orders?sslmode=disable
      - USER_SERVICE_URL=http://user-service:8081
      - PRODUCT_SERVICE_URL=http://product-service:8082
      - PORT=8083
    depends_on:
      - order-db
      - user-service
      - product-service

  user-db:
    image: postgres:15
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=users

  product-db:
    image: postgres:15
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=products

  order-db:
    image: postgres:15
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=orders
```

Best Practices

- Service Boundaries: Define clear service boundaries based on business capabilities
- API Contracts: Use API versioning and maintain backward compatibility
- Service Discovery: Implement service registry for dynamic service location
- Circuit Breakers: Prevent cascading failures with circuit breaker pattern
- Distributed Tracing: Implement tracing to debug cross-service calls
- Health Checks: Provide health endpoints for monitoring
- Configuration Management: Externalize configuration
- Security: Implement service-to-service authentication

Common Pitfalls

- Distributed Monolith: Services too tightly coupled, defeating the purpose
- Chatty Services: Too many synchronous calls between services
- Shared Database: Multiple services accessing the same database
- Ignoring Network Failures: Not handling network errors gracefully
- No Service Versioning: Breaking changes without versioning
- Data Consistency Issues: Not handling eventual consistency
- Over-Engineering: Creating too many small services

When to Use Microservices Architecture

Use When: ...

    January 25, 2025 · 13 min · Rafiul Alam

    Go Concurrency Pattern: The Login Counter

The Problem: Counting What You Can’t See

Count concurrent users. On login: increment. On logout: decrement. Simple, right?

Now add reality:

- The counter is distributed across multiple servers
- A user’s session times out on one server but they’re active on another
- The increment message arrives after the decrement message
- Network partitions split your cluster
- Servers crash mid-operation

Suddenly, this “simple” counter becomes a distributed systems nightmare. Welcome to distributed counting, where even addition is hard. ...
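Before the distributed complications, the "simple" version is worth seeing on its own. The sketch below covers only a single server and assumes Go 1.19 or later for `atomic.Int64`; `LoginCounter` and its methods are hypothetical names, not taken from the article. It handles concurrent goroutines correctly, but none of the cross-server problems listed above.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// LoginCounter tracks concurrent users on ONE server.
// It is safe for use from many goroutines.
type LoginCounter struct {
	active atomic.Int64
}

// Login records a user logging in.
func (c *LoginCounter) Login() { c.active.Add(1) }

// Logout records a user logging out.
func (c *LoginCounter) Logout() { c.active.Add(-1) }

// Active returns the current number of logged-in users.
func (c *LoginCounter) Active() int64 { return c.active.Load() }

func main() {
	var c LoginCounter
	var wg sync.WaitGroup

	// 100 users log in concurrently; the first 40 log out again.
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			c.Login()
			if n < 40 {
				c.Logout()
			}
		}(i)
	}

	wg.Wait()
	fmt.Println("active users:", c.Active()) // prints "active users: 60"
}
```

Once the count lives on several servers, a single atomic value no longer exists, and the ordering, timeout, and partition problems from the list take over.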

    January 24, 2025 · 9 min · Rafiul Alam

    Visual Guide to Distributed Systems Patterns

Introduction

Building robust distributed systems requires understanding fundamental patterns that solve common challenges like consensus, fault tolerance, request distribution, and asynchronous communication. This comprehensive guide uses visual diagrams to illustrate how these patterns work, making complex distributed systems concepts easier to understand and implement.

We’ll explore:

- Raft Consensus Algorithm: How distributed systems agree on shared state
- Circuit Breaker Pattern: Preventing cascading failures in microservices
- Load Balancing Algorithms: Distributing traffic efficiently across servers
- Message Queue Patterns: Asynchronous communication strategies

Part 1: Raft Consensus Algorithm

The Raft consensus algorithm ensures that a cluster of servers maintains a consistent, replicated log even in the face of failures. It’s designed to be more understandable than Paxos while providing the same guarantees. ...

    January 17, 2025 · 24 min · Rafiul Alam

    Context Propagation Patterns in Go

What is Context Propagation?

Context propagation is the practice of threading context.Context through your application to carry cancellation signals, deadlines, and request-scoped values across API boundaries, goroutines, and service boundaries. This is critical for building observable, responsive distributed systems.

Key Capabilities:

- Distributed Tracing: Propagate trace IDs across services
- Cancellation Cascades: Cancel entire request trees
- Deadline Enforcement: Ensure requests complete within time budgets
- Request-Scoped Values: Carry metadata without polluting function signatures

Real-World Use Cases

- Microservices: Trace requests across multiple services
- API Gateways: Propagate timeouts and user context
- Database Layers: Cancel queries when requests are abandoned
- Message Queues: Propagate processing deadlines
- HTTP Middleware: Extract and inject trace headers
- gRPC Services: Automatic context propagation

Basic Context Propagation

Propagating Through Function Calls

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// ctxKey is an unexported key type. Using it instead of plain strings
// avoids collisions with context keys set by other packages.
type ctxKey string

const (
	userIDKey    ctxKey = "user_id"
	requestIDKey ctxKey = "request_id"
)

// ServiceA calls ServiceB which calls ServiceC.
// Context propagates through all layers.
func ServiceA(ctx context.Context, userID string) error {
	// Add request-scoped values
	ctx = context.WithValue(ctx, userIDKey, userID)
	ctx = context.WithValue(ctx, requestIDKey, generateRequestID())

	fmt.Printf("[ServiceA] Processing request for user: %s\n", userID)

	// Propagate context to next service
	return ServiceB(ctx)
}

func ServiceB(ctx context.Context) error {
	// Retrieve values from context; the comma-ok form avoids a panic
	// if a value is missing or has an unexpected type.
	userID, _ := ctx.Value(userIDKey).(string)
	requestID, _ := ctx.Value(requestIDKey).(string)

	fmt.Printf("[ServiceB] User: %s, Request: %s\n", userID, requestID)

	// Add timeout for downstream call
	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()

	return ServiceC(ctx)
}

func ServiceC(ctx context.Context) error {
	userID, _ := ctx.Value(userIDKey).(string)
	requestID, _ := ctx.Value(requestIDKey).(string)

	fmt.Printf("[ServiceC] Processing for User: %s, Request: %s\n", userID, requestID)

	// Simulate work
	select {
	case <-time.After(1 * time.Second):
		fmt.Println("[ServiceC] Work completed")
		return nil
	case <-ctx.Done():
		fmt.Printf("[ServiceC] Cancelled: %v\n", ctx.Err())
		return ctx.Err()
	}
}

func generateRequestID() string {
	return fmt.Sprintf("req-%d", time.Now().UnixNano())
}

func main() {
	ctx := context.Background()

	err := ServiceA(ctx, "user-123")
	if err != nil {
		fmt.Printf("Error: %v\n", err)
	}
}
```

Output: ...

    June 24, 2024 · 11 min · Rafiul Alam

    Circuit Breaker Pattern in Go

    What is the Circuit Breaker Pattern?

    The Circuit Breaker pattern prevents cascading failures in distributed systems by monitoring for failures and temporarily stopping calls to failing services. Like an electrical circuit breaker, it “trips” when failures exceed a threshold, giving the failing service time to recover.

    States:

    Closed: Normal operation, requests pass through
    Open: Failing fast, requests are rejected immediately
    Half-Open: Testing if service has recovered

    Real-World Use Cases

    Microservices: Prevent cascade failures between services
    Database Connections: Handle database outages gracefully
    External APIs: Deal with third-party service failures
    Payment Processing: Handle payment gateway issues
    File Systems: Manage disk I/O failures
    Network Operations: Handle network partitions

    Basic Circuit Breaker Implementation

        package main

        import (
            "errors"
            "fmt"
            "sync"
            "time"
        )

        // State represents the circuit breaker state
        type State int

        const (
            StateClosed State = iota
            StateOpen
            StateHalfOpen
        )

        func (s State) String() string {
            switch s {
            case StateClosed:
                return "CLOSED"
            case StateOpen:
                return "OPEN"
            case StateHalfOpen:
                return "HALF_OPEN"
            default:
                return "UNKNOWN"
            }
        }

        // CircuitBreaker implements the circuit breaker pattern
        type CircuitBreaker struct {
            mu              sync.RWMutex
            state           State
            failureCount    int
            successCount    int
            lastFailureTime time.Time
            nextAttemptTime time.Time

            // Configuration
            maxFailures      int
            resetTimeout     time.Duration
            halfOpenMaxCalls int
        }

        // Config holds circuit breaker configuration
        type Config struct {
            MaxFailures      int           // Number of failures before opening
            ResetTimeout     time.Duration // Time to wait before trying half-open
            HalfOpenMaxCalls int           // Max calls allowed in half-open state
        }

        // NewCircuitBreaker creates a new circuit breaker
        func NewCircuitBreaker(config Config) *CircuitBreaker {
            return &CircuitBreaker{
                state:            StateClosed,
                maxFailures:      config.MaxFailures,
                resetTimeout:     config.ResetTimeout,
                halfOpenMaxCalls: config.HalfOpenMaxCalls,
            }
        }

        // Execute runs the given function with circuit breaker protection
        func (cb *CircuitBreaker) Execute(fn func() error) error {
            if !cb.allowRequest() {
                return errors.New("circuit breaker is open")
            }
            err := fn()
            cb.recordResult(err)
            return err
        }

        // allowRequest determines if a request should be allowed
        func (cb *CircuitBreaker) allowRequest() bool {
            cb.mu.Lock()
            defer cb.mu.Unlock()

            now := time.Now()
            switch cb.state {
            case StateClosed:
                return true
            case StateOpen:
                if now.After(cb.nextAttemptTime) {
                    cb.state = StateHalfOpen
                    cb.successCount = 0
                    cb.failureCount = 0
                    fmt.Printf("Circuit breaker transitioning to HALF_OPEN\n")
                    return true
                }
                return false
            case StateHalfOpen:
                // Only completed calls are counted here, so under concurrent load
                // more than halfOpenMaxCalls probes can be in flight at once.
                return cb.successCount+cb.failureCount < cb.halfOpenMaxCalls
            default:
                return false
            }
        }

        // recordResult records the result of a function call
        func (cb *CircuitBreaker) recordResult(err error) {
            cb.mu.Lock()
            defer cb.mu.Unlock()

            if err != nil {
                cb.onFailure()
            } else {
                cb.onSuccess()
            }
        }

        // onFailure handles a failure (caller must hold cb.mu)
        func (cb *CircuitBreaker) onFailure() {
            cb.failureCount++
            cb.lastFailureTime = time.Now()

            switch cb.state {
            case StateClosed:
                if cb.failureCount >= cb.maxFailures {
                    cb.state = StateOpen
                    cb.nextAttemptTime = time.Now().Add(cb.resetTimeout)
                    fmt.Printf("Circuit breaker OPENED after %d failures\n", cb.failureCount)
                }
            case StateHalfOpen:
                // A single failed probe sends the breaker straight back to OPEN
                cb.state = StateOpen
                cb.nextAttemptTime = time.Now().Add(cb.resetTimeout)
                fmt.Printf("Circuit breaker returned to OPEN from HALF_OPEN\n")
            }
        }

        // onSuccess handles a success (caller must hold cb.mu)
        func (cb *CircuitBreaker) onSuccess() {
            switch cb.state {
            case StateClosed:
                // Failures are counted consecutively: any success resets the count
                cb.failureCount = 0
            case StateHalfOpen:
                cb.successCount++
                if cb.successCount >= cb.halfOpenMaxCalls {
                    cb.state = StateClosed
                    cb.failureCount = 0
                    fmt.Printf("Circuit breaker CLOSED after successful recovery\n")
                }
            }
        }

        // GetState returns the current state
        func (cb *CircuitBreaker) GetState() State {
            cb.mu.RLock()
            defer cb.mu.RUnlock()
            return cb.state
        }

        // GetStats returns the current state, failure count, and success count
        func (cb *CircuitBreaker) GetStats() (State, int, int) {
            cb.mu.RLock()
            defer cb.mu.RUnlock()
            return cb.state, cb.failureCount, cb.successCount
        }

        // simulateService simulates a service that might fail
        func simulateService(shouldFail bool, delay time.Duration) func() error {
            return func() error {
                time.Sleep(delay)
                if shouldFail {
                    return errors.New("service failure")
                }
                return nil
            }
        }

        func main() {
            config := Config{
                MaxFailures:      3,
                ResetTimeout:     2 * time.Second,
                HalfOpenMaxCalls: 2,
            }
            cb := NewCircuitBreaker(config)

            // Test scenario: failures followed by recovery
            scenarios := []struct {
                name       string
                shouldFail bool
                delay      time.Duration
            }{
                {"Success 1", false, 100 * time.Millisecond},
                {"Success 2", false, 100 * time.Millisecond},
                {"Failure 1", true, 100 * time.Millisecond},
                {"Failure 2", true, 100 * time.Millisecond},
                {"Failure 3", true, 100 * time.Millisecond}, // Should open circuit
                {"Blocked 1", false, 100 * time.Millisecond}, // Should be blocked
                {"Blocked 2", false, 100 * time.Millisecond}, // Should be blocked
            }

            for i, scenario := range scenarios {
                fmt.Printf("\n--- Test %d: %s ---\n", i+1, scenario.name)
                err := cb.Execute(simulateService(scenario.shouldFail, scenario.delay))
                state, failures, successes := cb.GetStats()
                if err != nil {
                    fmt.Printf("Result: ERROR - %v\n", err)
                } else {
                    fmt.Printf("Result: SUCCESS\n")
                }
                fmt.Printf("State: %s, Failures: %d, Successes: %d\n", state, failures, successes)
                time.Sleep(100 * time.Millisecond)
            }

            // Wait for reset timeout and test recovery
            fmt.Printf("\n--- Waiting for reset timeout (%v) ---\n", config.ResetTimeout)
            time.Sleep(config.ResetTimeout + 100*time.Millisecond)

            // Test recovery
            recoveryTests := []struct {
                name       string
                shouldFail bool
            }{
                {"Recovery 1", false}, // Should succeed and move to half-open
                {"Recovery 2", false}, // Should succeed and close circuit
                {"Success after recovery", false},
            }

            for i, test := range recoveryTests {
                fmt.Printf("\n--- Recovery Test %d: %s ---\n", i+1, test.name)
                err := cb.Execute(simulateService(test.shouldFail, 100*time.Millisecond))
                state, failures, successes := cb.GetStats()
                if err != nil {
                    fmt.Printf("Result: ERROR - %v\n", err)
                } else {
                    fmt.Printf("Result: SUCCESS\n")
                }
                fmt.Printf("State: %s, Failures: %d, Successes: %d\n", state, failures, successes)
            }
        }

    Advanced Circuit Breaker with Metrics

        // This listing reuses the State type and its constants from the basic
        // implementation above; they must be present in the same package.
        package main

        import (
            "context"
            "fmt"
            "math/rand"
            "sync"
            "sync/atomic"
            "time"
        )

        // Metrics tracks circuit breaker statistics
        type Metrics struct {
            totalRequests   int64
            successfulCalls int64
            failedCalls     int64
            rejectedCalls   int64
            timeouts        int64
            stateChanges    int64
        }

        // AdvancedCircuitBreaker with comprehensive metrics and monitoring
        type AdvancedCircuitBreaker struct {
            mu              sync.RWMutex
            state           State
            failureCount    int
            successCount    int
            lastFailureTime time.Time
            nextAttemptTime time.Time
            stateChangeTime time.Time

            // Configuration
            maxFailures      int
            resetTimeout     time.Duration
            halfOpenMaxCalls int
            callTimeout      time.Duration

            // Metrics
            metrics *Metrics

            // Monitoring
            onStateChange func(from, to State)
        }

        // AdvancedConfig holds advanced circuit breaker configuration
        type AdvancedConfig struct {
            MaxFailures      int
            ResetTimeout     time.Duration
            HalfOpenMaxCalls int
            CallTimeout      time.Duration
            OnStateChange    func(from, to State)
        }

        // NewAdvancedCircuitBreaker creates a new advanced circuit breaker
        func NewAdvancedCircuitBreaker(config AdvancedConfig) *AdvancedCircuitBreaker {
            return &AdvancedCircuitBreaker{
                state:            StateClosed,
                maxFailures:      config.MaxFailures,
                resetTimeout:     config.ResetTimeout,
                halfOpenMaxCalls: config.HalfOpenMaxCalls,
                callTimeout:      config.CallTimeout,
                metrics:          &Metrics{},
                onStateChange:    config.OnStateChange,
                stateChangeTime:  time.Now(),
            }
        }

        // ExecuteWithContext runs function with context and timeout
        func (acb *AdvancedCircuitBreaker) ExecuteWithContext(ctx context.Context, fn func(context.Context) error) error {
            atomic.AddInt64(&acb.metrics.totalRequests, 1)

            if !acb.allowRequest() {
                atomic.AddInt64(&acb.metrics.rejectedCalls, 1)
                return fmt.Errorf("circuit breaker is %s", acb.GetState())
            }

            // Create context with timeout if specified
            if acb.callTimeout > 0 {
                var cancel context.CancelFunc
                ctx, cancel = context.WithTimeout(ctx, acb.callTimeout)
                defer cancel()
            }

            // Execute with timeout monitoring. The channel is buffered so the
            // goroutine can finish even if nobody is left to receive its result.
            done := make(chan error, 1)
            go func() {
                done <- fn(ctx)
            }()

            select {
            case err := <-done:
                acb.recordResult(err)
                return err
            case <-ctx.Done():
                // A timeout or cancellation is counted as both a timeout and a failure
                atomic.AddInt64(&acb.metrics.timeouts, 1)
                acb.recordResult(ctx.Err())
                return ctx.Err()
            }
        }

        // allowRequest determines if a request should be allowed
        func (acb *AdvancedCircuitBreaker) allowRequest() bool {
            acb.mu.Lock()
            defer acb.mu.Unlock()

            now := time.Now()
            switch acb.state {
            case StateClosed:
                return true
            case StateOpen:
                if now.After(acb.nextAttemptTime) {
                    acb.changeState(StateHalfOpen)
                    acb.successCount = 0
                    acb.failureCount = 0
                    return true
                }
                return false
            case StateHalfOpen:
                return acb.successCount+acb.failureCount < acb.halfOpenMaxCalls
            default:
                return false
            }
        }

        // recordResult records the result of a function call
        func (acb *AdvancedCircuitBreaker) recordResult(err error) {
            acb.mu.Lock()
            defer acb.mu.Unlock()

            if err != nil {
                atomic.AddInt64(&acb.metrics.failedCalls, 1)
                acb.onFailure()
            } else {
                atomic.AddInt64(&acb.metrics.successfulCalls, 1)
                acb.onSuccess()
            }
        }

        // changeState changes the circuit breaker state (caller must hold acb.mu)
        func (acb *AdvancedCircuitBreaker) changeState(newState State) {
            if acb.state != newState {
                oldState := acb.state
                acb.state = newState
                acb.stateChangeTime = time.Now()
                atomic.AddInt64(&acb.metrics.stateChanges, 1)

                // Run the callback in its own goroutine so it cannot block or
                // deadlock the breaker while the lock is held.
                if acb.onStateChange != nil {
                    go acb.onStateChange(oldState, newState)
                }
            }
        }

        // onFailure handles a failure (caller must hold acb.mu)
        func (acb *AdvancedCircuitBreaker) onFailure() {
            acb.failureCount++
            acb.lastFailureTime = time.Now()

            switch acb.state {
            case StateClosed:
                if acb.failureCount >= acb.maxFailures {
                    acb.changeState(StateOpen)
                    acb.nextAttemptTime = time.Now().Add(acb.resetTimeout)
                }
            case StateHalfOpen:
                acb.changeState(StateOpen)
                acb.nextAttemptTime = time.Now().Add(acb.resetTimeout)
            }
        }

        // onSuccess handles a success (caller must hold acb.mu)
        func (acb *AdvancedCircuitBreaker) onSuccess() {
            switch acb.state {
            case StateClosed:
                acb.failureCount = 0
            case StateHalfOpen:
                acb.successCount++
                if acb.successCount >= acb.halfOpenMaxCalls {
                    acb.changeState(StateClosed)
                    acb.failureCount = 0
                }
            }
        }

        // GetMetrics returns a snapshot of the current metrics
        func (acb *AdvancedCircuitBreaker) GetMetrics() Metrics {
            return Metrics{
                totalRequests:   atomic.LoadInt64(&acb.metrics.totalRequests),
                successfulCalls: atomic.LoadInt64(&acb.metrics.successfulCalls),
                failedCalls:     atomic.LoadInt64(&acb.metrics.failedCalls),
                rejectedCalls:   atomic.LoadInt64(&acb.metrics.rejectedCalls),
                timeouts:        atomic.LoadInt64(&acb.metrics.timeouts),
                stateChanges:    atomic.LoadInt64(&acb.metrics.stateChanges),
            }
        }

        // GetState returns current state
        func (acb *AdvancedCircuitBreaker) GetState() State {
            acb.mu.RLock()
            defer acb.mu.RUnlock()
            return acb.state
        }

        // HealthCheck returns health information
        func (acb *AdvancedCircuitBreaker) HealthCheck() map[string]interface{} {
            acb.mu.RLock()
            defer acb.mu.RUnlock()

            metrics := acb.GetMetrics()
            var successRate float64
            if metrics.totalRequests > 0 {
                successRate = float64(metrics.successfulCalls) / float64(metrics.totalRequests) * 100
            }

            return map[string]interface{}{
                "state":             acb.state.String(),
                "failure_count":     acb.failureCount,
                "success_count":     acb.successCount,
                "last_failure_time": acb.lastFailureTime,
                "state_change_time": acb.stateChangeTime,
                "next_attempt_time": acb.nextAttemptTime,
                "total_requests":    metrics.totalRequests,
                "successful_calls":  metrics.successfulCalls,
                "failed_calls":      metrics.failedCalls,
                "rejected_calls":    metrics.rejectedCalls,
                "timeouts":          metrics.timeouts,
                "state_changes":     metrics.stateChanges,
                "success_rate":      fmt.Sprintf("%.2f%%", successRate),
            }
        }

        // ExternalService simulates a dependency with latency and random failures
        type ExternalService struct {
            failureRate float64
            latency     time.Duration
        }

        func (es *ExternalService) Call(ctx context.Context, data string) error {
            // Simulate latency
            select {
            case <-time.After(es.latency):
            case <-ctx.Done():
                return ctx.Err()
            }

            // Simulate random failures. math/rand is used instead of the clock's
            // nanosecond digits, which are not random on coarse-resolution clocks.
            if rand.Float64() < es.failureRate {
                return fmt.Errorf("service failure for data: %s", data)
            }
            return nil
        }

        func main() {
            // Create service that fails 30% of the time
            service := &ExternalService{
                failureRate: 0.3,
                latency:     100 * time.Millisecond,
            }

            config := AdvancedConfig{
                MaxFailures:      3,
                ResetTimeout:     2 * time.Second,
                HalfOpenMaxCalls: 2,
                CallTimeout:      500 * time.Millisecond,
                OnStateChange: func(from, to State) {
                    fmt.Printf("Circuit breaker state changed: %s -> %s\n", from, to)
                },
            }
            cb := NewAdvancedCircuitBreaker(config)

            // Monitor circuit breaker health
            go func() {
                ticker := time.NewTicker(1 * time.Second)
                defer ticker.Stop()
                for range ticker.C {
                    health := cb.HealthCheck()
                    fmt.Printf("Health: State=%s, Success Rate=%s, Total=%d, Failed=%d, Rejected=%d\n",
                        health["state"], health["success_rate"], health["total_requests"],
                        health["failed_calls"], health["rejected_calls"])
                }
            }()

            // Simulate load
            var wg sync.WaitGroup
            for i := 0; i < 50; i++ {
                wg.Add(1)
                go func(id int) {
                    defer wg.Done()
                    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
                    defer cancel()

                    err := cb.ExecuteWithContext(ctx, func(ctx context.Context) error {
                        return service.Call(ctx, fmt.Sprintf("request-%d", id))
                    })
                    if err != nil {
                        fmt.Printf("Request %d failed: %v\n", id, err)
                    } else {
                        fmt.Printf("Request %d succeeded\n", id)
                    }
                }(i)
                // Stagger the requests so that later calls can observe state
                // changes caused by earlier ones; launching all 50 at once would
                // let every call through before any failure is recorded.
                time.Sleep(50 * time.Millisecond)
            }
            wg.Wait()

            // Final health report
            fmt.Println("\nFinal Health Report:")
            health := cb.HealthCheck()
            for key, value := range health {
                fmt.Printf("  %s: %v\n", key, value)
            }
        }

    HTTP Client with Circuit Breaker

        // This listing reuses AdvancedCircuitBreaker, AdvancedConfig, and State from
        // the listings above; they must be in the same package, with the earlier
        // main functions removed.
        package main

        import (
            "bytes"
            "context"
            "encoding/json"
            "fmt"
            "io"
            "net/http"
            "time"
        )

        // HTTPClient wraps http.Client with circuit breaker
        type HTTPClient struct {
            client         *http.Client
            circuitBreaker *AdvancedCircuitBreaker
        }

        // NewHTTPClient creates a new HTTP client with circuit breaker
        func NewHTTPClient(timeout time.Duration, cbConfig AdvancedConfig) *HTTPClient {
            return &HTTPClient{
                client: &http.Client{
                    Timeout: timeout,
                },
                circuitBreaker: NewAdvancedCircuitBreaker(cbConfig),
            }
        }

        // Get performs a GET request with circuit breaker protection
        func (hc *HTTPClient) Get(ctx context.Context, url string) (*http.Response, error) {
            var resp *http.Response
            err := hc.circuitBreaker.ExecuteWithContext(ctx, func(ctx context.Context) error {
                req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
                if err != nil {
                    return err
                }

                r, err := hc.client.Do(req)
                if err != nil {
                    return err
                }

                // Consider 5xx status codes as failures
                if r.StatusCode >= 500 {
                    r.Body.Close()
                    return fmt.Errorf("server error: %d", r.StatusCode)
                }

                // Buffer the body here: ExecuteWithContext cancels the call context
                // when it returns, which would abort any later read of r.Body.
                body, err := io.ReadAll(r.Body)
                r.Body.Close()
                if err != nil {
                    return err
                }
                r.Body = io.NopCloser(bytes.NewReader(body))

                resp = r
                return nil
            })
            if err != nil {
                // Never return a response alongside an error
                return nil, err
            }
            return resp, nil
        }

        // GetJSON performs a GET request and unmarshals JSON response
        func (hc *HTTPClient) GetJSON(ctx context.Context, url string, target interface{}) error {
            resp, err := hc.Get(ctx, url)
            if err != nil {
                return err
            }
            defer resp.Body.Close()

            body, err := io.ReadAll(resp.Body)
            if err != nil {
                return err
            }
            return json.Unmarshal(body, target)
        }

        // GetHealth returns circuit breaker health
        func (hc *HTTPClient) GetHealth() map[string]interface{} {
            return hc.circuitBreaker.HealthCheck()
        }

        // Example usage
        func main() {
            config := AdvancedConfig{
                MaxFailures:      3,
                ResetTimeout:     5 * time.Second,
                HalfOpenMaxCalls: 2,
                CallTimeout:      2 * time.Second,
                OnStateChange: func(from, to State) {
                    fmt.Printf("HTTP Client circuit breaker: %s -> %s\n", from, to)
                },
            }
            client := NewHTTPClient(3*time.Second, config)

            // Test URLs (some will fail)
            urls := []string{
                "https://httpbin.org/status/200", // Success
                "https://httpbin.org/status/500", // Server error
                "https://httpbin.org/delay/1",    // Success with delay
                "https://httpbin.org/status/503", // Server error
                "https://httpbin.org/status/500", // Server error
                "https://httpbin.org/status/502", // Server error (should open circuit)
                "https://httpbin.org/status/200", // Should be rejected
                "https://httpbin.org/status/200", // Should be rejected
            }

            for i, url := range urls {
                fmt.Printf("\n--- Request %d: %s ---\n", i+1, url)
                ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
                resp, err := client.Get(ctx, url)
                if err != nil {
                    fmt.Printf("Error: %v\n", err)
                } else {
                    // resp.Status already includes the numeric code, e.g. "200 OK"
                    fmt.Printf("Success: %s\n", resp.Status)
                    resp.Body.Close()
                }
                cancel()

                // Show current health
                health := client.GetHealth()
                fmt.Printf("State: %s, Success Rate: %s\n", health["state"], health["success_rate"])
                time.Sleep(1 * time.Second)
            }

            // Wait for circuit to potentially reset
            fmt.Println("\n--- Waiting for potential reset ---")
            time.Sleep(6 * time.Second)

            // Try again
            fmt.Println("\n--- Testing recovery ---")
            ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
            defer cancel()
            resp, err := client.Get(ctx, "https://httpbin.org/status/200")
            if err != nil {
                fmt.Printf("Recovery test failed: %v\n", err)
            } else {
                fmt.Printf("Recovery test succeeded: %s\n", resp.Status)
                resp.Body.Close()
            }

            // Final health report
            fmt.Println("\nFinal Health Report:")
            health := client.GetHealth()
            for key, value := range health {
                fmt.Printf("  %s: %v\n", key, value)
            }
        }

    Best Practices

    Choose Appropriate Thresholds: Set failure thresholds based on service characteristics
    Monitor State Changes: Log state transitions for debugging
    Implement Fallbacks: Provide alternative responses when circuit is open
    Use Timeouts: Combine with timeouts to handle slow responses
    Gradual Recovery: Use half-open state to test service recovery
    Metrics Collection: Track success rates, response times, and state changes
    Configuration: Make thresholds configurable for different environments

    Common Pitfalls

    Too Sensitive: Setting thresholds too low causes unnecessary trips
    Too Tolerant: High thresholds don’t protect against cascading failures
    No Fallbacks: Not providing alternative responses when circuit is open
    Ignoring Context: Not respecting context cancellation in protected functions
    Poor Monitoring: Not tracking circuit breaker metrics and health

    The Circuit Breaker pattern is essential for building resilient distributed systems. It prevents cascading failures, provides fast failure responses, and allows services time to recover, making your applications more robust and reliable. ...

    June 12, 2024 · 11 min · Rafiul Alam