Go Concurrency Patterns Series: ← Goroutine Basics | Series Overview | Select Statement →


What are Channels?

Channels are Go’s primary mechanism for communication between goroutines. They embody Go’s concurrency philosophy: “Don’t communicate by sharing memory; share memory by communicating.” Think of channels as typed pipes that allow goroutines to safely pass data back and forth.

Channels provide both communication and synchronization, making them incredibly powerful for building concurrent applications. They’re type-safe, can be buffered or unbuffered, and support directional constraints for better API design.

The Problem: Unsafe Shared Memory

Before diving into channels, let’s see why we need them. Here’s what happens when goroutines share memory unsafely:

package main

import (
    "fmt"
    "sync"
    "time"
)

// ❌ UNSAFE: Race condition example
func unsafeSharedMemory() {
    var counter int
    var wg sync.WaitGroup
    
    // Multiple goroutines incrementing the same variable
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter++ // Race condition!
        }()
    }
    
    wg.Wait()
    fmt.Printf("Unsafe counter result: %d (should be 1000)\n", counter)
}

This code has a race condition. Multiple goroutines are accessing the same memory location simultaneously, leading to unpredictable results.

Channel Basics: Safe Communication

Here’s how channels solve the communication problem:

func safeChannelCommunication() {
    // Create a channel for integers
    ch := make(chan int)
    
    // Goroutine that sends data
    go func() {
        for i := 1; i <= 5; i++ {
            fmt.Printf("Sending: %d\n", i)
            ch <- i // Send value to channel
            time.Sleep(500 * time.Millisecond)
        }
        close(ch) // Signal that no more values will be sent
    }()
    
    // Receive data from channel
    for value := range ch {
        fmt.Printf("Received: %d\n", value)
    }
    
    fmt.Println("Channel communication complete")
}

Output:

Sending: 1
Received: 1
Sending: 2
Received: 2
Sending: 3
Received: 3
Sending: 4
Received: 4
Sending: 5
Received: 5
Channel communication complete

Channel Types and Creation

1. Unbuffered Channels (Synchronous)

func unbufferedChannels() {
    // Create unbuffered channel
    ch := make(chan string)
    
    go func() {
        fmt.Println("Goroutine: About to send")
        ch <- "Hello" // This blocks until someone receives
        fmt.Println("Goroutine: Sent successfully")
    }()
    
    time.Sleep(1 * time.Second) // Simulate delay
    fmt.Println("Main: About to receive")
    message := <-ch // This blocks until someone sends
    fmt.Printf("Main: Received '%s'\n", message)
}

Unbuffered channels provide synchronous communication - the sender blocks until the receiver is ready, and vice versa.

2. Buffered Channels (Asynchronous)

func bufferedChannels() {
    // Create buffered channel with capacity 3
    ch := make(chan int, 3)
    
    // Send values without blocking (until buffer is full)
    ch <- 1
    ch <- 2
    ch <- 3
    
    fmt.Printf("Sent 3 values, buffer length: %d, capacity: %d\n", 
        len(ch), cap(ch))
    
    // This would block because buffer is full
    // ch <- 4 // Deadlock!
    
    // Receive values
    for i := 0; i < 3; i++ {
        value := <-ch
        fmt.Printf("Received: %d\n", value)
    }
}

3. Channel Directions (Type Safety)

// Send-only channel parameter
func sender(ch chan<- int) {
    for i := 1; i <= 3; i++ {
        ch <- i
    }
    close(ch)
}

// Receive-only channel parameter
func receiver(ch <-chan int) {
    for value := range ch {
        fmt.Printf("Received: %d\n", value)
    }
}

func directionalChannels() {
    ch := make(chan int)
    
    go sender(ch)   // Can only send to ch
    receiver(ch)    // Can only receive from ch
}

Real-World Example: Producer-Consumer Pattern

Let’s build a practical example that demonstrates channel usage in a producer-consumer scenario:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Task struct {
    ID       int
    Data     string
    Priority int
}

type TaskProcessor struct {
    taskQueue   chan Task
    resultQueue chan string
    workers     int
    wg          sync.WaitGroup
}

func NewTaskProcessor(workers, queueSize int) *TaskProcessor {
    return &TaskProcessor{
        taskQueue:   make(chan Task, queueSize),
        resultQueue: make(chan string, queueSize),
        workers:     workers,
    }
}

func (tp *TaskProcessor) Start() {
    // Start worker goroutines
    for i := 0; i < tp.workers; i++ {
        tp.wg.Add(1)
        go tp.worker(i)
    }
    
    fmt.Printf("Started %d workers\n", tp.workers)
}

