Go Concurrency Patterns Series: ← Fan-Out/Fan-In | Series Overview | Request/Response →


What is the Pub/Sub Pattern?

The Publisher/Subscriber (Pub/Sub) pattern is a messaging pattern where publishers send messages without knowing who will receive them, and subscribers receive messages without knowing who sent them. This creates a loosely coupled system where components can communicate through events without direct dependencies.

Key Components:

  • Publisher: Sends messages/events
  • Subscriber: Receives and processes messages/events
  • Message Broker: Routes messages from publishers to subscribers
  • Topics/Channels: Categories for organizing messages

Real-World Use Cases

  • Event-Driven Architecture: Microservices communication
  • Real-Time Notifications: User activity feeds, alerts
  • Data Streaming: Log aggregation, metrics collection
  • UI Updates: React to state changes across components
  • Workflow Orchestration: Trigger actions based on events
  • Cache Invalidation: Notify when data changes

Basic Pub/Sub Implementation

package main

import (
    "fmt"
    "sync"
    "time"
)

// Message represents a pub/sub message
type Message struct {
    Topic   string
    Payload interface{}
}

// Subscriber represents a message handler
type Subscriber func(Message)

// PubSub is a simple in-memory pub/sub system
type PubSub struct {
    mu          sync.RWMutex
    subscribers map[string][]Subscriber
    closed      bool
}

// NewPubSub creates a new pub/sub instance
func NewPubSub() *PubSub {
    return &PubSub{
        subscribers: make(map[string][]Subscriber),
    }
}

// Subscribe adds a subscriber to a topic
func (ps *PubSub) Subscribe(topic string, subscriber Subscriber) {
    ps.mu.Lock()
    defer ps.mu.Unlock()
    
    if ps.closed {
        return
    }
    
    ps.subscribers[topic] = append(ps.subscribers[topic], subscriber)
}

// Publish sends a message to all subscribers of a topic
func (ps *PubSub) Publish(topic string, payload interface{}) {
    ps.mu.RLock()
    defer ps.mu.RUnlock()
    
    if ps.closed {
        return
    }
    
    message := Message{
        Topic:   topic,
        Payload: payload,
    }
    
    // Send to all subscribers asynchronously
    for _, subscriber := range ps.subscribers[topic] {
        go subscriber(message)
    }
}

// Close shuts down the pub/sub system
func (ps *PubSub) Close() {
    ps.mu.Lock()
    defer ps.mu.Unlock()
    ps.closed = true
}

func main() {
    pubsub := NewPubSub()
    defer pubsub.Close()
    
    // Subscribe to user events
    pubsub.Subscribe("user.created", func(msg Message) {
        fmt.Printf("Email service: Welcome %v!\n", msg.Payload)
    })
    
    pubsub.Subscribe("user.created", func(msg Message) {
        fmt.Printf("Analytics: New user registered: %v\n", msg.Payload)
    })
    
    pubsub.Subscribe("user.deleted", func(msg Message) {
        fmt.Printf("Cleanup service: Remove user data for %v\n", msg.Payload)
    })
    
    // Publish events
    pubsub.Publish("user.created", "[email protected]")
    pubsub.Publish("user.created", "[email protected]")
    pubsub.Publish("user.deleted", "[email protected]")
    
    // Wait for async processing
    time.Sleep(100 * time.Millisecond)
}

Advanced Pub/Sub with Channels

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Event represents a structured event
type Event struct {
    ID        string
    Type      string
    Timestamp time.Time
    Data      interface{}
}

// Subscription represents an active subscription
type Subscription struct {
    ID       string
    Topic    string
    Channel  chan Event
    Filter   func(Event) bool
    cancel   context.CancelFunc
}

// Close cancels the subscription
func (s *Subscription) Close() {
    if s.cancel != nil {
        s.cancel()
    }
}

// EventBus is a channel-based pub/sub system
type EventBus struct {
    mu            sync.RWMutex
    subscriptions map[string][]*Subscription
    buffer        int
    closed        bool
}

// NewEventBus creates a new event bus
func NewEventBus(bufferSize int) *EventBus {
    return &EventBus{
        subscriptions: make(map[string][]*Subscription),
        buffer:        bufferSize,
    }
}

