Go Concurrency Pattern: Pipeline

    What is the Pipeline Pattern? The Pipeline pattern is a powerful way to structure concurrent data processing by breaking work into stages connected by channels. Each stage runs in its own goroutine, receives data from an input channel, processes it, and sends results to an output channel. This creates a chain of processing stages that can run concurrently, dramatically improving throughput. ...
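The stage-per-goroutine structure described above can be sketched roughly as follows. This is a minimal illustration, not the article's own code: the function names `generate`, `square`, and `collect` are hypothetical, and each stage simply owns its output channel and closes it when its input is exhausted.

```go
package main

import "fmt"

// generate is a hypothetical source stage: it emits the given numbers
// on a channel and closes the channel when done.
func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// square is a hypothetical middle stage: it reads from in, squares each
// value, and forwards the result on its own output channel.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// collect drains the final stage into a slice.
func collect(in <-chan int) []int {
	var result []int
	for n := range in {
		result = append(result, n)
	}
	return result
}

func main() {
	// Stages are composed by passing one stage's output to the next.
	fmt.Println(collect(square(generate(1, 2, 3, 4)))) // [1 4 9 16]
}
```

Because each stage here runs in a single goroutine, values keep their input order; a stage that fans out to several workers would not give that guarantee.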

    July 3, 2024 · 15 min · Rafiul Alam

    Mutex Patterns in Go

    What are Mutex Patterns? Mutex (mutual exclusion) patterns are essential for protecting shared resources in concurrent programs. Go provides sync.Mutex and sync.RWMutex for controlling access to critical sections, ensuring data consistency and preventing race conditions. Key Types: Mutex: Exclusive access (one goroutine at a time) RWMutex: Reader-writer locks (multiple readers OR one writer) Lock-free: Atomic operations without explicit locks Real-World Use Cases Shared Counters: Statistics, metrics, rate limiting Cache Systems: Thread-safe caching with read/write operations Configuration Management: Safe updates to application config Connection Pools: Managing database/HTTP connection pools Resource Allocation: Tracking and managing limited resources State Machines: Protecting state transitions Basic Mutex Usage package main import ( "fmt" "sync" ) // Counter demonstrates basic mutex usage type Counter struct { mu sync.Mutex value int } // Increment safely increments the counter func (c *Counter) Increment() { c.mu.Lock() defer c.mu.Unlock() c.value++ } // Decrement safely decrements the counter func (c *Counter) Decrement() { c.mu.Lock() defer c.mu.Unlock() c.value-- } // Value safely returns the current value func (c *Counter) Value() int { c.mu.Lock() defer c.mu.Unlock() return c.value } // Add safely adds a value to the counter func (c *Counter) Add(delta int) { c.mu.Lock() defer c.mu.Unlock() c.value += delta } func main() { counter := &Counter{} var wg sync.WaitGroup // Start multiple goroutines incrementing the counter for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 100; j++ { counter.Increment() } fmt.Printf("Goroutine %d finished\n", id) }(i) } // Start some goroutines decrementing for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 50; j++ { counter.Decrement() } fmt.Printf("Decrementer %d
finished\n", id) }(i) } wg.Wait() fmt.Printf("Final counter value: %d\n", counter.Value()) // Expected: (10 * 100) - (3 * 50) = 1000 - 150 = 850 } RWMutex for Read-Heavy Workloads package main import ( "fmt" "math/rand" "sync" "time" ) // Cache demonstrates RWMutex usage for read-heavy scenarios type Cache struct { mu sync.RWMutex data map[string]interface{} } // NewCache creates a new cache func NewCache() *Cache { return &Cache{ data: make(map[string]interface{}), } } // Get retrieves a value (read operation) func (c *Cache) Get(key string) (interface{}, bool) { c.mu.RLock() defer c.mu.RUnlock() value, exists := c.data[key] return value, exists } // Set stores a value (write operation) func (c *Cache) Set(key string, value interface{}) { c.mu.Lock() defer c.mu.Unlock() c.data[key] = value } // Delete removes a value (write operation) func (c *Cache) Delete(key string) { c.mu.Lock() defer c.mu.Unlock() delete(c.data, key) } // Keys returns all keys (read operation) func (c *Cache) Keys() []string { c.mu.RLock() defer c.mu.RUnlock() keys := make([]string, 0, len(c.data)) for key := range c.data { keys = append(keys, key) } return keys } // Size returns the number of items (read operation) func (c *Cache) Size() int { c.mu.RLock() defer c.mu.RUnlock() return len(c.data) } // Clear removes all items (write operation) func (c *Cache) Clear() { c.mu.Lock() defer c.mu.Unlock() c.data = make(map[string]interface{}) } func main() { cache := NewCache() var wg sync.WaitGroup // Writers (fewer, less frequent) for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 50; j++ { key := fmt.Sprintf("key-%d-%d", id, j) cache.Set(key, fmt.Sprintf("value-%d-%d", id, j)) time.Sleep(10 * time.Millisecond) } }(i) } // Readers (many, frequent) for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 100; j++ { // Try to read random keys key := fmt.Sprintf("key-%d-%d", rand.Intn(3), rand.Intn(50)) if value, exists := 
cache.Get(key); exists { fmt.Printf("Reader %d found %s: %v\n", id, key, value) } time.Sleep(5 * time.Millisecond) } }(i) } // Size checker wg.Add(1) go func() { defer wg.Done() for i := 0; i < 20; i++ { size := cache.Size() fmt.Printf("Cache size: %d\n", size) time.Sleep(100 * time.Millisecond) } }() wg.Wait() fmt.Printf("Final cache size: %d\n", cache.Size()) } Advanced Mutex Patterns 1. Conditional Variables Pattern package main import ( "fmt" "sync" "time" ) // Buffer demonstrates conditional variables with mutex type Buffer struct { mu sync.Mutex notEmpty *sync.Cond notFull *sync.Cond items []interface{} capacity int } // NewBuffer creates a new bounded buffer func NewBuffer(capacity int) *Buffer { b := &Buffer{ items: make([]interface{}, 0, capacity), capacity: capacity, } b.notEmpty = sync.NewCond(&b.mu) b.notFull = sync.NewCond(&b.mu) return b } // Put adds an item to the buffer (blocks if full) func (b *Buffer) Put(item interface{}) { b.mu.Lock() defer b.mu.Unlock() // Wait while buffer is full for len(b.items) == b.capacity { b.notFull.Wait() } b.items = append(b.items, item) fmt.Printf("Put item: %v (buffer size: %d)\n", item, len(b.items)) // Signal that buffer is not empty b.notEmpty.Signal() } // Get removes an item from the buffer (blocks if empty) func (b *Buffer) Get() interface{} { b.mu.Lock() defer b.mu.Unlock() // Wait while buffer is empty for len(b.items) == 0 { b.notEmpty.Wait() } item := b.items[0] b.items = b.items[1:] fmt.Printf("Got item: %v (buffer size: %d)\n", item, len(b.items)) // Signal that buffer is not full b.notFull.Signal() return item } // Size returns current buffer size func (b *Buffer) Size() int { b.mu.Lock() defer b.mu.Unlock() return len(b.items) } func main() { buffer := NewBuffer(3) var wg sync.WaitGroup // Producers for i := 0; i < 2; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 5; j++ { item := fmt.Sprintf("item-%d-%d", id, j) buffer.Put(item) time.Sleep(200 * time.Millisecond) } }(i) } // 
Consumers for i := 0; i < 2; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 5; j++ { item := buffer.Get() fmt.Printf("Consumer %d processed: %v\n", id, item) time.Sleep(300 * time.Millisecond) } }(i) } wg.Wait() } 2. Lock-Free Patterns with Atomic Operations package main import ( "fmt" "sync" "sync/atomic" "time" ) // AtomicCounter demonstrates lock-free counter type AtomicCounter struct { value int64 } // Increment atomically increments the counter func (ac *AtomicCounter) Increment() int64 { return atomic.AddInt64(&ac.value, 1) } // Decrement atomically decrements the counter func (ac *AtomicCounter) Decrement() int64 { return atomic.AddInt64(&ac.value, -1) } // Value atomically reads the current value func (ac *AtomicCounter) Value() int64 { return atomic.LoadInt64(&ac.value) } // CompareAndSwap atomically compares and swaps func (ac *AtomicCounter) CompareAndSwap(old, new int64) bool { return atomic.CompareAndSwapInt64(&ac.value, old, new) } // AtomicFlag demonstrates atomic boolean operations type AtomicFlag struct { flag int32 } // Set atomically sets the flag to true func (af *AtomicFlag) Set() { atomic.StoreInt32(&af.flag, 1) } // Clear atomically sets the flag to false func (af *AtomicFlag) Clear() { atomic.StoreInt32(&af.flag, 0) } // IsSet atomically checks if flag is set func (af *AtomicFlag) IsSet() bool { return atomic.LoadInt32(&af.flag) == 1 } // TestAndSet atomically tests and sets the flag func (af *AtomicFlag) TestAndSet() bool { return atomic.SwapInt32(&af.flag, 1) == 1 } func main() { counter := &AtomicCounter{} flag := &AtomicFlag{} var wg sync.WaitGroup // Test atomic counter fmt.Println("Testing atomic counter...") for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 1000; j++ { counter.Increment() } }(i) } wg.Wait() fmt.Printf("Final counter value: %d\n", counter.Value()) // Test atomic flag fmt.Println("\nTesting atomic flag...") // Multiple goroutines trying to set flag for i := 0; i < 
5; i++ { wg.Add(1) go func(id int) { defer wg.Done() if !flag.TestAndSet() { fmt.Printf("Goroutine %d acquired the flag\n", id) time.Sleep(100 * time.Millisecond) flag.Clear() fmt.Printf("Goroutine %d released the flag\n", id) } else { fmt.Printf("Goroutine %d failed to acquire flag\n", id) } }(i) } wg.Wait() } 3. Resource Pool Pattern package main import ( "errors" "fmt" "sync" "time" ) // Resource represents a limited resource type Resource struct { ID int Data string } // ResourcePool manages a pool of limited resources type ResourcePool struct { mu sync.Mutex resources []*Resource available chan *Resource maxSize int created int } // NewResourcePool creates a new resource pool func NewResourcePool(maxSize int) *ResourcePool { return &ResourcePool{ resources: make([]*Resource, 0, maxSize), available: make(chan *Resource, maxSize), maxSize: maxSize, } } // createResource creates a new resource func (rp *ResourcePool) createResource() *Resource { rp.mu.Lock() defer rp.mu.Unlock() if rp.created >= rp.maxSize { return nil } rp.created++ resource := &Resource{ ID: rp.created, Data: fmt.Sprintf("Resource-%d", rp.created), } fmt.Printf("Created resource %d\n", resource.ID) return resource } // Acquire gets a resource from the pool func (rp *ResourcePool) Acquire() (*Resource, error) { select { case resource := <-rp.available: fmt.Printf("Acquired existing resource %d\n", resource.ID) return resource, nil default: // No available resource, try to create one if resource := rp.createResource(); resource != nil { return resource, nil } // Pool is full, wait for available resource fmt.Println("Pool full, waiting for available resource...") select { case resource := <-rp.available: fmt.Printf("Acquired resource %d after waiting\n", resource.ID) return resource, nil case <-time.After(5 * time.Second): return nil, errors.New("timeout waiting for resource") } } } // Release returns a resource to the pool func (rp *ResourcePool) Release(resource *Resource) { select { case 
rp.available <- resource: fmt.Printf("Released resource %d\n", resource.ID) default: // Channel full, resource will be garbage collected fmt.Printf("Pool full, discarding resource %d\n", resource.ID) } } // Size returns current pool statistics func (rp *ResourcePool) Size() (available, created int) { rp.mu.Lock() defer rp.mu.Unlock() return len(rp.available), rp.created } func main() { pool := NewResourcePool(3) var wg sync.WaitGroup // Multiple goroutines using resources for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() resource, err := pool.Acquire() if err != nil { fmt.Printf("Worker %d failed to acquire resource: %v\n", id, err) return } fmt.Printf("Worker %d using resource %d\n", id, resource.ID) // Simulate work time.Sleep(time.Duration(200+id*100) * time.Millisecond) pool.Release(resource) fmt.Printf("Worker %d finished\n", id) }(i) } wg.Wait() available, created := pool.Size() fmt.Printf("Final pool state - Available: %d, Created: %d\n", available, created) } Best Practices Always Use defer: Ensure locks are released even if a panic occurs Keep Critical Sections Small: Minimize the time spent holding locks Avoid Nested Locks: Prevent deadlocks by holding only one lock at a time where possible Use RWMutex for Read-Heavy: Allows concurrent readers, which helps when reads greatly outnumber writes Consider Lock-Free: Use atomic operations for simple counters and flags Document Lock Order: If multiple locks are needed, establish a clear acquisition order Prefer Channels: Use channels for communication, locks for shared state Common Pitfalls 1. 
Deadlocks // Bad: Potential deadlock with nested locks (the ID field is used further down to order lock acquisition) type BadAccount struct { ID int mu sync.Mutex balance int } func (a *BadAccount) Transfer(to *BadAccount, amount int) { a.mu.Lock() defer a.mu.Unlock() to.mu.Lock() // Potential deadlock if called concurrently defer to.mu.Unlock() a.balance -= amount to.balance += amount } // Good: Ordered locking to prevent deadlock func (a *BadAccount) SafeTransfer(to *BadAccount, amount int) { // A transfer to the same account would lock the same mutex twice if a == to { return } // Always acquire locks in consistent order first, second := a, to if a.ID > to.ID { first, second = to, a } first.mu.Lock() defer first.mu.Unlock() second.mu.Lock() defer second.mu.Unlock() a.balance -= amount to.balance += amount } 2. Race Conditions // Bad: Race condition type BadCounter struct { mu sync.Mutex value int } func (c *BadCounter) IncrementIfEven() { if c.value%2 == 0 { // Race: value is read without the lock and might change between check and increment c.mu.Lock() c.value++ c.mu.Unlock() } } // Good: Atomic check and update func (c *BadCounter) SafeIncrementIfEven() { c.mu.Lock() defer c.mu.Unlock() if c.value%2 == 0 { c.value++ } } Testing Concurrent Code package main import ( "sync" "testing" ) func TestCounter(t *testing.T) { counter := &Counter{} var wg sync.WaitGroup goroutines := 100 increments := 1000 for i := 0; i < goroutines; i++ { wg.Add(1) go func() { defer wg.Done() for j := 0; j < increments; j++ { counter.Increment() } }() } wg.Wait() expected := goroutines * increments if counter.Value() != expected { t.Errorf("Expected %d, got %d", expected, counter.Value()) } } // Run with: go test -race func TestCounterRace(t *testing.T) { counter := &Counter{} var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() for i := 0; i < 1000; i++ { counter.Increment() } }() go func() { defer wg.Done() for i := 0; i < 1000; i++ { _ = counter.Value() } }() // Wait so the test does not return before the goroutines have run wg.Wait() } Mutex patterns are fundamental for building safe concurrent applications in Go. 
Choose the right synchronization primitive based on your access patterns: use sync.Mutex for exclusive access, sync.RWMutex for read-heavy workloads, and atomic operations for simple lock-free scenarios. ...
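As a rough illustration of the "atomic operations for simple lock-free scenarios" case, the sketch below uses the typed `atomic.Int64` from `sync/atomic`, which assumes Go 1.19 or later. The `Stats` type and `countConcurrently` helper are hypothetical names introduced only for this example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Stats is a hypothetical metrics holder. The zero value of
// atomic.Int64 is ready to use, so no constructor is needed.
type Stats struct {
	requests atomic.Int64
}

// Record increments the request counter without taking a lock.
func (s *Stats) Record() { s.requests.Add(1) }

// Requests returns the current count.
func (s *Stats) Requests() int64 { return s.requests.Load() }

// countConcurrently is a hypothetical helper that hammers the counter
// from several goroutines and returns the final value.
func countConcurrently(goroutines, perGoroutine int) int64 {
	var s Stats
	var wg sync.WaitGroup
	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perGoroutine; j++ {
				s.Record()
			}
		}()
	}
	wg.Wait()
	return s.Requests()
}

func main() {
	fmt.Println(countConcurrently(10, 1000)) // 10000
}
```

Atomics fit a single independent value like this one; once two or more fields must change together, a mutex is usually the simpler and safer choice.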

    July 3, 2024 · 10 min · Rafiul Alam

    Distributed Tracing in Go

    What is Distributed Tracing? Distributed tracing tracks requests as they flow through microservices, providing visibility into performance bottlenecks, service dependencies, and error propagation in distributed systems. Key Concepts: Trace: End-to-end journey of a request across services Span: Single unit of work within a trace Context Propagation: Carrying trace information across boundaries Sampling: Controlling which traces to collect Why OpenTelemetry? OpenTelemetry (OTel) is the industry standard for observability: ...

    June 29, 2024 · 10 min · Rafiul Alam

    Go Generics Design Patterns

    What Are Go Generics? Go 1.18 introduced generics (type parameters), enabling type-safe, reusable code without sacrificing performance. This opens up new possibilities for implementing classic design patterns with compile-time type safety. Key Features: Type Parameters: Functions and types that work with any type Constraints: Restrict type parameters to specific interfaces Type Inference: Compiler deduces type arguments automatically Low Runtime Overhead: Avoids most of the boxing and type assertions needed with interface{} Real-World Use Cases Collections: Type-safe lists, maps, sets without reflection Algorithms: Generic sort, filter, map operations Data Structures: Stacks, queues, trees with any element type Caching: Generic cache implementations Functional Patterns: Map, filter, reduce with type safety Concurrent Patterns: Type-safe worker pools and pipelines Generic Data Structures Generic Stack package main import ( "fmt" "sync" ) // Stack is a generic LIFO data structure type Stack[T any] struct { items []T mu sync.RWMutex } func NewStack[T any]() *Stack[T] { return &Stack[T]{ items: make([]T, 0), } } func (s *Stack[T]) Push(item T) { s.mu.Lock() defer s.mu.Unlock() s.items = append(s.items, item) } func (s *Stack[T]) Pop() (T, bool) { s.mu.Lock() defer s.mu.Unlock() if len(s.items) == 0 { var zero T return zero, false } item := s.items[len(s.items)-1] s.items = s.items[:len(s.items)-1] return item, true } func (s *Stack[T]) Peek() (T, bool) { s.mu.RLock() defer s.mu.RUnlock() if len(s.items) == 0 { var zero T return zero, false } return s.items[len(s.items)-1], true } func (s *Stack[T]) Size() int { s.mu.RLock() defer s.mu.RUnlock() return len(s.items) } func main() { // Integer stack intStack := NewStack[int]() intStack.Push(1) intStack.Push(2) intStack.Push(3) if val, ok := intStack.Pop(); ok { fmt.Printf("Popped: %d\n", val) // 3 } // String stack strStack := NewStack[string]() 
strStack.Push("hello") strStack.Push("world") if val, ok := strStack.Peek(); ok { fmt.Printf("Peek: %s\n", val) // world } } Generic Queue package main import ( "fmt" "sync" ) // Queue is a generic FIFO data structure type Queue[T any] struct { items []T mu sync.RWMutex } func NewQueue[T any]() *Queue[T] { return &Queue[T]{ items: make([]T, 0), } } func (q *Queue[T]) Enqueue(item T) { q.mu.Lock() defer q.mu.Unlock() q.items = append(q.items, item) } func (q *Queue[T]) Dequeue() (T, bool) { q.mu.Lock() defer q.mu.Unlock() if len(q.items) == 0 { var zero T return zero, false } item := q.items[0] q.items = q.items[1:] return item, true } func (q *Queue[T]) IsEmpty() bool { q.mu.RLock() defer q.mu.RUnlock() return len(q.items) == 0 } func main() { queue := NewQueue[string]() queue.Enqueue("first") queue.Enqueue("second") if item, ok := queue.Dequeue(); ok { fmt.Println(item) // first } } Generic Set package main import ( "fmt" "sync" ) // Set is a generic collection of unique elements type Set[T comparable] struct { items map[T]struct{} mu sync.RWMutex } func NewSet[T comparable]() *Set[T] { return &Set[T]{ items: make(map[T]struct{}), } } func (s *Set[T]) Add(item T) { s.mu.Lock() defer s.mu.Unlock() s.items[item] = struct{}{} } func (s *Set[T]) Remove(item T) { s.mu.Lock() defer s.mu.Unlock() delete(s.items, item) } func (s *Set[T]) Contains(item T) bool { s.mu.RLock() defer s.mu.RUnlock() _, exists := s.items[item] return exists } func (s *Set[T]) Size() int { s.mu.RLock() defer s.mu.RUnlock() return len(s.items) } func (s *Set[T]) Items() []T { s.mu.RLock() defer s.mu.RUnlock() items := make([]T, 0, len(s.items)) for item := range s.items { items = append(items, item) } return items } // Union returns a new set with elements from both sets func (s *Set[T]) Union(other *Set[T]) *Set[T] { result := NewSet[T]() s.mu.RLock() for item := range s.items { result.Add(item) } s.mu.RUnlock() other.mu.RLock() for item := range other.items { result.Add(item) } 
other.mu.RUnlock() return result } // Intersection returns a new set with common elements func (s *Set[T]) Intersection(other *Set[T]) *Set[T] { result := NewSet[T]() s.mu.RLock() defer s.mu.RUnlock() other.mu.RLock() defer other.mu.RUnlock() for item := range s.items { if _, exists := other.items[item]; exists { result.Add(item) } } return result } func main() { set1 := NewSet[int]() set1.Add(1) set1.Add(2) set1.Add(3) set2 := NewSet[int]() set2.Add(2) set2.Add(3) set2.Add(4) union := set1.Union(set2) fmt.Println("Union:", union.Items()) // [1 2 3 4] intersection := set1.Intersection(set2) fmt.Println("Intersection:", intersection.Items()) // [2 3] } Generic Cache Pattern package main import ( "fmt" "sync" "time" ) // CacheItem holds cached value with expiration type CacheItem[V any] struct { Value V Expiration time.Time } // Cache is a generic thread-safe cache with expiration type Cache[K comparable, V any] struct { items map[K]CacheItem[V] mu sync.RWMutex ttl time.Duration } func NewCache[K comparable, V any](ttl time.Duration) *Cache[K, V] { cache := &Cache[K, V]{ items: make(map[K]CacheItem[V]), ttl: ttl, } // Start cleanup goroutine go cache.cleanup() return cache } func (c *Cache[K, V]) Set(key K, value V) { c.mu.Lock() defer c.mu.Unlock() c.items[key] = CacheItem[V]{ Value: value, Expiration: time.Now().Add(c.ttl), } } func (c *Cache[K, V]) Get(key K) (V, bool) { c.mu.RLock() defer c.mu.RUnlock() item, exists := c.items[key] if !exists { var zero V return zero, false } // Check expiration if time.Now().After(item.Expiration) { var zero V return zero, false } return item.Value, true } func (c *Cache[K, V]) Delete(key K) { c.mu.Lock() defer c.mu.Unlock() delete(c.items, key) } func (c *Cache[K, V]) cleanup() { ticker := time.NewTicker(c.ttl) defer ticker.Stop() for range ticker.C { c.mu.Lock() now := time.Now() for key, item := range c.items { if now.After(item.Expiration) { delete(c.items, key) } } c.mu.Unlock() } } func main() { // String -> User cache 
type User struct { ID int Name string } cache := NewCache[string, User](5 * time.Second) cache.Set("user1", User{ID: 1, Name: "Alice"}) cache.Set("user2", User{ID: 2, Name: "Bob"}) if user, ok := cache.Get("user1"); ok { fmt.Printf("Found: %+v\n", user) } // Wait for expiration time.Sleep(6 * time.Second) if _, ok := cache.Get("user1"); !ok { fmt.Println("Cache expired") } } Generic Repository Pattern package main import ( "errors" "fmt" "sync" ) // Entity is a constraint for types with an ID type Entity interface { GetID() string } // User implements Entity type User struct { ID string Name string Email string } func (u User) GetID() string { return u.ID } // Product implements Entity type Product struct { ID string Name string Price float64 } func (p Product) GetID() string { return p.ID } // Repository is a generic CRUD interface type Repository[T Entity] interface { Create(item T) error Read(id string) (T, error) Update(item T) error Delete(id string) error List() []T } // InMemoryRepository is a generic in-memory implementation type InMemoryRepository[T Entity] struct { items map[string]T mu sync.RWMutex } func NewInMemoryRepository[T Entity]() *InMemoryRepository[T] { return &InMemoryRepository[T]{ items: make(map[string]T), } } func (r *InMemoryRepository[T]) Create(item T) error { r.mu.Lock() defer r.mu.Unlock() id := item.GetID() if _, exists := r.items[id]; exists { return errors.New("item already exists") } r.items[id] = item return nil } func (r *InMemoryRepository[T]) Read(id string) (T, error) { r.mu.RLock() defer r.mu.RUnlock() item, exists := r.items[id] if !exists { var zero T return zero, errors.New("item not found") } return item, nil } func (r *InMemoryRepository[T]) Update(item T) error { r.mu.Lock() defer r.mu.Unlock() id := item.GetID() if _, exists := r.items[id]; !exists { return errors.New("item not found") } r.items[id] = item return nil } func (r *InMemoryRepository[T]) Delete(id string) error { r.mu.Lock() defer r.mu.Unlock() if _, 
exists := r.items[id]; !exists { return errors.New("item not found") } delete(r.items, id) return nil } func (r *InMemoryRepository[T]) List() []T { r.mu.RLock() defer r.mu.RUnlock() items := make([]T, 0, len(r.items)) for _, item := range r.items { items = append(items, item) } return items } func main() { // User repository userRepo := NewInMemoryRepository[User]() userRepo.Create(User{ID: "1", Name: "Alice", Email: "[email protected]"}) userRepo.Create(User{ID: "2", Name: "Bob", Email: "[email protected]"}) if user, err := userRepo.Read("1"); err == nil { fmt.Printf("User: %+v\n", user) } // Product repository productRepo := NewInMemoryRepository[Product]() productRepo.Create(Product{ID: "p1", Name: "Laptop", Price: 999.99}) productRepo.Create(Product{ID: "p2", Name: "Mouse", Price: 29.99}) products := productRepo.List() fmt.Printf("Products: %+v\n", products) } Generic Builder Pattern package main import "fmt" // Builder is a generic builder pattern type Builder[T any] struct { build func() T } func NewBuilder[T any](buildFunc func() T) *Builder[T] { return &Builder[T]{build: buildFunc} } func (b *Builder[T]) Build() T { return b.build() } // Fluent builder for complex types type HTTPRequest struct { Method string URL string Headers map[string]string Body string } type HTTPRequestBuilder struct { req HTTPRequest } func NewHTTPRequestBuilder() *HTTPRequestBuilder { return &HTTPRequestBuilder{ req: HTTPRequest{ Headers: make(map[string]string), }, } } func (b *HTTPRequestBuilder) Method(method string) *HTTPRequestBuilder { b.req.Method = method return b } func (b *HTTPRequestBuilder) URL(url string) *HTTPRequestBuilder { b.req.URL = url return b } func (b *HTTPRequestBuilder) Header(key, value string) *HTTPRequestBuilder { b.req.Headers[key] = value return b } func (b *HTTPRequestBuilder) Body(body string) *HTTPRequestBuilder { b.req.Body = body return b } func (b *HTTPRequestBuilder) Build() HTTPRequest { return b.req } // Generic fluent builder type 
FluentBuilder[T any] struct { value T } func NewFluentBuilder[T any](initial T) *FluentBuilder[T] { return &FluentBuilder[T]{value: initial} } func (b *FluentBuilder[T]) Apply(fn func(T) T) *FluentBuilder[T] { b.value = fn(b.value) return b } func (b *FluentBuilder[T]) Build() T { return b.value } func main() { // HTTP Request builder req := NewHTTPRequestBuilder(). Method("POST"). URL("https://api.example.com/users"). Header("Content-Type", "application/json"). Body(`{"name": "Alice"}`). Build() fmt.Printf("Request: %+v\n", req) // Generic fluent builder result := NewFluentBuilder(0). Apply(func(n int) int { return n + 10 }). Apply(func(n int) int { return n * 2 }). Apply(func(n int) int { return n - 5 }). Build() fmt.Printf("Result: %d\n", result) // 15 } Generic Option Pattern package main import ( "fmt" "time" ) // Option is a generic option function type Option[T any] func(*T) // Server configuration type ServerConfig struct { Host string Port int Timeout time.Duration MaxConns int ReadTimeout time.Duration WriteTimeout time.Duration } // Note: Go constraints cannot require struct fields, so option functions are written per concrete type (see the server-specific options that follow) // Generic constructor with options func NewWithOptions[T any](initial T, opts ...Option[T]) T { for _, opt := range opts { opt(&initial) } return initial } // Server-specific options func ServerWithHost(host string) Option[ServerConfig] { return func(c *ServerConfig) { c.Host = host } } func ServerWithPort(port int) Option[ServerConfig] { return func(c *ServerConfig) { c.Port = port } } func ServerWithTimeout(timeout time.Duration) Option[ServerConfig] { return func(c *ServerConfig) { c.Timeout = timeout } } func NewServer(opts ...Option[ServerConfig]) ServerConfig { config := 
ServerConfig{ Host: "localhost", Port: 8080, Timeout: 30 * time.Second, MaxConns: 100, ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, } for _, opt := range opts { opt(&config) } return config } func main() { server := NewServer( ServerWithHost("0.0.0.0"), ServerWithPort(9000), ServerWithTimeout(60*time.Second), ) fmt.Printf("Server config: %+v\n", server) } Generic Result Type package main import ( "errors" "fmt" ) // Result represents either a value or an error type Result[T any] struct { value T err error } func Ok[T any](value T) Result[T] { return Result[T]{value: value} } func Err[T any](err error) Result[T] { var zero T return Result[T]{value: zero, err: err} } func (r Result[T]) IsOk() bool { return r.err == nil } func (r Result[T]) IsErr() bool { return r.err != nil } func (r Result[T]) Unwrap() (T, error) { return r.value, r.err } func (r Result[T]) UnwrapOr(defaultValue T) T { if r.IsErr() { return defaultValue } return r.value } // Map transforms the value if Ok func (r Result[T]) Map(fn func(T) T) Result[T] { if r.IsErr() { return r } return Ok(fn(r.value)) } // FlatMap chains operations func FlatMap[T any, U any](r Result[T], fn func(T) Result[U]) Result[U] { if r.IsErr() { return Err[U](r.err) } return fn(r.value) } // Example usage func divide(a, b int) Result[int] { if b == 0 { return Err[int](errors.New("division by zero")) } return Ok(a / b) } func main() { // Success case result1 := divide(10, 2) if value, err := result1.Unwrap(); err == nil { fmt.Printf("Result: %d\n", value) // 5 } // Error case result2 := divide(10, 0) value := result2.UnwrapOr(-1) fmt.Printf("Result with default: %d\n", value) // -1 // Chaining operations result3 := divide(20, 2). Map(func(n int) int { return n * 2 }). 
Map(func(n int) int { return n + 5 }) if value, err := result3.Unwrap(); err == nil { fmt.Printf("Chained result: %d\n", value) // 25 } } Generic Pipeline Pattern package main import ( "fmt" ) // Pipeline represents a chain of transformations type Pipeline[T any] struct { stages []func(T) T } func NewPipeline[T any]() *Pipeline[T] { return &Pipeline[T]{ stages: make([]func(T) T, 0), } } func (p *Pipeline[T]) Add(stage func(T) T) *Pipeline[T] { p.stages = append(p.stages, stage) return p } func (p *Pipeline[T]) Execute(input T) T { result := input for _, stage := range p.stages { result = stage(result) } return result } // Generic filter, map, reduce func Filter[T any](items []T, predicate func(T) bool) []T { result := make([]T, 0) for _, item := range items { if predicate(item) { result = append(result, item) } } return result } func Map[T any, U any](items []T, mapper func(T) U) []U { result := make([]U, len(items)) for i, item := range items { result[i] = mapper(item) } return result } func Reduce[T any, U any](items []T, initial U, reducer func(U, T) U) U { result := initial for _, item := range items { result = reducer(result, item) } return result } func main() { // Pipeline example pipeline := NewPipeline[int](). Add(func(n int) int { return n * 2 }). Add(func(n int) int { return n + 10 }). 
Add(func(n int) int { return n / 2 }) result := pipeline.Execute(5) fmt.Printf("Pipeline result: %d\n", result) // 10 // Filter, map, reduce numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} // Filter even numbers evens := Filter(numbers, func(n int) bool { return n%2 == 0 }) fmt.Printf("Evens: %v\n", evens) // Map to squares squares := Map(evens, func(n int) int { return n * n }) fmt.Printf("Squares: %v\n", squares) // Reduce to sum sum := Reduce(squares, 0, func(acc, n int) int { return acc + n }) fmt.Printf("Sum: %d\n", sum) } Generic Worker Pool package main import ( "context" "fmt" "sync" ) // Task represents a generic task type Task[T any, R any] struct { Input T Result chan R } // WorkerPool is a generic worker pool type WorkerPool[T any, R any] struct { workers int tasks chan Task[T, R] process func(T) R wg sync.WaitGroup } func NewWorkerPool[T any, R any](workers int, process func(T) R) *WorkerPool[T, R] { return &WorkerPool[T, R]{ workers: workers, tasks: make(chan Task[T, R], workers*2), process: process, } } func (wp *WorkerPool[T, R]) Start(ctx context.Context) { for i := 0; i < wp.workers; i++ { wp.wg.Add(1) go wp.worker(ctx) } } func (wp *WorkerPool[T, R]) worker(ctx context.Context) { defer wp.wg.Done() for { select { case task, ok := <-wp.tasks: if !ok { return } result := wp.process(task.Input) task.Result <- result close(task.Result) case <-ctx.Done(): return } } } func (wp *WorkerPool[T, R]) Submit(input T) <-chan R { resultChan := make(chan R, 1) task := Task[T, R]{ Input: input, Result: resultChan, } wp.tasks <- task return resultChan } func (wp *WorkerPool[T, R]) Shutdown() { close(wp.tasks) wp.wg.Wait() } func main() { // Integer -> String worker pool pool := NewWorkerPool(3, func(n int) string { return fmt.Sprintf("Result: %d", n*n) }) ctx := context.Background() pool.Start(ctx) // Submit tasks results := make([]<-chan string, 0) for i := 1; i <= 10; i++ { results = append(results, pool.Submit(i)) } // Collect results for i, resultChan := 
range results { result := <-resultChan fmt.Printf("Task %d: %s\n", i+1, result) } pool.Shutdown() } Best Practices 1. Use Constraints Wisely // Too restrictive func Sum[T int | int64](values []T) T { ... } // Better: Use the constraints package (note that constraints.Ordered also admits strings) import "golang.org/x/exp/constraints" func Sum[T constraints.Ordered](values []T) T { ... } 2. Prefer Type Inference // Explicit type arguments result := Map[int, string](numbers, toString) // Better: Let compiler infer result := Map(numbers, toString) 3. Keep It Simple // Overly complex func Process[T any, U any, V any](a T, fn1 func(T) U, fn2 func(U) V) V { ... } // Better: Multiple simpler functions func Step1[T, U any](a T, fn func(T) U) U { ... } func Step2[U, V any](u U, fn func(U) V) V { ... } Performance Considerations Compile Time: Generics increase compile time slightly Runtime: Usually low overhead, but not always zero; the standard gc compiler shares one instantiation among types with the same GC shape and passes a dictionary, rather than fully monomorphizing Code Size: Can increase binary size because of the extra instantiations Type Inference: Reduces verbosity at call sites Conclusion Go generics enable type-safe, reusable patterns while maintaining Go’s simplicity and performance. ...
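For the "Use Constraints Wisely" advice, one option that avoids an external dependency is to declare a small constraint locally. The sketch below is an assumption-laden illustration: `Number` and `Celsius` are hypothetical names, and the `~` forms are what let named types with a numeric underlying type satisfy the constraint.

```go
package main

import "fmt"

// Number is a hypothetical local constraint covering the built-in
// integer and floating-point types, plus any named type whose
// underlying type is one of them (that is what ~ means).
type Number interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64 |
		~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 |
		~float32 | ~float64
}

// Sum adds up the values; the zero value of T is the starting total.
func Sum[T Number](values []T) T {
	var total T
	for _, v := range values {
		total += v
	}
	return total
}

// Celsius is a named type; it satisfies Number because of ~float64.
type Celsius float64

func main() {
	fmt.Println(Sum([]int{1, 2, 3}))       // 6
	fmt.Println(Sum([]Celsius{20.5, 1.5})) // 22
}
```

Unlike an ordering constraint, this one excludes strings, so `Sum` cannot be used for accidental string concatenation.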

    June 28, 2024 · 13 min · Rafiul Alam

    Graceful Shutdown Patterns in Go

    Go Concurrency Patterns Series: ← Go Memory Model | Series Overview | Generics Patterns → What is Graceful Shutdown? Graceful shutdown is the process of cleanly stopping a running application by: Receiving shutdown signals (SIGTERM, SIGINT) Stopping acceptance of new requests Finishing in-flight requests Closing database connections and other resources Flushing logs and metrics Exiting with an appropriate status code Why It Matters: Zero-downtime deployments: No dropped requests during rollouts Data integrity: Complete ongoing transactions Resource cleanup: Prevent leaks and corruption Observability: Flush pending logs and metrics Container orchestration: Proper Kubernetes pod termination Real-World Use Cases HTTP/gRPC servers: Drain active connections before shutdown Background workers: Complete current jobs, reject new ones Message consumers: Finish processing messages, commit offsets Database connections: Close pools cleanly Caching layers: Persist in-memory state Kubernetes deployments: Respect termination grace period Basic Signal Handling Simple Shutdown Handler package main import ( "fmt" "os" "os/signal" "syscall" "time" ) func main() { // Create signal channel sigChan := make(chan os.Signal, 1) // Register for SIGINT and SIGTERM signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) // Simulate application work done := make(chan bool) go func() { fmt.Println("Application running...") time.Sleep(30 * time.Second) done <- true }() // Wait for signal or completion select { case sig := <-sigChan: fmt.Printf("\nReceived signal: %v\n", sig) fmt.Println("Initiating graceful shutdown...") // Perform cleanup cleanup() fmt.Println("Shutdown complete") os.Exit(0) case <-done: fmt.Println("Application completed normally") } } func cleanup() { fmt.Println("Cleaning up resources...") time.Sleep(2 * time.Second) fmt.Println("Cleanup complete") } HTTP Server Graceful Shutdown Basic HTTP Server Shutdown package main import ( "context" "fmt" "log" "net/http"
"os" "os/signal" "syscall" "time" ) func main() { // Create HTTP server server := &http.Server{ Addr: ":8080", Handler: setupRoutes(), } // Channel to listen for errors from the server serverErrors := make(chan error, 1) // Start HTTP server in goroutine go func() { log.Printf("Server starting on %s", server.Addr) serverErrors <- server.ListenAndServe() }() // Channel to listen for interrupt signals shutdown := make(chan os.Signal, 1) signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM) // Block until we receive a signal or server error select { case err := <-serverErrors: log.Fatalf("Server error: %v", err) case sig := <-shutdown: log.Printf("Received signal: %v. Starting graceful shutdown...", sig) // Create context with timeout for shutdown ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() // Attempt graceful shutdown if err := server.Shutdown(ctx); err != nil { log.Printf("Graceful shutdown failed: %v", err) // Force close if graceful shutdown fails if err := server.Close(); err != nil { log.Fatalf("Force close failed: %v", err) } } log.Println("Server shutdown complete") } } func setupRoutes() http.Handler { mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "Hello, World!") }) mux.HandleFunc("/slow", func(w http.ResponseWriter, r *http.Request) { // Simulate slow endpoint time.Sleep(5 * time.Second) fmt.Fprintf(w, "Slow response complete") }) return mux } Advanced HTTP Server with Multiple Resources package main import ( "context" "database/sql" "fmt" "log" "net/http" "os" "os/signal" "sync" "syscall" "time" _ "github.com/lib/pq" ) type Application struct { server *http.Server db *sql.DB logger *log.Logger shutdown chan struct{} wg sync.WaitGroup } func NewApplication() (*Application, error) { // Initialize database db, err := sql.Open("postgres", "postgres://localhost/mydb") if err != nil { return nil, fmt.Errorf("database connection failed: %w", err) } app := 
&Application{ db: db, logger: log.New(os.Stdout, "APP: ", log.LstdFlags), shutdown: make(chan struct{}), } // Setup HTTP server app.server = &http.Server{ Addr: ":8080", Handler: app.routes(), ReadTimeout: 10 * time.Second, WriteTimeout: 30 * time.Second, IdleTimeout: 60 * time.Second, } return app, nil } func (app *Application) routes() http.Handler { mux := http.NewServeMux() mux.HandleFunc("/health", app.healthHandler) mux.HandleFunc("/api/data", app.dataHandler) return mux } func (app *Application) healthHandler(w http.ResponseWriter, r *http.Request) { select { case <-app.shutdown: // Signal shutdown in progress w.WriteHeader(http.StatusServiceUnavailable) fmt.Fprintf(w, "Shutting down") default: w.WriteHeader(http.StatusOK) fmt.Fprintf(w, "OK") } } func (app *Application) dataHandler(w http.ResponseWriter, r *http.Request) { // Check if shutdown initiated select { case <-app.shutdown: http.Error(w, "Service shutting down", http.StatusServiceUnavailable) return default: } // Simulate database query ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() var count int err := app.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM items").Scan(&count) if err != nil { http.Error(w, "Database error", http.StatusInternalServerError) return } fmt.Fprintf(w, "Count: %d", count) } func (app *Application) Run() error { // Start HTTP server app.wg.Add(1) go func() { defer app.wg.Done() app.logger.Printf("Starting server on %s", app.server.Addr) if err := app.server.ListenAndServe(); err != http.ErrServerClosed { app.logger.Printf("Server error: %v", err) } }() // Start background worker app.wg.Add(1) go func() { defer app.wg.Done() app.backgroundWorker() }() return nil } func (app *Application) backgroundWorker() { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: app.logger.Println("Background job executing...") // Do work case <-app.shutdown: app.logger.Println("Background worker shutting down...") return } 
} } func (app *Application) Shutdown(ctx context.Context) error { app.logger.Println("Starting graceful shutdown...") // Signal all components to stop close(app.shutdown) // Shutdown HTTP server app.logger.Println("Shutting down HTTP server...") if err := app.server.Shutdown(ctx); err != nil { return fmt.Errorf("HTTP server shutdown failed: %w", err) } // Wait for background workers to finish app.logger.Println("Waiting for background workers...") done := make(chan struct{}) go func() { app.wg.Wait() close(done) }() select { case <-done: app.logger.Println("All workers stopped") case <-ctx.Done(): return fmt.Errorf("shutdown timeout exceeded") } // Close database connections app.logger.Println("Closing database connections...") if err := app.db.Close(); err != nil { return fmt.Errorf("database close failed: %w", err) } app.logger.Println("Graceful shutdown complete") return nil } func main() { app, err := NewApplication() if err != nil { log.Fatalf("Application initialization failed: %v", err) } if err := app.Run(); err != nil { log.Fatalf("Application run failed: %v", err) } // Wait for shutdown signal sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) sig := <-sigChan log.Printf("Received signal: %v", sig) // Graceful shutdown with 30 second timeout ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := app.Shutdown(ctx); err != nil { log.Fatalf("Shutdown failed: %v", err) } log.Println("Application stopped") } Worker Pool Graceful Shutdown package main import ( "context" "fmt" "log" "sync" "time" ) type Job struct { ID int Data string } type WorkerPool struct { jobs chan Job results chan int numWorkers int wg sync.WaitGroup shutdown chan struct{} } func NewWorkerPool(numWorkers, queueSize int) *WorkerPool { return &WorkerPool{ jobs: make(chan Job, queueSize), results: make(chan int, queueSize), numWorkers: numWorkers, shutdown: make(chan struct{}), } } func (wp *WorkerPool) 
Start() { for i := 0; i < wp.numWorkers; i++ { wp.wg.Add(1) go wp.worker(i) } log.Printf("Started %d workers", wp.numWorkers) } func (wp *WorkerPool) worker(id int) { defer wp.wg.Done() log.Printf("Worker %d started", id) for { select { case job, ok := <-wp.jobs: if !ok { log.Printf("Worker %d: job channel closed, exiting", id) return } // Process job log.Printf("Worker %d processing job %d", id, job.ID) result := wp.processJob(job) wp.results <- result case <-wp.shutdown: // Drain remaining jobs before shutdown log.Printf("Worker %d: shutdown signal received, draining jobs...", id) for job := range wp.jobs { log.Printf("Worker %d processing remaining job %d", id, job.ID) result := wp.processJob(job) wp.results <- result } log.Printf("Worker %d: shutdown complete", id) return } } } func (wp *WorkerPool) processJob(job Job) int { // Simulate work time.Sleep(1 * time.Second) return job.ID * 2 } func (wp *WorkerPool) Submit(job Job) bool { select { case <-wp.shutdown: return false // Pool is shutting down case wp.jobs <- job: return true } } func (wp *WorkerPool) Shutdown(ctx context.Context) error { log.Println("WorkerPool: initiating shutdown...") // Signal workers to start draining close(wp.shutdown) // Close job channel to signal no more jobs close(wp.jobs) // Wait for workers with timeout done := make(chan struct{}) go func() { wp.wg.Wait() close(done) close(wp.results) }() select { case <-done: log.Println("WorkerPool: all workers completed") return nil case <-ctx.Done(): return fmt.Errorf("shutdown timeout: %w", ctx.Err()) } } func main() { pool := NewWorkerPool(3, 10) pool.Start() // Submit jobs go func() { for i := 1; i <= 20; i++ { job := Job{ID: i, Data: fmt.Sprintf("Job %d", i)} if !pool.Submit(job) { log.Printf("Failed to submit job %d (pool shutting down)", i) return } time.Sleep(200 * time.Millisecond) } }() // Collect results go func() { for result := range pool.results { log.Printf("Result: %d", result) } log.Println("All results collected") }() // 
Wait a bit then shutdown time.Sleep(5 * time.Second) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := pool.Shutdown(ctx); err != nil { log.Fatalf("Shutdown failed: %v", err) } log.Println("Main: shutdown complete") } Kubernetes-Aware Graceful Shutdown package main import ( "context" "fmt" "log" "net/http" "os" "os/signal" "syscall" "time" ) type KubernetesServer struct { server *http.Server shutdownDelay time.Duration terminationPeriod time.Duration } func NewKubernetesServer() *KubernetesServer { ks := &KubernetesServer{ // Delay before starting shutdown to allow load balancer de-registration shutdownDelay: 5 * time.Second, // Total Kubernetes termination grace period terminationPeriod: 30 * time.Second, } mux := http.NewServeMux() mux.HandleFunc("/health", ks.healthHandler) mux.HandleFunc("/readiness", ks.readinessHandler) mux.HandleFunc("/", ks.requestHandler) ks.server = &http.Server{ Addr: ":8080", Handler: mux, } return ks } var ( isHealthy = true isReady = true ) func (ks *KubernetesServer) healthHandler(w http.ResponseWriter, r *http.Request) { if isHealthy { w.WriteHeader(http.StatusOK) fmt.Fprintf(w, "healthy") } else { w.WriteHeader(http.StatusServiceUnavailable) fmt.Fprintf(w, "unhealthy") } } func (ks *KubernetesServer) readinessHandler(w http.ResponseWriter, r *http.Request) { if isReady { w.WriteHeader(http.StatusOK) fmt.Fprintf(w, "ready") } else { w.WriteHeader(http.StatusServiceUnavailable) fmt.Fprintf(w, "not ready") } } func (ks *KubernetesServer) requestHandler(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "Request processed at %s", time.Now().Format(time.RFC3339)) } func (ks *KubernetesServer) Run() error { go func() { log.Printf("Server starting on %s", ks.server.Addr) if err := ks.server.ListenAndServe(); err != http.ErrServerClosed { log.Fatalf("Server error: %v", err) } }() return nil } func (ks *KubernetesServer) GracefulShutdown() error { // Step 1: Mark as not ready (stop 
receiving new traffic from load balancer) log.Println("Step 1: Marking pod as not ready...") // NOTE: isReady is also read by handler goroutines; production code should guard it with sync/atomic or a mutex isReady = false // Step 2: Wait for load balancer to de-register log.Printf("Step 2: Waiting %v for load balancer de-registration...", ks.shutdownDelay) time.Sleep(ks.shutdownDelay) // Step 3: Stop accepting new connections and drain existing ones log.Println("Step 3: Shutting down HTTP server...") // Calculate remaining time for shutdown shutdownTimeout := ks.terminationPeriod - ks.shutdownDelay - (2 * time.Second) ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout) defer cancel() if err := ks.server.Shutdown(ctx); err != nil { log.Printf("Server shutdown error: %v", err) return err } log.Println("Step 4: Graceful shutdown complete") return nil } func main() { server := NewKubernetesServer() if err := server.Run(); err != nil { log.Fatalf("Server run failed: %v", err) } // Wait for termination signal sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) sig := <-sigChan log.Printf("Received signal: %v", sig) if err := server.GracefulShutdown(); err != nil { // log.Fatalf logs and then exits with status 1 log.Fatalf("Graceful shutdown failed: %v", err) } log.Println("Server stopped successfully") } Database Connection Cleanup package main import ( "context" "database/sql" "log" "time" _ "github.com/lib/pq" ) type DatabaseManager struct { db *sql.DB } func NewDatabaseManager(connString string) (*DatabaseManager, error) { db, err := sql.Open("postgres", connString) if err != nil { return nil, err } // Configure connection pool db.SetMaxOpenConns(25) db.SetMaxIdleConns(5) db.SetConnMaxLifetime(5 * time.Minute) db.SetConnMaxIdleTime(10 * time.Minute) return &DatabaseManager{db: db}, nil } func (dm *DatabaseManager) Shutdown(ctx context.Context) error { log.Println("Closing database connections...") // Wait for active queries to complete or timeout done := make(chan error, 1) go func() { // Close prevents new queries from starting and waits for queries already running on the server to finish done <- 
dm.db.Close() }() select { case err := <-done: if err != nil { return err } log.Println("Database connections closed successfully") return nil case <-ctx.Done(): log.Println("Database close timeout exceeded") return ctx.Err() } } // QueryWithShutdown performs query with shutdown awareness func (dm *DatabaseManager) QueryWithShutdown(ctx context.Context, query string) error { // Check if context is already cancelled (shutdown initiated) if ctx.Err() != nil { return ctx.Err() } rows, err := dm.db.QueryContext(ctx, query) if err != nil { return err } defer rows.Close() // Process rows for rows.Next() { // Check for shutdown during processing if ctx.Err() != nil { return ctx.Err() } // Process row... } return rows.Err() } Message Queue Consumer Shutdown package main import ( "context" "fmt" "log" "sync" "time" ) type Message struct { ID string Payload string } type Consumer struct { messages chan Message shutdown chan struct{} wg sync.WaitGroup } func NewConsumer() *Consumer { return &Consumer{ messages: make(chan Message, 100), shutdown: make(chan struct{}), } } func (c *Consumer) Start() { c.wg.Add(1) go c.consume() log.Println("Consumer started") } func (c *Consumer) consume() { defer c.wg.Done() for { select { case msg := <-c.messages: // Process message if err := c.processMessage(msg); err != nil { log.Printf("Error processing message %s: %v", msg.ID, err) // In production: send to dead letter queue } case <-c.shutdown: log.Println("Consumer: shutdown initiated, processing remaining messages...") // Drain remaining messages for msg := range c.messages { log.Printf("Consumer: processing remaining message %s", msg.ID) if err := c.processMessage(msg); err != nil { log.Printf("Error processing remaining message %s: %v", msg.ID, err) } } log.Println("Consumer: all messages processed") return } } } func (c *Consumer) processMessage(msg Message) error { log.Printf("Processing message: %s", msg.ID) // Simulate processing time.Sleep(500 * time.Millisecond) // Acknowledge 
message log.Printf("Message %s processed successfully", msg.ID) return nil } func (c *Consumer) Shutdown(ctx context.Context) error { log.Println("Consumer: initiating shutdown...") // Stop accepting new messages close(c.shutdown) close(c.messages) // Wait for processing to complete done := make(chan struct{}) go func() { c.wg.Wait() close(done) }() select { case <-done: log.Println("Consumer: shutdown complete") return nil case <-ctx.Done(): return fmt.Errorf("consumer shutdown timeout: %w", ctx.Err()) } } func main() { consumer := NewConsumer() consumer.Start() // Simulate receiving messages go func() { for i := 1; i <= 10; i++ { msg := Message{ ID: fmt.Sprintf("msg-%d", i), Payload: fmt.Sprintf("Payload %d", i), } consumer.messages <- msg time.Sleep(300 * time.Millisecond) } }() // Wait then shutdown time.Sleep(3 * time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if err := consumer.Shutdown(ctx); err != nil { log.Fatalf("Shutdown failed: %v", err) } log.Println("Main: shutdown complete") } Best Practices 1. Use Context for Timeouts // Set realistic shutdown timeout ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := server.Shutdown(ctx); err != nil { // Handle timeout } 2. Implement Shutdown Sequence func (app *Application) Shutdown(ctx context.Context) error { // 1. Stop health checks (remove from load balancer) app.setUnhealthy() // 2. Wait for de-registration time.Sleep(5 * time.Second) // 3. Stop accepting new requests app.server.Shutdown(ctx) // 4. Finish in-flight requests app.wg.Wait() // 5. Close resources app.db.Close() return nil } 3. 
Test Graceful Shutdown func TestGracefulShutdown(t *testing.T) { app, err := NewApplication() if err != nil { t.Fatalf("Init failed: %v", err) } if err := app.Run(); err != nil { t.Fatalf("Run failed: %v", err) } // Start long-running request; reqDone lets the test wait for it reqDone := make(chan struct{}) go func() { defer close(reqDone) resp, err := http.Get("http://localhost:8080/slow") if err != nil { t.Errorf("Request failed: %v", err) return } defer resp.Body.Close() }() // Wait for request to start time.Sleep(100 * time.Millisecond) // Initiate shutdown ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := app.Shutdown(ctx); err != nil { t.Errorf("Shutdown failed: %v", err) } // The in-flight request must have finished before the test returns <-reqDone } Kubernetes Configuration Pod Termination Lifecycle apiVersion: apps/v1 kind: Deployment metadata: name: myapp spec: template: spec: containers: - name: myapp image: myapp:latest ports: - containerPort: 8080 # Liveness probe (restart if unhealthy) livenessProbe: httpGet: path: /health port: 8080 initialDelaySeconds: 30 periodSeconds: 10 # Readiness probe (remove from service if not ready) readinessProbe: httpGet: path: /readiness port: 8080 initialDelaySeconds: 5 periodSeconds: 5 # Graceful shutdown configuration lifecycle: preStop: exec: command: ["/bin/sh", "-c", "sleep 5"] # Termination grace period (must be > shutdown delay + max request time) terminationGracePeriodSeconds: 30 Common Pitfalls Not Handling SIGTERM: Container orchestrators send SIGTERM for graceful shutdown Insufficient Timeout: Set timeout longer than longest request duration Ignoring In-Flight Requests: Always wait for active requests to complete Not Closing Resources: Explicitly close databases, files, connections Immediate Exit: Don’t call os.Exit(0) without cleanup Performance Considerations Shutdown Delay: Balance between deployment speed and zero downtime Timeout Values: Consider 95th percentile request duration Resource Cleanup: Close in order of dependency Logging: Flush logs before exit Metrics: Report shutdown metrics for monitoring Conclusion Graceful shutdown is essential for production Go services to ensure zero-downtime deployments and data integrity. ...

    June 27, 2024 · 13 min · Rafiul Alam

    Fan-Out/Fan-In Pattern in Go

    Go Concurrency Patterns Series: ← Pipeline Pattern | Series Overview | Pub/Sub Pattern → What is the Fan-Out/Fan-In Pattern? The Fan-Out/Fan-In pattern is a powerful concurrency pattern that distributes work across multiple goroutines (fan-out) and then collects the results back into a single channel (fan-in). This pattern is perfect for parallelizing CPU-intensive tasks or I/O operations that can be processed independently. Fan-Out: Distribute work from one source to multiple workers Fan-In: Collect results from multiple workers into a single destination ...
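The fan-out and fan-in halves described above can be sketched in a few functions. This is a minimal illustration, not the post's implementation; `generate`, `square`, `merge`, and `sumOfSquares` are names chosen for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// generate emits the given numbers on a channel and then closes it.
func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// square is one worker: it reads from the shared input until it is closed.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// merge is the fan-in step: it copies every worker channel into a single
// output channel and closes that output once all workers have finished.
func merge(channels ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(channels))
	for _, c := range channels {
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// sumOfSquares fans the input out to `workers` goroutines that all read
// from the same channel, then fans the results back in and adds them up.
func sumOfSquares(workers int, nums ...int) int {
	in := generate(nums...)
	outs := make([]<-chan int, workers)
	for i := range outs {
		outs[i] = square(in)
	}
	total := 0
	for v := range merge(outs...) {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sumOfSquares(3, 1, 2, 3, 4, 5)) // 55
}
```

The order in which results arrive at the merged channel is not deterministic, which is why the example reduces them with a sum instead of printing each value.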

    June 26, 2024 · 8 min · Rafiul Alam

    Go Concurrency Pattern: Select Statement

    Go Concurrency Patterns Series: ← Channel Fundamentals | Series Overview | Pipeline Pattern → What is the Select Statement? The select statement is Go’s powerful tool for handling multiple channel operations simultaneously. It’s like a switch statement, but for channels - it allows a goroutine to wait on multiple communication operations and proceed with whichever one becomes ready first. Think of select as a traffic controller at an intersection, managing multiple lanes of traffic (channels) and allowing the first available lane to proceed. This enables non-blocking communication, timeouts, and elegant multiplexing patterns that are essential for robust concurrent programs. ...

    June 26, 2024 · 12 min · Rafiul Alam

    Go Memory Model Explained

    Go Concurrency Patterns Series: ← Context Propagation | Series Overview | Graceful Shutdown → What is the Go Memory Model? The Go Memory Model specifies the conditions under which reads of a variable in one goroutine can be guaranteed to observe values produced by writes to the same variable in a different goroutine. Understanding this model is crucial for writing correct concurrent code without data races. Core Concepts: Happens-Before: Ordering guarantees between memory operations Memory Visibility: When writes in one goroutine are visible to reads in another Synchronization: Mechanisms that establish happens-before relationships Data Races: Concurrent memory accesses without proper synchronization Real-World Impact Correctness: Prevent subtle bugs in concurrent code Performance: Understand when synchronization is necessary Debugging: Diagnose race conditions and memory visibility issues Optimization: Make informed decisions about lock-free algorithms Code Review: Identify potential concurrency bugs The Happens-Before Relationship Definition A happens-before relationship guarantees that one event occurs before another in program order, and that effects of the first event are visible to the second. ...
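Two synchronization mechanisms that establish happens-before edges, a channel close and a mutex, are sketched below. The function names are chosen for this example and do not come from the post.

```go
package main

import (
	"fmt"
	"sync"
)

// publishViaChannel writes msg in one goroutine and reads it in another.
// The write happens before close(ready), and the close happens before the
// receive that observes it, so the read is guaranteed to see "hello".
func publishViaChannel() string {
	var msg string
	ready := make(chan struct{})
	go func() {
		msg = "hello"
		close(ready)
	}()
	<-ready
	return msg
}

// countWithMutex increments a shared counter from several goroutines.
// Each Unlock happens before the next Lock that acquires the mutex, so
// every increment sees the result of the previous one.
func countWithMutex(goroutines, perGoroutine int) int {
	var (
		mu    sync.Mutex
		wg    sync.WaitGroup
		total int
	)
	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perGoroutine; j++ {
				mu.Lock()
				total++
				mu.Unlock()
			}
		}()
	}
	// Every Done call happens before Wait returns, so reading total
	// afterwards needs no further locking.
	wg.Wait()
	return total
}

func main() {
	fmt.Println(publishViaChannel())     // hello
	fmt.Println(countWithMutex(8, 1000)) // 8000
}
```

Removing the channel or the mutex turns both functions into data races; running such a variant with `go run -race` is a quick way to see the detector report it.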

    June 25, 2024 · 12 min · Rafiul Alam

    Context Propagation Patterns in Go

    Go Concurrency Patterns Series: ← Circuit Breaker | Series Overview | Memory Model → What is Context Propagation? Context propagation is the practice of threading context.Context through your application to carry cancellation signals, deadlines, and request-scoped values across API boundaries, goroutines, and service boundaries. This is critical for building observable, responsive distributed systems. Key Capabilities: Distributed Tracing: Propagate trace IDs across services Cancellation Cascades: Cancel entire request trees Deadline Enforcement: Ensure requests complete within time budgets Request-Scoped Values: Carry metadata without polluting function signatures Real-World Use Cases Microservices: Trace requests across multiple services API Gateways: Propagate timeouts and user context Database Layers: Cancel queries when requests are abandoned Message Queues: Propagate processing deadlines HTTP Middleware: Extract and inject trace headers gRPC Services: Automatic context propagation Basic Context Propagation Propagating Through Function Calls package main import ( "context" "fmt" "time" ) // ServiceA calls ServiceB which calls ServiceC // Context propagates through all layers func ServiceA(ctx context.Context, userID string) error { // Add request-scoped value ctx = context.WithValue(ctx, "user_id", userID) ctx = context.WithValue(ctx, "request_id", generateRequestID()) fmt.Printf("[ServiceA] Processing request for user: %s\n", userID) // Propagate context to next service return ServiceB(ctx) } func ServiceB(ctx context.Context) error { // Retrieve values from context userID := ctx.Value("user_id").(string) requestID := ctx.Value("request_id").(string) fmt.Printf("[ServiceB] User: %s, Request: %s\n", userID, requestID) // Add timeout for downstream call ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() return ServiceC(ctx) } func ServiceC(ctx context.Context) error { userID := ctx.Value("user_id").(string) requestID := 
ctx.Value("request_id").(string) fmt.Printf("[ServiceC] Processing for User: %s, Request: %s\n", userID, requestID) // Simulate work select { case <-time.After(1 * time.Second): fmt.Println("[ServiceC] Work completed") return nil case <-ctx.Done(): fmt.Printf("[ServiceC] Cancelled: %v\n", ctx.Err()) return ctx.Err() } } func generateRequestID() string { return fmt.Sprintf("req-%d", time.Now().UnixNano()) } func main() { ctx := context.Background() err := ServiceA(ctx, "user-123") if err != nil { fmt.Printf("Error: %v\n", err) } } Output: ...

    June 24, 2024 · 11 min · Rafiul Alam

    Context Pattern in Go

    Go Concurrency Patterns Series: ← Once Pattern | Series Overview | Circuit Breaker → What is the Context Pattern? The Context pattern uses Go’s context package to carry cancellation signals, deadlines, timeouts, and request-scoped values across API boundaries and between goroutines. It’s essential for building responsive, cancellable operations and managing request lifecycles. Key Features: Cancellation: Signal when operations should stop Timeouts: Automatically cancel after a duration Deadlines: Cancel at a specific time Values: Carry request-scoped data Real-World Use Cases HTTP Servers: Request cancellation and timeouts Database Operations: Query timeouts and cancellation API Calls: External service timeouts Background Jobs: Graceful shutdown Microservices: Request tracing and correlation IDs File Operations: Long-running I/O with cancellation Basic Context Usage package main import ( "context" "fmt" "time" ) // simulateWork simulates a long-running operation func simulateWork(ctx context.Context, name string, duration time.Duration) error { fmt.Printf("%s: Starting work (expected duration: %v)\n", name, duration) select { case <-time.After(duration): fmt.Printf("%s: Work completed successfully\n", name) return nil case <-ctx.Done(): fmt.Printf("%s: Work cancelled: %v\n", name, ctx.Err()) return ctx.Err() } } func main() { // Example 1: Context with timeout fmt.Println("=== Context with Timeout ===") ctx1, cancel1 := context.WithTimeout(context.Background(), 2*time.Second) defer cancel1() err := simulateWork(ctx1, "Task1", 1*time.Second) // Should complete if err != nil { fmt.Printf("Task1 error: %v\n", err) } err = simulateWork(ctx1, "Task2", 3*time.Second) // Should time out: only about 1s of ctx1's 2s budget remains if err != nil { fmt.Printf("Task2 error: %v\n", err) } // Example 2: Manual cancellation fmt.Println("\n=== Manual Cancellation ===") ctx2, cancel2 := context.WithCancel(context.Background()) go func() { time.Sleep(1 * time.Second) fmt.Println("Cancelling context...") 
cancel2() }() err = simulateWork(ctx2, "Task3", 3*time.Second) // Should be cancelled if err != nil { fmt.Printf("Task3 error: %v\n", err) } // Example 3: Context with deadline fmt.Println("\n=== Context with Deadline ===") deadline := time.Now().Add(1500 * time.Millisecond) ctx3, cancel3 := context.WithDeadline(context.Background(), deadline) defer cancel3() err = simulateWork(ctx3, "Task4", 2*time.Second) // Should hit deadline if err != nil { fmt.Printf("Task4 error: %v\n", err) } } Context with Values package main import ( "context" "fmt" "time" ) // Key types for context values type contextKey string const ( RequestIDKey contextKey = "requestID" UserIDKey contextKey = "userID" TraceIDKey contextKey = "traceID" ) // RequestInfo holds request-scoped information type RequestInfo struct { RequestID string UserID string TraceID string StartTime time.Time } // withRequestInfo adds request information to context func withRequestInfo(ctx context.Context, info RequestInfo) context.Context { ctx = context.WithValue(ctx, RequestIDKey, info.RequestID) ctx = context.WithValue(ctx, UserIDKey, info.UserID) ctx = context.WithValue(ctx, TraceIDKey, info.TraceID) return ctx } // getRequestID extracts request ID from context func getRequestID(ctx context.Context) string { if id, ok := ctx.Value(RequestIDKey).(string); ok { return id } return "unknown" } // getUserID extracts user ID from context func getUserID(ctx context.Context) string { if id, ok := ctx.Value(UserIDKey).(string); ok { return id } return "anonymous" } // getTraceID extracts trace ID from context func getTraceID(ctx context.Context) string { if id, ok := ctx.Value(TraceIDKey).(string); ok { return id } return "no-trace" } // logWithContext logs with context information func logWithContext(ctx context.Context, message string) { requestID := getRequestID(ctx) userID := getUserID(ctx) traceID := getTraceID(ctx) fmt.Printf("[%s][%s][%s] %s\n", requestID, userID, traceID, message) } // businessLogic 
simulates business logic that uses context func businessLogic(ctx context.Context) error { logWithContext(ctx, "Starting business logic") // Simulate some work select { case <-time.After(500 * time.Millisecond): logWithContext(ctx, "Business logic completed") return nil case <-ctx.Done(): logWithContext(ctx, "Business logic cancelled") return ctx.Err() } } // databaseOperation simulates a database operation func databaseOperation(ctx context.Context, query string) error { logWithContext(ctx, fmt.Sprintf("Executing query: %s", query)) select { case <-time.After(200 * time.Millisecond): logWithContext(ctx, "Database operation completed") return nil case <-ctx.Done(): logWithContext(ctx, "Database operation cancelled") return ctx.Err() } } // externalAPICall simulates calling an external API func externalAPICall(ctx context.Context, endpoint string) error { logWithContext(ctx, fmt.Sprintf("Calling external API: %s", endpoint)) select { case <-time.After(300 * time.Millisecond): logWithContext(ctx, "External API call completed") return nil case <-ctx.Done(): logWithContext(ctx, "External API call cancelled") return ctx.Err() } } // handleRequest simulates handling an HTTP request func handleRequest(ctx context.Context) error { logWithContext(ctx, "Handling request") // Perform multiple operations if err := databaseOperation(ctx, "SELECT * FROM users"); err != nil { return err } if err := externalAPICall(ctx, "/api/v1/data"); err != nil { return err } if err := businessLogic(ctx); err != nil { return err } logWithContext(ctx, "Request handled successfully") return nil } func main() { // Simulate incoming request requestInfo := RequestInfo{ RequestID: "req-12345", UserID: "user-67890", TraceID: "trace-abcdef", StartTime: time.Now(), } // Create context with timeout and request info ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() ctx = withRequestInfo(ctx, requestInfo) // Handle the request if err := handleRequest(ctx); err != nil { 
            logWithContext(ctx, fmt.Sprintf("Request failed: %v", err))
        }

        // Example with early cancellation
        fmt.Println("\n=== Early Cancellation Example ===")
        ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second)
        requestInfo2 := RequestInfo{
            RequestID: "req-54321",
            UserID:    "user-09876",
            TraceID:   "trace-fedcba",
            StartTime: time.Now(),
        }
        ctx2 = withRequestInfo(ctx2, requestInfo2)

        // Cancel after 800ms
        go func() {
            time.Sleep(800 * time.Millisecond)
            logWithContext(ctx2, "Cancelling request early")
            cancel2()
        }()

        if err := handleRequest(ctx2); err != nil {
            logWithContext(ctx2, fmt.Sprintf("Request failed: %v", err))
        }
    }

HTTP Server with Context

    package main

    import (
        "context"
        "encoding/json"
        "fmt"
        "log"
        "math/rand"
        "net/http"
        "strconv"
        "time"
    )

    // Response represents an API response
    type Response struct {
        Message   string        `json:"message"`
        RequestID string        `json:"request_id"`
        Duration  time.Duration `json:"duration"`
        Data      interface{}   `json:"data,omitempty"`
    }

    // middleware adds request ID and timeout to context
    func middleware(next http.HandlerFunc) http.HandlerFunc {
        return func(w http.ResponseWriter, r *http.Request) {
            // Generate request ID
            requestID := fmt.Sprintf("req-%d", time.Now().UnixNano())

            // Get timeout from query parameter (default 5 seconds)
            timeoutStr := r.URL.Query().Get("timeout")
            timeout := 5 * time.Second
            if timeoutStr != "" {
                if t, err := time.ParseDuration(timeoutStr); err == nil {
                    timeout = t
                }
            }

            // Create context with timeout
            ctx, cancel := context.WithTimeout(r.Context(), timeout)
            defer cancel()

            // Add request ID to context
            ctx = context.WithValue(ctx, RequestIDKey, requestID)

            // Create new request with updated context
            r = r.WithContext(ctx)

            // Add request ID to response headers
            w.Header().Set("X-Request-ID", requestID)

            next(w, r)
        }
    }

    // simulateSlowOperation simulates a slow operation that respects context
    func simulateSlowOperation(ctx context.Context, duration time.Duration) (string, error) {
        select {
        case <-time.After(duration):
            return fmt.Sprintf("Operation completed after %v", duration), nil
        case <-ctx.Done():
            return "", ctx.Err()
        }
    }

    // fastHandler handles requests quickly
    func fastHandler(w http.ResponseWriter, r *http.Request) {
        start := time.Now()
        ctx := r.Context()
        requestID := getRequestID(ctx)

        result, err := simulateSlowOperation(ctx, 100*time.Millisecond)
        duration := time.Since(start)

        response := Response{
            RequestID: requestID,
            Duration:  duration,
        }

        if err != nil {
            response.Message = "Request failed"
            w.WriteHeader(http.StatusRequestTimeout)
        } else {
            response.Message = "Success"
            response.Data = result
        }

        json.NewEncoder(w).Encode(response)
    }

    // slowHandler handles requests that might timeout
    func slowHandler(w http.ResponseWriter, r *http.Request) {
        start := time.Now()
        ctx := r.Context()
        requestID := getRequestID(ctx)

        // Random duration between 1-10 seconds
        duration := time.Duration(1+rand.Intn(10)) * time.Second
        result, err := simulateSlowOperation(ctx, duration)
        elapsed := time.Since(start)

        response := Response{
            RequestID: requestID,
            Duration:  elapsed,
        }

        if err != nil {
            response.Message = "Request timed out or cancelled"
            w.WriteHeader(http.StatusRequestTimeout)
        } else {
            response.Message = "Success"
            response.Data = result
        }

        json.NewEncoder(w).Encode(response)
    }

    // batchHandler processes multiple operations
    func batchHandler(w http.ResponseWriter, r *http.Request) {
        start := time.Now()
        ctx := r.Context()
        requestID := getRequestID(ctx)

        // Get batch size from query parameter
        batchSizeStr := r.URL.Query().Get("size")
        batchSize := 3
        if batchSizeStr != "" {
            if size, err := strconv.Atoi(batchSizeStr); err == nil && size > 0 {
                batchSize = size
            }
        }

        results := make([]string, 0, batchSize)

        // Process operations sequentially, checking context each time
        for i := 0; i < batchSize; i++ {
            select {
            case <-ctx.Done():
                // Context cancelled, return partial results
                response := Response{
                    RequestID: requestID,
                    Duration:  time.Since(start),
                    Message:   fmt.Sprintf("Batch cancelled after %d/%d operations", i,
                        batchSize),
                    Data:      results,
                }
                w.WriteHeader(http.StatusRequestTimeout)
                json.NewEncoder(w).Encode(response)
                return
            default:
            }

            result, err := simulateSlowOperation(ctx, 200*time.Millisecond)
            if err != nil {
                response := Response{
                    RequestID: requestID,
                    Duration:  time.Since(start),
                    Message:   fmt.Sprintf("Batch failed at operation %d: %v", i+1, err),
                    Data:      results,
                }
                w.WriteHeader(http.StatusRequestTimeout)
                json.NewEncoder(w).Encode(response)
                return
            }

            results = append(results, fmt.Sprintf("Op%d: %s", i+1, result))
        }

        response := Response{
            RequestID: requestID,
            Duration:  time.Since(start),
            Message:   "Batch completed successfully",
            Data:      results,
        }

        json.NewEncoder(w).Encode(response)
    }

    func main() {
        http.HandleFunc("/fast", middleware(fastHandler))
        http.HandleFunc("/slow", middleware(slowHandler))
        http.HandleFunc("/batch", middleware(batchHandler))

        fmt.Println("Server starting on :8080")
        fmt.Println("Endpoints:")
        fmt.Println(" GET /fast - Fast operation (100ms)")
        fmt.Println(" GET /slow - Slow operation (1-10s random)")
        fmt.Println(" GET /batch?size=N - Batch operations")
        fmt.Println(" Add ?timeout=5s to set custom timeout")

        log.Fatal(http.ListenAndServe(":8080", nil))
    }

Context Propagation in Goroutines

    package main

    import (
        "context"
        "fmt"
        "sync"
        "time"
    )

    // Worker represents a worker that processes tasks
    type Worker struct {
        ID   int
        Name string
    }

    // ProcessTask processes a task with context
    func (w *Worker) ProcessTask(ctx context.Context, taskID int) error {
        requestID := getRequestID(ctx)
        fmt.Printf("Worker %d (%s) [%s]: Starting task %d\n", w.ID, w.Name, requestID, taskID)

        // Simulate work with multiple steps
        for step := 1; step <= 3; step++ {
            select {
            case <-time.After(200 * time.Millisecond):
                fmt.Printf("Worker %d (%s) [%s]: Task %d step %d completed\n",
                    w.ID, w.Name, requestID, taskID, step)
            case <-ctx.Done():
                fmt.Printf("Worker %d (%s) [%s]: Task %d cancelled at step %d: %v\n",
                    w.ID, w.Name, requestID, taskID, step, ctx.Err())
                return ctx.Err()
            }
        }

        fmt.Printf("Worker %d (%s) [%s]: Task %d completed successfully\n",
            w.ID, w.Name, requestID, taskID)
        return nil
    }

    // TaskManager manages task distribution
    type TaskManager struct {
        workers []Worker
    }

    // NewTaskManager creates a new task manager
    func NewTaskManager() *TaskManager {
        return &TaskManager{
            workers: []Worker{
                {ID: 1, Name: "Alice"},
                {ID: 2, Name: "Bob"},
                {ID: 3, Name: "Charlie"},
            },
        }
    }

    // ProcessTasksConcurrently processes tasks using multiple workers
    func (tm *TaskManager) ProcessTasksConcurrently(ctx context.Context, taskCount int) error {
        var wg sync.WaitGroup
        taskChan := make(chan int, taskCount)
        errorChan := make(chan error, len(tm.workers))

        // Send tasks to channel
        go func() {
            defer close(taskChan)
            for i := 1; i <= taskCount; i++ {
                select {
                case taskChan <- i:
                case <-ctx.Done():
                    return
                }
            }
        }()

        // Start workers
        for _, worker := range tm.workers {
            wg.Add(1)
            go func(w Worker) {
                defer wg.Done()
                for {
                    select {
                    case taskID, ok := <-taskChan:
                        if !ok {
                            return // No more tasks
                        }
                        if err := w.ProcessTask(ctx, taskID); err != nil {
                            select {
                            case errorChan <- err:
                            case <-ctx.Done():
                            }
                            return
                        }
                    case <-ctx.Done():
                        return
                    }
                }
            }(worker)
        }

        // Wait for completion or cancellation
        done := make(chan struct{})
        go func() {
            wg.Wait()
            close(done)
        }()

        select {
        case <-done:
            close(errorChan)
            // Check for errors
            for err := range errorChan {
                if err != nil {
                    return err
                }
            }
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    func main() {
        manager := NewTaskManager()

        // Example 1: Normal completion
        fmt.Println("=== Normal Completion ===")
        ctx1, cancel1 := context.WithTimeout(context.Background(), 5*time.Second)
        ctx1 = context.WithValue(ctx1, RequestIDKey, "batch-001")
        defer cancel1()

        err := manager.ProcessTasksConcurrently(ctx1, 6)
        if err != nil {
            fmt.Printf("Batch processing failed: %v\n", err)
        } else {
            fmt.Println("Batch processing completed successfully")
        }

        time.Sleep(1 * time.Second)

        // Example 2: Timeout scenario
        fmt.Println("\n=== Timeout Scenario ===")
        ctx2, cancel2 :=
            context.WithTimeout(context.Background(), 1*time.Second)
        ctx2 = context.WithValue(ctx2, RequestIDKey, "batch-002")
        defer cancel2()

        err = manager.ProcessTasksConcurrently(ctx2, 10)
        if err != nil {
            fmt.Printf("Batch processing failed: %v\n", err)
        } else {
            fmt.Println("Batch processing completed successfully")
        }

        time.Sleep(1 * time.Second)

        // Example 3: Manual cancellation
        fmt.Println("\n=== Manual Cancellation ===")
        ctx3, cancel3 := context.WithCancel(context.Background())
        ctx3 = context.WithValue(ctx3, RequestIDKey, "batch-003")

        // Cancel after 800ms
        go func() {
            time.Sleep(800 * time.Millisecond)
            fmt.Println("Manually cancelling batch...")
            cancel3()
        }()

        err = manager.ProcessTasksConcurrently(ctx3, 8)
        if err != nil {
            fmt.Printf("Batch processing failed: %v\n", err)
        } else {
            fmt.Println("Batch processing completed successfully")
        }
    }

Best Practices

- Always Accept Context: Functions that might block should accept context as first parameter
- Don’t Store Context: Pass context as parameter, don’t store in structs
- Use context.TODO(): When you don’t have context but need one
- Derive Contexts: Create child contexts from parent contexts
- Handle Cancellation: Always check ctx.Done() in long-running operations
- Limit Context Values: Use sparingly and for request-scoped data only
- Use Typed Keys: Define custom types for context keys to avoid collisions

Common Pitfalls

1. Ignoring Context Cancellation

    // Bad: Ignoring context cancellation
    func badOperation(ctx context.Context) error {
        for i := 0; i < 1000; i++ {
            // Long operation without checking context
            time.Sleep(10 * time.Millisecond)
            // Process item i
        }
        return nil
    }

    // Good: Checking context regularly
    func goodOperation(ctx context.Context) error {
        for i := 0; i < 1000; i++ {
            select {
            case <-ctx.Done():
                return ctx.Err()
            default:
            }

            time.Sleep(10 * time.Millisecond)
            // Process item i
        }
        return nil
    }

2. Using Context for Optional Parameters

    // Bad: Using context for optional parameters
    func badFunction(ctx context.Context) {
        if timeout, ok := ctx.Value("timeout").(time.Duration); ok {
            // Use timeout
        }
    }

    // Good: Use function parameters for optional values
    func goodFunction(ctx context.Context, timeout time.Duration) {
        // Use timeout parameter
    }

The Context pattern is fundamental for building robust, cancellable operations in Go. It enables graceful handling of timeouts, cancellations, and request-scoped data, making your applications more responsive and resource-efficient. ...

    June 19, 2024 · 10 min · Rafiul Alam