Go Concurrency Patterns Series: ← Worker Pool | Series Overview | WaitGroup Pattern →


What are Mutex Patterns?

Mutex (mutual exclusion) patterns are essential for protecting shared resources in concurrent programs. Go provides sync.Mutex and sync.RWMutex for controlling access to critical sections, ensuring data consistency and preventing race conditions.

Key Types:

  • Mutex: Exclusive access (one goroutine at a time)
  • RWMutex: Reader-writer locks (multiple readers OR one writer)
  • Lock-free: Atomic operations without explicit locks

Real-World Use Cases

  • Shared Counters: Statistics, metrics, rate limiting
  • Cache Systems: Thread-safe caching with read/write operations
  • Configuration Management: Safe updates to application config
  • Connection Pools: Managing database/HTTP connection pools
  • Resource Allocation: Tracking and managing limited resources
  • State Machines: Protecting state transitions

Basic Mutex Usage

package main

import (
    "fmt"
    "sync"
    "time"
)

// Counter is an integer that can be shared between goroutines; every access
// to value goes through mu. The zero value is a usable counter starting at 0.
type Counter struct {
    mu    sync.Mutex
    value int
}

// Add applies delta (which may be negative) to the counter under the lock.
func (c *Counter) Add(delta int) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value += delta
}

// Increment raises the counter by one.
func (c *Counter) Increment() {
    c.Add(1)
}

// Decrement lowers the counter by one.
func (c *Counter) Decrement() {
    c.Add(-1)
}

// Value reports the current count, read while holding the lock.
func (c *Counter) Value() int {
    c.mu.Lock()
    current := c.value
    c.mu.Unlock()
    return current
}

// main exercises Counter from many goroutines at once: 10 goroutines each
// increment it 100 times while 3 goroutines each decrement it 50 times. Once
// all of them have finished it prints the final value and how long the run
// took. Timing the run also puts the snippet's "time" import to use; Go
// rejects a program that imports a package without referring to it.
func main() {
    counter := &Counter{}
    var wg sync.WaitGroup
    start := time.Now()

    // Start multiple goroutines incrementing the counter
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                counter.Increment()
            }
            fmt.Printf("Goroutine %d finished\n", id)
        }(i)
    }

    // Start some goroutines decrementing
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 50; j++ {
                counter.Decrement()
            }
            fmt.Printf("Decrementer %d finished\n", id)
        }(i)
    }

    wg.Wait()
    fmt.Printf("Final counter value: %d\n", counter.Value())
    // Expected: (10 * 100) - (3 * 50) = 1000 - 150 = 850
    fmt.Printf("Elapsed: %v\n", time.Since(start))
}

RWMutex for Read-Heavy Workloads

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Cache is a string-keyed store guarded by a sync.RWMutex: lookups take the
// shared read lock, mutations take the exclusive write lock.
type Cache struct {
    mu   sync.RWMutex
    data map[string]interface{}
}

// NewCache returns an empty Cache whose backing map is already allocated.
func NewCache() *Cache {
    c := new(Cache)
    c.data = make(map[string]interface{})
    return c
}

// Get looks up key under the read lock. The boolean reports whether the key
// was present.
func (c *Cache) Get(key string) (interface{}, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()

    v, ok := c.data[key]
    return v, ok
}

// Set stores value under key, replacing any previous entry (write lock).
func (c *Cache) Set(key string, value interface{}) {
    c.mu.Lock()
    defer c.mu.Unlock()

    c.data[key] = value
}

// Delete removes key if present; a missing key is a no-op (write lock).
func (c *Cache) Delete(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()

    delete(c.data, key)
}

// Keys returns a snapshot of every key, in map iteration order (read lock).
func (c *Cache) Keys() []string {
    c.mu.RLock()
    defer c.mu.RUnlock()

    // The map cannot change while the read lock is held, so the slice can be
    // sized exactly and filled by index.
    snapshot := make([]string, len(c.data))
    next := 0
    for k := range c.data {
        snapshot[next] = k
        next++
    }
    return snapshot
}