// Subscribe creates a new subscription with optional filtering
func (eb *EventBus) Subscribe(ctx context.Context, topic string, filter func(Event) bool) *Subscription {
    eb.mu.Lock()
    defer eb.mu.Unlock()
    
    if eb.closed {
        return nil
    }
    
    subCtx, cancel := context.WithCancel(ctx)
    
    subscription := &Subscription{
        ID:      fmt.Sprintf("sub-%d", time.Now().UnixNano()),
        Topic:   topic,
        Channel: make(chan Event, eb.buffer),
        Filter:  filter,
        cancel:  cancel,
    }
    
    eb.subscriptions[topic] = append(eb.subscriptions[topic], subscription)
    
    // Clean up subscription when context is cancelled
    go func() {
        <-subCtx.Done()
        eb.unsubscribe(subscription)
        close(subscription.Channel)
    }()
    
    return subscription
}

// unsubscribe removes a subscription
func (eb *EventBus) unsubscribe(sub *Subscription) {
    eb.mu.Lock()
    defer eb.mu.Unlock()
    
    subs := eb.subscriptions[sub.Topic]
    for i, s := range subs {
        if s.ID == sub.ID {
            eb.subscriptions[sub.Topic] = append(subs[:i], subs[i+1:]...)
            break
        }
    }
}

// Publish sends an event to all matching subscribers
func (eb *EventBus) Publish(event Event) {
    eb.mu.RLock()
    defer eb.mu.RUnlock()
    
    if eb.closed {
        return
    }
    
    event.Timestamp = time.Now()
    
    for _, subscription := range eb.subscriptions[event.Type] {
        // Apply filter if present
        if subscription.Filter != nil && !subscription.Filter(event) {
            continue
        }
        
        // Non-blocking send
        select {
        case subscription.Channel <- event:
        default:
            // Channel is full, could log this
            fmt.Printf("Warning: Subscription %s channel is full\n", subscription.ID)
        }
    }
}

// Close shuts down the event bus
func (eb *EventBus) Close() {
    eb.mu.Lock()
    defer eb.mu.Unlock()
    
    eb.closed = true
    
    // Close all subscriptions
    for _, subs := range eb.subscriptions {
        for _, sub := range subs {
            sub.Close()
        }
    }
}

func main() {
    ctx := context.Background()
    eventBus := NewEventBus(10)
    defer eventBus.Close()
    
    // Subscribe to all user events
    userSub := eventBus.Subscribe(ctx, "user", nil)
    
    // Subscribe to only high-priority events
    prioritySub := eventBus.Subscribe(ctx, "user", func(e Event) bool {
        if data, ok := e.Data.(map[string]interface{}); ok {
            return data["priority"] == "high"
        }
        return false
    })
    
    // Start event processors
    go func() {
        for event := range userSub.Channel {
            fmt.Printf("User processor: %s - %v\n", event.Type, event.Data)
        }
    }()
    
    go func() {
        for event := range prioritySub.Channel {
            fmt.Printf("Priority processor: %s - %v\n", event.Type, event.Data)
        }
    }()
    
    // Publish events
    eventBus.Publish(Event{
        ID:   "1",
        Type: "user",
        Data: map[string]interface{}{
            "action":   "login",
            "user":     "john",
            "priority": "low",
        },
    })
    
    eventBus.Publish(Event{
        ID:   "2",
        Type: "user",
        Data: map[string]interface{}{
            "action":   "payment",
            "user":     "jane",
            "priority": "high",
        },
    })
    
    time.Sleep(100 * time.Millisecond)
}

Persistent Pub/Sub with Replay

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// StoredEvent represents an event with storage metadata
type StoredEvent struct {
    Event
    Sequence int64
    Stored   time.Time
}

// PersistentEventBus stores events and supports replay
type PersistentEventBus struct {
    mu       sync.RWMutex
    events   []StoredEvent
    sequence int64
    subs     map[string][]*PersistentSubscription
    closed   bool
}

// PersistentSubscription supports replay from a specific point
type PersistentSubscription struct {
    ID         string
    Topic      string
    Channel    chan StoredEvent
    FromSeq    int64
    cancel     context.CancelFunc
}

func (s *PersistentSubscription) Close() {
    if s.cancel != nil {
        s.cancel()
    }
}

// NewPersistentEventBus creates a new persistent event bus
func NewPersistentEventBus() *PersistentEventBus {
    return &PersistentEventBus{
        events: make([]StoredEvent, 0),
        subs:   make(map[string][]*PersistentSubscription),
    }
}