func (tp *TaskProcessor) worker(id int) {
    defer tp.wg.Done()
    
    for task := range tp.taskQueue {
        // Simulate processing time based on priority
        processingTime := time.Duration(task.Priority) * 100 * time.Millisecond
        time.Sleep(processingTime)
        
        result := fmt.Sprintf("Worker %d processed task %d (%s) in %v", 
            id, task.ID, task.Data, processingTime)
        
        tp.resultQueue <- result
    }
    
    fmt.Printf("Worker %d finished\n", id)
}

func (tp *TaskProcessor) SubmitTask(task Task) {
    tp.taskQueue <- task
}

func (tp *TaskProcessor) Stop() {
    close(tp.taskQueue)
    tp.wg.Wait()
    close(tp.resultQueue)
}

func (tp *TaskProcessor) GetResults() []string {
    var results []string
    for result := range tp.resultQueue {
        results = append(results, result)
    }
    return results
}

func main() {
    // Create task processor with 3 workers and buffer size 10
    processor := NewTaskProcessor(3, 10)
    processor.Start()
    
    // Producer: Generate tasks
    go func() {
        tasks := []Task{
            {ID: 1, Data: "Process image", Priority: 3},
            {ID: 2, Data: "Send email", Priority: 1},
            {ID: 3, Data: "Generate report", Priority: 5},
            {ID: 4, Data: "Update database", Priority: 2},
            {ID: 5, Data: "Backup files", Priority: 4},
            {ID: 6, Data: "Analyze logs", Priority: 2},
        }
        
        for _, task := range tasks {
            fmt.Printf("Submitting task %d: %s (priority %d)\n", 
                task.ID, task.Data, task.Priority)
            processor.SubmitTask(task)
        }
        
        // Signal no more tasks
        processor.Stop()
    }()
    
    // Consumer: Collect results
    results := processor.GetResults()
    
    fmt.Println("\nAll tasks completed:")
    for _, result := range results {
        fmt.Printf("✅ %s\n", result)
    }
}

Channel Patterns and Idioms

1. Channel Closing and Detection

func channelClosingPatterns() {
    ch := make(chan int, 5)
    
    // Producer
    go func() {
        for i := 1; i <= 5; i++ {
            ch <- i
        }
        close(ch) // Important: close when done sending
    }()
    
    // Consumer with close detection
    for {
        value, ok := <-ch
        if !ok {
            fmt.Println("Channel closed")
            break
        }
        fmt.Printf("Received: %d\n", value)
    }
    
    // Alternative: range automatically handles closing
    ch2 := make(chan string, 3)
    go func() {
        ch2 <- "Hello"
        ch2 <- "World"
        ch2 <- "!"
        close(ch2)
    }()
    
    for message := range ch2 {
        fmt.Printf("Message: %s\n", message)
    }
}

2. Channel Ownership Pattern

// ✅ GOOD: Clear ownership - creator closes
func channelOwnership() {
    // Function that creates and owns the channel
    createDataStream := func() <-chan int {
        ch := make(chan int)
        
        go func() {
            defer close(ch) // Owner closes the channel
            for i := 1; i <= 5; i++ {
                ch <- i
                time.Sleep(100 * time.Millisecond)
            }
        }()
        
        return ch
    }
    
    // Consumer just receives
    dataStream := createDataStream()
    for data := range dataStream {
        fmt.Printf("Processing: %d\n", data)
    }
}

3. Channel Multiplexing

func channelMultiplexing() {
    // Multiple input channels
    ch1 := make(chan string)
    ch2 := make(chan string)
    ch3 := make(chan string)
    
    // Producers
    go func() {
        time.Sleep(100 * time.Millisecond)
        ch1 <- "from channel 1"
    }()
    
    go func() {
        time.Sleep(200 * time.Millisecond)
        ch2 <- "from channel 2"
    }()
    
    go func() {
        time.Sleep(150 * time.Millisecond)
        ch3 <- "from channel 3"
    }()
    
    // Collect from all channels
    for i := 0; i < 3; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Printf("Received: %s\n", msg1)
        case msg2 := <-ch2:
            fmt.Printf("Received: %s\n", msg2)
        case msg3 := <-ch3:
            fmt.Printf("Received: %s\n", msg3)
        }
    }
}

Advanced Channel Patterns

1. Channel of Channels (Dynamic Routing)

type Request struct {
    Data     string
    Response chan string
}

