Introduction
Building robust distributed systems requires understanding fundamental patterns that solve common challenges like consensus, fault tolerance, request distribution, and asynchronous communication. This comprehensive guide uses visual diagrams to illustrate how these patterns work, making complex distributed systems concepts easier to understand and implement.
We’ll explore:
- Raft Consensus Algorithm: How distributed systems agree on shared state
- Circuit Breaker Pattern: Preventing cascading failures in microservices
- Load Balancing Algorithms: Distributing traffic efficiently across servers
- Message Queue Patterns: Asynchronous communication strategies
Part 1: Raft Consensus Algorithm
The Raft consensus algorithm ensures that a cluster of servers maintains a consistent, replicated log even in the face of failures. It’s designed to be more understandable than Paxos while providing the same guarantees.
Raft Server States
Every server in a Raft cluster is in one of three states:
stateDiagram-v2
[*] --> Follower
Follower --> Candidate: Election timeout
Candidate --> Leader: Receives majority votes
Candidate --> Follower: Discovers current leader<br/>or new term
Candidate --> Candidate: Split vote,<br/>new election
Leader --> Follower: Discovers server<br/>with higher term
note right of Follower
- Responds to RPCs from<br/> candidates and leaders
- If election timeout,<br/> becomes candidate
end note
note right of Candidate
- Requests votes from<br/> other servers
- Becomes leader if<br/> receives majority
end note
note right of Leader
- Handles all client requests
- Sends heartbeats to<br/> maintain authority
- Replicates log entries
end note
Leader Election Process
When a follower doesn’t receive heartbeats from a leader, it starts an election:
sequenceDiagram
participant F1 as Follower 1
participant F2 as Follower 2
participant F3 as Follower 3
participant F4 as Follower 4
participant F5 as Follower 5
Note over F1,F5: Term 1: Leader exists
Note over F1: Election timeout!<br/>No heartbeat received
F1->>F1: Increment term to 2<br/>Transition to Candidate
par Request votes
F1->>F2: RequestVote(term=2)
F1->>F3: RequestVote(term=2)
F1->>F4: RequestVote(term=2)
F1->>F5: RequestVote(term=2)
end
F2-->>F1: Vote granted
F3-->>F1: Vote granted
F4-->>F1: Vote granted
Note over F1: Received 4/5 votes<br/>(including self)<br/>Majority achieved!
F1->>F1: Transition to Leader
par Send heartbeats
F1->>F2: AppendEntries(heartbeat)
F1->>F3: AppendEntries(heartbeat)
F1->>F4: AppendEntries(heartbeat)
F1->>F5: AppendEntries(heartbeat)
end
Note over F1,F5: Term 2: F1 is now Leader
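In the simulation later in this section, the vote decision is randomized for brevity. On a real voter, RequestVote is granted only if the candidate's term is current, the voter has not already voted this term, and the candidate's log is at least as up to date as the voter's. Here is a hedged sketch of that voter-side check; it reuses the RaftServer and LogEntry types defined in the implementation below, and the candidateID, lastLogIndex, and lastLogTerm parameters are illustrative assumptions not present in the simplified code:
// handleRequestVote sketches the voter-side RequestVote decision.
// It assumes the RaftServer/LogEntry types defined later in this section.
func (rs *RaftServer) handleRequestVote(candidateID, term, lastLogIndex, lastLogTerm int) bool {
	rs.mu.Lock()
	defer rs.mu.Unlock()

	// Reject candidates from stale terms.
	if term < rs.currentTerm {
		return false
	}
	// A higher term means we are behind: adopt it and revert to follower.
	if term > rs.currentTerm {
		rs.currentTerm = term
		rs.votedFor = nil
		rs.state = Follower
	}
	// At most one vote per term.
	if rs.votedFor != nil && *rs.votedFor != candidateID {
		return false
	}
	// Election restriction: the candidate's log must be at least as up to date.
	myLastIndex := len(rs.log)
	myLastTerm := 0
	if myLastIndex > 0 {
		myLastTerm = rs.log[myLastIndex-1].Term
	}
	if lastLogTerm < myLastTerm ||
		(lastLogTerm == myLastTerm && lastLogIndex < myLastIndex) {
		return false
	}

	rs.votedFor = &candidateID
	return true
}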
Log Replication Flow
Once elected, the leader handles client requests and replicates entries to followers:
sequenceDiagram
participant C as Client
participant L as Leader
participant F1 as Follower 1
participant F2 as Follower 2
participant F3 as Follower 3
C->>L: Write Request: SET x=5
Note over L: Append to local log<br/>(uncommitted)
L->>L: Log[7] = SET x=5
par Replicate to followers
L->>F1: AppendEntries(entry=SET x=5, prevLogIndex=6)
L->>F2: AppendEntries(entry=SET x=5, prevLogIndex=6)
L->>F3: AppendEntries(entry=SET x=5, prevLogIndex=6)
end
F1->>F1: Append Log[7] = SET x=5
F1-->>L: Success
F2->>F2: Append Log[7] = SET x=5
F2-->>L: Success
Note over L: Received majority<br/>(2/3 followers + leader)<br/>Entry is COMMITTED
L->>L: Commit Log[7]<br/>Apply to state machine<br/>x = 5
L-->>C: Success: x=5 committed
par Notify followers of commit
L->>F1: AppendEntries(leaderCommit=7)
L->>F2: AppendEntries(leaderCommit=7)
L->>F3: AppendEntries(leaderCommit=7)
end
F1->>F1: Commit Log[7]<br/>Apply: x=5
F2->>F2: Commit Log[7]<br/>Apply: x=5
Note over L,F3: All servers have<br/>committed x=5
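The simplified simulation below omits the follower side of AppendEntries, which performs the consistency check implied by prevLogIndex in the diagram above. Here is a hedged sketch of that handler; it reuses the RaftServer and LogEntry types defined in the next section, and the exact parameter list is an assumption:
// handleAppendEntries sketches the follower-side AppendEntries logic.
// It assumes the RaftServer/LogEntry types defined in the next section.
func (rs *RaftServer) handleAppendEntries(term, prevLogIndex, prevLogTerm int,
	entries []LogEntry, leaderCommit int) bool {

	rs.mu.Lock()
	defer rs.mu.Unlock()

	// Reject requests from stale leaders.
	if term < rs.currentTerm {
		return false
	}
	rs.currentTerm = term
	rs.state = Follower

	// Reset the election timer (non-blocking, using the simulation's channel).
	select {
	case rs.heartbeat <- true:
	default:
	}

	// Consistency check: the log must contain an entry at prevLogIndex
	// whose term matches prevLogTerm (log indices are 1-based).
	if prevLogIndex > 0 {
		if prevLogIndex > len(rs.log) || rs.log[prevLogIndex-1].Term != prevLogTerm {
			return false // the leader will retry with an earlier prevLogIndex
		}
	}

	// Append new entries, truncating only where an existing entry conflicts.
	for i, entry := range entries {
		idx := prevLogIndex + i // 0-based slice position for this entry
		if idx < len(rs.log) {
			if rs.log[idx].Term == entry.Term {
				continue // already present and consistent
			}
			rs.log = rs.log[:idx] // conflict: drop it and everything after
		}
		rs.log = append(rs.log, entry)
	}

	// Advance the commit index as far as the leader allows.
	if leaderCommit > rs.commitIndex {
		rs.commitIndex = leaderCommit
		if last := len(rs.log); last < rs.commitIndex {
			rs.commitIndex = last
		}
	}
	return true
}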
Raft Consensus in Go
Here’s a simplified implementation of core Raft concepts:
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
// ServerState represents the state of a Raft server
type ServerState int
const (
Follower ServerState = iota
Candidate
Leader
)
func (s ServerState) String() string {
switch s {
case Follower:
return "FOLLOWER"
case Candidate:
return "CANDIDATE"
case Leader:
return "LEADER"
default:
return "UNKNOWN"
}
}
// LogEntry represents a single entry in the replicated log
type LogEntry struct {
Term int
Command string
Index int
}
// RaftServer represents a single server in a Raft cluster
type RaftServer struct {
mu sync.RWMutex
// Persistent state
currentTerm int
votedFor *int
log []LogEntry
// Volatile state
state ServerState
commitIndex int
lastApplied int
// Leader state
nextIndex map[int]int
matchIndex map[int]int
// Configuration
id int
peers []int
// Channels
heartbeat chan bool
election chan bool
stopCh chan struct{}
}
// NewRaftServer creates a new Raft server
func NewRaftServer(id int, peers []int) *RaftServer {
rs := &RaftServer{
id: id,
peers: peers,
currentTerm: 0,
state: Follower,
log: make([]LogEntry, 0),
nextIndex: make(map[int]int),
matchIndex: make(map[int]int),
heartbeat: make(chan bool, 10),
election: make(chan bool, 10),
stopCh: make(chan struct{}),
}
return rs
}
// Start begins the Raft server's main loop
func (rs *RaftServer) Start(ctx context.Context) {
go rs.run(ctx)
}
// run is the main server loop
func (rs *RaftServer) run(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-rs.stopCh:
return
default:
rs.mu.RLock()
state := rs.state
rs.mu.RUnlock()
switch state {
case Follower:
rs.runFollower(ctx)
case Candidate:
rs.runCandidate(ctx)
case Leader:
rs.runLeader(ctx)
}
}
}
}
// runFollower handles follower state logic
func (rs *RaftServer) runFollower(ctx context.Context) {
timeout := time.Duration(150+rand.Intn(150)) * time.Millisecond
select {
case <-rs.heartbeat:
// Received heartbeat, stay as follower
fmt.Printf("[Server %d] Received heartbeat in term %d\n", rs.id, rs.currentTerm)
case <-time.After(timeout):
// Election timeout, become candidate
fmt.Printf("[Server %d] Election timeout, becoming candidate\n", rs.id)
rs.mu.Lock()
rs.state = Candidate
rs.mu.Unlock()
case <-ctx.Done():
return
}
}
// runCandidate handles candidate state logic
func (rs *RaftServer) runCandidate(ctx context.Context) {
rs.mu.Lock()
rs.currentTerm++
rs.votedFor = &rs.id
currentTerm := rs.currentTerm
rs.mu.Unlock()
fmt.Printf("[Server %d] Starting election for term %d\n", rs.id, currentTerm)
votes := 1 // Vote for self
majority := (len(rs.peers)+1)/2 + 1
// Request votes from peers
voteCh := make(chan bool, len(rs.peers))
for _, peer := range rs.peers {
go func(peerID int) {
// Simulate RequestVote RPC
granted := rs.requestVote(peerID, currentTerm)
voteCh <- granted
}(peer)
}
timeout := time.Duration(150+rand.Intn(150)) * time.Millisecond
timer := time.After(timeout)
for votes < majority {
select {
case granted := <-voteCh:
if granted {
votes++
fmt.Printf("[Server %d] Received vote, total: %d/%d\n",
rs.id, votes, majority)
}
case <-rs.heartbeat:
// Discovered current leader, step down
fmt.Printf("[Server %d] Discovered leader, stepping down\n", rs.id)
rs.mu.Lock()
rs.state = Follower
rs.mu.Unlock()
return
case <-timer:
// Election timeout, start new election
fmt.Printf("[Server %d] Election timeout, restarting\n", rs.id)
return
case <-ctx.Done():
return
}
}
// Won election
fmt.Printf("[Server %d] WON ELECTION for term %d with %d votes\n",
rs.id, currentTerm, votes)
rs.mu.Lock()
rs.state = Leader
rs.mu.Unlock()
}
// runLeader handles leader state logic
func (rs *RaftServer) runLeader(ctx context.Context) {
fmt.Printf("[Server %d] Now LEADER for term %d\n", rs.id, rs.currentTerm)
// Initialize leader state
rs.mu.Lock()
for _, peer := range rs.peers {
rs.nextIndex[peer] = len(rs.log) + 1
rs.matchIndex[peer] = 0
}
rs.mu.Unlock()
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
rs.mu.RLock()
if rs.state != Leader {
rs.mu.RUnlock()
return
}
rs.mu.RUnlock()
// Send heartbeats to all followers
for _, peer := range rs.peers {
go rs.sendHeartbeat(peer)
}
case <-ctx.Done():
return
}
}
}
// requestVote simulates a RequestVote RPC
func (rs *RaftServer) requestVote(peerID, term int) bool {
// Simulate network delay
time.Sleep(time.Duration(10+rand.Intn(40)) * time.Millisecond)
// Simulate vote decision (simplified)
// In real implementation, this would check log consistency
return rand.Float32() > 0.3 // 70% chance of granting vote
}
// sendHeartbeat sends a heartbeat to a follower
func (rs *RaftServer) sendHeartbeat(peerID int) {
rs.mu.RLock()
term := rs.currentTerm
rs.mu.RUnlock()
// In real implementation, this would be an AppendEntries RPC
fmt.Printf("[Server %d] Sending heartbeat to Server %d (term %d)\n",
rs.id, peerID, term)
}
// AppendEntry adds a new entry to the log (leader only)
func (rs *RaftServer) AppendEntry(command string) error {
rs.mu.Lock()
defer rs.mu.Unlock()
if rs.state != Leader {
return fmt.Errorf("not leader")
}
entry := LogEntry{
Term: rs.currentTerm,
Command: command,
Index: len(rs.log) + 1,
}
rs.log = append(rs.log, entry)
fmt.Printf("[Server %d] Appended entry: %v\n", rs.id, entry)
return nil
}
func main() {
// Simulate server 1 of a 5-node cluster (peers 2-5 are stubbed, not started)
peers := []int{2, 3, 4, 5}
server := NewRaftServer(1, peers)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
fmt.Println("Starting Raft server simulation...")
server.Start(ctx)
<-ctx.Done()
fmt.Println("\nSimulation complete")
}
Part 2: Circuit Breaker State Transitions
The Circuit Breaker pattern prevents cascading failures by monitoring calls to a downstream service and cutting off traffic once failures pass a threshold. Like an electrical circuit breaker, it trips to fail fast and only lets requests flow again after the failing service has had time to recover.
Circuit Breaker State Machine
stateDiagram-v2
[*] --> Closed
Closed --> Open: Failure threshold<br/>exceeded
Open --> HalfOpen: Reset timeout<br/>expires
HalfOpen --> Closed: Success threshold<br/>met
HalfOpen --> Open: Any failure occurs
Closed --> Closed: Success (reset<br/>failure count)
Closed --> Closed: Failure (increment<br/>failure count)
note right of Closed
Normal operation
✓ Requests pass through
✓ Failures counted
✓ Successes reset counter
end note
note right of Open
Failing fast
✗ Requests rejected immediately
⏱ Waiting for reset timeout
⚡ Return error instantly
end note
note right of HalfOpen
Testing recovery
⚠ Limited requests allowed
🔍 Monitoring success rate
⚖️ Deciding next state
end note
Circuit Breaker Decision Flow
flowchart TD
Start([Incoming Request]) --> CheckState{Current<br/>State?}
CheckState -->|Closed| AllowRequest[Allow Request]
CheckState -->|Open| CheckTimeout{Reset timeout<br/>expired?}
CheckState -->|Half-Open| CheckLimit{Under request<br/>limit?}
CheckTimeout -->|No| RejectFast[Reject Immediately<br/>Return Error]
CheckTimeout -->|Yes| TransitionHalf[Transition to<br/>Half-Open]
TransitionHalf --> AllowRequest
CheckLimit -->|No| RejectFast
CheckLimit -->|Yes| AllowRequest
AllowRequest --> Execute[Execute Protected<br/>Function]
Execute --> CheckResult{Success or<br/>Failure?}
CheckResult -->|Success| RecordSuccess[Record Success]
CheckResult -->|Failure| RecordFailure[Record Failure]
RecordSuccess --> EvalSuccess{State?}
RecordFailure --> EvalFailure{State?}
EvalSuccess -->|Closed| ResetCount[Reset Failure<br/>Count]
EvalSuccess -->|Half-Open| CheckSuccessThreshold{Success threshold<br/>reached?}
CheckSuccessThreshold -->|Yes| TransitionClosed[Transition to<br/>Closed]
CheckSuccessThreshold -->|No| Continue[Continue]
EvalFailure -->|Closed| IncrementCount[Increment Failure<br/>Count]
EvalFailure -->|Half-Open| TransitionOpen[Transition to<br/>Open]
IncrementCount --> CheckThreshold{Failures >=<br/>Threshold?}
CheckThreshold -->|Yes| TransitionOpenFromClosed[Transition to<br/>Open<br/>Set Reset Timer]
CheckThreshold -->|No| Continue
ResetCount --> End([Return Success])
TransitionClosed --> End
Continue --> End
TransitionOpen --> End2([Return Failure])
TransitionOpenFromClosed --> End2
RejectFast --> End2
Advanced Circuit Breaker with Metrics
package main
import (
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
)
// State represents circuit breaker state
type State int
const (
StateClosed State = iota
StateOpen
StateHalfOpen
)
func (s State) String() string {
return []string{"CLOSED", "OPEN", "HALF_OPEN"}[s]
}
// Metrics tracks circuit breaker statistics
type Metrics struct {
TotalRequests int64
SuccessfulCalls int64
FailedCalls int64
RejectedCalls int64
StateChanges int64
LastFailureTime time.Time
}
// CircuitBreaker implements the circuit breaker pattern
type CircuitBreaker struct {
mu sync.RWMutex
state State
failureCount int
successCount int
consecutiveFails int
lastStateChange time.Time
nextAttemptTime time.Time
// Configuration
maxFailures int
resetTimeout time.Duration
halfOpenMaxCalls int
failureThreshold float64
// Metrics
metrics *Metrics
// Callbacks
onStateChange func(from, to State)
}
// Config holds circuit breaker configuration
type Config struct {
MaxFailures int // Failures before opening
ResetTimeout time.Duration // Time to wait before half-open
HalfOpenMaxCalls int // Max calls in half-open state
OnStateChange func(from, to State)
}
// NewCircuitBreaker creates a new circuit breaker
func NewCircuitBreaker(config Config) *CircuitBreaker {
return &CircuitBreaker{
state: StateClosed,
maxFailures: config.MaxFailures,
resetTimeout: config.ResetTimeout,
halfOpenMaxCalls: config.HalfOpenMaxCalls,
onStateChange: config.OnStateChange,
metrics: &Metrics{},
lastStateChange: time.Now(),
}
}
// Execute runs the function with circuit breaker protection
func (cb *CircuitBreaker) Execute(fn func() error) error {
atomic.AddInt64(&cb.metrics.TotalRequests, 1)
if !cb.allowRequest() {
atomic.AddInt64(&cb.metrics.RejectedCalls, 1)
return errors.New("circuit breaker is " + cb.GetState().String())
}
err := fn()
cb.recordResult(err)
return err
}
// allowRequest checks if request should be allowed
func (cb *CircuitBreaker) allowRequest() bool {
cb.mu.Lock()
defer cb.mu.Unlock()
now := time.Now()
switch cb.state {
case StateClosed:
return true
case StateOpen:
if now.After(cb.nextAttemptTime) {
cb.changeState(StateHalfOpen)
cb.successCount = 0
cb.failureCount = 0
return true
}
return false
case StateHalfOpen:
calls := cb.successCount + cb.failureCount
return calls < cb.halfOpenMaxCalls
default:
return false
}
}
// recordResult records the result of a function call
func (cb *CircuitBreaker) recordResult(err error) {
cb.mu.Lock()
defer cb.mu.Unlock()
if err != nil {
atomic.AddInt64(&cb.metrics.FailedCalls, 1)
cb.onFailure()
} else {
atomic.AddInt64(&cb.metrics.SuccessfulCalls, 1)
cb.onSuccess()
}
}
// onFailure handles a failure
func (cb *CircuitBreaker) onFailure() {
cb.failureCount++
cb.consecutiveFails++
cb.metrics.LastFailureTime = time.Now()
switch cb.state {
case StateClosed:
if cb.failureCount >= cb.maxFailures {
cb.changeState(StateOpen)
cb.nextAttemptTime = time.Now().Add(cb.resetTimeout)
}
case StateHalfOpen:
// Any failure in half-open returns to open
cb.changeState(StateOpen)
cb.nextAttemptTime = time.Now().Add(cb.resetTimeout)
}
}
// onSuccess handles a success
func (cb *CircuitBreaker) onSuccess() {
cb.consecutiveFails = 0
switch cb.state {
case StateClosed:
cb.failureCount = 0
case StateHalfOpen:
cb.successCount++
if cb.successCount >= cb.halfOpenMaxCalls {
cb.changeState(StateClosed)
cb.failureCount = 0
}
}
}
// changeState transitions the circuit breaker to a new state
func (cb *CircuitBreaker) changeState(newState State) {
if cb.state != newState {
oldState := cb.state
cb.state = newState
cb.lastStateChange = time.Now()
atomic.AddInt64(&cb.metrics.StateChanges, 1)
fmt.Printf("⚡ Circuit Breaker: %s → %s\n", oldState, newState)
if cb.onStateChange != nil {
go cb.onStateChange(oldState, newState)
}
}
}
// GetState returns the current state
func (cb *CircuitBreaker) GetState() State {
cb.mu.RLock()
defer cb.mu.RUnlock()
return cb.state
}
// GetMetrics returns current metrics
func (cb *CircuitBreaker) GetMetrics() Metrics {
cb.mu.RLock()
defer cb.mu.RUnlock()
return Metrics{
TotalRequests: atomic.LoadInt64(&cb.metrics.TotalRequests),
SuccessfulCalls: atomic.LoadInt64(&cb.metrics.SuccessfulCalls),
FailedCalls: atomic.LoadInt64(&cb.metrics.FailedCalls),
RejectedCalls: atomic.LoadInt64(&cb.metrics.RejectedCalls),
StateChanges: atomic.LoadInt64(&cb.metrics.StateChanges),
LastFailureTime: cb.metrics.LastFailureTime,
}
}
// HealthCheck returns health information
func (cb *CircuitBreaker) HealthCheck() map[string]interface{} {
// Snapshot metrics before taking the lock; GetMetrics acquires the read
// lock itself, and nested RLock calls can deadlock if a writer is waiting.
metrics := cb.GetMetrics()
cb.mu.RLock()
defer cb.mu.RUnlock()
var successRate float64
if metrics.TotalRequests > 0 {
successRate = float64(metrics.SuccessfulCalls) /
float64(metrics.TotalRequests) * 100
}
return map[string]interface{}{
"state": cb.state.String(),
"failure_count": cb.failureCount,
"consecutive_fails": cb.consecutiveFails,
"total_requests": metrics.TotalRequests,
"successful_calls": metrics.SuccessfulCalls,
"failed_calls": metrics.FailedCalls,
"rejected_calls": metrics.RejectedCalls,
"success_rate": fmt.Sprintf("%.2f%%", successRate),
"last_state_change": cb.lastStateChange,
}
}
// Example: Unreliable service simulation
type UnreliableService struct {
failureRate float64
callCount int
mu sync.Mutex
}
func (us *UnreliableService) Call() error {
us.mu.Lock()
us.callCount++
count := us.callCount
us.mu.Unlock()
time.Sleep(50 * time.Millisecond)
// Simulate a burst of failures: calls 3-5 fail, later calls succeed,
// so the breaker opens, rejects traffic, then recovers via half-open
if count >= 3 && count < 6 {
return errors.New("service temporarily unavailable")
}
return nil
}
func main() {
service := &UnreliableService{}
config := Config{
MaxFailures: 3,
ResetTimeout: 2 * time.Second,
HalfOpenMaxCalls: 2,
OnStateChange: func(from, to State) {
fmt.Printf("📊 State transition: %s → %s\n", from, to)
},
}
cb := NewCircuitBreaker(config)
fmt.Println("Starting Circuit Breaker test...\n")
// Test requests
for i := 1; i <= 15; i++ {
fmt.Printf("\n--- Request %d ---\n", i)
err := cb.Execute(func() error {
return service.Call()
})
if err != nil {
fmt.Printf("❌ Error: %v\n", err)
} else {
fmt.Printf("✅ Success\n")
}
health := cb.HealthCheck()
fmt.Printf("State: %s | Success Rate: %s\n",
health["state"], health["success_rate"])
if i == 10 {
fmt.Println("\n⏱ Waiting for reset timeout...")
time.Sleep(2100 * time.Millisecond)
} else {
time.Sleep(300 * time.Millisecond)
}
}
fmt.Println("\n" + strings.Repeat("=", 50))
fmt.Println("Final Health Report:")
health := cb.HealthCheck()
for key, value := range health {
fmt.Printf(" %s: %v\n", key, value)
}
}
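As a usage sketch, here is how the CircuitBreaker above might guard an outbound HTTP call. The 2-second client timeout and the decision to count 5xx responses as failures are illustrative assumptions, and net/http would need to be added to the imports:
// fetchWithBreaker wraps an HTTP GET with the CircuitBreaker defined above.
// Treating 5xx responses as failures is a policy choice, not a requirement.
func fetchWithBreaker(cb *CircuitBreaker, url string) error {
	client := &http.Client{Timeout: 2 * time.Second}

	return cb.Execute(func() error {
		resp, err := client.Get(url)
		if err != nil {
			return err // network errors count as failures
		}
		defer resp.Body.Close()

		if resp.StatusCode >= 500 {
			return fmt.Errorf("upstream returned %d", resp.StatusCode)
		}
		return nil
	})
}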
Part 3: Load Balancing Algorithms
Load balancers distribute incoming requests across multiple servers to ensure no single server becomes overwhelmed.
Round-Robin Load Balancing
flowchart TD
Start([New Request]) --> GetServers[Get Available<br/>Server List]
GetServers --> CheckEmpty{Server list<br/>empty?}
CheckEmpty -->|Yes| Error[Return Error:<br/>No servers available]
CheckEmpty -->|No| GetCurrent[Get Current Index]
GetCurrent --> SelectServer[Select Server at<br/>Current Index]
SelectServer --> Increment["Increment Index:<br/>index = (index + 1) % len"]
Increment --> CheckHealth{Server<br/>healthy?}
CheckHealth -->|No| Skip[Skip to<br/>Next Server]
Skip --> GetCurrent
CheckHealth -->|Yes| SendRequest[Send Request<br/>to Server]
SendRequest --> LogMetrics[Update Metrics:<br/>- Request count<br/>- Response time]
LogMetrics --> Success([Return Response])
Error --> End([End])
Least Connections Algorithm
flowchart TD
Start([New Request]) --> Init[Initialize:<br/>minConn = ∞<br/>selectedServer = nil]
Init --> GetServers[Get All Active<br/>Servers]
GetServers --> IterateStart{For each<br/>server}
IterateStart --> CheckHealth{Server<br/>healthy?}
CheckHealth -->|No| NextServer[Continue to<br/>next server]
CheckHealth -->|Yes| GetConnCount[Get Active<br/>Connection Count]
GetConnCount --> Compare{connCount <<br/>minConn?}
Compare -->|Yes| UpdateMin[minConn = connCount<br/>selectedServer = current]
Compare -->|No| NextServer
UpdateMin --> NextServer
NextServer --> MoreServers{More<br/>servers?}
MoreServers -->|Yes| IterateStart
MoreServers -->|No| CheckSelected{selectedServer<br/>!= nil?}
CheckSelected -->|No| NoServer[Return Error:<br/>No healthy servers]
CheckSelected -->|Yes| IncrementConn[Increment server's<br/>active connection count]
IncrementConn --> SendRequest[Send Request<br/>to Selected Server]
SendRequest --> DecrementConn[On completion:<br/>Decrement connection<br/>count]
DecrementConn --> Success([Return Response])
NoServer --> End([End])
Consistent Hashing
flowchart TD
Start([New Request]) --> Extract[Extract Key<br/>e.g., user ID,<br/>session ID]
Extract --> Hash["Hash Key:<br/>hash = hashFunc(key)"]
Hash --> FindRing[Find position<br/>on hash ring]
FindRing --> SearchCW[Search clockwise<br/>on ring for first<br/>virtual node]
SearchCW --> MapNode[Map virtual node<br/>to physical server]
MapNode --> CheckServer{Server<br/>available?}
CheckServer -->|No| FindNext[Continue clockwise<br/>to next virtual node]
FindNext --> MapNode
CheckServer -->|Yes| SendRequest[Send Request<br/>to Server]
SendRequest --> UpdateCache[Update local cache:<br/>key → server mapping]
UpdateCache --> Success([Return Response])
Note1[Hash Ring Visualization:<br/>0° ─ Server A vnode 1<br/>72° ─ Server B vnode 1<br/>144° ─ Server A vnode 2<br/>216° ─ Server C vnode 1<br/>288° ─ Server B vnode 2]
style Note1 fill:#f9f,stroke:#333,stroke-width:2px
Load Balancing Implementation in Go
package main
import (
"crypto/sha256"
"encoding/binary"
"errors"
"fmt"
"sort"
"sync"
"sync/atomic"
"time"
)
// Server represents a backend server
type Server struct {
ID string
Address string
Healthy bool
ActiveConns int32
TotalRequests int64
FailedRequests int64
LastHealthCheck time.Time
}
// ===== Round-Robin Load Balancer =====
// RoundRobinLB implements round-robin load balancing
type RoundRobinLB struct {
servers []*Server
current uint32
mu sync.RWMutex
}
// NewRoundRobinLB creates a new round-robin load balancer
func NewRoundRobinLB(servers []*Server) *RoundRobinLB {
return &RoundRobinLB{
servers: servers,
current: 0,
}
}
// NextServer returns the next server using round-robin
func (rr *RoundRobinLB) NextServer() (*Server, error) {
rr.mu.RLock()
defer rr.mu.RUnlock()
if len(rr.servers) == 0 {
return nil, errors.New("no servers available")
}
// Try each server once
for i := 0; i < len(rr.servers); i++ {
index := atomic.AddUint32(&rr.current, 1) % uint32(len(rr.servers))
server := rr.servers[index]
if server.Healthy {
return server, nil
}
}
return nil, errors.New("no healthy servers available")
}
// ===== Least Connections Load Balancer =====
// LeastConnectionsLB implements least connections load balancing
type LeastConnectionsLB struct {
servers []*Server
mu sync.RWMutex
}
// NewLeastConnectionsLB creates a new least connections load balancer
func NewLeastConnectionsLB(servers []*Server) *LeastConnectionsLB {
return &LeastConnectionsLB{
servers: servers,
}
}
// NextServer returns the server with least active connections
func (lc *LeastConnectionsLB) NextServer() (*Server, error) {
lc.mu.RLock()
defer lc.mu.RUnlock()
var selected *Server
minConns := int32(1<<31 - 1) // Max int32
for _, server := range lc.servers {
if !server.Healthy {
continue
}
conns := atomic.LoadInt32(&server.ActiveConns)
if conns < minConns {
minConns = conns
selected = server
}
}
if selected == nil {
return nil, errors.New("no healthy servers available")
}
return selected, nil
}
// ===== Consistent Hashing Load Balancer =====
// hashRing represents a consistent hash ring
type hashRing struct {
nodes map[uint32]string // hash -> server ID
sortedKeys []uint32
servers map[string]*Server
replicas int
mu sync.RWMutex
}
// NewHashRing creates a new consistent hash ring
func NewHashRing(servers []*Server, replicas int) *hashRing {
hr := &hashRing{
nodes: make(map[uint32]string),
servers: make(map[string]*Server),
replicas: replicas,
}
for _, server := range servers {
hr.AddServer(server)
}
return hr
}
// AddServer adds a server to the hash ring
func (hr *hashRing) AddServer(server *Server) {
hr.mu.Lock()
defer hr.mu.Unlock()
hr.servers[server.ID] = server
// Add virtual nodes
for i := 0; i < hr.replicas; i++ {
hash := hr.hashKey(fmt.Sprintf("%s:%d", server.ID, i))
hr.nodes[hash] = server.ID
hr.sortedKeys = append(hr.sortedKeys, hash)
}
sort.Slice(hr.sortedKeys, func(i, j int) bool {
return hr.sortedKeys[i] < hr.sortedKeys[j]
})
}
// GetServer returns a server for the given key
func (hr *hashRing) GetServer(key string) (*Server, error) {
hr.mu.RLock()
defer hr.mu.RUnlock()
if len(hr.nodes) == 0 {
return nil, errors.New("no servers in ring")
}
hash := hr.hashKey(key)
// Binary search for first node >= hash
idx := sort.Search(len(hr.sortedKeys), func(i int) bool {
return hr.sortedKeys[i] >= hash
})
// Wrap around if necessary
if idx == len(hr.sortedKeys) {
idx = 0
}
// Find first healthy server
for i := 0; i < len(hr.sortedKeys); i++ {
nodeHash := hr.sortedKeys[(idx+i)%len(hr.sortedKeys)]
serverID := hr.nodes[nodeHash]
server := hr.servers[serverID]
if server.Healthy {
return server, nil
}
}
return nil, errors.New("no healthy servers available")
}
// hashKey generates a hash for a key
func (hr *hashRing) hashKey(key string) uint32 {
h := sha256.Sum256([]byte(key))
return binary.BigEndian.Uint32(h[:4])
}
// ===== Demo =====
func simulateRequest(server *Server, requestID int) error {
atomic.AddInt32(&server.ActiveConns, 1)
atomic.AddInt64(&server.TotalRequests, 1)
defer atomic.AddInt32(&server.ActiveConns, -1)
// Simulate processing time
time.Sleep(time.Duration(50+requestID%100) * time.Millisecond)
fmt.Printf(" ✓ Request %d handled by %s (active: %d)\n",
requestID, server.ID, atomic.LoadInt32(&server.ActiveConns))
return nil
}
func main() {
// Create servers
servers := []*Server{
{ID: "server-1", Address: "192.168.1.1:8080", Healthy: true},
{ID: "server-2", Address: "192.168.1.2:8080", Healthy: true},
{ID: "server-3", Address: "192.168.1.3:8080", Healthy: true},
}
fmt.Println("========== Round-Robin Load Balancer ==========")
rrLB := NewRoundRobinLB(servers)
for i := 1; i <= 6; i++ {
server, err := rrLB.NextServer()
if err != nil {
fmt.Printf("Error: %v\n", err)
continue
}
fmt.Printf("Request %d → %s\n", i, server.ID)
}
fmt.Println("\n========== Least Connections Load Balancer ==========")
lcLB := NewLeastConnectionsLB(servers)
var wg sync.WaitGroup
for i := 1; i <= 9; i++ {
wg.Add(1)
go func(reqID int) {
defer wg.Done()
server, err := lcLB.NextServer()
if err != nil {
fmt.Printf("Error: %v\n", err)
return
}
fmt.Printf("Request %d → %s\n", reqID, server.ID)
simulateRequest(server, reqID)
}(i)
time.Sleep(20 * time.Millisecond)
}
wg.Wait()
fmt.Println("\n========== Consistent Hashing Load Balancer ==========")
chLB := NewHashRing(servers, 3) // 3 virtual nodes per server
keys := []string{
"user:12345",
"user:67890",
"session:abc123",
"user:12345", // Same key should go to same server
"session:def456",
"user:67890", // Same key should go to same server
}
for _, key := range keys {
server, err := chLB.GetServer(key)
if err != nil {
fmt.Printf("Error for key %s: %v\n", key, err)
continue
}
fmt.Printf("Key %-20s → %s\n", key, server.ID)
}
// Show final statistics
fmt.Println("\n========== Server Statistics ==========")
for _, server := range servers {
fmt.Printf("%s: Total Requests=%d, Active Connections=%d\n",
server.ID,
atomic.LoadInt64(&server.TotalRequests),
atomic.LoadInt32(&server.ActiveConns))
}
}
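Consistent hashing's main selling point, minimal remapping when the server set changes, can be checked with a short experiment against the hashRing above. The key count, virtual-node count, and the hypothetical "server-4" are illustrative assumptions:
// remappingDemo shows how few keys move when a server joins the ring.
// It reuses the hashRing and Server types above; "server-4" is hypothetical.
func remappingDemo() {
	servers := []*Server{
		{ID: "server-1", Healthy: true},
		{ID: "server-2", Healthy: true},
		{ID: "server-3", Healthy: true},
	}
	ring := NewHashRing(servers, 50) // more virtual nodes smooth the distribution

	// Record where 1000 keys land before the change.
	before := make(map[string]string)
	for i := 0; i < 1000; i++ {
		key := fmt.Sprintf("user:%d", i)
		s, _ := ring.GetServer(key)
		before[key] = s.ID
	}

	// Add a fourth server and count how many keys were remapped.
	ring.AddServer(&Server{ID: "server-4", Healthy: true})
	moved := 0
	for key, oldID := range before {
		s, _ := ring.GetServer(key)
		if s.ID != oldID {
			moved++
		}
	}

	// Roughly 1/4 of the keys should move, far fewer than the near-total
	// reshuffle a plain hash(key) % N scheme causes when N changes.
	fmt.Printf("Keys remapped after adding server-4: %d of 1000\n", moved)
}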
Part 4: Message Queue Patterns
Message queues enable asynchronous communication between services, decoupling producers from consumers.
Publish-Subscribe (Pub/Sub) Pattern
sequenceDiagram
participant P1 as Publisher 1
participant P2 as Publisher 2
participant B as Message Broker
participant S1 as Subscriber 1
participant S2 as Subscriber 2
participant S3 as Subscriber 3
Note over B: Topic: "user.events"
S1->>B: Subscribe to "user.events"
S2->>B: Subscribe to "user.events"
S3->>B: Subscribe to "user.events"
Note over S1,S3: All subscribers waiting
P1->>B: Publish: {type: "user.created", id: 123}
Note over B: Fanout to all<br/>subscribers
par Broadcast to all subscribers
B->>S1: Message: user.created #123
B->>S2: Message: user.created #123
B->>S3: Message: user.created #123
end
S1-->>B: Ack
S2-->>B: Ack
S3-->>B: Ack
P2->>B: Publish: {type: "user.updated", id: 123}
par Broadcast again
B->>S1: Message: user.updated #123
B->>S2: Message: user.updated #123
B->>S3: Message: user.updated #123
end
Note over S1,S3: Each subscriber receives<br/>ALL messages
Request-Reply Pattern
sequenceDiagram
participant C as Client
participant RQ as Request Queue
participant W as Worker
participant RespQ as Response Queue
Note over C: Need to process<br/>complex calculation
C->>C: Generate correlation ID<br/>correlationID = "req-12345"
C->>RQ: Send Request {<br/> correlationID: "req-12345",<br/> replyTo: "response.client1",<br/> data: {x: 100, y: 200}<br/>}
Note over C: Waiting for response on<br/>"response.client1"
W->>RQ: Poll for messages
RQ->>W: Deliver Request
Note over W: Process request<br/>result = x + y = 300
W->>RespQ: Send to "response.client1" {<br/> correlationID: "req-12345",<br/> status: "success",<br/> result: 300<br/>}
RespQ->>C: Deliver Response
C->>C: Match correlationID<br/>"req-12345"
Note over C: Response received!<br/>result = 300
C-->>RespQ: Ack message
Push-Pull (Work Queue) Pattern
sequenceDiagram
participant P as Producer
participant Q as Work Queue
participant W1 as Worker 1
participant W2 as Worker 2
participant W3 as Worker 3
Note over P: Generate tasks
par Produce tasks
P->>Q: Task 1: Process image A
P->>Q: Task 2: Process image B
P->>Q: Task 3: Process image C
P->>Q: Task 4: Process image D
P->>Q: Task 5: Process image E
P->>Q: Task 6: Process image F
end
Note over Q: Tasks queued<br/>Waiting for workers
par Workers pull tasks
W1->>Q: Pull task
W2->>Q: Pull task
W3->>Q: Pull task
end
Q->>W1: Task 1
Q->>W2: Task 2
Q->>W3: Task 3
Note over W1: Processing Task 1...
Note over W2: Processing Task 2...
Note over W3: Processing Task 3...
W2->>W2: Complete Task 2
W2-->>Q: Ack Task 2
W2->>Q: Pull next task
Q->>W2: Task 4
Note over W2: Processing Task 4...
W1->>W1: Complete Task 1
W1-->>Q: Ack Task 1
W1->>Q: Pull next task
Q->>W1: Task 5
W3->>W3: Complete Task 3
W3-->>Q: Ack Task 3
W3->>Q: Pull next task
Q->>W3: Task 6
Note over W1,W3: Load balanced<br/>across workers
Message Queue Patterns in Go
package main
import (
"context"
"fmt"
"sync"
"time"
)
// ===== Pub/Sub Pattern =====
// Message represents a pub/sub message
type Message struct {
Topic string
Payload interface{}
Timestamp time.Time
}
// Subscriber is a function that handles messages
type Subscriber func(Message)
// PubSubBroker implements publish-subscribe pattern
type PubSubBroker struct {
mu sync.RWMutex
subscribers map[string][]Subscriber
}
// NewPubSubBroker creates a new pub/sub broker
func NewPubSubBroker() *PubSubBroker {
return &PubSubBroker{
subscribers: make(map[string][]Subscriber),
}
}
// Subscribe registers a subscriber for a topic
func (psb *PubSubBroker) Subscribe(topic string, handler Subscriber) {
psb.mu.Lock()
defer psb.mu.Unlock()
psb.subscribers[topic] = append(psb.subscribers[topic], handler)
fmt.Printf("📬 New subscriber for topic: %s\n", topic)
}
// Publish sends a message to all subscribers of a topic
func (psb *PubSubBroker) Publish(topic string, payload interface{}) {
psb.mu.RLock()
defer psb.mu.RUnlock()
message := Message{
Topic: topic,
Payload: payload,
Timestamp: time.Now(),
}
subscribers := psb.subscribers[topic]
fmt.Printf("📤 Publishing to %s: %d subscribers\n", topic, len(subscribers))
for _, subscriber := range subscribers {
go subscriber(message) // Async delivery
}
}
// ===== Request-Reply Pattern =====
// Request represents a request message
type Request struct {
CorrelationID string
ReplyTo string
Data interface{}
}
// Response represents a response message
type Response struct {
CorrelationID string
Status string
Result interface{}
Error error
}
// RequestReplyClient handles request-reply communication
type RequestReplyClient struct {
requestQueue chan Request
responseChans map[string]chan Response
mu sync.RWMutex
}
// NewRequestReplyClient creates a new request-reply client
func NewRequestReplyClient() *RequestReplyClient {
return &RequestReplyClient{
requestQueue: make(chan Request, 100),
responseChans: make(map[string]chan Response),
}
}
// SendRequest sends a request and waits for response
func (rrc *RequestReplyClient) SendRequest(ctx context.Context, data interface{}) (*Response, error) {
correlationID := fmt.Sprintf("req-%d", time.Now().UnixNano())
responseChan := make(chan Response, 1)
rrc.mu.Lock()
rrc.responseChans[correlationID] = responseChan
rrc.mu.Unlock()
defer func() {
rrc.mu.Lock()
delete(rrc.responseChans, correlationID)
rrc.mu.Unlock()
}()
request := Request{
CorrelationID: correlationID,
ReplyTo: "response-queue",
Data: data,
}
fmt.Printf("📨 Sending request: %s\n", correlationID)
select {
case rrc.requestQueue <- request:
case <-ctx.Done():
return nil, ctx.Err()
}
select {
case response := <-responseChan:
fmt.Printf("📬 Received response: %s\n", correlationID)
return &response, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
// HandleResponse processes incoming responses
func (rrc *RequestReplyClient) HandleResponse(response Response) {
rrc.mu.RLock()
responseChan, exists := rrc.responseChans[response.CorrelationID]
rrc.mu.RUnlock()
if exists {
select {
case responseChan <- response:
default:
}
}
}
// Worker processes requests
type Worker struct {
ID int
requestQueue chan Request
client *RequestReplyClient
}
// Start begins processing requests
func (w *Worker) Start(ctx context.Context) {
go func() {
for {
select {
case request := <-w.requestQueue:
fmt.Printf("⚙️ Worker %d processing: %s\n",
w.ID, request.CorrelationID)
// Simulate processing
time.Sleep(100 * time.Millisecond)
response := Response{
CorrelationID: request.CorrelationID,
Status: "success",
Result: fmt.Sprintf("Processed by worker %d", w.ID),
}
w.client.HandleResponse(response)
case <-ctx.Done():
return
}
}
}()
}
// ===== Work Queue (Push-Pull) Pattern =====
// Task represents a unit of work
type Task struct {
ID int
Data interface{}
Retries int
}
// WorkQueue implements work queue pattern
type WorkQueue struct {
tasks chan Task
results chan interface{}
workers int
}
// NewWorkQueue creates a new work queue
func NewWorkQueue(bufferSize, numWorkers int) *WorkQueue {
return &WorkQueue{
tasks: make(chan Task, bufferSize),
results: make(chan interface{}, bufferSize),
workers: numWorkers,
}
}
// Start starts all workers
func (wq *WorkQueue) Start(ctx context.Context, processFn func(Task) interface{}) {
for i := 0; i < wq.workers; i++ {
go wq.worker(ctx, i, processFn)
}
}
// worker processes tasks from the queue
func (wq *WorkQueue) worker(ctx context.Context, id int, processFn func(Task) interface{}) {
fmt.Printf("👷 Worker %d started\n", id)
for {
select {
case task := <-wq.tasks:
fmt.Printf("👷 Worker %d processing task %d\n", id, task.ID)
result := processFn(task)
select {
case wq.results <- result:
case <-ctx.Done():
return
}
case <-ctx.Done():
fmt.Printf("👷 Worker %d stopped\n", id)
return
}
}
}
// Submit adds a task to the queue
func (wq *WorkQueue) Submit(task Task) {
wq.tasks <- task
}
// Results returns the results channel
func (wq *WorkQueue) Results() <-chan interface{} {
return wq.results
}
// ===== Demo =====
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// ===== Pub/Sub Demo =====
fmt.Println("========== Pub/Sub Pattern ==========\n")
broker := NewPubSubBroker()
broker.Subscribe("user.events", func(msg Message) {
fmt.Printf(" 📧 Email Service: %v\n", msg.Payload)
})
broker.Subscribe("user.events", func(msg Message) {
fmt.Printf(" 📊 Analytics: %v\n", msg.Payload)
})
broker.Subscribe("user.events", func(msg Message) {
fmt.Printf(" 💾 Database: %v\n", msg.Payload)
})
broker.Publish("user.events", "User [email protected] registered")
time.Sleep(100 * time.Millisecond)
broker.Publish("user.events", "User [email protected] logged in")
time.Sleep(100 * time.Millisecond)
// ===== Request-Reply Demo =====
fmt.Println("\n========== Request-Reply Pattern ==========\n")
rrClient := NewRequestReplyClient()
// Start workers
for i := 1; i <= 3; i++ {
worker := &Worker{
ID: i,
requestQueue: rrClient.requestQueue,
client: rrClient,
}
worker.Start(ctx)
}
// Send requests
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
response, err := rrClient.SendRequest(ctx,
fmt.Sprintf("Calculate %d", id))
if err != nil {
fmt.Printf("❌ Request %d failed: %v\n", id, err)
return
}
fmt.Printf("✅ Request %d: %v\n", id, response.Result)
}(i)
}
wg.Wait()
time.Sleep(200 * time.Millisecond)
// ===== Work Queue Demo =====
fmt.Println("\n========== Work Queue Pattern ==========\n")
workQueue := NewWorkQueue(10, 3)
processFn := func(task Task) interface{} {
// Simulate work
time.Sleep(time.Duration(50+task.ID*10) * time.Millisecond)
return fmt.Sprintf("Task %d completed", task.ID)
}
workQueue.Start(ctx, processFn)
// Submit tasks
go func() {
for i := 1; i <= 6; i++ {
workQueue.Submit(Task{
ID: i,
Data: fmt.Sprintf("Process item %d", i),
})
}
}()
// Collect results
go func() {
for result := range workQueue.Results() {
fmt.Printf("✅ Result: %v\n", result)
}
}()
time.Sleep(1 * time.Second)
fmt.Println("\n========== Demo Complete ==========")
}
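The work-queue diagram earlier shows workers acknowledging tasks, but the WorkQueue sketch has no failure handling and never uses the Retries field on Task. Here is a hedged sketch of worker-side retry with resubmission; the maxRetries limit and the error-returning process function signature are assumptions:
// processWithRetry sketches retry handling on top of the WorkQueue above.
// The maxRetries limit and the error-returning processFn are assumptions.
func (wq *WorkQueue) processWithRetry(task Task, maxRetries int,
	processFn func(Task) (interface{}, error)) {

	result, err := processFn(task)
	if err == nil {
		wq.results <- result // success: publish the result (the "ack")
		return
	}

	if task.Retries < maxRetries {
		task.Retries++
		fmt.Printf("🔁 Task %d failed (%v), retry %d/%d\n",
			task.ID, err, task.Retries, maxRetries)
		wq.Submit(task) // requeue so another worker can pick it up
		return
	}

	// Retries exhausted: a real broker would route this to a dead-letter queue.
	fmt.Printf("💀 Task %d dropped after %d retries: %v\n", task.ID, maxRetries, err)
}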
Comparison and When to Use Each Pattern
Consensus Algorithms (Raft)
Use When:
- You need strong consistency across distributed nodes
- Building distributed databases or configuration stores
- Implementing leader election in clustered systems
- Replicating state machines
Trade-offs:
- ✅ Strong consistency guarantees
- ✅ Easier to understand than Paxos
- ❌ Requires a majority of nodes to be available
- ❌ Higher latency due to consensus overhead
Circuit Breakers
Use When:
- Making calls to external services or microservices
- Protecting against cascading failures
- Need graceful degradation
- Want to fail fast instead of waiting for timeouts
Trade-offs:
- ✅ Prevents cascading failures
- ✅ Faster failure detection
- ✅ Gives failing services time to recover
- ❌ Requires careful threshold tuning
- ❌ Can reject valid requests during recovery
Load Balancing
Round-Robin:
- Simple, even distribution
- Good for homogeneous backends
- No state required
Least Connections:
- Better for variable request durations
- Prevents overloading slow servers
- Requires connection tracking
Consistent Hashing:
- Maintains sticky sessions
- Minimal remapping when nodes change
- Great for caching scenarios
Message Queue Patterns
Pub/Sub:
- Broadcasting events to multiple consumers
- Loosely coupled event-driven architecture
- Real-time notifications
Request-Reply:
- Synchronous-style communication over async transport
- RPC-like behavior with message queues
- Can implement timeouts and retries
Push-Pull (Work Queue):
- Task distribution among workers
- Load balancing computational work
- Parallel processing pipelines
Conclusion
Understanding these distributed systems patterns is crucial for building scalable, resilient applications. Each pattern solves specific problems:
- Raft ensures data consistency across distributed nodes
- Circuit Breakers protect against cascading failures
- Load Balancers distribute traffic efficiently
- Message Queues enable asynchronous, decoupled communication
The visual diagrams in this guide help illustrate complex interactions, making these patterns easier to understand and implement. Choose patterns based on your specific requirements for consistency, availability, partition tolerance, and performance.
Further Reading
- Raft Consensus Algorithm Paper
- Release It! - Design Patterns for Production Systems
- Designing Data-Intensive Applications
- Enterprise Integration Patterns
Have questions or want to discuss distributed systems patterns? Feel free to reach out!