What is the Observer Pattern?

The Observer pattern is a behavioral design pattern that defines a one-to-many dependency between objects. When one object (the subject) changes state, all its dependents (observers) are notified and updated automatically. Think of it like a newsletter subscription - when new content is published, all subscribers get notified.

I’ll demonstrate this pattern with a practical scenario that shows how powerful it can be in Go applications.

Let’s start with a scenario: Stock Price Monitoring

Imagine you’re building a stock trading platform where multiple components need to react when stock prices change: mobile app notifications, email alerts, trading bots, and analytics systems. Instead of tightly coupling all these components, we can use the Observer pattern.

Basic Observer Implementation

Let’s start with a simple implementation:

package main

import (
    "fmt"
    "sync"
)

// Observer is the interface every observer must implement in order to be
// registered with a Subject.
type Observer interface {
    // Update is called with the stock symbol and the price being announced.
    Update(stock string, price float64)
}

// Subject is the thing being observed: it manages a set of observers and
// announces price changes to them.
type Subject interface {
    // Subscribe registers observer so that it receives future notifications.
    Subscribe(observer Observer)
    // Unsubscribe removes a previously registered observer.
    Unsubscribe(observer Observer)
    // Notify announces price for stock to the registered observers.
    Notify(stock string, price float64)
}

// StockPriceMonitor is the concrete subject: it keeps the registered observers
// and fans price changes out to them. All methods are safe for concurrent use.
type StockPriceMonitor struct {
    observers []Observer
    mu        sync.RWMutex // guards observers
}

// NewStockPriceMonitor returns a monitor with no observers registered.
func NewStockPriceMonitor() *StockPriceMonitor {
    return &StockPriceMonitor{
        observers: make([]Observer, 0),
    }
}

// Subscribe registers observer for future notifications. A nil observer is
// ignored: calling Update on it would panic inside a notification goroutine
// and bring the whole process down.
func (s *StockPriceMonitor) Subscribe(observer Observer) {
    if observer == nil {
        return
    }
    s.mu.Lock()
    defer s.mu.Unlock()
    s.observers = append(s.observers, observer)
}

// Unsubscribe removes the first registration of observer. It is a no-op when
// observer was never subscribed.
func (s *StockPriceMonitor) Unsubscribe(observer Observer) {
    s.mu.Lock()
    defer s.mu.Unlock()
    for i, obs := range s.observers {
        if obs != observer {
            continue
        }
        last := len(s.observers) - 1
        copy(s.observers[i:], s.observers[i+1:])
        // Clear the vacated slot so the backing array does not keep the
        // removed observer reachable.
        s.observers[last] = nil
        s.observers = s.observers[:last]
        return
    }
}

// Notify delivers (stock, price) to every registered observer. Observers run
// concurrently with each other, in no particular order, and Notify returns
// only after all of them have finished. Waiting gives every notification
// goroutine a known lifetime, so a caller such as main cannot exit while
// notifications are still pending.
//
// The observer list is copied under the read lock and the lock is released
// before any observer runs, so an observer may call Subscribe or Unsubscribe
// from within Update without deadlocking.
func (s *StockPriceMonitor) Notify(stock string, price float64) {
    s.mu.RLock()
    targets := make([]Observer, len(s.observers))
    copy(targets, s.observers)
    s.mu.RUnlock()

    var wg sync.WaitGroup
    wg.Add(len(targets))
    for _, observer := range targets {
        go func(o Observer) {
            defer wg.Done()
            o.Update(stock, price)
        }(observer)
    }
    wg.Wait()
}

// UpdatePrice logs the new price for stock and then notifies all observers.
func (s *StockPriceMonitor) UpdatePrice(stock string, price float64) {
    fmt.Printf("Stock %s price updated to $%.2f\n", stock, price)
    s.Notify(stock, price)
}

// Concrete Observers

// MobileNotifier is an observer that prints a mobile-notification line for
// one user whenever a price is announced.
type MobileNotifier struct {
    UserID string
}

// Update prints the mobile notification for stock at price.
func (m *MobileNotifier) Update(stock string, price float64) {
    const format = "📱 Mobile notification to user %s: %s is now $%.2f\n"
    message := fmt.Sprintf(format, m.UserID, stock, price)
    fmt.Print(message)
}

// EmailNotifier is an observer that prints an e-mail alert line for one
// address whenever a price is announced.
type EmailNotifier struct {
    Email string
}

