Go Concurrency Patterns Series: ← Fan-Out/Fan-In | Series Overview | Request/Response →
## What is the Pub/Sub Pattern?

The Publisher/Subscriber (Pub/Sub) pattern is a messaging pattern where publishers send messages without knowing who will receive them, and subscribers receive messages without knowing who sent them. This creates a loosely coupled system where components can communicate through events without direct dependencies.
Key Components:
- Publisher: Sends messages/events
- Subscriber: Receives and processes messages/events
- Message Broker: Routes messages from publishers to subscribers
- Topics/Channels: Categories for organizing messages

## Real-World Use Cases

- Event-Driven Architecture: Microservices communication
- Real-Time Notifications: User activity feeds, alerts
- Data Streaming: Log aggregation, metrics collection
- UI Updates: React to state changes across components
- Workflow Orchestration: Trigger actions based on events
- Cache Invalidation: Notify when data changes

## Basic Pub/Sub Implementation

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Message represents a pub/sub message.
type Message struct {
	Topic   string
	Payload interface{}
}

// Subscriber is a callback invoked with each message published to a topic
// it has been subscribed to.
type Subscriber func(Message)

// PubSub is a simple in-memory pub/sub system. All methods take ps.mu, so a
// single PubSub may be shared between goroutines.
type PubSub struct {
	mu          sync.RWMutex
	subscribers map[string][]Subscriber
	closed      bool
}

// NewPubSub creates a new pub/sub instance.
func NewPubSub() *PubSub {
	return &PubSub{
		subscribers: make(map[string][]Subscriber),
	}
}

// Subscribe adds a subscriber to a topic. The call is ignored when the
// subscriber is nil or when the PubSub has already been closed.
func (ps *PubSub) Subscribe(topic string, subscriber Subscriber) {
	if subscriber == nil {
		// Publish runs every handler in its own goroutine; calling a nil
		// func there would panic and take the whole process down.
		return
	}

	ps.mu.Lock()
	defer ps.mu.Unlock()

	if ps.closed {
		return
	}

	ps.subscribers[topic] = append(ps.subscribers[topic], subscriber)
}

// Publish sends a message to all subscribers of a topic. Each subscriber is
// invoked in its own goroutine, so Publish does not wait for handlers to
// finish. Publishing on a closed PubSub is a no-op.
func (ps *PubSub) Publish(topic string, payload interface{}) {
	ps.mu.RLock()
	defer ps.mu.RUnlock()

	if ps.closed {
		return
	}

	message := Message{
		Topic:   topic,
		Payload: payload,
	}

	// Send to all subscribers asynchronously.
	for _, subscriber := range ps.subscribers[topic] {
		go subscriber(message)
	}
}

// Close shuts down the pub/sub system. Later Subscribe and Publish calls are
// ignored; handlers that were already started are not interrupted.
func (ps *PubSub) Close() {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	ps.closed = true
}