func channelOfChannels() {
    requests := make(chan Request)
    
    // Server goroutine
    go func() {
        for req := range requests {
            // Process request
            result := fmt.Sprintf("Processed: %s", req.Data)
            
            // Send response back through the request's channel
            req.Response <- result
            close(req.Response)
        }
    }()
    
    // Client function
    makeRequest := func(data string) string {
        responseCh := make(chan string)
        requests <- Request{
            Data:     data,
            Response: responseCh,
        }
        
        return <-responseCh
    }
    
    // Make several requests
    response1 := makeRequest("Hello")
    response2 := makeRequest("World")
    
    fmt.Printf("Response 1: %s\n", response1)
    fmt.Printf("Response 2: %s\n", response2)
    
    close(requests)
}

2. Channel Timeout Pattern

func channelTimeouts() {
    slowOperation := func() <-chan string {
        ch := make(chan string)
        go func() {
            time.Sleep(2 * time.Second) // Simulate slow operation
            ch <- "Operation completed"
        }()
        return ch
    }
    
    fmt.Println("Starting operation with timeout...")
    
    select {
    case result := <-slowOperation():
        fmt.Printf("Success: %s\n", result)
    case <-time.After(1 * time.Second):
        fmt.Println("Operation timed out")
    }
}

3. Channel Cancellation Pattern

func channelCancellation() {
    done := make(chan bool)
    
    // Long-running operation
    go func() {
        ticker := time.NewTicker(500 * time.Millisecond)
        defer ticker.Stop()
        
        for {
            select {
            case <-ticker.C:
                fmt.Println("Working...")
            case <-done:
                fmt.Println("Operation cancelled")
                return
            }
        }
    }()
    
    // Let it run for a while
    time.Sleep(2 * time.Second)
    
    // Cancel the operation
    close(done)
    time.Sleep(100 * time.Millisecond) // Give time to cleanup
}

Channel Performance Considerations

1. Buffer Size Impact

func bufferSizePerformance() {
    measureChannelPerformance := func(bufferSize int, numMessages int) time.Duration {
        ch := make(chan int, bufferSize)
        start := time.Now()
        
        // Producer
        go func() {
            for i := 0; i < numMessages; i++ {
                ch <- i
            }
            close(ch)
        }()
        
        // Consumer
        for range ch {
            // Minimal processing
        }
        
        return time.Since(start)
    }
    
    numMessages := 100000
    
    // Test different buffer sizes
    bufferSizes := []int{0, 1, 10, 100, 1000}
    
    for _, size := range bufferSizes {
        duration := measureChannelPerformance(size, numMessages)
        fmt.Printf("Buffer size %d: %v\n", size, duration)
    }
}

2. Channel vs Mutex Performance

func channelVsMutexPerformance() {
    numOperations := 100000
    
    // Channel-based counter
    channelCounter := func() time.Duration {
        ch := make(chan int, 1)
        ch <- 0 // Initial value
        
        start := time.Now()
        
        var wg sync.WaitGroup
        for i := 0; i < numOperations; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                current := <-ch
                ch <- current + 1
            }()
        }
        
        wg.Wait()
        return time.Since(start)
    }
    
    // Mutex-based counter
    mutexCounter := func() time.Duration {
        var mu sync.Mutex
        counter := 0
        
        start := time.Now()
        
        var wg sync.WaitGroup
        for i := 0; i < numOperations; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                mu.Lock()
                counter++
                mu.Unlock()
            }()
        }
        
        wg.Wait()
        return time.Since(start)
    }
    
    fmt.Printf("Channel-based counter: %v\n", channelCounter())
    fmt.Printf("Mutex-based counter: %v\n", mutexCounter())
}

Common Channel Pitfalls

1. Deadlock Prevention

func deadlockPrevention() {
    // ❌ DEADLOCK: Sending to unbuffered channel with no receiver
    deadlockExample := func() {
        ch := make(chan int)
        ch <- 1 // This will block forever!
        fmt.Println("This will never print")
    }
    
    // ✅ SOLUTION 1: Use buffered channel
    solution1 := func() {
        ch := make(chan int, 1) // Buffer size 1
        ch <- 1
        value := <-ch
        fmt.Printf("Solution 1: %d\n", value)
    }
    
    // ✅ SOLUTION 2: Use goroutine
    solution2 := func() {
        ch := make(chan int)
        go func() {
            ch <- 1
        }()
        value := <-ch
        fmt.Printf("Solution 2: %d\n", value)
    }
    
    solution1()
    solution2()
}

2. Channel Leaks

