What is the Select Statement?
The select statement lets a goroutine wait on several channel operations at once. It looks like a switch statement, but every case is a channel send or receive: select blocks until at least one case can proceed, then runs that case.
Think of select as a traffic controller at an intersection, managing multiple lanes of traffic (channels) and allowing the first available lane to proceed. This enables non-blocking communication, timeouts, and elegant multiplexing patterns that are essential for robust concurrent programs.
The Problem: Blocking Channel Operations
Without select, a goroutine can wait on only one channel operation at a time. Each send or receive blocks until it can proceed, which can stall a program or deadlock it:
// ❌ PROBLEM: Blocking operations
func blockingExample() {
ch1 := make(chan string)
ch2 := make(chan string)
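// This blocks until ch1 has data
msg1 := <-ch1
// This blocks until ch2 has data, and is only reached after ch1 delivers
msg2 := <-ch2
// We can't wait on both channels at once: even if ch2 is ready first, it waits behind ch1
fmt.Println(msg1, msg2)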
}
Select Statement Basics
Here’s how select solves the blocking problem:
package main
import (
"fmt"
"time"
)
func basicSelect() {
ch1 := make(chan string)
ch2 := make(chan string)
// Send data to channels with different delays
go func() {
time.Sleep(1 * time.Second)
ch1 <- "Message from channel 1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "Message from channel 2"
}()
// Select waits for the first available channel
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Printf("Received: %s\n", msg1)
case msg2 := <-ch2:
fmt.Printf("Received: %s\n", msg2)
}
}
}
Output:
Received: Message from channel 1
Received: Message from channel 2
Select Statement Patterns
1. Non-blocking Operations with Default
func nonBlockingOperations() {
ch := make(chan string, 1)
// Non-blocking send
select {
case ch <- "Hello":
fmt.Println("Sent message")
default:
fmt.Println("Channel is full, couldn't send")
}
// Non-blocking receive
select {
case msg := <-ch:
fmt.Printf("Received: %s\n", msg)
default:
fmt.Println("No message available")
}
// Another non-blocking receive (channel is now empty)
select {
case msg := <-ch:
fmt.Printf("Received: %s\n", msg)
default:
fmt.Println("Channel is empty")
}
}
2. Timeout Pattern
func timeoutPattern() {
slowOperation := func() <-chan string {
// Buffered so the goroutine can still send and exit after the caller times out
ch := make(chan string, 1)
go func() {
time.Sleep(3 * time.Second) // Simulate slow operation
ch <- "Operation completed"
}()
return ch
}
fmt.Println("Starting operation with 2-second timeout...")
select {
case result := <-slowOperation():
fmt.Printf("Success: %s\n", result)
case <-time.After(2 * time.Second):
fmt.Println("Operation timed out")
}
}
3. Heartbeat and Monitoring
func heartbeatPattern() {
done := make(chan bool)
// Worker goroutine with heartbeat
go func() {
heartbeat := time.NewTicker(500 * time.Millisecond)
defer heartbeat.Stop()
work := time.NewTicker(2 * time.Second)
defer work.Stop()
for {
select {
case <-heartbeat.C:
fmt.Println("💓 Heartbeat")
case <-work.C:
fmt.Println("🔨 Doing work")
case <-done:
fmt.Println("👋 Worker shutting down")
return
}
}
}()
// Let it run for a while
time.Sleep(5 * time.Second)
close(done)
time.Sleep(100 * time.Millisecond) // Give time to shutdown
}
Real-World Example: Multi-Source Data Aggregator
Let’s build a practical example that aggregates data from multiple sources with different response times:
package main
import (
"fmt"
"math/rand"
"strings"
"sync"
"time"
)
type DataSource struct {
Name string
URL string
}
type DataResult struct {
Source string
Data string
Duration time.Duration
Error error
}
type DataAggregator struct {
sources []DataSource
timeout time.Duration
}
func NewDataAggregator(timeout time.Duration) *DataAggregator {
return &DataAggregator{
sources: []DataSource{
{Name: "API-1", URL: "https://api1.example.com/data"},
{Name: "API-2", URL: "https://api2.example.com/data"},
{Name: "API-3", URL: "https://api3.example.com/data"},
{Name: "Cache", URL: "redis://cache/data"},
{Name: "Database", URL: "postgres://db/data"},
},
timeout: timeout,
}
}
func (da *DataAggregator) FetchData() []DataResult {
resultCh := make(chan DataResult, len(da.sources))
var wg sync.WaitGroup
// Start fetching from all sources concurrently
for _, source := range da.sources {
wg.Add(1)
go func(src DataSource) {
defer wg.Done()
result := da.fetchFromSource(src)
resultCh <- result
}(source)
}
// Close channel when all goroutines complete
go func() {
wg.Wait()
close(resultCh)
}()
return da.collectResults(resultCh)
}
func (da *DataAggregator) fetchFromSource(source DataSource) DataResult {
start := time.Now()
// Simulate different response times and failure rates
var delay time.Duration
var failureRate float64
switch source.Name {
case "Cache":
delay = 50 * time.Millisecond
failureRate = 0.05 // 5% failure rate
case "API-1":
delay = 200 * time.Millisecond
failureRate = 0.1 // 10% failure rate
case "API-2":
delay = 500 * time.Millisecond
failureRate = 0.15 // 15% failure rate
case "API-3":
delay = 800 * time.Millisecond
failureRate = 0.2 // 20% failure rate
case "Database":
delay = 300 * time.Millisecond
failureRate = 0.05 // 5% failure rate
}
time.Sleep(delay)
// Simulate random failures
if rand.Float64() < failureRate {
return DataResult{
Source: source.Name,
Duration: time.Since(start),
Error: fmt.Errorf("failed to fetch from %s", source.Name),
}
}
return DataResult{
Source: source.Name,
Data: fmt.Sprintf("Data from %s", source.Name),
Duration: time.Since(start),
}
}
func (da *DataAggregator) collectResults(resultCh <-chan DataResult) []DataResult {
var results []DataResult
timeout := time.After(da.timeout)
for {
select {
case result, ok := <-resultCh:
if !ok {
// Channel closed, all results collected
return results
}
results = append(results, result)
case <-timeout:
fmt.Printf("⏰ Timeout reached (%v), stopping collection\n", da.timeout)
return results
}
}
}
func (da *DataAggregator) GetBestResult(results []DataResult) *DataResult {
var best *DataResult
// Priority: Cache > Database > API-1 > API-2 > API-3
priority := map[string]int{
"Cache": 1,
"Database": 2,
"API-1": 3,
"API-2": 4,
"API-3": 5,
}
for i := range results {
result := &results[i] // Point at the slice element, not the loop variable
if result.Error != nil {
continue // Skip failed results
}
if best == nil || priority[result.Source] < priority[best.Source] {
best = result
}
}
return best
}
func main() {
aggregator := NewDataAggregator(1 * time.Second)
fmt.Println("🚀 Starting data aggregation...")
start := time.Now()
results := aggregator.FetchData()
totalDuration := time.Since(start)
fmt.Printf("\n📊 Results collected in %v:\n", totalDuration)
fmt.Println(strings.Repeat("-", 50))
successCount := 0
for _, result := range results {
if result.Error != nil {
fmt.Printf("❌ %s: %v (took %v)\n",
result.Source, result.Error, result.Duration)
} else {
fmt.Printf("✅ %s: %s (took %v)\n",
result.Source, result.Data, result.Duration)
successCount++
}
}
fmt.Println(strings.Repeat("-", 50))
fmt.Printf("📈 Success rate: %d/%d (%.1f%%)\n",
successCount, len(results),
float64(successCount)/float64(len(results))*100)
// Get the best result based on priority
best := aggregator.GetBestResult(results)
if best != nil {
fmt.Printf("🏆 Best result: %s (%s)\n", best.Data, best.Source)
} else {
fmt.Println("😞 No successful results")
}
}
Advanced Select Patterns
1. Fan-In Pattern (Multiple Producers, Single Consumer)
func fanInPattern() {
// Multiple producer channels
producer1 := func() <-chan string {
ch := make(chan string)
go func() {
defer close(ch)
for i := 0; i < 3; i++ {
ch <- fmt.Sprintf("Producer1-%d", i)
time.Sleep(300 * time.Millisecond)
}
}()
return ch
}
producer2 := func() <-chan string {
ch := make(chan string)
go func() {
defer close(ch)
for i := 0; i < 3; i++ {
ch <- fmt.Sprintf("Producer2-%d", i)
time.Sleep(500 * time.Millisecond)
}
}()
return ch
}
// Fan-in: merge multiple channels into one
fanIn := func(ch1, ch2 <-chan string) <-chan string {
merged := make(chan string)
go func() {
defer close(merged)
for ch1 != nil || ch2 != nil {
select {
case msg, ok := <-ch1:
if !ok {
ch1 = nil // Channel closed
continue
}
merged <- msg
case msg, ok := <-ch2:
if !ok {
ch2 = nil // Channel closed
continue
}
merged <- msg
}
}
}()
return merged
}
// Use fan-in
ch1 := producer1()
ch2 := producer2()
merged := fanIn(ch1, ch2)
fmt.Println("Fan-in pattern:")
for msg := range merged {
fmt.Printf("Received: %s\n", msg)
}
}
2. Circuit Breaker with Select
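// CircuitBreaker is not safe for concurrent use as written.
// Guard its fields with a sync.Mutex if Call may run from several goroutines.
type CircuitBreaker struct {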
maxFailures int
failures int
lastFailure time.Time
timeout time.Duration
state string // "closed", "open", "half-open"
}
func NewCircuitBreaker(maxFailures int, timeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
maxFailures: maxFailures,
timeout: timeout,
state: "closed",
}
}
func (cb *CircuitBreaker) Call(operation func() error) error {
switch cb.state {
case "open":
if time.Since(cb.lastFailure) > cb.timeout {
cb.state = "half-open"
fmt.Println("🔄 Circuit breaker: half-open")
} else {
return fmt.Errorf("circuit breaker is open")
}
}
// Execute operation with timeout
done := make(chan error, 1)
go func() {
done <- operation()
}()
select {
case err := <-done:
if err != nil {
cb.onFailure()
return err
}
cb.onSuccess()
return nil
case <-time.After(5 * time.Second):
cb.onFailure()
return fmt.Errorf("operation timeout")
}
}
func (cb *CircuitBreaker) onFailure() {
cb.failures++
cb.lastFailure = time.Now()
if cb.failures >= cb.maxFailures {
cb.state = "open"
fmt.Printf("🔴 Circuit breaker opened (failures: %d)\n", cb.failures)
}
}
func (cb *CircuitBreaker) onSuccess() {
cb.failures = 0
cb.state = "closed"
fmt.Println("🟢 Circuit breaker closed")
}
func circuitBreakerExample() {
cb := NewCircuitBreaker(3, 2*time.Second)
// Simulate operations with varying success rates
operations := []func() error{
func() error { return nil }, // Success
func() error { return fmt.Errorf("failure 1") },
func() error { return fmt.Errorf("failure 2") },
func() error { return fmt.Errorf("failure 3") }, // This opens the circuit
func() error { return nil }, // This should be rejected
func() error { return nil }, // This should be rejected
}
for i, op := range operations {
fmt.Printf("\nOperation %d: ", i+1)
err := cb.Call(op)
if err != nil {
fmt.Printf("❌ %v\n", err)
} else {
fmt.Printf("✅ Success\n")
}
time.Sleep(500 * time.Millisecond)
}
// Wait for circuit to half-open
fmt.Println("\nWaiting for circuit to half-open...")
time.Sleep(3 * time.Second)
// Try again
fmt.Printf("\nRetry operation: ")
err := cb.Call(func() error { return nil })
if err != nil {
fmt.Printf("❌ %v\n", err)
} else {
fmt.Printf("✅ Success\n")
}
}
3. Rate Limiter with Select
type RateLimiter struct {
tokens chan struct{}
ticker *time.Ticker
done chan struct{} // Closed by Close to stop the refill goroutine
}
func NewRateLimiter(rate int, burst int) *RateLimiter {
rl := &RateLimiter{
tokens: make(chan struct{}, burst),
ticker: time.NewTicker(time.Second / time.Duration(rate)),
done: make(chan struct{}),
}
// Fill initial tokens
for i := 0; i < burst; i++ {
rl.tokens <- struct{}{}
}
// Refill tokens until Close is called.
// Ticker.Stop does not close ticker.C, so ranging over it would leak this goroutine.
go func() {
for {
select {
case <-rl.ticker.C:
select {
case rl.tokens <- struct{}{}:
// Token added
default:
// Bucket full, drop token
}
case <-rl.done:
return
}
}
}()
return rl
}
func (rl *RateLimiter) Allow() bool {
select {
case <-rl.tokens:
return true
default:
return false
}
}
func (rl *RateLimiter) Wait() {
<-rl.tokens
}
func (rl *RateLimiter) WaitWithTimeout(timeout time.Duration) bool {
select {
case <-rl.tokens:
return true
case <-time.After(timeout):
return false
}
}
// Close stops the ticker and the refill goroutine. Call it at most once.
func (rl *RateLimiter) Close() {
rl.ticker.Stop()
close(rl.done)
}
func rateLimiterExample() {
// Allow 2 requests per second with burst of 5
limiter := NewRateLimiter(2, 5)
defer limiter.Close()
// Simulate rapid requests
for i := 0; i < 10; i++ {
start := time.Now()
if limiter.Allow() {
fmt.Printf("Request %d: ✅ Allowed (immediate)\n", i+1)
} else {
fmt.Printf("Request %d: ⏳ Waiting for token...", i+1)
if limiter.WaitWithTimeout(2 * time.Second) {
fmt.Printf(" ✅ Allowed (waited %v)\n", time.Since(start))
} else {
fmt.Printf(" ❌ Timeout\n")
}
}
time.Sleep(200 * time.Millisecond)
}
}
Select Statement Best Practices
1. Avoid Goroutine Leaks
func avoidGoroutineLeaks() {
// ❌ BAD: Potential goroutine leak
badExample := func() <-chan string {
ch := make(chan string)
go func() {
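// Nothing can stop this goroutine: once the receiver goes away,
// it blocks on the send forever
for {
ch <- "data"
time.Sleep(1 * time.Second)
}
}()
return ch
}
_ = badExample // Referenced only so the example compiles; never call it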
// ✅ GOOD: Cancellable goroutine
goodExample := func(done <-chan struct{}) <-chan string {
ch := make(chan string)
go func() {
defer close(ch)
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
select {
case ch <- "data":
case <-done:
return
}
case <-done:
return
}
}
}()
return ch
}
done := make(chan struct{})
ch := goodExample(done)
// Receive a few messages
for i := 0; i < 3; i++ {
select {
case msg := <-ch:
fmt.Printf("Received: %s\n", msg)
case <-time.After(2 * time.Second):
fmt.Println("Timeout")
}
}
// Clean shutdown
close(done)
}
2. Handle Channel Closing Properly
func handleChannelClosing() {
producer := func() <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
for i := 1; i <= 5; i++ {
ch <- i
time.Sleep(200 * time.Millisecond)
}
}()
return ch
}
ch := producer()
timeout := time.After(2 * time.Second)
for {
select {
case value, ok := <-ch:
if !ok {
fmt.Println("Channel closed, exiting")
return
}
fmt.Printf("Received: %d\n", value)
case <-timeout:
fmt.Println("Timeout reached")
return
}
}
}
3. Random Selection Behavior
func randomSelectionDemo() {
ch1 := make(chan string, 1)
ch2 := make(chan string, 1)
// Both channels have data ready
ch1 <- "from ch1"
ch2 <- "from ch2"
// Select will randomly choose one
fmt.Println("Random selection (run multiple times):")
for i := 0; i < 5; i++ {
// Refill channels
select {
case ch1 <- "from ch1":
default:
}
select {
case ch2 <- "from ch2":
default:
}
// Random selection
select {
case msg := <-ch1:
fmt.Printf("Iteration %d: %s\n", i+1, msg)
case msg := <-ch2:
fmt.Printf("Iteration %d: %s\n", i+1, msg)
}
}
}
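To see the distribution rather than five samples, the selections can be counted over many iterations. This is a small sketch with a hypothetical countSelections helper; the exact counts vary from run to run, so no specific output is shown.

```go
package main

import "fmt"

// countSelections runs a two-case select n times with both channels
// kept ready and reports how often each case was chosen.
func countSelections(n int) (fromA, fromB int) {
	a := make(chan struct{}, 1)
	b := make(chan struct{}, 1)
	for i := 0; i < n; i++ {
		// Top up both buffers so both receive cases are ready.
		select {
		case a <- struct{}{}:
		default:
		}
		select {
		case b <- struct{}{}:
		default:
		}
		// With both cases ready, select picks one pseudo-randomly.
		select {
		case <-a:
			fromA++
		case <-b:
			fromB++
		}
	}
	return fromA, fromB
}

func main() {
	a, b := countSelections(10000)
	fmt.Printf("a=%d b=%d\n", a, b)
}
```

Both counts should land near half of the total, which is why the order of cases in a select cannot be used to express priority.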
Testing Select-Based Code
func TestSelectTimeout(t *testing.T) {
slowCh := make(chan string)
// Don't send anything to slowCh to test timeout
start := time.Now()
select {
case <-slowCh:
t.Error("Should not receive from slow channel")
case <-time.After(100 * time.Millisecond):
// Expected timeout
}
duration := time.Since(start)
if duration < 90*time.Millisecond || duration > 150*time.Millisecond {
t.Errorf("Timeout took %v, expected ~100ms", duration)
}
}
func TestSelectNonBlocking(t *testing.T) {
ch := make(chan int, 1)
// Test non-blocking send
select {
case ch <- 42:
// Success
default:
t.Error("Should be able to send to buffered channel")
}
// Test non-blocking receive
select {
case value := <-ch:
if value != 42 {
t.Errorf("Expected 42, got %d", value)
}
default:
t.Error("Should be able to receive from channel with data")
}
// Test non-blocking receive from empty channel
select {
case <-ch:
t.Error("Should not receive from empty channel")
default:
// Expected
}
}
Performance Considerations
func selectPerformance() {
// Rough wall-clock timing of a select loop; use testing.B benchmarks for real measurements
numOperations := 1000000
// Select-based approach
selectBench := func() time.Duration {
ch1 := make(chan int, 1)
ch2 := make(chan int, 1)
ch1 <- 1
start := time.Now()
for i := 0; i < numOperations; i++ {
select {
case val := <-ch1:
ch1 <- val
case ch2 <- i:
<-ch2
}
}
return time.Since(start)
}
fmt.Printf("Select performance for %d operations: %v\n",
numOperations, selectBench())
}
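For steadier numbers than a single wall-clock measurement, the same loop can be handed to the standard testing package, which chooses the iteration count itself. The sketch below uses testing.Benchmark so it runs from an ordinary main function; benchSelect is a hypothetical name, and results depend on the machine.

```go
package main

import (
	"fmt"
	"testing"
)

// benchSelect performs one two-case select per iteration.
// Both cases are always ready, so the loop never blocks.
func benchSelect(b *testing.B) {
	ch1 := make(chan int, 1)
	ch2 := make(chan int, 1)
	ch1 <- 1
	b.ResetTimer() // exclude channel setup from the measurement
	for i := 0; i < b.N; i++ {
		select {
		case v := <-ch1:
			ch1 <- v
		case ch2 <- i:
			<-ch2
		}
	}
}

func main() {
	res := testing.Benchmark(benchSelect)
	fmt.Printf("%d iterations, %d ns/op\n", res.N, res.NsPerOp())
}
```

In a real project the function would usually live in a _test.go file as BenchmarkSelect and run with go test -bench.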
Conclusion
The select statement is a powerful tool that enables:
- Non-blocking channel operations with the default case
- Timeout handling for robust network operations
- Multiplexing multiple channel operations
- Elegant control flow in concurrent programs
Key Takeaways
- Use select for multiple channels - it’s the idiomatic way to handle multiple channel operations
- Default case prevents blocking - essential for non-blocking operations
- Timeouts prevent hangs - always consider timeout patterns for external operations
- Random selection - when multiple cases are ready, select picks one at random, so case order does not imply priority
- Proper cleanup - always handle channel closing and goroutine lifecycle
Select Use Cases
- API timeouts: Prevent hanging on slow network calls
- Heartbeat monitoring: Combine work with periodic health checks
- Fan-in patterns: Merge multiple data sources
- Circuit breakers: Implement failure detection and recovery
- Rate limiting: Control operation frequency
What’s Next?
Now that you’ve covered goroutines, channels, and select statements, you’re ready for more sophisticated patterns. In the next post, we’ll explore the Pipeline Pattern and learn how to build efficient, composable data processing pipelines using channels.
This post is part of the Go Concurrency Patterns series. The select statement is fundamental to many advanced patterns we’ll cover in upcoming posts.