Concurrency is one of Go’s most powerful features, built into the language from the ground up. This guide walks through 16 common concurrency patterns, each with a practical code example.
Table of Contents
- Goroutines - Basic Concurrency
- Channels - Communication
- Select Statement - Multiplexing
- Worker Pool Pattern
- Fan-In Pattern
- Fan-Out Pattern
- Pipeline Pattern
- Semaphore Pattern
- Barrier Pattern
- Future/Promise Pattern
- Rate Limiting Pattern
- Circuit Breaker Pattern
- Context Pattern
- Mutex Pattern
- WaitGroup Pattern
- ErrGroup Pattern
Goroutines - Basic Concurrency
Goroutines are lightweight threads managed by the Go runtime. They enable concurrent execution with minimal overhead.
package main
import (
"fmt"
"time"
)
func main() {
// Launch goroutines - lightweight concurrent functions
for i := 1; i <= 3; i++ {
go func(id int) {
fmt.Printf("Goroutine %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Goroutine %d done\n", id)
}(i) // Pass i as a parameter so each goroutine gets its own copy (before Go 1.22, all iterations shared one i)
}
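// Crude wait: sleep long enough for the goroutines above to finish.
// Real code should use sync.WaitGroup (see the WaitGroup Pattern section).
time.Sleep(2 * time.Second)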
fmt.Println("Main goroutine exiting")
}
Channels - Communication
Channels provide a way for goroutines to communicate and synchronize. They can be buffered or unbuffered.
package main
import (
"fmt"
"time"
)
func main() {
// Unbuffered channel - synchronous communication
ch := make(chan string)
go func() {
time.Sleep(time.Second)
ch <- "Hello from goroutine" // Send blocks until receiver is ready
}()
msg := <-ch // Receive blocks until sender sends
fmt.Println(msg)
// Buffered channel - asynchronous up to buffer size
buffered := make(chan int, 3)
// Send multiple values without blocking
buffered <- 1
buffered <- 2
buffered <- 3
// Receive values
fmt.Println(<-buffered, <-buffered, <-buffered)
close(buffered)
}
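The example above closes `buffered` but does not show what closing means for a receiver. The following is a minimal sketch of the two usual ways to observe a closed channel: a `range` loop and the two-value receive. The helper name `drain` is hypothetical and exists only for this illustration.

```go
package main

import "fmt"

// drain is a hypothetical helper: it receives from ch until the channel
// is closed and empty, and returns every value it saw.
func drain(ch <-chan int) []int {
	var got []int
	for v := range ch { // the loop ends once ch is closed and drained
		got = append(got, v)
	}
	return got
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3
	close(ch) // values already in the buffer stay receivable

	fmt.Println(drain(ch))

	// Receiving from a closed, empty channel yields the zero value;
	// the second result reports whether a real value was received.
	v, ok := <-ch
	fmt.Println(v, ok)
}
```

Only the sender should close a channel, since sending on a closed channel panics.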
Select Statement - Multiplexing
The select statement lets a goroutine wait on multiple channel operations, proceeding with whichever becomes ready first.
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "from ch1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "from ch2"
}()
// Select waits on multiple channels, proceeds with first ready
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println("Received:", msg1)
case msg2 := <-ch2:
fmt.Println("Received:", msg2)
case <-time.After(3 * time.Second):
fmt.Println("Timeout") // Timeout case
}
}
}
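By default `select` blocks until one of its cases can proceed. Adding a `default` case turns it into a non-blocking check. The sketch below assumes a hypothetical helper named `tryReceive`; it is one way to poll a channel, not the only one.

```go
package main

import "fmt"

// tryReceive is a hypothetical helper: it returns a value if one is
// ready on ch, and reports false instead of blocking otherwise.
func tryReceive(ch <-chan string) (string, bool) {
	select {
	case msg := <-ch:
		return msg, true
	default: // no case is ready, so select returns immediately
		return "", false
	}
}

func main() {
	ch := make(chan string, 1)

	if _, ok := tryReceive(ch); !ok {
		fmt.Println("nothing ready yet")
	}

	ch <- "hello"
	if msg, ok := tryReceive(ch); ok {
		fmt.Println("received:", msg)
	}
}
```

Polling in a tight loop with `default` burns CPU, so this form is best kept for occasional checks.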
Worker Pool Pattern
Worker pools limit concurrent operations by using a fixed number of workers processing jobs from a queue.
package main
import (
"fmt"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int) {
// Worker processes jobs from channel until it's closed
for job := range jobs {
fmt.Printf("Worker %d started job %d\n", id, job)
time.Sleep(time.Second) // Simulate work
fmt.Printf("Worker %d finished job %d\n", id, job)
results <- job * 2 // Send result
}
}
func main() {
const numJobs = 9
const numWorkers = 3
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
// Start worker pool
for w := 1; w <= numWorkers; w++ {
go worker(w, jobs, results)
}
// Send jobs
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs) // Close to signal no more jobs
// Collect results
for r := 1; r <= numJobs; r++ {
result := <-results
fmt.Printf("Result: %d\n", result)
}
}
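The program above works because `main` knows exactly how many results to expect. When that count is not known up front, a common variant is to close the results channel once every worker has returned, so the consumer can simply `range` over it. The sketch below is one way to do that; `runPool` is a hypothetical name, and it assumes `numWorkers` is at least 1.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// runPool is a hypothetical helper: it doubles each input using
// numWorkers goroutines and returns the results in ascending order.
func runPool(inputs []int, numWorkers int) []int {
	jobs := make(chan int)
	results := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobs {
				results <- job * 2
			}
		}()
	}

	// Feed the jobs, then close the channel so the workers' loops end.
	go func() {
		for _, in := range inputs {
			jobs <- in
		}
		close(jobs)
	}()

	// Close results once every worker has returned.
	go func() {
		wg.Wait()
		close(results)
	}()

	var out []int
	for r := range results {
		out = append(out, r)
	}
	sort.Ints(out) // completion order is not deterministic
	return out
}

func main() {
	fmt.Println(runPool([]int{1, 2, 3, 4, 5}, 3))
}
```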
Fan-In Pattern
Fan-in multiplexes multiple input channels into a single output channel.
package main
import (
"fmt"
"sync"
"time"
)
// fanIn merges multiple channels into one
func fanIn(channels ...<-chan string) <-chan string {
var wg sync.WaitGroup
merged := make(chan string)
// Start a goroutine for each input channel
for _, ch := range channels {
wg.Add(1)
go func(c <-chan string) {
defer wg.Done()
for val := range c {
merged <- val // Forward to merged channel
}
}(ch)
}
// Close merged channel when all inputs are done
go func() {
wg.Wait()
close(merged)
}()
return merged
}
func producer(name string, count int) <-chan string {
ch := make(chan string)
go func() {
defer close(ch)
for i := 1; i <= count; i++ {
ch <- fmt.Sprintf("%s: message %d", name, i)
time.Sleep(100 * time.Millisecond)
}
}()
return ch
}
func main() {
ch1 := producer("Producer1", 3)
ch2 := producer("Producer2", 3)
ch3 := producer("Producer3", 3)
// Fan-in all producers
merged := fanIn(ch1, ch2, ch3)
// Consume merged output
for msg := range merged {
fmt.Println(msg)
}
}
Fan-Out Pattern
Fan-out distributes work from a single source to multiple workers.
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, job)
time.Sleep(100 * time.Millisecond) // Simulate work
fmt.Printf("Worker %d completed job %d\n", id, job)
}
}
func main() {
const numWorkers = 4
const numJobs = 12
jobs := make(chan int, numJobs)
var wg sync.WaitGroup
// Fan-out: Start multiple workers reading from same channel
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, &wg)
}
// Send jobs - distributed across workers
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
// Wait for all workers to complete
wg.Wait()
fmt.Println("All jobs completed")
}
Pipeline Pattern
Pipelines chain multiple stages where each stage processes data and passes it to the next.
package main
import "fmt"
// Stage 1: Generate numbers
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
// Stage 2: Square numbers
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n // Transform: square each number
}
}()
return out
}
// Stage 3: Filter even numbers
func filterEven(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if n%2 == 0 {
out <- n // Only pass even numbers
}
}
}()
return out
}
func main() {
// Build pipeline: generate -> square -> filter
input := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
squared := square(input)
filtered := filterEven(squared)
// Consume pipeline output
for result := range filtered {
fmt.Printf("Result: %d\n", result)
}
}
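The stages above assume the consumer reads every value. If it stops early, an upstream goroutine stays blocked on its send forever. A minimal sketch of one remedy is to give each stage a `context.Context` and select on `ctx.Done()` alongside the send. The names `generateCtx` and `firstN` are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// generateCtx is a hypothetical cancellable generator stage: it stops
// sending as soon as ctx is cancelled.
func generateCtx(ctx context.Context, nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			select {
			case out <- n:
			case <-ctx.Done():
				return // consumer gave up; exit instead of blocking
			}
		}
	}()
	return out
}

// firstN reads at most n values (n >= 1) and then cancels the stage.
func firstN(n int, nums ...int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // releases the generator goroutine on return

	var got []int
	for v := range generateCtx(ctx, nums...) {
		got = append(got, v)
		if len(got) == n {
			break
		}
	}
	return got
}

func main() {
	fmt.Println(firstN(3, 1, 2, 3, 4, 5, 6))
}
```

The same select-on-send shape applies to every stage in a longer pipeline.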
Semaphore Pattern
Semaphores limit the number of concurrent operations using a buffered channel.
package main
import (
"fmt"
"sync"
"time"
)
type Semaphore struct {
sem chan struct{}
}
// NewSemaphore creates a semaphore with n permits
func NewSemaphore(maxConcurrent int) *Semaphore {
return &Semaphore{
sem: make(chan struct{}, maxConcurrent),
}
}
func (s *Semaphore) Acquire() {
s.sem <- struct{}{} // Blocks if buffer is full
}
func (s *Semaphore) Release() {
<-s.sem // Frees up a slot
}
func main() {
sem := NewSemaphore(3) // Allow max 3 concurrent operations
var wg sync.WaitGroup
for i := 1; i <= 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
sem.Acquire() // Acquire permit
defer sem.Release() // Release when done
fmt.Printf("Task %d acquired semaphore\n", id)
time.Sleep(time.Second) // Simulate work
fmt.Printf("Task %d releasing semaphore\n", id)
}(i)
}
wg.Wait()
fmt.Println("All tasks completed")
}
Barrier Pattern
Barriers synchronize multiple goroutines to wait until all reach a certain point before proceeding.
package main
import (
"fmt"
"sync"
"time"
)
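type Barrier struct {
size int // number of goroutines that must arrive
count int // arrivals in the current generation
generation int // incremented each time the barrier trips
mu sync.Mutex
cond *sync.Cond
}
func NewBarrier(size int) *Barrier {
b := &Barrier{size: size}
b.cond = sync.NewCond(&b.mu)
return b
}
func (b *Barrier) Wait() {
b.mu.Lock()
defer b.mu.Unlock()
gen := b.generation
b.count++
if b.count == b.size {
// Last one to arrive - start a new generation and wake everyone up
b.count = 0
b.generation++
b.cond.Broadcast()
return
}
// Wait for others; re-check the condition in a loop, as sync.Cond recommends
for gen == b.generation {
b.cond.Wait()
}
}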
func main() {
barrier := NewBarrier(4)
var wg sync.WaitGroup
for i := 1; i <= 4; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("Task %d: Phase 1 starting\n", id)
time.Sleep(time.Duration(id) * 500 * time.Millisecond)
fmt.Printf("Task %d: Phase 1 complete, waiting at barrier\n", id)
barrier.Wait() // Synchronization point
fmt.Printf("Task %d: Phase 2 starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Task %d: Phase 2 complete\n", id)
}(i)
}
wg.Wait()
}
Future/Promise Pattern
Futures represent a value that will be available in the future, enabling asynchronous computation.
package main
import (
"fmt"
"time"
)
// Future represents a value that will be available in the future
type Future struct {
result chan interface{}
err chan error
}
// NewFuture creates and starts an async computation
func NewFuture(fn func() (interface{}, error)) *Future {
f := &Future{
result: make(chan interface{}, 1),
err: make(chan error, 1),
}
go func() {
res, err := fn() // Execute async computation
if err != nil {
f.err <- err
} else {
f.result <- res
}
}()
return f
}
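// Get blocks until result is available.
// Call it at most once: the value is consumed from the channel, so a second call would block forever.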
func (f *Future) Get() (interface{}, error) {
select {
case res := <-f.result:
return res, nil
case err := <-f.err:
return nil, err
}
}
func expensiveComputation(x int) (interface{}, error) {
time.Sleep(2 * time.Second) // Simulate expensive work
return x * x, nil
}
func main() {
fmt.Println("Starting async computation...")
// Start computation, returns immediately
future := NewFuture(func() (interface{}, error) {
return expensiveComputation(42)
})
fmt.Println("Doing other work while computation runs...")
time.Sleep(time.Second)
// Get result (blocks if not ready)
result, err := future.Get()
if err != nil {
fmt.Printf("Error: %v\n", err)
} else {
fmt.Printf("Result: %v\n", result)
}
}
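The `Future` above passes values as `interface{}`, so callers need a type assertion. As an alternative sketch, assuming Go 1.18 or later for generics, the result can be stored in typed fields and published by closing a channel. The names `Result` and `Async` are hypothetical and are not part of any standard library.

```go
package main

import (
	"fmt"
	"time"
)

// Result is a hypothetical typed future. The done channel is closed
// once val and err have been set.
type Result[T any] struct {
	done chan struct{}
	val  T
	err  error
}

// Async starts fn in a goroutine and returns immediately.
func Async[T any](fn func() (T, error)) *Result[T] {
	r := &Result[T]{done: make(chan struct{})}
	go func() {
		r.val, r.err = fn()
		close(r.done) // makes val and err visible to every waiter
	}()
	return r
}

// Get blocks until the computation finishes. It can be called any
// number of times, from any goroutine.
func (r *Result[T]) Get() (T, error) {
	<-r.done
	return r.val, r.err
}

func main() {
	r := Async(func() (int, error) {
		time.Sleep(100 * time.Millisecond) // simulate work
		return 42 * 42, nil
	})

	v, err := r.Get()
	fmt.Println(v, err)

	v, err = r.Get() // a second call returns the same result
	fmt.Println(v, err)
}
```

Closing a channel instead of sending on it is what allows repeated and concurrent `Get` calls: a receive on a closed channel never blocks.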
Rate Limiting Pattern
Rate limiters control the rate of operations using a ticker or token bucket approach.
package main
import (
"fmt"
"time"
)
type RateLimiter struct {
rate time.Duration
ticker *time.Ticker
bucket chan struct{}
done chan struct{} // closed by Stop to end the refill goroutine
}
// NewRateLimiter creates a rate limiter allowing operations at specified rate
func NewRateLimiter(rate time.Duration, burst int) *RateLimiter {
rl := &RateLimiter{
rate: rate,
ticker: time.NewTicker(rate),
bucket: make(chan struct{}, burst),
done: make(chan struct{}),
}
// Fill initial bucket
for i := 0; i < burst; i++ {
rl.bucket <- struct{}{}
}
// Refill bucket at rate until Stop is called
go func() {
for {
select {
case <-rl.ticker.C:
select {
case rl.bucket <- struct{}{}:
default: // Bucket full, skip
}
case <-rl.done:
return // ticker.Stop does not close ticker.C, so exit explicitly
}
}
}()
return rl
}
func (rl *RateLimiter) Wait() {
<-rl.bucket // Blocks until token available
}
// Stop stops the ticker and the refill goroutine. Call it once.
func (rl *RateLimiter) Stop() {
rl.ticker.Stop()
close(rl.done)
}
func main() {
// Allow 1 operation per 500ms, burst of 3
limiter := NewRateLimiter(500*time.Millisecond, 3)
defer limiter.Stop()
for i := 1; i <= 10; i++ {
limiter.Wait() // Rate limited
fmt.Printf("Request %d processed at %v\n", i, time.Now().Format("15:04:05.000"))
}
}
Circuit Breaker Pattern
Circuit breakers prevent cascading failures by stopping requests to failing services.
package main
import (
"errors"
"fmt"
"sync"
"time"
)
type State int
const (
StateClosed State = iota
StateOpen
StateHalfOpen
)
type CircuitBreaker struct {
maxFailures int
resetTimeout time.Duration
state State
failures int
lastFailTime time.Time
mu sync.Mutex
}
func NewCircuitBreaker(maxFailures int, resetTimeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
maxFailures: maxFailures,
resetTimeout: resetTimeout,
state: StateClosed,
}
}
func (cb *CircuitBreaker) Call(fn func() error) error {
cb.mu.Lock()
// Check if we should transition from Open to HalfOpen
if cb.state == StateOpen && time.Since(cb.lastFailTime) > cb.resetTimeout {
cb.state = StateHalfOpen
cb.failures = 0
}
// Reject if circuit is open
if cb.state == StateOpen {
cb.mu.Unlock()
return errors.New("circuit breaker is open")
}
cb.mu.Unlock()
// Execute function
err := fn()
cb.mu.Lock()
defer cb.mu.Unlock()
if err != nil {
cb.failures++
cb.lastFailTime = time.Now()
// Open circuit if failures reach the threshold, or if a trial call
// in the half-open state fails
if cb.state == StateHalfOpen || cb.failures >= cb.maxFailures {
cb.state = StateOpen
fmt.Println("Circuit breaker opened!")
}
return err
}
// Success - close circuit
if cb.state == StateHalfOpen {
cb.state = StateClosed
fmt.Println("Circuit breaker closed!")
}
cb.failures = 0
return nil
}
func main() {
cb := NewCircuitBreaker(3, 5*time.Second)
// Simulate failing service
failingService := func() error {
return errors.New("service unavailable")
}
// Try calling failing service
for i := 1; i <= 10; i++ {
err := cb.Call(failingService)
if err != nil {
fmt.Printf("Call %d failed: %v\n", i, err)
}
time.Sleep(time.Second)
}
}
Context Pattern
Contexts carry deadlines, cancellation signals, and request-scoped values across API boundaries.
package main
import (
"context"
"fmt"
"time"
)
func worker(ctx context.Context, name string) {
for {
select {
case <-ctx.Done():
// Context cancelled or timed out
fmt.Printf("%s: stopped (%v)\n", name, ctx.Err())
return
default:
fmt.Printf("%s: working...\n", name)
time.Sleep(500 * time.Millisecond)
}
}
}
func main() {
// 1. Context with cancellation
ctx1, cancel1 := context.WithCancel(context.Background())
go worker(ctx1, "Worker1")
time.Sleep(2 * time.Second)
cancel1() // Manual cancellation
time.Sleep(time.Second)
// 2. Context with timeout
ctx2, cancel2 := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel2()
go worker(ctx2, "Worker2")
time.Sleep(4 * time.Second)
// 3. Context with deadline
deadline := time.Now().Add(2 * time.Second)
ctx3, cancel3 := context.WithDeadline(context.Background(), deadline)
defer cancel3()
go worker(ctx3, "Worker3")
time.Sleep(3 * time.Second)
}
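The program above exercises cancellation, timeouts, and deadlines, but not request-scoped values. A minimal sketch of that part uses `context.WithValue` with an unexported key type. The names `ctxKey`, `requestIDKey`, `newRequestContext`, and `requestID` are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// ctxKey is an unexported type, so keys defined in other packages
// cannot collide with this one.
type ctxKey string

const requestIDKey ctxKey = "requestID"

// newRequestContext is a hypothetical helper that attaches a request ID
// to a fresh background context.
func newRequestContext(id string) context.Context {
	return context.WithValue(context.Background(), requestIDKey, id)
}

// requestID extracts the ID, reporting false if none was attached.
func requestID(ctx context.Context) (string, bool) {
	id, ok := ctx.Value(requestIDKey).(string)
	return id, ok
}

func main() {
	ctx := newRequestContext("req-123")
	if id, ok := requestID(ctx); ok {
		fmt.Println("handling", id)
	}

	_, ok := requestID(context.Background())
	fmt.Println("present on empty context:", ok)
}
```

Context values suit data that travels with a request, such as IDs or credentials. Ordinary function parameters are still the clearer choice for everything else.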
Mutex Pattern
Mutexes provide mutual exclusion to protect shared resources from concurrent access.
package main
import (
"fmt"
"sync"
"time"
)
// SafeCounter is safe to use concurrently
type SafeCounter struct {
mu sync.Mutex
count int
}
func (c *SafeCounter) Increment() {
c.mu.Lock() // Acquire lock
defer c.mu.Unlock() // Release lock when done
c.count++
}
func (c *SafeCounter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}
// RWMutex allows multiple readers or single writer
type Cache struct {
mu sync.RWMutex
data map[string]string
}
func (c *Cache) Get(key string) string {
c.mu.RLock() // Multiple readers can acquire RLock
defer c.mu.RUnlock()
return c.data[key]
}
func (c *Cache) Set(key, value string) {
c.mu.Lock() // Only one writer can acquire Lock
defer c.mu.Unlock()
c.data[key] = value
}
func main() {
counter := &SafeCounter{}
var wg sync.WaitGroup
// Start 100 goroutines incrementing counter
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Printf("Final count: %d\n", counter.Value())
// RWMutex example
cache := &Cache{data: make(map[string]string)}
// Multiple readers
for i := 0; i < 5; i++ {
go func(id int) {
value := cache.Get("key")
fmt.Printf("Reader %d: %s\n", id, value)
}(i)
}
// Single writer. Readers scheduled before this call print an empty value.
cache.Set("key", "value")
time.Sleep(time.Second)
}
WaitGroup Pattern
WaitGroups wait for a collection of goroutines to finish executing.
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // Decrement counter when done
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Duration(id) * time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
// Launch 5 workers
for i := 1; i <= 5; i++ {
wg.Add(1) // Increment counter before launching goroutine
go worker(i, &wg)
}
// Wait for all workers to complete
wg.Wait()
fmt.Println("All workers completed")
}
ErrGroup Pattern
ErrGroup provides synchronization, error propagation, and context cancellation for groups of goroutines.
package main
import (
"context"
"errors"
"fmt"
"time"
"golang.org/x/sync/errgroup"
)
func fetchData(ctx context.Context, id int) error {
select {
case <-time.After(time.Duration(id) * time.Second):
if id == 3 {
return errors.New("failed to fetch data 3") // Simulate error
}
fmt.Printf("Fetched data %d\n", id)
return nil
case <-ctx.Done():
// Context cancelled due to error in another goroutine
fmt.Printf("Fetch %d cancelled\n", id)
return ctx.Err()
}
}
func main() {
// Create errgroup with context
g, ctx := errgroup.WithContext(context.Background())
// Launch multiple tasks
for i := 1; i <= 5; i++ {
id := i
g.Go(func() error {
return fetchData(ctx, id)
})
}
// Wait for all goroutines to complete
// Returns first non-nil error if any
if err := g.Wait(); err != nil {
fmt.Printf("Error occurred: %v\n", err)
} else {
fmt.Println("All tasks completed successfully")
}
}
Summary
This guide covered 16 essential Go concurrency patterns:
| Pattern | Use Case | Key Benefit |
|---|---|---|
| Goroutines | Concurrent execution | Lightweight threads |
| Channels | Communication between goroutines | Type-safe message passing |
| Select | Multiplexing channels | Wait on several channels at once |
| Worker Pool | Limited concurrent workers | Resource control |
| Fan-In | Merge multiple inputs | Consolidation |
| Fan-Out | Distribute to multiple workers | Parallel processing |
| Pipeline | Multi-stage processing | Data transformation |
| Semaphore | Limit concurrent operations | Resource limiting |
| Barrier | Synchronization point | Phase coordination |
| Future/Promise | Async computation | Deferred results |
| Rate Limiting | Control operation rate | API throttling |
| Circuit Breaker | Prevent cascading failures | Fault tolerance |
| Context | Cancellation & deadlines | Request lifecycle |
| Mutex | Protect shared state | Data safety |
| WaitGroup | Wait for goroutines | Synchronization |
| ErrGroup | Error propagation | Error handling |
Best Practices
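- Close channels from the sending side when receivers need to know that no more values are coming (for example, to end a range loop)
- Use buffered channels when senders should be able to run ahead of receivers by a bounded amount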
- Pass channels as parameters rather than returning them when possible
- Prefer sync primitives over channels for simple mutual exclusion
- Use context for cancellation and timeouts in production code
- Avoid goroutine leaks by ensuring all goroutines can exit
- Handle errors properly in concurrent code using errgroup or custom patterns
- Test concurrent code with the race detector: go test -race
Further Reading
- Go Concurrency Patterns (Rob Pike)
- Advanced Go Concurrency Patterns
- Effective Go: Concurrency
- Go Memory Model
- golang.org/x/sync Package
Master these patterns to build robust, efficient concurrent systems in Go!