// Size reports how many entries the cache holds (read lock).
func (c *Cache) Size() int {
    c.mu.RLock()
    defer c.mu.RUnlock()

    n := len(c.data)
    return n
}

// Clear drops every entry by swapping in a fresh map (write lock).
func (c *Cache) Clear() {
    c.mu.Lock()
    defer c.mu.Unlock()

    c.data = make(map[string]interface{})
}

// main drives a shared Cache with a read-heavy workload: 3 writers insert 50
// keys each, 10 readers probe 100 random keys each, and one monitor prints
// the cache size 20 times. It waits for all of them before printing the
// final size.
func main() {
    var wg sync.WaitGroup
    store := NewCache()

    // writer inserts its 50 keys, pausing 10ms between writes.
    writer := func(id int) {
        defer wg.Done()
        for n := 0; n < 50; n++ {
            k := fmt.Sprintf("key-%d-%d", id, n)
            store.Set(k, fmt.Sprintf("value-%d-%d", id, n))
            time.Sleep(10 * time.Millisecond)
        }
    }

    // reader probes random keys a writer may or may not have stored yet.
    reader := func(id int) {
        defer wg.Done()
        for n := 0; n < 100; n++ {
            k := fmt.Sprintf("key-%d-%d", rand.Intn(3), rand.Intn(50))
            v, hit := store.Get(k)
            if hit {
                fmt.Printf("Reader %d found %s: %v\n", id, k, v)
            }
            time.Sleep(5 * time.Millisecond)
        }
    }

    // monitor samples the cache size every 100ms.
    monitor := func() {
        defer wg.Done()
        for tick := 0; tick < 20; tick++ {
            fmt.Printf("Cache size: %d\n", store.Size())
            time.Sleep(100 * time.Millisecond)
        }
    }

    // Writers (fewer, less frequent)
    wg.Add(3)
    for id := 0; id < 3; id++ {
        go writer(id)
    }

    // Readers (many, frequent)
    wg.Add(10)
    for id := 0; id < 10; id++ {
        go reader(id)
    }

    // Size checker
    wg.Add(1)
    go monitor()

    wg.Wait()
    fmt.Printf("Final cache size: %d\n", store.Size())
}

Advanced Mutex Patterns

1. Condition Variables Pattern

package main

import (
    "fmt"
    "sync"
    "time"
)

// Buffer demonstrates condition variables with a mutex: it is a bounded FIFO
// queue in which Put blocks while the queue is full and Get blocks while it
// is empty. Both condition variables share mu, which guards items.
type Buffer struct {
    mu       sync.Mutex
    notEmpty *sync.Cond // signalled by Put after an item is added
    notFull  *sync.Cond // signalled by Get after an item is removed
    items    []interface{}
    capacity int
}

// NewBuffer creates a new bounded buffer that holds at most capacity items.
//
// capacity must be positive. A zero capacity would make every Put block
// forever (the buffer would always count as full), so it is rejected with a
// panic, just as make already panics on a negative capacity.
func NewBuffer(capacity int) *Buffer {
    if capacity <= 0 {
        panic("NewBuffer: capacity must be positive")
    }
    b := &Buffer{
        items:    make([]interface{}, 0, capacity),
        capacity: capacity,
    }
    b.notEmpty = sync.NewCond(&b.mu)
    b.notFull = sync.NewCond(&b.mu)
    return b
}

// Put adds an item to the back of the buffer, blocking while it is full.
func (b *Buffer) Put(item interface{}) {
    b.mu.Lock()
    defer b.mu.Unlock()

    // Wait releases mu while sleeping and re-acquires it before returning, so
    // the condition has to be re-checked in a loop after every wake-up.
    for len(b.items) == b.capacity {
        b.notFull.Wait()
    }

    b.items = append(b.items, item)
    fmt.Printf("Put item: %v (buffer size: %d)\n", item, len(b.items))

    // Signal that buffer is not empty
    b.notEmpty.Signal()
}