func main() {
	pubsub := NewPubSub()
	defer pubsub.Close()

	// Subscribe to user events
	pubsub.Subscribe("user.created", func(msg Message) {
		fmt.Printf("Email service: Welcome %v!\n", msg.Payload)
	})

	pubsub.Subscribe("user.created", func(msg Message) {
		fmt.Printf("Analytics: New user registered: %v\n", msg.Payload)
	})

	pubsub.Subscribe("user.deleted", func(msg Message) {
		fmt.Printf("Cleanup service: Remove user data for %v\n", msg.Payload)
	})

	// Publish events
	pubsub.Publish("user.created", "john@example.com")
	pubsub.Publish("user.created", "jane@example.com")
	pubsub.Publish("user.deleted", "bob@example.com")

	// Wait for async processing
	time.Sleep(100 * time.Millisecond)
}
```

## Advanced Pub/Sub with Channels

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Event represents a structured event.
type Event struct {
	ID        string
	Type      string
	Timestamp time.Time
	Data      interface{}
}

// Subscription represents an active subscription. Events are delivered on
// Channel, which is closed once the subscription has been cancelled.
type Subscription struct {
	ID      string
	Topic   string
	Channel chan Event
	Filter  func(Event) bool
	cancel  context.CancelFunc
}

// Close cancels the subscription. Removal from the bus and closing of
// Channel happen asynchronously in the goroutine started by Subscribe.
func (s *Subscription) Close() {
	if s.cancel != nil {
		s.cancel()
	}
}

// EventBus is a channel-based pub/sub system.
type EventBus struct {
	mu            sync.RWMutex
	subscriptions map[string][]*Subscription
	buffer        int
	nextID        uint64 // last subscription number handed out; guarded by mu
	closed        bool
}

// NewEventBus creates a new event bus whose subscription channels have the
// given buffer size. A negative size is treated as 0 (unbuffered), because
// make(chan Event, n) panics for n < 0.
func NewEventBus(bufferSize int) *EventBus {
	if bufferSize < 0 {
		bufferSize = 0
	}
	return &EventBus{
		subscriptions: make(map[string][]*Subscription),
		buffer:        bufferSize,
	}
}

// Subscribe creates a new subscription with optional filtering. A nil filter
// accepts every event of the topic. The subscription ends when ctx is
// cancelled or Close is called on it. Subscribe returns nil if the bus has
// been closed.
func (eb *EventBus) Subscribe(ctx context.Context, topic string, filter func(Event) bool) *Subscription {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	if eb.closed {
		return nil
	}

	subCtx, cancel := context.WithCancel(ctx)

	// IDs come from a counter rather than the wall clock: two subscriptions
	// created within one clock tick would otherwise share an ID.
	eb.nextID++
	subscription := &Subscription{
		ID:      fmt.Sprintf("sub-%d", eb.nextID),
		Topic:   topic,
		Channel: make(chan Event, eb.buffer),
		Filter:  filter,
		cancel:  cancel,
	}

	eb.subscriptions[topic] = append(eb.subscriptions[topic], subscription)

	// Clean up subscription when context is cancelled. The channel is closed
	// only after the subscription has been removed under the write lock, so
	// Publish can never send on a closed channel.
	go func() {
		<-subCtx.Done()
		eb.unsubscribe(subscription)
		close(subscription.Channel)
	}()

	return subscription
}

// unsubscribe removes a subscription from its topic. Subscriptions are
// matched by pointer identity, so an ID clash can never remove the wrong one.
func (eb *EventBus) unsubscribe(sub *Subscription) {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	subs := eb.subscriptions[sub.Topic]
	for i, s := range subs {
		if s != sub {
			continue
		}
		last := len(subs) - 1
		copy(subs[i:], subs[i+1:])
		subs[last] = nil // drop the stale pointer left in the slice tail
		eb.subscriptions[sub.Topic] = subs[:last]
		break
	}
}

// Publish sends an event to all matching subscribers of topic event.Type.
// The event's Timestamp is set to the current time. Delivery is
// non-blocking: if a subscriber's channel is full, the event is dropped for
// that subscriber and a warning is printed.
func (eb *EventBus) Publish(event Event) {
	eb.mu.RLock()
	defer eb.mu.RUnlock()

	if eb.closed {
		return
	}

	event.Timestamp = time.Now()

	for _, subscription := range eb.subscriptions[event.Type] {
		// Apply filter if present
		if subscription.Filter != nil && !subscription.Filter(event) {
			continue
		}

		// Non-blocking send
		select {
		case subscription.Channel <- event:
		default:
			// Channel is full, could log this
			fmt.Printf("Warning: Subscription %s channel is full\n", subscription.ID)
		}
	}
}

// Close shuts down the event bus and cancels every active subscription.
// The subscription channels are closed asynchronously by each
// subscription's cleanup goroutine.
func (eb *EventBus) Close() {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	eb.closed = true

	// Close all subscriptions
	for _, subs := range eb.subscriptions {
		for _, sub := range subs {
			sub.Close()
		}
	}
}

func main() {
	ctx := context.Background()
	eventBus := NewEventBus(10)
	defer eventBus.Close()

	// Subscribe to all user events
	userSub := eventBus.Subscribe(ctx, "user", nil)

	// Subscribe to only high-priority events
	prioritySub := eventBus.Subscribe(ctx, "user", func(e Event) bool {
		if data, ok := e.Data.(map[string]interface{}); ok {
			return data["priority"] == "high"
		}
		return false
	})

	// Start event processors
	go func() {
		for event := range userSub.Channel {
			fmt.Printf("User processor: %s - %v\n", event.Type, event.Data)
		}
	}()

	go func() {
		for event := range prioritySub.Channel {
			fmt.Printf("Priority processor: %s - %v\n", event.Type, event.Data)
		}
	}()

	// Publish events
	eventBus.Publish(Event{
		ID:   "1",
		Type: "user",
		Data: map[string]interface{}{
			"action":   "login",
			"user":     "john",
			"priority": "low",
		},
	})

	eventBus.Publish(Event{
		ID:   "2",
		Type: "user",
		Data: map[string]interface{}{
			"action":   "payment",
			"user":     "jane",
			"priority": "high",
		},
	})

	time.Sleep(100 * time.Millisecond)
}
```

