From Unbounded to Bounded

In the previous article, we explored the unbounded buffer pattern, where the queue can grow without limit. That works until you run out of memory!

The bounded buffer adds a crucial constraint: a maximum queue size. This introduces backpressure: when the buffer is full, producers must wait for consumers to catch up.

Why Bounded Buffers Matter

Bounded buffers appear everywhere in production systems:

  • TCP sliding windows (flow control)
  • HTTP/2 stream flow control (keeps a fast sender from overwhelming a receiver)
  • Message queue limits (RabbitMQ, Kafka partition limits)
  • Thread pool queues (bounded task queues)
  • Rate limiters (token buckets with finite capacity)
  • Concurrency limiters and bulkheads (cap the number of in-flight requests)

The key benefit: Bounded buffers provide natural backpressure and prevent resource exhaustion.

Go’s Built-in Solution: Buffered Channels

Go makes bounded buffers trivial with buffered channels:

buffer := make(chan Job, 10) // Maximum 10 items

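A buffered channel blocks at both ends:

  • Sends block while the buffer is full, until a receiver frees a slot
  • Receives block while the buffer is empty, until a sender adds an item
  • Together, these make it a natural fit for producer-consumer coordination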
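
A minimal sketch of these blocking rules, using a capacity of 2 for brevity. The helper names trySend and tryRecv are hypothetical; they use select with a default branch to probe whether a plain send or receive would block, instead of actually blocking.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
// A false result means a plain `ch <- v` would have blocked.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// tryRecv attempts a non-blocking receive and reports whether a value was
// available. A false result means a plain `<-ch` would have blocked.
func tryRecv(ch chan int) (int, bool) {
	select {
	case v := <-ch:
		return v, true
	default:
		return 0, false
	}
}

func main() {
	buffer := make(chan int, 2) // Maximum 2 items

	_, ok := tryRecv(buffer)
	fmt.Println("receive from empty buffer succeeded:", ok)

	fmt.Println("send 1 succeeded:", trySend(buffer, 1))
	fmt.Println("send 2 succeeded:", trySend(buffer, 2))
	fmt.Println("send 3 succeeded:", trySend(buffer, 3)) // buffer is full here
	fmt.Printf("buffer: %d/%d\n", len(buffer), cap(buffer))

	v, _ := tryRecv(buffer) // frees one slot
	fmt.Println("received:", v)
	fmt.Println("send 3 succeeded:", trySend(buffer, 3))
}
```

The third send fails only while the buffer holds two items; once a receive frees a slot, the same send goes through.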

Basic Bounded Buffer Implementation

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Task represents a unit of work
type Task struct {
	ID          int
	Priority    int
	Data        string
	EnqueuedAt  time.Time
	ProcessedAt time.Time
}

// Producer creates tasks and sends them to the bounded buffer
func Producer(id int, tasks chan<- Task, duration time.Duration, wg *sync.WaitGroup) {
	defer wg.Done()

	taskCount := 0
	deadline := time.Now().Add(duration)

	for time.Now().Before(deadline) {
		task := Task{
			ID:         id*1000 + taskCount,
			Priority:   rand.Intn(3),
			Data:       fmt.Sprintf("Task from Producer %d", id),
			EnqueuedAt: time.Now(),
		}

		fmt.Printf("[Producer %d] Attempting to enqueue task %d (buffer: %d/%d)\n",
			id, task.ID, len(tasks), cap(tasks))

		// This will BLOCK if buffer is full
		start := time.Now()
		tasks <- task
		blocked := time.Since(start)

		if blocked > 10*time.Millisecond {
			fmt.Printf("[Producer %d] ⚠️  BLOCKED for %v waiting for buffer space\n",
				id, blocked)
		}

		taskCount++
		time.Sleep(time.Duration(50+rand.Intn(100)) * time.Millisecond)
	}

	fmt.Printf("[Producer %d] Finished, created %d tasks\n", id, taskCount)
}

// Consumer processes tasks from the bounded buffer
func Consumer(id int, tasks <-chan Task, wg *sync.WaitGroup) {
	defer wg.Done()

	processed := 0

	for task := range tasks {
		task.ProcessedAt = time.Now()
		queueTime := task.ProcessedAt.Sub(task.EnqueuedAt)

		fmt.Printf("[Consumer %d] Processing task %d (queued for %v)\n",
			id, task.ID, queueTime)

		// Simulate processing
		processingTime := time.Duration(100+rand.Intn(200)) * time.Millisecond
		time.Sleep(processingTime)

		processed++

		if queueTime > 500*time.Millisecond {
			fmt.Printf("[Consumer %d] ⚠️  Task %d had high latency: %v\n",
				id, task.ID, queueTime)
		}
	}

	fmt.Printf("[Consumer %d] Finished, processed %d tasks\n", id, processed)
}

func main() {
	fmt.Println("=== Producer-Consumer: Bounded Buffer ===")
	fmt.Println()

	const (
		bufferSize   = 5
		numProducers = 3
		numConsumers = 2
		duration     = 3 * time.Second
	)

	// Create bounded buffer
	tasks := make(chan Task, bufferSize)
	fmt.Printf("Buffer capacity: %d\n", bufferSize)
	fmt.Printf("Producers: %d, Consumers: %d\n", numProducers, numConsumers)
	fmt.Printf("Duration: %v\n\n", duration)

	var producerWg, consumerWg sync.WaitGroup

	// Start consumers
	for i := 0; i < numConsumers; i++ {
		consumerWg.Add(1)
		go Consumer(i, tasks, &consumerWg)
	}

	// Start producers
	for i := 0; i < numProducers; i++ {
		producerWg.Add(1)
		go Producer(i, tasks, duration, &producerWg)
	}

	// Wait for producers
	producerWg.Wait()
	fmt.Println("\n✓ All producers finished")
	close(tasks)

	// Wait for consumers
	consumerWg.Wait()
	fmt.Println("✓ All consumers finished")
	fmt.Println("✓ Pipeline complete!")
}

Visualizing Backpressure

Here’s what happens when producers are faster than consumers:

Time 0: Buffer [empty]
        Producers sending fast, consumers slow

Time 1: Buffer [T1][T2][T3][T4][T5] FULL!
        Next producer send will BLOCK

Time 2: Consumer processes T1
        Buffer [T2][T3][T4][T5][__]
        Blocked producer unblocks, sends T6
        Buffer [T2][T3][T4][T5][T6] FULL again

Time 3: Consumer processes T2
        Buffer [T3][T4][T5][T6][__]
        Backpressure is working!

Advanced: Monitoring Buffer Health

Let’s add real-time buffer monitoring:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

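// Task is the unit of work; this listing declares only the fields it uses.
type Task struct {
	ID         int
	EnqueuedAt time.Time
}

// BufferStats tracks buffer usage over time
type BufferStats struct {
	mu           sync.Mutex // guards samples
	samples      []int
	blockedSends atomic.Int64
	blockedTime  atomic.Int64 // nanoseconds
	totalSends   atomic.Int64
	maxObserved  atomic.Int64
}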

func (s *BufferStats) RecordSize(size int) {
	s.mu.Lock()
	s.samples = append(s.samples, size)
	s.mu.Unlock()

	// Update max
	for {
		current := s.maxObserved.Load()
		if int64(size) <= current {
			break
		}
		if s.maxObserved.CompareAndSwap(current, int64(size)) {
			break
		}
	}
}

func (s *BufferStats) RecordBlockedSend(duration time.Duration) {
	s.blockedSends.Add(1)
	s.blockedTime.Add(int64(duration))
	s.totalSends.Add(1)
}

func (s *BufferStats) RecordSend() {
	s.totalSends.Add(1)
}

func (s *BufferStats) Report() {
	s.mu.Lock()
	defer s.mu.Unlock()

	if len(s.samples) == 0 {
		return
	}

	// Calculate average
	sum := 0
	for _, v := range s.samples {
		sum += v
	}
	avg := float64(sum) / float64(len(s.samples))

	blocked := s.blockedSends.Load()
	total := s.totalSends.Load()
	blockedPct := 0.0
	if total > 0 {
		blockedPct = float64(blocked) / float64(total) * 100
	}

	avgBlockTime := time.Duration(0)
	if blocked > 0 {
		avgBlockTime = time.Duration(s.blockedTime.Load() / blocked)
	}

	fmt.Printf(`
📊 Buffer Statistics:
   Average fill: %.2f items
   Max fill:     %d items
   Samples:      %d
   Total sends:  %d
   Blocked sends: %d (%.1f%%)
   Avg block time: %v
`,
		avg,
		s.maxObserved.Load(),
		len(s.samples),
		total,
		blocked,
		blockedPct,
		avgBlockTime,
	)
}

// MonitoredProducer sends tasks and tracks when it blocks
func MonitoredProducer(id int, tasks chan Task, stats *BufferStats,
	duration time.Duration, wg *sync.WaitGroup) {
	defer wg.Done()

	taskID := 0
	deadline := time.Now().Add(duration)

	for time.Now().Before(deadline) {
		task := Task{
			ID:         id*1000 + taskID,
			EnqueuedAt: time.Now(),
		}

		// Try to send and measure blocking
		start := time.Now()
		tasks <- task
		elapsed := time.Since(start)

		if elapsed > 1*time.Millisecond {
			stats.RecordBlockedSend(elapsed)
			fmt.Printf("[Producer %d] Blocked %v\n", id, elapsed)
		} else {
			stats.RecordSend()
		}

		taskID++
		time.Sleep(time.Duration(20+taskID*5) * time.Millisecond)
	}
}

// BufferMonitor periodically samples buffer size
func BufferMonitor(tasks chan Task, stats *BufferStats, interval time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			size := len(tasks)
			capacity := cap(tasks)
			stats.RecordSize(size)

			fillPct := float64(size) / float64(capacity) * 100
			fmt.Printf("📈 Buffer: %d/%d (%.0f%% full)\n", size, capacity, fillPct)

		case <-stop:
			return
		}
	}
}

func MonitoredConsumer(id int, tasks <-chan Task, wg *sync.WaitGroup) {
	defer wg.Done()

	for task := range tasks {
		queueTime := time.Since(task.EnqueuedAt)
		time.Sleep(time.Duration(80+id*20) * time.Millisecond) // Variable speed
		_ = queueTime
	}
}

func main() {
	fmt.Println("=== Bounded Buffer with Monitoring ===")
	fmt.Println()

	const bufferSize = 10
	tasks := make(chan Task, bufferSize)
	stats := &BufferStats{}
	stopMonitor := make(chan struct{})

	// Start buffer monitor
	go BufferMonitor(tasks, stats, 200*time.Millisecond, stopMonitor)

	var producerWg, consumerWg sync.WaitGroup

	// Start consumers (slower)
	for i := 0; i < 2; i++ {
		consumerWg.Add(1)
		go MonitoredConsumer(i, tasks, &consumerWg)
	}

	// Start producers (faster)
	for i := 0; i < 4; i++ {
		producerWg.Add(1)
		go MonitoredProducer(i, tasks, stats, 2*time.Second, &producerWg)
	}

	producerWg.Wait()
	close(tasks)
	consumerWg.Wait()
	close(stopMonitor)

	stats.Report()
}

Choosing the Right Buffer Size

Buffer size is a critical tuning parameter:

Too Small (size = 1)

  • Maximum backpressure
  • Producers frequently blocked
  • Good for strict rate limiting
  • Low memory usage

Too Large (size = 10000)

  • Minimal backpressure
  • Approaches unbounded behavior
  • High latency for items
  • High memory usage

Just Right (size = 10-100)

  • Absorbs bursts
  • Some backpressure
  • Reasonable latency
  • Moderate memory

Rule of thumb: Buffer size ≈ (number of producers × expected burst size)
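
To make the rule of thumb concrete, the sketch below computes a starting buffer size and a rough worst-case queueing delay for a full buffer. Both function names are hypothetical, and the delay estimate (buffer size times per-item processing time, divided across consumers) is an illustrative assumption, not a formula from this series. Treat the numbers as a starting point and confirm them with measurements.

```go
package main

import (
	"fmt"
	"time"
)

// suggestBufferSize applies the rule of thumb:
// buffer size ≈ number of producers × expected burst size.
func suggestBufferSize(numProducers, burstSize int) int {
	return numProducers * burstSize
}

// worstCaseQueueDelay estimates how long the last item in a full buffer waits.
// Assumption: every item takes perItem to process and the work is spread
// evenly across numConsumers.
func worstCaseQueueDelay(bufferSize, numConsumers int, perItem time.Duration) time.Duration {
	if numConsumers <= 0 {
		return 0
	}
	return time.Duration(bufferSize) * perItem / time.Duration(numConsumers)
}

func main() {
	size := suggestBufferSize(3, 4) // 3 producers, bursts of 4 tasks each
	delay := worstCaseQueueDelay(size, 2, 200*time.Millisecond)
	fmt.Printf("suggested size: %d, worst-case queue delay: %v\n", size, delay)
}
```

With these inputs the suggested size is 12 and the estimated delay is 1.2s. Under the same assumptions a buffer of 10000 would imply a delay of about 1000 seconds, which is the "high latency" cost of an oversized buffer.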

Backpressure Strategies

When the buffer is full, you have options:

1. Block (Default)

tasks <- task // Blocks until space available

Simple but can cause producer goroutines to pile up.

2. Drop Oldest

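select {
case tasks <- task:
    // Sent successfully
default:
    // Buffer full: evict the oldest item, then retry once.
    // Both steps must be non-blocking, because a consumer or another
    // producer may have changed the buffer in the meantime.
    select {
    case <-tasks: // Remove oldest
    default: // Already drained by a consumer
    }
    select {
    case tasks <- task: // Add new
    default: // Another producer took the slot; this task is dropped
    }
}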

3. Drop Newest

select {
case tasks <- task:
    // Sent successfully
default:
    // Buffer full, drop this task
    dropped++
}

4. Timeout

select {
case tasks <- task:
    // Sent successfully
case <-time.After(100 * time.Millisecond):
    // Timeout, handle error
    return ErrTimeout
}
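
The non-blocking strategies can be packaged as small helpers, sketched below under a few assumptions: the function names are hypothetical, ErrTimeout is a sentinel error defined here for illustration, tasks are plain ints, and time.NewTimer replaces time.After so the timer can be stopped as soon as the send succeeds.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrTimeout is a hypothetical sentinel error for a send that timed out.
var ErrTimeout = errors.New("send timed out: buffer full")

// sendOrDrop implements "drop newest": it reports false if the buffer was
// full and the task was discarded.
func sendOrDrop(tasks chan int, task int) bool {
	select {
	case tasks <- task:
		return true
	default:
		return false
	}
}

// sendDropOldest implements "drop oldest" without ever blocking. It reports
// false if another sender refilled the buffer before the retry.
func sendDropOldest(tasks chan int, task int) bool {
	if sendOrDrop(tasks, task) {
		return true
	}
	select {
	case <-tasks: // evict the oldest item
	default: // a consumer already drained the buffer
	}
	return sendOrDrop(tasks, task)
}

// sendWithTimeout waits up to d for buffer space before giving up.
func sendWithTimeout(tasks chan int, task int, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case tasks <- task:
		return nil
	case <-timer.C:
		return ErrTimeout
	}
}

func main() {
	tasks := make(chan int, 2)
	tasks <- 1
	tasks <- 2 // buffer is now full

	fmt.Println("drop newest, sent:", sendOrDrop(tasks, 3))
	fmt.Println("drop oldest, sent:", sendDropOldest(tasks, 4))
	fmt.Println("timeout result:", sendWithTimeout(tasks, 5, 10*time.Millisecond))

	close(tasks)
	for t := range tasks {
		fmt.Println("remaining:", t)
	}
}
```

After these calls the buffer holds tasks 2 and 4: task 3 was dropped as the newest, task 1 was evicted as the oldest, and task 5 timed out. Which policy fits depends on whether stale or fresh data is more valuable to the consumer.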

Real-World Example: HTTP Request Queue

package main

import (
	"fmt"
	"net/http"
	"sync/atomic"
	"time"
)

// RequestHandler with bounded request queue
type RequestHandler struct {
	queue         chan *http.Request
	rejectedReqs  atomic.Int64
	processedReqs atomic.Int64
}

func NewRequestHandler(queueSize, numWorkers int) *RequestHandler {
	h := &RequestHandler{
		queue: make(chan *http.Request, queueSize),
	}

	// Start worker pool
	for i := 0; i < numWorkers; i++ {
		go h.worker(i)
	}

	return h
}

func (h *RequestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Try to enqueue with timeout
	select {
	case h.queue <- r:
		w.WriteHeader(http.StatusAccepted)
		fmt.Fprintf(w, "Request queued\n")

	case <-time.After(10 * time.Millisecond):
		// Queue full, reject with backpressure signal
		h.rejectedReqs.Add(1)
		w.Header().Set("Retry-After", "1")
		w.WriteHeader(http.StatusTooManyRequests)
		fmt.Fprintf(w, "Server busy, try again\n")
	}
}

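func (h *RequestHandler) worker(id int) {
	for req := range h.queue {
		// Simulate processing. Note: ServeHTTP may already have returned by
		// now, after which req.Body must not be read and req.Context() is
		// canceled. A real handler should enqueue a copy of the data it needs.
		time.Sleep(100 * time.Millisecond)
		h.processedReqs.Add(1)
		_ = req
	}
}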

func (h *RequestHandler) Stats() string {
	return fmt.Sprintf("Processed: %d, Rejected: %d, Queued: %d/%d",
		h.processedReqs.Load(),
		h.rejectedReqs.Load(),
		len(h.queue),
		cap(h.queue))
}

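This pattern protects your server from overload: when the queue stays full for 10 ms, the client gets 429 Too Many Requests with a Retry-After hint instead of adding to an ever-growing backlog.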
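
One way to exercise this behavior without opening a port is net/http/httptest. The sketch below is a trimmed, self-contained variant, not the listing above: queuedHandler, job, and statusCodes are hypothetical names, the queue has no workers so it fills deterministically, and a non-blocking select stands in for the 10 ms timeout to keep the demo instant. It enqueues a small job value with copied fields, so nothing downstream depends on the *http.Request.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// job holds the request data a worker would need (hypothetical type).
type job struct {
	method string
	path   string
}

// queuedHandler is a trimmed, hypothetical variant of RequestHandler.
type queuedHandler struct {
	queue chan job
}

func (h *queuedHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	select {
	case h.queue <- job{method: r.Method, path: r.URL.Path}:
		w.WriteHeader(http.StatusAccepted)
		fmt.Fprintln(w, "Request queued")
	default:
		// Queue full: shed load and tell the client when to retry.
		w.Header().Set("Retry-After", "1")
		w.WriteHeader(http.StatusTooManyRequests)
		fmt.Fprintln(w, "Server busy, try again")
	}
}

// statusCodes sends n requests through the handler and returns each status code.
func statusCodes(h http.Handler, n int) []int {
	codes := make([]int, 0, n)
	for i := 0; i < n; i++ {
		rec := httptest.NewRecorder()
		req := httptest.NewRequest(http.MethodPost, "/work", nil)
		h.ServeHTTP(rec, req)
		codes = append(codes, rec.Code)
	}
	return codes
}

func main() {
	// Queue of 2 and no workers, so the third request must be rejected.
	h := &queuedHandler{queue: make(chan job, 2)}
	fmt.Println(statusCodes(h, 3))
}
```

This prints [202 202 429]: two requests fit in the queue and the third is shed.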

Key Advantages of Bounded Buffers

  ✓ Memory bounded - Cannot exhaust memory
  ✓ Natural backpressure - Slows producers automatically
  ✓ Prevents cascading failure - Doesn’t accept more than it can handle
  ✓ Predictable latency - Queue depth bounded
  ✓ Load shedding - Can drop when overloaded

When to Use Bounded Buffers

Use when:

  • Memory is constrained
  • You need backpressure
  • Latency bounds are important
  • You want to prevent overload

Consider unbounded when:

  • You need to handle all requests
  • Producers and consumers are balanced
  • Memory is not a concern
  • Bursts are critical to absorb

Performance Tuning Tips

  1. Monitor buffer fill rate - Track average and max
  2. Measure blocking time - How often are producers blocked?
  3. Right-size the buffer - Based on actual production metrics
  4. Add alerting - Alert when buffer is consistently full
  5. Load test - Verify behavior under stress
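
As a sketch of the alerting tip, the function below decides whether a buffer has been "consistently full" from a series of size samples, such as those a periodic monitor might collect. The name consistentlyFull, the window of recent samples, and the percentage threshold are all illustrative assumptions; a real system would more likely feed these samples into its metrics and alerting stack.

```go
package main

import "fmt"

// consistentlyFull reports whether each of the most recent `window` samples
// was at or above thresholdPct percent of capacity.
func consistentlyFull(samples []int, capacity, window, thresholdPct int) bool {
	if window <= 0 || capacity <= 0 || len(samples) < window {
		return false
	}
	for _, size := range samples[len(samples)-window:] {
		// Integer comparison avoids floating-point rounding at the boundary.
		if size*100 < capacity*thresholdPct {
			return false
		}
	}
	return true
}

func main() {
	samples := []int{3, 9, 10, 10, 9} // buffer sizes sampled over time
	const capacity = 10

	fmt.Println("last 3 samples >= 90% full:", consistentlyFull(samples, capacity, 3, 90))
	fmt.Println("last 5 samples >= 90% full:", consistentlyFull(samples, capacity, 5, 90))
}
```

Requiring several consecutive high samples keeps a single short burst from triggering an alert, at the cost of reacting a little later to sustained overload.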

Next Up: Multiple Producers and Consumers

In the next article, we’ll scale up to multiple producers and multiple consumers, exploring work distribution, load balancing, and result aggregation!

Try It Yourself

Experiment with the code:

  1. Vary buffer size - Try 1, 10, 100, 1000
  2. Add slow consumers - See backpressure in action
  3. Implement drop policies - Drop oldest vs newest
  4. Add priorities - Use multiple queues
  5. Benchmark - Measure throughput vs buffer size

This is part 5 of “Golang Experiments: Classic Concurrency Problems”