Go Concurrency Patterns Series: ← Fan-Out/Fan-In | Series Overview | Request/Response →
What is the Pub/Sub Pattern?
The Publisher/Subscriber (Pub/Sub) pattern is a messaging pattern where publishers send messages without knowing who will receive them, and subscribers receive messages without knowing who sent them. This creates a loosely coupled system where components can communicate through events without direct dependencies.
Key Components:
- Publisher: Sends messages/events
- Subscriber: Receives and processes messages/events
- Message Broker: Routes messages from publishers to subscribers
- Topics/Channels: Categories for organizing messages
Real-World Use Cases
- Event-Driven Architecture: Microservices communication
- Real-Time Notifications: User activity feeds, alerts
- Data Streaming: Log aggregation, metrics collection
- UI Updates: React to state changes across components
- Workflow Orchestration: Trigger actions based on events
- Cache Invalidation: Notify when data changes
Basic Pub/Sub Implementation
package main
import (
"fmt"
"sync"
"time"
)
// Message represents a pub/sub message
type Message struct {
Topic string
Payload interface{}
}
// Subscriber represents a message handler
type Subscriber func(Message)
// PubSub is a simple in-memory pub/sub system
type PubSub struct {
mu sync.RWMutex
subscribers map[string][]Subscriber
closed bool
}
// NewPubSub creates a new pub/sub instance
func NewPubSub() *PubSub {
return &PubSub{
subscribers: make(map[string][]Subscriber),
}
}
// Subscribe adds a subscriber to a topic
func (ps *PubSub) Subscribe(topic string, subscriber Subscriber) {
ps.mu.Lock()
defer ps.mu.Unlock()
if ps.closed {
return
}
ps.subscribers[topic] = append(ps.subscribers[topic], subscriber)
}
// Publish sends a message to all subscribers of a topic
func (ps *PubSub) Publish(topic string, payload interface{}) {
ps.mu.RLock()
defer ps.mu.RUnlock()
if ps.closed {
return
}
message := Message{
Topic: topic,
Payload: payload,
}
// Send to all subscribers asynchronously
for _, subscriber := range ps.subscribers[topic] {
go subscriber(message)
}
}
// Close shuts down the pub/sub system
func (ps *PubSub) Close() {
ps.mu.Lock()
defer ps.mu.Unlock()
ps.closed = true
}
func main() {
pubsub := NewPubSub()
defer pubsub.Close()
// Subscribe to user events
pubsub.Subscribe("user.created", func(msg Message) {
fmt.Printf("Email service: Welcome %v!\n", msg.Payload)
})
pubsub.Subscribe("user.created", func(msg Message) {
fmt.Printf("Analytics: New user registered: %v\n", msg.Payload)
})
pubsub.Subscribe("user.deleted", func(msg Message) {
fmt.Printf("Cleanup service: Remove user data for %v\n", msg.Payload)
})
// Publish events
pubsub.Publish("user.created", "[email protected]")
pubsub.Publish("user.created", "[email protected]")
pubsub.Publish("user.deleted", "[email protected]")
// Wait for async processing
time.Sleep(100 * time.Millisecond)
}
Advanced Pub/Sub with Channels
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Event represents a structured event
type Event struct {
ID string
Type string
Timestamp time.Time
Data interface{}
}
// Subscription represents an active subscription
type Subscription struct {
ID string
Topic string
Channel chan Event
Filter func(Event) bool
cancel context.CancelFunc
}
// Close cancels the subscription
func (s *Subscription) Close() {
if s.cancel != nil {
s.cancel()
}
}
// EventBus is a channel-based pub/sub system
type EventBus struct {
mu sync.RWMutex
subscriptions map[string][]*Subscription
buffer int
closed bool
}
// NewEventBus creates a new event bus
func NewEventBus(bufferSize int) *EventBus {
return &EventBus{
subscriptions: make(map[string][]*Subscription),
buffer: bufferSize,
}
}
// Subscribe creates a new subscription with optional filtering
func (eb *EventBus) Subscribe(ctx context.Context, topic string, filter func(Event) bool) *Subscription {
eb.mu.Lock()
defer eb.mu.Unlock()
if eb.closed {
return nil
}
subCtx, cancel := context.WithCancel(ctx)
subscription := &Subscription{
ID: fmt.Sprintf("sub-%d", time.Now().UnixNano()),
Topic: topic,
Channel: make(chan Event, eb.buffer),
Filter: filter,
cancel: cancel,
}
eb.subscriptions[topic] = append(eb.subscriptions[topic], subscription)
// Clean up subscription when context is cancelled
go func() {
<-subCtx.Done()
eb.unsubscribe(subscription)
close(subscription.Channel)
}()
return subscription
}
// unsubscribe removes a subscription
func (eb *EventBus) unsubscribe(sub *Subscription) {
eb.mu.Lock()
defer eb.mu.Unlock()
subs := eb.subscriptions[sub.Topic]
for i, s := range subs {
if s.ID == sub.ID {
eb.subscriptions[sub.Topic] = append(subs[:i], subs[i+1:]...)
break
}
}
}
// Publish sends an event to all matching subscribers
func (eb *EventBus) Publish(event Event) {
eb.mu.RLock()
defer eb.mu.RUnlock()
if eb.closed {
return
}
event.Timestamp = time.Now()
for _, subscription := range eb.subscriptions[event.Type] {
// Apply filter if present
if subscription.Filter != nil && !subscription.Filter(event) {
continue
}
// Non-blocking send
select {
case subscription.Channel <- event:
default:
// Channel is full, could log this
fmt.Printf("Warning: Subscription %s channel is full\n", subscription.ID)
}
}
}
// Close shuts down the event bus
func (eb *EventBus) Close() {
eb.mu.Lock()
defer eb.mu.Unlock()
eb.closed = true
// Close all subscriptions
for _, subs := range eb.subscriptions {
for _, sub := range subs {
sub.Close()
}
}
}
func main() {
ctx := context.Background()
eventBus := NewEventBus(10)
defer eventBus.Close()
// Subscribe to all user events
userSub := eventBus.Subscribe(ctx, "user", nil)
// Subscribe to only high-priority events
prioritySub := eventBus.Subscribe(ctx, "user", func(e Event) bool {
if data, ok := e.Data.(map[string]interface{}); ok {
return data["priority"] == "high"
}
return false
})
// Start event processors
go func() {
for event := range userSub.Channel {
fmt.Printf("User processor: %s - %v\n", event.Type, event.Data)
}
}()
go func() {
for event := range prioritySub.Channel {
fmt.Printf("Priority processor: %s - %v\n", event.Type, event.Data)
}
}()
// Publish events
eventBus.Publish(Event{
ID: "1",
Type: "user",
Data: map[string]interface{}{
"action": "login",
"user": "john",
"priority": "low",
},
})
eventBus.Publish(Event{
ID: "2",
Type: "user",
Data: map[string]interface{}{
"action": "payment",
"user": "jane",
"priority": "high",
},
})
time.Sleep(100 * time.Millisecond)
}
Persistent Pub/Sub with Replay
package main
import (
"context"
"fmt"
"sync"
"time"
)
// StoredEvent represents an event with storage metadata
type StoredEvent struct {
Event
Sequence int64
Stored time.Time
}
// PersistentEventBus stores events and supports replay
type PersistentEventBus struct {
mu sync.RWMutex
events []StoredEvent
sequence int64
subs map[string][]*PersistentSubscription
closed bool
}
// PersistentSubscription supports replay from a specific point
type PersistentSubscription struct {
ID string
Topic string
Channel chan StoredEvent
FromSeq int64
cancel context.CancelFunc
}
func (s *PersistentSubscription) Close() {
if s.cancel != nil {
s.cancel()
}
}
// NewPersistentEventBus creates a new persistent event bus
func NewPersistentEventBus() *PersistentEventBus {
return &PersistentEventBus{
events: make([]StoredEvent, 0),
subs: make(map[string][]*PersistentSubscription),
}
}
// Subscribe creates a subscription with optional replay
func (peb *PersistentEventBus) Subscribe(ctx context.Context, topic string, fromSequence int64) *PersistentSubscription {
peb.mu.Lock()
defer peb.mu.Unlock()
if peb.closed {
return nil
}
subCtx, cancel := context.WithCancel(ctx)
sub := &PersistentSubscription{
ID: fmt.Sprintf("psub-%d", time.Now().UnixNano()),
Topic: topic,
Channel: make(chan StoredEvent, 100),
FromSeq: fromSequence,
cancel: cancel,
}
peb.subs[topic] = append(peb.subs[topic], sub)
// Replay historical events if requested
if fromSequence >= 0 {
go peb.replayEvents(sub)
}
// Clean up on context cancellation
go func() {
<-subCtx.Done()
peb.unsubscribe(sub)
close(sub.Channel)
}()
return sub
}
// replayEvents sends historical events to a subscription
func (peb *PersistentEventBus) replayEvents(sub *PersistentSubscription) {
peb.mu.RLock()
defer peb.mu.RUnlock()
for _, storedEvent := range peb.events {
if storedEvent.Sequence >= sub.FromSeq && storedEvent.Type == sub.Topic {
select {
case sub.Channel <- storedEvent:
default:
// Channel full, skip
}
}
}
}
// unsubscribe removes a subscription
func (peb *PersistentEventBus) unsubscribe(sub *PersistentSubscription) {
peb.mu.Lock()
defer peb.mu.Unlock()
subs := peb.subs[sub.Topic]
for i, s := range subs {
if s.ID == sub.ID {
peb.subs[sub.Topic] = append(subs[:i], subs[i+1:]...)
break
}
}
}
// Publish stores and distributes an event
func (peb *PersistentEventBus) Publish(event Event) int64 {
peb.mu.Lock()
defer peb.mu.Unlock()
if peb.closed {
return -1
}
peb.sequence++
storedEvent := StoredEvent{
Event: event,
Sequence: peb.sequence,
Stored: time.Now(),
}
// Store event
peb.events = append(peb.events, storedEvent)
// Distribute to current subscribers
for _, sub := range peb.subs[event.Type] {
select {
case sub.Channel <- storedEvent:
default:
// Channel full
}
}
return peb.sequence
}
// GetLastSequence returns the last event sequence number
func (peb *PersistentEventBus) GetLastSequence() int64 {
peb.mu.RLock()
defer peb.mu.RUnlock()
return peb.sequence
}
func main() {
ctx := context.Background()
eventBus := NewPersistentEventBus()
// Publish some initial events
eventBus.Publish(Event{ID: "1", Type: "order", Data: "Order created"})
eventBus.Publish(Event{ID: "2", Type: "order", Data: "Order paid"})
eventBus.Publish(Event{ID: "3", Type: "order", Data: "Order shipped"})
fmt.Printf("Published 3 events, last sequence: %d\n", eventBus.GetLastSequence())
// Subscribe from the beginning (replay all events)
replaySub := eventBus.Subscribe(ctx, "order", 0)
// Subscribe from current point (no replay)
liveSub := eventBus.Subscribe(ctx, "order", -1)
// Process replayed events
go func() {
fmt.Println("Replay subscription:")
for event := range replaySub.Channel {
fmt.Printf(" Replayed: seq=%d, %v\n", event.Sequence, event.Data)
}
}()
// Process live events
go func() {
fmt.Println("Live subscription:")
for event := range liveSub.Channel {
fmt.Printf(" Live: seq=%d, %v\n", event.Sequence, event.Data)
}
}()
time.Sleep(100 * time.Millisecond)
// Publish new events
eventBus.Publish(Event{ID: "4", Type: "order", Data: "Order delivered"})
eventBus.Publish(Event{ID: "5", Type: "order", Data: "Order completed"})
time.Sleep(100 * time.Millisecond)
replaySub.Close()
liveSub.Close()
}
Typed Pub/Sub System
package main
import (
"context"
"fmt"
"reflect"
"sync"
)
// TypedEventBus provides type-safe pub/sub
type TypedEventBus struct {
mu sync.RWMutex
handlers map[reflect.Type][]reflect.Value
closed bool
}
// NewTypedEventBus creates a new typed event bus
func NewTypedEventBus() *TypedEventBus {
return &TypedEventBus{
handlers: make(map[reflect.Type][]reflect.Value),
}
}
// Subscribe registers a handler for a specific event type
func (teb *TypedEventBus) Subscribe(handler interface{}) {
teb.mu.Lock()
defer teb.mu.Unlock()
if teb.closed {
return
}
handlerValue := reflect.ValueOf(handler)
handlerType := handlerValue.Type()
// Validate handler signature: func(EventType)
if handlerType.Kind() != reflect.Func ||
handlerType.NumIn() != 1 ||
handlerType.NumOut() != 0 {
panic("Handler must be func(EventType)")
}
eventType := handlerType.In(0)
teb.handlers[eventType] = append(teb.handlers[eventType], handlerValue)
}
// Publish sends an event to all registered handlers
func (teb *TypedEventBus) Publish(event interface{}) {
teb.mu.RLock()
defer teb.mu.RUnlock()
if teb.closed {
return
}
eventType := reflect.TypeOf(event)
eventValue := reflect.ValueOf(event)
for _, handler := range teb.handlers[eventType] {
go handler.Call([]reflect.Value{eventValue})
}
}
// Event types
type UserCreated struct {
UserID string
Email string
}
type OrderPlaced struct {
OrderID string
UserID string
Amount float64
}
type PaymentProcessed struct {
PaymentID string
OrderID string
Success bool
}
func main() {
eventBus := NewTypedEventBus()
// Subscribe to different event types
eventBus.Subscribe(func(event UserCreated) {
fmt.Printf("Email service: Send welcome email to %s\n", event.Email)
})
eventBus.Subscribe(func(event UserCreated) {
fmt.Printf("Analytics: Track user registration %s\n", event.UserID)
})
eventBus.Subscribe(func(event OrderPlaced) {
fmt.Printf("Inventory: Reserve items for order %s\n", event.OrderID)
})
eventBus.Subscribe(func(event OrderPlaced) {
fmt.Printf("Payment: Process payment for order %s, amount $%.2f\n",
event.OrderID, event.Amount)
})
eventBus.Subscribe(func(event PaymentProcessed) {
if event.Success {
fmt.Printf("Fulfillment: Ship order %s\n", event.OrderID)
} else {
fmt.Printf("Orders: Cancel order %s due to payment failure\n", event.OrderID)
}
})
// Publish events
eventBus.Publish(UserCreated{
UserID: "user123",
Email: "[email protected]",
})
eventBus.Publish(OrderPlaced{
OrderID: "order456",
UserID: "user123",
Amount: 99.99,
})
eventBus.Publish(PaymentProcessed{
PaymentID: "pay789",
OrderID: "order456",
Success: true,
})
// Wait for async processing
time.Sleep(100 * time.Millisecond)
}
Best Practices
- Async Processing: Handle events asynchronously to avoid blocking publishers
- Error Handling: Implement proper error handling in subscribers
- Buffering: Use buffered channels to handle bursts of events
- Graceful Shutdown: Ensure clean shutdown of all subscribers
- Dead Letter Queues: Handle failed message processing
- Monitoring: Track message rates, processing times, and failures
- Type Safety: Use typed events when possible
- Idempotency: Design subscribers to handle duplicate messages
Common Pitfalls
- Memory Leaks: Not closing subscriptions properly
- Blocking Publishers: Slow subscribers blocking the entire system
- Lost Messages: Not handling channel buffer overflows
- Circular Dependencies: Events triggering other events in loops
- No Error Handling: Panics in subscribers affecting the system
Testing Pub/Sub Systems
package main
import (
"context"
"testing"
"time"
)
func TestEventBus(t *testing.T) {
eventBus := NewEventBus(10)
defer eventBus.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// Subscribe to events
sub := eventBus.Subscribe(ctx, "test", nil)
// Publish event
testEvent := Event{
ID: "test1",
Type: "test",
Data: "test data",
}
eventBus.Publish(testEvent)
// Verify event received
select {
case received := <-sub.Channel:
if received.ID != testEvent.ID {
t.Errorf("Expected event ID %s, got %s", testEvent.ID, received.ID)
}
case <-time.After(100 * time.Millisecond):
t.Error("Event not received within timeout")
}
}
The Pub/Sub pattern is fundamental for building scalable, event-driven systems in Go. It enables loose coupling between components and supports complex workflows through simple event-based communication.
Next: Learn about Request/Response Pattern for synchronous communication patterns.