// Subscribe creates a subscription with optional replay
func (peb *PersistentEventBus) Subscribe(ctx context.Context, topic string, fromSequence int64) *PersistentSubscription {
    peb.mu.Lock()
    defer peb.mu.Unlock()
    
    if peb.closed {
        return nil
    }
    
    subCtx, cancel := context.WithCancel(ctx)
    
    sub := &PersistentSubscription{
        ID:      fmt.Sprintf("psub-%d", time.Now().UnixNano()),
        Topic:   topic,
        Channel: make(chan StoredEvent, 100),
        FromSeq: fromSequence,
        cancel:  cancel,
    }
    
    peb.subs[topic] = append(peb.subs[topic], sub)
    
    // Replay historical events if requested
    if fromSequence >= 0 {
        go peb.replayEvents(sub)
    }
    
    // Clean up on context cancellation
    go func() {
        <-subCtx.Done()
        peb.unsubscribe(sub)
        close(sub.Channel)
    }()
    
    return sub
}

// replayEvents sends historical events to a subscription
func (peb *PersistentEventBus) replayEvents(sub *PersistentSubscription) {
    peb.mu.RLock()
    defer peb.mu.RUnlock()
    
    for _, storedEvent := range peb.events {
        if storedEvent.Sequence >= sub.FromSeq && storedEvent.Type == sub.Topic {
            select {
            case sub.Channel <- storedEvent:
            default:
                // Channel full, skip
            }
        }
    }
}

// unsubscribe removes a subscription
func (peb *PersistentEventBus) unsubscribe(sub *PersistentSubscription) {
    peb.mu.Lock()
    defer peb.mu.Unlock()
    
    subs := peb.subs[sub.Topic]
    for i, s := range subs {
        if s.ID == sub.ID {
            peb.subs[sub.Topic] = append(subs[:i], subs[i+1:]...)
            break
        }
    }
}

// Publish stores and distributes an event
func (peb *PersistentEventBus) Publish(event Event) int64 {
    peb.mu.Lock()
    defer peb.mu.Unlock()
    
    if peb.closed {
        return -1
    }
    
    peb.sequence++
    storedEvent := StoredEvent{
        Event:    event,
        Sequence: peb.sequence,
        Stored:   time.Now(),
    }
    
    // Store event
    peb.events = append(peb.events, storedEvent)
    
    // Distribute to current subscribers
    for _, sub := range peb.subs[event.Type] {
        select {
        case sub.Channel <- storedEvent:
        default:
            // Channel full
        }
    }
    
    return peb.sequence
}

// GetLastSequence returns the last event sequence number
func (peb *PersistentEventBus) GetLastSequence() int64 {
    peb.mu.RLock()
    defer peb.mu.RUnlock()
    return peb.sequence
}

func main() {
    ctx := context.Background()
    eventBus := NewPersistentEventBus()
    
    // Publish some initial events
    eventBus.Publish(Event{ID: "1", Type: "order", Data: "Order created"})
    eventBus.Publish(Event{ID: "2", Type: "order", Data: "Order paid"})
    eventBus.Publish(Event{ID: "3", Type: "order", Data: "Order shipped"})
    
    fmt.Printf("Published 3 events, last sequence: %d\n", eventBus.GetLastSequence())
    
    // Subscribe from the beginning (replay all events)
    replaySub := eventBus.Subscribe(ctx, "order", 0)
    
    // Subscribe from current point (no replay)
    liveSub := eventBus.Subscribe(ctx, "order", -1)
    
    // Process replayed events
    go func() {
        fmt.Println("Replay subscription:")
        for event := range replaySub.Channel {
            fmt.Printf("  Replayed: seq=%d, %v\n", event.Sequence, event.Data)
        }
    }()
    
    // Process live events
    go func() {
        fmt.Println("Live subscription:")
        for event := range liveSub.Channel {
            fmt.Printf("  Live: seq=%d, %v\n", event.Sequence, event.Data)
        }
    }()
    
    time.Sleep(100 * time.Millisecond)
    
    // Publish new events
    eventBus.Publish(Event{ID: "4", Type: "order", Data: "Order delivered"})
    eventBus.Publish(Event{ID: "5", Type: "order", Data: "Order completed"})
    
    time.Sleep(100 * time.Millisecond)
    
    replaySub.Close()
    liveSub.Close()
}

Typed Pub/Sub System

package main

import (
    "context"
    "fmt"
    "reflect"
    "sync"
)

// TypedEventBus provides type-safe pub/sub
type TypedEventBus struct {
    mu       sync.RWMutex
    handlers map[reflect.Type][]reflect.Value
    closed   bool
}

// NewTypedEventBus creates a new typed event bus
func NewTypedEventBus() *TypedEventBus {
    return &TypedEventBus{
        handlers: make(map[reflect.Type][]reflect.Value),
    }
}