func channelLeakPrevention() {
    // ❌ POTENTIAL LEAK: Goroutine blocked on channel send
    leakyFunction := func() <-chan int {
        ch := make(chan int)
        go func() {
            for i := 0; i < 10; i++ {
                ch <- i
                // If receiver stops early, this goroutine leaks!
            }
        }()
        return ch
    }
    
    // ✅ LEAK PREVENTION: Use context for cancellation
    safeFunction := func(ctx context.Context) <-chan int {
        ch := make(chan int)
        go func() {
            defer close(ch)
            for i := 0; i < 10; i++ {
                select {
                case ch <- i:
                    // Successfully sent
                case <-ctx.Done():
                    return // Cleanup on cancellation
                }
            }
        }()
        return ch
    }
    
    // Use the safe function
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    ch := safeFunction(ctx)
    
    // Read only first 3 values
    for i := 0; i < 3; i++ {
        value := <-ch
        fmt.Printf("Received: %d\n", value)
    }
    
    cancel() // This prevents the goroutine leak
}

Testing Channel-Based Code

func TestChannelCommunication(t *testing.T) {
    // Test channel communication with timeout
    ch := make(chan string, 1)
    
    // Send in goroutine
    go func() {
        time.Sleep(50 * time.Millisecond)
        ch <- "test message"
    }()
    
    // Receive with timeout
    select {
    case message := <-ch:
        if message != "test message" {
            t.Errorf("Expected 'test message', got '%s'", message)
        }
    case <-time.After(100 * time.Millisecond):
        t.Error("Timeout waiting for message")
    }
}

func TestChannelClosing(t *testing.T) {
    ch := make(chan int, 3)
    
    // Send values and close
    go func() {
        for i := 1; i <= 3; i++ {
            ch <- i
        }
        close(ch)
    }()
    
    // Collect all values
    var values []int
    for value := range ch {
        values = append(values, value)
    }
    
    expected := []int{1, 2, 3}
    if !reflect.DeepEqual(values, expected) {
        t.Errorf("Expected %v, got %v", expected, values)
    }
}

Best Practices

1. Channel Design Guidelines

// ✅ GOOD: Clear channel ownership and direction
func goodChannelDesign() {
    // Producer function returns receive-only channel
    createNumbers := func(count int) <-chan int {
        ch := make(chan int)
        go func() {
            defer close(ch)
            for i := 1; i <= count; i++ {
                ch <- i
            }
        }()
        return ch
    }
    
    // Consumer function accepts send-only channel
    processNumbers := func(input <-chan int, output chan<- string) {
        defer close(output)
        for num := range input {
            output <- fmt.Sprintf("Processed: %d", num)
        }
    }
    
    // Usage
    numbers := createNumbers(5)
    results := make(chan string)
    
    go processNumbers(numbers, results)
    
    for result := range results {
        fmt.Println(result)
    }
}

2. Error Handling with Channels

type Result struct {
    Value string
    Error error
}

func errorHandlingWithChannels() {
    processData := func(data string) <-chan Result {
        ch := make(chan Result, 1)
        
        go func() {
            defer close(ch)
            
            // Simulate processing that might fail
            if data == "invalid" {
                ch <- Result{Error: fmt.Errorf("invalid data: %s", data)}
                return
            }
            
            // Simulate processing time
            time.Sleep(100 * time.Millisecond)
            ch <- Result{Value: fmt.Sprintf("processed: %s", data)}
        }()
        
        return ch
    }
    
    // Process multiple items
    items := []string{"valid1", "invalid", "valid2"}
    
    for _, item := range items {
        result := <-processData(item)
        if result.Error != nil {
            fmt.Printf("Error processing %s: %v\n", item, result.Error)
        } else {
            fmt.Printf("Success: %s\n", result.Value)
        }
    }
}

Conclusion

Channels are the cornerstone of Go’s concurrency model, providing:

  • Type-safe communication between goroutines
  • Synchronization without explicit locks
  • Composability for building complex concurrent systems
  • Directional constraints for better API design

Key Takeaways

  1. Use unbuffered channels for synchronization and buffered channels for decoupling
  2. Follow ownership patterns - the creator should close the channel
  3. Use directional channels in function parameters for clarity
  4. Always handle channel closing to prevent goroutine leaks
  5. Prefer channels over shared memory for goroutine communication

Channel Selection Guide

  • Unbuffered channels: When you need synchronization between goroutines
  • Buffered channels: When you want to decouple producers and consumers
  • Directional channels: In function parameters to enforce usage patterns
  • Channel of channels: For dynamic request-response patterns

What’s Next?

Now that you understand channels, the next step is mastering the select statement - Go’s powerful tool for handling multiple channel operations simultaneously. The select statement enables non-blocking communication, timeouts, and elegant multiplexing patterns.

In the next post, we’ll explore Select Statement and learn how to build sophisticated channel-based control flow.


This post is part of the Go Concurrency Patterns series. Understanding channels is crucial for all advanced concurrency patterns we’ll cover later.