The Problem: Counting What You Can’t See
Count concurrent users. On login: increment. On logout: decrement. Simple, right?
Now add reality:
- The counter is distributed across multiple servers
- A user’s session times out on one server but they’re active on another
- The increment message arrives after the decrement message
- Network partitions split your cluster
- Servers crash mid-operation
Suddenly, this “simple” counter becomes a distributed systems nightmare. Welcome to distributed counting, where even addition is hard.
The Naive Single-Server Implementation
Let’s start simple:
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

type LoginCounter struct {
    count int64
}

func (lc *LoginCounter) Login(userID string) {
    atomic.AddInt64(&lc.count, 1)
    fmt.Printf("User %s logged in. Count: %d\n", userID, atomic.LoadInt64(&lc.count))
}

func (lc *LoginCounter) Logout(userID string) {
    atomic.AddInt64(&lc.count, -1)
    fmt.Printf("User %s logged out. Count: %d\n", userID, atomic.LoadInt64(&lc.count))
}

func (lc *LoginCounter) Count() int64 {
    return atomic.LoadInt64(&lc.count)
}

func main() {
    counter := &LoginCounter{}

    // Simulate logins and logouts
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            userID := fmt.Sprintf("user%d", id)
            counter.Login(userID)
            time.Sleep(100 * time.Millisecond) // Simulate session
            counter.Logout(userID)
        }(i)
    }
    wg.Wait()

    fmt.Printf("Final count: %d (should be 0)\n", counter.Count())
}
Output:
User user0 logged in. Count: 1
User user1 logged in. Count: 2
...
User user99 logged out. Count: 0
Final count: 0 (should be 0)
Perfect! But this is single-server, single-process. Reality is messier.
Challenge 1: Session Timeout vs. Active User
Users don’t always logout cleanly. Sessions timeout:
type Session struct {
    UserID    string
    LoginTime time.Time
    LastSeen  time.Time
}

type SessionTracker struct {
    mu       sync.RWMutex
    sessions map[string]*Session
    count    int64
}

func NewSessionTracker() *SessionTracker {
    return &SessionTracker{sessions: make(map[string]*Session)}
}

func (st *SessionTracker) Login(userID string) {
    st.mu.Lock()
    defer st.mu.Unlock()

    if _, exists := st.sessions[userID]; !exists {
        st.sessions[userID] = &Session{
            UserID:    userID,
            LoginTime: time.Now(),
            LastSeen:  time.Now(),
        }
        atomic.AddInt64(&st.count, 1)
    }
}

func (st *SessionTracker) Heartbeat(userID string) {
    st.mu.RLock()
    defer st.mu.RUnlock()

    if session, exists := st.sessions[userID]; exists {
        session.LastSeen = time.Now() // mutating shared state under a read lock (see the race below)
    }
}

func (st *SessionTracker) CleanupExpired(timeout time.Duration) {
    st.mu.Lock()
    defer st.mu.Unlock()

    now := time.Now()
    for userID, session := range st.sessions {
        if now.Sub(session.LastSeen) > timeout {
            delete(st.sessions, userID)
            atomic.AddInt64(&st.count, -1)
            fmt.Printf("Session %s expired\n", userID)
        }
    }
}

// Background cleanup goroutine
func (st *SessionTracker) StartCleanup(interval, timeout time.Duration) {
    ticker := time.NewTicker(interval)
    go func() {
        for range ticker.C {
            st.CleanupExpired(timeout)
        }
    }()
}
Problem: Heartbeat updates session.LastSeen while holding only the read lock. Two concurrent heartbeats for the same user are a data race, and a heartbeat that arrives just as cleanup grabs the write lock is delayed until the session has already been deleted, silently expiring an active user.
Solution: Take the write lock in Heartbeat too, or make the timestamp atomic so it can be updated outside the map lock:
type Session struct {
    UserID    string
    LoginTime time.Time
    lastSeen  atomic.Int64 // Unix timestamp
}

func (st *SessionTracker) Heartbeat(userID string) {
    st.mu.RLock()
    session, exists := st.sessions[userID]
    st.mu.RUnlock()

    if exists {
        session.lastSeen.Store(time.Now().Unix())
    }
}
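CleanupExpired then compares against the atomic timestamp instead of the old LastSeen field. A minimal sketch of that adapted cleanup, under the same timeout semantics:

func (st *SessionTracker) CleanupExpired(timeout time.Duration) {
    st.mu.Lock()
    defer st.mu.Unlock()

    now := time.Now().Unix()
    for userID, session := range st.sessions {
        // lastSeen is loaded atomically, so a concurrent Heartbeat is safe.
        if now-session.lastSeen.Load() > int64(timeout.Seconds()) {
            delete(st.sessions, userID)
            atomic.AddInt64(&st.count, -1)
            fmt.Printf("Session %s expired\n", userID)
        }
    }
}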
Challenge 2: Distributed Counting Across Servers
Now you have multiple servers. How do you count total users?
Approach 1: Central Counter Service
type CentralCounter struct {
    mu    sync.RWMutex
    count int64
}

func (cc *CentralCounter) Increment() {
    cc.mu.Lock()
    cc.count++
    cc.mu.Unlock()
}

func (cc *CentralCounter) Decrement() {
    cc.mu.Lock()
    cc.count--
    cc.mu.Unlock()
}

func (cc *CentralCounter) Get() int64 {
    cc.mu.RLock()
    defer cc.mu.RUnlock()
    return cc.count
}

// Each server calls this on login/logout
func (server *Server) Login(userID string) {
    // ... handle session locally ...

    // Update central counter
    server.centralCounter.Increment()
}
Problems:
- Single point of failure
- Network latency for every login/logout
- Bottleneck under high load
- What if the network call fails? (sketched below)
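That last failure mode deserves a closer look. Here is a hedged sketch of what "update central counter" actually means once the counter lives on another machine; the /increment endpoint, the centralURL field, and the 500ms timeout are illustrative assumptions, not part of the original design:

// Requires context, fmt, net/http, and time. centralURL and the /increment
// endpoint are assumptions for illustration.
func (server *Server) incrementCentral(ctx context.Context) error {
    ctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
    defer cancel()

    req, err := http.NewRequestWithContext(ctx, http.MethodPost, server.centralURL+"/increment", nil)
    if err != nil {
        return err
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        // The user is already logged in locally; if this call fails, the
        // central count silently drifts unless we retry or reconcile later.
        return err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("central counter returned %s", resp.Status)
    }
    return nil
}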
Approach 2: Aggregate on Demand
Each server maintains its own count, aggregate when needed:
type DistributedCounter struct {
    servers []*Server
}

func (dc *DistributedCounter) TotalCount() (int64, error) {
    var total int64
    var mu sync.Mutex
    var wg sync.WaitGroup
    var firstErr error

    for _, server := range dc.servers {
        wg.Add(1)
        go func(s *Server) {
            defer wg.Done()
            count, err := s.GetLocalCount()
            if err != nil {
                mu.Lock()
                if firstErr == nil {
                    firstErr = err
                }
                mu.Unlock()
                return
            }
            mu.Lock()
            total += count
            mu.Unlock()
        }(server)
    }
    wg.Wait()
    return total, firstErr
}
Problems:
- Inconsistent snapshot (count changes while aggregating)
- What if a server is down?
- High latency to query all servers (see the sketch below)
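A common mitigation for the last two problems is to put a deadline on the aggregation and report how many servers actually answered, so callers know how complete the snapshot is. A hedged sketch; TotalCountWithin and the reached return value are assumptions, not part of the original API:

func (dc *DistributedCounter) TotalCountWithin(budget time.Duration) (total int64, reached int, err error) {
    type result struct {
        count int64
        err   error
    }
    results := make(chan result, len(dc.servers))

    for _, server := range dc.servers {
        go func(s *Server) {
            count, err := s.GetLocalCount()
            results <- result{count, err}
        }(server)
    }

    deadline := time.After(budget)
    for range dc.servers {
        select {
        case r := <-results:
            if r.err != nil {
                if err == nil {
                    err = r.err
                }
                continue
            }
            total += r.count
            reached++
        case <-deadline:
            // Anyone who hasn't answered by now is treated as unreachable.
            return total, reached, err
        }
    }
    return total, reached, err
}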
Challenge 3: Message Ordering
User logs in on Server A, then immediately logs out. Messages arrive out of order:
Server B receives: logout, then login
Final state: User appears logged in (wrong!)
This is the message reordering problem in distributed systems.
Solution: Vector Clocks
type Event struct {
    Type      string // "login" or "logout"
    UserID    string
    Timestamp time.Time
    Clock     VectorClock
}

type VectorClock map[string]int

func (vc VectorClock) HappensBefore(other VectorClock) bool {
    strictlyLess := false
    for server, clock := range vc {
        otherClock := other[server]
        if clock > otherClock {
            return false
        }
        if clock < otherClock {
            strictlyLess = true
        }
    }
    // A server that appears only in "other" counts as 0 here, so it also
    // makes vc strictly earlier.
    for server, otherClock := range other {
        if _, seen := vc[server]; !seen && otherClock > 0 {
            strictlyLess = true
        }
    }
    return strictlyLess
}

type SmartCounter struct {
    mu     sync.Mutex
    events map[string][]Event // userID -> events
    count  int64
}

func NewSmartCounter() *SmartCounter {
    return &SmartCounter{events: make(map[string][]Event)}
}

func (sc *SmartCounter) ProcessEvent(event Event) {
    sc.mu.Lock()
    defer sc.mu.Unlock()

    userEvents := sc.events[event.UserID]

    // Find correct position based on vector clock
    for i, existing := range userEvents {
        if event.Clock.HappensBefore(existing.Clock) {
            // This event happened before an existing event: insert it there
            // and re-derive the count from the corrected history.
            userEvents = append(userEvents[:i], append([]Event{event}, userEvents[i:]...)...)
            sc.events[event.UserID] = userEvents
            sc.recomputeCount()
            return
        }
    }

    // Event is newest
    sc.events[event.UserID] = append(userEvents, event)
    sc.applyEvent(event)
}

func (sc *SmartCounter) applyEvent(event Event) {
    if event.Type == "login" {
        sc.count++
    } else {
        sc.count--
    }
}

// recomputeCount re-derives the count from scratch: a user is "in" if the
// latest event we know about for them is a login. Simple, but O(users).
func (sc *SmartCounter) recomputeCount() {
    var count int64
    for _, events := range sc.events {
        if n := len(events); n > 0 && events[n-1].Type == "login" {
            count++
        }
    }
    sc.count = count
}
Complexity explosion! We’re now maintaining event history and vector clocks just to count users.
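And it goes further: for the ordering above to mean anything, every server also has to advance its own clock entry when it emits an event and fold in the clocks it receives. A minimal sketch of those two missing pieces; Tick and MergeFrom are assumptions, not shown in the original:

// Tick advances this server's own entry before it emits a new event.
func (vc VectorClock) Tick(serverID string) {
    vc[serverID]++
}

// MergeFrom folds a received clock into the local one (element-wise max), so
// events emitted afterwards are ordered after everything already observed.
func (vc VectorClock) MergeFrom(other VectorClock) {
    for server, clock := range other {
        if clock > vc[server] {
            vc[server] = clock
        }
    }
}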
Challenge 4: Network Partitions
Network splits your cluster into two groups. Each group thinks it has the authoritative count:
Before partition: 100 users
After partition:
- Group A: 50 users (thinks total is 50)
- Group B: 50 users (thinks total is 50)
During partition:
- Group A: +10 logins = 60 users
- Group B: +5 logins = 55 users
After partition heals:
What's the count? 60? 55? 115? 105?
This is where CRDTs (Conflict-Free Replicated Data Types) come in.
CRDT: Grow-Only Counter
type GCounter struct {
    mu       sync.RWMutex
    counts   map[string]int64 // serverID -> local count
    serverID string
}

func NewGCounter(serverID string) *GCounter {
    return &GCounter{
        counts:   make(map[string]int64),
        serverID: serverID,
    }
}

func (gc *GCounter) Increment() {
    gc.mu.Lock()
    defer gc.mu.Unlock()
    gc.counts[gc.serverID]++
}

func (gc *GCounter) Value() int64 {
    gc.mu.RLock()
    defer gc.mu.RUnlock()

    var total int64
    for _, count := range gc.counts {
        total += count
    }
    return total
}

// Merge folds in another replica's state (element-wise max). It assumes
// "other" is a received snapshot that is no longer being mutated.
func (gc *GCounter) Merge(other *GCounter) {
    gc.mu.Lock()
    defer gc.mu.Unlock()

    for serverID, count := range other.counts {
        if count > gc.counts[serverID] {
            gc.counts[serverID] = count
        }
    }
}
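A quick usage sketch, showing two replicas counting independently and then converging once they exchange state (the server names are arbitrary):

// Two replicas count independently, then exchange state and agree.
a := NewGCounter("server-a")
b := NewGCounter("server-b")

a.Increment() // login seen by A
a.Increment() // another login seen by A
b.Increment() // login seen by B

a.Merge(b)
b.Merge(a)

fmt.Println(a.Value(), b.Value()) // both print 3: merge is commutative and idempotent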
Problem: Grow-only counters can’t decrement (for logout). We need a PN-Counter.
CRDT: PN-Counter (Positive-Negative Counter)
type PNCounter struct {
    positive *GCounter
    negative *GCounter
}

func NewPNCounter(serverID string) *PNCounter {
    return &PNCounter{
        positive: NewGCounter(serverID),
        negative: NewGCounter(serverID),
    }
}

func (pn *PNCounter) Increment() {
    pn.positive.Increment()
}

func (pn *PNCounter) Decrement() {
    pn.negative.Increment()
}

func (pn *PNCounter) Value() int64 {
    return pn.positive.Value() - pn.negative.Value()
}

func (pn *PNCounter) Merge(other *PNCounter) {
    pn.positive.Merge(other.positive)
    pn.negative.Merge(other.negative)
}
Now we can handle increments and decrements across network partitions!
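A usage sketch of the scenario that tripped up the naive counter: the login is recorded on one server, the logout on another, and the merged value still comes out right regardless of delivery order (server names are arbitrary):

a := NewPNCounter("server-a")
b := NewPNCounter("server-b")

a.Increment() // the user logs in via server A
b.Decrement() // ...and the logout lands on server B

a.Merge(b)
b.Merge(a)

fmt.Println(a.Value(), b.Value()) // both print 0, whatever order the merges ran in
// Note: before the merge, B alone read -1. PN-Counters only converge eventually.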
Challenge 5: Garbage Collection
PN-Counters grow forever (storing per-server counts). For long-running systems, this is a problem:
type CompactPNCounter struct {
    mu              sync.RWMutex
    positive        map[string]int64
    negative        map[string]int64
    serverID        string
    lastCompact     time.Time
    compactInterval time.Duration
}

func (cpn *CompactPNCounter) Compact() {
    cpn.mu.Lock()
    defer cpn.mu.Unlock()

    // Remove servers that haven't updated in a while.
    // This requires consensus on which servers are alive!
    // Beyond scope, but involves heartbeats and tombstones.
}
Garbage collection in distributed systems is really hard.
Real-World Implementation: Session Store with Redis
Most production systems use a distributed cache:
// Uses a Redis client such as github.com/redis/go-redis/v9.
type RedisLoginCounter struct {
    client *redis.Client
}

func (rlc *RedisLoginCounter) Login(ctx context.Context, userID string) error {
    pipe := rlc.client.Pipeline()

    // Add to active users set
    pipe.SAdd(ctx, "active_users", userID)

    // Set session expiry
    pipe.SetEx(ctx, "session:"+userID, "active", 30*time.Minute)

    _, err := pipe.Exec(ctx)
    return err
}

func (rlc *RedisLoginCounter) Logout(ctx context.Context, userID string) error {
    pipe := rlc.client.Pipeline()

    // Remove from active users set
    pipe.SRem(ctx, "active_users", userID)

    // Delete session
    pipe.Del(ctx, "session:"+userID)

    _, err := pipe.Exec(ctx)
    return err
}

func (rlc *RedisLoginCounter) Count(ctx context.Context) (int64, error) {
    return rlc.client.SCard(ctx, "active_users").Result()
}
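One wrinkle: the session key expires on its own, but its entry in the active_users set does not, so a user whose session times out without a clean logout lingers in the count. A hedged sketch of the heartbeat side, which keeps the session alive while the user is genuinely active; the Heartbeat method is an assumption that mirrors Challenge 1, and a separate reaper would still be needed to reconcile the set:

func (rlc *RedisLoginCounter) Heartbeat(ctx context.Context, userID string) error {
    // Refresh the session TTL on activity. A background reaper would still
    // need to SREM users whose session key has already expired.
    return rlc.client.Expire(ctx, "session:"+userID, 30*time.Minute).Err()
}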
Benefits:
- Redis handles distribution
- Atomic operations
- Built-in expiry
Tradeoffs:
- External dependency
- Network latency
- Redis becomes single point of failure (unless clustered)
Eventual Consistency: Embracing Imperfection
Sometimes, exact counts don’t matter:
type EventualCounter struct {
    mu           sync.RWMutex
    localCount   int64
    servers      []*RemoteCounter
    lastSync     time.Time
    syncInterval time.Duration
}

func (ec *EventualCounter) Increment() {
    atomic.AddInt64(&ec.localCount, 1)
}

func (ec *EventualCounter) Decrement() {
    atomic.AddInt64(&ec.localCount, -1)
}

func (ec *EventualCounter) ApproximateCount() int64 {
    // Return the cached count (fast, but approximate)
    return atomic.LoadInt64(&ec.localCount)
}

func (ec *EventualCounter) SyncInBackground() {
    ticker := time.NewTicker(ec.syncInterval)
    go func() {
        for range ticker.C {
            ec.sync()
        }
    }()
}

func (ec *EventualCounter) sync() {
    var total int64
    for _, server := range ec.servers {
        // Best-effort query (ignore errors)
        if count, err := server.GetCount(); err == nil {
            total += count
        }
    }

    // Update the local cache. Use an atomic store so it doesn't race with
    // concurrent Increment/Decrement calls; updates that land mid-sync are
    // simply folded in on the next pass.
    atomic.StoreInt64(&ec.localCount, total)

    ec.mu.Lock()
    ec.lastSync = time.Now()
    ec.mu.Unlock()
}
Philosophy: It’s okay if the count is off by a few for a few seconds. Most applications can tolerate this.
Testing Distributed Counters
func TestDistributedCounter(t *testing.T) {
    servers := []*Server{
        NewServer("server1"),
        NewServer("server2"),
        NewServer("server3"),
    }

    // Simulate logins across servers
    var wg sync.WaitGroup
    for i := 0; i < 300; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            server := servers[id%len(servers)]
            server.Login(fmt.Sprintf("user%d", id))
        }(i)
    }
    wg.Wait()

    // Aggregate counts
    var total int64
    for _, server := range servers {
        total += server.LocalCount()
    }
    if total != 300 {
        t.Errorf("Expected 300 logins, got %d", total)
    }
}

func TestPartitionRecovery(t *testing.T) {
    counter1 := NewPNCounter("server1")
    counter2 := NewPNCounter("server2")

    // Partition: each counter operates independently
    for i := 0; i < 10; i++ {
        counter1.Increment()
    }
    for i := 0; i < 5; i++ {
        counter2.Increment()
    }

    // Partition heals: merge
    counter1.Merge(counter2)

    if counter1.Value() != 15 {
        t.Errorf("Expected 15 after merge, got %d", counter1.Value())
    }
}
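What makes the merge trustworthy is that it doesn't matter who merges whom, or how many times. A sketch of a test for that property; this test is an addition, not from the original:

func TestMergeIsCommutativeAndIdempotent(t *testing.T) {
    a := NewPNCounter("server1")
    b := NewPNCounter("server2")

    a.Increment()
    a.Increment()
    b.Increment()
    b.Decrement()

    // Merge in both directions, and merge one side twice: values must agree.
    a.Merge(b)
    a.Merge(b)
    b.Merge(a)

    if a.Value() != b.Value() || a.Value() != 2 {
        t.Errorf("Expected both counters to converge to 2, got %d and %d", a.Value(), b.Value())
    }
}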
Performance Comparison
| Approach | Reads/sec | Writes/sec | Consistency | Complexity |
|---|---|---|---|---|
| Local atomic | 100M | 50M | Strong | Low |
| Central service | 10K | 5K | Strong | Medium |
| Aggregate on demand | 1K | 50M | Eventual | Medium |
| Redis | 100K | 50K | Strong | Low |
| CRDT (PN-Counter) | 10M | 5M | Eventual | High |
Real-World Applications
- User Activity Tracking: Online users, concurrent sessions
- Rate Limiting: Distributed rate limiters across servers
- Analytics: Page views, click counts, real-time metrics
- Resource Quotas: API calls per user, storage limits
Common Pitfalls
- Assuming strong consistency: Most distributed counters only give you eventual consistency, especially during partitions
- Ignoring network failures: Timeouts, partitions, and packet loss are normal
- Not handling duplicates: Messages can arrive multiple times (see the sketch after this list)
- Forgetting garbage collection: Counters accumulate metadata
- Over-engineering: Often, approximate counts are good enough
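The duplicates pitfall has a cheap, mostly effective answer: tag every login/logout message with a unique ID and drop anything already applied. A hedged sketch; the event-ID field and the in-memory seen set are assumptions, and production systems usually bound or persist that set:

type DedupingCounter struct {
    mu    sync.Mutex
    seen  map[string]bool // eventID -> already applied?
    count int64
}

func NewDedupingCounter() *DedupingCounter {
    return &DedupingCounter{seen: make(map[string]bool)}
}

func (dc *DedupingCounter) Apply(eventID string, delta int64) {
    dc.mu.Lock()
    defer dc.mu.Unlock()

    if dc.seen[eventID] {
        return // duplicate delivery: ignore it
    }
    dc.seen[eventID] = true
    dc.count += delta
}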
Best Practices
- Define your consistency requirements: Strong vs. eventual
- Use CRDTs for partition tolerance: If you need it
- Leverage existing systems: Redis, etcd, Consul
- Monitor staleness: How old is your count?
- Test with chaos: Inject failures, delays, partitions
- Accept approximation: Perfect distributed counting is expensive
Conclusion
The login counter teaches us that distributed counting is surprisingly hard:
- Message ordering matters more than you think
- Network partitions break assumptions
- CRDTs provide mathematical guarantees but add complexity
- Eventual consistency is often good enough
- Existing systems (Redis, etc.) solve 90% of use cases
The fundamental lesson: In distributed systems, even simple operations become complex. The question isn’t “how do I count perfectly?” but “how much imperfection can I tolerate?”
For most applications:
- Use Redis or similar for strong consistency within bounds
- Use CRDTs only if you need partition tolerance
- Accept eventual consistency and focus on reducing staleness
- Monitor and alert on anomalies
Remember: A slightly wrong count that’s available is better than a perfect count that’s not.