Go Concurrency Patterns Series: Series Overview | Goroutine Basics | Channel Fundamentals
Introduction: Welcome to Go Coffee Shop
Imagine running a busy coffee shop. You have customers placing orders, baristas making drinks, shared equipment like espresso machines and milk steamers, and the constant challenge of managing it all efficiently. This is exactly what concurrent programming in Go is like - and goroutines are your baristas!
In this comprehensive guide, we’ll explore Go’s concurrency patterns through the lens of running a coffee shop. By the end, you’ll understand not just how to write concurrent Go code, but why these patterns work and when to use them.
Channels] Queue[Order Queue
Buffered Channels] Barista1[Barista 1
Goroutine] Barista2[Barista 2
Goroutine] Barista3[Barista 3
Goroutine] Espresso[Espresso Machine
Mutex] Steamer[Milk Steamer
Semaphore] Customer --> OrderWindow OrderWindow --> Queue Queue --> Barista1 Queue --> Barista2 Queue --> Barista3 Barista1 -.-> Espresso Barista2 -.-> Espresso Barista3 -.-> Steamer style OrderWindow fill:#e1f5ff style Queue fill:#fff4e1 style Espresso fill:#ffe1e1 style Steamer fill:#ffe1e1
Part 1: Orders and Baristas
The Order Window - Channels as Communication
In a coffee shop, the order window is where customers communicate their orders to baristas. In Go, channels serve this exact purpose - they’re the communication mechanism between goroutines.
package main
import (
"fmt"
"time"
)
// Order represents a customer order
type Order struct {
CustomerName string
Drink string
OrderTime time.Time
}
// Simple order window (unbuffered channel)
func simpleOrderWindow() {
// Create an order channel - like having one order window
orders := make(chan Order)
// Start a barista (goroutine)
go func() {
for order := range orders {
fmt.Printf("Barista received order: %s for %s\n",
order.Drink, order.CustomerName)
// Simulate making the drink
time.Sleep(500 * time.Millisecond)
fmt.Printf("Completed: %s for %s\n",
order.Drink, order.CustomerName)
}
}()
// Customers placing orders
orders <- Order{"Alice", "Latte", time.Now()}
orders <- Order{"Bob", "Cappuccino", time.Now()}
orders <- Order{"Charlie", "Espresso", time.Now()}
close(orders)
time.Sleep(2 * time.Second) // Wait for orders to complete
}
Key Concept: Unbuffered channels block both the sender (customer) and receiver (barista) until both are ready. The customer waits at the window until a barista is ready to take the order.
The Order Queue - Buffered Channels
In reality, coffee shops have a queue - customers can place orders even if baristas are busy. This is what buffered channels provide.
// Order queue with buffered channel
func orderQueueSystem() {
// Buffer size of 5 - can hold 5 orders before blocking
orders := make(chan Order, 5)
// Single barista working through the queue
go func() {
for order := range orders {
fmt.Printf("[%s] Starting: %s for %s\n",
time.Now().Format("15:04:05"), order.Drink, order.CustomerName)
time.Sleep(1 * time.Second) // Making the drink takes time
fmt.Printf("[%s] Completed: %s for %s\n",
time.Now().Format("15:04:05"), order.Drink, order.CustomerName)
}
}()
// Customers can quickly place orders without waiting
customers := []Order{
{"Alice", "Latte", time.Now()},
{"Bob", "Cappuccino", time.Now()},
{"Charlie", "Espresso", time.Now()},
{"Diana", "Mocha", time.Now()},
{"Eve", "Americano", time.Now()},
}
start := time.Now()
for _, order := range customers {
orders <- order
fmt.Printf("[%s] Queued order for %s\n",
time.Now().Format("15:04:05"), order.CustomerName)
}
fmt.Printf("All orders queued in %v\n", time.Since(start))
close(orders)
time.Sleep(6 * time.Second) // Wait for all drinks
}
Output:
[14:30:00] Queued order for Alice
[14:30:00] Queued order for Bob
[14:30:00] Queued order for Charlie
[14:30:00] Queued order for Diana
[14:30:00] Queued order for Eve
All orders queued in 2ms
[14:30:00] Starting: Latte for Alice
[14:30:01] Completed: Latte for Alice
[14:30:01] Starting: Cappuccino for Bob
[14:30:02] Completed: Cappuccino for Bob
...
Hiring More Baristas - Worker Pool Pattern
One barista is too slow during rush hour. Let’s hire more workers!
package main
import (
"fmt"
"sync"
"time"
)
// WorkerPool manages multiple barista goroutines
type CoffeeShop struct {
orders chan Order
numBaristas int
wg sync.WaitGroup
}
func NewCoffeeShop(numBaristas int, queueSize int) *CoffeeShop {
return &CoffeeShop{
orders: make(chan Order, queueSize),
numBaristas: numBaristas,
}
}
func (cs *CoffeeShop) Start() {
// Hire baristas (start worker goroutines)
for i := 1; i <= cs.numBaristas; i++ {
cs.wg.Add(1)
go cs.barista(i)
}
fmt.Printf("Coffee shop opened with %d baristas\n", cs.numBaristas)
}
func (cs *CoffeeShop) barista(id int) {
defer cs.wg.Done()
for order := range cs.orders {
fmt.Printf("Barista %d: Making %s for %s\n",
id, order.Drink, order.CustomerName)
// Simulate drink preparation time
preparationTime := time.Duration(500+id*100) * time.Millisecond
time.Sleep(preparationTime)
fmt.Printf("Barista %d: Completed %s for %s (took %v)\n",
id, order.Drink, order.CustomerName, preparationTime)
}
fmt.Printf("Barista %d: Shift ended\n", id)
}
func (cs *CoffeeShop) PlaceOrder(order Order) {
cs.orders <- order
}
func (cs *CoffeeShop) Close() {
close(cs.orders) // No more orders
cs.wg.Wait() // Wait for all baristas to finish
fmt.Println("Coffee shop closed")
}
func workerPoolExample() {
shop := NewCoffeeShop(3, 10) // 3 baristas, queue of 10
shop.Start()
// Rush hour - many orders
customers := []string{"Alice", "Bob", "Charlie", "Diana", "Eve",
"Frank", "Grace", "Henry", "Ivy", "Jack"}
start := time.Now()
for _, customer := range customers {
shop.PlaceOrder(Order{
CustomerName: customer,
Drink: "Latte",
OrderTime: time.Now(),
})
}
shop.Close()
fmt.Printf("Total time: %v\n", time.Since(start))
}
Closing Time - Graceful Shutdown with WaitGroup
At closing time, you don’t just kick everyone out - you finish pending orders first. This is graceful shutdown.
type GracefulShop struct {
orders chan Order
shutdown chan struct{}
wg sync.WaitGroup
once sync.Once
}
func NewGracefulShop(queueSize int) *GracefulShop {
return &GracefulShop{
orders: make(chan Order, queueSize),
shutdown: make(chan struct{}),
}
}
func (gs *GracefulShop) StartBarista(id int) {
gs.wg.Add(1)
go func() {
defer gs.wg.Done()
defer fmt.Printf("Barista %d: Clocked out\n", id)
for {
select {
case order, ok := <-gs.orders:
if !ok {
// Channel closed, finish up
return
}
fmt.Printf("Barista %d: Making %s\n", id, order.Drink)
time.Sleep(500 * time.Millisecond)
fmt.Printf("Barista %d: Done with %s\n", id, order.Drink)
case <-gs.shutdown:
// Shutdown signal received
fmt.Printf("Barista %d: Shutdown signal received, finishing current orders\n", id)
// Continue processing remaining orders in channel
for order := range gs.orders {
fmt.Printf("Barista %d: Finishing %s\n", id, order.Drink)
time.Sleep(500 * time.Millisecond)
}
return
}
}
}()
}
func (gs *GracefulShop) PlaceOrder(order Order) bool {
select {
case gs.orders <- order:
return true
case <-gs.shutdown:
fmt.Println("Sorry, we're closing!")
return false
}
}
func (gs *GracefulShop) Close() {
gs.once.Do(func() {
fmt.Println("Closing time! No new orders.")
close(gs.shutdown) // Signal shutdown
close(gs.orders) // No more orders
gs.wg.Wait() // Wait for all baristas
fmt.Println("Shop closed successfully")
})
}
The Impatient Customer - Timeouts and Context
Some customers won’t wait forever. We need timeouts!
package main
import (
"context"
"fmt"
"time"
)
func customerWithTimeout() {
orders := make(chan Order)
// Barista taking orders
go func() {
time.Sleep(2 * time.Second) // Barista is slow
order := <-orders
fmt.Printf("Received order: %s\n", order.Drink)
}()
// Impatient customer with timeout
timeout := time.After(1 * time.Second)
select {
case orders <- Order{"Alice", "Latte", time.Now()}:
fmt.Println("Order placed successfully")
case <-timeout:
fmt.Println("Customer left - waited too long!")
}
}
// Better approach using context
type ContextualShop struct {
orders chan Order
}
func (cs *ContextualShop) PlaceOrderWithContext(ctx context.Context, order Order) error {
select {
case cs.orders <- order:
return nil
case <-ctx.Done():
return fmt.Errorf("order cancelled: %w", ctx.Err())
}
}
func contextExample() {
shop := &ContextualShop{
orders: make(chan Order, 5),
}
// Start barista
go func() {
for order := range shop.orders {
fmt.Printf("Making: %s\n", order.Drink)
time.Sleep(500 * time.Millisecond)
}
}()
// Customer with 2-second patience
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
// Place multiple orders
orders := []Order{
{"Alice", "Latte", time.Now()},
{"Bob", "Cappuccino", time.Now()},
{"Charlie", "Espresso", time.Now()},
}
for _, order := range orders {
if err := shop.PlaceOrderWithContext(ctx, order); err != nil {
fmt.Printf("Failed to place order for %s: %v\n", order.CustomerName, err)
} else {
fmt.Printf("Order placed for %s\n", order.CustomerName)
}
time.Sleep(800 * time.Millisecond)
}
time.Sleep(2 * time.Second)
}
Part 2: Shared Equipment
One Espresso Machine - Mutex
Your coffee shop has one espresso machine. Multiple baristas can’t use it simultaneously - they need to take turns. This is where Mutex (mutual exclusion) comes in.
package main
import (
"fmt"
"sync"
"time"
)
// EspressoMachine can only be used by one barista at a time
type EspressoMachine struct {
mu sync.Mutex
shotsМade int
currentUser string
}
func (em *EspressoMachine) MakeEspresso(baristaID string) {
// Lock the machine - wait if someone else is using it
em.mu.Lock()
defer em.mu.Unlock() // Always unlock when done
em.currentUser = baristaID
fmt.Printf("[%s] %s: Started using espresso machine\n",
time.Now().Format("15:04:05.000"), baristaID)
// Making espresso takes time
time.Sleep(300 * time.Millisecond)
em.shotsМade++
fmt.Printf("[%s] %s: Finished espresso shot #%d\n",
time.Now().Format("15:04:05.000"), baristaID, em.shotsМade)
}
func mutexExample() {
machine := &EspressoMachine{}
var wg sync.WaitGroup
// Three baristas trying to use the machine
baristas := []string{"Alice", "Bob", "Charlie"}
for _, barista := range baristas {
wg.Add(1)
go func(name string) {
defer wg.Done()
// Each barista makes 3 shots
for i := 0; i < 3; i++ {
machine.MakeEspresso(name)
time.Sleep(100 * time.Millisecond) // Brief pause
}
}(barista)
}
wg.Wait()
fmt.Printf("\nTotal shots made: %d\n", machine.shotsМade)
}
Without Mutex - Race Condition:
// DON'T DO THIS - Race condition!
type UnsafeCounter struct {
count int
}
func (uc *UnsafeCounter) Increment() {
// This is NOT thread-safe
uc.count++ // Read, increment, write - can be interrupted!
}
// With Mutex - Thread-safe
type SafeCounter struct {
mu sync.Mutex
count int
}
func (sc *SafeCounter) Increment() {
sc.mu.Lock()
defer sc.mu.Unlock()
sc.count++ // Protected by mutex
}
The Milk Steamer - Semaphore with Channels
You have 2 milk steamers. Unlike the espresso machine, multiple baristas can use them - but only 2 at a time. This is a semaphore.
// Semaphore using buffered channel
type MilkSteamer struct {
permits chan struct{}
name string
}
func NewMilkSteamer(capacity int) *MilkSteamer {
return &MilkSteamer{
permits: make(chan struct{}, capacity),
name: "Milk Steamer",
}
}
func (ms *MilkSteamer) Acquire(baristaID string) {
fmt.Printf("%s: Waiting for milk steamer...\n", baristaID)
ms.permits <- struct{}{} // Acquire permit
fmt.Printf("%s: Got milk steamer!\n", baristaID)
}
func (ms *MilkSteamer) Release(baristaID string) {
<-ms.permits // Release permit
fmt.Printf("%s: Released milk steamer\n", baristaID)
}
func (ms *MilkSteamer) SteamMilk(baristaID string) {
ms.Acquire(baristaID)
defer ms.Release(baristaID)
fmt.Printf("%s: Steaming milk...\n", baristaID)
time.Sleep(500 * time.Millisecond)
fmt.Printf("%s: Milk steamed!\n", baristaID)
}
func semaphoreExample() {
steamer := NewMilkSteamer(2) // 2 steamers available
var wg sync.WaitGroup
// 5 baristas need to steam milk
baristas := []string{"Alice", "Bob", "Charlie", "Diana", "Eve"}
for _, barista := range baristas {
wg.Add(1)
go func(name string) {
defer wg.Done()
steamer.SteamMilk(name)
}(barista)
}
wg.Wait()
}
Advanced Semaphore with Weighted Acquire:
type WeightedSemaphore struct {
permits chan struct{}
}
func NewWeightedSemaphore(max int) *WeightedSemaphore {
return &WeightedSemaphore{
permits: make(chan struct{}, max),
}
}
func (ws *WeightedSemaphore) Acquire(n int) {
for i := 0; i < n; i++ {
ws.permits <- struct{}{}
}
}
func (ws *WeightedSemaphore) Release(n int) {
for i := 0; i < n; i++ {
<-ws.permits
}
}
// Example: Large orders need more resources
func weightedSemaphoreExample() {
sem := NewWeightedSemaphore(5) // 5 units of capacity
var wg sync.WaitGroup
orders := []struct {
name string
size int // How many units needed
drinks int // Number of drinks
}{
{"Small order", 1, 1},
{"Medium order", 2, 2},
{"Large order", 3, 5},
}
for _, order := range orders {
wg.Add(1)
go func(o struct {
name string
size int
drinks int
}) {
defer wg.Done()
sem.Acquire(o.size)
defer sem.Release(o.size)
fmt.Printf("%s: Processing %d drinks\n", o.name, o.drinks)
time.Sleep(time.Duration(o.drinks) * 200 * time.Millisecond)
fmt.Printf("%s: Completed\n", o.name)
}(order)
}
wg.Wait()
}
Reading the Menu - RWMutex
The menu is read frequently but updated rarely. Many people can read it simultaneously, but when updating it, everyone must wait. This is RWMutex.
type Menu struct {
mu sync.RWMutex
items map[string]float64
}
func NewMenu() *Menu {
return &Menu{
items: make(map[string]float64),
}
}
// Many baristas can read prices simultaneously
func (m *Menu) GetPrice(item string) (float64, bool) {
m.mu.RLock() // Read lock - multiple readers allowed
defer m.mu.RUnlock()
fmt.Printf("Reading price for %s\n", item)
time.Sleep(50 * time.Millisecond) // Simulate read time
price, ok := m.items[item]
return price, ok
}
// Only one barista can update prices at a time
func (m *Menu) UpdatePrice(item string, price float64) {
m.mu.Lock() // Write lock - exclusive access
defer m.mu.Unlock()
fmt.Printf("Updating price for %s to $%.2f\n", item, price)
time.Sleep(100 * time.Millisecond) // Simulate update time
m.items[item] = price
}
// Get all items (read operation)
func (m *Menu) GetAllItems() map[string]float64 {
m.mu.RLock()
defer m.mu.RUnlock()
// Create a copy to avoid external modifications
items := make(map[string]float64, len(m.items))
for k, v := range m.items {
items[k] = v
}
return items
}
func rwMutexExample() {
menu := NewMenu()
var wg sync.WaitGroup
// Initialize menu
menu.UpdatePrice("Latte", 4.50)
menu.UpdatePrice("Cappuccino", 4.00)
menu.UpdatePrice("Espresso", 3.00)
// 10 baristas reading prices (can happen simultaneously)
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
price, _ := menu.GetPrice("Latte")
fmt.Printf("Barista %d: Latte costs $%.2f\n", id, price)
}(i)
}
// 1 manager updating price (blocks all readers)
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(100 * time.Millisecond)
menu.UpdatePrice("Latte", 4.75)
}()
wg.Wait()
}
The Recipe Book - sync.Once
The coffee shop has a recipe book that needs to be loaded once when the shop opens. Multiple baristas might try to load it, but we only want it loaded once. This is sync.Once.
type RecipeBook struct {
recipes map[string][]string
once sync.Once
}
func (rb *RecipeBook) Load() {
rb.once.Do(func() {
fmt.Println("Loading recipe book (expensive operation)...")
time.Sleep(1 * time.Second) // Simulate loading
rb.recipes = map[string][]string{
"Latte": {
"Pull espresso shot",
"Steam milk to 150°F",
"Pour milk over espresso",
"Create latte art",
},
"Cappuccino": {
"Pull espresso shot",
"Steam milk to 140°F",
"Add foam",
"Dust with cocoa",
},
}
fmt.Println("Recipe book loaded!")
})
}
func (rb *RecipeBook) GetRecipe(drink string) []string {
rb.Load() // Will only execute once, even if called multiple times
return rb.recipes[drink]
}
func syncOnceExample() {
recipeBook := &RecipeBook{}
var wg sync.WaitGroup
// 5 baristas all try to load the recipe book
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("Barista %d: Getting recipe...\n", id)
recipe := recipeBook.GetRecipe("Latte")
fmt.Printf("Barista %d: Got %d steps\n", id, len(recipe))
}(i)
}
wg.Wait()
// Output shows "Loading recipe book" only once!
}
Real-world use case - Database Connection Pool:
type ConnectionPool struct {
pool *sql.DB
once sync.Once
}
var dbPool *ConnectionPool
func GetConnectionPool() *ConnectionPool {
if dbPool == nil {
dbPool = &ConnectionPool{}
}
dbPool.once.Do(func() {
fmt.Println("Initializing connection pool...")
// This expensive operation happens only once
db, err := sql.Open("postgres", "connection-string")
if err != nil {
panic(err)
}
dbPool.pool = db
})
return dbPool
}
Part 3: Making the Drink
The Latte Pipeline - Pipeline Pattern
Making a latte is a multi-stage process. We can create a pipeline where each stage is handled by different goroutines.
package main
import (
"fmt"
"time"
)
// Pipeline stages
func grindBeans(orders <-chan Order) <-chan Order {
ground := make(chan Order)
go func() {
defer close(ground)
for order := range orders {
fmt.Printf("Grinding beans for %s's %s\n",
order.CustomerName, order.Drink)
time.Sleep(200 * time.Millisecond)
ground <- order
}
}()
return ground
}
func pullEspresso(ground <-chan Order) <-chan Order {
espresso := make(chan Order)
go func() {
defer close(espresso)
for order := range ground {
fmt.Printf("Pulling espresso for %s's %s\n",
order.CustomerName, order.Drink)
time.Sleep(300 * time.Millisecond)
espresso <- order
}
}()
return espresso
}
func steamMilk(espresso <-chan Order) <-chan Order {
steamed := make(chan Order)
go func() {
defer close(steamed)
for order := range espresso {
fmt.Printf("Steaming milk for %s's %s\n",
order.CustomerName, order.Drink)
time.Sleep(250 * time.Millisecond)
steamed <- order
}
}()
return steamed
}
func createLatteArt(steamed <-chan Order) <-chan Order {
finished := make(chan Order)
go func() {
defer close(finished)
for order := range steamed {
fmt.Printf("Creating latte art for %s\n", order.CustomerName)
time.Sleep(150 * time.Millisecond)
finished <- order
}
}()
return finished
}
func pipelineExample() {
// Create orders channel
orders := make(chan Order)
// Build the pipeline
stage1 := grindBeans(orders)
stage2 := pullEspresso(stage1)
stage3 := steamMilk(stage2)
finished := createLatteArt(stage3)
// Send orders
go func() {
customers := []string{"Alice", "Bob", "Charlie"}
for _, customer := range customers {
orders <- Order{CustomerName: customer, Drink: "Latte", OrderTime: time.Now()}
}
close(orders)
}()
// Receive finished drinks
for order := range finished {
fmt.Printf("✓ Completed: %s for %s\n", order.Drink, order.CustomerName)
}
}
Grinding and Steaming Together - Fan-out
During rush hour, one barista per stage isn’t enough. Let’s add multiple workers to the slow stages. This is fan-out.
// Fan-out: Multiple workers for expensive stage
func pullEspressoFanOut(ground <-chan Order, numWorkers int) <-chan Order {
espresso := make(chan Order)
var wg sync.WaitGroup
// Start multiple workers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for order := range ground {
fmt.Printf("Worker %d: Pulling espresso for %s\n",
workerID, order.CustomerName)
time.Sleep(300 * time.Millisecond)
espresso <- order
}
}(i)
}
// Close output channel when all workers done
go func() {
wg.Wait()
close(espresso)
}()
return espresso
}
func fanOutExample() {
orders := make(chan Order, 10)
// Send many orders
go func() {
for i := 0; i < 10; i++ {
orders <- Order{
CustomerName: fmt.Sprintf("Customer-%d", i),
Drink: "Espresso",
OrderTime: time.Now(),
}
}
close(orders)
}()
// Fan-out to 3 workers
ground := grindBeans(orders)
espresso := pullEspressoFanOut(ground, 3) // 3 workers!
// Collect results
count := 0
for range espresso {
count++
}
fmt.Printf("Completed %d orders\n", count)
}
Combining Ingredients - Fan-in
Some drinks need multiple components prepared in parallel (espresso + steamed milk). We need to combine them. This is fan-in.
type DrinkComponent struct {
OrderID string
Component string
Ready bool
}
// Fan-in: Merge multiple channels into one
func fanIn(channels ...<-chan DrinkComponent) <-chan DrinkComponent {
merged := make(chan DrinkComponent)
var wg sync.WaitGroup
// Start a goroutine for each input channel
for _, ch := range channels {
wg.Add(1)
go func(c <-chan DrinkComponent) {
defer wg.Done()
for component := range c {
merged <- component
}
}(ch)
}
// Close merged channel when all inputs are done
go func() {
wg.Wait()
close(merged)
}()
return merged
}
func prepareEspresso(orderID string) <-chan DrinkComponent {
ch := make(chan DrinkComponent)
go func() {
defer close(ch)
fmt.Printf("Preparing espresso for order %s\n", orderID)
time.Sleep(300 * time.Millisecond)
ch <- DrinkComponent{OrderID: orderID, Component: "Espresso", Ready: true}
}()
return ch
}
func prepareSteamedMilk(orderID string) <-chan DrinkComponent {
ch := make(chan DrinkComponent)
go func() {
defer close(ch)
fmt.Printf("Steaming milk for order %s\n", orderID)
time.Sleep(250 * time.Millisecond)
ch <- DrinkComponent{OrderID: orderID, Component: "Milk", Ready: true}
}()
return ch
}
func fanInExample() {
orderID := "ORD-001"
// Prepare components in parallel
espressoChan := prepareEspresso(orderID)
milkChan := prepareSteamedMilk(orderID)
// Fan-in: Wait for both components
components := fanIn(espressoChan, milkChan)
// Collect components
ready := make(map[string]bool)
for component := range components {
fmt.Printf("Received: %s for order %s\n",
component.Component, component.OrderID)
ready[component.Component] = true
// When both ready, combine them
if ready["Espresso"] && ready["Milk"] {
fmt.Printf("Combining espresso and milk for order %s\n", orderID)
fmt.Println("Latte ready!")
}
}
}
The Failed Shot - Error Handling in Pipelines
Sometimes the espresso machine malfunctions or we run out of beans. Pipelines need error handling!
type Result struct {
Order Order
Error error
}
func grindBeansWithErrors(orders <-chan Order) <-chan Result {
results := make(chan Result)
go func() {
defer close(results)
beansLeft := 5 // Limited beans
for order := range orders {
if beansLeft <= 0 {
results <- Result{
Order: order,
Error: fmt.Errorf("out of beans"),
}
continue
}
fmt.Printf("Grinding beans for %s\n", order.CustomerName)
time.Sleep(200 * time.Millisecond)
beansLeft--
results <- Result{Order: order, Error: nil}
}
}()
return results
}
func pullEspressoWithErrors(results <-chan Result) <-chan Result {
output := make(chan Result)
go func() {
defer close(output)
for result := range results {
// Propagate previous errors
if result.Error != nil {
output <- result
continue
}
// Simulate occasional machine failure
if time.Now().Unix()%3 == 0 {
output <- Result{
Order: result.Order,
Error: fmt.Errorf("espresso machine malfunction"),
}
continue
}
fmt.Printf("Pulling espresso for %s\n", result.Order.CustomerName)
time.Sleep(300 * time.Millisecond)
output <- result
}
}()
return output
}
func errorHandlingPipelineExample() {
orders := make(chan Order)
// Send orders
go func() {
defer close(orders)
for i := 0; i < 8; i++ {
orders <- Order{
CustomerName: fmt.Sprintf("Customer-%d", i),
Drink: "Espresso",
OrderTime: time.Now(),
}
}
}()
// Build pipeline with error handling
ground := grindBeansWithErrors(orders)
finished := pullEspressoWithErrors(ground)
// Process results
successes := 0
failures := 0
for result := range finished {
if result.Error != nil {
fmt.Printf("✗ Failed order for %s: %v\n",
result.Order.CustomerName, result.Error)
failures++
} else {
fmt.Printf("✓ Completed order for %s\n", result.Order.CustomerName)
successes++
}
}
fmt.Printf("\nSummary: %d successes, %d failures\n", successes, failures)
}
Part 4: When Things Break
The Morning Rush - Rate Limiting
During morning rush hour, customers flood in. You need to limit how fast you accept orders to avoid overwhelming the baristas.
package main
import (
"fmt"
"time"
)
// Simple rate limiter using ticker
type RateLimiter struct {
ticker *time.Ticker
rate time.Duration
}
func NewRateLimiter(requestsPerSecond int) *RateLimiter {
interval := time.Second / time.Duration(requestsPerSecond)
return &RateLimiter{
ticker: time.NewTicker(interval),
rate: interval,
}
}
func (rl *RateLimiter) Wait() {
<-rl.ticker.C
}
func (rl *RateLimiter) Stop() {
rl.ticker.Stop()
}
func rateLimitExample() {
limiter := NewRateLimiter(2) // Max 2 orders per second
defer limiter.Stop()
customers := []string{"Alice", "Bob", "Charlie", "Diana", "Eve", "Frank"}
fmt.Println("Processing orders with rate limiting...")
start := time.Now()
for i, customer := range customers {
limiter.Wait() // Wait for rate limiter
fmt.Printf("[%v] Order %d: %s\n", time.Since(start).Round(time.Millisecond), i+1, customer)
}
fmt.Printf("Total time: %v\n", time.Since(start))
}
// Token bucket rate limiter - more sophisticated
type TokenBucket struct {
tokens chan struct{}
capacity int
refillRate time.Duration
}
func NewTokenBucket(capacity int, refillRate time.Duration) *TokenBucket {
tb := &TokenBucket{
tokens: make(chan struct{}, capacity),
capacity: capacity,
refillRate: refillRate,
}
// Fill bucket initially
for i := 0; i < capacity; i++ {
tb.tokens <- struct{}{}
}
// Start refill goroutine
go tb.refill()
return tb
}
func (tb *TokenBucket) refill() {
ticker := time.NewTicker(tb.refillRate)
defer ticker.Stop()
for range ticker.C {
select {
case tb.tokens <- struct{}{}:
// Token added
default:
// Bucket full, skip
}
}
}
func (tb *TokenBucket) Take() bool {
select {
case <-tb.tokens:
return true
default:
return false
}
}
func (tb *TokenBucket) TakeWithWait() {
<-tb.tokens
}
func tokenBucketExample() {
// Bucket with 5 tokens, refills 1 token every 500ms
bucket := NewTokenBucket(5, 500*time.Millisecond)
// Burst of 8 customers
for i := 0; i < 8; i++ {
go func(id int) {
if bucket.Take() {
fmt.Printf("Customer %d: Order accepted immediately\n", id)
} else {
fmt.Printf("Customer %d: Waiting for token...\n", id)
bucket.TakeWithWait()
fmt.Printf("Customer %d: Order accepted after wait\n", id)
}
}(i)
}
time.Sleep(5 * time.Second)
}
Ticker] -.->|Add tokens| TB style TB fill:#fff4e1 style Refill fill:#e1f5ff
The Broken Machine - Circuit Breaker
The espresso machine sometimes breaks. Instead of trying to use it repeatedly and failing, we open a “circuit breaker” to prevent wasting time.
type CircuitBreaker struct {
maxFailures int
resetTimeout time.Duration
mu sync.Mutex
failures int
lastFailTime time.Time
state string // "closed", "open", "half-open"
}
func NewCircuitBreaker(maxFailures int, resetTimeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
maxFailures: maxFailures,
resetTimeout: resetTimeout,
state: "closed",
}
}
func (cb *CircuitBreaker) Call(operation func() error) error {
cb.mu.Lock()
defer cb.mu.Unlock()
// Check if we should transition from open to half-open
if cb.state == "open" {
if time.Since(cb.lastFailTime) > cb.resetTimeout {
fmt.Println("Circuit breaker: Transitioning to half-open")
cb.state = "half-open"
cb.failures = 0
} else {
return fmt.Errorf("circuit breaker is open")
}
}
// Try the operation
err := operation()
if err != nil {
cb.failures++
cb.lastFailTime = time.Now()
if cb.failures >= cb.maxFailures {
fmt.Printf("Circuit breaker: Opening after %d failures\n", cb.failures)
cb.state = "open"
}
return err
}
// Success - reset
if cb.state == "half-open" {
fmt.Println("Circuit breaker: Closing after successful call")
cb.state = "closed"
}
cb.failures = 0
return nil
}
func (cb *CircuitBreaker) GetState() string {
cb.mu.Lock()
defer cb.mu.Unlock()
return cb.state
}
func circuitBreakerExample() {
cb := NewCircuitBreaker(3, 2*time.Second)
// Simulate espresso machine that fails
attemptCount := 0
useEspressoMachine := func() error {
attemptCount++
// Fail first 5 attempts, then succeed
if attemptCount <= 5 {
return fmt.Errorf("machine malfunction")
}
return nil
}
// Try making espressos
for i := 0; i < 10; i++ {
err := cb.Call(useEspressoMachine)
if err != nil {
fmt.Printf("Attempt %d: Failed - %v (State: %s)\n",
i+1, err, cb.GetState())
} else {
fmt.Printf("Attempt %d: Success (State: %s)\n",
i+1, cb.GetState())
}
time.Sleep(500 * time.Millisecond)
// After some failures, wait for reset timeout
if i == 6 {
fmt.Println("Waiting for circuit to reset...")
time.Sleep(2 * time.Second)
}
}
}
Output:
Attempt 1: Failed - machine malfunction (State: closed)
Attempt 2: Failed - machine malfunction (State: closed)
Attempt 3: Failed - machine malfunction (State: closed)
Circuit breaker: Opening after 3 failures
Attempt 4: Failed - circuit breaker is open (State: open)
Attempt 5: Failed - circuit breaker is open (State: open)
Attempt 6: Failed - circuit breaker is open (State: open)
Attempt 7: Failed - circuit breaker is open (State: open)
Waiting for circuit to reset...
Circuit breaker: Transitioning to half-open
Attempt 8: Success (State: half-open)
Circuit breaker: Closing after successful call
Attempt 9: Success (State: closed)
Attempt 10: Success (State: closed)
The Retry - Exponential Backoff
When the milk delivery is late, you don’t check every second. You wait a bit, then longer, then even longer. This is exponential backoff.
type RetryConfig struct {
MaxAttempts int
InitialDelay time.Duration
MaxDelay time.Duration
Multiplier float64
}
func RetryWithBackoff(config RetryConfig, operation func() error) error {
delay := config.InitialDelay
for attempt := 1; attempt <= config.MaxAttempts; attempt++ {
err := operation()
if err == nil {
return nil // Success!
}
if attempt == config.MaxAttempts {
return fmt.Errorf("max attempts reached: %w", err)
}
fmt.Printf("Attempt %d failed: %v. Retrying in %v...\n",
attempt, err, delay)
time.Sleep(delay)
// Exponential backoff
delay = time.Duration(float64(delay) * config.Multiplier)
if delay > config.MaxDelay {
delay = config.MaxDelay
}
}
return fmt.Errorf("all attempts failed")
}
func exponentialBackoffExample() {
attemptCount := 0
// Simulate unreliable milk delivery
checkMilkDelivery := func() error {
attemptCount++
if attemptCount < 4 {
return fmt.Errorf("delivery truck not here yet")
}
fmt.Println("Milk delivered!")
return nil
}
config := RetryConfig{
MaxAttempts: 5,
InitialDelay: 500 * time.Millisecond,
MaxDelay: 10 * time.Second,
Multiplier: 2.0,
}
err := RetryWithBackoff(config, checkMilkDelivery)
if err != nil {
fmt.Printf("Failed to get milk: %v\n", err)
}
}
Advanced: Retry with Jitter
func RetryWithJitter(config RetryConfig, operation func() error) error {
delay := config.InitialDelay
for attempt := 1; attempt <= config.MaxAttempts; attempt++ {
err := operation()
if err == nil {
return nil
}
if attempt == config.MaxAttempts {
return fmt.Errorf("max attempts reached: %w", err)
}
// Add jitter: randomize ±25% of delay
jitter := time.Duration(float64(delay) * (0.75 + rand.Float64()*0.5))
fmt.Printf("Attempt %d failed. Retrying in %v...\n", attempt, jitter)
time.Sleep(jitter)
delay = time.Duration(float64(delay) * config.Multiplier)
if delay > config.MaxDelay {
delay = config.MaxDelay
}
}
return fmt.Errorf("all attempts failed")
}
Back Pressure - Bounded Queues
When orders pile up faster than baristas can handle them, you need to push back on customers.
type BoundedQueue struct {
queue chan Order
rejected int
mu sync.Mutex
}
func NewBoundedQueue(size int) *BoundedQueue {
return &BoundedQueue{
queue: make(chan Order, size),
}
}
func (bq *BoundedQueue) Enqueue(order Order) error {
select {
case bq.queue <- order:
return nil
default:
// Queue full - apply back pressure
bq.mu.Lock()
bq.rejected++
bq.mu.Unlock()
return fmt.Errorf("queue full, order rejected")
}
}
func (bq *BoundedQueue) Dequeue() (Order, bool) {
order, ok := <-bq.queue
return order, ok
}
func (bq *BoundedQueue) GetRejectedCount() int {
bq.mu.Lock()
defer bq.mu.Unlock()
return bq.rejected
}
func backPressureExample() {
queue := NewBoundedQueue(5) // Small queue
var wg sync.WaitGroup
// Start a slow barista
wg.Add(1)
go func() {
defer wg.Done()
for {
order, ok := queue.Dequeue()
if !ok {
break
}
fmt.Printf("Processing: %s\n", order.CustomerName)
time.Sleep(1 * time.Second) // Slow processing
}
}()
// Flood of customers
for i := 0; i < 20; i++ {
order := Order{
CustomerName: fmt.Sprintf("Customer-%d", i),
Drink: "Latte",
OrderTime: time.Now(),
}
err := queue.Enqueue(order)
if err != nil {
fmt.Printf("❌ %s: %v\n", order.CustomerName, err)
} else {
fmt.Printf("✓ %s: Order accepted\n", order.CustomerName)
}
time.Sleep(100 * time.Millisecond) // Customers arrive fast
}
close(queue.queue)
wg.Wait()
fmt.Printf("\nTotal rejected orders: %d\n", queue.GetRejectedCount())
}
Arriving Fast] Queue[Bounded Queue
Size: 5] Worker[Slow Barista] Rejected[Rejected Orders] Customers -->|Try to add| Queue Queue -->|Full| Rejected Queue -->|Process| Worker style Queue fill:#fff4e1 style Rejected fill:#ffe1e1
Part 5: Common Mistakes
The Forgotten Barista - Goroutine Leaks
You send a barista on break but forget to call them back. They never return to work!
// WRONG - Goroutine leak
func goroutineLeakExample() {
ch := make(chan Order)
// Start a goroutine that waits forever
go func() {
order := <-ch // Blocks forever if nothing is sent
fmt.Printf("Processing: %s\n", order.CustomerName)
}()
// Forgot to send anything to ch!
// The goroutine is leaked - it will run forever
fmt.Println("Main function exits, but goroutine still waiting...")
}
// CORRECT - Proper cleanup
func properCleanupExample() {
ch := make(chan Order)
done := make(chan bool)
go func() {
select {
case order := <-ch:
fmt.Printf("Processing: %s\n", order.CustomerName)
case <-done:
fmt.Println("Barista: Received shutdown signal")
return
case <-time.After(2 * time.Second):
fmt.Println("Barista: Timeout, going home")
return
}
}()
// Cleanup
time.Sleep(1 * time.Second)
close(done)
time.Sleep(100 * time.Millisecond) // Let goroutine finish
}
// Using context for cancellation
func contextCleanupExample() {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
orders := make(chan Order)
go func() {
for {
select {
case order := <-orders:
fmt.Printf("Processing: %s\n", order.CustomerName)
case <-ctx.Done():
fmt.Println("Barista: Context cancelled, cleaning up")
return
}
}
}()
// Send some orders
orders <- Order{CustomerName: "Alice", Drink: "Latte", OrderTime: time.Now()}
time.Sleep(1 * time.Second)
// Context will be cancelled automatically after 3 seconds
}
Detecting Goroutine Leaks:
func detectLeaks() {
before := runtime.NumGoroutine()
fmt.Printf("Goroutines before: %d\n", before)
// Do some work
ch := make(chan int)
for i := 0; i < 100; i++ {
go func() {
<-ch // These goroutines leak!
}()
}
time.Sleep(1 * time.Second)
after := runtime.NumGoroutine()
fmt.Printf("Goroutines after: %d\n", after)
fmt.Printf("Leaked: %d goroutines\n", after-before)
}
The Deadlocked Kitchen - Deadlock
Two baristas each waiting for the other to finish - nobody makes progress!
// DEADLOCK - Both goroutines wait for each other
func deadlockExample() {
ch1 := make(chan int)
ch2 := make(chan int)
// Barista 1
go func() {
ch1 <- 1 // Wait for someone to receive
<-ch2 // Then wait to receive
}()
// Barista 2
go func() {
ch2 <- 2 // Wait for someone to receive
<-ch1 // Then wait to receive
}()
time.Sleep(2 * time.Second)
fmt.Println("Deadlock! Nobody makes progress")
// This will deadlock - both goroutines stuck
}
// SOLUTION - Use buffered channels or different ordering
func noDeadlockExample() {
// Solution 1: Buffered channels
ch1 := make(chan int, 1)
ch2 := make(chan int, 1)
go func() {
ch1 <- 1 // Doesn't block due to buffer
<-ch2
fmt.Println("Barista 1 done")
}()
go func() {
ch2 <- 2 // Doesn't block due to buffer
<-ch1
fmt.Println("Barista 2 done")
}()
time.Sleep(1 * time.Second)
// Solution 2: Use select with timeout
ch3 := make(chan int)
ch4 := make(chan int)
go func() {
select {
case ch3 <- 1:
<-ch4
case <-time.After(1 * time.Second):
fmt.Println("Barista 3: Timeout, moving on")
}
}()
go func() {
select {
case ch4 <- 2:
<-ch3
case <-time.After(1 * time.Second):
fmt.Println("Barista 4: Timeout, moving on")
}
}()
time.Sleep(2 * time.Second)
}
Real-world Deadlock - Mutex Lock Ordering:
type Inventory struct {
mu sync.Mutex
beans int
milk int
}
// WRONG - Can deadlock
func transferWrong(from, to *Inventory, beans int) {
from.mu.Lock()
to.mu.Lock() // If another goroutine locks in opposite order = deadlock!
from.beans -= beans
to.beans += beans
to.mu.Unlock()
from.mu.Unlock()
}
// CORRECT - Consistent lock ordering
func transferCorrect(from, to *Inventory, beans int) {
// Always lock in consistent order (e.g., by memory address)
first, second := from, to
if uintptr(unsafe.Pointer(from)) > uintptr(unsafe.Pointer(to)) {
first, second = to, from
}
first.mu.Lock()
defer first.mu.Unlock()
second.mu.Lock()
defer second.mu.Unlock()
from.beans -= beans
to.beans += beans
}
The Race for the Last Bean - Data Races
Multiple baristas reaching for the last bean at the same time - chaos ensues!
// DATA RACE - Multiple goroutines accessing shared data
type UnsafeCounter struct {
count int
}
func (uc *UnsafeCounter) Increment() {
// This is NOT atomic!
// 1. Read count
// 2. Add 1
// 3. Write back
// Another goroutine can interfere between these steps
uc.count++
}
func dataRaceExample() {
counter := &UnsafeCounter{}
var wg sync.WaitGroup
// 1000 baristas incrementing
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Printf("Count: %d (should be 1000, but will vary due to race)\n", counter.count)
// Run with: go run -race main.go to detect
}
// SOLUTION 1: Use Mutex
type SafeCounterMutex struct {
mu sync.Mutex
count int
}
func (sc *SafeCounterMutex) Increment() {
sc.mu.Lock()
defer sc.mu.Unlock()
sc.count++
}
// SOLUTION 2: Use atomic operations
type SafeCounterAtomic struct {
count int64
}
func (sc *SafeCounterAtomic) Increment() {
atomic.AddInt64(&sc.count, 1)
}
// SOLUTION 3: Use channels
func channelSolution() {
countChan := make(chan int)
// Single goroutine owns the state
go func() {
count := 0
for range countChan {
count++
}
fmt.Printf("Final count: %d\n", count)
}()
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
countChan <- 1
}()
}
wg.Wait()
close(countChan)
time.Sleep(100 * time.Millisecond)
}
Detecting Data Races:
# Compile and run with race detector
go run -race main.go
# Build with race detector
go build -race
# Test with race detector
go test -race
Common Data Race Patterns:
// Pattern 1: Unsynchronized map access
type UnsafeCache struct {
data map[string]string
}
func (uc *UnsafeCache) Get(key string) string {
return uc.data[key] // RACE if another goroutine is writing
}
func (uc *UnsafeCache) Set(key, value string) {
uc.data[key] = value // RACE!
}
// Solution: Use sync.Map
type SafeCache struct {
data sync.Map
}
func (sc *SafeCache) Get(key string) (string, bool) {
value, ok := sc.data.Load(key)
if !ok {
return "", false
}
return value.(string), true
}
func (sc *SafeCache) Set(key, value string) {
sc.data.Store(key, value)
}
// Pattern 2: Closure over loop variable
func closureRaceExample() {
var wg sync.WaitGroup
// WRONG
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println(i) // RACE - i is shared!
}()
}
wg.Wait()
// CORRECT
for i := 0; i < 5; i++ {
wg.Add(1)
go func(val int) {
defer wg.Done()
fmt.Println(val) // Safe - val is copied
}(i)
}
wg.Wait()
}
Conclusion: Running Your Coffee Shop Like a Pro
We’ve covered the complete journey of running a concurrent coffee shop using Go:
Part 1: Orders and Baristas
- Channels: Communication between goroutines
- Buffered Channels: Queueing orders
- Worker Pools: Multiple baristas processing orders
- WaitGroups: Graceful shutdown
- Timeouts & Context: Handling impatient customers
Part 2: Shared Equipment
- Mutex: Exclusive access to espresso machine
- Semaphore: Limited access to milk steamers
- RWMutex: Many readers, one writer for menus
- sync.Once: Loading recipe book once
Part 3: Making the Drink
- Pipelines: Multi-stage drink preparation
- Fan-out: Parallel processing of slow stages
- Fan-in: Combining parallel operations
- Error Handling: Managing failures in pipelines
Part 4: When Things Break
- Rate Limiting: Managing rush hour
- Circuit Breaker: Handling broken equipment
- Exponential Backoff: Smart retries
- Back Pressure: Bounded queues
Part 5: Common Mistakes
- Goroutine Leaks: Forgotten workers
- Deadlocks: Circular waiting
- Data Races: Unsafe concurrent access
Key Principles
-
Share memory by communicating, don’t communicate by sharing memory
- Use channels instead of shared variables when possible
-
Always clean up your goroutines
- Use context, timeouts, or done channels
-
Protect shared state
- Use mutexes, atomic operations, or channels
-
Handle errors gracefully
- Implement timeouts, circuit breakers, and retries
-
Test with the race detector
go run -raceis your friend
Testing Your Concurrent Code
func TestCoffeeShop(t *testing.T) {
shop := NewCoffeeShop(3, 10)
shop.Start()
// Send many orders
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
shop.PlaceOrder(Order{
CustomerName: fmt.Sprintf("Customer-%d", id),
Drink: "Latte",
OrderTime: time.Now(),
})
}(i)
}
wg.Wait()
shop.Close()
// Verify no goroutine leaks
time.Sleep(100 * time.Millisecond)
goroutines := runtime.NumGoroutine()
if goroutines > 2 { // Main + test
t.Errorf("Goroutine leak detected: %d goroutines", goroutines)
}
}
What’s Next?
Now that you understand Go concurrency through the coffee shop metaphor, explore these advanced topics:
- Worker Pools: Go Worker Pool Pattern
- Context Management: Go Context Pattern
- Advanced Patterns: Go Concurrency Patterns Series
Resources
- Effective Go - Concurrency
- Go Blog - Share Memory By Communicating
- Go Concurrency Patterns (Google I/O 2012)
This post is part of the Go Concurrency Patterns series. Master these patterns and you’ll be running your concurrent programs as smoothly as a well-oiled coffee shop!