// Update prints the e-mail alert for stock at price.
func (e *EmailNotifier) Update(stock string, price float64) {
    const format = "📧 Email to %s: %s price alert - $%.2f\n"
    alert := fmt.Sprintf(format, e.Email, stock, price)
    fmt.Print(alert)
}

// TradingBot is an observer that prints an analysis line under its bot name
// whenever a price is announced.
type TradingBot struct {
    BotName string
}

// Update prints the bot's analysis line for stock at price.
func (t *TradingBot) Update(stock string, price float64) {
    const format = "🤖 Trading bot %s: Analyzing %s at $%.2f\n"
    line := fmt.Sprintf(format, t.BotName, stock, price)
    fmt.Print(line)
}

// main wires three observers to a StockPriceMonitor, publishes a few price
// updates, and then shows that an unsubscribed observer is no longer notified.
func main() {
    // Create the subject.
    monitor := NewStockPriceMonitor()

    // Create observers.
    mobileNotifier := &MobileNotifier{UserID: "user123"}
    // Placeholder address: the literal previously here ("[email protected]")
    // was an e-mail-obfuscation artifact, not a usable address.
    emailNotifier := &EmailNotifier{Email: "user@example.com"}
    tradingBot := &TradingBot{BotName: "AlgoBot-1"}

    // Subscribe observers.
    monitor.Subscribe(mobileNotifier)
    monitor.Subscribe(emailNotifier)
    monitor.Subscribe(tradingBot)

    // Simulate price updates; all three observers are notified.
    monitor.UpdatePrice("AAPL", 150.25)
    monitor.UpdatePrice("GOOGL", 2800.50)

    // After unsubscribing, the e-mail notifier no longer receives updates.
    monitor.Unsubscribe(emailNotifier)

    monitor.UpdatePrice("TSLA", 800.75)
}

Advanced Observer with Event Types

Let’s create a more sophisticated version that handles different types of events:

package main

import (
    "fmt"
    "sync"
    "time"
)

// EventType names the kind of event carried by an Event; the bus uses it to
// decide which observers receive the event.
type EventType string

// Event types used in this example.
const (
    // PriceUpdate events carry a stock's price (and price change) in Data.
    PriceUpdate EventType = "price_update"
    // VolumeSpike events carry a stock's trading volume in Data.
    VolumeSpike EventType = "volume_spike"
    // NewsAlert events carry a news headline in Data.
    NewsAlert   EventType = "news_alert"
)

// Event is a single occurrence published on the event bus.
type Event struct {
    // Type is the kind of event; observers are selected by it.
    Type      EventType
    // Stock is the ticker symbol the event concerns.
    Stock     string
    // Data is the event-specific payload. The observers in this file read the
    // keys "price" and "change" (float64), "volume" (int64) and "headline"
    // (string).
    Data      map[string]interface{}
    // Timestamp is set by the publisher; main uses time.Now().
    Timestamp time.Time
}

// EventObserver is the enhanced observer interface: besides handling events,
// an observer declares which event types it wants to receive.
type EventObserver interface {
    // OnEvent handles one published event.
    OnEvent(event Event)
    // GetInterestedEvents lists the event types the observer subscribes to.
    GetInterestedEvents() []EventType
}

// EventSubject is the enhanced subject interface: observers are registered
// and removed as a whole, and events are published rather than raw prices.
type EventSubject interface {
    // Subscribe registers observer for the event types it is interested in.
    Subscribe(observer EventObserver)
    // Unsubscribe removes a previously registered observer.
    Unsubscribe(observer EventObserver)
    // Publish delivers event to the observers registered for its type.
    Publish(event Event)
}

// StockEventBus is the enhanced subject: it routes every published Event to
// the observers registered for that event's type. All methods are safe for
// concurrent use.
type StockEventBus struct {
    // observers maps an event type to its subscribers. Elements of a stored
    // slice are never overwritten in place: Subscribe only appends and
    // Unsubscribe installs a freshly built slice. Publish relies on this to
    // iterate a slice it read under the lock after the lock is released.
    observers map[EventType][]EventObserver
    mu        sync.RWMutex // guards observers
}

// NewStockEventBus returns an event bus with no observers registered.
func NewStockEventBus() *StockEventBus {
    return &StockEventBus{
        observers: make(map[EventType][]EventObserver),
    }
}

// Subscribe registers observer for every event type returned by its
// GetInterestedEvents method. A nil observer is ignored.
func (s *StockEventBus) Subscribe(observer EventObserver) {
    if observer == nil {
        return
    }
    // Query the observer before locking so that observer code never runs
    // while the bus lock is held.
    interests := observer.GetInterestedEvents()

    s.mu.Lock()
    defer s.mu.Unlock()

    for _, eventType := range interests {
        s.observers[eventType] = append(s.observers[eventType], observer)
    }
}

