Go Concurrency Patterns Series: ← Request/Response | Series Overview | Mutex Patterns →


What is the Worker Pool Pattern?

The Worker Pool pattern manages a fixed number of worker goroutines that process jobs from a shared queue. This pattern is essential for controlling resource usage, preventing system overload, and ensuring predictable performance under varying loads.

Key Components (combined in the minimal sketch after this list):

  • Job Queue: Channel containing work to be processed
  • Worker Pool: Fixed number of worker goroutines
  • Result Channel: Optional channel for collecting results
  • Dispatcher: Coordinates job distribution to workers
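
The following minimal sketch (introduced here, not part of the series code) shows how these components fit together: a buffered channel serves as the job queue, a fixed set of goroutines forms the pool, a second channel collects results, and the submitting code acts as dispatcher.

package main

import (
    "fmt"
    "sync"
)

func main() {
    const workerCount = 3

    jobs := make(chan int, 10)    // job queue
    results := make(chan int, 10) // result channel

    // Worker pool: a fixed number of goroutines draining the job queue.
    var wg sync.WaitGroup
    for w := 1; w <= workerCount; w++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for job := range jobs { // exits when jobs is closed and drained
                fmt.Printf("worker %d squaring %d\n", id, job)
                results <- job * job
            }
        }(w)
    }

    // Dispatcher: feed the queue, then close it to signal "no more work".
    for i := 1; i <= 9; i++ {
        jobs <- i
    }
    close(jobs)

    // Close results once every worker has finished.
    go func() {
        wg.Wait()
        close(results)
    }()

    for r := range results {
        fmt.Println("result:", r)
    }
}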

Real-World Use Cases

  • Image Processing: Resize/compress images with limited CPU cores
  • Database Operations: Limit concurrent database connections
  • API Rate Limiting: Control outbound API call rates
  • File Processing: Process files with bounded I/O operations
  • Web Scraping: Limit concurrent HTTP requests
  • Background Jobs: Process queued tasks with resource limits

Basic Worker Pool Implementation

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Job represents work to be processed
type Job struct {
    ID   int
    Data interface{}
}

// Result represents the outcome of processing a job
type Result struct {
    JobID  int
    Output interface{}
    Error  error
}

// WorkerPool manages a pool of workers
type WorkerPool struct {
    workerCount int
    jobQueue    chan Job
    resultQueue chan Result
    quit        chan bool
    wg          sync.WaitGroup
}

// NewWorkerPool creates a new worker pool
func NewWorkerPool(workerCount, jobQueueSize int) *WorkerPool {
    return &WorkerPool{
        workerCount: workerCount,
        jobQueue:    make(chan Job, jobQueueSize),
        resultQueue: make(chan Result, jobQueueSize),
        quit:        make(chan bool),
    }
}

// Start initializes and starts all workers
func (wp *WorkerPool) Start() {
    for i := 0; i < wp.workerCount; i++ {
        wp.wg.Add(1)
        go wp.worker(i)
    }
}

// worker processes jobs from the job queue
func (wp *WorkerPool) worker(id int) {
    defer wp.wg.Done()
    
    for {
        select {
        case job := <-wp.jobQueue:
            fmt.Printf("Worker %d processing job %d\n", id, job.ID)
            result := wp.processJob(job)
            wp.resultQueue <- result
            
        case <-wp.quit:
            fmt.Printf("Worker %d stopping\n", id)
            return
        }
    }
}

// processJob simulates job processing
func (wp *WorkerPool) processJob(job Job) Result {
    // Simulate work
    time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
    
    // Process the job (example: square the number)
    if num, ok := job.Data.(int); ok {
        return Result{
            JobID:  job.ID,
            Output: num * num,
        }
    }
    
    return Result{
        JobID: job.ID,
        Error: fmt.Errorf("invalid job data"),
    }
}

// Submit adds a job to the queue
func (wp *WorkerPool) Submit(job Job) {
    wp.jobQueue <- job
}

// Results returns the result channel
func (wp *WorkerPool) Results() <-chan Result {
    return wp.resultQueue
}

// Stop shuts down the worker pool. Because workers select between jobQueue
// and quit, jobs still buffered in the queue when Stop is called may never be
// processed; see the drain-on-close variant after this example.
func (wp *WorkerPool) Stop() {
    close(wp.quit)
    wp.wg.Wait()
    close(wp.jobQueue)
    close(wp.resultQueue)
}

func main() {
    // Create worker pool with 3 workers
    pool := NewWorkerPool(3, 10)
    pool.Start()
    defer pool.Stop()
    
    // Submit jobs
    go func() {
        for i := 1; i <= 10; i++ {
            job := Job{
                ID:   i,
                Data: i * 10,
            }
            pool.Submit(job)
        }
    }()
    
    // Collect results
    for i := 0; i < 10; i++ {
        result := <-pool.Results()
        if result.Error != nil {
            fmt.Printf("Job %d failed: %v\n", result.JobID, result.Error)
        } else {
            fmt.Printf("Job %d result: %v\n", result.JobID, result.Output)
        }
    }
}
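
A common alternative shutdown, sketched below as an addition to the WorkerPool above (drainWorker and StopAndDrain are names introduced here, not part of the example), closes the job queue instead of signalling quit, so workers finish everything that was already submitted before exiting:

// drainWorker is a variant worker that exits once the job queue is closed and
// fully drained, so no separate quit channel is needed.
func (wp *WorkerPool) drainWorker(id int) {
    defer wp.wg.Done()
    for job := range wp.jobQueue { // ends when jobQueue is closed and empty
        wp.resultQueue <- wp.processJob(job)
    }
}

// StopAndDrain shuts down by closing the job queue; already-submitted jobs are
// still processed before the workers return.
func (wp *WorkerPool) StopAndDrain() {
    close(wp.jobQueue)    // reject new work, let workers finish the backlog
    wp.wg.Wait()          // all drainWorkers have returned
    close(wp.resultQueue) // safe to close: nothing sends results anymore
}

In this variant Start would launch drainWorker instead of worker, and Submit must not be called after StopAndDrain, since sending on a closed channel panics.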

Advanced Worker Pool with Context

package main

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// ContextJob includes context for cancellation
type ContextJob struct {
    ID      string
    Data    interface{}
    Context context.Context
}

// ContextResult includes timing and context information
type ContextResult struct {
    JobID     string
    Output    interface{}
    Error     error
    Duration  time.Duration
    WorkerID  int
}

// AdvancedWorkerPool supports context cancellation and monitoring
type AdvancedWorkerPool struct {
    workerCount   int
    jobQueue      chan ContextJob
    resultQueue   chan ContextResult
    ctx           context.Context
    cancel        context.CancelFunc
    wg            sync.WaitGroup
    metrics       *PoolMetrics
}

// PoolMetrics tracks worker pool performance
type PoolMetrics struct {
    mu              sync.RWMutex
    jobsProcessed   int64
    jobsFailed      int64
    totalDuration   time.Duration
    activeWorkers   int
}

func (pm *PoolMetrics) RecordJob(duration time.Duration, success bool) {
    pm.mu.Lock()
    defer pm.mu.Unlock()
    
    if success {
        pm.jobsProcessed++
    } else {
        pm.jobsFailed++
    }
    pm.totalDuration += duration
}

func (pm *PoolMetrics) SetActiveWorkers(count int) {
    pm.mu.Lock()
    defer pm.mu.Unlock()
    pm.activeWorkers = count
}

func (pm *PoolMetrics) GetStats() (processed, failed int64, avgDuration time.Duration, active int) {
    pm.mu.RLock()
    defer pm.mu.RUnlock()
    
    processed = pm.jobsProcessed
    failed = pm.jobsFailed
    active = pm.activeWorkers
    
    if pm.jobsProcessed > 0 {
        avgDuration = pm.totalDuration / time.Duration(pm.jobsProcessed)
    }
    
    return
}

// NewAdvancedWorkerPool creates a new advanced worker pool
func NewAdvancedWorkerPool(ctx context.Context, workerCount, queueSize int) *AdvancedWorkerPool {
    poolCtx, cancel := context.WithCancel(ctx)
    
    return &AdvancedWorkerPool{
        workerCount: workerCount,
        jobQueue:    make(chan ContextJob, queueSize),
        resultQueue: make(chan ContextResult, queueSize),
        ctx:         poolCtx,
        cancel:      cancel,
        metrics:     &PoolMetrics{},
    }
}

// Start begins processing with all workers
func (awp *AdvancedWorkerPool) Start() {
    awp.metrics.SetActiveWorkers(awp.workerCount)
    
    for i := 0; i < awp.workerCount; i++ {
        awp.wg.Add(1)
        go awp.worker(i)
    }
    
    // Start metrics reporter
    go awp.reportMetrics()
}

// worker processes jobs with context support
func (awp *AdvancedWorkerPool) worker(id int) {
    defer awp.wg.Done()
    
    for {
        select {
        case job := <-awp.jobQueue:
            start := time.Now()
            result := awp.processContextJob(job, id)
            duration := time.Since(start)
            
            awp.metrics.RecordJob(duration, result.Error == nil)
            
            select {
            case awp.resultQueue <- result:
            case <-awp.ctx.Done():
                return
            }
            
        case <-awp.ctx.Done():
            fmt.Printf("Worker %d shutting down\n", id)
            return
        }
    }
}

// processContextJob handles job processing with context
func (awp *AdvancedWorkerPool) processContextJob(job ContextJob, workerID int) ContextResult {
    start := time.Now()
    
    // Check if job context is already cancelled
    select {
    case <-job.Context.Done():
        return ContextResult{
            JobID:    job.ID,
            Error:    job.Context.Err(),
            Duration: time.Since(start),
            WorkerID: workerID,
        }
    default:
    }
    
    // Simulate work that respects context cancellation
    workDone := make(chan interface{}, 1)
    workErr := make(chan error, 1)
    
    go func() {
        // Simulate processing time
        time.Sleep(time.Duration(50+rand.Intn(100)) * time.Millisecond)
        
        if num, ok := job.Data.(int); ok {
            workDone <- num * num
        } else {
            workErr <- fmt.Errorf("invalid data type")
        }
    }()
    
    select {
    case result := <-workDone:
        return ContextResult{
            JobID:    job.ID,
            Output:   result,
            Duration: time.Since(start),
            WorkerID: workerID,
        }
    case err := <-workErr:
        return ContextResult{
            JobID:    job.ID,
            Error:    err,
            Duration: time.Since(start),
            WorkerID: workerID,
        }
    case <-job.Context.Done():
        return ContextResult{
            JobID:    job.ID,
            Error:    job.Context.Err(),
            Duration: time.Since(start),
            WorkerID: workerID,
        }
    case <-awp.ctx.Done():
        return ContextResult{
            JobID:    job.ID,
            Error:    awp.ctx.Err(),
            Duration: time.Since(start),
            WorkerID: workerID,
        }
    }
}

// Submit adds a job to the queue
func (awp *AdvancedWorkerPool) Submit(job ContextJob) error {
    select {
    case awp.jobQueue <- job:
        return nil
    case <-awp.ctx.Done():
        return awp.ctx.Err()
    }
}

// Results returns the result channel
func (awp *AdvancedWorkerPool) Results() <-chan ContextResult {
    return awp.resultQueue
}

// reportMetrics periodically reports pool statistics
func (awp *AdvancedWorkerPool) reportMetrics() {
    ticker := time.NewTicker(2 * time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            processed, failed, avgDuration, active := awp.metrics.GetStats()
            fmt.Printf("Pool Stats - Processed: %d, Failed: %d, Avg Duration: %v, Active Workers: %d\n",
                processed, failed, avgDuration, active)
        case <-awp.ctx.Done():
            return
        }
    }
}

// Stop gracefully shuts down the worker pool
func (awp *AdvancedWorkerPool) Stop() {
    awp.cancel()
    awp.wg.Wait()
    close(awp.jobQueue)
    close(awp.resultQueue)
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    
    pool := NewAdvancedWorkerPool(ctx, 4, 20)
    pool.Start()
    defer pool.Stop()
    
    // Submit jobs with individual timeouts
    go func() {
        for i := 1; i <= 15; i++ {
            jobCtx, jobCancel := context.WithTimeout(ctx, 200*time.Millisecond)
            
            job := ContextJob{
                ID:      fmt.Sprintf("job-%d", i),
                Data:    i * 5,
                Context: jobCtx,
            }
            
            if err := pool.Submit(job); err != nil {
                fmt.Printf("Failed to submit job %d: %v\n", i, err)
                jobCancel()
                break
            }
            
            // Cancel every fifth job early to demonstrate cancellation; for the
            // rest, release the context after its 200ms timeout has expired.
            // (A defer here would fire as soon as this goroutine finishes
            // submitting, cancelling jobs before the workers reach them.)
            if i%5 == 0 {
                go func(cancel context.CancelFunc) {
                    time.Sleep(50 * time.Millisecond)
                    cancel()
                }(jobCancel)
            } else {
                time.AfterFunc(250*time.Millisecond, jobCancel)
            }
        }
    }()
    
    // Collect results
    resultCount := 0
    for result := range pool.Results() {
        resultCount++
        if result.Error != nil {
            fmt.Printf("Job %s failed (worker %d): %v (took %v)\n",
                result.JobID, result.WorkerID, result.Error, result.Duration)
        } else {
            fmt.Printf("Job %s completed (worker %d): %v (took %v)\n",
                result.JobID, result.WorkerID, result.Output, result.Duration)
        }
        
        if resultCount >= 15 {
            break
        }
    }
}
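
The hand-rolled pool above gives full control over queueing, metrics, and result delivery. When all you need is "run at most N tasks at once", a sketch like the following, using golang.org/x/sync/errgroup (its SetLimit method caps concurrently running goroutines in current versions of the package), is often enough:

package main

import (
    "context"
    "fmt"

    "golang.org/x/sync/errgroup"
)

func main() {
    g, ctx := errgroup.WithContext(context.Background())
    g.SetLimit(4) // at most 4 tasks run concurrently, like a 4-worker pool

    for i := 1; i <= 15; i++ {
        i := i // capture the loop variable (unnecessary from Go 1.22 on)
        g.Go(func() error {
            select {
            case <-ctx.Done():
                return ctx.Err() // stop early if another task already failed
            default:
            }
            fmt.Println("processed:", i*i)
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        fmt.Println("error:", err)
    }
}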

Dynamic Worker Pool

package main

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "sync/atomic"
    "time"
)

// Job and Result mirror the types defined in the basic worker pool example;
// they are repeated here so this listing compiles on its own.
type Job struct {
    ID   int
    Data interface{}
}

// Result carries the outcome of a processed job.
type Result struct {
    JobID  int
    Output interface{}
    Error  error
}

// DynamicWorkerPool can scale workers up and down based on load
type DynamicWorkerPool struct {
    minWorkers    int
    maxWorkers    int
    currentWorkers int64
    jobQueue      chan Job
    resultQueue   chan Result
    ctx           context.Context
    cancel        context.CancelFunc
    wg            sync.WaitGroup
    workerControl chan int // +1 to add worker, -1 to remove worker
    metrics       *DynamicMetrics
}

// DynamicMetrics tracks load and performance for scaling decisions
type DynamicMetrics struct {
    mu                sync.RWMutex
    queueLength       int64
    avgProcessingTime time.Duration
    lastScaleTime     time.Time
    scaleUpThreshold  int
    scaleDownThreshold int
}

func (dm *DynamicMetrics) UpdateQueueLength(length int) {
    atomic.StoreInt64(&dm.queueLength, int64(length))
}

func (dm *DynamicMetrics) GetQueueLength() int {
    return int(atomic.LoadInt64(&dm.queueLength))
}

func (dm *DynamicMetrics) ShouldScaleUp(currentWorkers int, maxWorkers int) bool {
    dm.mu.RLock()
    defer dm.mu.RUnlock()
    
    return currentWorkers < maxWorkers &&
        dm.GetQueueLength() > dm.scaleUpThreshold &&
        time.Since(dm.lastScaleTime) > 5*time.Second
}

func (dm *DynamicMetrics) ShouldScaleDown(currentWorkers int, minWorkers int) bool {
    dm.mu.RLock()
    defer dm.mu.RUnlock()
    
    return currentWorkers > minWorkers &&
        dm.GetQueueLength() < dm.scaleDownThreshold &&
        time.Since(dm.lastScaleTime) > 10*time.Second
}

func (dm *DynamicMetrics) RecordScale() {
    dm.mu.Lock()
    defer dm.mu.Unlock()
    dm.lastScaleTime = time.Now()
}

// NewDynamicWorkerPool creates a new dynamic worker pool
func NewDynamicWorkerPool(ctx context.Context, minWorkers, maxWorkers, queueSize int) *DynamicWorkerPool {
    poolCtx, cancel := context.WithCancel(ctx)
    
    return &DynamicWorkerPool{
        minWorkers:    minWorkers,
        maxWorkers:    maxWorkers,
        currentWorkers: 0,
        jobQueue:      make(chan Job, queueSize),
        resultQueue:   make(chan Result, queueSize),
        ctx:           poolCtx,
        cancel:        cancel,
        workerControl: make(chan int, maxWorkers),
        metrics: &DynamicMetrics{
            scaleUpThreshold:   queueSize / 2,
            scaleDownThreshold: queueSize / 4,
        },
    }
}

// Start initializes the pool with minimum workers
func (dwp *DynamicWorkerPool) Start() {
    // Start with minimum workers
    for i := 0; i < dwp.minWorkers; i++ {
        dwp.addWorker()
    }
    
    // Start the scaler
    go dwp.scaler()
    
    // Start queue monitor
    go dwp.queueMonitor()
}

// addWorker creates and starts a new worker
func (dwp *DynamicWorkerPool) addWorker() {
    workerID := atomic.AddInt64(&dwp.currentWorkers, 1)
    dwp.wg.Add(1)
    
    go func(id int64) {
        defer dwp.wg.Done()
        defer atomic.AddInt64(&dwp.currentWorkers, -1)
        
        fmt.Printf("Worker %d started\n", id)
        
        for {
            select {
            case job := <-dwp.jobQueue:
                start := time.Now()
                result := dwp.processJob(job)
                duration := time.Since(start)
                
                fmt.Printf("Worker %d processed job %d in %v\n", id, job.ID, duration)
                
                select {
                case dwp.resultQueue <- result:
                case <-dwp.ctx.Done():
                    return
                }
                
            case <-dwp.ctx.Done():
                fmt.Printf("Worker %d stopping\n", id)
                return
            }
        }
    }(workerID)
}

// processJob simulates job processing
func (dwp *DynamicWorkerPool) processJob(job Job) Result {
    // Simulate variable processing time
    time.Sleep(time.Duration(50+rand.Intn(200)) * time.Millisecond)
    
    if num, ok := job.Data.(int); ok {
        return Result{
            JobID:  job.ID,
            Output: num * 2,
        }
    }
    
    return Result{
        JobID: job.ID,
        Error: fmt.Errorf("invalid job data"),
    }
}

// scaler monitors load and adjusts worker count
func (dwp *DynamicWorkerPool) scaler() {
    ticker := time.NewTicker(3 * time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            currentWorkers := int(atomic.LoadInt64(&dwp.currentWorkers))
            queueLength := dwp.metrics.GetQueueLength()
            
            fmt.Printf("Scaler check - Workers: %d, Queue: %d\n", currentWorkers, queueLength)
            
            if dwp.metrics.ShouldScaleUp(currentWorkers, dwp.maxWorkers) {
                fmt.Printf("Scaling up: adding worker (current: %d)\n", currentWorkers)
                dwp.addWorker()
                dwp.metrics.RecordScale()
            } else if dwp.metrics.ShouldScaleDown(currentWorkers, dwp.minWorkers) {
                fmt.Printf("Scaling down: removing worker (current: %d)\n", currentWorkers)
                // The select-based workers above have no per-worker stop signal,
                // so scale-down is recorded but not enforced here; see the worker
                // retirement sketch after this example for one way to do it
                dwp.metrics.RecordScale()
            }
            
        case <-dwp.ctx.Done():
            return
        }
    }
}

// queueMonitor tracks queue length for scaling decisions
func (dwp *DynamicWorkerPool) queueMonitor() {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            queueLength := len(dwp.jobQueue)
            dwp.metrics.UpdateQueueLength(queueLength)
            
        case <-dwp.ctx.Done():
            return
        }
    }
}

// Submit adds a job to the queue
func (dwp *DynamicWorkerPool) Submit(job Job) error {
    select {
    case dwp.jobQueue <- job:
        return nil
    case <-dwp.ctx.Done():
        return dwp.ctx.Err()
    }
}

// Results returns the result channel
func (dwp *DynamicWorkerPool) Results() <-chan Result {
    return dwp.resultQueue
}

// Stop gracefully shuts down the pool
func (dwp *DynamicWorkerPool) Stop() {
    dwp.cancel()
    dwp.wg.Wait()
    close(dwp.jobQueue)
    close(dwp.resultQueue)
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    
    pool := NewDynamicWorkerPool(ctx, 2, 6, 20)
    pool.Start()
    defer pool.Stop()
    
    // Submit jobs in bursts to trigger scaling
    go func() {
        // Initial burst
        for i := 1; i <= 10; i++ {
            job := Job{ID: i, Data: i * 10}
            if err := pool.Submit(job); err != nil {
                fmt.Printf("Failed to submit job %d: %v\n", i, err)
                break
            }
        }
        
        time.Sleep(8 * time.Second)
        
        // Second burst
        for i := 11; i <= 25; i++ {
            job := Job{ID: i, Data: i * 10}
            if err := pool.Submit(job); err != nil {
                fmt.Printf("Failed to submit job %d: %v\n", i, err)
                break
            }
        }
        
        time.Sleep(5 * time.Second)
        
        // Final smaller batch
        for i := 26; i <= 30; i++ {
            job := Job{ID: i, Data: i * 10}
            if err := pool.Submit(job); err != nil {
                fmt.Printf("Failed to submit job %d: %v\n", i, err)
                break
            }
        }
    }()
    
    // Collect results
    resultCount := 0
    for result := range pool.Results() {
        resultCount++
        if result.Error != nil {
            fmt.Printf("Job %d failed: %v\n", result.JobID, result.Error)
        } else {
            fmt.Printf("Job %d completed: %v\n", result.JobID, result.Output)
        }
        
        if resultCount >= 30 {
            break
        }
    }
}
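
The scale-down branch above only records the decision. One way to actually retire workers, shown in the standalone sketch below (stopOne and the surrounding names are introduced here, not part of the pool above), is a shared channel that every worker selects on: each value sent retires exactly one worker.

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    jobs := make(chan int, 10)
    stopOne := make(chan struct{}, 4) // one value sent = one worker retired

    var wg sync.WaitGroup
    for w := 1; w <= 4; w++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for {
                select {
                case j, ok := <-jobs:
                    if !ok {
                        return // queue closed and drained
                    }
                    fmt.Printf("worker %d handled job %d\n", id, j)
                case <-stopOne:
                    fmt.Printf("worker %d retired\n", id)
                    return
                }
            }
        }(w)
    }

    for i := 1; i <= 6; i++ {
        jobs <- i
    }
    time.Sleep(100 * time.Millisecond)

    stopOne <- struct{}{} // scale down by one worker
    stopOne <- struct{}{} // and one more
    time.Sleep(100 * time.Millisecond) // give the signals time to be picked up

    close(jobs) // remaining workers drain and exit
    wg.Wait()
}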

Best Practices

  1. Right-Size the Pool: Match worker count to available resources (a sizing sketch follows this list)
  2. Monitor Performance: Track queue length, processing times, and throughput
  3. Handle Backpressure: Implement proper queue management
  4. Graceful Shutdown: Ensure all workers complete current jobs
  5. Error Handling: Isolate worker failures from the pool
  6. Resource Cleanup: Properly close channels and cancel contexts
  7. Load Balancing: Distribute work evenly across workers
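
For the first point, a rough sizing sketch; the I/O multiplier below is an assumption to tune against real measurements, not a fixed rule:

package main

import (
    "fmt"
    "runtime"
)

// poolSize suggests a worker count: CPU-bound pools rarely benefit from more
// workers than GOMAXPROCS, while I/O-bound pools can usually run several
// workers per core because most of their time is spent waiting.
func poolSize(ioBound bool) int {
    cpus := runtime.GOMAXPROCS(0) // current limit on parallel execution
    if ioBound {
        return cpus * 4 // starting point only; measure and adjust
    }
    return cpus
}

func main() {
    fmt.Println("CPU-bound workers:", poolSize(false))
    fmt.Println("I/O-bound workers:", poolSize(true))
}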

Common Pitfalls

  1. Too Many Workers: Creating more workers than CPU cores for CPU-bound tasks
  2. Unbounded Queues: Memory issues with unlimited job queues
  3. Worker Leaks: Not properly shutting down workers
  4. Blocking Operations: Long-running jobs blocking other work
  5. No Backpressure: Not handling queue overflow situations (see the non-blocking submit sketch after this list)
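
For the last pitfall, one backpressure strategy is to reject work when the queue is full instead of blocking the caller. The sketch below extends the basic WorkerPool from earlier; TrySubmit and ErrQueueFull are names introduced here, and the file would also need the "errors" import:

// ErrQueueFull signals that the job queue has no free capacity right now.
var ErrQueueFull = errors.New("worker pool queue is full")

// TrySubmit enqueues a job only if there is room, so callers can shed load,
// retry with backoff, or report the overload instead of blocking forever.
func (wp *WorkerPool) TrySubmit(job Job) error {
    select {
    case wp.jobQueue <- job:
        return nil
    default:
        return ErrQueueFull
    }
}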

Testing Worker Pools

// This test file sits in the same package as the AdvancedWorkerPool example above.
package main

import (
    "context"
    "fmt"
    "testing"
    "time"
)

func TestWorkerPool(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    pool := NewAdvancedWorkerPool(ctx, 2, 5)
    pool.Start()
    defer pool.Stop()
    
    // Submit test jobs
    jobCount := 5
    for i := 1; i <= jobCount; i++ {
        job := ContextJob{
            ID:      fmt.Sprintf("test-%d", i),
            Data:    i,
            Context: ctx,
        }
        
        if err := pool.Submit(job); err != nil {
            t.Fatalf("Failed to submit job: %v", err)
        }
    }
    
    // Collect results
    results := make(map[string]ContextResult)
    for i := 0; i < jobCount; i++ {
        select {
        case result := <-pool.Results():
            results[result.JobID] = result
        case <-time.After(2 * time.Second):
            t.Fatal("Timeout waiting for results")
        }
    }
    
    // Verify all jobs completed
    if len(results) != jobCount {
        t.Errorf("Expected %d results, got %d", jobCount, len(results))
    }
    
    // Verify results are correct
    for i := 1; i <= jobCount; i++ {
        jobID := fmt.Sprintf("test-%d", i)
        result, exists := results[jobID]
        if !exists {
            t.Errorf("Missing result for job %s", jobID)
            continue
        }
        
        if result.Error != nil {
            t.Errorf("Job %s failed: %v", jobID, result.Error)
            continue
        }
        
        expected := i * i
        if result.Output != expected {
            t.Errorf("Job %s: expected %d, got %v", jobID, expected, result.Output)
        }
    }
}
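
Concurrency bugs in pool code, such as unsynchronized access to metrics counters, are easiest to catch by running the tests with the race detector enabled:

go test -race -run TestWorkerPool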

The Worker Pool pattern is essential for building scalable, resource-efficient concurrent applications in Go. It provides controlled concurrency, predictable resource usage, and excellent performance characteristics for both CPU-bound and I/O-bound workloads.


Next: Learn about Mutex Patterns for protecting shared resources with locks.