The Problem: Counting What You Can’t See

Count concurrent users. On login: increment. On logout: decrement. Simple, right?

Now add reality:

  • The counter is distributed across multiple servers
  • A user’s session times out on one server but they’re active on another
  • The increment message arrives after the decrement message
  • Network partitions split your cluster
  • Servers crash mid-operation

Suddenly, this “simple” counter becomes a distributed systems nightmare. Welcome to distributed counting, where even addition is hard.

The Naive Single-Server Implementation

Let’s start simple:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

type LoginCounter struct {
    count int64
}

func (lc *LoginCounter) Login(userID string) {
    // Use the value returned by AddInt64 so the printed count matches this
    // increment even while other goroutines are updating concurrently.
    newCount := atomic.AddInt64(&lc.count, 1)
    fmt.Printf("User %s logged in. Count: %d\n", userID, newCount)
}

func (lc *LoginCounter) Logout(userID string) {
    newCount := atomic.AddInt64(&lc.count, -1)
    fmt.Printf("User %s logged out. Count: %d\n", userID, newCount)
}

func (lc *LoginCounter) Count() int64 {
    return atomic.LoadInt64(&lc.count)
}

func main() {
    counter := &LoginCounter{}

    // Simulate logins and logouts
    var wg sync.WaitGroup

    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            userID := fmt.Sprintf("user%d", id)

            counter.Login(userID)
            time.Sleep(100 * time.Millisecond) // Simulate session
            counter.Logout(userID)
        }(i)
    }

    wg.Wait()
    fmt.Printf("Final count: %d (should be 0)\n", counter.Count())
}

Output:

User user0 logged in. Count: 1
User user1 logged in. Count: 2
...
User user99 logged out. Count: 0
Final count: 0 (should be 0)

Perfect! But this is single-server, single-process. Reality is messier.

Challenge 1: Session Timeout vs. Active User

Users don’t always log out cleanly; sessions time out:

type Session struct {
    UserID    string
    LoginTime time.Time
    LastSeen  time.Time
}

type SessionTracker struct {
    mu       sync.RWMutex
    sessions map[string]*Session
    count    int64
}

func (st *SessionTracker) Login(userID string) {
    st.mu.Lock()
    defer st.mu.Unlock()

    if _, exists := st.sessions[userID]; !exists {
        st.sessions[userID] = &Session{
            UserID:    userID,
            LoginTime: time.Now(),
            LastSeen:  time.Now(),
        }
        atomic.AddInt64(&st.count, 1)
    }
}

func (st *SessionTracker) Heartbeat(userID string) {
    st.mu.RLock()
    defer st.mu.RUnlock()

    if session, exists := st.sessions[userID]; exists {
        session.LastSeen = time.Now()
    }
}

func (st *SessionTracker) CleanupExpired(timeout time.Duration) {
    st.mu.Lock()
    defer st.mu.Unlock()

    now := time.Now()
    for userID, session := range st.sessions {
        if now.Sub(session.LastSeen) > timeout {
            delete(st.sessions, userID)
            atomic.AddInt64(&st.count, -1)
            fmt.Printf("Session %s expired\n", userID)
        }
    }
}

// Background cleanup goroutine
func (st *SessionTracker) StartCleanup(interval, timeout time.Duration) {
    ticker := time.NewTicker(interval)
    go func() {
        for range ticker.C {
            st.CleanupExpired(timeout)
        }
    }()
}

Problem: Heartbeat writes session.LastSeen while holding only a read lock, so two concurrent heartbeats for the same user race on that field. A heartbeat can also lose to cleanup: if it blocks waiting for the lock while cleanup holds it, the session is deleted even though the user is still active.

Solution: Use a write lock for heartbeat too, or use atomic operations for LastSeen:

type Session struct {
    UserID    string
    LoginTime time.Time
    lastSeen  atomic.Int64 // Unix timestamp
}

func (st *SessionTracker) Heartbeat(userID string) {
    st.mu.RLock()
    session, exists := st.sessions[userID]
    st.mu.RUnlock()

    if exists {
        session.lastSeen.Store(time.Now().Unix())
    }
}
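
The cleanup loop then compares against the atomically loaded timestamp (Login would store time.Now().Unix() into lastSeen the same way). A minimal sketch of the adjusted CleanupExpired:

func (st *SessionTracker) CleanupExpired(timeout time.Duration) {
    st.mu.Lock()
    defer st.mu.Unlock()

    now := time.Now().Unix()
    for userID, session := range st.sessions {
        // lastSeen is read atomically, so concurrent heartbeats are safe.
        if now-session.lastSeen.Load() > int64(timeout.Seconds()) {
            delete(st.sessions, userID)
            atomic.AddInt64(&st.count, -1)
            fmt.Printf("Session %s expired\n", userID)
        }
    }
}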

Challenge 2: Distributed Counting Across Servers

Now you have multiple servers. How do you count total users?

Approach 1: Central Counter Service

type CentralCounter struct {
    mu    sync.RWMutex
    count int64
}

func (cc *CentralCounter) Increment() {
    cc.mu.Lock()
    cc.count++
    cc.mu.Unlock()
}

func (cc *CentralCounter) Decrement() {
    cc.mu.Lock()
    cc.count--
    cc.mu.Unlock()
}

func (cc *CentralCounter) Get() int64 {
    cc.mu.RLock()
    defer cc.mu.RUnlock()
    return cc.count
}

// Each server calls this on login/logout
func (server *Server) Login(userID string) {
    // ... handle session locally ...

    // Update central counter
    server.centralCounter.Increment()
}

Problems:

  • Single point of failure
  • Network latency for every login/logout
  • Bottleneck under high load
  • What if the network call fails? (one mitigation is sketched below)
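
The last problem deserves a sketch: if the remote call fails, you either block the login or let the count drift. One common compromise is to accept brief drift and retry in the background. This assumes a hypothetical CounterClient reached over the network, with an Add(delta) error method, rather than the in-process CentralCounter above:

// CounterClient is a hypothetical network client for the central counter.
type CounterClient interface {
    Add(delta int64) error
}

type ResilientServer struct {
    counter CounterClient
    pending chan int64 // buffered; deltas that failed and still need applying
}

func NewResilientServer(c CounterClient) *ResilientServer {
    return &ResilientServer{
        counter: c,
        pending: make(chan int64, 1024), // bounded so a long outage can't exhaust memory
    }
}

func (s *ResilientServer) Login(userID string) {
    // ... handle the session locally ...

    // Don't fail the login just because the counter is unreachable:
    // queue the delta and let a background worker retry it.
    if err := s.counter.Add(1); err != nil {
        select {
        case s.pending <- 1:
        default:
            // Queue full: drop the delta and accept a slightly low count.
        }
    }
}

// retryLoop is started once with `go s.retryLoop()`.
func (s *ResilientServer) retryLoop() {
    for delta := range s.pending {
        for s.counter.Add(delta) != nil {
            time.Sleep(time.Second) // back off before retrying
        }
    }
}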

Approach 2: Aggregate on Demand

Each server maintains its own count, aggregate when needed:

type DistributedCounter struct {
    servers []*Server
}

func (dc *DistributedCounter) TotalCount() (int64, error) {
    var total int64
    var mu sync.Mutex
    var wg sync.WaitGroup
    var firstErr error

    for _, server := range dc.servers {
        wg.Add(1)
        go func(s *Server) {
            defer wg.Done()

            count, err := s.GetLocalCount()
            if err != nil {
                mu.Lock()
                if firstErr == nil {
                    firstErr = err
                }
                mu.Unlock()
                return
            }

            mu.Lock()
            total += count
            mu.Unlock()
        }(server)
    }

    wg.Wait()
    return total, firstErr
}

Problems:

  • Inconsistent snapshot (count changes while aggregating)
  • What if a server is down?
  • High latency to query all servers (a timeout-bounded variant is sketched below)
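
A timeout only softens the problem, but it keeps one slow or unreachable server from stalling the whole aggregate. A sketch that keeps GetLocalCount's original signature and reports how many servers were skipped, so callers know how partial the total is:

func (dc *DistributedCounter) TotalCountWithTimeout(timeout time.Duration) (int64, int) {
    type result struct {
        count int64
        err   error
    }

    var total int64
    var failed int
    var mu sync.Mutex
    var wg sync.WaitGroup

    for _, server := range dc.servers {
        wg.Add(1)
        go func(s *Server) {
            defer wg.Done()

            ch := make(chan result, 1)
            go func() {
                c, err := s.GetLocalCount()
                ch <- result{c, err}
            }()

            select {
            case r := <-ch:
                mu.Lock()
                if r.err != nil {
                    failed++
                } else {
                    total += r.count
                }
                mu.Unlock()
            case <-time.After(timeout):
                mu.Lock()
                failed++ // treat a timed-out server as unavailable
                mu.Unlock()
            }
        }(server)
    }

    wg.Wait()
    return total, failed
}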

Challenge 3: Message Ordering

A user logs in on Server A and immediately logs out. Both events are replicated to Server B, but the messages arrive out of order:

Server B receives: logout, then login
Final state: User appears logged in (wrong!)

This is the message reordering problem in distributed systems.

Solution: Vector Clocks

type Event struct {
    Type      string // "login" or "logout"
    UserID    string
    Timestamp time.Time
    Clock     VectorClock
}

type VectorClock map[string]int

func (vc VectorClock) HappensBefore(other VectorClock) bool {
    strictlyLess := false

    // Every component of vc must be <= the matching component in other.
    for server, clock := range vc {
        if clock > other[server] {
            return false
        }
        if clock < other[server] {
            strictlyLess = true
        }
    }

    // Components that appear only in other also make other strictly later.
    for server, clock := range other {
        if _, seen := vc[server]; !seen && clock > 0 {
            strictlyLess = true
        }
    }

    return strictlyLess
}
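
A quick usage check makes the three possible outcomes concrete: ordered one way, ordered the other way, or concurrent (neither happens before the other). The clock values are illustrative:

a := VectorClock{"serverA": 2, "serverB": 1}
b := VectorClock{"serverA": 3, "serverB": 1}
c := VectorClock{"serverA": 1, "serverB": 4}

fmt.Println(a.HappensBefore(b)) // true: a <= b everywhere, strictly less on serverA
fmt.Println(b.HappensBefore(a)) // false
fmt.Println(a.HappensBefore(c)) // false: a and c are concurrent...
fmt.Println(c.HappensBefore(a)) // false: ...so neither ordering holds

Concurrent events are exactly the ones a counter has to buffer and reconcile, which is what the counter below does per user: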

type SmartCounter struct {
    mu      sync.Mutex
    events  map[string][]Event // userID -> events
    count   int64
}

func (sc *SmartCounter) ProcessEvent(event Event) {
    sc.mu.Lock()
    defer sc.mu.Unlock()

    userEvents := sc.events[event.UserID]

    // Find correct position based on vector clock
    for i, existing := range userEvents {
        if event.Clock.HappensBefore(existing.Clock) {
            // This event happened before the existing one: insert it in
            // order, then replay the user's event history to repair the
            // count (recomputeCount is a helper along the lines of
            // applyEvent; its definition is omitted here).
            userEvents = append(userEvents[:i], append([]Event{event}, userEvents[i:]...)...)
            sc.events[event.UserID] = userEvents
            sc.recomputeCount(event.UserID)
            return
        }
    }

    // Event is newest
    sc.events[event.UserID] = append(userEvents, event)
    sc.applyEvent(event)
}

func (sc *SmartCounter) applyEvent(event Event) {
    if event.Type == "login" {
        sc.count++
    } else {
        sc.count--
    }
}

Complexity explosion! We’re now maintaining event history and vector clocks just to count users.

Challenge 4: Network Partitions

A network partition splits your cluster into two groups, and each group thinks it has the authoritative count:

Before partition: 100 users

After partition:
- Group A: 50 users (thinks total is 50)
- Group B: 50 users (thinks total is 50)

During partition:
- Group A: +10 logins = 60 users
- Group B: +5 logins = 55 users

After partition heals:
What's the count? 60? 55? 115? 105?

This is where CRDTs (Conflict-Free Replicated Data Types) come in.

CRDT: Grow-Only Counter

type GCounter struct {
    mu      sync.RWMutex
    counts  map[string]int64 // serverID -> local count
    serverID string
}

func NewGCounter(serverID string) *GCounter {
    return &GCounter{
        counts:   make(map[string]int64),
        serverID: serverID,
    }
}

func (gc *GCounter) Increment() {
    gc.mu.Lock()
    defer gc.mu.Unlock()
    gc.counts[gc.serverID]++
}

func (gc *GCounter) Value() int64 {
    gc.mu.RLock()
    defer gc.mu.RUnlock()

    var total int64
    for _, count := range gc.counts {
        total += count
    }
    return total
}

func (gc *GCounter) Merge(other *GCounter) {
    gc.mu.Lock()
    defer gc.mu.Unlock()

    for serverID, count := range other.counts {
        if count > gc.counts[serverID] {
            gc.counts[serverID] = count
        }
    }
}
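
A tiny usage example shows why merge converges no matter who syncs with whom (values are illustrative). Note that Merge reads other.counts without taking other's lock, so in a real system you would merge from a snapshot or a serialized copy:

a := NewGCounter("serverA")
b := NewGCounter("serverB")

a.Increment()                     // serverA saw 1 increment
b.Increment()
b.Increment()                     // serverB saw 2

a.Merge(b)
b.Merge(a)
fmt.Println(a.Value(), b.Value()) // 3 3 -- both replicas converge

a.Merge(b)                        // merging again changes nothing: merge is idempotent
fmt.Println(a.Value())            // 3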

Problem: Grow-only counters can’t decrement (for logout). We need a PN-Counter.

CRDT: PN-Counter (Positive-Negative Counter)

type PNCounter struct {
    positive *GCounter
    negative *GCounter
}

func NewPNCounter(serverID string) *PNCounter {
    return &PNCounter{
        positive: NewGCounter(serverID),
        negative: NewGCounter(serverID),
    }
}

func (pn *PNCounter) Increment() {
    pn.positive.Increment()
}

func (pn *PNCounter) Decrement() {
    pn.negative.Increment()
}

func (pn *PNCounter) Value() int64 {
    return pn.positive.Value() - pn.negative.Value()
}

func (pn *PNCounter) Merge(other *PNCounter) {
    pn.positive.Merge(other.positive)
    pn.negative.Merge(other.negative)
}

Now we can handle increments and decrements across network partitions!

Challenge 5: Garbage Collection

A PN-Counter keeps an entry for every server that has ever written to it, including servers that are long gone. In a long-running system with server churn, that per-server metadata grows without bound:

type CompactPNCounter struct {
    mu           sync.RWMutex
    positive     map[string]int64
    negative     map[string]int64
    serverID     string
    lastCompact  time.Time
    compactInterval time.Duration
}

func (cpn *CompactPNCounter) Compact() {
    cpn.mu.Lock()
    defer cpn.mu.Unlock()

    // Remove servers that haven't updated in a while
    // This requires consensus on which servers are alive!
    // Beyond scope, but involves heartbeats and tombstones
}

Garbage collection in distributed systems is really hard.

Real-World Implementation: Session Store with Redis

Most production systems use a distributed cache:

// Assumes the go-redis client (github.com/redis/go-redis/v9); a
// package-level context keeps the snippets short.
var ctx = context.Background()

type RedisLoginCounter struct {
    client *redis.Client
}

func (rlc *RedisLoginCounter) Login(userID string) error {
    pipe := rlc.client.Pipeline()

    // Add to active users set
    pipe.SAdd(ctx, "active_users", userID)

    // Set session expiry
    pipe.SetEx(ctx, "session:"+userID, "active", 30*time.Minute)

    _, err := pipe.Exec(ctx)
    return err
}

func (rlc *RedisLoginCounter) Logout(userID string) error {
    pipe := rlc.client.Pipeline()

    // Remove from active users set
    pipe.SRem(ctx, "active_users", userID)

    // Delete session
    pipe.Del(ctx, "session:"+userID)

    _, err := pipe.Exec(ctx)
    return err
}

func (rlc *RedisLoginCounter) Count() (int64, error) {
    return rlc.client.SCard(ctx, "active_users").Result()
}

Benefits:

  • Redis handles distribution
  • Atomic operations
  • Built-in expiry for the session keys

Tradeoffs:

  • External dependency
  • Network latency
  • Redis becomes a single point of failure (unless clustered)
  • Set members don’t expire with their session keys, so users whose sessions time out without a logout linger in active_users; a cleanup sweep is still needed (sketched below)
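
A sketch of that sweep, assuming the same go-redis client: scan the set and remove members whose session key has already expired. SweepStale is a name introduced here, not part of any library:

// Best-effort sweep: remove set members whose session key has expired.
// Run it periodically; it only needs to be roughly right.
func (rlc *RedisLoginCounter) SweepStale() error {
    var cursor uint64
    for {
        members, next, err := rlc.client.SScan(ctx, "active_users", cursor, "", 100).Result()
        if err != nil {
            return err
        }

        for _, userID := range members {
            exists, err := rlc.client.Exists(ctx, "session:"+userID).Result()
            if err != nil {
                continue // best effort: skip on error
            }
            if exists == 0 {
                rlc.client.SRem(ctx, "active_users", userID)
            }
        }

        cursor = next
        if cursor == 0 {
            return nil
        }
    }
}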

Eventual Consistency: Embracing Imperfection

Sometimes, exact counts don’t matter:

type EventualCounter struct {
    mu          sync.RWMutex
    localCount  int64
    servers     []*RemoteCounter
    lastSync    time.Time
    syncInterval time.Duration
}

func (ec *EventualCounter) Increment() {
    atomic.AddInt64(&ec.localCount, 1)
}

func (ec *EventualCounter) Decrement() {
    atomic.AddInt64(&ec.localCount, -1)
}

func (ec *EventualCounter) ApproximateCount() int64 {
    // Return local count (fast, but approximate)
    return atomic.LoadInt64(&ec.localCount)
}

func (ec *EventualCounter) SyncInBackground() {
    ticker := time.NewTicker(ec.syncInterval)
    go func() {
        for range ticker.C {
            ec.sync()
        }
    }()
}

func (ec *EventualCounter) sync() {
    var total int64
    for _, server := range ec.servers {
        // Best-effort query (ignore errors; an unreachable server just
        // makes the total more approximate)
        if count, err := server.GetCount(); err == nil {
            total += count
        }
    }

    // Publish the refreshed total with an atomic store so it doesn't race
    // with the atomic adds in Increment/Decrement.
    atomic.StoreInt64(&ec.localCount, total)

    ec.mu.Lock()
    ec.lastSync = time.Now()
    ec.mu.Unlock()
}

Philosophy: It’s okay if the count is off by a few for a few seconds. Most applications can tolerate this.
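
Since staleness is exactly what this design trades away, it is worth exposing so it can be monitored (see the best practices below). A small addition to the sketch above:

// Staleness reports how long ago the last successful sync completed,
// so dashboards and alerts can judge how approximate the count is.
func (ec *EventualCounter) Staleness() time.Duration {
    ec.mu.RLock()
    defer ec.mu.RUnlock()
    return time.Since(ec.lastSync)
}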

Testing Distributed Counters

func TestDistributedCounter(t *testing.T) {
    servers := []*Server{
        NewServer("server1"),
        NewServer("server2"),
        NewServer("server3"),
    }

    // Simulate logins across servers
    var wg sync.WaitGroup
    for i := 0; i < 300; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            server := servers[id%len(servers)]
            server.Login(fmt.Sprintf("user%d", id))
        }(i)
    }

    wg.Wait()

    // Aggregate counts
    var total int64
    for _, server := range servers {
        total += server.LocalCount()
    }

    if total != 300 {
        t.Errorf("Expected 300 logins, got %d", total)
    }
}

func TestPartitionRecovery(t *testing.T) {
    counter1 := NewPNCounter("server1")
    counter2 := NewPNCounter("server2")

    // Partition: each counter operates independently
    for i := 0; i < 10; i++ {
        counter1.Increment()
    }

    for i := 0; i < 5; i++ {
        counter2.Increment()
    }

    // Partition heals: merge
    counter1.Merge(counter2)

    if counter1.Value() != 15 {
        t.Errorf("Expected 15 after merge, got %d", counter1.Value())
    }
}

Performance Comparison

Approach              Reads/sec   Writes/sec   Consistency   Complexity
Local atomic          100M        50M          Strong        Low
Central service       10K         5K           Strong        Medium
Aggregate on demand   1K          50M          Eventual      Medium
Redis                 100K        50K          Strong        Low
CRDT (PN-Counter)     10M         5M           Eventual      High

Real-World Applications

  1. User Activity Tracking: Online users, concurrent sessions
  2. Rate Limiting: Distributed rate limiters across servers
  3. Analytics: Page views, click counts, real-time metrics
  4. Resource Quotas: API calls per user, storage limits

Common Pitfalls

  1. Assuming strong consistency: Most distributed designs give you only eventual consistency unless you explicitly pay for coordination
  2. Ignoring network failures: Timeouts, partitions, and packet loss are normal
  3. Not handling duplicates: Messages can arrive multiple times (see the dedup sketch after this list)
  4. Forgetting garbage collection: Counters accumulate metadata
  5. Over-engineering: Often, approximate counts are good enough
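
For pitfall 3, the usual fix is to make event handling idempotent: tag every login/logout event with a unique ID and remember which IDs have already been applied. A minimal sketch (the event-ID scheme and DedupCounter type are illustrative, not part of the earlier code):

type DedupCounter struct {
    mu    sync.Mutex
    seen  map[string]struct{} // event IDs that have already been applied
    count int64
}

func NewDedupCounter() *DedupCounter {
    return &DedupCounter{seen: make(map[string]struct{})}
}

func (dc *DedupCounter) Apply(eventID string, delta int64) {
    dc.mu.Lock()
    defer dc.mu.Unlock()

    if _, done := dc.seen[eventID]; done {
        return // duplicate delivery: applying it again would double-count
    }
    dc.seen[eventID] = struct{}{}
    dc.count += delta
}

The seen set is exactly the metadata that pitfall 4 warns about: it needs its own expiry or compaction.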

Best Practices

  1. Define your consistency requirements: Strong vs. eventual
  2. Use CRDTs for partition tolerance: If you need it
  3. Leverage existing systems: Redis, etcd, Consul
  4. Monitor staleness: How old is your count?
  5. Test with chaos: Inject failures, delays, partitions
  6. Accept approximation: Perfect distributed counting is expensive

Conclusion

The login counter teaches us that distributed counting is surprisingly hard:

  • Message ordering matters more than you think
  • Network partitions break assumptions
  • CRDTs provide mathematical guarantees but add complexity
  • Eventual consistency is often good enough
  • Existing systems (Redis, etc.) solve 90% of use cases

The fundamental lesson: In distributed systems, even simple operations become complex. The question isn’t “how do I count perfectly?” but “how much imperfection can I tolerate?”

For most applications:

  • Use Redis or similar for strong consistency within bounds
  • Use CRDTs only if you need partition tolerance
  • Accept eventual consistency and focus on reducing staleness
  • Monitor and alert on anomalies

Remember: A slightly wrong count that’s available is better than a perfect count that’s not.