// Get removes and returns the oldest item, blocking while the buffer is empty.
func (b *Buffer) Get() interface{} {
    b.mu.Lock()
    defer b.mu.Unlock()

    // Wait while buffer is empty
    for len(b.items) == 0 {
        b.notEmpty.Wait()
    }

    item := b.items[0]
    // Clear the vacated slot before reslicing; otherwise the backing array
    // keeps a reference to the dequeued item and it cannot be collected.
    b.items[0] = nil
    b.items = b.items[1:]
    fmt.Printf("Got item: %v (buffer size: %d)\n", item, len(b.items))

    // Signal that buffer is not full
    b.notFull.Signal()

    return item
}

// Size returns the number of items currently queued.
func (b *Buffer) Size() int {
    b.mu.Lock()
    defer b.mu.Unlock()
    return len(b.items)
}

// main runs 2 producers and 2 consumers against a Buffer of capacity 3. Each
// producer puts 5 items and each consumer takes 5, so the counts balance and
// every goroutine terminates.
func main() {
    const (
        workers   = 2
        perWorker = 5
    )

    queue := NewBuffer(3)
    var wg sync.WaitGroup

    // produce puts perWorker items, pausing 200ms after each one.
    produce := func(id int) {
        defer wg.Done()
        for n := 0; n < perWorker; n++ {
            queue.Put(fmt.Sprintf("item-%d-%d", id, n))
            time.Sleep(200 * time.Millisecond)
        }
    }

    // consume takes perWorker items, pausing 300ms after each one.
    consume := func(id int) {
        defer wg.Done()
        for n := 0; n < perWorker; n++ {
            got := queue.Get()
            fmt.Printf("Consumer %d processed: %v\n", id, got)
            time.Sleep(300 * time.Millisecond)
        }
    }

    wg.Add(2 * workers)

    // Producers
    for id := 0; id < workers; id++ {
        go produce(id)
    }

    // Consumers
    for id := 0; id < workers; id++ {
        go consume(id)
    }

    wg.Wait()
}

2. Lock-Free Patterns with Atomic Operations

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// AtomicCounter is a 64-bit counter updated through sync/atomic operations
// rather than a mutex.
type AtomicCounter struct {
    value int64
}

// Value atomically loads the current count.
func (ac *AtomicCounter) Value() int64 {
    current := atomic.LoadInt64(&ac.value)
    return current
}

// Increment atomically adds one and returns the updated count.
func (ac *AtomicCounter) Increment() int64 {
    updated := atomic.AddInt64(&ac.value, 1)
    return updated
}

// Decrement atomically subtracts one and returns the updated count.
func (ac *AtomicCounter) Decrement() int64 {
    updated := atomic.AddInt64(&ac.value, -1)
    return updated
}

// CompareAndSwap replaces the count with new only if it currently equals old,
// and reports whether the swap took place.
func (ac *AtomicCounter) CompareAndSwap(old, new int64) bool {
    swapped := atomic.CompareAndSwapInt64(&ac.value, old, new)
    return swapped
}

// AtomicFlag is a boolean stored in an int32 (1 = set, 0 = clear) so that it
// can be read and written with sync/atomic operations.
type AtomicFlag struct {
    flag int32
}

// IsSet atomically reports whether the flag is currently set.
func (af *AtomicFlag) IsSet() bool {
    state := atomic.LoadInt32(&af.flag)
    return state == 1
}

// Set atomically turns the flag on.
func (af *AtomicFlag) Set() {
    atomic.StoreInt32(&af.flag, 1)
}

// Clear atomically turns the flag off.
func (af *AtomicFlag) Clear() {
    atomic.StoreInt32(&af.flag, 0)
}