// Unsubscribe removes the first registration of observer from every event
// type. Removal is copy-on-write: a new slice is built instead of shifting
// elements inside the existing one, because a concurrent Publish may still be
// iterating the old slice without holding the lock.
func (s *StockEventBus) Unsubscribe(observer EventObserver) {
    s.mu.Lock()
    defer s.mu.Unlock()

    for eventType, current := range s.observers {
        for i, obs := range current {
            if obs != observer {
                continue
            }
            if len(current) == 1 {
                // Last subscriber for this type: drop the map entry entirely.
                delete(s.observers, eventType)
                break
            }
            remaining := make([]EventObserver, 0, len(current)-1)
            remaining = append(remaining, current[:i]...)
            remaining = append(remaining, current[i+1:]...)
            s.observers[eventType] = remaining
            break
        }
    }
}

// Publish delivers event to every observer registered for event.Type. Each
// observer is invoked in its own goroutine and Publish does not wait for
// them, so delivery order is unspecified and callers that need the results
// must synchronize themselves. All observers receive the same Data map and
// must treat it as read-only.
func (s *StockEventBus) Publish(event Event) {
    s.mu.RLock()
    observers := s.observers[event.Type]
    s.mu.RUnlock()

    for _, observer := range observers {
        go observer.OnEvent(event) // Async processing
    }
}

// Concrete Observers with specific interests

// PriceAlertService is an observer that is interested only in price updates.
type PriceAlertService struct {
    Name string
}

// OnEvent prints an alert for PriceUpdate events and ignores every other
// type. A PriceUpdate whose Data has no float64 "price" is reported and
// skipped instead of panicking: the bus calls OnEvent in its own goroutine,
// where an unrecovered panic would terminate the whole process.
func (p *PriceAlertService) OnEvent(event Event) {
    if event.Type != PriceUpdate {
        return
    }
    price, ok := event.Data["price"].(float64)
    if !ok {
        fmt.Printf("Price Alert Service: skipping %s event for %s: no float64 \"price\" in payload\n",
            event.Type, event.Stock)
        return
    }
    fmt.Printf("🚨 Price Alert Service: %s reached $%.2f\n", event.Stock, price)
}

// GetInterestedEvents reports that the service subscribes to PriceUpdate only.
func (p *PriceAlertService) GetInterestedEvents() []EventType {
    return []EventType{PriceUpdate}
}

// NewsAnalyzer is an observer interested in news alerts and volume spikes.
type NewsAnalyzer struct {
    Name string
}

// OnEvent prints a line for NewsAlert events (string "headline" in Data) and
// for VolumeSpike events (int64 "volume" in Data); other types are ignored.
// An event whose payload is missing the expected key, or holds a value of a
// different type, is reported and skipped instead of panicking: the bus calls
// OnEvent in its own goroutine, where an unrecovered panic would terminate
// the whole process.
func (n *NewsAnalyzer) OnEvent(event Event) {
    switch event.Type {
    case NewsAlert:
        headline, ok := event.Data["headline"].(string)
        if !ok {
            fmt.Printf("News Analyzer: skipping %s event for %s: no string \"headline\" in payload\n",
                event.Type, event.Stock)
            return
        }
        fmt.Printf("📰 News Analyzer: Processing news for %s - %s\n",
            event.Stock, headline)
    case VolumeSpike:
        volume, ok := event.Data["volume"].(int64)
        if !ok {
            fmt.Printf("News Analyzer: skipping %s event for %s: no int64 \"volume\" in payload\n",
                event.Type, event.Stock)
            return
        }
        fmt.Printf("📊 News Analyzer: Volume spike detected for %s - %d shares\n",
            event.Stock, volume)
    }
}

// GetInterestedEvents reports that the analyzer subscribes to NewsAlert and
// VolumeSpike.
func (n *NewsAnalyzer) GetInterestedEvents() []EventType {
    return []EventType{NewsAlert, VolumeSpike}
}

// RiskManager is an observer interested in price updates and volume spikes.
type RiskManager struct {
    Name string
}