## Persistent Pub/Sub with Replay
```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// NOTE: Event is the struct declared in the "Advanced Pub/Sub with Channels"
// example; this snippet does not redeclare it.

// StoredEvent represents an event with storage metadata.
type StoredEvent struct {
	Event
	Sequence int64
	Stored   time.Time
}

// PersistentEventBus stores events and supports replay.
type PersistentEventBus struct {
	mu        sync.RWMutex
	events    []StoredEvent
	sequence  int64
	subs      map[string][]*PersistentSubscription
	nextSubID uint64 // last subscription number handed out; guarded by mu
	closed    bool
}

// PersistentSubscription supports replay from a specific point. Events are
// delivered on Channel, which is closed once the subscription is cancelled.
type PersistentSubscription struct {
	ID      string
	Topic   string
	Channel chan StoredEvent
	FromSeq int64
	cancel  context.CancelFunc
}

// Close cancels the subscription. Removal from the bus and closing of
// Channel happen asynchronously in the goroutine started by Subscribe.
func (s *PersistentSubscription) Close() {
	if s.cancel != nil {
		s.cancel()
	}
}

// NewPersistentEventBus creates a new persistent event bus.
func NewPersistentEventBus() *PersistentEventBus {
	return &PersistentEventBus{
		events: make([]StoredEvent, 0),
		subs:   make(map[string][]*PersistentSubscription),
	}
}

// Subscribe creates a subscription with optional replay. When fromSequence
// is >= 0, stored events of the topic with Sequence >= fromSequence are
// queued on the channel before any live event; a negative fromSequence
// means live events only. Subscribe returns nil if the bus has been closed.
func (peb *PersistentEventBus) Subscribe(ctx context.Context, topic string, fromSequence int64) *PersistentSubscription {
	peb.mu.Lock()
	defer peb.mu.Unlock()

	if peb.closed {
		return nil
	}

	subCtx, cancel := context.WithCancel(ctx)

	// IDs come from a counter rather than the wall clock: two subscriptions
	// created within one clock tick would otherwise share an ID.
	peb.nextSubID++
	sub := &PersistentSubscription{
		ID:      fmt.Sprintf("psub-%d", peb.nextSubID),
		Topic:   topic,
		Channel: make(chan StoredEvent, 100),
		FromSeq: fromSequence,
		cancel:  cancel,
	}

	// Replay historical events if requested. This runs synchronously while
	// the write lock is held and before the cleanup goroutine exists, so no
	// live event can be interleaved with (or duplicated by) the replay and
	// the channel cannot be closed underneath it.
	if fromSequence >= 0 {
		peb.replayEvents(sub)
	}

	peb.subs[topic] = append(peb.subs[topic], sub)

	// Clean up on context cancellation. The channel is closed only after the
	// subscription has been removed under the write lock, so Publish can
	// never send on a closed channel.
	go func() {
		<-subCtx.Done()
		peb.unsubscribe(sub)
		close(sub.Channel)
	}()

	return sub
}

// replayEvents sends historical events to a subscription. The caller must
// hold peb.mu. Sends are non-blocking: events that do not fit in the
// subscription's channel buffer are skipped.
func (peb *PersistentEventBus) replayEvents(sub *PersistentSubscription) {
	for _, storedEvent := range peb.events {
		if storedEvent.Sequence >= sub.FromSeq && storedEvent.Type == sub.Topic {
			select {
			case sub.Channel <- storedEvent:
			default:
				// Channel full, skip
			}
		}
	}
}

// unsubscribe removes a subscription from its topic. Subscriptions are
// matched by pointer identity, so an ID clash can never remove the wrong one.
func (peb *PersistentEventBus) unsubscribe(sub *PersistentSubscription) {
	peb.mu.Lock()
	defer peb.mu.Unlock()

	subs := peb.subs[sub.Topic]
	for i, s := range subs {
		if s != sub {
			continue
		}
		last := len(subs) - 1
		copy(subs[i:], subs[i+1:])
		subs[last] = nil // drop the stale pointer left in the slice tail
		peb.subs[sub.Topic] = subs[:last]
		break
	}
}

// Publish stores and distributes an event. It returns the sequence number
// assigned to the event, or -1 if the bus has been closed. Delivery to
// current subscribers is non-blocking; a subscriber whose channel is full
// misses the live event.
func (peb *PersistentEventBus) Publish(event Event) int64 {
	peb.mu.Lock()
	defer peb.mu.Unlock()

	if peb.closed {
		return -1
	}

	peb.sequence++
	storedEvent := StoredEvent{
		Event:    event,
		Sequence: peb.sequence,
		Stored:   time.Now(),
	}

	// Store event
	peb.events = append(peb.events, storedEvent)

	// Distribute to current subscribers
	for _, sub := range peb.subs[event.Type] {
		select {
		case sub.Channel <- storedEvent:
		default:
			// Channel full
		}
	}

	return peb.sequence
}

// GetLastSequence returns the last event sequence number.
func (peb *PersistentEventBus) GetLastSequence() int64 {
	peb.mu.RLock()
	defer peb.mu.RUnlock()
	return peb.sequence
}

// Close shuts down the bus and cancels every active subscription. Later
// Subscribe calls return nil and later Publish calls return -1.
func (peb *PersistentEventBus) Close() {
	peb.mu.Lock()
	defer peb.mu.Unlock()

	if peb.closed {
		return
	}
	peb.closed = true

	for _, subs := range peb.subs {
		for _, sub := range subs {
			sub.Close()
		}
	}
}

func main() {
	ctx := context.Background()
	eventBus := NewPersistentEventBus()
	defer eventBus.Close()

	// Publish some initial events
	eventBus.Publish(Event{ID: "1", Type: "order", Data: "Order created"})
	eventBus.Publish(Event{ID: "2", Type: "order", Data: "Order paid"})
	eventBus.Publish(Event{ID: "3", Type: "order", Data: "Order shipped"})

	fmt.Printf("Published 3 events, last sequence: %d\n", eventBus.GetLastSequence())

	// Subscribe from the beginning (replay all events)
	replaySub := eventBus.Subscribe(ctx, "order", 0)

	// Subscribe from current point (no replay)
	liveSub := eventBus.Subscribe(ctx, "order", -1)

	// Process replayed events
	go func() {
		fmt.Println("Replay subscription:")
		for event := range replaySub.Channel {
			fmt.Printf(" Replayed: seq=%d, %v\n", event.Sequence, event.Data)
		}
	}()

	// Process live events
	go func() {
		fmt.Println("Live subscription:")
		for event := range liveSub.Channel {
			fmt.Printf(" Live: seq=%d, %v\n", event.Sequence, event.Data)
		}
	}()

	time.Sleep(100 * time.Millisecond)

	// Publish new events
	eventBus.Publish(Event{ID: "4", Type: "order", Data: "Order delivered"})
	eventBus.Publish(Event{ID: "5", Type: "order", Data: "Order completed"})

	time.Sleep(100 * time.Millisecond)

	replaySub.Close()
	liveSub.Close()
}
```