// TestAndSet atomically sets the flag and reports whether it was already set
// beforehand; a false result means this caller is the one that set it.
func (af *AtomicFlag) TestAndSet() bool {
    previous := atomic.SwapInt32(&af.flag, 1)
    return previous == 1
}

// main demonstrates both atomic types. First 10 goroutines each increment an
// AtomicCounter 1000 times; then 5 goroutines compete to take an AtomicFlag
// with TestAndSet, hold it for 100ms, and release it.
func main() {
    var (
        wg      sync.WaitGroup
        counter = &AtomicCounter{}
        flag    = &AtomicFlag{}
    )

    // Test atomic counter
    fmt.Println("Testing atomic counter...")
    bump := func(id int) {
        defer wg.Done()
        for n := 0; n < 1000; n++ {
            counter.Increment()
        }
    }
    wg.Add(10)
    for id := 0; id < 10; id++ {
        go bump(id)
    }

    wg.Wait()
    fmt.Printf("Final counter value: %d\n", counter.Value())

    // Test atomic flag
    fmt.Println("\nTesting atomic flag...")

    // Multiple goroutines trying to set flag
    contend := func(id int) {
        defer wg.Done()
        if flag.TestAndSet() {
            // Another goroutine currently holds the flag.
            fmt.Printf("Goroutine %d failed to acquire flag\n", id)
            return
        }
        fmt.Printf("Goroutine %d acquired the flag\n", id)
        time.Sleep(100 * time.Millisecond)
        flag.Clear()
        fmt.Printf("Goroutine %d released the flag\n", id)
    }
    wg.Add(5)
    for id := 0; id < 5; id++ {
        go contend(id)
    }

    wg.Wait()
}

3. Resource Pool Pattern

package main

import (
    "errors"
    "fmt"
    "sync"
    "time"
)

// Resource represents a limited resource
type Resource struct {
    ID   int    // creation sequence number assigned by the pool, starting at 1
    Data string // "Resource-<ID>" label set at creation
}

// ErrAcquireTimeout is returned by Acquire and AcquireTimeout when no
// resource becomes available before the wait expires.
var ErrAcquireTimeout = errors.New("timeout waiting for resource")

// defaultAcquireTimeout is how long Acquire waits once the pool is exhausted.
const defaultAcquireTimeout = 5 * time.Second

// ResourcePool manages a pool of limited resources. Resources are created
// lazily, up to maxSize; released resources wait in the available channel
// until the next Acquire picks them up.
type ResourcePool struct {
    mu        sync.Mutex     // guards created
    resources []*Resource    // allocated by NewResourcePool; not used by the methods below
    available chan *Resource // idle resources ready to be handed out
    maxSize   int            // upper bound on created
    created   int            // number of resources created so far
}

// NewResourcePool creates a new, empty resource pool that will create at most
// maxSize resources.
func NewResourcePool(maxSize int) *ResourcePool {
    return &ResourcePool{
        resources: make([]*Resource, 0, maxSize),
        available: make(chan *Resource, maxSize),
        maxSize:   maxSize,
    }
}

// createResource creates a new resource, or returns nil when maxSize
// resources have already been created.
func (rp *ResourcePool) createResource() *Resource {
    rp.mu.Lock()
    defer rp.mu.Unlock()

    if rp.created >= rp.maxSize {
        return nil
    }

    rp.created++
    resource := &Resource{
        ID:   rp.created,
        Data: fmt.Sprintf("Resource-%d", rp.created),
    }

    fmt.Printf("Created resource %d\n", resource.ID)
    return resource
}

// Acquire gets a resource from the pool, waiting up to 5 seconds for one to
// be released if the pool is exhausted. It returns ErrAcquireTimeout when the
// wait expires.
func (rp *ResourcePool) Acquire() (*Resource, error) {
    return rp.AcquireTimeout(defaultAcquireTimeout)
}

