The Producer-Consumer Problem
The Producer-Consumer pattern is one of the most fundamental concurrency patterns. It appears everywhere in modern software:
- Message queues (RabbitMQ, Kafka, SQS)
- Task processing (background jobs, worker pools)
- Data pipelines (ETL, streaming analytics)
- Event systems (event buses, pub/sub)
- Buffering (I/O buffers, network buffers)
The Setup: Producers generate data, consumers process it. They run concurrently and need to coordinate through a shared buffer.
The Unbounded Buffer Variant
In this first variant, we use an unbounded buffer - the queue can grow without limit, right up until memory runs out. This is the simplest version and showcases Go’s beautiful channel abstraction.
Key characteristics:
- Producers never block (can always add to queue)
- Consumers block only when queue is empty
- Simple to implement with Go channels
- Risk of memory exhaustion if producers outpace consumers
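One caveat up front: Go has no truly unbounded channel - capacity is fixed when a channel is created, so the examples below approximate "unbounded" with a large buffer. If you genuinely need unbounded behavior, you have to build it yourself. Here is a minimal sketch (unboundedQueue is just an illustrative name, and it assumes Go 1.18+ for generics): a goroutine shuttles values from an input channel to an output channel through a growable slice, so senders effectively never block.

// unboundedQueue returns a send side that effectively never blocks
// (the internal goroutine always stands ready to buffer) and a receive
// side that is closed once the send side is closed and drained.
func unboundedQueue[T any]() (chan<- T, <-chan T) {
    in := make(chan T)
    out := make(chan T)
    go func() {
        defer close(out)
        var buf []T // grows without limit - this is the "unbounded" part
        for in != nil || len(buf) > 0 {
            // Only offer a value on out when we have one buffered.
            var sendCh chan T
            var next T
            if len(buf) > 0 {
                sendCh = out
                next = buf[0]
            }
            select {
            case v, ok := <-in:
                if !ok {
                    in = nil // closed: a nil channel is never selected again
                    continue
                }
                buf = append(buf, v)
            case sendCh <- next: // never fires while sendCh is nil
                buf = buf[1:]
            }
        }
    }()
    return in, out
}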
Go Channels: Perfect for Producer-Consumer
Go’s channels are specifically designed for this pattern! An unbuffered or buffered channel provides:
✓ Thread-safe communication
✓ Built-in synchronization
✓ Natural blocking behavior
✓ Clear ownership semantics
Basic Implementation
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Job represents a unit of work
type Job struct {
    ID        int
    Data      string
    CreatedAt time.Time
}

// Producer generates jobs and sends them to the channel
func Producer(id int, jobs chan<- Job, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < 5; i++ {
        job := Job{
            ID:        id*100 + i,
            Data:      fmt.Sprintf("Job from Producer %d, item %d", id, i),
            CreatedAt: time.Now(),
        }
        fmt.Printf("[Producer %d] Creating job %d\n", id, job.ID)
        jobs <- job // Send to channel (blocks only if the 1000-slot buffer fills)
        // Simulate work
        time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
    }
    fmt.Printf("[Producer %d] Finished\n", id)
}
// Consumer receives jobs from the channel and processes them
func Consumer(id int, jobs <-chan Job, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        fmt.Printf("[Consumer %d] Processing job %d (age: %v)\n",
            id, job.ID, time.Since(job.CreatedAt))
        // Simulate processing
        time.Sleep(time.Duration(150+rand.Intn(100)) * time.Millisecond)
        fmt.Printf("[Consumer %d] Completed job %d\n", id, job.ID)
    }
    fmt.Printf("[Consumer %d] No more jobs, shutting down\n", id)
}
func main() {
    fmt.Println("=== Producer-Consumer: Unbounded Buffer ===")
    fmt.Println()

    // Go has no truly unbounded channel: capacity is fixed at creation.
    // A large buffer approximates one; producers only block if more
    // than 1000 jobs are ever queued at once.
    jobs := make(chan Job, 1000)

    var producerWg sync.WaitGroup
    var consumerWg sync.WaitGroup

    numProducers := 2
    numConsumers := 2

    // Start consumers first
    fmt.Println("Starting consumers...")
    for i := 0; i < numConsumers; i++ {
        consumerWg.Add(1)
        go Consumer(i, jobs, &consumerWg)
    }

    // Start producers
    fmt.Println("Starting producers...")
    for i := 0; i < numProducers; i++ {
        producerWg.Add(1)
        go Producer(i, jobs, &producerWg)
    }

    // Wait for all producers to finish
    producerWg.Wait()
    fmt.Println("\n✓ All producers finished")

    // Close the channel to signal consumers
    close(jobs)
    fmt.Println("Channel closed, consumers will finish pending jobs")

    // Wait for all consumers to finish
    consumerWg.Wait()
    fmt.Println("\n✓ All consumers finished")
    fmt.Println("✓ Pipeline complete!")
}
Understanding Channel Semantics
Send-Only Channel: chan<- T
func Producer(id int, jobs chan<- Job, wg *sync.WaitGroup) {
    jobs <- job // Can only send
    // job := <-jobs // Compile error! Can't receive
}
This is a compile-time guarantee that producers can only send, not receive.
Receive-Only Channel: <-chan T
func Consumer(id int, jobs <-chan Job, wg *sync.WaitGroup) {
    job := <-jobs // Can only receive
    // jobs <- job // Compile error! Can't send
}
This prevents consumers from accidentally sending to the channel.
Ranging Over Channels
for job := range jobs {
    // Process job
}
This idiom:
- Receives each value from the channel
- Keeps looping until the channel is closed and drained
- Exits automatically, with no explicit termination check
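One detail that makes this safe: closing a channel does not throw away values still sitting in its buffer. A closed channel keeps delivering buffered values until it is drained, and only then does range exit. A tiny self-contained demo:

package main

import "fmt"

func main() {
    ch := make(chan int, 3)
    ch <- 1
    ch <- 2
    ch <- 3
    close(ch) // buffered values are NOT discarded

    // range drains all three values, then exits.
    for v := range ch {
        fmt.Println(v) // prints 1, 2, 3
    }
}

This is why main in the examples here can close the jobs channel while consumers are still working: nothing queued is lost.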
Advanced Example: Real-Time Metrics
Let’s build a more realistic example that processes log events and generates metrics:
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "sync/atomic"
    "time"
)

// LogEvent represents a log entry
type LogEvent struct {
    Timestamp time.Time
    Level     string
    Message   string
    UserID    int
}

// Metrics tracks processing statistics
type Metrics struct {
    totalProcessed atomic.Int64
    errors         atomic.Int64
    warnings       atomic.Int64
    infos          atomic.Int64
}

func (m *Metrics) Record(level string) {
    m.totalProcessed.Add(1)
    switch level {
    case "ERROR":
        m.errors.Add(1)
    case "WARN":
        m.warnings.Add(1)
    case "INFO":
        m.infos.Add(1)
    }
}

func (m *Metrics) String() string {
    return fmt.Sprintf("Total: %d, Errors: %d, Warnings: %d, Infos: %d",
        m.totalProcessed.Load(),
        m.errors.Load(),
        m.warnings.Load(),
        m.infos.Load())
}
// LogProducer simulates log generation
func LogProducer(id int, events chan<- LogEvent, duration time.Duration, wg *sync.WaitGroup) {
    defer wg.Done()
    levels := []string{"INFO", "WARN", "ERROR"}
    messages := []string{
        "User login successful",
        "Database connection timeout",
        "Cache miss",
        "API request completed",
        "Authentication failed",
    }
    deadline := time.Now().Add(duration)
    for time.Now().Before(deadline) {
        event := LogEvent{
            Timestamp: time.Now(),
            Level:     levels[rand.Intn(len(levels))],
            Message:   messages[rand.Intn(len(messages))],
            UserID:    rand.Intn(1000),
        }
        events <- event
        // Variable rate
        time.Sleep(time.Duration(10+rand.Intn(40)) * time.Millisecond)
    }
    fmt.Printf("[Producer %d] Stopped\n", id)
}

// LogProcessor consumes and processes log events
func LogProcessor(id int, events <-chan LogEvent, metrics *Metrics, wg *sync.WaitGroup) {
    defer wg.Done()
    processed := 0
    for event := range events {
        // How long the event sat in the queue
        age := time.Since(event.Timestamp)
        // Record metrics
        metrics.Record(event.Level)
        processed++
        // Log slow processing
        if age > 100*time.Millisecond {
            fmt.Printf("[Processor %d] Slow processing: event age %v\n", id, age)
        }
        // Simulate processing time
        time.Sleep(time.Duration(20+rand.Intn(30)) * time.Millisecond)
    }
    fmt.Printf("[Processor %d] Finished, processed %d events\n", id, processed)
}
// MetricsReporter periodically reports metrics
func MetricsReporter(metrics *Metrics, interval time.Duration, stop <-chan struct{}) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            fmt.Printf("\n📊 Metrics: %s\n\n", metrics)
        case <-stop:
            fmt.Printf("\n📊 Final Metrics: %s\n", metrics)
            return
        }
    }
}
func main() {
    fmt.Println("=== Log Processing Pipeline ===")
    fmt.Println("Demonstrating unbounded producer-consumer pattern")
    fmt.Println()

    // Large buffer - simulates "unbounded"
    events := make(chan LogEvent, 10000)
    metrics := &Metrics{}

    // Control channel for the reporter
    stopReporter := make(chan struct{})

    // Start metrics reporter
    go MetricsReporter(metrics, 2*time.Second, stopReporter)

    var producerWg, consumerWg sync.WaitGroup
    numProducers := 3
    numConsumers := 2
    duration := 5 * time.Second

    // Start consumers
    fmt.Printf("Starting %d consumers...\n", numConsumers)
    for i := 0; i < numConsumers; i++ {
        consumerWg.Add(1)
        go LogProcessor(i, events, metrics, &consumerWg)
    }

    // Start producers
    fmt.Printf("Starting %d producers for %v...\n\n", numProducers, duration)
    for i := 0; i < numProducers; i++ {
        producerWg.Add(1)
        go LogProducer(i, events, duration, &producerWg)
    }

    // Wait for producers
    producerWg.Wait()
    fmt.Println("\n✓ All producers finished")

    // Close channel
    close(events)

    // Wait for consumers
    consumerWg.Wait()
    fmt.Println("✓ All consumers finished")

    // Stop reporter
    close(stopReporter)

    fmt.Println("\n✓ Pipeline complete!")
}
When Producers Outpace Consumers
The unbounded buffer can grow indefinitely. Let’s visualize what happens:
// Slow consumer scenario
func DemoBackpressure() {
    events := make(chan int, 100)

    // Fast producer: 100 items/sec
    go func() {
        for i := 0; i < 1000; i++ {
            events <- i
            time.Sleep(10 * time.Millisecond)
        }
        close(events)
    }()

    // Slow consumer: 10 items/sec
    for event := range events {
        fmt.Printf("Processing %d, queue size: %d\n", event, len(events))
        time.Sleep(100 * time.Millisecond)
    }
}
You’ll see the queue size climb steadily - here it tops out at the channel’s capacity of 100, at which point the producer starts blocking, but with a truly unbounded queue it would keep growing. Managing that growth deliberately is where bounded buffers (next article) become important.
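If you want to watch the growth numerically instead of reading interleaved prints, a small monitor goroutine can sample the queue depth. A sketch (monitorDepth is an illustrative helper reusing the fmt and time imports from the programs above; len on a channel is a point-in-time snapshot, not a synchronized measurement):

// monitorDepth periodically reports how full a channel's buffer is.
// len and cap are safe to call from another goroutine.
func monitorDepth(ch chan int, every time.Duration, stop <-chan struct{}) {
    ticker := time.NewTicker(every)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            fmt.Printf("queue depth: %d/%d\n", len(ch), cap(ch))
        case <-stop:
            return
        }
    }
}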
Key Go Patterns Used
1. Directional Channels
func Producer(jobs chan<- Job) // Send-only
func Consumer(jobs <-chan Job) // Receive-only
Prevents misuse at compile time!
2. Channel Closing
close(jobs) // Signals "no more data"
Only the sender side should close a channel - receivers never should. With multiple producers, a coordinator (main in the examples above) closes once every producer has finished. Closing tells consumers to drain what remains and exit.
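Two hard rules follow from this: sending on a closed channel panics, and so does closing a channel twice. On the receiving side, you can detect closure explicitly with the two-value receive form, which is what for range uses under the hood:

job, ok := <-jobs
if !ok {
    // ok is false once the channel is closed and drained -
    // no more jobs will ever arrive.
    return
}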
3. Atomic Operations for Metrics
type Metrics struct {
    totalProcessed atomic.Int64
}

m.totalProcessed.Add(1) // Thread-safe increment
Lock-free counters for high-performance metrics.
4. Select with Timeout

select {
case event := <-events:
    process(event)
case <-time.After(1 * time.Second):
    // Timeout: no event arrived within a second
}

Lets a consumer give up waiting instead of blocking forever. MetricsReporter above uses the same select shape, with a ticker and a stop channel in place of the timeout.
Advantages of Unbounded Buffer
✓ Producers never block - Maximum throughput
✓ Simple to implement - No complex coordination
✓ Handles bursts - Queue absorbs traffic spikes
✓ Natural in Go - Channels make it elegant
Disadvantages
✗ Memory growth - Queue can grow without bound
✗ No backpressure - Producers don't know consumers are slow
✗ Latency issues - Items can sit in the queue for a long time
✗ OOM risk - Can exhaust memory
When to Use Unbounded Buffers
✓ Use when:
- You know the maximum queue size will be reasonable
- Producers and consumers are roughly balanced
- You need to handle traffic bursts
- Simplicity is more important than memory constraints
✗ Avoid when:
- Producers can significantly outpace consumers
- Memory is constrained
- You need bounded latency guarantees
- You need backpressure
Real-World Applications
1. Log Aggregation
// Collect logs from multiple sources
logs := make(chan LogEntry, 10000)
2. Event Processing
// Handle UI events, webhook deliveries
events := make(chan Event, 5000)
3. Job Queues
// Background task processing
tasks := make(chan Task, 1000)
Next Up: Bounded Buffer
In the next article, we’ll add bounded buffers that limit queue size and provide backpressure to slow down producers when consumers can’t keep up!
Try It Yourself
Experiment with the code:
- Add monitoring - Track max queue size over time
- Slow down consumers - See the queue grow
- Add priorities - Process high-priority items first
- Implement timeouts - Drop old items from queue
- Add circuit breakers - Stop producers if the queue grows too large (see the sketch below for a starting point)
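As a starting point for the last two exercises, here is a non-blocking send built from select with a default case (trySend is a hypothetical helper, not part of the programs above): if the buffer is full, the item is dropped instead of blocking the producer.

// trySend attempts to enqueue a job without blocking.
// It reports false when the buffer is full and the job was dropped.
func trySend(jobs chan<- Job, job Job) bool {
    select {
    case jobs <- job:
        return true
    default:
        // Buffer full: shed load (or trip a breaker) instead of blocking.
        return false
    }
}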
This is part 4 of “Golang Experiments: Classic Concurrency Problems”