Introduction

Building robust distributed systems requires understanding fundamental patterns that solve common challenges like consensus, fault tolerance, request distribution, and asynchronous communication. This comprehensive guide uses visual diagrams to illustrate how these patterns work, making complex distributed systems concepts easier to understand and implement.

We’ll explore:

  • Raft Consensus Algorithm: How distributed systems agree on shared state
  • Circuit Breaker Pattern: Preventing cascading failures in microservices
  • Load Balancing Algorithms: Distributing traffic efficiently across servers
  • Message Queue Patterns: Asynchronous communication strategies

Part 1: Raft Consensus Algorithm

The Raft consensus algorithm ensures that a cluster of servers maintains a consistent, replicated log even in the face of failures. It’s designed to be more understandable than Paxos while providing the same guarantees.

Raft Server States

Every server in a Raft cluster is in one of three states:

stateDiagram-v2
    [*] --> Follower

    Follower --> Candidate: Election timeout
    Candidate --> Leader: Receives majority votes
    Candidate --> Follower: Discovers current leader<br/>or new term
    Candidate --> Candidate: Split vote,<br/>new election

    Leader --> Follower: Discovers server<br/>with higher term

    note right of Follower
        - Responds to RPCs from<br/>  candidates and leaders
        - If election timeout,<br/>  becomes candidate
    end note

    note right of Candidate
        - Requests votes from<br/>  other servers
        - Becomes leader if<br/>  receives majority
    end note

    note right of Leader
        - Handles all client requests
        - Sends heartbeats to<br/>  maintain authority
        - Replicates log entries
    end note

Leader Election Process

When a follower doesn’t receive heartbeats from a leader, it starts an election:

sequenceDiagram
    participant F1 as Follower 1
    participant F2 as Follower 2
    participant F3 as Follower 3
    participant F4 as Follower 4
    participant F5 as Follower 5

    Note over F1,F5: Term 1: Leader exists

    Note over F1: Election timeout!<br/>No heartbeat received
    F1->>F1: Increment term to 2<br/>Transition to Candidate

    par Request votes
        F1->>F2: RequestVote(term=2)
        F1->>F3: RequestVote(term=2)
        F1->>F4: RequestVote(term=2)
        F1->>F5: RequestVote(term=2)
    end

    F2-->>F1: Vote granted
    F3-->>F1: Vote granted
    F4-->>F1: Vote granted

    Note over F1: Received 4/5 votes<br/>(including self)<br/>Majority achieved!

    F1->>F1: Transition to Leader

    par Send heartbeats
        F1->>F2: AppendEntries(heartbeat)
        F1->>F3: AppendEntries(heartbeat)
        F1->>F4: AppendEntries(heartbeat)
        F1->>F5: AppendEntries(heartbeat)
    end

    Note over F1,F5: Term 2: F1 is now Leader
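
In the simulation code later in this post, vote requests are granted at random to keep the example small. For reference, a real follower applies Raft's voting rules when it receives a RequestVote RPC. Here is a minimal, self-contained sketch of that decision; the type and parameter names are illustrative, not from any particular library:

// RequestVoteArgs mirrors the arguments a candidate sends with RequestVote.
type RequestVoteArgs struct {
    Term         int // candidate's term
    CandidateID  int
    LastLogIndex int // index of candidate's last log entry
    LastLogTerm  int // term of candidate's last log entry
}

// shouldGrantVote applies Raft's voting rules from the receiver's point of
// view: currentTerm, votedFor, lastLogTerm and lastLogIndex describe the
// server that received the request.
func shouldGrantVote(args RequestVoteArgs, currentTerm int, votedFor *int, lastLogTerm, lastLogIndex int) bool {
    // Never vote for a candidate from an older term.
    if args.Term < currentTerm {
        return false
    }
    // Grant at most one vote per term. (A request from a higher term
    // implicitly resets votedFor first; the caller handles that.)
    if args.Term == currentTerm && votedFor != nil && *votedFor != args.CandidateID {
        return false
    }
    // The candidate's log must be at least as up-to-date as the receiver's:
    // a higher last-entry term wins; equal terms compare by log length.
    if args.LastLogTerm != lastLogTerm {
        return args.LastLogTerm > lastLogTerm
    }
    return args.LastLogIndex >= lastLogIndex
}

The implementation below skips this check and simulates the outcome instead.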

Log Replication Flow

Once elected, the leader handles client requests and replicates entries to followers:

sequenceDiagram
    participant C as Client
    participant L as Leader
    participant F1 as Follower 1
    participant F2 as Follower 2
    participant F3 as Follower 3

    C->>L: Write Request: SET x=5

    Note over L: Append to local log<br/>(uncommitted)
    L->>L: Log[7] = SET x=5

    par Replicate to followers
        L->>F1: AppendEntries(entry=SET x=5, prevLogIndex=6)
        L->>F2: AppendEntries(entry=SET x=5, prevLogIndex=6)
        L->>F3: AppendEntries(entry=SET x=5, prevLogIndex=6)
    end

    F1->>F1: Append Log[7] = SET x=5
    F1-->>L: Success

    F2->>F2: Append Log[7] = SET x=5
    F2-->>L: Success

    Note over L: Received majority<br/>(2/3 followers + leader)<br/>Entry is COMMITTED

    L->>L: Commit Log[7]<br/>Apply to state machine<br/>x = 5

    L-->>C: Success: x=5 committed

    par Notify followers of commit
        L->>F1: AppendEntries(leaderCommit=7)
        L->>F2: AppendEntries(leaderCommit=7)
        L->>F3: AppendEntries(leaderCommit=7)
    end

    F1->>F1: Commit Log[7]<br/>Apply: x=5
    F2->>F2: Commit Log[7]<br/>Apply: x=5

    Note over L,F3: All servers have<br/>committed x=5
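
The commit decision in this diagram is the part the simplified implementation below glosses over: the leader tracks, per follower, the highest log index known to be replicated (the matchIndex map in the RaftServer struct) and only commits an entry once a majority of servers hold it. A minimal sketch of that calculation, with function and parameter names of my own choosing:

// commitIndexForMajority returns the highest log index that is replicated on
// a majority of the cluster. matchIndex maps each follower ID to the highest
// index known to be stored on that follower; the leader counts itself.
func commitIndexForMajority(matchIndex map[int]int, leaderLastIndex, clusterSize int) int {
    for n := leaderLastIndex; n > 0; n-- {
        count := 1 // the leader always has its own entries
        for _, idx := range matchIndex {
            if idx >= n {
                count++
            }
        }
        if count > clusterSize/2 {
            // Note: real Raft additionally requires that entry n be from the
            // leader's current term before it is committed this way.
            return n
        }
    }
    return 0
}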

Raft Consensus in Go

Here’s a simplified implementation of core Raft concepts:

package main

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// ServerState represents the state of a Raft server
type ServerState int

const (
    Follower ServerState = iota
    Candidate
    Leader
)

func (s ServerState) String() string {
    switch s {
    case Follower:
        return "FOLLOWER"
    case Candidate:
        return "CANDIDATE"
    case Leader:
        return "LEADER"
    default:
        return "UNKNOWN"
    }
}

// LogEntry represents a single entry in the replicated log
type LogEntry struct {
    Term    int
    Command string
    Index   int
}

// RaftServer represents a single server in a Raft cluster
type RaftServer struct {
    mu sync.RWMutex

    // Persistent state
    currentTerm int
    votedFor    *int
    log         []LogEntry

    // Volatile state
    state       ServerState
    commitIndex int
    lastApplied int

    // Leader state
    nextIndex   map[int]int
    matchIndex  map[int]int

    // Configuration
    id          int
    peers       []int

    // Channels
    heartbeat   chan bool
    election    chan bool
    stopCh      chan struct{}
}

// NewRaftServer creates a new Raft server
func NewRaftServer(id int, peers []int) *RaftServer {
    rs := &RaftServer{
        id:          id,
        peers:       peers,
        currentTerm: 0,
        state:       Follower,
        log:         make([]LogEntry, 0),
        nextIndex:   make(map[int]int),
        matchIndex:  make(map[int]int),
        heartbeat:   make(chan bool, 10),
        election:    make(chan bool, 10),
        stopCh:      make(chan struct{}),
    }
    return rs
}

// Start begins the Raft server's main loop
func (rs *RaftServer) Start(ctx context.Context) {
    go rs.run(ctx)
}

// run is the main server loop
func (rs *RaftServer) run(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        case <-rs.stopCh:
            return
        default:
            rs.mu.RLock()
            state := rs.state
            rs.mu.RUnlock()

            switch state {
            case Follower:
                rs.runFollower(ctx)
            case Candidate:
                rs.runCandidate(ctx)
            case Leader:
                rs.runLeader(ctx)
            }
        }
    }
}

// runFollower handles follower state logic
func (rs *RaftServer) runFollower(ctx context.Context) {
    timeout := time.Duration(150+rand.Intn(150)) * time.Millisecond

    select {
    case <-rs.heartbeat:
        // Received heartbeat, stay as follower; read the term under the
        // read lock because other goroutines may be updating it
        rs.mu.RLock()
        term := rs.currentTerm
        rs.mu.RUnlock()
        fmt.Printf("[Server %d] Received heartbeat in term %d\n", rs.id, term)
    case <-time.After(timeout):
        // Election timeout, become candidate
        fmt.Printf("[Server %d] Election timeout, becoming candidate\n", rs.id)
        rs.mu.Lock()
        rs.state = Candidate
        rs.mu.Unlock()
    case <-ctx.Done():
        return
    }
}

// runCandidate handles candidate state logic
func (rs *RaftServer) runCandidate(ctx context.Context) {
    rs.mu.Lock()
    rs.currentTerm++
    rs.votedFor = &rs.id
    currentTerm := rs.currentTerm
    rs.mu.Unlock()

    fmt.Printf("[Server %d] Starting election for term %d\n", rs.id, currentTerm)

    votes := 1 // Vote for self
    majority := (len(rs.peers)+1)/2 + 1

    // Request votes from peers
    voteCh := make(chan bool, len(rs.peers))

    for _, peer := range rs.peers {
        go func(peerID int) {
            // Simulate RequestVote RPC
            granted := rs.requestVote(peerID, currentTerm)
            voteCh <- granted
        }(peer)
    }

    timeout := time.Duration(150+rand.Intn(150)) * time.Millisecond
    timer := time.After(timeout)

    for votes < majority {
        select {
        case granted := <-voteCh:
            if granted {
                votes++
                fmt.Printf("[Server %d] Received vote, total: %d/%d\n",
                    rs.id, votes, majority)
            }
        case <-rs.heartbeat:
            // Discovered current leader, step down
            fmt.Printf("[Server %d] Discovered leader, stepping down\n", rs.id)
            rs.mu.Lock()
            rs.state = Follower
            rs.mu.Unlock()
            return
        case <-timer:
            // Election timeout, start new election
            fmt.Printf("[Server %d] Election timeout, restarting\n", rs.id)
            return
        case <-ctx.Done():
            return
        }
    }

    // Won election
    fmt.Printf("[Server %d] WON ELECTION for term %d with %d votes\n",
        rs.id, currentTerm, votes)
    rs.mu.Lock()
    rs.state = Leader
    rs.mu.Unlock()
}

// runLeader handles leader state logic
func (rs *RaftServer) runLeader(ctx context.Context) {
    fmt.Printf("[Server %d] Now LEADER for term %d\n", rs.id, rs.currentTerm)

    // Initialize leader state
    rs.mu.Lock()
    for _, peer := range rs.peers {
        rs.nextIndex[peer] = len(rs.log) + 1
        rs.matchIndex[peer] = 0
    }
    rs.mu.Unlock()

    ticker := time.NewTicker(50 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            rs.mu.RLock()
            if rs.state != Leader {
                rs.mu.RUnlock()
                return
            }
            rs.mu.RUnlock()

            // Send heartbeats to all followers
            for _, peer := range rs.peers {
                go rs.sendHeartbeat(peer)
            }
        case <-ctx.Done():
            return
        }
    }
}

// requestVote simulates a RequestVote RPC
func (rs *RaftServer) requestVote(peerID, term int) bool {
    // Simulate network delay
    time.Sleep(time.Duration(10+rand.Intn(40)) * time.Millisecond)

    // Simulate vote decision (simplified)
    // In real implementation, this would check log consistency
    return rand.Float32() > 0.3 // 70% chance of granting vote
}

// sendHeartbeat sends a heartbeat to a follower
func (rs *RaftServer) sendHeartbeat(peerID int) {
    rs.mu.RLock()
    term := rs.currentTerm
    rs.mu.RUnlock()

    // In real implementation, this would be an AppendEntries RPC
    fmt.Printf("[Server %d] Sending heartbeat to Server %d (term %d)\n",
        rs.id, peerID, term)
}

// AppendEntry adds a new entry to the log (leader only)
func (rs *RaftServer) AppendEntry(command string) error {
    rs.mu.Lock()
    defer rs.mu.Unlock()

    if rs.state != Leader {
        return fmt.Errorf("not leader")
    }

    entry := LogEntry{
        Term:    rs.currentTerm,
        Command: command,
        Index:   len(rs.log) + 1,
    }

    rs.log = append(rs.log, entry)
    fmt.Printf("[Server %d] Appended entry: %v\n", rs.id, entry)

    return nil
}

func main() {
    // Create a 5-node cluster
    peers := []int{2, 3, 4, 5}
    server := NewRaftServer(1, peers)

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    fmt.Println("Starting Raft server simulation...")
    server.Start(ctx)

    <-ctx.Done()
    fmt.Println("\nSimulation complete")
}

Part 2: Circuit Breaker Pattern

The Circuit Breaker pattern prevents cascading failures by monitoring for failures and stopping requests to failing services. It acts like an electrical circuit breaker.

Circuit Breaker State Machine

stateDiagram-v2
    [*] --> Closed

    Closed --> Open: Failure threshold<br/>exceeded
    Open --> HalfOpen: Reset timeout<br/>expires
    HalfOpen --> Closed: Success threshold<br/>met
    HalfOpen --> Open: Any failure occurs

    Closed --> Closed: Success (reset<br/>failure count)
    Closed --> Closed: Failure (increment<br/>failure count)

    note right of Closed
        Normal operation
        ✓ Requests pass through
        ✓ Failures counted
        ✓ Successes reset counter
    end note

    note right of Open
        Failing fast
        ✗ Requests rejected immediately
        ⏱ Waiting for reset timeout
        ⚡ Return error instantly
    end note

    note right of HalfOpen
        Testing recovery
        ⚠ Limited requests allowed
        🔍 Monitoring success rate
        ⚖️ Deciding next state
    end note

Circuit Breaker Decision Flow

flowchart TD
    Start([Incoming Request]) --> CheckState{Current<br/>State?}

    CheckState -->|Closed| AllowRequest[Allow Request]
    CheckState -->|Open| CheckTimeout{Reset timeout<br/>expired?}
    CheckState -->|Half-Open| CheckLimit{Under request<br/>limit?}

    CheckTimeout -->|No| RejectFast[Reject Immediately<br/>Return Error]
    CheckTimeout -->|Yes| TransitionHalf[Transition to<br/>Half-Open]
    TransitionHalf --> AllowRequest

    CheckLimit -->|No| RejectFast
    CheckLimit -->|Yes| AllowRequest

    AllowRequest --> Execute[Execute Protected<br/>Function]

    Execute --> CheckResult{Success or<br/>Failure?}

    CheckResult -->|Success| RecordSuccess[Record Success]
    CheckResult -->|Failure| RecordFailure[Record Failure]

    RecordSuccess --> EvalSuccess{State?}
    RecordFailure --> EvalFailure{State?}

    EvalSuccess -->|Closed| ResetCount[Reset Failure<br/>Count]
    EvalSuccess -->|Half-Open| CheckSuccessThreshold{Success threshold<br/>reached?}

    CheckSuccessThreshold -->|Yes| TransitionClosed[Transition to<br/>Closed]
    CheckSuccessThreshold -->|No| Continue[Continue]

    EvalFailure -->|Closed| IncrementCount[Increment Failure<br/>Count]
    EvalFailure -->|Half-Open| TransitionOpen[Transition to<br/>Open]

    IncrementCount --> CheckThreshold{Failures >=<br/>Threshold?}
    CheckThreshold -->|Yes| TransitionOpenFromClosed[Transition to<br/>Open<br/>Set Reset Timer]
    CheckThreshold -->|No| Continue

    ResetCount --> End([Return Success])
    TransitionClosed --> End
    Continue --> End
    TransitionOpen --> End2([Return Failure])
    TransitionOpenFromClosed --> End2
    RejectFast --> End2

Advanced Circuit Breaker with Metrics

package main

import (
    "errors"
    "fmt"
    "strings"
    "sync"
    "sync/atomic"
    "time"
)

// State represents circuit breaker state
type State int

const (
    StateClosed State = iota
    StateOpen
    StateHalfOpen
)

func (s State) String() string {
    return []string{"CLOSED", "OPEN", "HALF_OPEN"}[s]
}

// Metrics tracks circuit breaker statistics
type Metrics struct {
    TotalRequests   int64
    SuccessfulCalls int64
    FailedCalls     int64
    RejectedCalls   int64
    StateChanges    int64
    LastFailureTime time.Time
}

// CircuitBreaker implements the circuit breaker pattern
type CircuitBreaker struct {
    mu                sync.RWMutex
    state             State
    failureCount      int
    successCount      int
    consecutiveFails  int
    lastStateChange   time.Time
    nextAttemptTime   time.Time

    // Configuration
    maxFailures       int
    resetTimeout      time.Duration
    halfOpenMaxCalls  int
    failureThreshold  float64

    // Metrics
    metrics           *Metrics

    // Callbacks
    onStateChange     func(from, to State)
}

// Config holds circuit breaker configuration
type Config struct {
    MaxFailures      int           // Failures before opening
    ResetTimeout     time.Duration // Time to wait before half-open
    HalfOpenMaxCalls int           // Max calls in half-open state
    OnStateChange    func(from, to State)
}

// NewCircuitBreaker creates a new circuit breaker
func NewCircuitBreaker(config Config) *CircuitBreaker {
    return &CircuitBreaker{
        state:            StateClosed,
        maxFailures:      config.MaxFailures,
        resetTimeout:     config.ResetTimeout,
        halfOpenMaxCalls: config.HalfOpenMaxCalls,
        onStateChange:    config.OnStateChange,
        metrics:          &Metrics{},
        lastStateChange:  time.Now(),
    }
}

// Execute runs the function with circuit breaker protection
func (cb *CircuitBreaker) Execute(fn func() error) error {
    atomic.AddInt64(&cb.metrics.TotalRequests, 1)

    if !cb.allowRequest() {
        atomic.AddInt64(&cb.metrics.RejectedCalls, 1)
        return errors.New("circuit breaker is " + cb.GetState().String())
    }

    err := fn()
    cb.recordResult(err)

    return err
}

// allowRequest checks if request should be allowed
func (cb *CircuitBreaker) allowRequest() bool {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    now := time.Now()

    switch cb.state {
    case StateClosed:
        return true

    case StateOpen:
        if now.After(cb.nextAttemptTime) {
            cb.changeState(StateHalfOpen)
            cb.successCount = 0
            cb.failureCount = 0
            return true
        }
        return false

    case StateHalfOpen:
        calls := cb.successCount + cb.failureCount
        return calls < cb.halfOpenMaxCalls

    default:
        return false
    }
}

// recordResult records the result of a function call
func (cb *CircuitBreaker) recordResult(err error) {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    if err != nil {
        atomic.AddInt64(&cb.metrics.FailedCalls, 1)
        cb.onFailure()
    } else {
        atomic.AddInt64(&cb.metrics.SuccessfulCalls, 1)
        cb.onSuccess()
    }
}

// onFailure handles a failure
func (cb *CircuitBreaker) onFailure() {
    cb.failureCount++
    cb.consecutiveFails++
    cb.metrics.LastFailureTime = time.Now()

    switch cb.state {
    case StateClosed:
        if cb.failureCount >= cb.maxFailures {
            cb.changeState(StateOpen)
            cb.nextAttemptTime = time.Now().Add(cb.resetTimeout)
        }

    case StateHalfOpen:
        // Any failure in half-open returns to open
        cb.changeState(StateOpen)
        cb.nextAttemptTime = time.Now().Add(cb.resetTimeout)
    }
}

// onSuccess handles a success
func (cb *CircuitBreaker) onSuccess() {
    cb.consecutiveFails = 0

    switch cb.state {
    case StateClosed:
        cb.failureCount = 0

    case StateHalfOpen:
        cb.successCount++
        if cb.successCount >= cb.halfOpenMaxCalls {
            cb.changeState(StateClosed)
            cb.failureCount = 0
        }
    }
}

// changeState transitions the circuit breaker to a new state
func (cb *CircuitBreaker) changeState(newState State) {
    if cb.state != newState {
        oldState := cb.state
        cb.state = newState
        cb.lastStateChange = time.Now()
        atomic.AddInt64(&cb.metrics.StateChanges, 1)

        fmt.Printf("⚡ Circuit Breaker: %s → %s\n", oldState, newState)

        if cb.onStateChange != nil {
            go cb.onStateChange(oldState, newState)
        }
    }
}

// GetState returns the current state
func (cb *CircuitBreaker) GetState() State {
    cb.mu.RLock()
    defer cb.mu.RUnlock()
    return cb.state
}

// GetMetrics returns current metrics
func (cb *CircuitBreaker) GetMetrics() Metrics {
    cb.mu.RLock()
    defer cb.mu.RUnlock()

    return Metrics{
        TotalRequests:   atomic.LoadInt64(&cb.metrics.TotalRequests),
        SuccessfulCalls: atomic.LoadInt64(&cb.metrics.SuccessfulCalls),
        FailedCalls:     atomic.LoadInt64(&cb.metrics.FailedCalls),
        RejectedCalls:   atomic.LoadInt64(&cb.metrics.RejectedCalls),
        StateChanges:    atomic.LoadInt64(&cb.metrics.StateChanges),
        LastFailureTime: cb.metrics.LastFailureTime,
    }
}

// HealthCheck returns health information
func (cb *CircuitBreaker) HealthCheck() map[string]interface{} {
    cb.mu.RLock()
    defer cb.mu.RUnlock()

    metrics := cb.GetMetrics()

    var successRate float64
    if metrics.TotalRequests > 0 {
        successRate = float64(metrics.SuccessfulCalls) /
                     float64(metrics.TotalRequests) * 100
    }

    return map[string]interface{}{
        "state":              cb.state.String(),
        "failure_count":      cb.failureCount,
        "consecutive_fails":  cb.consecutiveFails,
        "total_requests":     metrics.TotalRequests,
        "successful_calls":   metrics.SuccessfulCalls,
        "failed_calls":       metrics.FailedCalls,
        "rejected_calls":     metrics.RejectedCalls,
        "success_rate":       fmt.Sprintf("%.2f%%", successRate),
        "last_state_change":  cb.lastStateChange,
    }
}

// Example: Unreliable service simulation
type UnreliableService struct {
    failureRate float64
    callCount   int
    mu          sync.Mutex
}

func (us *UnreliableService) Call() error {
    us.mu.Lock()
    us.callCount++
    count := us.callCount
    us.mu.Unlock()

    time.Sleep(50 * time.Millisecond)

    // Fail calls 3 through 5 so the breaker sees three consecutive
    // failures and trips, then let the service recover
    if count >= 3 && count <= 5 {
        return errors.New("service temporarily unavailable")
    }

    return nil
}

func main() {
    service := &UnreliableService{}

    config := Config{
        MaxFailures:      3,
        ResetTimeout:     2 * time.Second,
        HalfOpenMaxCalls: 2,
        OnStateChange: func(from, to State) {
            fmt.Printf("📊 State transition: %s → %s\n", from, to)
        },
    }

    cb := NewCircuitBreaker(config)

    fmt.Println("Starting Circuit Breaker test...\n")

    // Test requests
    for i := 1; i <= 15; i++ {
        fmt.Printf("\n--- Request %d ---\n", i)

        err := cb.Execute(func() error {
            return service.Call()
        })

        if err != nil {
            fmt.Printf("❌ Error: %v\n", err)
        } else {
            fmt.Printf("✅ Success\n")
        }

        health := cb.HealthCheck()
        fmt.Printf("State: %s | Success Rate: %s\n",
            health["state"], health["success_rate"])

        if i == 10 {
            fmt.Println("\n⏱  Waiting for reset timeout...")
            time.Sleep(2100 * time.Millisecond)
        } else {
            time.Sleep(300 * time.Millisecond)
        }
    }

    fmt.Println("\n" + strings.Repeat("=", 50))
    fmt.Println("Final Health Report:")
    health := cb.HealthCheck()
    for key, value := range health {
        fmt.Printf("  %s: %v\n", key, value)
    }
}
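
The demo above wraps a local function, but the typical use is an outbound network call. Here is a sketch of how the same breaker might guard an HTTP request; it assumes the file also imports net/http and io, and the choice to count only 5xx responses as failures is illustrative:

// fetchWithBreaker performs a GET through the circuit breaker, treating
// transport errors and 5xx responses as failures that count toward tripping.
func fetchWithBreaker(cb *CircuitBreaker, client *http.Client, url string) ([]byte, error) {
    var body []byte

    err := cb.Execute(func() error {
        resp, err := client.Get(url)
        if err != nil {
            return err // network/transport error counts as a failure
        }
        defer resp.Body.Close()

        if resp.StatusCode >= 500 {
            // Server-side errors trip the breaker; 4xx responses are
            // usually the caller's fault and are not counted here.
            return fmt.Errorf("upstream returned status %d", resp.StatusCode)
        }

        body, err = io.ReadAll(resp.Body)
        return err
    })

    return body, err
}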

Part 3: Load Balancing Algorithms

Load balancers distribute incoming requests across multiple servers to ensure no single server becomes overwhelmed.

Round-Robin Load Balancing

flowchart TD
    Start([New Request]) --> GetServers[Get Available<br/>Server List]
    GetServers --> CheckEmpty{Server list<br/>empty?}

    CheckEmpty -->|Yes| Error[Return Error:<br/>No servers available]
    CheckEmpty -->|No| GetCurrent[Get Current Index]

    GetCurrent --> SelectServer[Select Server at<br/>Current Index]
    SelectServer --> Increment["Increment Index:<br/>index = (index + 1) mod len"]

    Increment --> CheckHealth{Server<br/>healthy?}

    CheckHealth -->|No| Skip[Skip to<br/>Next Server]
    Skip --> GetCurrent

    CheckHealth -->|Yes| SendRequest[Send Request<br/>to Server]
    SendRequest --> LogMetrics[Update Metrics:<br/>- Request count<br/>- Response time]

    LogMetrics --> Success([Return Response])
    Error --> End([End])

Least Connections Algorithm

flowchart TD
    Start([New Request]) --> Init[Initialize:<br/>minConn = ∞<br/>selectedServer = nil]

    Init --> GetServers[Get All Active<br/>Servers]

    GetServers --> IterateStart{For each<br/>server}

    IterateStart --> CheckHealth{Server<br/>healthy?}
    CheckHealth -->|No| NextServer[Continue to<br/>next server]
    CheckHealth -->|Yes| GetConnCount[Get Active<br/>Connection Count]

    GetConnCount --> Compare{connCount <<br/>minConn?}

    Compare -->|Yes| UpdateMin[minConn = connCount<br/>selectedServer = current]
    Compare -->|No| NextServer

    UpdateMin --> NextServer
    NextServer --> MoreServers{More<br/>servers?}

    MoreServers -->|Yes| IterateStart
    MoreServers -->|No| CheckSelected{selectedServer<br/>!= nil?}

    CheckSelected -->|No| NoServer[Return Error:<br/>No healthy servers]
    CheckSelected -->|Yes| IncrementConn[Increment server's<br/>active connection count]

    IncrementConn --> SendRequest[Send Request<br/>to Selected Server]
    SendRequest --> DecrementConn[On completion:<br/>Decrement connection<br/>count]

    DecrementConn --> Success([Return Response])
    NoServer --> End([End])

Consistent Hashing

flowchart TD
    Start([New Request]) --> Extract[Extract Key<br/>e.g., user ID,<br/>session ID]

    Extract --> Hash["Hash Key:<br/>hash = hashFunc(key)"]

    Hash --> FindRing[Find position<br/>on hash ring]

    FindRing --> SearchCW[Search clockwise<br/>on ring for first<br/>virtual node]

    SearchCW --> MapNode[Map virtual node<br/>to physical server]

    MapNode --> CheckServer{Server<br/>available?}

    CheckServer -->|No| FindNext[Continue clockwise<br/>to next virtual node]
    FindNext --> MapNode

    CheckServer -->|Yes| SendRequest[Send Request<br/>to Server]

    SendRequest --> UpdateCache[Update local cache:<br/>key → server mapping]

    UpdateCache --> Success([Return Response])

    Note1[Hash Ring Visualization:<br/>0° ─ Server A vnode 1<br/>72° ─ Server B vnode 1<br/>144° ─ Server A vnode 2<br/>216° ─ Server C vnode 1<br/>288° ─ Server B vnode 2]

    style Note1 fill:#f9f,stroke:#333,stroke-width:2px

Load Balancing Implementation in Go

package main

import (
    "crypto/sha256"
    "encoding/binary"
    "errors"
    "fmt"
    "sort"
    "sync"
    "sync/atomic"
    "time"
)

// Server represents a backend server
type Server struct {
    ID              string
    Address         string
    Healthy         bool
    ActiveConns     int32
    TotalRequests   int64
    FailedRequests  int64
    LastHealthCheck time.Time
}

// ===== Round-Robin Load Balancer =====

// RoundRobinLB implements round-robin load balancing
type RoundRobinLB struct {
    servers []*Server
    current uint32
    mu      sync.RWMutex
}

// NewRoundRobinLB creates a new round-robin load balancer
func NewRoundRobinLB(servers []*Server) *RoundRobinLB {
    return &RoundRobinLB{
        servers: servers,
        current: 0,
    }
}

// NextServer returns the next server using round-robin
func (rr *RoundRobinLB) NextServer() (*Server, error) {
    rr.mu.RLock()
    defer rr.mu.RUnlock()

    if len(rr.servers) == 0 {
        return nil, errors.New("no servers available")
    }

    // Try each server once
    for i := 0; i < len(rr.servers); i++ {
        index := atomic.AddUint32(&rr.current, 1) % uint32(len(rr.servers))
        server := rr.servers[index]

        if server.Healthy {
            return server, nil
        }
    }

    return nil, errors.New("no healthy servers available")
}

// ===== Least Connections Load Balancer =====

// LeastConnectionsLB implements least connections load balancing
type LeastConnectionsLB struct {
    servers []*Server
    mu      sync.RWMutex
}

// NewLeastConnectionsLB creates a new least connections load balancer
func NewLeastConnectionsLB(servers []*Server) *LeastConnectionsLB {
    return &LeastConnectionsLB{
        servers: servers,
    }
}

// NextServer returns the server with least active connections
func (lc *LeastConnectionsLB) NextServer() (*Server, error) {
    lc.mu.RLock()
    defer lc.mu.RUnlock()

    var selected *Server
    minConns := int32(1<<31 - 1) // Max int32

    for _, server := range lc.servers {
        if !server.Healthy {
            continue
        }

        conns := atomic.LoadInt32(&server.ActiveConns)
        if conns < minConns {
            minConns = conns
            selected = server
        }
    }

    if selected == nil {
        return nil, errors.New("no healthy servers available")
    }

    return selected, nil
}

// ===== Consistent Hashing Load Balancer =====

// hashRing represents a consistent hash ring
type hashRing struct {
    nodes       map[uint32]string // hash -> server ID
    sortedKeys  []uint32
    servers     map[string]*Server
    replicas    int
    mu          sync.RWMutex
}

// NewHashRing creates a new consistent hash ring
func NewHashRing(servers []*Server, replicas int) *hashRing {
    hr := &hashRing{
        nodes:    make(map[uint32]string),
        servers:  make(map[string]*Server),
        replicas: replicas,
    }

    for _, server := range servers {
        hr.AddServer(server)
    }

    return hr
}

// AddServer adds a server to the hash ring
func (hr *hashRing) AddServer(server *Server) {
    hr.mu.Lock()
    defer hr.mu.Unlock()

    hr.servers[server.ID] = server

    // Add virtual nodes
    for i := 0; i < hr.replicas; i++ {
        hash := hr.hashKey(fmt.Sprintf("%s:%d", server.ID, i))
        hr.nodes[hash] = server.ID
        hr.sortedKeys = append(hr.sortedKeys, hash)
    }

    sort.Slice(hr.sortedKeys, func(i, j int) bool {
        return hr.sortedKeys[i] < hr.sortedKeys[j]
    })
}

// GetServer returns a server for the given key
func (hr *hashRing) GetServer(key string) (*Server, error) {
    hr.mu.RLock()
    defer hr.mu.RUnlock()

    if len(hr.nodes) == 0 {
        return nil, errors.New("no servers in ring")
    }

    hash := hr.hashKey(key)

    // Binary search for first node >= hash
    idx := sort.Search(len(hr.sortedKeys), func(i int) bool {
        return hr.sortedKeys[i] >= hash
    })

    // Wrap around if necessary
    if idx == len(hr.sortedKeys) {
        idx = 0
    }

    // Find first healthy server
    for i := 0; i < len(hr.sortedKeys); i++ {
        nodeHash := hr.sortedKeys[(idx+i)%len(hr.sortedKeys)]
        serverID := hr.nodes[nodeHash]
        server := hr.servers[serverID]

        if server.Healthy {
            return server, nil
        }
    }

    return nil, errors.New("no healthy servers available")
}

// hashKey generates a hash for a key
func (hr *hashRing) hashKey(key string) uint32 {
    h := sha256.Sum256([]byte(key))
    return binary.BigEndian.Uint32(h[:4])
}

// ===== Demo =====

func simulateRequest(server *Server, requestID int) error {
    atomic.AddInt32(&server.ActiveConns, 1)
    atomic.AddInt64(&server.TotalRequests, 1)

    defer atomic.AddInt32(&server.ActiveConns, -1)

    // Simulate processing time
    time.Sleep(time.Duration(50+requestID%100) * time.Millisecond)

    fmt.Printf("  ✓ Request %d handled by %s (active: %d)\n",
        requestID, server.ID, atomic.LoadInt32(&server.ActiveConns))

    return nil
}

func main() {
    // Create servers
    servers := []*Server{
        {ID: "server-1", Address: "192.168.1.1:8080", Healthy: true},
        {ID: "server-2", Address: "192.168.1.2:8080", Healthy: true},
        {ID: "server-3", Address: "192.168.1.3:8080", Healthy: true},
    }

    fmt.Println("========== Round-Robin Load Balancer ==========")
    rrLB := NewRoundRobinLB(servers)

    for i := 1; i <= 6; i++ {
        server, err := rrLB.NextServer()
        if err != nil {
            fmt.Printf("Error: %v\n", err)
            continue
        }
        fmt.Printf("Request %d → %s\n", i, server.ID)
    }

    fmt.Println("\n========== Least Connections Load Balancer ==========")
    lcLB := NewLeastConnectionsLB(servers)

    var wg sync.WaitGroup
    for i := 1; i <= 9; i++ {
        wg.Add(1)
        go func(reqID int) {
            defer wg.Done()

            server, err := lcLB.NextServer()
            if err != nil {
                fmt.Printf("Error: %v\n", err)
                return
            }

            fmt.Printf("Request %d → %s\n", reqID, server.ID)
            simulateRequest(server, reqID)
        }(i)

        time.Sleep(20 * time.Millisecond)
    }
    wg.Wait()

    fmt.Println("\n========== Consistent Hashing Load Balancer ==========")
    chLB := NewHashRing(servers, 3) // 3 virtual nodes per server

    keys := []string{
        "user:12345",
        "user:67890",
        "session:abc123",
        "user:12345", // Same key should go to same server
        "session:def456",
        "user:67890", // Same key should go to same server
    }

    for _, key := range keys {
        server, err := chLB.GetServer(key)
        if err != nil {
            fmt.Printf("Error for key %s: %v\n", key, err)
            continue
        }
        fmt.Printf("Key %-20s → %s\n", key, server.ID)
    }

    // Show final statistics
    fmt.Println("\n========== Server Statistics ==========")
    for _, server := range servers {
        fmt.Printf("%s: Total Requests=%d, Active Connections=%d\n",
            server.ID,
            atomic.LoadInt64(&server.TotalRequests),
            atomic.LoadInt32(&server.ActiveConns))
    }
}
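
The ring above only ever grows. The property that makes consistent hashing attractive — only the keys owned by a departing node get remapped — needs a removal path as well. Here is a sketch of one for the hashRing type above; rebuilding sortedKeys from scratch is the simplest correct approach, not the most efficient one:

// RemoveServer deletes a server and its virtual nodes from the ring. Only
// keys that hashed to those virtual nodes are remapped; every other key
// keeps its current server.
func (hr *hashRing) RemoveServer(serverID string) {
    hr.mu.Lock()
    defer hr.mu.Unlock()

    delete(hr.servers, serverID)

    // Remove this server's virtual nodes (same hashes AddServer created).
    for i := 0; i < hr.replicas; i++ {
        hash := hr.hashKey(fmt.Sprintf("%s:%d", serverID, i))
        delete(hr.nodes, hash)
    }

    // Rebuild the sorted key slice from the remaining virtual nodes.
    hr.sortedKeys = hr.sortedKeys[:0]
    for hash := range hr.nodes {
        hr.sortedKeys = append(hr.sortedKeys, hash)
    }
    sort.Slice(hr.sortedKeys, func(i, j int) bool {
        return hr.sortedKeys[i] < hr.sortedKeys[j]
    })
}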

Part 4: Message Queue Patterns

Message queues enable asynchronous communication between services, decoupling producers from consumers.

Publish-Subscribe (Pub/Sub) Pattern

sequenceDiagram
    participant P1 as Publisher 1
    participant P2 as Publisher 2
    participant B as Message Broker
    participant S1 as Subscriber 1
    participant S2 as Subscriber 2
    participant S3 as Subscriber 3

    Note over B: Topic: "user.events"

    S1->>B: Subscribe to "user.events"
    S2->>B: Subscribe to "user.events"
    S3->>B: Subscribe to "user.events"

    Note over S1,S3: All subscribers waiting

    P1->>B: Publish: {type: "user.created", id: 123}

    Note over B: Fanout to all<br/>subscribers

    par Broadcast to all subscribers
        B->>S1: Message: user.created #123
        B->>S2: Message: user.created #123
        B->>S3: Message: user.created #123
    end

    S1-->>B: Ack
    S2-->>B: Ack
    S3-->>B: Ack

    P2->>B: Publish: {type: "user.updated", id: 123}

    par Broadcast again
        B->>S1: Message: user.updated #123
        B->>S2: Message: user.updated #123
        B->>S3: Message: user.updated #123
    end

    Note over S1,S3: Each subscriber receives<br/>ALL messages

Request-Reply Pattern

sequenceDiagram
    participant C as Client
    participant RQ as Request Queue
    participant W as Worker
    participant RespQ as Response Queue

    Note over C: Need to process<br/>complex calculation

    C->>C: Generate correlation ID<br/>correlationID = "req-12345"

    C->>RQ: Send Request {<br/>  correlationID: "req-12345",<br/>  replyTo: "response.client1",<br/>  data: {x: 100, y: 200}<br/>}

    Note over C: Waiting for response on<br/>"response.client1"

    W->>RQ: Poll for messages
    RQ->>W: Deliver Request

    Note over W: Process request<br/>result = x + y = 300

    W->>RespQ: Send to "response.client1" {<br/>  correlationID: "req-12345",<br/>  status: "success",<br/>  result: 300<br/>}

    RespQ->>C: Deliver Response

    C->>C: Match correlationID<br/>"req-12345"

    Note over C: Response received!<br/>result = 300

    C-->>RespQ: Ack message

Push-Pull (Work Queue) Pattern

sequenceDiagram
    participant P as Producer
    participant Q as Work Queue
    participant W1 as Worker 1
    participant W2 as Worker 2
    participant W3 as Worker 3

    Note over P: Generate tasks

    par Produce tasks
        P->>Q: Task 1: Process image A
        P->>Q: Task 2: Process image B
        P->>Q: Task 3: Process image C
        P->>Q: Task 4: Process image D
        P->>Q: Task 5: Process image E
        P->>Q: Task 6: Process image F
    end

    Note over Q: Tasks queued<br/>Waiting for workers

    par Workers pull tasks
        W1->>Q: Pull task
        W2->>Q: Pull task
        W3->>Q: Pull task
    end

    Q->>W1: Task 1
    Q->>W2: Task 2
    Q->>W3: Task 3

    Note over W1: Processing Task 1...
    Note over W2: Processing Task 2...
    Note over W3: Processing Task 3...

    W2->>W2: Complete Task 2
    W2-->>Q: Ack Task 2

    W2->>Q: Pull next task
    Q->>W2: Task 4

    Note over W2: Processing Task 4...

    W1->>W1: Complete Task 1
    W1-->>Q: Ack Task 1

    W1->>Q: Pull next task
    Q->>W1: Task 5

    W3->>W3: Complete Task 3
    W3-->>Q: Ack Task 3

    W3->>Q: Pull next task
    Q->>W3: Task 6

    Note over W1,W3: Load balanced<br/>across workers

Message Queue Patterns in Go

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// ===== Pub/Sub Pattern =====

// Message represents a pub/sub message
type Message struct {
    Topic     string
    Payload   interface{}
    Timestamp time.Time
}

// Subscriber is a function that handles messages
type Subscriber func(Message)

// PubSubBroker implements publish-subscribe pattern
type PubSubBroker struct {
    mu          sync.RWMutex
    subscribers map[string][]Subscriber
}

// NewPubSubBroker creates a new pub/sub broker
func NewPubSubBroker() *PubSubBroker {
    return &PubSubBroker{
        subscribers: make(map[string][]Subscriber),
    }
}

// Subscribe registers a subscriber for a topic
func (psb *PubSubBroker) Subscribe(topic string, handler Subscriber) {
    psb.mu.Lock()
    defer psb.mu.Unlock()

    psb.subscribers[topic] = append(psb.subscribers[topic], handler)
    fmt.Printf("📬 New subscriber for topic: %s\n", topic)
}

// Publish sends a message to all subscribers of a topic
func (psb *PubSubBroker) Publish(topic string, payload interface{}) {
    psb.mu.RLock()
    defer psb.mu.RUnlock()

    message := Message{
        Topic:     topic,
        Payload:   payload,
        Timestamp: time.Now(),
    }

    subscribers := psb.subscribers[topic]
    fmt.Printf("📤 Publishing to %s: %d subscribers\n", topic, len(subscribers))

    for _, subscriber := range subscribers {
        go subscriber(message) // Async delivery
    }
}

// ===== Request-Reply Pattern =====

// Request represents a request message
type Request struct {
    CorrelationID string
    ReplyTo       string
    Data          interface{}
}

// Response represents a response message
type Response struct {
    CorrelationID string
    Status        string
    Result        interface{}
    Error         error
}

// RequestReplyClient handles request-reply communication
type RequestReplyClient struct {
    requestQueue  chan Request
    responseChans map[string]chan Response
    mu            sync.RWMutex
}

// NewRequestReplyClient creates a new request-reply client
func NewRequestReplyClient() *RequestReplyClient {
    return &RequestReplyClient{
        requestQueue:  make(chan Request, 100),
        responseChans: make(map[string]chan Response),
    }
}

// SendRequest sends a request and waits for response
func (rrc *RequestReplyClient) SendRequest(ctx context.Context, data interface{}) (*Response, error) {
    correlationID := fmt.Sprintf("req-%d", time.Now().UnixNano())
    responseChan := make(chan Response, 1)

    rrc.mu.Lock()
    rrc.responseChans[correlationID] = responseChan
    rrc.mu.Unlock()

    defer func() {
        rrc.mu.Lock()
        delete(rrc.responseChans, correlationID)
        rrc.mu.Unlock()
    }()

    request := Request{
        CorrelationID: correlationID,
        ReplyTo:       "response-queue",
        Data:          data,
    }

    fmt.Printf("📨 Sending request: %s\n", correlationID)

    select {
    case rrc.requestQueue <- request:
    case <-ctx.Done():
        return nil, ctx.Err()
    }

    select {
    case response := <-responseChan:
        fmt.Printf("📬 Received response: %s\n", correlationID)
        return &response, nil
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}

// HandleResponse processes incoming responses
func (rrc *RequestReplyClient) HandleResponse(response Response) {
    rrc.mu.RLock()
    responseChan, exists := rrc.responseChans[response.CorrelationID]
    rrc.mu.RUnlock()

    if exists {
        select {
        case responseChan <- response:
        default:
        }
    }
}

// Worker processes requests
type Worker struct {
    ID           int
    requestQueue chan Request
    client       *RequestReplyClient
}

// Start begins processing requests
func (w *Worker) Start(ctx context.Context) {
    go func() {
        for {
            select {
            case request := <-w.requestQueue:
                fmt.Printf("⚙️  Worker %d processing: %s\n",
                    w.ID, request.CorrelationID)

                // Simulate processing
                time.Sleep(100 * time.Millisecond)

                response := Response{
                    CorrelationID: request.CorrelationID,
                    Status:        "success",
                    Result:        fmt.Sprintf("Processed by worker %d", w.ID),
                }

                w.client.HandleResponse(response)

            case <-ctx.Done():
                return
            }
        }
    }()
}

// ===== Work Queue (Push-Pull) Pattern =====

// Task represents a unit of work
type Task struct {
    ID      int
    Data    interface{}
    Retries int
}

// WorkQueue implements work queue pattern
type WorkQueue struct {
    tasks    chan Task
    results  chan interface{}
    workers  int
}

// NewWorkQueue creates a new work queue
func NewWorkQueue(bufferSize, numWorkers int) *WorkQueue {
    return &WorkQueue{
        tasks:   make(chan Task, bufferSize),
        results: make(chan interface{}, bufferSize),
        workers: numWorkers,
    }
}

// Start starts all workers
func (wq *WorkQueue) Start(ctx context.Context, processFn func(Task) interface{}) {
    for i := 0; i < wq.workers; i++ {
        go wq.worker(ctx, i, processFn)
    }
}

// worker processes tasks from the queue
func (wq *WorkQueue) worker(ctx context.Context, id int, processFn func(Task) interface{}) {
    fmt.Printf("👷 Worker %d started\n", id)

    for {
        select {
        case task := <-wq.tasks:
            fmt.Printf("👷 Worker %d processing task %d\n", id, task.ID)

            result := processFn(task)

            select {
            case wq.results <- result:
            case <-ctx.Done():
                return
            }

        case <-ctx.Done():
            fmt.Printf("👷 Worker %d stopped\n", id)
            return
        }
    }
}

// Submit adds a task to the queue
func (wq *WorkQueue) Submit(task Task) {
    wq.tasks <- task
}

// Results returns the results channel
func (wq *WorkQueue) Results() <-chan interface{} {
    return wq.results
}

// ===== Demo =====

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // ===== Pub/Sub Demo =====
    fmt.Println("========== Pub/Sub Pattern ==========\n")

    broker := NewPubSubBroker()

    broker.Subscribe("user.events", func(msg Message) {
        fmt.Printf("  📧 Email Service: %v\n", msg.Payload)
    })

    broker.Subscribe("user.events", func(msg Message) {
        fmt.Printf("  📊 Analytics: %v\n", msg.Payload)
    })

    broker.Subscribe("user.events", func(msg Message) {
        fmt.Printf("  💾 Database: %v\n", msg.Payload)
    })

    broker.Publish("user.events", "User [email protected] registered")
    time.Sleep(100 * time.Millisecond)

    broker.Publish("user.events", "User [email protected] logged in")
    time.Sleep(100 * time.Millisecond)

    // ===== Request-Reply Demo =====
    fmt.Println("\n========== Request-Reply Pattern ==========\n")

    rrClient := NewRequestReplyClient()

    // Start workers
    for i := 1; i <= 3; i++ {
        worker := &Worker{
            ID:           i,
            requestQueue: rrClient.requestQueue,
            client:       rrClient,
        }
        worker.Start(ctx)
    }

    // Send requests
    var wg sync.WaitGroup
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()

            ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
            defer cancel()

            response, err := rrClient.SendRequest(ctx,
                fmt.Sprintf("Calculate %d", id))

            if err != nil {
                fmt.Printf("❌ Request %d failed: %v\n", id, err)
                return
            }

            fmt.Printf("✅ Request %d: %v\n", id, response.Result)
        }(i)
    }

    wg.Wait()
    time.Sleep(200 * time.Millisecond)

    // ===== Work Queue Demo =====
    fmt.Println("\n========== Work Queue Pattern ==========\n")

    workQueue := NewWorkQueue(10, 3)

    processFn := func(task Task) interface{} {
        // Simulate work
        time.Sleep(time.Duration(50+task.ID*10) * time.Millisecond)
        return fmt.Sprintf("Task %d completed", task.ID)
    }

    workQueue.Start(ctx, processFn)

    // Submit tasks
    go func() {
        for i := 1; i <= 6; i++ {
            workQueue.Submit(Task{
                ID:   i,
                Data: fmt.Sprintf("Process item %d", i),
            })
        }
    }()

    // Collect results
    go func() {
        for result := range workQueue.Results() {
            fmt.Printf("✅ Result: %v\n", result)
        }
    }()

    time.Sleep(1 * time.Second)

    fmt.Println("\n========== Demo Complete ==========")
}
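
One gap between the diagrams and the code: the diagrams show workers acknowledging tasks, while the in-memory WorkQueue delivers each task at most once and ignores the Retries field on Task. Below is a sketch of how a worker could requeue failed tasks instead, assuming the handler is changed to return an error; the channel sends here block if the queue is full, which a real broker would handle for you:

// processWithRetry runs a fallible handler and puts the task back on the
// queue when it fails, up to a fixed retry budget tracked on the Task.
func (wq *WorkQueue) processWithRetry(task Task, handle func(Task) error, maxRetries int) {
    if err := handle(task); err == nil {
        wq.results <- fmt.Sprintf("Task %d completed", task.ID)
        return
    }

    if task.Retries < maxRetries {
        task.Retries++
        fmt.Printf("↩️  Task %d failed, requeueing (attempt %d)\n", task.ID, task.Retries)
        wq.tasks <- task // redeliver; any worker may pick it up next
        return
    }

    // Retry budget exhausted: surface the failure instead of dropping it.
    wq.results <- fmt.Sprintf("Task %d failed after %d retries", task.ID, maxRetries)
}

Production brokers such as RabbitMQ, SQS, or NATS JetStream implement acknowledgment, redelivery, and dead-lettering for you; the sketch only shows where that decision sits in the worker loop.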

Comparison and When to Use Each Pattern

Consensus Algorithms (Raft)

Use When:

  • You need strong consistency across distributed nodes
  • Building distributed databases or configuration stores
  • Implementing leader election in clustered systems
  • Replicating state machines

Trade-offs:

  • ✅ Strong consistency guarantees
  • ✅ Easier to understand than Paxos
  • ❌ Requires majority of nodes to be available
  • ❌ Higher latency due to consensus overhead

Circuit Breakers

Use When:

  • Making calls to external services or microservices
  • Protecting against cascading failures
  • Need graceful degradation
  • Want to fail fast instead of waiting for timeouts

Trade-offs:

  • ✅ Prevents cascading failures
  • ✅ Faster failure detection
  • ✅ Gives failing services time to recover
  • ❌ Requires careful threshold tuning
  • ❌ Can reject valid requests during recovery

Load Balancing

Round-Robin:

  • Simple, even distribution
  • Good for homogeneous backends
  • No state required

Least Connections:

  • Better for variable request durations
  • Prevents overloading slow servers
  • Requires connection tracking

Consistent Hashing:

  • Maintains sticky sessions
  • Minimal remapping when nodes change
  • Great for caching scenarios

Message Queue Patterns

Pub/Sub:

  • Broadcasting events to multiple consumers
  • Loosely coupled event-driven architecture
  • Real-time notifications

Request-Reply:

  • Synchronous-style communication over async transport
  • RPC-like behavior with message queues
  • Can implement timeouts and retries

Push-Pull (Work Queue):

  • Task distribution among workers
  • Load balancing computational work
  • Parallel processing pipelines

Conclusion

Understanding these distributed systems patterns is crucial for building scalable, resilient applications. Each pattern solves specific problems:

  • Raft ensures data consistency across distributed nodes
  • Circuit Breakers protect against cascading failures
  • Load Balancers distribute traffic efficiently
  • Message Queues enable asynchronous, decoupled communication

The visual diagrams in this guide help illustrate complex interactions, making these patterns easier to understand and implement. Choose patterns based on your specific requirements for consistency, availability, partition tolerance, and performance.


Have questions or want to discuss distributed systems patterns? Feel free to reach out!