What is the Observer Pattern?
The Observer pattern is a behavioral design pattern that defines a one-to-many dependency between objects. When one object (the subject) changes state, all its dependents (observers) are notified and updated automatically. Think of it like a newsletter subscription: when new content is published, all subscribers get notified.
I’ll demonstrate this pattern with a practical scenario that shows how powerful it can be in Go applications.
Let’s start with a scenario: Stock Price Monitoring
Imagine you’re building a stock trading platform where multiple components need to react when stock prices change: mobile app notifications, email alerts, trading bots, and analytics systems. Instead of tightly coupling all these components, we can use the Observer pattern.
Basic Observer Implementation
Here is a simple implementation:
package main

import (
	"fmt"
	"sync"
)

// Observer interface - all observers must implement this
type Observer interface {
	Update(stock string, price float64)
}

// Subject interface - the thing being observed
type Subject interface {
	Subscribe(observer Observer)
	Unsubscribe(observer Observer)
	Notify(stock string, price float64)
}

// StockPriceMonitor - our concrete subject
type StockPriceMonitor struct {
	observers []Observer
	mu        sync.RWMutex
}

func NewStockPriceMonitor() *StockPriceMonitor {
	return &StockPriceMonitor{
		observers: make([]Observer, 0),
	}
}

func (s *StockPriceMonitor) Subscribe(observer Observer) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.observers = append(s.observers, observer)
}

func (s *StockPriceMonitor) Unsubscribe(observer Observer) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for i, obs := range s.observers {
		if obs == observer {
			s.observers = append(s.observers[:i], s.observers[i+1:]...)
			break
		}
	}
}
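func (s *StockPriceMonitor) Notify(stock string, price float64) {
	// Copy the observer list and release the lock before notifying, so that
	// a slow observer cannot block Subscribe or Unsubscribe.
	s.mu.RLock()
	observers := append([]Observer(nil), s.observers...)
	s.mu.RUnlock()

	// Notify observers concurrently, then wait for all of them. Without the
	// wait, main could return before the goroutines have printed anything.
	var wg sync.WaitGroup
	for _, observer := range observers {
		wg.Add(1)
		go func(o Observer) {
			defer wg.Done()
			o.Update(stock, price)
		}(observer)
	}
	wg.Wait()
}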
func (s *StockPriceMonitor) UpdatePrice(stock string, price float64) {
	fmt.Printf("Stock %s price updated to $%.2f\n", stock, price)
	s.Notify(stock, price)
}

// Concrete Observers
type MobileNotifier struct {
	UserID string
}

func (m *MobileNotifier) Update(stock string, price float64) {
	fmt.Printf("📱 Mobile notification to user %s: %s is now $%.2f\n",
		m.UserID, stock, price)
}

type EmailNotifier struct {
	Email string
}

func (e *EmailNotifier) Update(stock string, price float64) {
	fmt.Printf("📧 Email to %s: %s price alert - $%.2f\n",
		e.Email, stock, price)
}

type TradingBot struct {
	BotName string
}

func (t *TradingBot) Update(stock string, price float64) {
	fmt.Printf("🤖 Trading bot %s: Analyzing %s at $%.2f\n",
		t.BotName, stock, price)
}

func main() {
	// Create the subject
	monitor := NewStockPriceMonitor()

	// Create observers
	mobileNotifier := &MobileNotifier{UserID: "user123"}
	emailNotifier := &EmailNotifier{Email: "[email protected]"}
	tradingBot := &TradingBot{BotName: "AlgoBot-1"}

	// Subscribe observers
	monitor.Subscribe(mobileNotifier)
	monitor.Subscribe(emailNotifier)
	monitor.Subscribe(tradingBot)

	// Simulate price updates
	monitor.UpdatePrice("AAPL", 150.25)
	monitor.UpdatePrice("GOOGL", 2800.50)

	// Unsubscribe an observer
	monitor.Unsubscribe(emailNotifier)
	monitor.UpdatePrice("TSLA", 800.75)
}
Advanced Observer with Event Types
Let’s create a more sophisticated version that handles different types of events:
package main

import (
	"fmt"
	"sync"
	"time"
)

// Event types
type EventType string

const (
	PriceUpdate EventType = "price_update"
	VolumeSpike EventType = "volume_spike"
	NewsAlert   EventType = "news_alert"
)

// Event structure
type Event struct {
	Type      EventType
	Stock     string
	Data      map[string]interface{}
	Timestamp time.Time
}

// Enhanced Observer interface
type EventObserver interface {
	OnEvent(event Event)
	GetInterestedEvents() []EventType
}

// Enhanced Subject
type EventSubject interface {
	Subscribe(observer EventObserver)
	Unsubscribe(observer EventObserver)
	Publish(event Event)
}

// StockEventBus - our enhanced subject
type StockEventBus struct {
	observers map[EventType][]EventObserver
	mu        sync.RWMutex
}

func NewStockEventBus() *StockEventBus {
	return &StockEventBus{
		observers: make(map[EventType][]EventObserver),
	}
}

func (s *StockEventBus) Subscribe(observer EventObserver) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, eventType := range observer.GetInterestedEvents() {
		s.observers[eventType] = append(s.observers[eventType], observer)
	}
}

func (s *StockEventBus) Unsubscribe(observer EventObserver) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for eventType, observers := range s.observers {
		for i, obs := range observers {
			if obs == observer {
				s.observers[eventType] = append(observers[:i], observers[i+1:]...)
				break
			}
		}
	}
}
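func (s *StockEventBus) Publish(event Event) {
	// Copy the slice while holding the lock. Unsubscribe edits the backing
	// array in place, so ranging over the shared slice after unlocking
	// would be a data race.
	s.mu.RLock()
	observers := append([]EventObserver(nil), s.observers[event.Type]...)
	s.mu.RUnlock()

	for _, observer := range observers {
		go observer.OnEvent(event) // Async processing
	}
}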
// Concrete Observers with specific interests
type PriceAlertService struct {
	Name string
}

func (p *PriceAlertService) OnEvent(event Event) {
	if event.Type == PriceUpdate {
		// The comma-ok form avoids a panic if "price" is missing or not a float64.
		if price, ok := event.Data["price"].(float64); ok {
			fmt.Printf("🚨 Price Alert Service: %s reached $%.2f\n", event.Stock, price)
		}
	}
}

func (p *PriceAlertService) GetInterestedEvents() []EventType {
	return []EventType{PriceUpdate}
}

type NewsAnalyzer struct {
	Name string
}

func (n *NewsAnalyzer) OnEvent(event Event) {
	switch event.Type {
	case NewsAlert:
		if headline, ok := event.Data["headline"].(string); ok {
			fmt.Printf("📰 News Analyzer: Processing news for %s - %s\n",
				event.Stock, headline)
		}
	case VolumeSpike:
		if volume, ok := event.Data["volume"].(int64); ok {
			fmt.Printf("📊 News Analyzer: Volume spike detected for %s - %d shares\n",
				event.Stock, volume)
		}
	}
}

func (n *NewsAnalyzer) GetInterestedEvents() []EventType {
	return []EventType{NewsAlert, VolumeSpike}
}

type RiskManager struct {
	Name string
}

func (r *RiskManager) OnEvent(event Event) {
	switch event.Type {
	case PriceUpdate:
		// Only "change" is needed here. The original also read "price" into
		// an unused variable, which Go rejects at compile time.
		change, ok := event.Data["change"].(float64)
		if ok && (change > 5.0 || change < -5.0) {
			fmt.Printf("⚠️ Risk Manager: High volatility alert for %s - %.2f%% change\n",
				event.Stock, change)
		}
	case VolumeSpike:
		fmt.Printf("⚠️ Risk Manager: Unusual trading activity detected for %s\n",
			event.Stock)
	}
}

func (r *RiskManager) GetInterestedEvents() []EventType {
	return []EventType{PriceUpdate, VolumeSpike}
}
func main() {
	// Create event bus
	eventBus := NewStockEventBus()

	// Create observers
	priceAlert := &PriceAlertService{Name: "PriceAlert"}
	newsAnalyzer := &NewsAnalyzer{Name: "NewsBot"}
	riskManager := &RiskManager{Name: "RiskBot"}

	// Subscribe observers
	eventBus.Subscribe(priceAlert)
	eventBus.Subscribe(newsAnalyzer)
	eventBus.Subscribe(riskManager)

	// Simulate events
	eventBus.Publish(Event{
		Type:      PriceUpdate,
		Stock:     "AAPL",
		Data:      map[string]interface{}{"price": 155.50, "change": 3.2},
		Timestamp: time.Now(),
	})
	eventBus.Publish(Event{
		Type:      VolumeSpike,
		Stock:     "TSLA",
		Data:      map[string]interface{}{"volume": int64(50000000)},
		Timestamp: time.Now(),
	})
	eventBus.Publish(Event{
		Type:      NewsAlert,
		Stock:     "GOOGL",
		Data:      map[string]interface{}{"headline": "Google announces new AI breakthrough"},
		Timestamp: time.Now(),
	})

	// Give goroutines time to process
	time.Sleep(100 * time.Millisecond)
}
Channel-based Observer (Go Idiomatic Way)
Here’s a more Go-idiomatic approach using channels:
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type StockUpdate struct {
	Symbol string
	Price  float64
	Volume int64
}

type ChannelObserver struct {
	ID      string
	Updates chan StockUpdate
	ctx     context.Context
	cancel  context.CancelFunc
}

func NewChannelObserver(id string, bufferSize int) *ChannelObserver {
	ctx, cancel := context.WithCancel(context.Background())
	return &ChannelObserver{
		ID:      id,
		Updates: make(chan StockUpdate, bufferSize),
		ctx:     ctx,
		cancel:  cancel,
	}
}

func (o *ChannelObserver) Start(handler func(StockUpdate)) {
	go func() {
		for {
			select {
			case update := <-o.Updates:
				handler(update)
			case <-o.ctx.Done():
				return
			}
		}
	}()
}
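func (o *ChannelObserver) Stop() {
	// Only cancel the context. Closing Updates here would let the select in
	// Start receive zero-value updates from the closed channel, and a second
	// call to Stop would panic on the double close.
	o.cancel()
}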
type ChannelSubject struct {
	observers map[string]*ChannelObserver
	mu        sync.RWMutex
}

func NewChannelSubject() *ChannelSubject {
	return &ChannelSubject{
		observers: make(map[string]*ChannelObserver),
	}
}

func (s *ChannelSubject) Subscribe(observer *ChannelObserver) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.observers[observer.ID] = observer
}

func (s *ChannelSubject) Unsubscribe(observerID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if observer, exists := s.observers[observerID]; exists {
		observer.Stop()
		delete(s.observers, observerID)
	}
}

func (s *ChannelSubject) Broadcast(update StockUpdate) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for _, observer := range s.observers {
		select {
		case observer.Updates <- update:
		default:
			// Channel is full, skip this observer
			fmt.Printf("Warning: Observer %s channel is full\n", observer.ID)
		}
	}
}

func main() {
	subject := NewChannelSubject()

	// Create observers
	priceTracker := NewChannelObserver("price-tracker", 10)
	alertSystem := NewChannelObserver("alert-system", 5)

	// Start observers with their handlers
	priceTracker.Start(func(update StockUpdate) {
		fmt.Printf("📊 Price Tracker: %s = $%.2f (Volume: %d)\n",
			update.Symbol, update.Price, update.Volume)
	})
	alertSystem.Start(func(update StockUpdate) {
		if update.Price > 1000 {
			fmt.Printf("🚨 Alert: %s price is high: $%.2f\n",
				update.Symbol, update.Price)
		}
	})

	// Subscribe observers
	subject.Subscribe(priceTracker)
	subject.Subscribe(alertSystem)

	// Simulate stock updates
	updates := []StockUpdate{
		{"AAPL", 150.25, 1000000},
		{"GOOGL", 2800.50, 500000},
		{"TSLA", 1200.75, 2000000},
	}
	for _, update := range updates {
		subject.Broadcast(update)
		time.Sleep(100 * time.Millisecond)
	}

	// Cleanup
	subject.Unsubscribe("price-tracker")
	subject.Unsubscribe("alert-system")
}
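One detail to keep in mind with receive loops like the one in `Start`: a receive from a closed channel never blocks and yields the zero value, so a receiver that ignores the second `ok` result can mistake a closed channel for real data. The standalone sketch below shows the behavior. The `drain` helper is hypothetical, and `StockUpdate` is trimmed to two fields for brevity.

```go
package main

import "fmt"

// StockUpdate is a trimmed-down copy of the type used above.
type StockUpdate struct {
	Symbol string
	Price  float64
}

// drain reads until the channel is closed and empty, returning the symbols seen.
func drain(ch <-chan StockUpdate) []string {
	var symbols []string
	for {
		update, ok := <-ch
		if !ok {
			return symbols // closed and fully drained
		}
		symbols = append(symbols, update.Symbol)
	}
}

func main() {
	ch := make(chan StockUpdate, 2)
	ch <- StockUpdate{"AAPL", 150.25}
	close(ch)

	// Values buffered before the close are still delivered.
	fmt.Println(drain(ch)) // [AAPL]

	// Any further receive returns immediately with a zero value and ok == false.
	update, ok := <-ch
	fmt.Println(update.Symbol == "", ok) // true false
}
```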
Real-world Use Cases
I frequently use the Observer pattern in these scenarios:
- Event-driven Microservices: Services subscribe to domain events
- Real-time Dashboards: UI components react to data changes
- Audit Logging: Log all system changes automatically
- Cache Invalidation: Clear caches when data changes
- Notification Systems: Multiple channels (email, SMS, push) for alerts
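To make one of these concrete, here is a minimal sketch of the cache invalidation case. All names (`Store`, `Cache`, `ChangeListener`, `OnChange`) are hypothetical and chosen for this example only. The store is the subject, and the cache observes it and evicts an entry whenever the underlying key changes.

```go
package main

import (
	"fmt"
	"sync"
)

// ChangeListener is notified with the key that changed.
type ChangeListener interface {
	OnChange(key string)
}

// Store is the subject: a key-value store that announces every write.
type Store struct {
	mu        sync.Mutex
	data      map[string]string
	listeners []ChangeListener
}

func NewStore() *Store {
	return &Store{data: make(map[string]string)}
}

func (s *Store) Subscribe(l ChangeListener) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.listeners = append(s.listeners, l)
}

func (s *Store) Get(key string) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.data[key]
}

// Set writes the value, then notifies listeners after releasing the lock.
func (s *Store) Set(key, value string) {
	s.mu.Lock()
	s.data[key] = value
	listeners := append([]ChangeListener(nil), s.listeners...)
	s.mu.Unlock()

	for _, l := range listeners {
		l.OnChange(key)
	}
}

// Cache is the observer: a read-through cache that evicts changed keys.
type Cache struct {
	mu      sync.Mutex
	entries map[string]string
	store   *Store
}

func NewCache(store *Store) *Cache {
	c := &Cache{entries: make(map[string]string), store: store}
	store.Subscribe(c)
	return c
}

func (c *Cache) Get(key string) string {
	c.mu.Lock()
	defer c.mu.Unlock()
	if v, ok := c.entries[key]; ok {
		return v
	}
	v := c.store.Get(key)
	c.entries[key] = v
	return v
}

func (c *Cache) OnChange(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.entries, key)
}

func main() {
	store := NewStore()
	cache := NewCache(store)

	store.Set("AAPL", "150.25")
	fmt.Println(cache.Get("AAPL")) // 150.25 (now cached)

	store.Set("AAPL", "151.00")    // OnChange evicts the stale entry
	fmt.Println(cache.Get("AAPL")) // 151.00
}
```

The store never imports or names the cache. It only knows the `ChangeListener` interface, which is the loose coupling the pattern is meant to provide.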
Benefits of Observer Pattern
- Loose Coupling: Subjects and observers are independent
- Dynamic Relationships: Add/remove observers at runtime
- Broadcast Communication: One-to-many notifications
- Open/Closed Principle: Easy to add new observers without changing subjects
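The Open/Closed point can be illustrated with a function adapter, in the spirit of `http.HandlerFunc` from the standard library. The sketch below assumes the `Observer` interface from the basic example. `ObserverFunc` and the minimal `Ticker` subject are hypothetical names introduced here.

```go
package main

import "fmt"

// Observer matches the interface from the basic example.
type Observer interface {
	Update(stock string, price float64)
}

// ObserverFunc adapts a plain function to the Observer interface.
type ObserverFunc func(stock string, price float64)

func (f ObserverFunc) Update(stock string, price float64) { f(stock, price) }

// Ticker is a hypothetical minimal subject (synchronous, no locking).
type Ticker struct {
	observers []Observer
}

func (t *Ticker) Subscribe(o Observer) { t.observers = append(t.observers, o) }

func (t *Ticker) Publish(stock string, price float64) {
	for _, o := range t.observers {
		o.Update(stock, price)
	}
}

func main() {
	ticker := &Ticker{}

	// First behavior: record every update.
	var seen []string
	ticker.Subscribe(ObserverFunc(func(stock string, price float64) {
		seen = append(seen, fmt.Sprintf("%s=%.2f", stock, price))
	}))

	// Second behavior, added later without touching Ticker: track the maximum.
	maxPrice := 0.0
	ticker.Subscribe(ObserverFunc(func(_ string, price float64) {
		if price > maxPrice {
			maxPrice = price
		}
	}))

	ticker.Publish("AAPL", 150.25)
	ticker.Publish("TSLA", 800.75)

	fmt.Println(seen, maxPrice) // [AAPL=150.25 TSLA=800.75] 800.75
}
```

One limitation of this adapter: function values are not comparable in Go, so an `Unsubscribe` that finds observers with `==` would panic at runtime when both sides are `ObserverFunc` values. Removal would need an ID or handle instead.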
Caveats
Be aware of these potential issues:
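- Memory Leaks: The subject holds a reference to every subscribed observer, so an observer that is never unsubscribed cannot be garbage collected
- Performance: Synchronous notification takes time proportional to the number of observers, and one goroutine per observer per event adds scheduling overhead at high event rates
- Ordering: With one goroutine per notification, observers may run in any order, and a single observer may even see events out of order
- Error Handling: One observer's failure shouldn't affect others, yet `Update` returns no error and an unrecovered panic in any goroutine crashes the whole program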
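Several of these caveats can be softened in the subject itself. The sketch below is one possible approach, not the only one: `Subscribe` returns an unsubscribe function so cleanup is hard to forget, `Notify` runs synchronously in subscription order, and each call is wrapped in `recover` so a panicking observer does not take the others down. `SafeSubject`, `funcObserver`, and `safeUpdate` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

type Observer interface {
	Update(stock string, price float64)
}

// funcObserver is a small adapter so the demo can use closures.
type funcObserver func(stock string, price float64)

func (f funcObserver) Update(stock string, price float64) { f(stock, price) }

type entry struct {
	id  int
	obs Observer
}

// SafeSubject is a hypothetical subject with ordered, panic-isolated delivery.
type SafeSubject struct {
	mu      sync.Mutex
	nextID  int
	entries []entry
}

// Subscribe registers obs and returns a function that removes it again.
func (s *SafeSubject) Subscribe(obs Observer) (unsubscribe func()) {
	s.mu.Lock()
	defer s.mu.Unlock()
	id := s.nextID
	s.nextID++
	s.entries = append(s.entries, entry{id, obs})

	return func() {
		s.mu.Lock()
		defer s.mu.Unlock()
		for i, e := range s.entries {
			if e.id == id {
				// The three-index slice forces append to allocate a new array.
				s.entries = append(s.entries[:i:i], s.entries[i+1:]...)
				return
			}
		}
	}
}

// Notify calls observers in subscription order and collects their failures.
func (s *SafeSubject) Notify(stock string, price float64) []error {
	s.mu.Lock()
	snapshot := append([]entry(nil), s.entries...)
	s.mu.Unlock()

	var errs []error
	for _, e := range snapshot {
		if err := safeUpdate(e.obs, stock, price); err != nil {
			errs = append(errs, err)
		}
	}
	return errs
}

// safeUpdate converts a panic in obs.Update into an error.
func safeUpdate(obs Observer, stock string, price float64) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("observer panicked: %v", r)
		}
	}()
	obs.Update(stock, price)
	return nil
}

func main() {
	subject := &SafeSubject{}
	seen := 0

	unsubscribe := subject.Subscribe(funcObserver(func(string, float64) { panic("boom") }))
	subject.Subscribe(funcObserver(func(string, float64) { seen++ }))

	errs := subject.Notify("AAPL", 150.25)
	fmt.Println(len(errs), seen) // 1 1

	unsubscribe() // the faulty observer is released
	errs = subject.Notify("AAPL", 151.00)
	fmt.Println(len(errs), seen) // 0 2
}
```

Synchronous delivery trades throughput for predictable ordering. If observers are slow, a per-observer queue with a single worker goroutine keeps ordering per observer while letting the publisher move on.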
Thank you
The Observer pattern is incredibly useful for building reactive systems in Go. Combined with goroutines and channels, it becomes even more powerful for concurrent applications. Please drop an email at [email protected] if you would like to share any feedback or suggestions. Peace!