The Producer-Consumer Problem

The Producer-Consumer pattern is one of the most fundamental concurrency patterns. It appears everywhere in modern software:

  • Message queues (RabbitMQ, Kafka, SQS)
  • Task processing (background jobs, worker pools)
  • Data pipelines (ETL, streaming analytics)
  • Event systems (event buses, pub/sub)
  • Buffering (I/O buffers, network buffers)

The Setup: Producers generate data, consumers process it. They run concurrently and need to coordinate through a shared buffer.

The Unbounded Buffer Variant

In this first variant, we use an unbounded buffer - the queue can grow without bound (until we run out of memory). This is the simplest version and shows off Go’s channel abstraction at its best.

Key characteristics:

  • Producers never block (can always add to queue)
  • Consumers block only when queue is empty
  • Simple to implement with Go channels
  • Risk of memory exhaustion if producers outpace consumers

Go Channels: Perfect for Producer-Consumer

Go’s channels are specifically designed for this pattern! An unbuffered or buffered channel provides:

✓ Thread-safe communication
✓ Built-in synchronization
✓ Natural blocking behavior
✓ Clear ownership semantics

Basic Implementation

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Job represents a unit of work
type Job struct {
	ID        int
	Data      string
	CreatedAt time.Time
}

// Producer generates jobs and sends them to the channel
func Producer(id int, jobs chan<- Job, wg *sync.WaitGroup) {
	defer wg.Done()

	for i := 0; i < 5; i++ {
		job := Job{
			ID:        id*100 + i,
			Data:      fmt.Sprintf("Job from Producer %d, item %d", id, i),
			CreatedAt: time.Now(),
		}

		fmt.Printf("[Producer %d] Creating job %d\n", id, job.ID)
		jobs <- job // Send to channel (blocks only if the buffer fills)

		// Simulate work
		time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	}

	fmt.Printf("[Producer %d] Finished\n", id)
}

// Consumer receives jobs from the channel and processes them
func Consumer(id int, jobs <-chan Job, wg *sync.WaitGroup) {
	defer wg.Done()

	for job := range jobs {
		fmt.Printf("[Consumer %d] Processing job %d (age: %v)\n",
			id, job.ID, time.Since(job.CreatedAt))

		// Simulate processing
		time.Sleep(time.Duration(150+rand.Intn(100)) * time.Millisecond)

		fmt.Printf("[Consumer %d] Completed job %d\n", id, job.ID)
	}

	fmt.Printf("[Consumer %d] No more jobs, shutting down\n", id)
}

func main() {
	fmt.Println("=== Producer-Consumer: Unbounded Buffer ===")
	fmt.Println()

	// Approximate an unbounded channel with a large buffer.
	// Go has no truly unbounded channel - capacity is fixed at creation -
	// so producers will block if this buffer ever fills.
	jobs := make(chan Job, 1000)

	var producerWg sync.WaitGroup
	var consumerWg sync.WaitGroup

	numProducers := 2
	numConsumers := 2

	// Start consumers first
	fmt.Println("Starting consumers...")
	for i := 0; i < numConsumers; i++ {
		consumerWg.Add(1)
		go Consumer(i, jobs, &consumerWg)
	}

	// Start producers
	fmt.Println("Starting producers...")
	for i := 0; i < numProducers; i++ {
		producerWg.Add(1)
		go Producer(i, jobs, &producerWg)
	}

	// Wait for all producers to finish
	producerWg.Wait()
	fmt.Println("\n✓ All producers finished")

	// Close the channel to signal consumers
	close(jobs)
	fmt.Println("Channel closed, consumers will finish pending jobs")

	// Wait for all consumers to finish
	consumerWg.Wait()
	fmt.Println("\n✓ All consumers finished")
	fmt.Println("✓ Pipeline complete!")
}

Understanding Channel Semantics

Send-Only Channel: chan<- T

func Producer(id int, jobs chan<- Job, wg *sync.WaitGroup) {
    jobs <- job // Can only send
    // job := <-jobs // Compile error! Can't receive
}

This is a compile-time guarantee that producers can only send, not receive.

Receive-Only Channel: <-chan T

func Consumer(id int, jobs <-chan Job, wg *sync.WaitGroup) {
    job := <-jobs // Can only receive
    // jobs <- job // Compile error! Can't send
}

This prevents consumers from accidentally sending to the channel.

Ranging Over Channels

for job := range jobs {
    // Process job
}

This idiom:

  1. Receives each value sent on the channel
  2. Continues until the channel is closed and drained
  3. Exits automatically - no manual ok check needed
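
Under the hood, the range loop is equivalent to receiving with the two-value form:

for {
	job, ok := <-jobs
	if !ok {
		break // channel closed and all buffered values drained
	}
	// Process job
}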

Advanced Example: Real-Time Metrics

Let’s build a more realistic example that processes log events and generates metrics:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

// LogEvent represents a log entry
type LogEvent struct {
	Timestamp time.Time
	Level     string
	Message   string
	UserID    int
}

// Metrics tracks processing statistics
type Metrics struct {
	totalProcessed atomic.Int64
	errors         atomic.Int64
	warnings       atomic.Int64
	infos          atomic.Int64
}

func (m *Metrics) Record(level string) {
	m.totalProcessed.Add(1)
	switch level {
	case "ERROR":
		m.errors.Add(1)
	case "WARN":
		m.warnings.Add(1)
	case "INFO":
		m.infos.Add(1)
	}
}

func (m *Metrics) String() string {
	return fmt.Sprintf("Total: %d, Errors: %d, Warnings: %d, Infos: %d",
		m.totalProcessed.Load(),
		m.errors.Load(),
		m.warnings.Load(),
		m.infos.Load())
}

// LogProducer simulates log generation
func LogProducer(id int, events chan<- LogEvent, duration time.Duration, wg *sync.WaitGroup) {
	defer wg.Done()

	levels := []string{"INFO", "WARN", "ERROR"}
	messages := []string{
		"User login successful",
		"Database connection timeout",
		"Cache miss",
		"API request completed",
		"Authentication failed",
	}

	deadline := time.Now().Add(duration)

	for time.Now().Before(deadline) {
		event := LogEvent{
			Timestamp: time.Now(),
			Level:     levels[rand.Intn(len(levels))],
			Message:   messages[rand.Intn(len(messages))],
			UserID:    rand.Intn(1000),
		}

		events <- event

		// Variable rate
		time.Sleep(time.Duration(10+rand.Intn(40)) * time.Millisecond)
	}

	fmt.Printf("[Producer %d] Stopped\n", id)
}

// LogProcessor consumes and processes log events
func LogProcessor(id int, events <-chan LogEvent, metrics *Metrics, wg *sync.WaitGroup) {
	defer wg.Done()

	processed := 0

	for event := range events {
		// Process the event
		age := time.Since(event.Timestamp)

		// Record metrics
		metrics.Record(event.Level)
		processed++

		// Log slow processing
		if age > 100*time.Millisecond {
			fmt.Printf("[Processor %d] Slow processing: event age %v\n", id, age)
		}

		// Simulate processing time
		time.Sleep(time.Duration(20+rand.Intn(30)) * time.Millisecond)
	}

	fmt.Printf("[Processor %d] Finished, processed %d events\n", id, processed)
}

// MetricsReporter periodically reports metrics
func MetricsReporter(metrics *Metrics, interval time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			fmt.Printf("\n📊 Metrics: %s\n\n", metrics)
		case <-stop:
			fmt.Printf("\n📊 Final Metrics: %s\n", metrics)
			return
		}
	}
}

func main() {
	fmt.Println("=== Log Processing Pipeline ===")
	fmt.Println("Demonstrating unbounded producer-consumer pattern")
	fmt.Println()

	// Large buffer - simulates "unbounded"
	events := make(chan LogEvent, 10000)
	metrics := &Metrics{}

	// Control channels
	stopReporter := make(chan struct{})

	// Start metrics reporter
	go MetricsReporter(metrics, 2*time.Second, stopReporter)

	var producerWg, consumerWg sync.WaitGroup

	numProducers := 3
	numConsumers := 2
	duration := 5 * time.Second

	// Start consumers
	fmt.Printf("Starting %d consumers...\n", numConsumers)
	for i := 0; i < numConsumers; i++ {
		consumerWg.Add(1)
		go LogProcessor(i, events, metrics, &consumerWg)
	}

	// Start producers
	fmt.Printf("Starting %d producers for %v...\n\n", numProducers, duration)
	for i := 0; i < numProducers; i++ {
		producerWg.Add(1)
		go LogProducer(i, events, duration, &producerWg)
	}

	// Wait for producers
	producerWg.Wait()
	fmt.Println("\n✓ All producers finished")

	// Close channel
	close(events)

	// Wait for consumers
	consumerWg.Wait()
	fmt.Println("✓ All consumers finished")

	// Stop reporter
	close(stopReporter)

	fmt.Println("\n✓ Pipeline complete!")
}

When Producers Outpace Consumers

With an unbounded buffer, the backlog can grow indefinitely. Let’s watch what happens when a fast producer feeds a slow consumer:

// Slow consumer scenario
func DemoBackpressure() {
	events := make(chan int, 100)

	// Fast producer: 100 items/sec
	go func() {
		for i := 0; i < 1000; i++ {
			events <- i
			time.Sleep(10 * time.Millisecond)
		}
		close(events)
	}()

	// Slow consumer: 10 items/sec
	for event := range events {
		fmt.Printf("Processing %d, queue size: %d\n", event, len(events))
		time.Sleep(100 * time.Millisecond)
	}
}

You’ll see the queue size climb until it hits the channel’s capacity of 100, at which point the producer blocks - the fixed-size buffer is quietly applying backpressure. With a truly unbounded queue, the backlog would keep growing. This is where bounded buffers (next article) become important.
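
Strictly speaking, Go has no unbounded channel - capacity is fixed when the channel is created. If you genuinely need a queue that never blocks producers, you can build one yourself. Here’s a minimal sketch (UnboundedQueue is our own type, not anything from the standard library) using a slice, a mutex, and a condition variable:

type UnboundedQueue[T any] struct {
	mu     sync.Mutex
	cond   *sync.Cond
	items  []T
	closed bool
}

func NewUnboundedQueue[T any]() *UnboundedQueue[T] {
	q := &UnboundedQueue[T]{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Push appends an item; it never blocks - the backing slice just grows.
func (q *UnboundedQueue[T]) Push(item T) {
	q.mu.Lock()
	q.items = append(q.items, item)
	q.mu.Unlock()
	q.cond.Signal() // wake one waiting consumer
}

// Pop blocks until an item arrives or the queue is closed.
// Like a channel receive, the second return value is false
// once the queue is closed and drained.
func (q *UnboundedQueue[T]) Pop() (T, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 && !q.closed {
		q.cond.Wait() // releases the lock while sleeping
	}
	if len(q.items) == 0 {
		var zero T
		return zero, false
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item, true
}

// Close wakes all blocked consumers. As with close(ch),
// only the producer side should call it, and only once.
func (q *UnboundedQueue[T]) Close() {
	q.mu.Lock()
	q.closed = true
	q.mu.Unlock()
	q.cond.Broadcast()
}

Because Push never blocks, every memory-growth caveat below applies in full force; the channel version at least fails loudly by blocking when its buffer fills.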

Key Go Patterns Used

1. Directional Channels

func Producer(jobs chan<- Job)  // Send-only
func Consumer(jobs <-chan Job)  // Receive-only

Prevents misuse at compile time!

2. Channel Closing

close(jobs) // Signals "no more data"

Only the sender should close. Closing signals consumers to finish.
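
The closing rules are worth pinning down; a quick sketch:

close(jobs)

job, ok := <-jobs // still drains any buffered values; once empty,
                  // returns the zero Job and ok == false
// jobs <- job    // Runtime panic! Send on closed channel
// close(jobs)    // Runtime panic! Close of closed channel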

3. Atomic Operations for Metrics

type Metrics struct {
    totalProcessed atomic.Int64
}
m.totalProcessed.Add(1) // Thread-safe increment

Lock-free counters for high-performance metrics.
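
For contrast, here’s the same counter guarded by a mutex (MutexMetrics is a hypothetical name for illustration); every increment pays for a lock:

type MutexMetrics struct {
	mu             sync.Mutex
	totalProcessed int64
}

func (m *MutexMetrics) Record() {
	m.mu.Lock() // serializes all increments
	defer m.mu.Unlock()
	m.totalProcessed++
}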

4. Select with Timeout

select {
case event := <-events:
    process(event)
case <-time.After(1 * time.Second):
    // Timeout: no event arrived within a second
}

The examples above don’t use time.After, but MetricsReporter relies on the same select mechanism, multiplexing a ticker and a stop channel.

Advantages of Unbounded Buffer

✓ Producers never block - Maximum throughput
✓ Simple to implement - No complex coordination
✓ Handles bursts - Queue absorbs traffic spikes
✓ Natural in Go - Channels make it elegant

Disadvantages

✗ Memory growth - Queue can grow without bound
✗ No backpressure - Producers don’t know consumers are slow
✗ Latency issues - Items can sit in the queue for a long time
✗ OOM risk - Can exhaust memory

When to Use Unbounded Buffers

Use when:

  • You know the maximum queue size will be reasonable
  • Producers and consumers are roughly balanced
  • You need to handle traffic bursts
  • Simplicity is more important than memory constraints

Avoid when:

  • Producers can significantly outpace consumers
  • Memory is constrained
  • You need bounded latency guarantees
  • You need backpressure

Real-World Applications

1. Log Aggregation

// Collect logs from multiple sources
logs := make(chan LogEntry, 10000)

2. Event Processing

// Handle UI events, webhook deliveries
events := make(chan Event, 5000)

3. Job Queues

// Background task processing
tasks := make(chan Task, 1000)

Next Up: Bounded Buffer

In the next article, we’ll add bounded buffers that limit queue size and provide backpressure to slow down producers when consumers can’t keep up!

Try It Yourself

Experiment with the code:

  1. Add monitoring - Track max queue size over time (a starter sketch follows this list)
  2. Slow down consumers - See the queue grow
  3. Add priorities - Process high-priority items first
  4. Implement timeouts - Drop old items from queue
  5. Add circuit breakers - Stop producers if queue too large
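
As a starting point for the first exercise, here’s a minimal monitor sketch; it assumes the events and stop channels from the log-pipeline example above:

// MonitorQueue samples the buffered-item count and tracks the high-water mark.
func MonitorQueue(events chan LogEvent, interval time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	maxSeen := 0
	for {
		select {
		case <-ticker.C:
			if n := len(events); n > maxSeen { // len reports items currently buffered
				maxSeen = n
			}
		case <-stop:
			fmt.Printf("Max queue size observed: %d\n", maxSeen)
			return
		}
	}
}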

This is part 4 of “Golang Experiments: Classic Concurrency Problems”