Mutex Patterns in Go

    Go Concurrency Patterns Series: ← Worker Pool | Series Overview | WaitGroup Pattern → What are Mutex Patterns? Mutex (mutual exclusion) patterns are essential for protecting shared resources in concurrent programs. Go provides sync.Mutex and sync.RWMutex for controlling access to critical sections, ensuring data consistency and preventing race conditions. Key Types: Mutex: Exclusive access (one goroutine at a time) RWMutex: Reader-writer locks (multiple readers OR one writer) Lock-free: Atomic operations without explicit locks Real-World Use Cases Shared Counters: Statistics, metrics, rate limiting Cache Systems: Thread-safe caching with read/write operations Configuration Management: Safe updates to application config Connection Pools: Managing database/HTTP connection pools Resource Allocation: Tracking and managing limited resources State Machines: Protecting state transitions Basic Mutex Usage package main import ( "fmt" "sync" ) // Counter demonstrates basic mutex usage type Counter struct { mu sync.Mutex value int } // Increment safely increments the counter func (c *Counter) Increment() { c.mu.Lock() defer c.mu.Unlock() c.value++ } // Decrement safely decrements the counter func (c *Counter) Decrement() { c.mu.Lock() defer c.mu.Unlock() c.value-- } // Value safely returns the current value func (c *Counter) Value() int { c.mu.Lock() defer c.mu.Unlock() return c.value } // Add safely adds a value to the counter func (c *Counter) Add(delta int) { c.mu.Lock() defer c.mu.Unlock() c.value += delta } func main() { counter := &Counter{} var wg sync.WaitGroup // Start multiple goroutines incrementing the counter for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 100; j++ { counter.Increment() } fmt.Printf("Goroutine %d finished\n", id) }(i) } // Start some goroutines decrementing for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 50; j++ { counter.Decrement() } fmt.Printf("Decrementer %d 
