Pub/Sub Pattern in Go

    Go Concurrency Patterns Series: ← Fan-Out/Fan-In | Series Overview | Request/Response → What is the Pub/Sub Pattern? The Publisher/Subscriber (Pub/Sub) pattern is a messaging pattern where publishers send messages without knowing who will receive them, and subscribers receive messages without knowing who sent them. This creates a loosely coupled system where components can communicate through events without direct dependencies. Key Components: Publisher: Sends messages/events Subscriber: Receives and processes messages/events Message Broker: Routes messages from publishers to subscribers Topics/Channels: Categories for organizing messages Real-World Use Cases Event-Driven Architecture: Microservices communication Real-Time Notifications: User activity feeds, alerts Data Streaming: Log aggregation, metrics collection UI Updates: React to state changes across components Workflow Orchestration: Trigger actions based on events Cache Invalidation: Notify when data changes Basic Pub/Sub Implementation package main import ( "fmt" "sync" "time" ) // Message represents a pub/sub message type Message struct { Topic string Payload interface{} } // Subscriber represents a message handler type Subscriber func(Message) // PubSub is a simple in-memory pub/sub system type PubSub struct { mu sync.RWMutex subscribers map[string][]Subscriber closed bool } // NewPubSub creates a new pub/sub instance func NewPubSub() *PubSub { return &PubSub{ subscribers: make(map[string][]Subscriber), } } // Subscribe adds a subscriber to a topic func (ps *PubSub) Subscribe(topic string, subscriber Subscriber) { ps.mu.Lock() defer ps.mu.Unlock() if ps.closed { return } ps.subscribers[topic] = append(ps.subscribers[topic], subscriber) } // Publish sends a message to all subscribers of a topic func (ps *PubSub) Publish(topic string, payload interface{}) { ps.mu.RLock() defer ps.mu.RUnlock() if ps.closed { return } message := Message{ Topic: topic, Payload: payload, } // Send to all subscribers 
asynchronously for _, subscriber := range ps.subscribers[topic] { go subscriber(message) } } // Close shuts down the pub/sub system func (ps *PubSub) Close() { ps.mu.Lock() defer ps.mu.Unlock() ps.closed = true } func main() { pubsub := NewPubSub() defer pubsub.Close() // Subscribe to user events pubsub.Subscribe("user.created", func(msg Message) { fmt.Printf("Email service: Welcome %v!\n", msg.Payload) }) pubsub.Subscribe("user.created", func(msg Message) { fmt.Printf("Analytics: New user registered: %v\n", msg.Payload) }) pubsub.Subscribe("user.deleted", func(msg Message) { fmt.Printf("Cleanup service: Remove user data for %v\n", msg.Payload) }) // Publish events pubsub.Publish("user.created", "[email protected]") pubsub.Publish("user.created", "[email protected]") pubsub.Publish("user.deleted", "[email protected]") // Wait for async processing time.Sleep(100 * time.Millisecond) } Advanced Pub/Sub with Channels package main import ( "context" "fmt" "sync" "time" ) // Event represents a structured event type Event struct { ID string Type string Timestamp time.Time Data interface{} } // Subscription represents an active subscription type Subscription struct { ID string Topic string Channel chan Event Filter func(Event) bool cancel context.CancelFunc } // Close cancels the subscription func (s *Subscription) Close() { if s.cancel != nil { s.cancel() } } // EventBus is a channel-based pub/sub system type EventBus struct { mu sync.RWMutex subscriptions map[string][]*Subscription buffer int closed bool } // NewEventBus creates a new event bus func NewEventBus(bufferSize int) *EventBus { return &EventBus{ subscriptions: make(map[string][]*Subscription), buffer: bufferSize, } } // Subscribe creates a new subscription with optional filtering func (eb *EventBus) Subscribe(ctx context.Context, topic string, filter func(Event) bool) *Subscription { eb.mu.Lock() defer eb.mu.Unlock() if eb.closed { return nil } subCtx, cancel := context.WithCancel(ctx) subscription := 
&Subscription{ ID: fmt.Sprintf("sub-%d", time.Now().UnixNano()), Topic: topic, Channel: make(chan Event, eb.buffer), Filter: filter, cancel: cancel, } eb.subscriptions[topic] = append(eb.subscriptions[topic], subscription) // Clean up subscription when context is cancelled go func() { <-subCtx.Done() eb.unsubscribe(subscription) close(subscription.Channel) }() return subscription } // unsubscribe removes a subscription func (eb *EventBus) unsubscribe(sub *Subscription) { eb.mu.Lock() defer eb.mu.Unlock() subs := eb.subscriptions[sub.Topic] for i, s := range subs { if s.ID == sub.ID { eb.subscriptions[sub.Topic] = append(subs[:i], subs[i+1:]...) break } } } // Publish sends an event to all matching subscribers func (eb *EventBus) Publish(event Event) { eb.mu.RLock() defer eb.mu.RUnlock() if eb.closed { return } event.Timestamp = time.Now() for _, subscription := range eb.subscriptions[event.Type] { // Apply filter if present if subscription.Filter != nil && !subscription.Filter(event) { continue } // Non-blocking send select { case subscription.Channel <- event: default: // Channel is full, could log this fmt.Printf("Warning: Subscription %s channel is full\n", subscription.ID) } } } // Close shuts down the event bus func (eb *EventBus) Close() { eb.mu.Lock() defer eb.mu.Unlock() eb.closed = true // Close all subscriptions for _, subs := range eb.subscriptions { for _, sub := range subs { sub.Close() } } } func main() { ctx := context.Background() eventBus := NewEventBus(10) defer eventBus.Close() // Subscribe to all user events userSub := eventBus.Subscribe(ctx, "user", nil) // Subscribe to only high-priority events prioritySub := eventBus.Subscribe(ctx, "user", func(e Event) bool { if data, ok := e.Data.(map[string]interface{}); ok { return data["priority"] == "high" } return false }) // Start event processors go func() { for event := range userSub.Channel { fmt.Printf("User processor: %s - %v\n", event.Type, event.Data) } }() go func() { for event := range 
prioritySub.Channel { fmt.Printf("Priority processor: %s - %v\n", event.Type, event.Data) } }() // Publish events eventBus.Publish(Event{ ID: "1", Type: "user", Data: map[string]interface{}{ "action": "login", "user": "john", "priority": "low", }, }) eventBus.Publish(Event{ ID: "2", Type: "user", Data: map[string]interface{}{ "action": "payment", "user": "jane", "priority": "high", }, }) time.Sleep(100 * time.Millisecond) } Persistent Pub/Sub with Replay package main import ( "context" "fmt" "sync" "time" ) // StoredEvent represents an event with storage metadata type StoredEvent struct { Event Sequence int64 Stored time.Time } // PersistentEventBus stores events and supports replay type PersistentEventBus struct { mu sync.RWMutex events []StoredEvent sequence int64 subs map[string][]*PersistentSubscription closed bool } // PersistentSubscription supports replay from a specific point type PersistentSubscription struct { ID string Topic string Channel chan StoredEvent FromSeq int64 cancel context.CancelFunc } func (s *PersistentSubscription) Close() { if s.cancel != nil { s.cancel() } } // NewPersistentEventBus creates a new persistent event bus func NewPersistentEventBus() *PersistentEventBus { return &PersistentEventBus{ events: make([]StoredEvent, 0), subs: make(map[string][]*PersistentSubscription), } } // Subscribe creates a subscription with optional replay func (peb *PersistentEventBus) Subscribe(ctx context.Context, topic string, fromSequence int64) *PersistentSubscription { peb.mu.Lock() defer peb.mu.Unlock() if peb.closed { return nil } subCtx, cancel := context.WithCancel(ctx) sub := &PersistentSubscription{ ID: fmt.Sprintf("psub-%d", time.Now().UnixNano()), Topic: topic, Channel: make(chan StoredEvent, 100), FromSeq: fromSequence, cancel: cancel, } peb.subs[topic] = append(peb.subs[topic], sub) // Replay historical events if requested if fromSequence >= 0 { go peb.replayEvents(sub) } // Clean up on context cancellation go func() { <-subCtx.Done() 
peb.unsubscribe(sub) close(sub.Channel) }() return sub } // replayEvents sends historical events to a subscription func (peb *PersistentEventBus) replayEvents(sub *PersistentSubscription) { peb.mu.RLock() defer peb.mu.RUnlock() for _, storedEvent := range peb.events { if storedEvent.Sequence >= sub.FromSeq && storedEvent.Type == sub.Topic { select { case sub.Channel <- storedEvent: default: // Channel full, skip } } } } // unsubscribe removes a subscription func (peb *PersistentEventBus) unsubscribe(sub *PersistentSubscription) { peb.mu.Lock() defer peb.mu.Unlock() subs := peb.subs[sub.Topic] for i, s := range subs { if s.ID == sub.ID { peb.subs[sub.Topic] = append(subs[:i], subs[i+1:]...) break } } } // Publish stores and distributes an event func (peb *PersistentEventBus) Publish(event Event) int64 { peb.mu.Lock() defer peb.mu.Unlock() if peb.closed { return -1 } peb.sequence++ storedEvent := StoredEvent{ Event: event, Sequence: peb.sequence, Stored: time.Now(), } // Store event peb.events = append(peb.events, storedEvent) // Distribute to current subscribers for _, sub := range peb.subs[event.Type] { select { case sub.Channel <- storedEvent: default: // Channel full } } return peb.sequence } // GetLastSequence returns the last event sequence number func (peb *PersistentEventBus) GetLastSequence() int64 { peb.mu.RLock() defer peb.mu.RUnlock() return peb.sequence } func main() { ctx := context.Background() eventBus := NewPersistentEventBus() // Publish some initial events eventBus.Publish(Event{ID: "1", Type: "order", Data: "Order created"}) eventBus.Publish(Event{ID: "2", Type: "order", Data: "Order paid"}) eventBus.Publish(Event{ID: "3", Type: "order", Data: "Order shipped"}) fmt.Printf("Published 3 events, last sequence: %d\n", eventBus.GetLastSequence()) // Subscribe from the beginning (replay all events) replaySub := eventBus.Subscribe(ctx, "order", 0) // Subscribe from current point (no replay) liveSub := eventBus.Subscribe(ctx, "order", -1) // Process 
replayed events go func() { fmt.Println("Replay subscription:") for event := range replaySub.Channel { fmt.Printf(" Replayed: seq=%d, %v\n", event.Sequence, event.Data) } }() // Process live events go func() { fmt.Println("Live subscription:") for event := range liveSub.Channel { fmt.Printf(" Live: seq=%d, %v\n", event.Sequence, event.Data) } }() time.Sleep(100 * time.Millisecond) // Publish new events eventBus.Publish(Event{ID: "4", Type: "order", Data: "Order delivered"}) eventBus.Publish(Event{ID: "5", Type: "order", Data: "Order completed"}) time.Sleep(100 * time.Millisecond) replaySub.Close() liveSub.Close() } Typed Pub/Sub System package main import ( "fmt" "reflect" "sync" "time" ) // TypedEventBus provides type-safe pub/sub type TypedEventBus struct { mu sync.RWMutex handlers map[reflect.Type][]reflect.Value closed bool } // NewTypedEventBus creates a new typed event bus func NewTypedEventBus() *TypedEventBus { return &TypedEventBus{ handlers: make(map[reflect.Type][]reflect.Value), } } // Subscribe registers a handler for a specific event type func (teb *TypedEventBus) Subscribe(handler interface{}) { teb.mu.Lock() defer teb.mu.Unlock() if teb.closed { return } handlerValue := reflect.ValueOf(handler) handlerType := handlerValue.Type() // Validate handler signature: func(EventType) if handlerType.Kind() != reflect.Func || handlerType.NumIn() != 1 || handlerType.NumOut() != 0 { panic("Handler must be func(EventType)") } eventType := handlerType.In(0) teb.handlers[eventType] = append(teb.handlers[eventType], handlerValue) } // Publish sends an event to all registered handlers func (teb *TypedEventBus) Publish(event interface{}) { teb.mu.RLock() defer teb.mu.RUnlock() if teb.closed { return } eventType := reflect.TypeOf(event) eventValue := reflect.ValueOf(event) for _, handler := range teb.handlers[eventType] { go handler.Call([]reflect.Value{eventValue}) } } // Event types type UserCreated struct { UserID string Email string } type OrderPlaced struct {
OrderID string UserID string Amount float64 } type PaymentProcessed struct { PaymentID string OrderID string Success bool } func main() { eventBus := NewTypedEventBus() // Subscribe to different event types eventBus.Subscribe(func(event UserCreated) { fmt.Printf("Email service: Send welcome email to %s\n", event.Email) }) eventBus.Subscribe(func(event UserCreated) { fmt.Printf("Analytics: Track user registration %s\n", event.UserID) }) eventBus.Subscribe(func(event OrderPlaced) { fmt.Printf("Inventory: Reserve items for order %s\n", event.OrderID) }) eventBus.Subscribe(func(event OrderPlaced) { fmt.Printf("Payment: Process payment for order %s, amount $%.2f\n", event.OrderID, event.Amount) }) eventBus.Subscribe(func(event PaymentProcessed) { if event.Success { fmt.Printf("Fulfillment: Ship order %s\n", event.OrderID) } else { fmt.Printf("Orders: Cancel order %s due to payment failure\n", event.OrderID) } }) // Publish events eventBus.Publish(UserCreated{ UserID: "user123", Email: "[email protected]", }) eventBus.Publish(OrderPlaced{ OrderID: "order456", UserID: "user123", Amount: 99.99, }) eventBus.Publish(PaymentProcessed{ PaymentID: "pay789", OrderID: "order456", Success: true, }) // Wait for async processing time.Sleep(100 * time.Millisecond) } Best Practices Async Processing: Handle events asynchronously to avoid blocking publishers Error Handling: Implement proper error handling in subscribers Buffering: Use buffered channels to handle bursts of events Graceful Shutdown: Ensure clean shutdown of all subscribers Dead Letter Queues: Handle failed message processing Monitoring: Track message rates, processing times, and failures Type Safety: Use typed events when possible Idempotency: Design subscribers to handle duplicate messages Common Pitfalls Memory Leaks: Not closing subscriptions properly Blocking Publishers: Slow subscribers blocking the entire system Lost Messages: Not handling channel buffer overflows Circular Dependencies: Events triggering other 
events in loops No Error Handling: Panics in subscribers affecting the system Testing Pub/Sub Systems package main import ( "context" "testing" "time" ) func TestEventBus(t *testing.T) { eventBus := NewEventBus(10) defer eventBus.Close() ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() // Subscribe to events sub := eventBus.Subscribe(ctx, "test", nil) // Publish event testEvent := Event{ ID: "test1", Type: "test", Data: "test data", } eventBus.Publish(testEvent) // Verify event received select { case received := <-sub.Channel: if received.ID != testEvent.ID { t.Errorf("Expected event ID %s, got %s", testEvent.ID, received.ID) } case <-time.After(100 * time.Millisecond): t.Error("Event not received within timeout") } } The Pub/Sub pattern is fundamental for building scalable, event-driven systems in Go. It enables loose coupling between components and supports complex workflows through simple event-based communication. ...

    July 17, 2024 · 9 min · Rafiul Alam

    Once Pattern in Go

    Go Concurrency Patterns Series: ← WaitGroup Pattern | Series Overview | Context Pattern → What is the Once Pattern? The Once pattern uses sync.Once to ensure that a piece of code executes exactly once, regardless of how many goroutines call it. This is essential for thread-safe initialization, singleton patterns, and one-time setup operations in concurrent programs. Key Characteristics: Thread-safe: Multiple goroutines can call it safely Exactly once: Code executes only on the first call Blocking: Subsequent calls wait for the first execution to complete No return values: The function passed to Do() cannot return values Real-World Use Cases Singleton Initialization: Create single instances of objects Configuration Loading: Load config files once at startup Database Connections: Initialize connection pools Logger Setup: Configure logging systems Resource Initialization: Set up expensive resources Feature Flags: Initialize feature flag systems Basic Once Usage package main import ( "fmt" "sync" "time" ) var ( instance *Database once sync.Once ) // Database represents a database connection type Database struct { ConnectionString string IsConnected bool } // Connect simulates database connection func (db *Database) Connect() { fmt.Println("Connecting to database...") time.Sleep(100 * time.Millisecond) // Simulate connection time db.IsConnected = true fmt.Println("Database connected!") } // GetDatabase returns the singleton database instance func GetDatabase() *Database { once.Do(func() { fmt.Println("Initializing database instance...") instance = &Database{ ConnectionString: "localhost:5432", } instance.Connect() }) return instance } func main() { var wg sync.WaitGroup // Multiple goroutines trying to get database instance for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() fmt.Printf("Goroutine %d requesting database\n", id) db := GetDatabase() fmt.Printf("Goroutine %d got database: %+v\n", id, db) }(i) } wg.Wait() // Verify all goroutines got the 
same instance fmt.Printf("Final instance: %p\n", GetDatabase()) } Configuration Manager with Once package main import ( "encoding/json" "fmt" "sync" ) // Config represents application configuration type Config struct { DatabaseURL string `json:"database_url"` APIKey string `json:"api_key"` Debug bool `json:"debug"` Port int `json:"port"` } // ConfigManager manages application configuration type ConfigManager struct { config *Config once sync.Once err error } // NewConfigManager creates a new config manager func NewConfigManager() *ConfigManager { return &ConfigManager{} } // loadConfig loads configuration from file func (cm *ConfigManager) loadConfig() { fmt.Println("Loading configuration...") // Simulate config file reading configData := `{ "database_url": "postgres://localhost:5432/myapp", "api_key": "secret-api-key-123", "debug": true, "port": 8080 }` var config Config if err := json.Unmarshal([]byte(configData), &config); err != nil { cm.err = fmt.Errorf("failed to parse config: %w", err) return } cm.config = &config fmt.Println("Configuration loaded successfully!") } // GetConfig returns the configuration, loading it once if needed func (cm *ConfigManager) GetConfig() (*Config, error) { cm.once.Do(cm.loadConfig) return cm.config, cm.err } func main() { configManager := NewConfigManager() var wg sync.WaitGroup // Multiple goroutines accessing configuration for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() config, err := configManager.GetConfig() if err != nil { fmt.Printf("Goroutine %d: Error loading config: %v\n", id, err) return } fmt.Printf("Goroutine %d: Port=%d, Debug=%v\n", id, config.Port, config.Debug) }(i) } wg.Wait() } Logger Initialization with Once package main import ( "fmt" "log" "os" "sync" ) // Logger wraps the standard logger with additional functionality type Logger struct { *log.Logger level string } var ( logger *Logger loggerOnce sync.Once ) // initLogger initializes the global logger func initLogger() {
fmt.Println("Initializing logger...") // Create log file file, err := os.OpenFile("app.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) if err != nil { log.Fatalln("Failed to open log file:", err) } logger = &Logger{ Logger: log.New(file, "APP: ", log.Ldate|log.Ltime|log.Lshortfile), level: "INFO", } logger.Println("Logger initialized") fmt.Println("Logger setup complete!") } // GetLogger returns the singleton logger instance func GetLogger() *Logger { loggerOnce.Do(initLogger) return logger } // Info logs an info message func (l *Logger) Info(msg string) { l.Printf("[INFO] %s", msg) } // Error logs an error message func (l *Logger) Error(msg string) { l.Printf("[ERROR] %s", msg) } func main() { var wg sync.WaitGroup // Multiple goroutines using the logger for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() logger := GetLogger() logger.Info(fmt.Sprintf("Message from goroutine %d", id)) if id%2 == 0 { logger.Error(fmt.Sprintf("Error from goroutine %d", id)) } }(i) } wg.Wait() // Clean up if logger != nil { logger.Info("Application shutting down") } } Resource Pool Initialization package main import ( "fmt" "sync" "time" ) // Connection represents a database connection type Connection struct { ID int Connected bool } // Connect simulates connecting to database func (c *Connection) Connect() error { time.Sleep(50 * time.Millisecond) // Simulate connection time c.Connected = true return nil } // Close simulates closing the connection func (c *Connection) Close() error { c.Connected = false return nil } // ConnectionPool manages a pool of database connections type ConnectionPool struct { connections []*Connection available chan *Connection once sync.Once initErr error } // NewConnectionPool creates a new connection pool func NewConnectionPool(size int) *ConnectionPool { return &ConnectionPool{ available: make(chan *Connection, size), } } // initialize sets up the connection pool func (cp *ConnectionPool) initialize() { fmt.Println("Initializing 
connection pool...") poolSize := cap(cp.available) cp.connections = make([]*Connection, poolSize) // Create and connect all connections for i := 0; i < poolSize; i++ { conn := &Connection{ID: i + 1} if err := conn.Connect(); err != nil { cp.initErr = fmt.Errorf("failed to connect connection %d: %w", i+1, err) return } cp.connections[i] = conn cp.available <- conn } fmt.Printf("Connection pool initialized with %d connections\n", poolSize) } // GetConnection gets a connection from the pool func (cp *ConnectionPool) GetConnection() (*Connection, error) { cp.once.Do(cp.initialize) if cp.initErr != nil { return nil, cp.initErr } select { case conn := <-cp.available: return conn, nil case <-time.After(5 * time.Second): return nil, fmt.Errorf("timeout waiting for connection") } } // ReturnConnection returns a connection to the pool func (cp *ConnectionPool) ReturnConnection(conn *Connection) { select { case cp.available <- conn: default: // Pool is full, close the connection conn.Close() } } // Close closes all connections in the pool func (cp *ConnectionPool) Close() error { close(cp.available) for _, conn := range cp.connections { if conn != nil { conn.Close() } } return nil } func main() { pool := NewConnectionPool(3) defer pool.Close() var wg sync.WaitGroup // Multiple goroutines using the connection pool for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() conn, err := pool.GetConnection() if err != nil { fmt.Printf("Worker %d: Failed to get connection: %v\n", id, err) return } fmt.Printf("Worker %d: Got connection %d\n", id, conn.ID) // Simulate work time.Sleep(200 * time.Millisecond) pool.ReturnConnection(conn) fmt.Printf("Worker %d: Returned connection %d\n", id, conn.ID) }(i) } wg.Wait() } Advanced Once Patterns 1. 
Once with Error Handling package main import ( "fmt" "sync" ) // OnceWithError provides Once functionality with error handling type OnceWithError struct { once sync.Once err error } // Do executes the function once and stores any error func (o *OnceWithError) Do(f func() error) error { o.once.Do(func() { o.err = f() }) return o.err } // ExpensiveResource represents a resource that's expensive to initialize type ExpensiveResource struct { Data string } var ( resource *ExpensiveResource resourceOnce OnceWithError ) // initResource initializes the expensive resource func initResource() error { fmt.Println("Initializing expensive resource...") // Simulate potential failure if false { // Change to true to simulate error return fmt.Errorf("failed to initialize resource") } resource = &ExpensiveResource{ Data: "Important data", } fmt.Println("Resource initialized successfully!") return nil } // GetResource returns the resource, initializing it once if needed func GetResource() (*ExpensiveResource, error) { err := resourceOnce.Do(initResource) if err != nil { return nil, err } return resource, nil } func main() { var wg sync.WaitGroup for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() resource, err := GetResource() if err != nil { fmt.Printf("Goroutine %d: Error: %v\n", id, err) return } fmt.Printf("Goroutine %d: Got resource: %s\n", id, resource.Data) }(i) } wg.Wait() } 2. 
Resettable Once package main import ( "fmt" "sync" "sync/atomic" ) // ResettableOnce allows resetting the once behavior type ResettableOnce struct { mu sync.Mutex done uint32 } // Do executes the function once func (ro *ResettableOnce) Do(f func()) { if atomic.LoadUint32(&ro.done) == 0 { ro.doSlow(f) } } func (ro *ResettableOnce) doSlow(f func()) { ro.mu.Lock() defer ro.mu.Unlock() if ro.done == 0 { defer atomic.StoreUint32(&ro.done, 1) f() } } // Reset allows the once to be used again func (ro *ResettableOnce) Reset() { ro.mu.Lock() defer ro.mu.Unlock() atomic.StoreUint32(&ro.done, 0) } // IsDone returns true if the function has been executed func (ro *ResettableOnce) IsDone() bool { return atomic.LoadUint32(&ro.done) == 1 } func main() { var once ResettableOnce counter := 0 task := func() { counter++ fmt.Printf("Task executed, counter: %d\n", counter) } // First round fmt.Println("First round:") for i := 0; i < 3; i++ { once.Do(task) } fmt.Printf("Done: %v\n", once.IsDone()) // Reset and second round fmt.Println("\nAfter reset:") once.Reset() fmt.Printf("Done: %v\n", once.IsDone()) for i := 0; i < 3; i++ { once.Do(task) } } Best Practices Use for Initialization: Perfect for one-time setup operations Keep Functions Simple: The function passed to Do() should be straightforward Handle Errors Separately: Use wrapper types for error handling Avoid Side Effects: Be careful with functions that have external side effects Don’t Nest Once Calls: Avoid calling Do() from within another Do() Consider Alternatives: Use init() for package-level initialization when appropriate Common Pitfalls 1. 
Expecting Return Values // Bad: Once.Do doesn't support return values var once sync.Once var result string func badExample() string { once.Do(func() { // Can't return from here result = "computed value" }) return result // This works but is not ideal } // Good: Use a wrapper or store results in accessible variables type OnceResult struct { once sync.Once result string err error } func (or *OnceResult) Get() (string, error) { or.once.Do(func() { or.result, or.err = computeValue() }) return or.result, or.err } 2. Panic in Once Function // Bad: Panic prevents future calls var once sync.Once func badOnceFunc() { once.Do(func() { panic("something went wrong") // Once will never execute again }) } // Good: Handle panics appropriately func goodOnceFunc() { once.Do(func() { defer func() { if r := recover(); r != nil { // Handle panic appropriately fmt.Printf("Recovered from panic: %v\n", r) } }() // risky operation }) } Testing Once Patterns package main import ( "fmt" "sync" "testing" ) func TestOnceExecution(t *testing.T) { var once sync.Once counter := 0 var wg sync.WaitGroup // Start multiple goroutines for i := 0; i < 10; i++ { wg.Add(1) go func() { defer wg.Done() once.Do(func() { counter++ }) }() } wg.Wait() if counter != 1 { t.Errorf("Expected counter to be 1, got %d", counter) } } func TestOnceWithError(t *testing.T) { var onceErr OnceWithError callCount := 0 // First call with error err1 := onceErr.Do(func() error { callCount++ return fmt.Errorf("test error") }) // Second call should return same error without executing function err2 := onceErr.Do(func() error { callCount++ return nil }) if callCount != 1 { t.Errorf("Expected function to be called once, got %d", callCount) } // Fatal (not Error) so the comparison below never dereferences a nil error if err1 == nil || err2 == nil { t.Fatal("Expected both calls to return error") } if err1.Error() != err2.Error() { t.Error("Expected same error from both calls") } } The Once pattern is essential for thread-safe initialization in Go.
It ensures that expensive or critical setup operations happen exactly once, making it perfect for singletons, configuration loading, and resource initialization in concurrent applications. ...

    July 10, 2024 · 9 min · Rafiul Alam

    Go Concurrency Pattern: Pipeline

    Go Concurrency Patterns Series: ← Select Statement | Series Overview | Fan-Out/Fan-In → What is the Pipeline Pattern? The Pipeline pattern structures concurrent data processing by breaking work into stages connected by channels. Each stage runs in its own goroutine, receives data from an input channel, processes it, and sends results to an output channel. This creates a chain of processing stages that run concurrently, which can improve throughput when no single stage dominates the work. ...

    July 3, 2024 · 15 min · Rafiul Alam

    Mutex Patterns in Go

    Go Concurrency Patterns Series: ← Worker Pool | Series Overview | WaitGroup Pattern → What are Mutex Patterns? Mutex (mutual exclusion) patterns are essential for protecting shared resources in concurrent programs. Go provides sync.Mutex and sync.RWMutex for controlling access to critical sections, ensuring data consistency and preventing race conditions. Key Types: Mutex: Exclusive access (one goroutine at a time) RWMutex: Reader-writer locks (multiple readers OR one writer) Lock-free: Atomic operations without explicit locks Real-World Use Cases Shared Counters: Statistics, metrics, rate limiting Cache Systems: Thread-safe caching with read/write operations Configuration Management: Safe updates to application config Connection Pools: Managing database/HTTP connection pools Resource Allocation: Tracking and managing limited resources State Machines: Protecting state transitions Basic Mutex Usage package main import ( "fmt" "sync" ) // Counter demonstrates basic mutex usage type Counter struct { mu sync.Mutex value int } // Increment safely increments the counter func (c *Counter) Increment() { c.mu.Lock() defer c.mu.Unlock() c.value++ } // Decrement safely decrements the counter func (c *Counter) Decrement() { c.mu.Lock() defer c.mu.Unlock() c.value-- } // Value safely returns the current value func (c *Counter) Value() int { c.mu.Lock() defer c.mu.Unlock() return c.value } // Add safely adds a value to the counter func (c *Counter) Add(delta int) { c.mu.Lock() defer c.mu.Unlock() c.value += delta } func main() { counter := &Counter{} var wg sync.WaitGroup // Start multiple goroutines incrementing the counter for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 100; j++ { counter.Increment() } fmt.Printf("Goroutine %d finished\n", id) }(i) } // Start some goroutines decrementing for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 50; j++ { counter.Decrement() } fmt.Printf("Decrementer %d 
finished\n", id) }(i) } wg.Wait() fmt.Printf("Final counter value: %d\n", counter.Value()) // Expected: (10 * 100) - (3 * 50) = 1000 - 150 = 850 } RWMutex for Read-Heavy Workloads package main import ( "fmt" "math/rand" "sync" "time" ) // Cache demonstrates RWMutex usage for read-heavy scenarios type Cache struct { mu sync.RWMutex data map[string]interface{} } // NewCache creates a new cache func NewCache() *Cache { return &Cache{ data: make(map[string]interface{}), } } // Get retrieves a value (read operation) func (c *Cache) Get(key string) (interface{}, bool) { c.mu.RLock() defer c.mu.RUnlock() value, exists := c.data[key] return value, exists } // Set stores a value (write operation) func (c *Cache) Set(key string, value interface{}) { c.mu.Lock() defer c.mu.Unlock() c.data[key] = value } // Delete removes a value (write operation) func (c *Cache) Delete(key string) { c.mu.Lock() defer c.mu.Unlock() delete(c.data, key) } // Keys returns all keys (read operation) func (c *Cache) Keys() []string { c.mu.RLock() defer c.mu.RUnlock() keys := make([]string, 0, len(c.data)) for key := range c.data { keys = append(keys, key) } return keys } // Size returns the number of items (read operation) func (c *Cache) Size() int { c.mu.RLock() defer c.mu.RUnlock() return len(c.data) } // Clear removes all items (write operation) func (c *Cache) Clear() { c.mu.Lock() defer c.mu.Unlock() c.data = make(map[string]interface{}) } func main() { cache := NewCache() var wg sync.WaitGroup // Writers (fewer, less frequent) for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 50; j++ { key := fmt.Sprintf("key-%d-%d", id, j) cache.Set(key, fmt.Sprintf("value-%d-%d", id, j)) time.Sleep(10 * time.Millisecond) } }(i) } // Readers (many, frequent) for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 100; j++ { // Try to read random keys key := fmt.Sprintf("key-%d-%d", rand.Intn(3), rand.Intn(50)) if value, exists := 
cache.Get(key); exists { fmt.Printf("Reader %d found %s: %v\n", id, key, value) } time.Sleep(5 * time.Millisecond) } }(i) } // Size checker wg.Add(1) go func() { defer wg.Done() for i := 0; i < 20; i++ { size := cache.Size() fmt.Printf("Cache size: %d\n", size) time.Sleep(100 * time.Millisecond) } }() wg.Wait() fmt.Printf("Final cache size: %d\n", cache.Size()) } Advanced Mutex Patterns 1. Conditional Variables Pattern package main import ( "fmt" "sync" "time" ) // Buffer demonstrates conditional variables with mutex type Buffer struct { mu sync.Mutex notEmpty *sync.Cond notFull *sync.Cond items []interface{} capacity int } // NewBuffer creates a new bounded buffer func NewBuffer(capacity int) *Buffer { b := &Buffer{ items: make([]interface{}, 0, capacity), capacity: capacity, } b.notEmpty = sync.NewCond(&b.mu) b.notFull = sync.NewCond(&b.mu) return b } // Put adds an item to the buffer (blocks if full) func (b *Buffer) Put(item interface{}) { b.mu.Lock() defer b.mu.Unlock() // Wait while buffer is full for len(b.items) == b.capacity { b.notFull.Wait() } b.items = append(b.items, item) fmt.Printf("Put item: %v (buffer size: %d)\n", item, len(b.items)) // Signal that buffer is not empty b.notEmpty.Signal() } // Get removes an item from the buffer (blocks if empty) func (b *Buffer) Get() interface{} { b.mu.Lock() defer b.mu.Unlock() // Wait while buffer is empty for len(b.items) == 0 { b.notEmpty.Wait() } item := b.items[0] b.items = b.items[1:] fmt.Printf("Got item: %v (buffer size: %d)\n", item, len(b.items)) // Signal that buffer is not full b.notFull.Signal() return item } // Size returns current buffer size func (b *Buffer) Size() int { b.mu.Lock() defer b.mu.Unlock() return len(b.items) } func main() { buffer := NewBuffer(3) var wg sync.WaitGroup // Producers for i := 0; i < 2; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 5; j++ { item := fmt.Sprintf("item-%d-%d", id, j) buffer.Put(item) time.Sleep(200 * time.Millisecond) } }(i) } // 
Consumers for i := 0; i < 2; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 5; j++ { item := buffer.Get() fmt.Printf("Consumer %d processed: %v\n", id, item) time.Sleep(300 * time.Millisecond) } }(i) } wg.Wait() } 2. Lock-Free Patterns with Atomic Operations package main import ( "fmt" "sync" "sync/atomic" "time" ) // AtomicCounter demonstrates lock-free counter type AtomicCounter struct { value int64 } // Increment atomically increments the counter func (ac *AtomicCounter) Increment() int64 { return atomic.AddInt64(&ac.value, 1) } // Decrement atomically decrements the counter func (ac *AtomicCounter) Decrement() int64 { return atomic.AddInt64(&ac.value, -1) } // Value atomically reads the current value func (ac *AtomicCounter) Value() int64 { return atomic.LoadInt64(&ac.value) } // CompareAndSwap atomically compares and swaps func (ac *AtomicCounter) CompareAndSwap(old, new int64) bool { return atomic.CompareAndSwapInt64(&ac.value, old, new) } // AtomicFlag demonstrates atomic boolean operations type AtomicFlag struct { flag int32 } // Set atomically sets the flag to true func (af *AtomicFlag) Set() { atomic.StoreInt32(&af.flag, 1) } // Clear atomically sets the flag to false func (af *AtomicFlag) Clear() { atomic.StoreInt32(&af.flag, 0) } // IsSet atomically checks if flag is set func (af *AtomicFlag) IsSet() bool { return atomic.LoadInt32(&af.flag) == 1 } // TestAndSet atomically tests and sets the flag func (af *AtomicFlag) TestAndSet() bool { return atomic.SwapInt32(&af.flag, 1) == 1 } func main() { counter := &AtomicCounter{} flag := &AtomicFlag{} var wg sync.WaitGroup // Test atomic counter fmt.Println("Testing atomic counter...") for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 1000; j++ { counter.Increment() } }(i) } wg.Wait() fmt.Printf("Final counter value: %d\n", counter.Value()) // Test atomic flag fmt.Println("\nTesting atomic flag...") // Multiple goroutines trying to set flag for i := 0; i < 
5; i++ { wg.Add(1) go func(id int) { defer wg.Done() if !flag.TestAndSet() { fmt.Printf("Goroutine %d acquired the flag\n", id) time.Sleep(100 * time.Millisecond) flag.Clear() fmt.Printf("Goroutine %d released the flag\n", id) } else { fmt.Printf("Goroutine %d failed to acquire flag\n", id) } }(i) } wg.Wait() } 3. Resource Pool Pattern package main import ( "errors" "fmt" "sync" "time" ) // Resource represents a limited resource type Resource struct { ID int Data string } // ResourcePool manages a pool of limited resources type ResourcePool struct { mu sync.Mutex resources []*Resource available chan *Resource maxSize int created int } // NewResourcePool creates a new resource pool func NewResourcePool(maxSize int) *ResourcePool { return &ResourcePool{ resources: make([]*Resource, 0, maxSize), available: make(chan *Resource, maxSize), maxSize: maxSize, } } // createResource creates a new resource func (rp *ResourcePool) createResource() *Resource { rp.mu.Lock() defer rp.mu.Unlock() if rp.created >= rp.maxSize { return nil } rp.created++ resource := &Resource{ ID: rp.created, Data: fmt.Sprintf("Resource-%d", rp.created), } fmt.Printf("Created resource %d\n", resource.ID) return resource } // Acquire gets a resource from the pool func (rp *ResourcePool) Acquire() (*Resource, error) { select { case resource := <-rp.available: fmt.Printf("Acquired existing resource %d\n", resource.ID) return resource, nil default: // No available resource, try to create one if resource := rp.createResource(); resource != nil { return resource, nil } // Pool is full, wait for available resource fmt.Println("Pool full, waiting for available resource...") select { case resource := <-rp.available: fmt.Printf("Acquired resource %d after waiting\n", resource.ID) return resource, nil case <-time.After(5 * time.Second): return nil, errors.New("timeout waiting for resource") } } } // Release returns a resource to the pool func (rp *ResourcePool) Release(resource *Resource) { select { case 
rp.available <- resource: fmt.Printf("Released resource %d\n", resource.ID) default: // Channel full, resource will be garbage collected fmt.Printf("Pool full, discarding resource %d\n", resource.ID) } } // Size returns current pool statistics func (rp *ResourcePool) Size() (available, created int) { rp.mu.Lock() defer rp.mu.Unlock() return len(rp.available), rp.created } func main() { pool := NewResourcePool(3) var wg sync.WaitGroup // Multiple goroutines using resources for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() resource, err := pool.Acquire() if err != nil { fmt.Printf("Worker %d failed to acquire resource: %v\n", id, err) return } fmt.Printf("Worker %d using resource %d\n", id, resource.ID) // Simulate work time.Sleep(time.Duration(200+id*100) * time.Millisecond) pool.Release(resource) fmt.Printf("Worker %d finished\n", id) }(i) } wg.Wait() available, created := pool.Size() fmt.Printf("Final pool state - Available: %d, Created: %d\n", available, created) } Best Practices Always Use defer: Ensure locks are released even if panic occurs Keep Critical Sections Small: Minimize time holding locks Avoid Nested Locks: Prevent deadlocks by avoiding lock hierarchies Use RWMutex for Read-Heavy: Better performance for read-heavy workloads Consider Lock-Free: Use atomic operations when possible Document Lock Order: If multiple locks needed, establish clear ordering Prefer Channels: Use channels for communication, locks for shared state Common Pitfalls 1. 
Deadlocks // Bad: Potential deadlock with nested locks type BadAccount struct { ID int // Fixed position in the lock order, used by SafeTransfer mu sync.Mutex balance int } func (a *BadAccount) Transfer(to *BadAccount, amount int) { a.mu.Lock() defer a.mu.Unlock() to.mu.Lock() // Potential deadlock if another goroutine transfers in the opposite direction defer to.mu.Unlock() a.balance -= amount to.balance += amount } // Good: Ordered locking to prevent deadlock func (a *BadAccount) SafeTransfer(to *BadAccount, amount int) { // A transfer to the same account would lock one mutex twice if a == to { return } // Always acquire locks in consistent order (lowest ID first) first, second := a, to if a.ID > to.ID { first, second = to, a } first.mu.Lock() defer first.mu.Unlock() second.mu.Lock() defer second.mu.Unlock() a.balance -= amount to.balance += amount } 2. Race Conditions // Bad: Race condition type BadCounter struct { mu sync.Mutex value int } func (c *BadCounter) IncrementIfEven() { if c.value%2 == 0 { // Race: value is read without the lock and might change between check and increment c.mu.Lock() c.value++ c.mu.Unlock() } } // Good: Atomic check and update func (c *BadCounter) SafeIncrementIfEven() { c.mu.Lock() defer c.mu.Unlock() if c.value%2 == 0 { c.value++ } } Testing Concurrent Code package main import ( "sync" "testing" ) func TestCounter(t *testing.T) { counter := &Counter{} var wg sync.WaitGroup goroutines := 100 increments := 1000 for i := 0; i < goroutines; i++ { wg.Add(1) go func() { defer wg.Done() for j := 0; j < increments; j++ { counter.Increment() } }() } wg.Wait() expected := goroutines * increments if counter.Value() != expected { t.Errorf("Expected %d, got %d", expected, counter.Value()) } } // Run with: go test -race func TestCounterRace(t *testing.T) { counter := &Counter{} var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() for i := 0; i < 1000; i++ { counter.Increment() } }() go func() { defer wg.Done() for i := 0; i < 1000; i++ { _ = counter.Value() } }() // Wait for both goroutines so the race detector observes every access wg.Wait() } Mutex patterns are fundamental for building safe concurrent applications in Go. 
Choose the right synchronization primitive based on your access patterns: use sync.Mutex for exclusive access, sync.RWMutex for read-heavy workloads, and atomic operations for simple lock-free scenarios. ...
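The ordered-locking idea from the Deadlocks pitfall can be sketched as a small standalone program. This is a minimal sketch, not the post's own code: the `Account` type, its `ID` field, and the `Transfer` and `Balance` helpers are hypothetical names chosen for illustration, and it assumes every account has a unique ID.

```go
package main

import (
	"fmt"
	"sync"
)

// Account is a hypothetical type for this sketch. The ID field exists only
// to give every account a fixed position in the lock order.
type Account struct {
	ID      int
	mu      sync.Mutex
	balance int
}

// Transfer moves amount from one account to another. It always locks the
// account with the lower ID first, so two opposite transfers cannot each
// hold one lock while waiting for the other.
func Transfer(from, to *Account, amount int) {
	if from == to {
		return // locking the same mutex twice would deadlock
	}
	first, second := from, to
	if from.ID > to.ID {
		first, second = to, from
	}
	first.mu.Lock()
	defer first.mu.Unlock()
	second.mu.Lock()
	defer second.mu.Unlock()

	from.balance -= amount
	to.balance += amount
}

// Balance reads the balance under the account's own lock.
func (a *Account) Balance() int {
	a.mu.Lock()
	defer a.mu.Unlock()
	return a.balance
}

func main() {
	a := &Account{ID: 1, balance: 1000}
	b := &Account{ID: 2, balance: 1000}

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(2)
		go func() { defer wg.Done(); Transfer(a, b, 1) }() // a -> b
		go func() { defer wg.Done(); Transfer(b, a, 1) }() // b -> a
	}
	wg.Wait()

	// The same amount flows in both directions, so both balances end where they started.
	fmt.Println(a.Balance(), b.Balance()) // 1000 1000
}
```

Ordering by ID is one possible convention; any total order that all callers agree on (for example, a memory address or a creation sequence number) would serve the same purpose.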

    July 3, 2024 · 10 min · Rafiul Alam

    Fan-Out/Fan-In Pattern in Go

    Go Concurrency Patterns Series: ← Pipeline Pattern | Series Overview | Pub/Sub Pattern → What is the Fan-Out/Fan-In Pattern? The Fan-Out/Fan-In pattern is a powerful concurrency pattern that distributes work across multiple goroutines (fan-out) and then collects the results back into a single channel (fan-in). This pattern is perfect for parallelizing CPU-intensive tasks or I/O operations that can be processed independently. Fan-Out: Distribute work from one source to multiple workers Fan-In: Collect results from multiple workers into a single destination ...
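The two halves described above can be sketched in a few lines. This is a minimal illustration rather than the post's implementation: `square`, `fanOut`, `fanIn`, and `SumOfSquares` are hypothetical names, and squaring stands in for whatever independent work each item needs.

```go
package main

import (
	"fmt"
	"sync"
)

// square is a stand-in for real CPU-bound or I/O-bound work.
func square(n int) int { return n * n }

// fanOut starts the given number of workers, all reading from the same
// input channel. Each worker gets its own output channel.
func fanOut(in <-chan int, workers int) []<-chan int {
	outs := make([]<-chan int, 0, workers)
	for i := 0; i < workers; i++ {
		out := make(chan int)
		go func() {
			defer close(out)
			for n := range in {
				out <- square(n)
			}
		}()
		outs = append(outs, out)
	}
	return outs
}

// fanIn merges several result channels into one and closes the merged
// channel once every input channel has been drained.
func fanIn(chans ...<-chan int) <-chan int {
	merged := make(chan int)
	var wg sync.WaitGroup
	for _, c := range chans {
		wg.Add(1)
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				merged <- v
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(merged)
	}()
	return merged
}

// SumOfSquares wires the stages together: source -> fan-out -> fan-in -> sum.
func SumOfSquares(nums []int, workers int) int {
	in := make(chan int)
	go func() {
		defer close(in)
		for _, n := range nums {
			in <- n
		}
	}()

	sum := 0
	for v := range fanIn(fanOut(in, workers)...) {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(SumOfSquares([]int{1, 2, 3, 4, 5}, 3)) // 55
}
```

Results arrive in whatever order the workers finish, which is why the sketch uses an order-independent reduction (a sum). Work that needs ordered output would have to carry an index alongside each result.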

    June 26, 2024 · 8 min · Rafiul Alam

    Go Concurrency Pattern: Select Statement

    Go Concurrency Patterns Series: ← Channel Fundamentals | Series Overview | Pipeline Pattern → What is the Select Statement? The select statement is Go’s powerful tool for handling multiple channel operations simultaneously. It’s like a switch statement, but for channels - it allows a goroutine to wait on multiple communication operations and proceed with whichever one becomes ready first. Think of select as a traffic controller at an intersection, managing multiple lanes of traffic (channels) and allowing the first available lane to proceed. This enables non-blocking communication, timeouts, and elegant multiplexing patterns that are essential for robust concurrent programs. ...

    June 26, 2024 · 12 min · Rafiul Alam

    Go Memory Model Explained

    Go Concurrency Patterns Series: ← Context Propagation | Series Overview | Graceful Shutdown → What is the Go Memory Model? The Go Memory Model specifies the conditions under which reads of a variable in one goroutine can be guaranteed to observe values produced by writes to the same variable in a different goroutine. Understanding this model is crucial for writing correct concurrent code without data races. Core Concepts: Happens-Before: Ordering guarantees between memory operations Memory Visibility: When writes in one goroutine are visible to reads in another Synchronization: Mechanisms that establish happens-before relationships Data Races: Concurrent accesses to the same memory location, at least one of them a write, without proper synchronization Real-World Impact Correctness: Prevent subtle bugs in concurrent code Performance: Understand when synchronization is necessary Debugging: Diagnose race conditions and memory visibility issues Optimization: Make informed decisions about lock-free algorithms Code Review: Identify potential concurrency bugs The Happens-Before Relationship Definition A happens-before relationship is a partial order over memory operations: if event A happens before event B, the effects of A are guaranteed to be visible to B. Within a single goroutine this order follows program order; across goroutines it exists only where a synchronization operation, such as a channel send and its matching receive or a mutex unlock and a later lock, establishes it. ...
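A short sketch may make the happens-before idea concrete. It relies on two guarantees documented for Go: closing a channel is synchronized before a receive that returns because the channel is closed, and a call to `WaitGroup.Done` is synchronized before the return of the `Wait` call it unblocks. The function names `publishWithChannel` and `sumWithMutex` are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// publishWithChannel shows a write made visible through a channel close.
// The write to msg (1) is sequenced before close(done) (2), the close is
// synchronized before the receive (3), and the receive is sequenced before
// the read (4). Chaining these gives: (1) happens before (4).
func publishWithChannel() string {
	var msg string
	done := make(chan struct{})

	go func() {
		msg = "hello" // (1) write
		close(done)   // (2) synchronization point
	}()

	<-done     // (3) returns only after the close
	return msg // (4) guaranteed to observe "hello"
}

// sumWithMutex shows the same idea with a mutex: each Unlock is
// synchronized before the next Lock, so every goroutine sees the
// additions made by the goroutines that held the lock before it.
func sumWithMutex() int {
	var (
		mu    sync.Mutex
		wg    sync.WaitGroup
		total int
	)
	for i := 1; i <= 10; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			mu.Lock()
			total += n
			mu.Unlock()
		}(i)
	}
	wg.Wait() // every Done is synchronized before Wait returns
	return total
}

func main() {
	fmt.Println(publishWithChannel()) // hello
	fmt.Println(sumWithMutex())       // 55
}
```

Removing the channel from the first function, or the mutex from the second, would leave the reads and writes unordered, which is exactly the situation `go run -race` is designed to report.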

    June 25, 2024 · 12 min · Rafiul Alam

    Context Propagation Patterns in Go

    Go Concurrency Patterns Series: ← Circuit Breaker | Series Overview | Memory Model → What is Context Propagation? Context propagation is the practice of threading context.Context through your application to carry cancellation signals, deadlines, and request-scoped values across API boundaries, goroutines, and service boundaries. This is critical for building observable, responsive distributed systems. Key Capabilities: Distributed Tracing: Propagate trace IDs across services Cancellation Cascades: Cancel entire request trees Deadline Enforcement: Ensure requests complete within time budgets Request-Scoped Values: Carry metadata without polluting function signatures Real-World Use Cases Microservices: Trace requests across multiple services API Gateways: Propagate timeouts and user context Database Layers: Cancel queries when requests are abandoned Message Queues: Propagate processing deadlines HTTP Middleware: Extract and inject trace headers gRPC Services: Automatic context propagation Basic Context Propagation Propagating Through Function Calls package main import ( "context" "fmt" "time" ) // ServiceA calls ServiceB which calls ServiceC // Context propagates through all layers func ServiceA(ctx context.Context, userID string) error { // Add request-scoped value ctx = context.WithValue(ctx, "user_id", userID) ctx = context.WithValue(ctx, "request_id", generateRequestID()) fmt.Printf("[ServiceA] Processing request for user: %s\n", userID) // Propagate context to next service return ServiceB(ctx) } func ServiceB(ctx context.Context) error { // Retrieve values from context userID := ctx.Value("user_id").(string) requestID := ctx.Value("request_id").(string) fmt.Printf("[ServiceB] User: %s, Request: %s\n", userID, requestID) // Add timeout for downstream call ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() return ServiceC(ctx) } func ServiceC(ctx context.Context) error { userID := ctx.Value("user_id").(string) requestID := 
ctx.Value("request_id").(string) fmt.Printf("[ServiceC] Processing for User: %s, Request: %s\n", userID, requestID) // Simulate work select { case <-time.After(1 * time.Second): fmt.Println("[ServiceC] Work completed") return nil case <-ctx.Done(): fmt.Printf("[ServiceC] Cancelled: %v\n", ctx.Err()) return ctx.Err() } } func generateRequestID() string { return fmt.Sprintf("req-%d", time.Now().UnixNano()) } func main() { ctx := context.Background() err := ServiceA(ctx, "user-123") if err != nil { fmt.Printf("Error: %v\n", err) } } Output: ...

    June 24, 2024 · 11 min · Rafiul Alam

    Context Pattern in Go

    Go Concurrency Patterns Series: ← Once Pattern | Series Overview | Circuit Breaker → What is the Context Pattern? The Context pattern uses Go’s context package to carry cancellation signals, deadlines, timeouts, and request-scoped values across API boundaries and between goroutines. It’s essential for building responsive, cancellable operations and managing request lifecycles. Key Features: Cancellation: Signal when operations should stop Timeouts: Automatically cancel after a duration Deadlines: Cancel at a specific time Values: Carry request-scoped data Real-World Use Cases HTTP Servers: Request cancellation and timeouts Database Operations: Query timeouts and cancellation API Calls: External service timeouts Background Jobs: Graceful shutdown Microservices: Request tracing and correlation IDs File Operations: Long-running I/O with cancellation Basic Context Usage package main import ( "context" "fmt" "time" ) // simulateWork simulates a long-running operation func simulateWork(ctx context.Context, name string, duration time.Duration) error { fmt.Printf("%s: Starting work (expected duration: %v)\n", name, duration) select { case <-time.After(duration): fmt.Printf("%s: Work completed successfully\n", name) return nil case <-ctx.Done(): fmt.Printf("%s: Work cancelled: %v\n", name, ctx.Err()) return ctx.Err() } } func main() { // Example 1: Context with timeout fmt.Println("=== Context with Timeout ===") ctx1, cancel1 := context.WithTimeout(context.Background(), 2*time.Second) defer cancel1() err := simulateWork(ctx1, "Task1", 1*time.Second) // Should complete if err != nil { fmt.Printf("Task1 error: %v\n", err) } err = simulateWork(ctx1, "Task2", 3*time.Second) // Should time out: ctx1 has about 1s of its 2s budget left if err != nil { fmt.Printf("Task2 error: %v\n", err) } // Example 2: Manual cancellation fmt.Println("\n=== Manual Cancellation ===") ctx2, cancel2 := context.WithCancel(context.Background()) go func() { time.Sleep(1 * time.Second) fmt.Println("Cancelling context...") 
cancel2() }() err = simulateWork(ctx2, "Task3", 3*time.Second) // Should be cancelled if err != nil { fmt.Printf("Task3 error: %v\n", err) } // Example 3: Context with deadline fmt.Println("\n=== Context with Deadline ===") deadline := time.Now().Add(1500 * time.Millisecond) ctx3, cancel3 := context.WithDeadline(context.Background(), deadline) defer cancel3() err = simulateWork(ctx3, "Task4", 2*time.Second) // Should hit deadline if err != nil { fmt.Printf("Task4 error: %v\n", err) } } Context with Values package main import ( "context" "fmt" "time" ) // Key types for context values type contextKey string const ( RequestIDKey contextKey = "requestID" UserIDKey contextKey = "userID" TraceIDKey contextKey = "traceID" ) // RequestInfo holds request-scoped information type RequestInfo struct { RequestID string UserID string TraceID string StartTime time.Time } // withRequestInfo adds request information to context func withRequestInfo(ctx context.Context, info RequestInfo) context.Context { ctx = context.WithValue(ctx, RequestIDKey, info.RequestID) ctx = context.WithValue(ctx, UserIDKey, info.UserID) ctx = context.WithValue(ctx, TraceIDKey, info.TraceID) return ctx } // getRequestID extracts request ID from context func getRequestID(ctx context.Context) string { if id, ok := ctx.Value(RequestIDKey).(string); ok { return id } return "unknown" } // getUserID extracts user ID from context func getUserID(ctx context.Context) string { if id, ok := ctx.Value(UserIDKey).(string); ok { return id } return "anonymous" } // getTraceID extracts trace ID from context func getTraceID(ctx context.Context) string { if id, ok := ctx.Value(TraceIDKey).(string); ok { return id } return "no-trace" } // logWithContext logs with context information func logWithContext(ctx context.Context, message string) { requestID := getRequestID(ctx) userID := getUserID(ctx) traceID := getTraceID(ctx) fmt.Printf("[%s][%s][%s] %s\n", requestID, userID, traceID, message) } // businessLogic 
simulates business logic that uses context func businessLogic(ctx context.Context) error { logWithContext(ctx, "Starting business logic") // Simulate some work select { case <-time.After(500 * time.Millisecond): logWithContext(ctx, "Business logic completed") return nil case <-ctx.Done(): logWithContext(ctx, "Business logic cancelled") return ctx.Err() } } // databaseOperation simulates a database operation func databaseOperation(ctx context.Context, query string) error { logWithContext(ctx, fmt.Sprintf("Executing query: %s", query)) select { case <-time.After(200 * time.Millisecond): logWithContext(ctx, "Database operation completed") return nil case <-ctx.Done(): logWithContext(ctx, "Database operation cancelled") return ctx.Err() } } // externalAPICall simulates calling an external API func externalAPICall(ctx context.Context, endpoint string) error { logWithContext(ctx, fmt.Sprintf("Calling external API: %s", endpoint)) select { case <-time.After(300 * time.Millisecond): logWithContext(ctx, "External API call completed") return nil case <-ctx.Done(): logWithContext(ctx, "External API call cancelled") return ctx.Err() } } // handleRequest simulates handling an HTTP request func handleRequest(ctx context.Context) error { logWithContext(ctx, "Handling request") // Perform multiple operations if err := databaseOperation(ctx, "SELECT * FROM users"); err != nil { return err } if err := externalAPICall(ctx, "/api/v1/data"); err != nil { return err } if err := businessLogic(ctx); err != nil { return err } logWithContext(ctx, "Request handled successfully") return nil } func main() { // Simulate incoming request requestInfo := RequestInfo{ RequestID: "req-12345", UserID: "user-67890", TraceID: "trace-abcdef", StartTime: time.Now(), } // Create context with timeout and request info ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() ctx = withRequestInfo(ctx, requestInfo) // Handle the request if err := handleRequest(ctx); err != nil { 
logWithContext(ctx, fmt.Sprintf("Request failed: %v", err)) } // Example with early cancellation fmt.Println("\n=== Early Cancellation Example ===") ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second) requestInfo2 := RequestInfo{ RequestID: "req-54321", UserID: "user-09876", TraceID: "trace-fedcba", StartTime: time.Now(), } ctx2 = withRequestInfo(ctx2, requestInfo2) // Cancel after 800ms go func() { time.Sleep(800 * time.Millisecond) logWithContext(ctx2, "Cancelling request early") cancel2() }() if err := handleRequest(ctx2); err != nil { logWithContext(ctx2, fmt.Sprintf("Request failed: %v", err)) } } HTTP Server with Context package main import ( "context" "encoding/json" "fmt" "log" "math/rand" "net/http" "strconv" "time" ) // Response represents an API response type Response struct { Message string `json:"message"` RequestID string `json:"request_id"` Duration time.Duration `json:"duration"` Data interface{} `json:"data,omitempty"` } // middleware adds request ID and timeout to context func middleware(next http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { // Generate request ID requestID := fmt.Sprintf("req-%d", time.Now().UnixNano()) // Get timeout from query parameter (default 5 seconds) timeoutStr := r.URL.Query().Get("timeout") timeout := 5 * time.Second if timeoutStr != "" { if t, err := time.ParseDuration(timeoutStr); err == nil { timeout = t } } // Create context with timeout ctx, cancel := context.WithTimeout(r.Context(), timeout) defer cancel() // Add request ID to context ctx = context.WithValue(ctx, RequestIDKey, requestID) // Create new request with updated context r = r.WithContext(ctx) // Add request ID to response headers w.Header().Set("X-Request-ID", requestID) next(w, r) } } // simulateSlowOperation simulates a slow operation that respects context func simulateSlowOperation(ctx context.Context, duration time.Duration) (string, error) { select { case <-time.After(duration): return 
fmt.Sprintf("Operation completed after %v", duration), nil case <-ctx.Done(): return "", ctx.Err() } } // fastHandler handles requests quickly func fastHandler(w http.ResponseWriter, r *http.Request) { start := time.Now() ctx := r.Context() requestID := getRequestID(ctx) result, err := simulateSlowOperation(ctx, 100*time.Millisecond) duration := time.Since(start) response := Response{ RequestID: requestID, Duration: duration, } if err != nil { response.Message = "Request failed" w.WriteHeader(http.StatusRequestTimeout) } else { response.Message = "Success" response.Data = result } json.NewEncoder(w).Encode(response) } // slowHandler handles requests that might timeout func slowHandler(w http.ResponseWriter, r *http.Request) { start := time.Now() ctx := r.Context() requestID := getRequestID(ctx) // Random duration between 1-10 seconds duration := time.Duration(1+rand.Intn(10)) * time.Second result, err := simulateSlowOperation(ctx, duration) elapsed := time.Since(start) response := Response{ RequestID: requestID, Duration: elapsed, } if err != nil { response.Message = "Request timed out or cancelled" w.WriteHeader(http.StatusRequestTimeout) } else { response.Message = "Success" response.Data = result } json.NewEncoder(w).Encode(response) } // batchHandler processes multiple operations func batchHandler(w http.ResponseWriter, r *http.Request) { start := time.Now() ctx := r.Context() requestID := getRequestID(ctx) // Get batch size from query parameter batchSizeStr := r.URL.Query().Get("size") batchSize := 3 if batchSizeStr != "" { if size, err := strconv.Atoi(batchSizeStr); err == nil && size > 0 { batchSize = size } } results := make([]string, 0, batchSize) // Process operations sequentially, checking context each time for i := 0; i < batchSize; i++ { select { case <-ctx.Done(): // Context cancelled, return partial results response := Response{ RequestID: requestID, Duration: time.Since(start), Message: fmt.Sprintf("Batch cancelled after %d/%d operations", i, 
batchSize), Data: results, } w.WriteHeader(http.StatusRequestTimeout) json.NewEncoder(w).Encode(response) return default: } result, err := simulateSlowOperation(ctx, 200*time.Millisecond) if err != nil { response := Response{ RequestID: requestID, Duration: time.Since(start), Message: fmt.Sprintf("Batch failed at operation %d: %v", i+1, err), Data: results, } w.WriteHeader(http.StatusRequestTimeout) json.NewEncoder(w).Encode(response) return } results = append(results, fmt.Sprintf("Op%d: %s", i+1, result)) } response := Response{ RequestID: requestID, Duration: time.Since(start), Message: "Batch completed successfully", Data: results, } json.NewEncoder(w).Encode(response) } func main() { http.HandleFunc("/fast", middleware(fastHandler)) http.HandleFunc("/slow", middleware(slowHandler)) http.HandleFunc("/batch", middleware(batchHandler)) fmt.Println("Server starting on :8080") fmt.Println("Endpoints:") fmt.Println(" GET /fast - Fast operation (100ms)") fmt.Println(" GET /slow - Slow operation (1-10s random)") fmt.Println(" GET /batch?size=N - Batch operations") fmt.Println(" Add ?timeout=5s to set custom timeout") log.Fatal(http.ListenAndServe(":8080", nil)) } Context Propagation in Goroutines package main import ( "context" "fmt" "sync" "time" ) // Worker represents a worker that processes tasks type Worker struct { ID int Name string } // ProcessTask processes a task with context func (w *Worker) ProcessTask(ctx context.Context, taskID int) error { requestID := getRequestID(ctx) fmt.Printf("Worker %d (%s) [%s]: Starting task %d\n", w.ID, w.Name, requestID, taskID) // Simulate work with multiple steps for step := 1; step <= 3; step++ { select { case <-time.After(200 * time.Millisecond): fmt.Printf("Worker %d (%s) [%s]: Task %d step %d completed\n", w.ID, w.Name, requestID, taskID, step) case <-ctx.Done(): fmt.Printf("Worker %d (%s) [%s]: Task %d cancelled at step %d: %v\n", w.ID, w.Name, requestID, taskID, step, ctx.Err()) return ctx.Err() } } fmt.Printf("Worker %d 
(%s) [%s]: Task %d completed successfully\n", w.ID, w.Name, requestID, taskID) return nil } // TaskManager manages task distribution type TaskManager struct { workers []Worker } // NewTaskManager creates a new task manager func NewTaskManager() *TaskManager { return &TaskManager{ workers: []Worker{ {ID: 1, Name: "Alice"}, {ID: 2, Name: "Bob"}, {ID: 3, Name: "Charlie"}, }, } } // ProcessTasksConcurrently processes tasks using multiple workers func (tm *TaskManager) ProcessTasksConcurrently(ctx context.Context, taskCount int) error { var wg sync.WaitGroup taskChan := make(chan int, taskCount) errorChan := make(chan error, len(tm.workers)) // Send tasks to channel go func() { defer close(taskChan) for i := 1; i <= taskCount; i++ { select { case taskChan <- i: case <-ctx.Done(): return } } }() // Start workers for _, worker := range tm.workers { wg.Add(1) go func(w Worker) { defer wg.Done() for { select { case taskID, ok := <-taskChan: if !ok { return // No more tasks } if err := w.ProcessTask(ctx, taskID); err != nil { select { case errorChan <- err: case <-ctx.Done(): } return } case <-ctx.Done(): return } } }(worker) } // Wait for completion or cancellation done := make(chan struct{}) go func() { wg.Wait() close(done) }() select { case <-done: close(errorChan) // Check for errors for err := range errorChan { if err != nil { return err } } return nil case <-ctx.Done(): return ctx.Err() } } func main() { manager := NewTaskManager() // Example 1: Normal completion fmt.Println("=== Normal Completion ===") ctx1, cancel1 := context.WithTimeout(context.Background(), 5*time.Second) ctx1 = context.WithValue(ctx1, RequestIDKey, "batch-001") defer cancel1() err := manager.ProcessTasksConcurrently(ctx1, 6) if err != nil { fmt.Printf("Batch processing failed: %v\n", err) } else { fmt.Println("Batch processing completed successfully") } time.Sleep(1 * time.Second) // Example 2: Timeout scenario fmt.Println("\n=== Timeout Scenario ===") ctx2, cancel2 := 
context.WithTimeout(context.Background(), 1*time.Second) ctx2 = context.WithValue(ctx2, RequestIDKey, "batch-002") defer cancel2() err = manager.ProcessTasksConcurrently(ctx2, 10) if err != nil { fmt.Printf("Batch processing failed: %v\n", err) } else { fmt.Println("Batch processing completed successfully") } time.Sleep(1 * time.Second) // Example 3: Manual cancellation fmt.Println("\n=== Manual Cancellation ===") ctx3, cancel3 := context.WithCancel(context.Background()) ctx3 = context.WithValue(ctx3, RequestIDKey, "batch-003") // Cancel after 800ms go func() { time.Sleep(800 * time.Millisecond) fmt.Println("Manually cancelling batch...") cancel3() }() err = manager.ProcessTasksConcurrently(ctx3, 8) if err != nil { fmt.Printf("Batch processing failed: %v\n", err) } else { fmt.Println("Batch processing completed successfully") } } Best Practices Always Accept Context: Functions that might block should accept context as first parameter Don’t Store Context: Pass context as parameter, don’t store in structs Use context.TODO(): When you don’t have context but need one Derive Contexts: Create child contexts from parent contexts Handle Cancellation: Always check ctx.Done() in long-running operations Limit Context Values: Use sparingly and for request-scoped data only Use Typed Keys: Define custom types for context keys to avoid collisions Common Pitfalls 1. Ignoring Context Cancellation // Bad: Ignoring context cancellation func badOperation(ctx context.Context) error { for i := 0; i < 1000; i++ { // Long operation without checking context time.Sleep(10 * time.Millisecond) // Process item i } return nil } // Good: Checking context regularly func goodOperation(ctx context.Context) error { for i := 0; i < 1000; i++ { select { case <-ctx.Done(): return ctx.Err() default: } time.Sleep(10 * time.Millisecond) // Process item i } return nil } 2. 
Using Context for Optional Parameters // Bad: Using context for optional parameters func badFunction(ctx context.Context) { if timeout, ok := ctx.Value("timeout").(time.Duration); ok { // Use timeout } } // Good: Use function parameters for optional values func goodFunction(ctx context.Context, timeout time.Duration) { // Use timeout parameter } The Context pattern is fundamental for building robust, cancellable operations in Go. It enables graceful handling of timeouts, cancellations, and request-scoped data, making your applications more responsive and resource-efficient. ...

    June 19, 2024 · 10 min · Rafiul Alam

    Go Concurrency Pattern: Channel Fundamentals

    Go Concurrency Patterns Series: ← Goroutine Basics | Series Overview | Select Statement → What are Channels? Channels are Go’s primary mechanism for communication between goroutines. They embody Go’s concurrency philosophy: “Don’t communicate by sharing memory; share memory by communicating.” Think of channels as typed pipes that allow goroutines to safely pass data back and forth. Channels provide both communication and synchronization, making them incredibly powerful for building concurrent applications. They’re type-safe, can be buffered or unbuffered, and support directional constraints for better API design. ...
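The properties listed above (typed, buffered or unbuffered, directional) can be seen together in one small sketch. The function names `produce`, `sum`, and `SumTo` are hypothetical and chosen only for this example.

```go
package main

import "fmt"

// produce can only send on out (chan<- int). It closes the channel when
// done so the receiver's range loop terminates.
func produce(out chan<- int, n int) {
	defer close(out)
	for i := 1; i <= n; i++ {
		out <- i
	}
}

// sum can only receive from in (<-chan int). Ranging over a channel reads
// values until the channel is closed and drained.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

// SumTo connects a producer goroutine to a consumer over an unbuffered
// channel: every send blocks until the matching receive is ready, so the
// channel synchronizes the two goroutines as well as carrying the data.
func SumTo(n int) int {
	ch := make(chan int) // unbuffered
	go produce(ch, n)
	return sum(ch)
}

func main() {
	fmt.Println(SumTo(10)) // 55

	// A buffered channel accepts sends without a waiting receiver until
	// its capacity is reached.
	buf := make(chan string, 2)
	buf <- "a"
	buf <- "b"
	fmt.Println(len(buf), cap(buf)) // 2 2

	// Values already in the buffer can still be received after close.
	close(buf)
	for s := range buf {
		fmt.Println(s)
	}
}
```

The directional types in the signatures of `produce` and `sum` are checked by the compiler: a send inside `sum` or a receive inside `produce` would not compile, which documents each function's role at the API boundary.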

    June 19, 2024 · 12 min · Rafiul Alam