## Typed Pub/Sub System

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
	"time"
)

// TypedEventBus provides type-safe pub/sub. Handlers are keyed by the
// concrete type of their single parameter.
type TypedEventBus struct {
	mu       sync.RWMutex
	handlers map[reflect.Type][]reflect.Value
	closed   bool
}

// NewTypedEventBus creates a new typed event bus.
func NewTypedEventBus() *TypedEventBus {
	return &TypedEventBus{
		handlers: make(map[reflect.Type][]reflect.Value),
	}
}

// Subscribe registers a handler for a specific event type. The handler must
// be a non-nil func taking exactly one argument and returning nothing;
// anything else is a programming error and panics. Calls made after Close
// are ignored.
func (teb *TypedEventBus) Subscribe(handler interface{}) {
	teb.mu.Lock()
	defer teb.mu.Unlock()

	if teb.closed {
		return
	}

	// An untyped nil has no reflect.Type; reject it here with the regular
	// message instead of an obscure panic from reflect.Value.Type.
	if handler == nil {
		panic("Handler must be func(EventType)")
	}

	handlerValue := reflect.ValueOf(handler)
	handlerType := handlerValue.Type()

	// Validate handler signature: func(EventType)
	if handlerType.Kind() != reflect.Func || handlerType.NumIn() != 1 || handlerType.NumOut() != 0 {
		panic("Handler must be func(EventType)")
	}

	// A typed nil func passes the signature check but would panic later,
	// inside the goroutine started by Publish.
	if handlerValue.IsNil() {
		panic("Handler must be func(EventType)")
	}

	eventType := handlerType.In(0)
	teb.handlers[eventType] = append(teb.handlers[eventType], handlerValue)
}