finished\n", id) }(i) } wg.Wait() fmt.Printf("Final counter value: %d\n", counter.Value()) // Expected: (10 * 100) - (3 * 50) = 1000 - 150 = 850 } RWMutex for Read-Heavy Workloads package main import ( "fmt" "math/rand" "sync" "time" ) // Cache demonstrates RWMutex usage for read-heavy scenarios type Cache struct { mu sync.RWMutex data map[string]interface{} } // NewCache creates a new cache func NewCache() *Cache { return &Cache{ data: make(map[string]interface{}), } } // Get retrieves a value (read operation) func (c *Cache) Get(key string) (interface{}, bool) { c.mu.RLock() defer c.mu.RUnlock() value, exists := c.data[key] return value, exists } // Set stores a value (write operation) func (c *Cache) Set(key string, value interface{}) { c.mu.Lock() defer c.mu.Unlock() c.data[key] = value } // Delete removes a value (write operation) func (c *Cache) Delete(key string) { c.mu.Lock() defer c.mu.Unlock() delete(c.data, key) } // Keys returns all keys (read operation) func (c *Cache) Keys() []string { c.mu.RLock() defer c.mu.RUnlock() keys := make([]string, 0, len(c.data)) for key := range c.data { keys = append(keys, key) } return keys } // Size returns the number of items (read operation) func (c *Cache) Size() int { c.mu.RLock() defer c.mu.RUnlock() return len(c.data) } // Clear removes all items (write operation) func (c *Cache) Clear() { c.mu.Lock() defer c.mu.Unlock() c.data = make(map[string]interface{}) } func main() { cache := NewCache() var wg sync.WaitGroup // Writers (fewer, less frequent) for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 50; j++ { key := fmt.Sprintf("key-%d-%d", id, j) cache.Set(key, fmt.Sprintf("value-%d-%d", id, j)) time.Sleep(10 * time.Millisecond) } }(i) } // Readers (many, frequent) for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 100; j++ { // Try to read random keys key := fmt.Sprintf("key-%d-%d", rand.Intn(3), rand.Intn(50)) if value, exists := 
cache.Get(key); exists { fmt.Printf("Reader %d found %s: %v\n", id, key, value) } time.Sleep(5 * time.Millisecond) } }(i) } // Size checker wg.Add(1) go func() { defer wg.Done() for i := 0; i < 20; i++ { size := cache.Size() fmt.Printf("Cache size: %d\n", size) time.Sleep(100 * time.Millisecond) } }() wg.Wait() fmt.Printf("Final cache size: %d\n", cache.Size()) } Advanced Mutex Patterns 1. Condition Variables Pattern package main import ( "fmt" "sync" "time" ) // Buffer demonstrates condition variables (sync.Cond) with a mutex type Buffer struct { mu sync.Mutex notEmpty *sync.Cond notFull *sync.Cond items []interface{} capacity int } // NewBuffer creates a new bounded buffer func NewBuffer(capacity int) *Buffer { b := &Buffer{ items: make([]interface{}, 0, capacity), capacity: capacity, } b.notEmpty = sync.NewCond(&b.mu) b.notFull = sync.NewCond(&b.mu) return b } // Put adds an item to the buffer (blocks if full) func (b *Buffer) Put(item interface{}) { b.mu.Lock() defer b.mu.Unlock() // Wait while buffer is full for len(b.items) == b.capacity { b.notFull.Wait() } b.items = append(b.items, item) fmt.Printf("Put item: %v (buffer size: %d)\n", item, len(b.items)) // Signal that buffer is not empty b.notEmpty.Signal() } // Get removes an item from the buffer (blocks if empty) func (b *Buffer) Get() interface{} { b.mu.Lock() defer b.mu.Unlock() // Wait while buffer is empty for len(b.items) == 0 { b.notEmpty.Wait() } item := b.items[0] b.items = b.items[1:] fmt.Printf("Got item: %v (buffer size: %d)\n", item, len(b.items)) // Signal that buffer is not full b.notFull.Signal() return item } // Size returns current buffer size func (b *Buffer) Size() int { b.mu.Lock() defer b.mu.Unlock() return len(b.items) } func main() { buffer := NewBuffer(3) var wg sync.WaitGroup // Producers for i := 0; i < 2; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 5; j++ { item := fmt.Sprintf("item-%d-%d", id, j) buffer.Put(item) time.Sleep(200 * time.Millisecond) } }(i) } // 
Consumers for i := 0; i < 2; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 5; j++ { item := buffer.Get() fmt.Printf("Consumer %d processed: %v\n", id, item) time.Sleep(300 * time.Millisecond) } }(i) } wg.Wait() } 2. Lock-Free Patterns with Atomic Operations package main import ( "fmt" "sync" "sync/atomic" "time" ) // AtomicCounter demonstrates lock-free counter type AtomicCounter struct { value int64 } // Increment atomically increments the counter func (ac *AtomicCounter) Increment() int64 { return atomic.AddInt64(&ac.value, 1) } // Decrement atomically decrements the counter func (ac *AtomicCounter) Decrement() int64 { return atomic.AddInt64(&ac.value, -1) } // Value atomically reads the current value func (ac *AtomicCounter) Value() int64 { return atomic.LoadInt64(&ac.value) } // CompareAndSwap atomically compares and swaps func (ac *AtomicCounter) CompareAndSwap(old, new int64) bool { return atomic.CompareAndSwapInt64(&ac.value, old, new) } // AtomicFlag demonstrates atomic boolean operations type AtomicFlag struct { flag int32 } // Set atomically sets the flag to true func (af *AtomicFlag) Set() { atomic.StoreInt32(&af.flag, 1) } // Clear atomically sets the flag to false func (af *AtomicFlag) Clear() { atomic.StoreInt32(&af.flag, 0) } // IsSet atomically checks if flag is set func (af *AtomicFlag) IsSet() bool { return atomic.LoadInt32(&af.flag) == 1 } // TestAndSet atomically tests and sets the flag func (af *AtomicFlag) TestAndSet() bool { return atomic.SwapInt32(&af.flag, 1) == 1 } func main() { counter := &AtomicCounter{} flag := &AtomicFlag{} var wg sync.WaitGroup // Test atomic counter fmt.Println("Testing atomic counter...") for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 1000; j++ { counter.Increment() } }(i) } wg.Wait() fmt.Printf("Final counter value: %d\n", counter.Value()) // Test atomic flag fmt.Println("\nTesting atomic flag...") // Multiple goroutines trying to set flag for i := 0; i < 
5; i++ { wg.Add(1) go func(id int) { defer wg.Done() if !flag.TestAndSet() { fmt.Printf("Goroutine %d acquired the flag\n", id) time.Sleep(100 * time.Millisecond) flag.Clear() fmt.Printf("Goroutine %d released the flag\n", id) } else { fmt.Printf("Goroutine %d failed to acquire flag\n", id) } }(i) } wg.Wait() } 3. Resource Pool Pattern package main import ( "errors" "fmt" "sync" "time" ) // Resource represents a limited resource type Resource struct { ID int Data string } // ResourcePool manages a pool of limited resources type ResourcePool struct { mu sync.Mutex resources []*Resource available chan *Resource maxSize int created int } // NewResourcePool creates a new resource pool func NewResourcePool(maxSize int) *ResourcePool { return &ResourcePool{ resources: make([]*Resource, 0, maxSize), available: make(chan *Resource, maxSize), maxSize: maxSize, } } // createResource creates a new resource func (rp *ResourcePool) createResource() *Resource { rp.mu.Lock() defer rp.mu.Unlock() if rp.created >= rp.maxSize { return nil } rp.created++ resource := &Resource{ ID: rp.created, Data: fmt.Sprintf("Resource-%d", rp.created), } fmt.Printf("Created resource %d\n", resource.ID) return resource } // Acquire gets a resource from the pool func (rp *ResourcePool) Acquire() (*Resource, error) { select { case resource := <-rp.available: fmt.Printf("Acquired existing resource %d\n", resource.ID) return resource, nil default: // No available resource, try to create one if resource := rp.createResource(); resource != nil { return resource, nil } // Pool is full, wait for available resource fmt.Println("Pool full, waiting for available resource...") select { case resource := <-rp.available: fmt.Printf("Acquired resource %d after waiting\n", resource.ID) return resource, nil case <-time.After(5 * time.Second): return nil, errors.New("timeout waiting for resource") } } } // Release returns a resource to the pool func (rp *ResourcePool) Release(resource *Resource) { select { case 
rp.available <- resource: fmt.Printf("Released resource %d\n", resource.ID) default: // Channel full, resource will be garbage collected fmt.Printf("Pool full, discarding resource %d\n", resource.ID) } } // Size returns current pool statistics func (rp *ResourcePool) Size() (available, created int) { rp.mu.Lock() defer rp.mu.Unlock() return len(rp.available), rp.created } func main() { pool := NewResourcePool(3) var wg sync.WaitGroup // Multiple goroutines using resources for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() resource, err := pool.Acquire() if err != nil { fmt.Printf("Worker %d failed to acquire resource: %v\n", id, err) return } fmt.Printf("Worker %d using resource %d\n", id, resource.ID) // Simulate work time.Sleep(time.Duration(200+id*100) * time.Millisecond) pool.Release(resource) fmt.Printf("Worker %d finished\n", id) }(i) } wg.Wait() available, created := pool.Size() fmt.Printf("Final pool state - Available: %d, Created: %d\n", available, created) } Best Practices Always Use defer: Ensure locks are released even if a panic occurs Keep Critical Sections Small: Minimize the time spent holding locks Avoid Nested Locks: Prevent deadlocks by not acquiring a second lock while holding another Use RWMutex for Read-Heavy Workloads: Better performance when reads far outnumber writes Consider Lock-Free: Use atomic operations for simple counters and flags Document Lock Order: If multiple locks are needed, establish a clear acquisition order Prefer Channels: Use channels for communication, locks for shared state Common Pitfalls 1. 
Deadlocks // Bad: Potential deadlock with nested locks type BadAccount struct { ID int mu sync.Mutex balance int } func (a *BadAccount) Transfer(to *BadAccount, amount int) { a.mu.Lock() defer a.mu.Unlock() to.mu.Lock() // Potential deadlock if called concurrently defer to.mu.Unlock() a.balance -= amount to.balance += amount } // Good: Ordered locking to prevent deadlock func (a *BadAccount) SafeTransfer(to *BadAccount, amount int) { // Guard against self-transfer, which would lock the same mutex twice if a == to { return } // Always acquire locks in a consistent order (lowest ID first) first, second := a, to if a.ID > to.ID { first, second = to, a } first.mu.Lock() defer first.mu.Unlock() second.mu.Lock() defer second.mu.Unlock() a.balance -= amount to.balance += amount } 2. Race Conditions // Bad: Race condition type BadCounter struct { mu sync.Mutex value int } func (c *BadCounter) IncrementIfEven() { if c.value%2 == 0 { // Race: value is read without the lock and might change between check and increment c.mu.Lock() c.value++ c.mu.Unlock() } } // Good: Atomic check and update func (c *BadCounter) SafeIncrementIfEven() { c.mu.Lock() defer c.mu.Unlock() if c.value%2 == 0 { c.value++ } } Testing Concurrent Code package main import ( "sync" "testing" ) func TestCounter(t *testing.T) { counter := &Counter{} var wg sync.WaitGroup goroutines := 100 increments := 1000 for i := 0; i < goroutines; i++ { wg.Add(1) go func() { defer wg.Done() for j := 0; j < increments; j++ { counter.Increment() } }() } wg.Wait() expected := goroutines * increments if counter.Value() != expected { t.Errorf("Expected %d, got %d", expected, counter.Value()) } } // Run with: go test -race func TestCounterRace(t *testing.T) { counter := &Counter{} var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() for i := 0; i < 1000; i++ { counter.Increment() } }() go func() { defer wg.Done() for i := 0; i < 1000; i++ { _ = counter.Value() } }() wg.Wait() } Mutex patterns are fundamental for building safe concurrent applications in Go. 
Choose the right synchronization primitive based on your access patterns: use sync.Mutex for exclusive access, sync.RWMutex for read-heavy workloads, and atomic operations for simple lock-free scenarios. ...

    July 3, 2024 · 10 min · Rafiul Alam