// AcquireTimeout behaves like Acquire but lets the caller choose how long to
// wait for a released resource. An idle resource is preferred; otherwise a
// new one is created if the pool has not yet reached maxSize; only then does
// the call wait. A zero or negative timeout returns ErrAcquireTimeout without
// waiting when nothing is available.
func (rp *ResourcePool) AcquireTimeout(timeout time.Duration) (*Resource, error) {
    // Fast path: reuse an idle resource without blocking.
    select {
    case resource := <-rp.available:
        fmt.Printf("Acquired existing resource %d\n", resource.ID)
        return resource, nil
    default:
    }

    // No available resource, try to create one
    if resource := rp.createResource(); resource != nil {
        return resource, nil
    }

    // Pool is full, wait for available resource
    fmt.Println("Pool full, waiting for available resource...")
    select {
    case resource := <-rp.available:
        fmt.Printf("Acquired resource %d after waiting\n", resource.ID)
        return resource, nil
    case <-time.After(timeout):
        return nil, ErrAcquireTimeout
    }
}

// Release returns a resource to the pool. A nil resource is ignored: queuing
// it would hand nil to a later Acquire, which dereferences what it receives.
func (rp *ResourcePool) Release(resource *Resource) {
    if resource == nil {
        fmt.Println("Ignoring release of nil resource")
        return
    }

    select {
    case rp.available <- resource:
        fmt.Printf("Released resource %d\n", resource.ID)
    default:
        // Channel full, resource will be garbage collected
        fmt.Printf("Pool full, discarding resource %d\n", resource.ID)
    }
}

// Size returns current pool statistics: the number of idle resources and the
// number of resources created so far.
func (rp *ResourcePool) Size() (available, created int) {
    rp.mu.Lock()
    defer rp.mu.Unlock()
    return len(rp.available), rp.created
}

// main has 5 workers compete for a pool capped at 3 resources. Each worker
// acquires a resource, holds it for 200ms plus 100ms per worker id, and
// releases it. The final idle and created counts are printed at the end.
func main() {
    pool := NewResourcePool(3)
    var wg sync.WaitGroup

    // work is the body each worker goroutine runs.
    work := func(id int) {
        defer wg.Done()

        res, err := pool.Acquire()
        if err != nil {
            fmt.Printf("Worker %d failed to acquire resource: %v\n", id, err)
            return
        }

        fmt.Printf("Worker %d using resource %d\n", id, res.ID)

        // Simulate work
        busy := time.Duration(200+id*100) * time.Millisecond
        time.Sleep(busy)

        pool.Release(res)
        fmt.Printf("Worker %d finished\n", id)
    }

    // Multiple goroutines using resources
    wg.Add(5)
    for id := 0; id < 5; id++ {
        go work(id)
    }

    wg.Wait()

    idle, total := pool.Size()
    fmt.Printf("Final pool state - Available: %d, Created: %d\n", idle, total)
}

Best Practices

  1. Always Use defer: Ensure locks are released even if a panic occurs
  2. Keep Critical Sections Small: Minimize time holding locks
  3. Avoid Nested Locks: Prevent deadlocks by not acquiring a second lock while already holding one
  4. Use RWMutex for Read-Heavy: Better performance for read-heavy workloads
  5. Consider Lock-Free: Use atomic operations when possible
  6. Document Lock Order: If multiple locks needed, establish clear ordering
  7. Prefer Channels: Use channels for communication, locks for shared state

Common Pitfalls

1. Deadlocks

// ❌ Bad: Potential deadlock with nested locks
//
// BadAccount is a balance guarded by its own mutex. ID must be unique per
// account: SafeTransfer uses it to decide which of two accounts to lock first.
type BadAccount struct {
    ID      int
    mu      sync.Mutex
    balance int
}