// OnEvent prints a volatility alert for PriceUpdate events whose "change"
// value is above 5.0 or below -5.0, and an activity alert for every
// VolumeSpike event; other types are ignored. A PriceUpdate with no float64
// "change" in Data is reported and skipped instead of panicking: the bus
// calls OnEvent in its own goroutine, where an unrecovered panic would
// terminate the whole process.
func (r *RiskManager) OnEvent(event Event) {
    switch event.Type {
    case PriceUpdate:
        // Only the change matters here; the "price" entry is not read.
        change, ok := event.Data["change"].(float64)
        if !ok {
            fmt.Printf("Risk Manager: skipping %s event for %s: no float64 \"change\" in payload\n",
                event.Type, event.Stock)
            return
        }
        if change > 5.0 || change < -5.0 {
            fmt.Printf("⚠️  Risk Manager: High volatility alert for %s - %.2f%% change\n",
                event.Stock, change)
        }
    case VolumeSpike:
        fmt.Printf("⚠️  Risk Manager: Unusual trading activity detected for %s\n",
            event.Stock)
    }
}

// GetInterestedEvents reports that the manager subscribes to PriceUpdate and
// VolumeSpike.
func (r *RiskManager) GetInterestedEvents() []EventType {
    return []EventType{PriceUpdate, VolumeSpike}
}

// main registers three observers with different interests on a StockEventBus,
// publishes one event of each type, and then pauses briefly so the
// asynchronous handlers can run before the program exits.
func main() {
    eventBus := NewStockEventBus()

    // Observers, in registration order.
    priceAlert := &PriceAlertService{Name: "PriceAlert"}
    newsAnalyzer := &NewsAnalyzer{Name: "NewsBot"}
    riskManager := &RiskManager{Name: "RiskBot"}
    for _, subscriber := range []EventObserver{priceAlert, newsAnalyzer, riskManager} {
        eventBus.Subscribe(subscriber)
    }

    // Events to simulate, in publication order. Each is timestamped at the
    // moment it is published.
    simulated := []Event{
        {
            Type:  PriceUpdate,
            Stock: "AAPL",
            Data:  map[string]interface{}{"price": 155.50, "change": 3.2},
        },
        {
            Type:  VolumeSpike,
            Stock: "TSLA",
            Data:  map[string]interface{}{"volume": int64(50000000)},
        },
        {
            Type:  NewsAlert,
            Stock: "GOOGL",
            Data:  map[string]interface{}{"headline": "Google announces new AI breakthrough"},
        },
    }
    for _, ev := range simulated {
        ev.Timestamp = time.Now()
        eventBus.Publish(ev)
    }

    // Publish does not wait for its observers; give the goroutines time to
    // process.
    time.Sleep(100 * time.Millisecond)
}

Channel-based Observer (Go Idiomatic Way)

Here’s a more Go-idiomatic approach using channels:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// StockUpdate is the message sent to channel-based observers.
type StockUpdate struct {
    // Symbol is the stock's ticker symbol, e.g. "AAPL".
    Symbol string
    // Price is the stock price; the handlers in main print it as dollars.
    Price  float64
    // Volume is the trading volume reported with the update.
    Volume int64
}

// ChannelObserver receives StockUpdate values over a buffered channel and
// processes them on a goroutine started by Start.
type ChannelObserver struct {
    ID       string
    Updates  chan StockUpdate
    ctx      context.Context
    cancel   context.CancelFunc
    stopOnce sync.Once // makes Stop idempotent
}

// NewChannelObserver returns an observer identified by id whose Updates
// channel can buffer bufferSize pending updates.
func NewChannelObserver(id string, bufferSize int) *ChannelObserver {
    ctx, cancel := context.WithCancel(context.Background())
    return &ChannelObserver{
        ID:      id,
        Updates: make(chan StockUpdate, bufferSize),
        ctx:     ctx,
        cancel:  cancel,
    }
}

// Start launches a goroutine that passes every update received on Updates to
// handler. The goroutine exits when the observer is stopped, either because
// the context is cancelled or because Updates has been closed.
func (o *ChannelObserver) Start(handler func(StockUpdate)) {
    go func() {
        for {
            select {
            case update, ok := <-o.Updates:
                if !ok {
                    // Updates was closed by Stop. Without this check the
                    // receive would keep yielding zero values and handler
                    // could be called with an update nobody sent.
                    return
                }
                handler(update)
            case <-o.ctx.Done():
                return
            }
        }
    }()
}

// Stop cancels the processing goroutine and closes Updates. It is safe to
// call more than once; only the first call has any effect. Nothing may be
// sent on Updates after Stop, since sending on a closed channel panics.
func (o *ChannelObserver) Stop() {
    o.stopOnce.Do(func() {
        o.cancel()
        close(o.Updates)
    })
}

