Introduction
Building robust distributed systems requires understanding fundamental patterns that solve common challenges: consensus, fault tolerance, request distribution, and asynchronous communication. This guide walks through how each pattern works, step by step, and pairs each one with a runnable Go implementation, making complex distributed systems concepts easier to understand and apply.
We’ll explore:
- Raft Consensus Algorithm: How distributed systems agree on shared state
- Circuit Breaker Pattern: Preventing cascading failures in microservices
- Load Balancing Algorithms: Distributing traffic efficiently across servers
- Message Queue Patterns: Asynchronous communication strategies
Part 1: Raft Consensus Algorithm
The Raft consensus algorithm ensures that a cluster of servers maintains a consistent, replicated log even in the face of failures. It’s designed to be more understandable than Paxos while providing the same guarantees.
Raft Server States
Every server in a Raft cluster is in one of three states:
- Follower: responds to RPCs from candidates and leaders; if the election timeout elapses without hearing from a leader, it becomes a candidate.
- Candidate: requests votes from the other servers and becomes leader if it receives a majority; it returns to follower if it discovers a current leader or a higher term, and a split vote simply triggers a new election for a new term.
- Leader: handles all client requests, sends heartbeats to maintain its authority, and replicates log entries; it steps down to follower if it discovers a server with a higher term.
Leader Election Process
When a follower doesn’t receive heartbeats from a leader, it starts an election:
1. Follower F1 receives no heartbeat before its election timeout expires, so it increments its term to 2 and transitions to candidate.
2. F1 sends RequestVote(term=2) to F2, F3, F4, and F5 in parallel.
3. F2, F3, and F4 grant their votes; counting its own vote, F1 has 4 of 5, a majority.
4. F1 transitions to leader and sends AppendEntries heartbeats to every follower. Term 2 now has F1 as its leader.
Log Replication Flow
Once elected, the leader handles client requests and replicates entries to followers:
1. A client sends SET x=5 to the leader L, which appends Log[7] = SET x=5 as an uncommitted entry.
2. L sends AppendEntries(entry=SET x=5, prevLogIndex=6) to followers F1, F2, and F3 in parallel.
3. F1 and F2 append Log[7] and reply with success. Together with the leader that is a majority (2 of 3 followers plus the leader), so the entry is committed.
4. L commits Log[7], applies x=5 to its state machine, and tells the client that x=5 is committed.
5. L's next AppendEntries messages carry leaderCommit=7, so F1, F2, and F3 commit Log[7] and apply x=5 as well. Every server now agrees that x=5.
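The commit step above hinges on the leader counting how many servers have replicated an entry. The implementation in the next section keeps a matchIndex map for exactly this purpose but never advances the commit index, so here is a minimal, self-contained sketch of that rule. The helper name advanceCommitIndex and the logTerms slice are illustrative assumptions, not part of the code that follows.
package main

import "fmt"

// advanceCommitIndex returns the highest log index replicated on a majority
// of the cluster (leader included) whose entry is from the current term.
// matchIndex maps each follower to the highest index known to be stored on
// it; logTerms[i] holds the term of the entry at index i+1. Restricting the
// count to current-term entries mirrors Raft's commitment rule.
func advanceCommitIndex(matchIndex map[int]int, logTerms []int, currentTerm, commitIndex int) int {
	for n := len(logTerms); n > commitIndex; n-- {
		if logTerms[n-1] != currentTerm {
			continue // older-term entries are committed indirectly, not by counting
		}
		count := 1 // the leader always has its own entry
		for _, idx := range matchIndex {
			if idx >= n {
				count++
			}
		}
		if count > (len(matchIndex)+1)/2 {
			return n // a majority stores entry n, so everything up to n is committed
		}
	}
	return commitIndex
}

func main() {
	// Mirrors the flow above: the leader, F1, and F2 have entry 7; F3 has not replied yet.
	matchIndex := map[int]int{1: 7, 2: 7, 3: 6}
	logTerms := []int{1, 1, 1, 2, 2, 2, 2} // terms of entries 1..7 (illustrative)
	fmt.Println(advanceCommitIndex(matchIndex, logTerms, 2, 6)) // prints 7
}
Running it prints 7: entry 7 is on the leader plus two of the three followers, so the commit index advances from 6 to 7.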
Raft Consensus in Go
Here’s a simplified implementation of core Raft concepts:
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
// ServerState represents the state of a Raft server
type ServerState int
const (
Follower ServerState = iota
Candidate
Leader
)
func (s ServerState) String() string {
switch s {
case Follower:
return "FOLLOWER"
case Candidate:
return "CANDIDATE"
case Leader:
return "LEADER"
default:
return "UNKNOWN"
}
}
// LogEntry represents a single entry in the replicated log
type LogEntry struct {
Term int
Command string
Index int
}
// RaftServer represents a single server in a Raft cluster
type RaftServer struct {
mu sync.RWMutex
// Persistent state
currentTerm int
votedFor *int
log []LogEntry
// Volatile state
state ServerState
commitIndex int
lastApplied int
// Leader state
nextIndex map[int]int
matchIndex map[int]int
// Configuration
id int
peers []int
// Channels
heartbeat chan bool
election chan bool
stopCh chan struct{}
}
// NewRaftServer creates a new Raft server
func NewRaftServer(id int, peers []int) *RaftServer {
rs := &RaftServer{
id: id,
peers: peers,
currentTerm: 0,
state: Follower,
log: make([]LogEntry, 0),
nextIndex: make(map[int]int),
matchIndex: make(map[int]int),
heartbeat: make(chan bool, 10),
election: make(chan bool, 10),
stopCh: make(chan struct{}),
}
return rs
}
// Start begins the Raft server's main loop
func (rs *RaftServer) Start(ctx context.Context) {
go rs.run(ctx)
}
// run is the main server loop
func (rs *RaftServer) run(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-rs.stopCh:
return
default:
rs.mu.RLock()
state := rs.state
rs.mu.RUnlock()
switch state {
case Follower:
rs.runFollower(ctx)
case Candidate:
rs.runCandidate(ctx)
case Leader:
rs.runLeader(ctx)
}
}
}
}
// runFollower handles follower state logic
func (rs *RaftServer) runFollower(ctx context.Context) {
timeout := time.Duration(150+rand.Intn(150)) * time.Millisecond
select {
case <-rs.heartbeat:
// Received heartbeat, stay as follower
fmt.Printf("[Server %d] Received heartbeat in term %d\n", rs.id, rs.currentTerm)
case <-time.After(timeout):
// Election timeout, become candidate
fmt.Printf("[Server %d] Election timeout, becoming candidate\n", rs.id)
rs.mu.Lock()
rs.state = Candidate
rs.mu.Unlock()
case <-ctx.Done():
return
}
}
// runCandidate handles candidate state logic
func (rs *RaftServer) runCandidate(ctx context.Context) {
rs.mu.Lock()
rs.currentTerm++
rs.votedFor = &rs.id
currentTerm := rs.currentTerm
rs.mu.Unlock()
fmt.Printf("[Server %d] Starting election for term %d\n", rs.id, currentTerm)
votes := 1 // Vote for self
majority := (len(rs.peers)+1)/2 + 1
// Request votes from peers
voteCh := make(chan bool, len(rs.peers))
for _, peer := range rs.peers {
go func(peerID int) {
// Simulate RequestVote RPC
granted := rs.requestVote(peerID, currentTerm)
voteCh <- granted
}(peer)
}
timeout := time.Duration(150+rand.Intn(150)) * time.Millisecond
timer := time.After(timeout)
for votes < majority {
select {
case granted := <-voteCh:
if granted {
votes++
fmt.Printf("[Server %d] Received vote, total: %d/%d\n",
rs.id, votes, majority)
}
case <-rs.heartbeat:
// Discovered current leader, step down
fmt.Printf("[Server %d] Discovered leader, stepping down\n", rs.id)
rs.mu.Lock()
rs.state = Follower
rs.mu.Unlock()
return
case <-timer:
// Election timeout, start new election
fmt.Printf("[Server %d] Election timeout, restarting\n", rs.id)
return
case <-ctx.Done():
return
}
}
// Won election
fmt.Printf("[Server %d] WON ELECTION for term %d with %d votes\n",
rs.id, currentTerm, votes)
rs.mu.Lock()
rs.state = Leader
rs.mu.Unlock()
}
// runLeader handles leader state logic
func (rs *RaftServer) runLeader(ctx context.Context) {
fmt.Printf("[Server %d] Now LEADER for term %d\n", rs.id, rs.currentTerm)
// Initialize leader state
rs.mu.Lock()
for _, peer := range rs.peers {
rs.nextIndex[peer] = len(rs.log) + 1
rs.matchIndex[peer] = 0
}
rs.mu.Unlock()
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
rs.mu.RLock()
if rs.state != Leader {
rs.mu.RUnlock()
return
}
rs.mu.RUnlock()
// Send heartbeats to all followers
for _, peer := range rs.peers {
go rs.sendHeartbeat(peer)
}
case <-ctx.Done():
return
}
}
}
// requestVote simulates a RequestVote RPC
func (rs *RaftServer) requestVote(peerID, term int) bool {
// Simulate network delay
time.Sleep(time.Duration(10+rand.Intn(40)) * time.Millisecond)
// Simulate vote decision (simplified)
// In real implementation, this would check log consistency
return rand.Float32() > 0.3 // 70% chance of granting vote
}
// sendHeartbeat sends a heartbeat to a follower
func (rs *RaftServer) sendHeartbeat(peerID int) {
rs.mu.RLock()
term := rs.currentTerm
rs.mu.RUnlock()
// In real implementation, this would be an AppendEntries RPC
fmt.Printf("[Server %d] Sending heartbeat to Server %d (term %d)\n",
rs.id, peerID, term)
}
// AppendEntry adds a new entry to the log (leader only)
func (rs *RaftServer) AppendEntry(command string) error {
rs.mu.Lock()
defer rs.mu.Unlock()
if rs.state != Leader {
return fmt.Errorf("not leader")
}
entry := LogEntry{
Term: rs.currentTerm,
Command: command,
Index: len(rs.log) + 1,
}
rs.log = append(rs.log, entry)
fmt.Printf("[Server %d] Appended entry: %v\n", rs.id, entry)
return nil
}
func main() {
// Simulate a single server from a 5-node cluster; its four peers are stubbed out
peers := []int{2, 3, 4, 5}
server := NewRaftServer(1, peers)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
fmt.Println("Starting Raft server simulation...")
server.Start(ctx)
<-ctx.Done()
fmt.Println("\nSimulation complete")
}
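The simulation stubs out requestVote with a random decision and notes that a real implementation would check log consistency. Here is a sketch of the receiver-side voting rule from the Raft paper; the types and field names below are illustrative and are not part of the code above.
package main

import "fmt"

// RequestVoteArgs carries the fields a candidate sends (simplified).
type RequestVoteArgs struct {
	Term         int
	CandidateID  int
	LastLogIndex int
	LastLogTerm  int
}

// voterState is the slice of follower state the decision needs.
type voterState struct {
	currentTerm  int
	votedFor     *int // nil if no vote has been cast this term
	lastLogIndex int
	lastLogTerm  int
}

// grantVote applies Raft's voting rule: reject stale terms, vote at most
// once per term, and only vote for a candidate whose log is at least as
// up to date as our own.
func grantVote(v *voterState, args RequestVoteArgs) bool {
	if args.Term < v.currentTerm {
		return false // candidate is behind; reject
	}
	if args.Term > v.currentTerm {
		v.currentTerm = args.Term // newer term: adopt it and forget any old vote
		v.votedFor = nil          // (a real server would also step down to follower here)
	}
	if v.votedFor != nil && *v.votedFor != args.CandidateID {
		return false // already voted for someone else this term
	}
	// Log up-to-dateness: the higher last term wins; equal terms compare length.
	upToDate := args.LastLogTerm > v.lastLogTerm ||
		(args.LastLogTerm == v.lastLogTerm && args.LastLogIndex >= v.lastLogIndex)
	if !upToDate {
		return false
	}
	v.votedFor = &args.CandidateID
	return true
}

func main() {
	v := &voterState{currentTerm: 1, lastLogIndex: 6, lastLogTerm: 1}
	fmt.Println(grantVote(v, RequestVoteArgs{Term: 2, CandidateID: 1, LastLogIndex: 6, LastLogTerm: 1})) // true
	fmt.Println(grantVote(v, RequestVoteArgs{Term: 2, CandidateID: 3, LastLogIndex: 7, LastLogTerm: 1})) // false: already voted in term 2
}
The same up-to-dateness check is what prevents a candidate with a stale log from winning an election and overwriting committed entries.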
Part 2: Circuit Breaker State Transitions
The Circuit Breaker pattern prevents cascading failures by monitoring for failures and stopping requests to failing services. It acts like an electrical circuit breaker.
Circuit Breaker State Machine
The breaker moves between three states:

- Closed → Open: the failure threshold is exceeded.
- Open → Half-Open: the reset timeout expires.
- Half-Open → Closed: the success threshold is met.
- Half-Open → Open: any failure occurs.
- Closed → Closed: a success resets the failure count; a failure increments it.

What each state means:

- Closed (normal operation): requests pass through, failures are counted, and successes reset the counter.
- Open (failing fast): requests are rejected immediately with an error while the breaker waits for the reset timeout.
- Half-Open (testing recovery): a limited number of requests are allowed through while the breaker monitors the success rate and decides which state comes next.
Circuit Breaker Decision Flow
1. Check the current state.
  - Closed: allow the request.
  - Open: if the reset timeout has expired, transition to half-open and allow the request; otherwise reject immediately and return an error.
  - Half-Open: allow the request only while under the half-open request limit; otherwise reject immediately.
2. Execute the protected function and record the outcome.
3. On success:
  - Closed: reset the failure count and return the result.
  - Half-Open: if the success threshold is reached, transition to closed; otherwise continue.
4. On failure:
  - Closed: increment the failure count; if it reaches the threshold, transition to open and set the reset timer.
  - Half-Open: transition straight back to open.
Advanced Circuit Breaker with Metrics
package main
import (
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
)
// State represents circuit breaker state
type State int
const (
StateClosed State = iota
StateOpen
StateHalfOpen
)
func (s State) String() string {
return []string{"CLOSED", "OPEN", "HALF_OPEN"}[s]
}
// Metrics tracks circuit breaker statistics
type Metrics struct {
TotalRequests int64
SuccessfulCalls int64
FailedCalls int64
RejectedCalls int64
StateChanges int64
LastFailureTime time.Time
}
// CircuitBreaker implements the circuit breaker pattern
type CircuitBreaker struct {
mu sync.RWMutex
state State
failureCount int
successCount int
consecutiveFails int
lastStateChange time.Time
nextAttemptTime time.Time
// Configuration
maxFailures int
resetTimeout time.Duration
halfOpenMaxCalls int
failureThreshold float64
// Metrics
metrics *Metrics
// Callbacks
onStateChange func(from, to State)
}
// Config holds circuit breaker configuration
type Config struct {
MaxFailures int // Failures before opening
ResetTimeout time.Duration // Time to wait before half-open
HalfOpenMaxCalls int // Max calls in half-open state
OnStateChange func(from, to State)
}
// NewCircuitBreaker creates a new circuit breaker
func NewCircuitBreaker(config Config) *CircuitBreaker {
return &CircuitBreaker{
state: StateClosed,
maxFailures: config.MaxFailures,
resetTimeout: config.ResetTimeout,
halfOpenMaxCalls: config.HalfOpenMaxCalls,
onStateChange: config.OnStateChange,
metrics: &Metrics{},
lastStateChange: time.Now(),
}
}
// Execute runs the function with circuit breaker protection
func (cb *CircuitBreaker) Execute(fn func() error) error {
atomic.AddInt64(&cb.metrics.TotalRequests, 1)
if !cb.allowRequest() {
atomic.AddInt64(&cb.metrics.RejectedCalls, 1)
return errors.New("circuit breaker is " + cb.GetState().String())
}
err := fn()
cb.recordResult(err)
return err
}
// allowRequest checks if request should be allowed
func (cb *CircuitBreaker) allowRequest() bool {
cb.mu.Lock()
defer cb.mu.Unlock()
now := time.Now()
switch cb.state {
case StateClosed:
return true
case StateOpen:
if now.After(cb.nextAttemptTime) {
cb.changeState(StateHalfOpen)
cb.successCount = 0
cb.failureCount = 0
return true
}
return false
case StateHalfOpen:
calls := cb.successCount + cb.failureCount
return calls < cb.halfOpenMaxCalls
default:
return false
}
}
// recordResult records the result of a function call
func (cb *CircuitBreaker) recordResult(err error) {
cb.mu.Lock()
defer cb.mu.Unlock()
if err != nil {
atomic.AddInt64(&cb.metrics.FailedCalls, 1)
cb.onFailure()
} else {
atomic.AddInt64(&cb.metrics.SuccessfulCalls, 1)
cb.onSuccess()
}
}
// onFailure handles a failure
func (cb *CircuitBreaker) onFailure() {
cb.failureCount++
cb.consecutiveFails++
cb.metrics.LastFailureTime = time.Now()
switch cb.state {
case StateClosed:
if cb.failureCount >= cb.maxFailures {
cb.changeState(StateOpen)
cb.nextAttemptTime = time.Now().Add(cb.resetTimeout)
}
case StateHalfOpen:
// Any failure in half-open returns to open
cb.changeState(StateOpen)
cb.nextAttemptTime = time.Now().Add(cb.resetTimeout)
}
}
// onSuccess handles a success
func (cb *CircuitBreaker) onSuccess() {
cb.consecutiveFails = 0
switch cb.state {
case StateClosed:
cb.failureCount = 0
case StateHalfOpen:
cb.successCount++
if cb.successCount >= cb.halfOpenMaxCalls {
cb.changeState(StateClosed)
cb.failureCount = 0
}
}
}
// changeState transitions the circuit breaker to a new state
func (cb *CircuitBreaker) changeState(newState State) {
if cb.state != newState {
oldState := cb.state
cb.state = newState
cb.lastStateChange = time.Now()
atomic.AddInt64(&cb.metrics.StateChanges, 1)
fmt.Printf("⚡ Circuit Breaker: %s → %s\n", oldState, newState)
if cb.onStateChange != nil {
go cb.onStateChange(oldState, newState)
}
}
}
// GetState returns the current state
func (cb *CircuitBreaker) GetState() State {
cb.mu.RLock()
defer cb.mu.RUnlock()
return cb.state
}
// GetMetrics returns current metrics
func (cb *CircuitBreaker) GetMetrics() Metrics {
cb.mu.RLock()
defer cb.mu.RUnlock()
return Metrics{
TotalRequests: atomic.LoadInt64(&cb.metrics.TotalRequests),
SuccessfulCalls: atomic.LoadInt64(&cb.metrics.SuccessfulCalls),
FailedCalls: atomic.LoadInt64(&cb.metrics.FailedCalls),
RejectedCalls: atomic.LoadInt64(&cb.metrics.RejectedCalls),
StateChanges: atomic.LoadInt64(&cb.metrics.StateChanges),
LastFailureTime: cb.metrics.LastFailureTime,
}
}
// HealthCheck returns health information
func (cb *CircuitBreaker) HealthCheck() map[string]interface{} {
cb.mu.RLock()
defer cb.mu.RUnlock()
metrics := cb.GetMetrics()
var successRate float64
if metrics.TotalRequests > 0 {
successRate = float64(metrics.SuccessfulCalls) /
float64(metrics.TotalRequests) * 100
}
return map[string]interface{}{
"state": cb.state.String(),
"failure_count": cb.failureCount,
"consecutive_fails": cb.consecutiveFails,
"total_requests": metrics.TotalRequests,
"successful_calls": metrics.SuccessfulCalls,
"failed_calls": metrics.FailedCalls,
"rejected_calls": metrics.RejectedCalls,
"success_rate": fmt.Sprintf("%.2f%%", successRate),
"last_state_change": cb.lastStateChange,
}
}
// Example: Unreliable service simulation
type UnreliableService struct {
failureRate float64
callCount int
mu sync.Mutex
}
func (us *UnreliableService) Call() error {
us.mu.Lock()
us.callCount++
count := us.callCount
us.mu.Unlock()
time.Sleep(50 * time.Millisecond)
// Simulate intermittent failures
if count%3 == 0 && count < 10 {
return errors.New("service temporarily unavailable")
}
return nil
}
func main() {
service := &UnreliableService{}
config := Config{
MaxFailures: 3,
ResetTimeout: 2 * time.Second,
HalfOpenMaxCalls: 2,
OnStateChange: func(from, to State) {
fmt.Printf("📊 State transition: %s → %s\n", from, to)
},
}
cb := NewCircuitBreaker(config)
fmt.Println("Starting Circuit Breaker test...\n")
// Test requests
for i := 1; i <= 15; i++ {
fmt.Printf("\n--- Request %d ---\n", i)
err := cb.Execute(func() error {
return service.Call()
})
if err != nil {
fmt.Printf("❌ Error: %v\n", err)
} else {
fmt.Printf("✅ Success\n")
}
health := cb.HealthCheck()
fmt.Printf("State: %s | Success Rate: %s\n",
health["state"], health["success_rate"])
if i == 10 {
fmt.Println("\n⏱ Waiting for reset timeout...")
time.Sleep(2100 * time.Millisecond)
} else {
time.Sleep(300 * time.Millisecond)
}
}
fmt.Println("\n" + strings.Repeat("=", 50))
fmt.Println("Final Health Report:")
health := cb.HealthCheck()
for key, value := range health {
fmt.Printf(" %s: %v\n", key, value)
}
}
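In practice the protected function is usually a network call with its own timeout, so the breaker should trip on slow dependencies as well as outright errors. Below is a minimal sketch of wrapping an outbound HTTP request with the CircuitBreaker defined above; it belongs in the same package and needs "net/http" added to the import block, and the URL handling, 2-second timeout, and treat-5xx-as-failure rule are illustrative assumptions rather than part of the original code.
// fetchWithBreaker guards an outbound HTTP GET with the circuit breaker.
// Transport errors, timeouts, and 5xx responses all count as failures, so a
// slow or failing upstream opens the breaker just like the simulated service.
func fetchWithBreaker(cb *CircuitBreaker, url string) error {
	return cb.Execute(func() error {
		client := &http.Client{Timeout: 2 * time.Second} // bound the call so slowness becomes a countable failure
		resp, err := client.Get(url)
		if err != nil {
			return err
		}
		defer resp.Body.Close()
		if resp.StatusCode >= 500 {
			return fmt.Errorf("upstream returned %d", resp.StatusCode)
		}
		return nil
	})
}
Callers treat it exactly like service.Call() in the demo: an error from fetchWithBreaker is either an upstream failure or the breaker's own rejection while it is open.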
Part 3: Load Balancing Algorithms
Load balancers distribute incoming requests across multiple servers to ensure no single server becomes overwhelmed.
Round-Robin Load Balancing
1. Get the server list; if it is empty, return an error (no servers available).
2. Select the server at the current index.
3. Advance the index: index = (index + 1) % len(servers).
4. If the selected server is unhealthy, skip it and repeat from the new index.
5. Send the request to the healthy server, update its metrics (request count, response time), and return the response.
Least Connections Algorithm
1. Initialize minConn = ∞ and selectedServer = nil, then iterate over all active servers.
2. Skip unhealthy servers; for each healthy one, read its active connection count and, if it is lower than minConn, remember it as the best candidate so far.
3. If no healthy server was found, return an error.
4. Otherwise increment the selected server's active connection count, send it the request, and decrement the count when the request completes.
Consistent Hashing
1. Extract the request key (e.g., a user ID or session ID) and hash it: hash = hashFunc(key).
2. Locate that hash's position on the ring and search clockwise for the first virtual node.
3. Map the virtual node to its physical server; if that server is unavailable, continue clockwise to the next virtual node.
4. Send the request to the server, cache the key → server mapping locally, and return the response.

Each physical server owns several virtual nodes spread around the ring, for example:

- 0°: Server A, vnode 1
- 72°: Server B, vnode 1
- 144°: Server A, vnode 2
- 216°: Server C, vnode 1
- 288°: Server B, vnode 2
Load Balancing Implementation in Go
package main
import (
"crypto/sha256"
"encoding/binary"
"errors"
"fmt"
"sort"
"sync"
"sync/atomic"
"time"
)
// Server represents a backend server
type Server struct {
ID string
Address string
Healthy bool
ActiveConns int32
TotalRequests int64
FailedRequests int64
LastHealthCheck time.Time
}
// ===== Round-Robin Load Balancer =====
// RoundRobinLB implements round-robin load balancing
type RoundRobinLB struct {
servers []*Server
current uint32
mu sync.RWMutex
}
// NewRoundRobinLB creates a new round-robin load balancer
func NewRoundRobinLB(servers []*Server) *RoundRobinLB {
return &RoundRobinLB{
servers: servers,
current: 0,
}
}
// NextServer returns the next server using round-robin
func (rr *RoundRobinLB) NextServer() (*Server, error) {
rr.mu.RLock()
defer rr.mu.RUnlock()
if len(rr.servers) == 0 {
return nil, errors.New("no servers available")
}
// Try each server once
for i := 0; i < len(rr.servers); i++ {
index := atomic.AddUint32(&rr.current, 1) % uint32(len(rr.servers))
server := rr.servers[index]
if server.Healthy {
return server, nil
}
}
return nil, errors.New("no healthy servers available")
}
// ===== Least Connections Load Balancer =====
// LeastConnectionsLB implements least connections load balancing
type LeastConnectionsLB struct {
servers []*Server
mu sync.RWMutex
}
// NewLeastConnectionsLB creates a new least connections load balancer
func NewLeastConnectionsLB(servers []*Server) *LeastConnectionsLB {
return &LeastConnectionsLB{
servers: servers,
}
}
// NextServer returns the server with least active connections
func (lc *LeastConnectionsLB) NextServer() (*Server, error) {
lc.mu.RLock()
defer lc.mu.RUnlock()
var selected *Server
minConns := int32(1<<31 - 1) // Max int32
for _, server := range lc.servers {
if !server.Healthy {
continue
}
conns := atomic.LoadInt32(&server.ActiveConns)
if conns < minConns {
minConns = conns
selected = server
}
}
if selected == nil {
return nil, errors.New("no healthy servers available")
}
return selected, nil
}
// ===== Consistent Hashing Load Balancer =====
// hashRing represents a consistent hash ring
type hashRing struct {
nodes map[uint32]string // hash -> server ID
sortedKeys []uint32
servers map[string]*Server
replicas int
mu sync.RWMutex
}
// NewHashRing creates a new consistent hash ring
func NewHashRing(servers []*Server, replicas int) *hashRing {
hr := &hashRing{
nodes: make(map[uint32]string),
servers: make(map[string]*Server),
replicas: replicas,
}
for _, server := range servers {
hr.AddServer(server)
}
return hr
}
// AddServer adds a server to the hash ring
func (hr *hashRing) AddServer(server *Server) {
hr.mu.Lock()
defer hr.mu.Unlock()
hr.servers[server.ID] = server
// Add virtual nodes
for i := 0; i < hr.replicas; i++ {
hash := hr.hashKey(fmt.Sprintf("%s:%d", server.ID, i))
hr.nodes[hash] = server.ID
hr.sortedKeys = append(hr.sortedKeys, hash)
}
sort.Slice(hr.sortedKeys, func(i, j int) bool {
return hr.sortedKeys[i] < hr.sortedKeys[j]
})
}
// GetServer returns a server for the given key
func (hr *hashRing) GetServer(key string) (*Server, error) {
hr.mu.RLock()
defer hr.mu.RUnlock()
if len(hr.nodes) == 0 {
return nil, errors.New("no servers in ring")
}
hash := hr.hashKey(key)
// Binary search for first node >= hash
idx := sort.Search(len(hr.sortedKeys), func(i int) bool {
return hr.sortedKeys[i] >= hash
})
// Wrap around if necessary
if idx == len(hr.sortedKeys) {
idx = 0
}
// Find first healthy server
for i := 0; i < len(hr.sortedKeys); i++ {
nodeHash := hr.sortedKeys[(idx+i)%len(hr.sortedKeys)]
serverID := hr.nodes[nodeHash]
server := hr.servers[serverID]
if server.Healthy {
return server, nil
}
}
return nil, errors.New("no healthy servers available")
}
// hashKey generates a hash for a key
func (hr *hashRing) hashKey(key string) uint32 {
h := sha256.Sum256([]byte(key))
return binary.BigEndian.Uint32(h[:4])
}
// ===== Demo =====
func simulateRequest(server *Server, requestID int) error {
atomic.AddInt32(&server.ActiveConns, 1)
atomic.AddInt64(&server.TotalRequests, 1)
defer atomic.AddInt32(&server.ActiveConns, -1)
// Simulate processing time
time.Sleep(time.Duration(50+requestID%100) * time.Millisecond)
fmt.Printf(" ✓ Request %d handled by %s (active: %d)\n",
requestID, server.ID, atomic.LoadInt32(&server.ActiveConns))
return nil
}
func main() {
// Create servers
servers := []*Server{
{ID: "server-1", Address: "192.168.1.1:8080", Healthy: true},
{ID: "server-2", Address: "192.168.1.2:8080", Healthy: true},
{ID: "server-3", Address: "192.168.1.3:8080", Healthy: true},
}
fmt.Println("========== Round-Robin Load Balancer ==========")
rrLB := NewRoundRobinLB(servers)
for i := 1; i <= 6; i++ {
server, err := rrLB.NextServer()
if err != nil {
fmt.Printf("Error: %v\n", err)
continue
}
fmt.Printf("Request %d → %s\n", i, server.ID)
}
fmt.Println("\n========== Least Connections Load Balancer ==========")
lcLB := NewLeastConnectionsLB(servers)
var wg sync.WaitGroup
for i := 1; i <= 9; i++ {
wg.Add(1)
go func(reqID int) {
defer wg.Done()
server, err := lcLB.NextServer()
if err != nil {
fmt.Printf("Error: %v\n", err)
return
}
fmt.Printf("Request %d → %s\n", reqID, server.ID)
simulateRequest(server, reqID)
}(i)
time.Sleep(20 * time.Millisecond)
}
wg.Wait()
fmt.Println("\n========== Consistent Hashing Load Balancer ==========")
chLB := NewHashRing(servers, 3) // 3 virtual nodes per server
keys := []string{
"user:12345",
"user:67890",
"session:abc123",
"user:12345", // Same key should go to same server
"session:def456",
"user:67890", // Same key should go to same server
}
for _, key := range keys {
server, err := chLB.GetServer(key)
if err != nil {
fmt.Printf("Error for key %s: %v\n", key, err)
continue
}
fmt.Printf("Key %-20s → %s\n", key, server.ID)
}
// Show final statistics
fmt.Println("\n========== Server Statistics ==========")
for _, server := range servers {
fmt.Printf("%s: Total Requests=%d, Active Connections=%d\n",
server.ID,
atomic.LoadInt64(&server.TotalRequests),
atomic.LoadInt32(&server.ActiveConns))
}
}
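A useful property of the hash ring above is that removing a server only remaps the keys that server owned. The implementation has no RemoveServer method, so the sketch below adds a hypothetical one, plus a small helper that checks how many keys move when server-2 is taken out of the ring; both names are mine, not part of the code above.
// RemoveServer is a hypothetical counterpart to AddServer: it drops the
// server's virtual nodes from the ring so only that server's keys are remapped.
func (hr *hashRing) RemoveServer(serverID string) {
	hr.mu.Lock()
	defer hr.mu.Unlock()
	delete(hr.servers, serverID)
	kept := hr.sortedKeys[:0]
	for _, hash := range hr.sortedKeys {
		if hr.nodes[hash] == serverID {
			delete(hr.nodes, hash)
			continue
		}
		kept = append(kept, hash)
	}
	hr.sortedKeys = kept
}

// demoMinimalRemapping maps a batch of keys, removes one server, and counts
// how many keys changed owner; with consistent hashing only the removed
// server's share should move.
func demoMinimalRemapping(ring *hashRing) {
	before := make(map[string]string)
	for i := 0; i < 100; i++ {
		key := fmt.Sprintf("user:%d", i)
		if s, err := ring.GetServer(key); err == nil {
			before[key] = s.ID
		}
	}
	ring.RemoveServer("server-2")
	moved := 0
	for key, oldID := range before {
		if s, err := ring.GetServer(key); err == nil && s.ID != oldID {
			moved++
		}
	}
	fmt.Printf("Keys remapped after removing server-2: %d/100\n", moved)
}
Calling demoMinimalRemapping(chLB) at the end of the demo above should report that only the keys previously owned by server-2 (roughly a third of them) changed servers; a plain modulo-based scheme would remap most of them.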
Part 4: Message Queue Patterns
Message queues enable asynchronous communication between services, decoupling producers from consumers.
Publish-Subscribe (Pub/Sub) Pattern
1. A producer publishes {type: "user.created", id: 123} to the broker.
2. The broker broadcasts the message to subscribers S1, S2, and S3 in parallel, and each subscriber acknowledges it.
3. A second producer publishes {type: "user.updated", id: 123}, and the broker broadcasts that message to all three subscribers again.

Every subscriber receives every message published to the topic.
Request-Reply Pattern
1. The client needs a complex calculation performed, so it generates a correlation ID ("req-12345") and sends a request to the request queue: {correlationID: "req-12345", replyTo: "response.client1", data: {x: 100, y: 200}}. It then waits for a response on "response.client1".
2. A worker polls the request queue, receives the request, and processes it: result = x + y = 300.
3. The worker sends {correlationID: "req-12345", status: "success", result: 300} to the "response.client1" reply queue.
4. The client receives the response, matches it to its pending request by correlation ID, reads result = 300, and acknowledges the message.
Push-Pull (Work Queue) Pattern
1. A producer pushes tasks onto the queue, where they wait for workers.
2. Workers W1, W2, and W3 each pull a task (Tasks 1, 2, and 3) and process them concurrently.
3. As each worker finishes, it acknowledges its task and pulls the next one: W2 finishes first and takes Task 4, then W1 takes Task 5 and W3 takes Task 6.

The work is load balanced across workers by how quickly each one completes its tasks.
Message Queue Patterns in Go
package main
import (
"context"
"fmt"
"sync"
"time"
)
// ===== Pub/Sub Pattern =====
// Message represents a pub/sub message
type Message struct {
Topic string
Payload interface{}
Timestamp time.Time
}
// Subscriber is a function that handles messages
type Subscriber func(Message)
// PubSubBroker implements publish-subscribe pattern
type PubSubBroker struct {
mu sync.RWMutex
subscribers map[string][]Subscriber
}
// NewPubSubBroker creates a new pub/sub broker
func NewPubSubBroker() *PubSubBroker {
return &PubSubBroker{
subscribers: make(map[string][]Subscriber),
}
}
// Subscribe registers a subscriber for a topic
func (psb *PubSubBroker) Subscribe(topic string, handler Subscriber) {
psb.mu.Lock()
defer psb.mu.Unlock()
psb.subscribers[topic] = append(psb.subscribers[topic], handler)
fmt.Printf("📬 New subscriber for topic: %s\n", topic)
}
// Publish sends a message to all subscribers of a topic
func (psb *PubSubBroker) Publish(topic string, payload interface{}) {
psb.mu.RLock()
defer psb.mu.RUnlock()
message := Message{
Topic: topic,
Payload: payload,
Timestamp: time.Now(),
}
subscribers := psb.subscribers[topic]
fmt.Printf("📤 Publishing to %s: %d subscribers\n", topic, len(subscribers))
for _, subscriber := range subscribers {
go subscriber(message) // Async delivery
}
}
// ===== Request-Reply Pattern =====
// Request represents a request message
type Request struct {
CorrelationID string
ReplyTo string
Data interface{}
}
// Response represents a response message
type Response struct {
CorrelationID string
Status string
Result interface{}
Error error
}
// RequestReplyClient handles request-reply communication
type RequestReplyClient struct {
requestQueue chan Request
responseChans map[string]chan Response
mu sync.RWMutex
}
// NewRequestReplyClient creates a new request-reply client
func NewRequestReplyClient() *RequestReplyClient {
return &RequestReplyClient{
requestQueue: make(chan Request, 100),
responseChans: make(map[string]chan Response),
}
}
// SendRequest sends a request and waits for response
func (rrc *RequestReplyClient) SendRequest(ctx context.Context, data interface{}) (*Response, error) {
correlationID := fmt.Sprintf("req-%d", time.Now().UnixNano())
responseChan := make(chan Response, 1)
rrc.mu.Lock()
rrc.responseChans[correlationID] = responseChan
rrc.mu.Unlock()
defer func() {
rrc.mu.Lock()
delete(rrc.responseChans, correlationID)
rrc.mu.Unlock()
}()
request := Request{
CorrelationID: correlationID,
ReplyTo: "response-queue",
Data: data,
}
fmt.Printf("📨 Sending request: %s\n", correlationID)
select {
case rrc.requestQueue <- request:
case <-ctx.Done():
return nil, ctx.Err()
}
select {
case response := <-responseChan:
fmt.Printf("📬 Received response: %s\n", correlationID)
return &response, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
// HandleResponse processes incoming responses
func (rrc *RequestReplyClient) HandleResponse(response Response) {
rrc.mu.RLock()
responseChan, exists := rrc.responseChans[response.CorrelationID]
rrc.mu.RUnlock()
if exists {
select {
case responseChan <- response:
default:
}
}
}
// Worker processes requests
type Worker struct {
ID int
requestQueue chan Request
client *RequestReplyClient
}
// Start begins processing requests
func (w *Worker) Start(ctx context.Context) {
go func() {
for {
select {
case request := <-w.requestQueue:
fmt.Printf("⚙️ Worker %d processing: %s\n",
w.ID, request.CorrelationID)
// Simulate processing
time.Sleep(100 * time.Millisecond)
response := Response{
CorrelationID: request.CorrelationID,
Status: "success",
Result: fmt.Sprintf("Processed by worker %d", w.ID),
}
w.client.HandleResponse(response)
case <-ctx.Done():
return
}
}
}()
}
// ===== Work Queue (Push-Pull) Pattern =====
// Task represents a unit of work
type Task struct {
ID int
Data interface{}
Retries int
}
// WorkQueue implements work queue pattern
type WorkQueue struct {
tasks chan Task
results chan interface{}
workers int
}
// NewWorkQueue creates a new work queue
func NewWorkQueue(bufferSize, numWorkers int) *WorkQueue {
return &WorkQueue{
tasks: make(chan Task, bufferSize),
results: make(chan interface{}, bufferSize),
workers: numWorkers,
}
}
// Start starts all workers
func (wq *WorkQueue) Start(ctx context.Context, processFn func(Task) interface{}) {
for i := 0; i < wq.workers; i++ {
go wq.worker(ctx, i, processFn)
}
}
// worker processes tasks from the queue
func (wq *WorkQueue) worker(ctx context.Context, id int, processFn func(Task) interface{}) {
fmt.Printf("👷 Worker %d started\n", id)
for {
select {
case task := <-wq.tasks:
fmt.Printf("👷 Worker %d processing task %d\n", id, task.ID)
result := processFn(task)
select {
case wq.results <- result:
case <-ctx.Done():
return
}
case <-ctx.Done():
fmt.Printf("👷 Worker %d stopped\n", id)
return
}
}
}
// Submit adds a task to the queue
func (wq *WorkQueue) Submit(task Task) {
wq.tasks <- task
}
// Results returns the results channel
func (wq *WorkQueue) Results() <-chan interface{} {
return wq.results
}
// ===== Demo =====
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// ===== Pub/Sub Demo =====
fmt.Println("========== Pub/Sub Pattern ==========\n")
broker := NewPubSubBroker()
broker.Subscribe("user.events", func(msg Message) {
fmt.Printf(" 📧 Email Service: %v\n", msg.Payload)
})
broker.Subscribe("user.events", func(msg Message) {
fmt.Printf(" 📊 Analytics: %v\n", msg.Payload)
})
broker.Subscribe("user.events", func(msg Message) {
fmt.Printf(" 💾 Database: %v\n", msg.Payload)
})
broker.Publish("user.events", "User [email protected] registered")
time.Sleep(100 * time.Millisecond)
broker.Publish("user.events", "User [email protected] logged in")
time.Sleep(100 * time.Millisecond)
// ===== Request-Reply Demo =====
fmt.Println("\n========== Request-Reply Pattern ==========\n")
rrClient := NewRequestReplyClient()
// Start workers
for i := 1; i <= 3; i++ {
worker := &Worker{
ID: i,
requestQueue: rrClient.requestQueue,
client: rrClient,
}
worker.Start(ctx)
}
// Send requests
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
response, err := rrClient.SendRequest(ctx,
fmt.Sprintf("Calculate %d", id))
if err != nil {
fmt.Printf("❌ Request %d failed: %v\n", id, err)
return
}
fmt.Printf("✅ Request %d: %v\n", id, response.Result)
}(i)
}
wg.Wait()
time.Sleep(200 * time.Millisecond)
// ===== Work Queue Demo =====
fmt.Println("\n========== Work Queue Pattern ==========\n")
workQueue := NewWorkQueue(10, 3)
processFn := func(task Task) interface{} {
// Simulate work
time.Sleep(time.Duration(50+task.ID*10) * time.Millisecond)
return fmt.Sprintf("Task %d completed", task.ID)
}
workQueue.Start(ctx, processFn)
// Submit tasks
go func() {
for i := 1; i <= 6; i++ {
workQueue.Submit(Task{
ID: i,
Data: fmt.Sprintf("Process item %d", i),
})
}
}()
// Collect results
go func() {
for result := range workQueue.Results() {
fmt.Printf("✅ Result: %v\n", result)
}
}()
time.Sleep(1 * time.Second)
fmt.Println("\n========== Demo Complete ==========")
}
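The Task struct above includes a Retries field that the demo never exercises. Here is a minimal sketch of a retry path that a worker could call in place of processFn; the maxRetries limit and the error-returning process function are illustrative assumptions, not part of the WorkQueue implementation above.
// processWithRetry is a sketch of a retry path for the work queue: a failed
// task that still has retries left is resubmitted for another worker to pick
// up; otherwise it is dropped (a real system might route it to a dead-letter
// queue instead).
const maxRetries = 3

func (wq *WorkQueue) processWithRetry(task Task, process func(Task) (interface{}, error)) {
	result, err := process(task)
	if err == nil {
		wq.results <- result
		return
	}
	if task.Retries < maxRetries {
		task.Retries++
		fmt.Printf("↻ Task %d failed (%v), retry %d/%d\n", task.ID, err, task.Retries, maxRetries)
		wq.Submit(task) // put it back on the queue
		return
	}
	fmt.Printf("✖ Task %d dropped after %d retries\n", task.ID, maxRetries)
}
A worker loop would call wq.processWithRetry(task, process) instead of sending processFn's result directly, so repeatedly failing tasks are eventually dropped rather than blocking the queue.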
Comparison and When to Use Each Pattern
Consensus Algorithms (Raft)
Use When:
- You need strong consistency across distributed nodes
- Building distributed databases or configuration stores
- Implementing leader election in clustered systems
- Replicating state machines
Trade-offs:
- ✅ Strong consistency guarantees
- ✅ Easier to understand than Paxos
- ❌ Requires majority of nodes to be available
- ❌ Higher latency due to consensus overhead
Circuit Breakers
Use When:
- Making calls to external services or microservices
- Protecting against cascading failures
- Need graceful degradation
- Want to fail fast instead of waiting for timeouts
Trade-offs:
- ✅ Prevents cascading failures
- ✅ Faster failure detection
- ✅ Gives failing services time to recover
- ❌ Requires careful threshold tuning
- ❌ Can reject valid requests during recovery
Load Balancing
Round-Robin:
- Simple, even distribution
- Good for homogeneous backends
- No state required
Least Connections:
- Better for variable request durations
- Prevents overloading slow servers
- Requires connection tracking
Consistent Hashing:
- Maintains sticky sessions
- Minimal remapping when nodes change
- Great for caching scenarios
Message Queue Patterns
Pub/Sub:
- Broadcasting events to multiple consumers
- Loosely coupled event-driven architecture
- Real-time notifications
Request-Reply:
- Synchronous-style communication over async transport
- RPC-like behavior with message queues
- Can implement timeouts and retries
Push-Pull (Work Queue):
- Task distribution among workers
- Load balancing computational work
- Parallel processing pipelines
Conclusion
Understanding these distributed systems patterns is crucial for building scalable, resilient applications. Each pattern solves specific problems:
- Raft ensures data consistency across distributed nodes
- Circuit Breakers protect against cascading failures
- Load Balancers distribute traffic efficiently
- Message Queues enable asynchronous, decoupled communication
The step-by-step walkthroughs and Go implementations in this guide illustrate the interactions involved, making these patterns easier to understand and implement. Choose patterns based on your specific requirements for consistency, availability, partition tolerance, and performance.
Further Reading
- Raft Consensus Algorithm Paper
- Release It! - Design Patterns for Production Systems
- Designing Data-Intensive Applications
- Enterprise Integration Patterns
Have questions or want to discuss distributed systems patterns? Feel free to reach out!