From Unbounded to Bounded
In the previous article, we explored the unbounded buffer pattern, where the queue can grow without limit. That works until you run out of memory!
The bounded buffer adds a crucial constraint: a maximum queue size. This introduces backpressure: when the buffer is full, producers must wait for consumers to catch up.
Why Bounded Buffers Matter
Bounded buffers appear everywhere in production systems:
- TCP sliding windows (flow control)
- HTTP/2 stream flow control (per-stream windows keep a fast sender from overwhelming the receiver)
- Message queue limits (RabbitMQ max-length policies, Kafka's bounded producer buffer)
- Thread pool queues (bounded task queues)
- Rate limiters (token buckets with finite capacity)
- Semaphores and bulkheads (bound the number of in-flight requests)
The key benefit: Bounded buffers provide natural backpressure and prevent resource exhaustion.
Go’s Built-in Solution: Buffered Channels
Go makes bounded buffers trivial with buffered channels:
buffer := make(chan Task, 10) // Maximum 10 items
When the buffer is full:
- Sends block until space is available
- Receives block when empty
- Perfect for producer-consumer coordination, as the sketch below shows!
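Here is a minimal sketch of these semantics (the 500ms delay is arbitrary): the first two sends return immediately, and the third blocks until the receiver frees a slot.

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan int, 2) // capacity 2

    ch <- 1 // returns immediately: buffer has space
    ch <- 2 // returns immediately: buffer is now full

    go func() {
        time.Sleep(500 * time.Millisecond)
        fmt.Println("received:", <-ch) // frees one slot
    }()

    start := time.Now()
    ch <- 3 // blocks until the goroutine above receives
    fmt.Printf("third send waited %v\n", time.Since(start).Round(time.Millisecond))
}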
Basic Bounded Buffer Implementation
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// Task represents a unit of work
type Task struct {
ID int
Priority int
Data string
EnqueuedAt time.Time
ProcessedAt time.Time
}
// Producer creates tasks and sends them to the bounded buffer
func Producer(id int, tasks chan<- Task, duration time.Duration, wg *sync.WaitGroup) {
defer wg.Done()
taskCount := 0
deadline := time.Now().Add(duration)
for time.Now().Before(deadline) {
task := Task{
ID: id*1000 + taskCount,
Priority: rand.Intn(3),
Data: fmt.Sprintf("Task from Producer %d", id),
EnqueuedAt: time.Now(),
}
fmt.Printf("[Producer %d] Attempting to enqueue task %d (buffer: %d/%d)\n",
id, task.ID, len(tasks), cap(tasks))
// This will BLOCK if buffer is full
start := time.Now()
tasks <- task
blocked := time.Since(start)
if blocked > 10*time.Millisecond {
fmt.Printf("[Producer %d] ⚠️ BLOCKED for %v waiting for buffer space\n",
id, blocked)
}
taskCount++
time.Sleep(time.Duration(50+rand.Intn(100)) * time.Millisecond)
}
fmt.Printf("[Producer %d] Finished, created %d tasks\n", id, taskCount)
}
// Consumer processes tasks from the bounded buffer
func Consumer(id int, tasks <-chan Task, wg *sync.WaitGroup) {
defer wg.Done()
processed := 0
for task := range tasks {
task.ProcessedAt = time.Now()
queueTime := task.ProcessedAt.Sub(task.EnqueuedAt)
fmt.Printf("[Consumer %d] Processing task %d (queued for %v)\n",
id, task.ID, queueTime)
// Simulate processing
processingTime := time.Duration(100+rand.Intn(200)) * time.Millisecond
time.Sleep(processingTime)
processed++
if queueTime > 500*time.Millisecond {
fmt.Printf("[Consumer %d] ⚠️ Task %d had high latency: %v\n",
id, task.ID, queueTime)
}
}
fmt.Printf("[Consumer %d] Finished, processed %d tasks\n", id, processed)
}
func main() {
fmt.Println("=== Producer-Consumer: Bounded Buffer ===")
fmt.Println()
const (
bufferSize = 5
numProducers = 3
numConsumers = 2
duration = 3 * time.Second
)
// Create bounded buffer
tasks := make(chan Task, bufferSize)
fmt.Printf("Buffer capacity: %d\n", bufferSize)
fmt.Printf("Producers: %d, Consumers: %d\n", numProducers, numConsumers)
fmt.Printf("Duration: %v\n\n", duration)
var producerWg, consumerWg sync.WaitGroup
// Start consumers
for i := 0; i < numConsumers; i++ {
consumerWg.Add(1)
go Consumer(i, tasks, &consumerWg)
}
// Start producers
for i := 0; i < numProducers; i++ {
producerWg.Add(1)
go Producer(i, tasks, duration, &producerWg)
}
// Wait for producers
producerWg.Wait()
fmt.Println("\n✓ All producers finished")
close(tasks)
// Wait for consumers
consumerWg.Wait()
fmt.Println("✓ All consumers finished")
fmt.Println("✓ Pipeline complete!")
}
Visualizing Backpressure
Here’s what happens when producers are faster than consumers (a runnable sketch follows the timeline):
Time 0: Buffer [empty]
Producers sending fast, consumers slow
Time 1: Buffer [T1][T2][T3][T4][T5] FULL!
Next producer send will BLOCK
Time 2: Consumer processes T1
Buffer [T2][T3][T4][T5][__]
Blocked producer unblocks, sends T6
Buffer [T2][T3][T4][T5][T6] FULL again
Time 3: Consumer processes T2
Buffer [T3][T4][T5][T6][__]
Backpressure is working!
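The timeline above can be reproduced with a minimal sketch: a capacity-5 buffer, one producer that wants to send every 50ms, and one consumer that takes 300ms per item. All the timings here are arbitrary choices for illustration.

package main

import (
    "fmt"
    "time"
)

func main() {
    buf := make(chan int, 5)
    done := make(chan struct{})

    // Slow consumer: one item every 300ms.
    go func() {
        defer close(done)
        for v := range buf {
            fmt.Printf("consumed T%d (buffer %d/%d)\n", v, len(buf), cap(buf))
            time.Sleep(300 * time.Millisecond)
        }
    }()

    // Fast producer: tries to send every 50ms and reports when it blocks.
    for i := 1; i <= 10; i++ {
        start := time.Now()
        buf <- i
        if waited := time.Since(start); waited > 10*time.Millisecond {
            fmt.Printf("send T%d blocked for %v\n", i, waited.Round(time.Millisecond))
        }
        time.Sleep(50 * time.Millisecond)
    }
    close(buf)
    <-done // wait for the consumer to drain the remaining items
}

Once the buffer fills, the producer's effective rate collapses to the consumer's rate: that is backpressure.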
Advanced: Monitoring Buffer Health
Let’s add real-time buffer monitoring:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// Task mirrors the type from the first listing; this standalone program
// only needs an ID and the enqueue timestamp.
type Task struct {
    ID         int
    EnqueuedAt time.Time
}

// BufferStats tracks buffer usage over time
type BufferStats struct {
samples []int
blockedSends atomic.Int64
blockedTime atomic.Int64 // nanoseconds
totalSends atomic.Int64
maxObserved atomic.Int64
mu sync.Mutex
}
func (s *BufferStats) RecordSize(size int) {
s.mu.Lock()
s.samples = append(s.samples, size)
s.mu.Unlock()
// Update max
for {
current := s.maxObserved.Load()
if int64(size) <= current {
break
}
if s.maxObserved.CompareAndSwap(current, int64(size)) {
break
}
}
}
func (s *BufferStats) RecordBlockedSend(duration time.Duration) {
s.blockedSends.Add(1)
s.blockedTime.Add(int64(duration))
s.totalSends.Add(1)
}
func (s *BufferStats) RecordSend() {
s.totalSends.Add(1)
}
func (s *BufferStats) Report() {
s.mu.Lock()
defer s.mu.Unlock()
if len(s.samples) == 0 {
return
}
// Calculate average
sum := 0
for _, v := range s.samples {
sum += v
}
avg := float64(sum) / float64(len(s.samples))
blocked := s.blockedSends.Load()
total := s.totalSends.Load()
blockedPct := 0.0
if total > 0 {
blockedPct = float64(blocked) / float64(total) * 100
}
avgBlockTime := time.Duration(0)
if blocked > 0 {
avgBlockTime = time.Duration(s.blockedTime.Load() / blocked)
}
fmt.Printf(`
📊 Buffer Statistics:
Average fill: %.2f items
Max fill: %d items
Samples: %d
Total sends: %d
Blocked sends: %d (%.1f%%)
Avg block time: %v
`,
avg,
s.maxObserved.Load(),
len(s.samples),
total,
blocked,
blockedPct,
avgBlockTime,
)
}
// MonitoredProducer sends tasks and tracks when it blocks
func MonitoredProducer(id int, tasks chan Task, stats *BufferStats,
duration time.Duration, wg *sync.WaitGroup) {
defer wg.Done()
taskID := 0
deadline := time.Now().Add(duration)
for time.Now().Before(deadline) {
task := Task{
ID: id*1000 + taskID,
EnqueuedAt: time.Now(),
}
// Try to send and measure blocking
start := time.Now()
tasks <- task
elapsed := time.Since(start)
if elapsed > 1*time.Millisecond {
stats.RecordBlockedSend(elapsed)
fmt.Printf("[Producer %d] Blocked %v\n", id, elapsed)
} else {
stats.RecordSend()
}
taskID++
time.Sleep(time.Duration(20+taskID*5) * time.Millisecond)
}
}
// BufferMonitor periodically samples buffer size
func BufferMonitor(tasks chan Task, stats *BufferStats, interval time.Duration, stop <-chan struct{}) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
size := len(tasks)
capacity := cap(tasks)
stats.RecordSize(size)
fillPct := float64(size) / float64(capacity) * 100
fmt.Printf("📈 Buffer: %d/%d (%.0f%% full)\n", size, capacity, fillPct)
case <-stop:
return
}
}
}
func MonitoredConsumer(id int, tasks <-chan Task, wg *sync.WaitGroup) {
defer wg.Done()
for task := range tasks {
queueTime := time.Since(task.EnqueuedAt)
time.Sleep(time.Duration(80+id*20) * time.Millisecond) // Variable speed
_ = queueTime // in a fuller version, queue latency would feed the stats
}
}
func main() {
fmt.Println("=== Bounded Buffer with Monitoring ===\n")
const bufferSize = 10
tasks := make(chan Task, bufferSize)
stats := &BufferStats{}
stopMonitor := make(chan struct{})
// Start buffer monitor
go BufferMonitor(tasks, stats, 200*time.Millisecond, stopMonitor)
var producerWg, consumerWg sync.WaitGroup
// Start consumers (slower)
for i := 0; i < 2; i++ {
consumerWg.Add(1)
go MonitoredConsumer(i, tasks, &consumerWg)
}
// Start producers (faster)
for i := 0; i < 4; i++ {
producerWg.Add(1)
go MonitoredProducer(i, tasks, stats, 2*time.Second, &producerWg)
}
producerWg.Wait()
close(tasks)
consumerWg.Wait()
close(stopMonitor)
stats.Report()
}
Choosing the Right Buffer Size
Buffer size is a critical tuning parameter:
Too Small (size = 1)
- Maximum backpressure
- Producers frequently blocked
- Good for strict rate limiting
- Low memory usage
Too Large (size = 10000)
- Minimal backpressure
- Approaches unbounded behavior
- High latency for items
- High memory usage
Just Right (size = 10-100)
- Absorbs bursts
- Some backpressure
- Reasonable latency
- Moderate memory
Rule of thumb: buffer size ≈ number of producers × expected burst size. For example, four producers that each burst up to eight tasks suggest a buffer of about 32.
Backpressure Strategies
When the buffer is full, you have options:
1. Block (Default)
tasks <- task // Blocks until space available
Simple but can cause producer goroutines to pile up.
2. Drop Oldest
select {
case tasks <- task:
    // Sent successfully
default:
    // Buffer full: make room by discarding the oldest item, then retry
    select {
    case <-tasks: // Remove oldest
    default: // A consumer already made room
    }
    select {
    case tasks <- task: // Add new
    default: // Another producer refilled the buffer; drop this task
    }
}
The inner operations are deliberately non-blocking: with multiple producers, a plain receive followed by a send can block on exactly the path that was meant to avoid blocking.
3. Drop Newest
select {
case tasks <- task:
// Sent successfully
default:
// Buffer full; drop this task and count it
// (dropped is a counter you maintain yourself)
dropped++
}
4. Timeout
select {
case tasks <- task:
// Sent successfully
case <-time.After(100 * time.Millisecond):
// Timed out; signal backpressure to the caller
// (ErrTimeout stands in for your own sentinel error)
return ErrTimeout
}
Real-World Example: HTTP Request Queue
package main
import (
"fmt"
"net/http"
"sync/atomic"
"time"
)
// RequestHandler with bounded request queue
type RequestHandler struct {
queue chan *http.Request
rejectedReqs atomic.Int64
processedReqs atomic.Int64
}
func NewRequestHandler(queueSize, numWorkers int) *RequestHandler {
h := &RequestHandler{
queue: make(chan *http.Request, queueSize),
}
// Start worker pool
for i := 0; i < numWorkers; i++ {
go h.worker(i)
}
return h
}
func (h *RequestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Note: in a real server, copy what you need out of the request before
// returning; its context is canceled and its body closed once ServeHTTP
// completes, so workers must not touch those afterwards.
// Try to enqueue with timeout
select {
case h.queue <- r:
w.WriteHeader(http.StatusAccepted)
fmt.Fprintf(w, "Request queued\n")
case <-time.After(10 * time.Millisecond):
// Queue full, reject with backpressure signal
h.rejectedReqs.Add(1)
w.Header().Set("Retry-After", "1")
w.WriteHeader(http.StatusTooManyRequests)
fmt.Fprintf(w, "Server busy, try again\n")
}
}
func (h *RequestHandler) worker(id int) {
for req := range h.queue {
// Process request
time.Sleep(100 * time.Millisecond)
h.processedReqs.Add(1)
_ = req
}
}
func (h *RequestHandler) Stats() string {
return fmt.Sprintf("Processed: %d, Rejected: %d, Queued: %d/%d",
h.processedReqs.Load(),
h.rejectedReqs.Load(),
len(h.queue),
cap(h.queue))
}
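Wiring the handler into a server might look like the sketch below; it belongs in the same package as the code above, and the port, queue depth, and worker count are arbitrary choices.

func main() {
    handler := NewRequestHandler(50, 4) // queue depth 50, 4 workers

    // Print stats every 5 seconds in the background.
    go func() {
        for range time.Tick(5 * time.Second) {
            fmt.Println(handler.Stats())
        }
    }()

    fmt.Println("listening on :8080")
    if err := http.ListenAndServe(":8080", handler); err != nil {
        fmt.Println("server error:", err)
    }
}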
This pattern protects your server from overload!
Key Advantages of Bounded Buffers
✓ Memory bounded - Cannot exhaust memory
✓ Natural backpressure - Slows producers automatically
✓ Prevents cascading failure - Doesn’t accept more than it can handle
✓ Predictable latency - Queue depth bounded
✓ Load shedding - Can drop when overloaded
When to Use Bounded Buffers
✓ Use when:
- Memory is constrained
- You need backpressure
- Latency bounds are important
- You want to prevent overload
✗ Consider unbounded when:
- You need to handle all requests
- Producers and consumers are balanced
- Memory is not a concern
- Bursts are critical to absorb
Performance Tuning Tips
- Monitor buffer fill rate - Track average and max
- Measure blocking time - How often are producers blocked?
- Right-size the buffer - Based on actual production metrics
- Add alerting - Alert when buffer is consistently full
- Load test - Verify behavior under stress (a benchmark sketch follows this list)
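As a rough starting point for load testing, the sketch below compares raw send/receive throughput across a few channel capacities. The bare int workload is a placeholder for your real task type and processing cost; put it in a _test.go file and run go test -bench=BufferSizes.

package main

import (
    "fmt"
    "sync"
    "testing"
)

// BenchmarkBufferSizes measures how fast b.N items move through
// channels of different capacities with a single consumer draining them.
func BenchmarkBufferSizes(b *testing.B) {
    for _, size := range []int{1, 10, 100, 1000} {
        b.Run(fmt.Sprintf("cap-%d", size), func(b *testing.B) {
            ch := make(chan int, size)
            var wg sync.WaitGroup
            wg.Add(1)
            go func() {
                defer wg.Done()
                for range ch { // consumer drains everything
                }
            }()
            b.ResetTimer()
            for i := 0; i < b.N; i++ {
                ch <- i
            }
            close(ch)
            wg.Wait()
        })
    }
}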
Next Up: Multiple Producers and Consumers
In the next article, we’ll scale up to multiple producers and multiple consumers, exploring work distribution, load balancing, and result aggregation!
Try It Yourself
Experiment with the code:
- Vary buffer size - Try 1, 10, 100, 1000
- Add slow consumers - See backpressure in action
- Implement drop policies - Drop oldest vs newest
- Add priorities - Use multiple queues (see the sketch after this list)
- Benchmark - Measure throughput vs buffer size
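For the priorities experiment, one hedged starting point is two bounded channels and a biased select: try a non-blocking receive on the high-priority queue first, then fall back to waiting on either. The receive helper below is illustrative, not the only design, and it blocks when both queues are empty.

package main

import "fmt"

// receive prefers high-priority items; only when none are ready
// does it wait on both queues at once.
func receive(high, low <-chan string) string {
    select {
    case v := <-high:
        return v
    default:
    }
    select {
    case v := <-high:
        return v
    case v := <-low:
        return v
    }
}

func main() {
    high := make(chan string, 5)
    low := make(chan string, 5)
    low <- "low-1"
    high <- "high-1"
    fmt.Println(receive(high, low)) // high-1
    fmt.Println(receive(high, low)) // low-1
}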
This is part 5 of “Golang Experiments: Classic Concurrency Problems”