// Transfer moves amount from a to to, locking a and then to. It is kept as
// the anti-example: two concurrent transfers in opposite directions
// (x.Transfer(y) and y.Transfer(x)) can each hold one lock while waiting for
// the other, and a self-transfer locks the same mutex twice.
func (a *BadAccount) Transfer(to *BadAccount, amount int) {
    a.mu.Lock()
    defer a.mu.Unlock()

    to.mu.Lock() // Potential deadlock if called concurrently
    defer to.mu.Unlock()

    a.balance -= amount
    to.balance += amount
}

// ✅ Good: Ordered locking to prevent deadlock
//
// SafeTransfer moves amount from a to to. Both accounts are locked in
// ascending ID order regardless of the transfer direction, so two opposite
// transfers can never wait on each other.
func (a *BadAccount) SafeTransfer(to *BadAccount, amount int) {
    // Transferring to oneself changes nothing, and locking the same
    // sync.Mutex twice would block forever because it is not reentrant.
    if a == to {
        return
    }

    // Always acquire locks in consistent order
    first, second := a, to
    if a.ID > to.ID {
        first, second = to, a
    }

    first.mu.Lock()
    defer first.mu.Unlock()

    second.mu.Lock()
    defer second.mu.Unlock()

    a.balance -= amount
    to.balance += amount
}

2. Race Conditions

// ❌ Bad: Race condition
//
// BadCounter is an integer with a mutex that IncrementIfEven fails to use
// correctly.
type BadCounter struct {
    mu    sync.Mutex
    value int
}

// IncrementIfEven adds one when the value is even. It is the anti-example:
// the parity check reads value without holding the lock, so the value may
// change between the check and the increment.
func (c *BadCounter) IncrementIfEven() {
    if c.value%2 != 0 { // Race: unsynchronized read of value
        return
    }
    c.mu.Lock()
    c.value++
    c.mu.Unlock()
}

// ✅ Good: Atomic check and update
//
// SafeIncrementIfEven performs the parity check and the increment inside one
// critical section, so no other goroutine can change value in between.
func (c *BadCounter) SafeIncrementIfEven() {
    c.mu.Lock()
    defer c.mu.Unlock()

    if c.value%2 != 0 {
        return
    }
    c.value++
}

Testing Concurrent Code

package main

import (
    "sync"
    "testing"
)

// TestCounter checks that no increment is lost when 100 goroutines each call
// Increment 1000 times on the same Counter.
func TestCounter(t *testing.T) {
    const (
        goroutines = 100
        increments = 1000
    )

    var wg sync.WaitGroup
    counter := &Counter{}

    wg.Add(goroutines)
    for g := 0; g < goroutines; g++ {
        go func() {
            defer wg.Done()
            for n := 0; n < increments; n++ {
                counter.Increment()
            }
        }()
    }
    wg.Wait()

    // Every goroutine has finished, so the value can no longer change.
    want := goroutines * increments
    if got := counter.Value(); got != want {
        t.Errorf("Expected %d, got %d", want, got)
    }
}

// Run with: go test -race
//
// TestCounterRace runs one goroutine that increments a Counter and one that
// reads it, concurrently. It makes no assertions; it exists so the race
// detector can observe both access paths. The test waits for both goroutines,
// because a test that returns first may end before they have run and leaves
// the detector nothing to check.
func TestCounterRace(t *testing.T) {
    counter := &Counter{}
    var wg sync.WaitGroup

    wg.Add(2)

    go func() {
        defer wg.Done()
        for i := 0; i < 1000; i++ {
            counter.Increment()
        }
    }()

    go func() {
        defer wg.Done()
        for i := 0; i < 1000; i++ {
            _ = counter.Value()
        }
    }()

    wg.Wait()
}

Mutex patterns are fundamental for building safe concurrent applications in Go. Choose the right synchronization primitive based on your access patterns: use sync.Mutex for exclusive access, sync.RWMutex for read-heavy workloads, and atomic operations for simple lock-free scenarios.


Next: Learn about WaitGroup Pattern for coordinating goroutine completion.