// Subscribe registers a handler for a specific event type
func (teb *TypedEventBus) Subscribe(handler interface{}) {
    teb.mu.Lock()
    defer teb.mu.Unlock()
    
    if teb.closed {
        return
    }
    
    handlerValue := reflect.ValueOf(handler)
    handlerType := handlerValue.Type()
    
    // Validate handler signature: func(EventType)
    if handlerType.Kind() != reflect.Func ||
        handlerType.NumIn() != 1 ||
        handlerType.NumOut() != 0 {
        panic("Handler must be func(EventType)")
    }
    
    eventType := handlerType.In(0)
    teb.handlers[eventType] = append(teb.handlers[eventType], handlerValue)
}

// Publish sends an event to all registered handlers
func (teb *TypedEventBus) Publish(event interface{}) {
    teb.mu.RLock()
    defer teb.mu.RUnlock()
    
    if teb.closed {
        return
    }
    
    eventType := reflect.TypeOf(event)
    eventValue := reflect.ValueOf(event)
    
    for _, handler := range teb.handlers[eventType] {
        go handler.Call([]reflect.Value{eventValue})
    }
}

// Event types
type UserCreated struct {
    UserID string
    Email  string
}

type OrderPlaced struct {
    OrderID string
    UserID  string
    Amount  float64
}

type PaymentProcessed struct {
    PaymentID string
    OrderID   string
    Success   bool
}

func main() {
    eventBus := NewTypedEventBus()
    
    // Subscribe to different event types
    eventBus.Subscribe(func(event UserCreated) {
        fmt.Printf("Email service: Send welcome email to %s\n", event.Email)
    })
    
    eventBus.Subscribe(func(event UserCreated) {
        fmt.Printf("Analytics: Track user registration %s\n", event.UserID)
    })
    
    eventBus.Subscribe(func(event OrderPlaced) {
        fmt.Printf("Inventory: Reserve items for order %s\n", event.OrderID)
    })
    
    eventBus.Subscribe(func(event OrderPlaced) {
        fmt.Printf("Payment: Process payment for order %s, amount $%.2f\n", 
            event.OrderID, event.Amount)
    })
    
    eventBus.Subscribe(func(event PaymentProcessed) {
        if event.Success {
            fmt.Printf("Fulfillment: Ship order %s\n", event.OrderID)
        } else {
            fmt.Printf("Orders: Cancel order %s due to payment failure\n", event.OrderID)
        }
    })
    
    // Publish events
    eventBus.Publish(UserCreated{
        UserID: "user123",
        Email:  "[email protected]",
    })
    
    eventBus.Publish(OrderPlaced{
        OrderID: "order456",
        UserID:  "user123",
        Amount:  99.99,
    })
    
    eventBus.Publish(PaymentProcessed{
        PaymentID: "pay789",
        OrderID:   "order456",
        Success:   true,
    })
    
    // Wait for async processing
    time.Sleep(100 * time.Millisecond)
}

Best Practices

  1. Async Processing: Handle events asynchronously to avoid blocking publishers
  2. Error Handling: Implement proper error handling in subscribers
  3. Buffering: Use buffered channels to handle bursts of events
  4. Graceful Shutdown: Ensure clean shutdown of all subscribers
  5. Dead Letter Queues: Handle failed message processing
  6. Monitoring: Track message rates, processing times, and failures
  7. Type Safety: Use typed events when possible
  8. Idempotency: Design subscribers to handle duplicate messages

Common Pitfalls

  1. Memory Leaks: Not closing subscriptions properly
  2. Blocking Publishers: Slow subscribers blocking the entire system
  3. Lost Messages: Not handling channel buffer overflows
  4. Circular Dependencies: Events triggering other events in loops
  5. No Error Handling: Panics in subscribers affecting the system

Testing Pub/Sub Systems

package main

import (
    "context"
    "testing"
    "time"
)

func TestEventBus(t *testing.T) {
    eventBus := NewEventBus(10)
    defer eventBus.Close()
    
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    
    // Subscribe to events
    sub := eventBus.Subscribe(ctx, "test", nil)
    
    // Publish event
    testEvent := Event{
        ID:   "test1",
        Type: "test",
        Data: "test data",
    }
    
    eventBus.Publish(testEvent)
    
    // Verify event received
    select {
    case received := <-sub.Channel:
        if received.ID != testEvent.ID {
            t.Errorf("Expected event ID %s, got %s", testEvent.ID, received.ID)
        }
    case <-time.After(100 * time.Millisecond):
        t.Error("Event not received within timeout")
    }
}

The Pub/Sub pattern is fundamental for building scalable, event-driven systems in Go. It enables loose coupling between components and supports complex workflows through simple event-based communication.


Next: Learn about Request/Response Pattern for synchronous communication patterns.