Pub/Sub Pattern in Go

    Go Concurrency Patterns Series: ← Fan-Out/Fan-In | Series Overview | Request/Response → What is the Pub/Sub Pattern? The Publisher/Subscriber (Pub/Sub) pattern is a messaging pattern where publishers send messages without knowing who will receive them, and subscribers receive messages without knowing who sent them. This creates a loosely coupled system where components can communicate through events without direct dependencies. Key Components: Publisher: Sends messages/events Subscriber: Receives and processes messages/events Message Broker: Routes messages from publishers to subscribers Topics/Channels: Categories for organizing messages Real-World Use Cases Event-Driven Architecture: Microservices communication Real-Time Notifications: User activity feeds, alerts Data Streaming: Log aggregation, metrics collection UI Updates: React to state changes across components Workflow Orchestration: Trigger actions based on events Cache Invalidation: Notify when data changes Basic Pub/Sub Implementation package main import ( "fmt" "sync" "time" ) // Message represents a pub/sub message type Message struct { Topic string Payload interface{} } // Subscriber represents a message handler type Subscriber func(Message) // PubSub is a simple in-memory pub/sub system type PubSub struct { mu sync.RWMutex subscribers map[string][]Subscriber closed bool } // NewPubSub creates a new pub/sub instance func NewPubSub() *PubSub { return &PubSub{ subscribers: make(map[string][]Subscriber), } } // Subscribe adds a subscriber to a topic func (ps *PubSub) Subscribe(topic string, subscriber Subscriber) { ps.mu.Lock() defer ps.mu.Unlock() if ps.closed { return } ps.subscribers[topic] = append(ps.subscribers[topic], subscriber) } // Publish sends a message to all subscribers of a topic func (ps *PubSub) Publish(topic string, payload interface{}) { ps.mu.RLock() defer ps.mu.RUnlock() if ps.closed { return } message := Message{ Topic: topic, Payload: payload, } // Send to all subscribers asynchronously for _, subscriber := range ps.subscribers[topic] { go subscriber(message) } } // Close shuts down the pub/sub system func (ps *PubSub) Close() { ps.mu.Lock() defer ps.mu.Unlock() ps.closed = true } func main() { pubsub := NewPubSub() defer pubsub.Close() // Subscribe to user events pubsub.Subscribe("user.created", func(msg Message) { fmt.Printf("Email service: Welcome %v!\n", msg.Payload) }) pubsub.Subscribe("user.created", func(msg Message) { fmt.Printf("Analytics: New user registered: %v\n", msg.Payload) }) pubsub.Subscribe("user.deleted", func(msg Message) { fmt.Printf("Cleanup service: Remove user data for %v\n", msg.Payload) }) // Publish events pubsub.Publish("user.created", "[email protected]") pubsub.Publish("user.created", "[email protected]") pubsub.Publish("user.deleted", "[email protected]") // Wait for async processing time.Sleep(100 * time.Millisecond) } Advanced Pub/Sub with Channels package main import ( "context" "fmt" "sync" "time" ) // Event represents a structured event type Event struct { ID string Type string Timestamp time.Time Data interface{} } // Subscription represents an active subscription type Subscription struct { ID string Topic string Channel chan Event Filter func(Event) bool cancel context.CancelFunc } // Close cancels the subscription func (s *Subscription) Close() { if s.cancel != nil { s.cancel() } } // EventBus is a channel-based pub/sub system type EventBus struct { mu sync.RWMutex subscriptions map[string][]*Subscription buffer int closed bool } // NewEventBus creates a new event bus func NewEventBus(bufferSize int) *EventBus { return &EventBus{ subscriptions: make(map[string][]*Subscription), buffer: bufferSize, } } // Subscribe creates a new subscription with optional filtering func (eb *EventBus) Subscribe(ctx context.Context, topic string, filter func(Event) bool) *Subscription { eb.mu.Lock() defer eb.mu.Unlock() if eb.closed { return nil } subCtx, cancel := context.WithCancel(ctx) subscription := &Subscription{ ID: fmt.Sprintf("sub-%d", time.Now().UnixNano()), Topic: topic, Channel: make(chan Event, eb.buffer), Filter: filter, cancel: cancel, } eb.subscriptions[topic] = append(eb.subscriptions[topic], subscription) // Clean up subscription when context is cancelled go func() { <-subCtx.Done() eb.unsubscribe(subscription) close(subscription.Channel) }() return subscription } // unsubscribe removes a subscription func (eb *EventBus) unsubscribe(sub *Subscription) { eb.mu.Lock() defer eb.mu.Unlock() subs := eb.subscriptions[sub.Topic] for i, s := range subs { if s.ID == sub.ID { eb.subscriptions[sub.Topic] = append(subs[:i], subs[i+1:]...) break } } } // Publish sends an event to all matching subscribers func (eb *EventBus) Publish(event Event) { eb.mu.RLock() defer eb.mu.RUnlock() if eb.closed { return } event.Timestamp = time.Now() for _, subscription := range eb.subscriptions[event.Type] { // Apply filter if present if subscription.Filter != nil && !subscription.Filter(event) { continue } // Non-blocking send select { case subscription.Channel <- event: default: // Channel is full, could log this fmt.Printf("Warning: Subscription %s channel is full\n", subscription.ID) } } } // Close shuts down the event bus func (eb *EventBus) Close() { eb.mu.Lock() defer eb.mu.Unlock() eb.closed = true // Close all subscriptions for _, subs := range eb.subscriptions { for _, sub := range subs { sub.Close() } } } func main() { ctx := context.Background() eventBus := NewEventBus(10) defer eventBus.Close() // Subscribe to all user events userSub := eventBus.Subscribe(ctx, "user", nil) // Subscribe to only high-priority events prioritySub := eventBus.Subscribe(ctx, "user", func(e Event) bool { if data, ok := e.Data.(map[string]interface{}); ok { return data["priority"] == "high" } return false }) // Start event processors go func() { for event := range userSub.Channel { fmt.Printf("User processor: %s - %v\n", event.Type, event.Data) } }() go func() { for event := range prioritySub.Channel { fmt.Printf("Priority processor: %s - %v\n", event.Type, event.Data) } }() // Publish events eventBus.Publish(Event{ ID: "1", Type: "user", Data: map[string]interface{}{ "action": "login", "user": "john", "priority": "low", }, }) eventBus.Publish(Event{ ID: "2", Type: "user", Data: map[string]interface{}{ "action": "payment", "user": "jane", "priority": "high", }, }) time.Sleep(100 * time.Millisecond) } Persistent Pub/Sub with Replay package main import ( "context" "fmt" "sync" "time" ) // StoredEvent represents an event with storage metadata type StoredEvent struct { Event Sequence int64 Stored time.Time } // PersistentEventBus stores events and supports replay type PersistentEventBus struct { mu sync.RWMutex events []StoredEvent sequence int64 subs map[string][]*PersistentSubscription closed bool } // PersistentSubscription supports replay from a specific point type PersistentSubscription struct { ID string Topic string Channel chan StoredEvent FromSeq int64 cancel context.CancelFunc } func (s *PersistentSubscription) Close() { if s.cancel != nil { s.cancel() } } // NewPersistentEventBus creates a new persistent event bus func NewPersistentEventBus() *PersistentEventBus { return &PersistentEventBus{ events: make([]StoredEvent, 0), subs: make(map[string][]*PersistentSubscription), } } // Subscribe creates a subscription with optional replay func (peb *PersistentEventBus) Subscribe(ctx context.Context, topic string, fromSequence int64) *PersistentSubscription { peb.mu.Lock() defer peb.mu.Unlock() if peb.closed { return nil } subCtx, cancel := context.WithCancel(ctx) sub := &PersistentSubscription{ ID: fmt.Sprintf("psub-%d", time.Now().UnixNano()), Topic: topic, Channel: make(chan StoredEvent, 100), FromSeq: fromSequence, cancel: cancel, } peb.subs[topic] = append(peb.subs[topic], sub) // Replay historical events if requested if fromSequence >= 0 { go peb.replayEvents(sub) } // Clean up on context cancellation go func() { <-subCtx.Done() peb.unsubscribe(sub) close(sub.Channel) }() return sub } // replayEvents sends historical events to a subscription func (peb *PersistentEventBus) replayEvents(sub *PersistentSubscription) { peb.mu.RLock() defer peb.mu.RUnlock() for _, storedEvent := range peb.events { if storedEvent.Sequence >= sub.FromSeq && storedEvent.Type == sub.Topic { select { case sub.Channel <- storedEvent: default: // Channel full, skip } } } } // unsubscribe removes a subscription func (peb *PersistentEventBus) unsubscribe(sub *PersistentSubscription) { peb.mu.Lock() defer peb.mu.Unlock() subs := peb.subs[sub.Topic] for i, s := range subs { if s.ID == sub.ID { peb.subs[sub.Topic] = append(subs[:i], subs[i+1:]...) break } } } // Publish stores and distributes an event func (peb *PersistentEventBus) Publish(event Event) int64 { peb.mu.Lock() defer peb.mu.Unlock() if peb.closed { return -1 } peb.sequence++ storedEvent := StoredEvent{ Event: event, Sequence: peb.sequence, Stored: time.Now(), } // Store event peb.events = append(peb.events, storedEvent) // Distribute to current subscribers for _, sub := range peb.subs[event.Type] { select { case sub.Channel <- storedEvent: default: // Channel full } } return peb.sequence } // GetLastSequence returns the last event sequence number func (peb *PersistentEventBus) GetLastSequence() int64 { peb.mu.RLock() defer peb.mu.RUnlock() return peb.sequence } func main() { ctx := context.Background() eventBus := NewPersistentEventBus() // Publish some initial events eventBus.Publish(Event{ID: "1", Type: "order", Data: "Order created"}) eventBus.Publish(Event{ID: "2", Type: "order", Data: "Order paid"}) eventBus.Publish(Event{ID: "3", Type: "order", Data: "Order shipped"}) fmt.Printf("Published 3 events, last sequence: %d\n", eventBus.GetLastSequence()) // Subscribe from the beginning (replay all events) replaySub := eventBus.Subscribe(ctx, "order", 0) // Subscribe from current point (no replay) liveSub := eventBus.Subscribe(ctx, "order", -1) // Process replayed events go func() { fmt.Println("Replay subscription:") for event := range replaySub.Channel { fmt.Printf(" Replayed: seq=%d, %v\n", event.Sequence, event.Data) } }() // Process live events go func() { fmt.Println("Live subscription:") for event := range liveSub.Channel { fmt.Printf(" Live: seq=%d, %v\n", event.Sequence, event.Data) } }() time.Sleep(100 * time.Millisecond) // Publish new events eventBus.Publish(Event{ID: "4", Type: "order", Data: "Order delivered"}) eventBus.Publish(Event{ID: "5", Type: "order", Data: "Order completed"}) time.Sleep(100 * time.Millisecond) replaySub.Close() liveSub.Close() } Typed Pub/Sub System package main import ( "context" "fmt" "reflect" "sync" ) // TypedEventBus provides type-safe pub/sub type TypedEventBus struct { mu sync.RWMutex handlers map[reflect.Type][]reflect.Value closed bool } // NewTypedEventBus creates a new typed event bus func NewTypedEventBus() *TypedEventBus { return &TypedEventBus{ handlers: make(map[reflect.Type][]reflect.Value), } } // Subscribe registers a handler for a specific event type func (teb *TypedEventBus) Subscribe(handler interface{}) { teb.mu.Lock() defer teb.mu.Unlock() if teb.closed { return } handlerValue := reflect.ValueOf(handler) handlerType := handlerValue.Type() // Validate handler signature: func(EventType) if handlerType.Kind() != reflect.Func || handlerType.NumIn() != 1 || handlerType.NumOut() != 0 { panic("Handler must be func(EventType)") } eventType := handlerType.In(0) teb.handlers[eventType] = append(teb.handlers[eventType], handlerValue) } // Publish sends an event to all registered handlers func (teb *TypedEventBus) Publish(event interface{}) { teb.mu.RLock() defer teb.mu.RUnlock() if teb.closed { return } eventType := reflect.TypeOf(event) eventValue := reflect.ValueOf(event) for _, handler := range teb.handlers[eventType] { go handler.Call([]reflect.Value{eventValue}) } } // Event types type UserCreated struct { UserID string Email string } type OrderPlaced struct { OrderID string UserID string Amount float64 } type PaymentProcessed struct { PaymentID string OrderID string Success bool } func main() { eventBus := NewTypedEventBus() // Subscribe to different event types eventBus.Subscribe(func(event UserCreated) { fmt.Printf("Email service: Send welcome email to %s\n", event.Email) }) eventBus.Subscribe(func(event UserCreated) { fmt.Printf("Analytics: Track user registration %s\n", event.UserID) }) eventBus.Subscribe(func(event OrderPlaced) { fmt.Printf("Inventory: Reserve items for order %s\n", event.OrderID) }) eventBus.Subscribe(func(event OrderPlaced) { fmt.Printf("Payment: Process payment for order %s, amount $%.2f\n", event.OrderID, event.Amount) }) eventBus.Subscribe(func(event PaymentProcessed) { if event.Success { fmt.Printf("Fulfillment: Ship order %s\n", event.OrderID) } else { fmt.Printf("Orders: Cancel order %s due to payment failure\n", event.OrderID) } }) // Publish events eventBus.Publish(UserCreated{ UserID: "user123", Email: "[email protected]", }) eventBus.Publish(OrderPlaced{ OrderID: "order456", UserID: "user123", Amount: 99.99, }) eventBus.Publish(PaymentProcessed{ PaymentID: "pay789", OrderID: "order456", Success: true, }) // Wait for async processing time.Sleep(100 * time.Millisecond) } Best Practices Async Processing: Handle events asynchronously to avoid blocking publishers Error Handling: Implement proper error handling in subscribers Buffering: Use buffered channels to handle bursts of events Graceful Shutdown: Ensure clean shutdown of all subscribers Dead Letter Queues: Handle failed message processing Monitoring: Track message rates, processing times, and failures Type Safety: Use typed events when possible Idempotency: Design subscribers to handle duplicate messages Common Pitfalls Memory Leaks: Not closing subscriptions properly Blocking Publishers: Slow subscribers blocking the entire system Lost Messages: Not handling channel buffer overflows Circular Dependencies: Events triggering other events in loops No Error Handling: Panics in subscribers affecting the system Testing Pub/Sub Systems package main import ( "context" "testing" "time" ) func TestEventBus(t *testing.T) { eventBus := NewEventBus(10) defer eventBus.Close() ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() // Subscribe to events sub := eventBus.Subscribe(ctx, "test", nil) // Publish event testEvent := Event{ ID: "test1", Type: "test", Data: "test data", } eventBus.Publish(testEvent) // Verify event received select { case received := <-sub.Channel: if received.ID != testEvent.ID { t.Errorf("Expected event ID %s, got %s", testEvent.ID, received.ID) } case <-time.After(100 * time.Millisecond): t.Error("Event not received within timeout") } } The Pub/Sub pattern is fundamental for building scalable, event-driven systems in Go. It enables loose coupling between components and supports complex workflows through simple event-based communication. ...

    July 17, 2024 · 9 min · Rafiul Alam

    Go Concurrency Pattern: Pipeline

    Go Concurrency Patterns Series: ← Select Statement | Series Overview | Fan-Out/Fan-In → What is the Pipeline Pattern? The Pipeline pattern is a powerful way to structure concurrent data processing by breaking work into stages connected by channels. Each stage runs in its own goroutine, receives data from an input channel, processes it, and sends results to an output channel. This creates a chain of processing stages that can run concurrently, dramatically improving throughput. ...

    July 3, 2024 · 15 min · Rafiul Alam

    Fan-Out/Fan-In Pattern in Go

    Go Concurrency Patterns Series: ← Pipeline Pattern | Series Overview | Pub/Sub Pattern → What is the Fan-Out/Fan-In Pattern? The Fan-Out/Fan-In pattern is a powerful concurrency pattern that distributes work across multiple goroutines (fan-out) and then collects the results back into a single channel (fan-in). This pattern is perfect for parallelizing CPU-intensive tasks or I/O operations that can be processed independently. Fan-Out: Distribute work from one source to multiple workers Fan-In: Collect results from multiple workers into a single destination ...

    June 26, 2024 · 8 min · Rafiul Alam

    Go Concurrency Pattern: Select Statement

    Go Concurrency Patterns Series: ← Channel Fundamentals | Series Overview | Pipeline Pattern → What is the Select Statement? The select statement is Go’s powerful tool for handling multiple channel operations simultaneously. It’s like a switch statement, but for channels - it allows a goroutine to wait on multiple communication operations and proceed with whichever one becomes ready first. Think of select as a traffic controller at an intersection, managing multiple lanes of traffic (channels) and allowing the first available lane to proceed. This enables non-blocking communication, timeouts, and elegant multiplexing patterns that are essential for robust concurrent programs. ...

    June 26, 2024 · 12 min · Rafiul Alam

    Go Concurrency Pattern: Channel Fundamentals

    Go Concurrency Patterns Series: ← Goroutine Basics | Series Overview | Select Statement → What are Channels? Channels are Go’s primary mechanism for communication between goroutines. They embody Go’s concurrency philosophy: “Don’t communicate by sharing memory; share memory by communicating.” Think of channels as typed pipes that allow goroutines to safely pass data back and forth. Channels provide both communication and synchronization, making them incredibly powerful for building concurrent applications. They’re type-safe, can be buffered or unbuffered, and support directional constraints for better API design. ...

    June 19, 2024 · 12 min · Rafiul Alam