Go Concurrency Patterns Series: ← Request/Response | Series Overview | Mutex Patterns →
What is the Worker Pool Pattern?
The Worker Pool pattern runs a fixed number of worker goroutines that pull jobs from a shared queue. It keeps resource usage bounded, prevents overload from unbounded goroutine creation, and gives predictable performance under varying load. The minimal sketch after the component list below shows the pattern in its simplest form.
Key Components:
- Job Queue: Channel containing work to be processed
- Worker Pool: Fixed number of worker goroutines
- Result Channel: Optional channel for collecting results
- Dispatcher: Coordinates job distribution to workers
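Before diving into a full implementation, here is a minimal sketch of how these components fit together using nothing but channels and a sync.WaitGroup. It is illustrative only; the process function and the numbers are placeholders.
package main

import (
    "fmt"
    "sync"
)

// process stands in for real work.
func process(job int) int { return job * job }

func main() {
    jobs := make(chan int, 5)    // job queue
    results := make(chan int, 5) // result channel

    var wg sync.WaitGroup
    for w := 0; w < 3; w++ { // fixed pool of three workers
        wg.Add(1)
        go func() {
            defer wg.Done()
            for job := range jobs { // each worker pulls from the shared queue
                results <- process(job)
            }
        }()
    }

    // The "dispatcher": feed the queue, then close it to signal no more work.
    for i := 1; i <= 5; i++ {
        jobs <- i
    }
    close(jobs)

    wg.Wait()
    close(results)

    for r := range results {
        fmt.Println(r)
    }
}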
Real-World Use Cases
- Image Processing: Resize/compress images with limited CPU cores
- Database Operations: Limit concurrent database connections
- API Rate Limiting: Control outbound API call rates
- File Processing: Process files with bounded I/O operations
- Web Scraping: Limit concurrent HTTP requests
- Background Jobs: Process queued tasks with resource limits
Basic Worker Pool Implementation
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// Job represents work to be processed
type Job struct {
ID int
Data interface{}
}
// Result represents the outcome of processing a job
type Result struct {
JobID int
Output interface{}
Error error
}
// WorkerPool manages a pool of workers
type WorkerPool struct {
workerCount int
jobQueue chan Job
resultQueue chan Result
quit chan bool
wg sync.WaitGroup
}
// NewWorkerPool creates a new worker pool
func NewWorkerPool(workerCount, jobQueueSize int) *WorkerPool {
return &WorkerPool{
workerCount: workerCount,
jobQueue: make(chan Job, jobQueueSize),
resultQueue: make(chan Result, jobQueueSize),
quit: make(chan bool),
}
}
// Start initializes and starts all workers
func (wp *WorkerPool) Start() {
for i := 0; i < wp.workerCount; i++ {
wp.wg.Add(1)
go wp.worker(i)
}
}
// worker processes jobs from the job queue
func (wp *WorkerPool) worker(id int) {
defer wp.wg.Done()
for {
select {
case job := <-wp.jobQueue:
fmt.Printf("Worker %d processing job %d\n", id, job.ID)
result := wp.processJob(job)
wp.resultQueue <- result
case <-wp.quit:
fmt.Printf("Worker %d stopping\n", id)
return
}
}
}
// processJob simulates job processing
func (wp *WorkerPool) processJob(job Job) Result {
// Simulate work
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
// Process the job (example: square the number)
if num, ok := job.Data.(int); ok {
return Result{
JobID: job.ID,
Output: num * num,
}
}
return Result{
JobID: job.ID,
Error: fmt.Errorf("invalid job data"),
}
}
// Submit adds a job to the queue
func (wp *WorkerPool) Submit(job Job) {
wp.jobQueue <- job
}
// Results returns the result channel
func (wp *WorkerPool) Results() <-chan Result {
return wp.resultQueue
}
// Stop gracefully shuts down the worker pool.
// Note: closing quit makes workers return immediately, so jobs still buffered
// in jobQueue are dropped; the drain-then-exit variant after this example avoids that.
func (wp *WorkerPool) Stop() {
close(wp.quit)
wp.wg.Wait()
close(wp.jobQueue)
close(wp.resultQueue)
}
func main() {
// Create worker pool with 3 workers
pool := NewWorkerPool(3, 10)
pool.Start()
defer pool.Stop()
// Submit jobs
go func() {
for i := 1; i <= 10; i++ {
job := Job{
ID: i,
Data: i * 10,
}
pool.Submit(job)
}
}()
// Collect results
for i := 0; i < 10; i++ {
result := <-pool.Results()
if result.Error != nil {
fmt.Printf("Job %d failed: %v\n", result.JobID, result.Error)
} else {
fmt.Printf("Job %d result: %v\n", result.JobID, result.Output)
}
}
}
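A common alternative to the quit channel, and arguably the more idiomatic one, is to close the job channel itself: workers range over it and exit only once it is drained, so buffered jobs are never dropped. The following sketch reuses the WorkerPool type and processJob from above (the method names are illustrative) and assumes Submit is never called after the pool is stopped.
// StartDraining launches workers that exit once jobQueue is closed and empty.
func (wp *WorkerPool) StartDraining() {
    for i := 0; i < wp.workerCount; i++ {
        wp.wg.Add(1)
        go func(id int) {
            defer wp.wg.Done()
            for job := range wp.jobQueue { // ends when jobQueue is closed and drained
                wp.resultQueue <- wp.processJob(job)
            }
        }(i)
    }
}

// StopDraining closes the queue, waits for in-flight jobs, then closes results.
func (wp *WorkerPool) StopDraining() {
    close(wp.jobQueue) // no more submissions; workers finish what remains
    wp.wg.Wait()
    close(wp.resultQueue)
}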
Advanced Worker Pool with Context
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
// ContextJob includes context for cancellation
type ContextJob struct {
ID string
Data interface{}
Context context.Context
}
// ContextResult includes timing and context information
type ContextResult struct {
JobID string
Output interface{}
Error error
Duration time.Duration
WorkerID int
}
// AdvancedWorkerPool supports context cancellation and monitoring
type AdvancedWorkerPool struct {
workerCount int
jobQueue chan ContextJob
resultQueue chan ContextResult
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
metrics *PoolMetrics
}
// PoolMetrics tracks worker pool performance
type PoolMetrics struct {
mu sync.RWMutex
jobsProcessed int64
jobsFailed int64
totalDuration time.Duration
activeWorkers int
}
func (pm *PoolMetrics) RecordJob(duration time.Duration, success bool) {
pm.mu.Lock()
defer pm.mu.Unlock()
if success {
pm.jobsProcessed++
} else {
pm.jobsFailed++
}
pm.totalDuration += duration
}
func (pm *PoolMetrics) SetActiveWorkers(count int) {
pm.mu.Lock()
defer pm.mu.Unlock()
pm.activeWorkers = count
}
func (pm *PoolMetrics) GetStats() (processed, failed int64, avgDuration time.Duration, active int) {
pm.mu.RLock()
defer pm.mu.RUnlock()
processed = pm.jobsProcessed
failed = pm.jobsFailed
active = pm.activeWorkers
if pm.jobsProcessed > 0 {
avgDuration = pm.totalDuration / time.Duration(pm.jobsProcessed)
}
return
}
// NewAdvancedWorkerPool creates a new advanced worker pool
func NewAdvancedWorkerPool(ctx context.Context, workerCount, queueSize int) *AdvancedWorkerPool {
poolCtx, cancel := context.WithCancel(ctx)
return &AdvancedWorkerPool{
workerCount: workerCount,
jobQueue: make(chan ContextJob, queueSize),
resultQueue: make(chan ContextResult, queueSize),
ctx: poolCtx,
cancel: cancel,
metrics: &PoolMetrics{},
}
}
// Start begins processing with all workers
func (awp *AdvancedWorkerPool) Start() {
awp.metrics.SetActiveWorkers(awp.workerCount)
for i := 0; i < awp.workerCount; i++ {
awp.wg.Add(1)
go awp.worker(i)
}
// Start metrics reporter
go awp.reportMetrics()
}
// worker processes jobs with context support
func (awp *AdvancedWorkerPool) worker(id int) {
defer awp.wg.Done()
for {
select {
case job := <-awp.jobQueue:
start := time.Now()
result := awp.processContextJob(job, id)
duration := time.Since(start)
awp.metrics.RecordJob(duration, result.Error == nil)
select {
case awp.resultQueue <- result:
case <-awp.ctx.Done():
return
}
case <-awp.ctx.Done():
fmt.Printf("Worker %d shutting down\n", id)
return
}
}
}
// processContextJob handles job processing with context
func (awp *AdvancedWorkerPool) processContextJob(job ContextJob, workerID int) ContextResult {
start := time.Now()
// Check if job context is already cancelled
select {
case <-job.Context.Done():
return ContextResult{
JobID: job.ID,
Error: job.Context.Err(),
Duration: time.Since(start),
WorkerID: workerID,
}
default:
}
// Simulate work that respects context cancellation
workDone := make(chan interface{}, 1)
workErr := make(chan error, 1)
go func() {
// Simulate processing time
time.Sleep(time.Duration(50+rand.Intn(100)) * time.Millisecond)
if num, ok := job.Data.(int); ok {
workDone <- num * num
} else {
workErr <- fmt.Errorf("invalid data type")
}
}()
select {
case result := <-workDone:
return ContextResult{
JobID: job.ID,
Output: result,
Duration: time.Since(start),
WorkerID: workerID,
}
case err := <-workErr:
return ContextResult{
JobID: job.ID,
Error: err,
Duration: time.Since(start),
WorkerID: workerID,
}
case <-job.Context.Done():
return ContextResult{
JobID: job.ID,
Error: job.Context.Err(),
Duration: time.Since(start),
WorkerID: workerID,
}
case <-awp.ctx.Done():
return ContextResult{
JobID: job.ID,
Error: awp.ctx.Err(),
Duration: time.Since(start),
WorkerID: workerID,
}
}
}
// Submit adds a job to the queue
func (awp *AdvancedWorkerPool) Submit(job ContextJob) error {
select {
case awp.jobQueue <- job:
return nil
case <-awp.ctx.Done():
return awp.ctx.Err()
}
}
// Results returns the result channel
func (awp *AdvancedWorkerPool) Results() <-chan ContextResult {
return awp.resultQueue
}
// reportMetrics periodically reports pool statistics
func (awp *AdvancedWorkerPool) reportMetrics() {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
processed, failed, avgDuration, active := awp.metrics.GetStats()
fmt.Printf("Pool Stats - Processed: %d, Failed: %d, Avg Duration: %v, Active Workers: %d\n",
processed, failed, avgDuration, active)
case <-awp.ctx.Done():
return
}
}
}
// Stop gracefully shuts down the worker pool
func (awp *AdvancedWorkerPool) Stop() {
awp.cancel()
awp.wg.Wait()
close(awp.jobQueue)
close(awp.resultQueue)
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
pool := NewAdvancedWorkerPool(ctx, 4, 20)
pool.Start()
defer pool.Stop()
// Submit jobs with individual timeouts
go func() {
for i := 1; i <= 15; i++ {
jobCtx, jobCancel := context.WithTimeout(ctx, 200*time.Millisecond)
job := ContextJob{
ID: fmt.Sprintf("job-%d", i),
Data: i * 5,
Context: jobCtx,
}
if err := pool.Submit(job); err != nil {
fmt.Printf("Failed to submit job %d: %v\n", i, err)
jobCancel()
break
}
// Cancel some jobs early to demonstrate cancellation
if i%5 == 0 {
go func() {
time.Sleep(50 * time.Millisecond)
jobCancel()
}()
} else {
// Deferring inside a loop postpones cancellation until this goroutine
// returns; fine for a short demo, but avoid it in long-running code.
defer jobCancel()
}
}
}()
// Collect results
resultCount := 0
for result := range pool.Results() {
resultCount++
if result.Error != nil {
fmt.Printf("Job %s failed (worker %d): %v (took %v)\n",
result.JobID, result.WorkerID, result.Error, result.Duration)
} else {
fmt.Printf("Job %s completed (worker %d): %v (took %v)\n",
result.JobID, result.WorkerID, result.Output, result.Duration)
}
if resultCount >= 15 {
break
}
}
}
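When you do not need a long-lived queue or a separate result channel, the same bounded concurrency can often be achieved with less machinery: a buffered channel used as a counting semaphore (or errgroup.SetLimit from golang.org/x/sync if a dependency is acceptable). A stdlib-only sketch, with the workload simulated by a sleep:
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    const maxWorkers = 4
    sem := make(chan struct{}, maxWorkers) // at most maxWorkers jobs in flight

    var wg sync.WaitGroup
    for i := 1; i <= 20; i++ {
        select {
        case sem <- struct{}{}: // acquire a slot
        case <-ctx.Done():
            fmt.Println("stopping submissions:", ctx.Err())
            wg.Wait()
            return
        }

        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            defer func() { <-sem }() // release the slot

            time.Sleep(50 * time.Millisecond) // simulated work
            fmt.Println("processed job", n)
        }(i)
    }
    wg.Wait()
}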
Dynamic Worker Pool
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
)
// Job and Result are the same types used in the basic worker pool example;
// they are repeated here so this file compiles on its own.
type Job struct {
ID int
Data interface{}
}
type Result struct {
JobID int
Output interface{}
Error error
}
// DynamicWorkerPool can scale workers up and down based on load
type DynamicWorkerPool struct {
minWorkers int
maxWorkers int
currentWorkers int64
jobQueue chan Job
resultQueue chan Result
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
workerControl chan int // reserved for explicit add/remove signalling; unused in this version
metrics *DynamicMetrics
}
// DynamicMetrics tracks load and performance for scaling decisions
type DynamicMetrics struct {
mu sync.RWMutex
queueLength int64
avgProcessingTime time.Duration
lastScaleTime time.Time
scaleUpThreshold int
scaleDownThreshold int
}
func (dm *DynamicMetrics) UpdateQueueLength(length int) {
atomic.StoreInt64(&dm.queueLength, int64(length))
}
func (dm *DynamicMetrics) GetQueueLength() int {
return int(atomic.LoadInt64(&dm.queueLength))
}
func (dm *DynamicMetrics) ShouldScaleUp(currentWorkers int, maxWorkers int) bool {
dm.mu.RLock()
defer dm.mu.RUnlock()
return currentWorkers < maxWorkers &&
dm.GetQueueLength() > dm.scaleUpThreshold &&
time.Since(dm.lastScaleTime) > 5*time.Second
}
func (dm *DynamicMetrics) ShouldScaleDown(currentWorkers int, minWorkers int) bool {
dm.mu.RLock()
defer dm.mu.RUnlock()
return currentWorkers > minWorkers &&
dm.GetQueueLength() < dm.scaleDownThreshold &&
time.Since(dm.lastScaleTime) > 10*time.Second
}
func (dm *DynamicMetrics) RecordScale() {
dm.mu.Lock()
defer dm.mu.Unlock()
dm.lastScaleTime = time.Now()
}
// NewDynamicWorkerPool creates a new dynamic worker pool
func NewDynamicWorkerPool(ctx context.Context, minWorkers, maxWorkers, queueSize int) *DynamicWorkerPool {
poolCtx, cancel := context.WithCancel(ctx)
return &DynamicWorkerPool{
minWorkers: minWorkers,
maxWorkers: maxWorkers,
currentWorkers: 0,
jobQueue: make(chan Job, queueSize),
resultQueue: make(chan Result, queueSize),
ctx: poolCtx,
cancel: cancel,
workerControl: make(chan int, maxWorkers),
metrics: &DynamicMetrics{
scaleUpThreshold: queueSize / 2,
scaleDownThreshold: queueSize / 4,
},
}
}
// Start initializes the pool with minimum workers
func (dwp *DynamicWorkerPool) Start() {
// Start with minimum workers
for i := 0; i < dwp.minWorkers; i++ {
dwp.addWorker()
}
// Start the scaler
go dwp.scaler()
// Start queue monitor
go dwp.queueMonitor()
}
// addWorker creates and starts a new worker
func (dwp *DynamicWorkerPool) addWorker() {
workerID := atomic.AddInt64(&dwp.currentWorkers, 1)
dwp.wg.Add(1)
go func(id int64) {
defer dwp.wg.Done()
defer atomic.AddInt64(&dwp.currentWorkers, -1)
fmt.Printf("Worker %d started\n", id)
for {
select {
case job := <-dwp.jobQueue:
start := time.Now()
result := dwp.processJob(job)
duration := time.Since(start)
fmt.Printf("Worker %d processed job %d in %v\n", id, job.ID, duration)
select {
case dwp.resultQueue <- result:
case <-dwp.ctx.Done():
return
}
case <-dwp.ctx.Done():
fmt.Printf("Worker %d stopping\n", id)
return
}
}
}(workerID)
}
// processJob simulates job processing
func (dwp *DynamicWorkerPool) processJob(job Job) Result {
// Simulate variable processing time
time.Sleep(time.Duration(50+rand.Intn(200)) * time.Millisecond)
if num, ok := job.Data.(int); ok {
return Result{
JobID: job.ID,
Output: num * 2,
}
}
return Result{
JobID: job.ID,
Error: fmt.Errorf("invalid job data"),
}
}
// scaler monitors load and adjusts worker count
func (dwp *DynamicWorkerPool) scaler() {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
currentWorkers := int(atomic.LoadInt64(&dwp.currentWorkers))
queueLength := dwp.metrics.GetQueueLength()
fmt.Printf("Scaler check - Workers: %d, Queue: %d\n", currentWorkers, queueLength)
if dwp.metrics.ShouldScaleUp(currentWorkers, dwp.maxWorkers) {
fmt.Printf("Scaling up: adding worker (current: %d)\n", currentWorkers)
dwp.addWorker()
dwp.metrics.RecordScale()
} else if dwp.metrics.ShouldScaleDown(currentWorkers, dwp.minWorkers) {
fmt.Printf("Scaling down: removing worker (current: %d)\n", currentWorkers)
// Scaling down needs a way to stop exactly one worker; this version only
// records the decision. See the scale-down sketch after this example.
dwp.metrics.RecordScale()
}
case <-dwp.ctx.Done():
return
}
}
}
// queueMonitor tracks queue length for scaling decisions
func (dwp *DynamicWorkerPool) queueMonitor() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
queueLength := len(dwp.jobQueue)
dwp.metrics.UpdateQueueLength(queueLength)
case <-dwp.ctx.Done():
return
}
}
}
// Submit adds a job to the queue
func (dwp *DynamicWorkerPool) Submit(job Job) error {
select {
case dwp.jobQueue <- job:
return nil
case <-dwp.ctx.Done():
return dwp.ctx.Err()
}
}
// Results returns the result channel
func (dwp *DynamicWorkerPool) Results() <-chan Result {
return dwp.resultQueue
}
// Stop gracefully shuts down the pool
func (dwp *DynamicWorkerPool) Stop() {
dwp.cancel()
dwp.wg.Wait()
close(dwp.jobQueue)
close(dwp.resultQueue)
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
pool := NewDynamicWorkerPool(ctx, 2, 6, 20)
pool.Start()
defer pool.Stop()
// Submit jobs in bursts to trigger scaling
go func() {
// Initial burst
for i := 1; i <= 10; i++ {
job := Job{ID: i, Data: i * 10}
if err := pool.Submit(job); err != nil {
fmt.Printf("Failed to submit job %d: %v\n", i, err)
break
}
}
time.Sleep(8 * time.Second)
// Second burst
for i := 11; i <= 25; i++ {
job := Job{ID: i, Data: i * 10}
if err := pool.Submit(job); err != nil {
fmt.Printf("Failed to submit job %d: %v\n", i, err)
break
}
}
time.Sleep(5 * time.Second)
// Final smaller batch
for i := 26; i <= 30; i++ {
job := Job{ID: i, Data: i * 10}
if err := pool.Submit(job); err != nil {
fmt.Printf("Failed to submit job %d: %v\n", i, err)
break
}
}
}()
// Collect results
resultCount := 0
for result := range pool.Results() {
resultCount++
if result.Error != nil {
fmt.Printf("Job %d failed: %v\n", result.JobID, result.Error)
} else {
fmt.Printf("Job %d completed: %v\n", result.JobID, result.Output)
}
if resultCount >= 30 {
break
}
}
}
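As noted in the scaler, this version never actually removes a worker. One way to close that gap, sketched here under the assumption that DynamicWorkerPool gains an unbuffered stopOne chan struct{} field and that each worker adds a case for it to its select, is to send a single stop signal that exactly one idle worker receives:
// removeWorker asks one idle worker to exit. Sketch only: stopOne is a
// hypothetical field (created with make(chan struct{}) in the constructor),
// and each worker's select would gain one extra case:
//
//     case <-dwp.stopOne:
//         fmt.Printf("Worker %d scaled down\n", id)
//         return
func (dwp *DynamicWorkerPool) removeWorker() {
    select {
    case dwp.stopOne <- struct{}{}: // the first idle worker to receive this exits
    case <-time.After(time.Second): // everyone is busy; retry on the next scaler tick
    case <-dwp.ctx.Done():
    }
}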
Best Practices
- Right-Size the Pool: Match worker count to available resources
- Monitor Performance: Track queue length, processing times, and throughput
- Handle Backpressure: Reject or shed work when the queue is full instead of blocking indefinitely (see the non-blocking submit sketch after this list)
- Graceful Shutdown: Ensure all workers complete current jobs
- Error Handling: Isolate worker failures from the pool
- Resource Cleanup: Properly close channels and cancel contexts
- Load Balancing: Distribute work evenly across workers
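For backpressure, the simplest lever is a submit call that fails fast when the queue is full, letting the caller retry, shed load, or spill to a durable queue. A sketch against the basic WorkerPool above (TrySubmit and ErrQueueFull are illustrative names, and the file needs the errors import):
// ErrQueueFull signals that the job queue has no free capacity right now.
var ErrQueueFull = errors.New("worker pool: job queue full")

// TrySubmit enqueues a job without blocking; the caller decides how to react.
func (wp *WorkerPool) TrySubmit(job Job) error {
    select {
    case wp.jobQueue <- job:
        return nil
    default:
        return ErrQueueFull
    }
}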
Common Pitfalls
- Too Many Workers: Creating more workers than CPU cores for CPU-bound tasks (see the sizing sketch after this list)
- Unbounded Queues: Memory issues with unlimited job queues
- Worker Leaks: Not properly shutting down workers
- Blocking Operations: Long-running jobs blocking other work
- No Backpressure: Not handling queue overflow situations
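A reasonable starting point for pool size is to derive it from the runtime rather than hard-coding it: roughly one worker per available core for CPU-bound work, and some workload-specific multiple of that for I/O-bound work. A small sketch:
package main

import (
    "fmt"
    "runtime"
)

func main() {
    cpuBound := runtime.GOMAXPROCS(0) // goroutines Go will actually run in parallel
    ioBound := 4 * runtime.NumCPU()   // I/O-bound workers mostly wait; the multiplier
                                      // is a tuning knob, not a rule

    fmt.Println("CPU-bound pool size:", cpuBound)
    fmt.Println("I/O-bound pool size:", ioBound)
}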
Testing Worker Pools
package main
import (
"context"
"fmt"
"testing"
"time"
)
func TestWorkerPool(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
pool := NewAdvancedWorkerPool(ctx, 2, 5)
pool.Start()
defer pool.Stop()
// Submit test jobs
jobCount := 5
for i := 1; i <= jobCount; i++ {
job := ContextJob{
ID: fmt.Sprintf("test-%d", i),
Data: i,
Context: ctx,
}
if err := pool.Submit(job); err != nil {
t.Fatalf("Failed to submit job: %v", err)
}
}
// Collect results
results := make(map[string]ContextResult)
for i := 0; i < jobCount; i++ {
select {
case result := <-pool.Results():
results[result.JobID] = result
case <-time.After(2 * time.Second):
t.Fatal("Timeout waiting for results")
}
}
// Verify all jobs completed
if len(results) != jobCount {
t.Errorf("Expected %d results, got %d", jobCount, len(results))
}
// Verify results are correct
for i := 1; i <= jobCount; i++ {
jobID := fmt.Sprintf("test-%d", i)
result, exists := results[jobID]
if !exists {
t.Errorf("Missing result for job %s", jobID)
continue
}
if result.Error != nil {
t.Errorf("Job %s failed: %v", jobID, result.Error)
continue
}
expected := i * i
if result.Output != expected {
t.Errorf("Job %s: expected %d, got %v", jobID, expected, result.Output)
}
}
}
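Worker pools are exactly the kind of code the race detector exists for: run the tests with go test -race (and -count=5 or similar to repeat them) to surface data races in metrics updates and shutdown paths that a single run can miss.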
The Worker Pool pattern is a cornerstone of scalable, resource-efficient concurrent applications in Go. It provides controlled concurrency, predictable resource usage, and solid throughput for both CPU-bound and I/O-bound workloads.
Next: Learn about Mutex Patterns for protecting shared resources with locks.