// ChannelSubject broadcasts stock updates to channel-based observers, which
// it tracks by observer ID.
type ChannelSubject struct {
    observers map[string]*ChannelObserver
    mu        sync.RWMutex
}

// NewChannelSubject returns a subject with no observers registered.
func NewChannelSubject() *ChannelSubject {
    subject := new(ChannelSubject)
    subject.observers = make(map[string]*ChannelObserver)
    return subject
}

// Subscribe registers observer under its ID. An observer already registered
// under the same ID is replaced.
func (s *ChannelSubject) Subscribe(observer *ChannelObserver) {
    id := observer.ID

    s.mu.Lock()
    defer s.mu.Unlock()
    s.observers[id] = observer
}

// Unsubscribe stops the observer registered under observerID and removes it.
// Unknown IDs are ignored.
func (s *ChannelSubject) Unsubscribe(observerID string) {
    s.mu.Lock()
    defer s.mu.Unlock()

    target, registered := s.observers[observerID]
    if !registered {
        return
    }
    target.Stop()
    delete(s.observers, observerID)
}

// Broadcast offers update to every registered observer without blocking. An
// observer whose channel cannot accept the update right now is skipped for
// this update and a warning is printed.
func (s *ChannelSubject) Broadcast(update StockUpdate) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    for _, target := range s.observers {
        delivered := false
        select {
        case target.Updates <- update:
            delivered = true
        default:
        }
        if !delivered {
            // Channel is full, skip this observer
            fmt.Printf("Warning: Observer %s channel is full\n", target.ID)
        }
    }
}

// main starts two channel-based observers, subscribes them to a
// ChannelSubject, broadcasts three stock updates 100ms apart, and finally
// unsubscribes both observers.
func main() {
    subject := NewChannelSubject()

    priceTracker := NewChannelObserver("price-tracker", 10)
    alertSystem := NewChannelObserver("alert-system", 5)

    // The price tracker reports every update it receives.
    reportPrice := func(update StockUpdate) {
        fmt.Printf("📊 Price Tracker: %s = $%.2f (Volume: %d)\n",
            update.Symbol, update.Price, update.Volume)
    }
    // The alert system only speaks up when the price exceeds 1000.
    alertOnHighPrice := func(update StockUpdate) {
        if !(update.Price > 1000) {
            return
        }
        fmt.Printf("🚨 Alert: %s price is high: $%.2f\n",
            update.Symbol, update.Price)
    }
    priceTracker.Start(reportPrice)
    alertSystem.Start(alertOnHighPrice)

    for _, observer := range []*ChannelObserver{priceTracker, alertSystem} {
        subject.Subscribe(observer)
    }

    // Simulated feed; the pause after each broadcast lets the observers
    // process the update.
    feed := []StockUpdate{
        {Symbol: "AAPL", Price: 150.25, Volume: 1000000},
        {Symbol: "GOOGL", Price: 2800.50, Volume: 500000},
        {Symbol: "TSLA", Price: 1200.75, Volume: 2000000},
    }
    for i := range feed {
        subject.Broadcast(feed[i])
        time.Sleep(100 * time.Millisecond)
    }

    // Cleanup: unsubscribing also stops each observer.
    for _, id := range []string{"price-tracker", "alert-system"} {
        subject.Unsubscribe(id)
    }
}

Real-world Use Cases

I frequently use the Observer pattern in these scenarios:

  1. Event-driven Microservices: Services subscribe to domain events
  2. Real-time Dashboards: UI components react to data changes
  3. Audit Logging: Log all system changes automatically
  4. Cache Invalidation: Clear caches when data changes
  5. Notification Systems: Multiple channels (email, SMS, push) for alerts

Benefits of the Observer Pattern

  1. Loose Coupling: Subjects and observers are independent
  2. Dynamic Relationships: Add/remove observers at runtime
  3. Broadcast Communication: One-to-many notifications
  4. Open/Closed Principle: Easy to add new observers without changing subjects

Caveats

Be aware of these potential issues:

  1. Memory Leaks: Forgotten observers can cause memory leaks
  2. Performance: Too many observers can slow down notifications
  3. Ordering: No guarantee of notification order
  4. Error Handling: One observer’s error shouldn’t affect others

Thank you

The Observer pattern is incredibly useful for building reactive systems in Go. Combined with goroutines and channels, it becomes even more powerful for concurrent applications. Please drop an email at [email protected] if you would like to share any feedback or suggestions. Peace!