// Publish sends an event to all handlers registered for the event's exact
// dynamic type. Each handler is called in its own goroutine, so Publish does
// not wait for handlers to finish.
func (teb *TypedEventBus) Publish(event interface{}) {
	teb.mu.RLock()
	defer teb.mu.RUnlock()

	if teb.closed {
		return
	}

	eventType := reflect.TypeOf(event)
	eventValue := reflect.ValueOf(event)

	for _, handler := range teb.handlers[eventType] {
		go handler.Call([]reflect.Value{eventValue})
	}
}

// Close shuts down the bus. Later Subscribe and Publish calls are ignored;
// handlers that were already started are not interrupted.
func (teb *TypedEventBus) Close() {
	teb.mu.Lock()
	defer teb.mu.Unlock()
	teb.closed = true
}

// Event types
type UserCreated struct {
	UserID string
	Email  string
}

type OrderPlaced struct {
	OrderID string
	UserID  string
	Amount  float64
}

type PaymentProcessed struct {
	PaymentID string
	OrderID   string
	Success   bool
}

func main() {
	eventBus := NewTypedEventBus()
	defer eventBus.Close()

	// Subscribe to different event types
	eventBus.Subscribe(func(event UserCreated) {
		fmt.Printf("Email service: Send welcome email to %s\n", event.Email)
	})

	eventBus.Subscribe(func(event UserCreated) {
		fmt.Printf("Analytics: Track user registration %s\n", event.UserID)
	})

	eventBus.Subscribe(func(event OrderPlaced) {
		fmt.Printf("Inventory: Reserve items for order %s\n", event.OrderID)
	})

	eventBus.Subscribe(func(event OrderPlaced) {
		fmt.Printf("Payment: Process payment for order %s, amount $%.2f\n", event.OrderID, event.Amount)
	})

	eventBus.Subscribe(func(event PaymentProcessed) {
		if event.Success {
			fmt.Printf("Fulfillment: Ship order %s\n", event.OrderID)
		} else {
			fmt.Printf("Orders: Cancel order %s due to payment failure\n", event.OrderID)
		}
	})

	// Publish events
	eventBus.Publish(UserCreated{
		UserID: "user123",
		Email:  "user@example.com",
	})

	eventBus.Publish(OrderPlaced{
		OrderID: "order456",
		UserID:  "user123",
		Amount:  99.99,
	})

	eventBus.Publish(PaymentProcessed{
		PaymentID: "pay789",
		OrderID:   "order456",
		Success:   true,
	})

	// Wait for async processing
	time.Sleep(100 * time.Millisecond)
}
```

## Best Practices

- Async Processing: Handle events asynchronously to avoid blocking publishers
- Error Handling: Implement proper error handling in subscribers
- Buffering: Use buffered channels to handle bursts of events
- Graceful Shutdown: Ensure clean shutdown of all subscribers
- Dead Letter Queues: Handle failed message processing
- Monitoring: Track message rates, processing times, and failures
- Type Safety: Use typed events when possible
- Idempotency: Design subscribers to handle duplicate messages

## Common Pitfalls

- Memory Leaks: Not closing subscriptions properly
- Blocking Publishers: Slow subscribers blocking the entire system
- Lost Messages: Not handling channel buffer overflows
- Circular Dependencies: Events triggering other events in loops
- No Error Handling: Panics in subscribers affecting the system

## Testing Pub/Sub Systems

```go
package main

import (
	"context"
	"testing"
	"time"
)

// TestEventBus checks that an event published on a topic reaches a
// subscriber of that topic. It uses EventBus from the "Advanced Pub/Sub with
// Channels" example.
func TestEventBus(t *testing.T) {
	eventBus := NewEventBus(10)
	defer eventBus.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Subscribe to events
	sub := eventBus.Subscribe(ctx, "test", nil)
	if sub == nil {
		t.Fatal("Subscribe returned nil on an open event bus")
	}

	// Publish event
	testEvent := Event{
		ID:   "test1",
		Type: "test",
		Data: "test data",
	}
	eventBus.Publish(testEvent)

	// Verify event received
	select {
	case received := <-sub.Channel:
		if received.ID != testEvent.ID {
			t.Errorf("Expected event ID %s, got %s", testEvent.ID, received.ID)
		}
	case <-time.After(100 * time.Millisecond):
		t.Error("Event not received within timeout")
	}
}
```

The Pub/Sub pattern is fundamental for building scalable, event-driven systems in Go. It enables loose coupling between components and supports complex workflows through simple event-based communication.
...