Go Concurrency Patterns Series: ← Goroutine Basics | Series Overview | Select Statement →
What are Channels?
Channels are Go’s primary mechanism for communication between goroutines. They embody Go’s concurrency philosophy: “Don’t communicate by sharing memory; share memory by communicating.” Think of channels as typed pipes that allow goroutines to safely pass data back and forth.
Channels provide both communication and synchronization, making them incredibly powerful for building concurrent applications. They’re type-safe, can be buffered or unbuffered, and support directional constraints for better API design.
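Before digging in, here is a quick, self-contained sketch of the three channel forms covered in this post (unbuffered, buffered, and directional). The code and variable names are illustrative rather than part of a larger example:

package main

import "fmt"

// Illustrative sketch of the channel forms discussed in this post.
func main() {
    unbuffered := make(chan int)     // send and receive block until both sides are ready
    buffered := make(chan string, 2) // sends succeed without a receiver until the buffer fills

    go func() { unbuffered <- 42 }() // an unbuffered send needs a receiver in another goroutine
    fmt.Println(<-unbuffered)        // 42

    buffered <- "hello" // buffered sends: no receiver required yet
    buffered <- "world"
    fmt.Println(<-buffered, <-buffered) // hello world

    var sendOnly chan<- int = unbuffered // directional views of the same channel
    var recvOnly <-chan int = unbuffered
    _, _ = sendOnly, recvOnly
}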
The Problem: Unsafe Shared Memory
Before diving into channels, let’s see why we need them. Here’s what happens when goroutines share memory unsafely:
package main
import (
"fmt"
"sync"
"time"
)
// ❌ UNSAFE: Race condition example
func unsafeSharedMemory() {
var counter int
var wg sync.WaitGroup
// Multiple goroutines incrementing the same variable
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // Race condition!
}()
}
wg.Wait()
fmt.Printf("Unsafe counter result: %d (should be 1000)\n", counter)
}
This code has a race condition: counter++ is a non-atomic read-modify-write, so concurrent increments overwrite each other and the final count is usually well below 1000. Running the program with Go's race detector (go run -race) reports the conflicting accesses.
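For contrast, here is a minimal sketch (jumping ahead slightly) of the channel-based fix: a single goroutine owns the counter and every increment arrives as a message, so there is no concurrent write. It assumes the imports from the snippet above; the function and variable names are illustrative:

// Channel-based counter: only the owner goroutine touches the value.
func safeChannelCounter() {
    increments := make(chan struct{})
    total := make(chan int)

    // Owner goroutine: the only code that reads or writes the counter.
    go func() {
        counter := 0
        for range increments {
            counter++
        }
        total <- counter
    }()

    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            increments <- struct{}{} // each increment is a message
        }()
    }
    wg.Wait()
    close(increments) // no more increments; owner sends the final count
    fmt.Printf("Safe counter result: %d\n", <-total)
}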
Channel Basics: Safe Communication
Here’s how channels solve the communication problem:
func safeChannelCommunication() {
// Create a channel for integers
ch := make(chan int)
// Goroutine that sends data
go func() {
for i := 1; i <= 5; i++ {
fmt.Printf("Sending: %d\n", i)
ch <- i // Send value to channel
time.Sleep(500 * time.Millisecond)
}
close(ch) // Signal that no more values will be sent
}()
// Receive data from channel
for value := range ch {
fmt.Printf("Received: %d\n", value)
}
fmt.Println("Channel communication complete")
}
Output:
Sending: 1
Received: 1
Sending: 2
Received: 2
Sending: 3
Received: 3
Sending: 4
Received: 4
Sending: 5
Received: 5
Channel communication complete
Channel Types and Creation
1. Unbuffered Channels (Synchronous)
func unbufferedChannels() {
// Create unbuffered channel
ch := make(chan string)
go func() {
fmt.Println("Goroutine: About to send")
ch <- "Hello" // This blocks until someone receives
fmt.Println("Goroutine: Sent successfully")
}()
time.Sleep(1 * time.Second) // Simulate delay
fmt.Println("Main: About to receive")
message := <-ch // This blocks until someone sends
fmt.Printf("Main: Received '%s'\n", message)
}
Unbuffered channels provide synchronous communication: a send blocks until a receiver is ready to take the value, and a receive blocks until a sender provides one. Every exchange is a rendezvous between exactly two goroutines.
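That rendezvous makes an unbuffered channel a natural synchronization point. Here is a minimal sketch (the names are illustrative) that uses one purely as a completion signal:

// The receive cannot complete before the send, so "worker done" is always
// printed before "main continues".
func waitForWorker() {
    done := make(chan struct{})

    go func() {
        fmt.Println("worker done")
        done <- struct{}{} // blocks until main receives
    }()

    <-done // blocks until the worker sends
    fmt.Println("main continues")
}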
2. Buffered Channels (Asynchronous)
func bufferedChannels() {
// Create buffered channel with capacity 3
ch := make(chan int, 3)
// Send values without blocking (until the buffer is full)
ch <- 1
ch <- 2
ch <- 3
fmt.Printf("Sent 3 values, buffer length: %d, capacity: %d\n",
len(ch), cap(ch))
// A fourth send would block because the buffer is full and nobody is receiving
// ch <- 4 // Deadlock!
// Receive values
for i := 0; i < 3; i++ {
value := <-ch
fmt.Printf("Received: %d\n", value)
}
}
3. Channel Directions (Type Safety)
// Send-only channel parameter
func sender(ch chan<- int) {
for i := 1; i <= 3; i++ {
ch <- i
}
close(ch)
}
// Receive-only channel parameter
func receiver(ch <-chan int) {
for value := range ch {
fmt.Printf("Received: %d\n", value)
}
}
func directionalChannels() {
ch := make(chan int)
go sender(ch) // Can only send to ch
receiver(ch) // Can only receive from ch
}
Real-World Example: Producer-Consumer Pattern
Let’s build a practical example that demonstrates channel usage in a producer-consumer scenario:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Task struct {
ID int
Data string
Priority int
}
type TaskProcessor struct {
taskQueue chan Task
resultQueue chan string
workers int
wg sync.WaitGroup
}
func NewTaskProcessor(workers, queueSize int) *TaskProcessor {
return &TaskProcessor{
taskQueue: make(chan Task, queueSize),
resultQueue: make(chan string, queueSize),
workers: workers,
}
}
func (tp *TaskProcessor) Start() {
// Start worker goroutines
for i := 0; i < tp.workers; i++ {
tp.wg.Add(1)
go tp.worker(i)
}
fmt.Printf("Started %d workers\n", tp.workers)
}
func (tp *TaskProcessor) worker(id int) {
defer tp.wg.Done()
for task := range tp.taskQueue {
// Simulate processing time based on priority
processingTime := time.Duration(task.Priority) * 100 * time.Millisecond
time.Sleep(processingTime)
result := fmt.Sprintf("Worker %d processed task %d (%s) in %v",
id, task.ID, task.Data, processingTime)
tp.resultQueue <- result
}
fmt.Printf("Worker %d finished\n", id)
}
func (tp *TaskProcessor) SubmitTask(task Task) {
tp.taskQueue <- task
}
func (tp *TaskProcessor) Stop() {
close(tp.taskQueue)
tp.wg.Wait()
close(tp.resultQueue)
}
func (tp *TaskProcessor) GetResults() []string {
var results []string
for result := range tp.resultQueue {
results = append(results, result)
}
return results
}
func main() {
// Create task processor with 3 workers and buffer size 10
processor := NewTaskProcessor(3, 10)
processor.Start()
// Producer: Generate tasks
go func() {
tasks := []Task{
{ID: 1, Data: "Process image", Priority: 3},
{ID: 2, Data: "Send email", Priority: 1},
{ID: 3, Data: "Generate report", Priority: 5},
{ID: 4, Data: "Update database", Priority: 2},
{ID: 5, Data: "Backup files", Priority: 4},
{ID: 6, Data: "Analyze logs", Priority: 2},
}
for _, task := range tasks {
fmt.Printf("Submitting task %d: %s (priority %d)\n",
task.ID, task.Data, task.Priority)
processor.SubmitTask(task)
}
// Signal no more tasks
processor.Stop()
}()
// Consumer: Collect results
results := processor.GetResults()
fmt.Println("\nAll tasks completed:")
for _, result := range results {
fmt.Printf("✅ %s\n", result)
}
}
Channel Patterns and Idioms
1. Channel Closing and Detection
func channelClosingPatterns() {
ch := make(chan int, 5)
// Producer
go func() {
for i := 1; i <= 5; i++ {
ch <- i
}
close(ch) // Important: close when done sending
}()
// Consumer with close detection
for {
value, ok := <-ch
if !ok {
fmt.Println("Channel closed")
break
}
fmt.Printf("Received: %d\n", value)
}
// Alternative: range automatically handles closing
ch2 := make(chan string, 3)
go func() {
ch2 <- "Hello"
ch2 <- "World"
ch2 <- "!"
close(ch2)
}()
for message := range ch2 {
fmt.Printf("Message: %s\n", message)
}
}
2. Channel Ownership Pattern
// ✅ GOOD: Clear ownership - creator closes
func channelOwnership() {
// Function that creates and owns the channel
createDataStream := func() <-chan int {
ch := make(chan int)
go func() {
defer close(ch) // Owner closes the channel
for i := 1; i <= 5; i++ {
ch <- i
time.Sleep(100 * time.Millisecond)
}
}()
return ch
}
// Consumer just receives
dataStream := createDataStream()
for data := range dataStream {
fmt.Printf("Processing: %d\n", data)
}
}
3. Channel Multiplexing
func channelMultiplexing() {
// Multiple input channels
ch1 := make(chan string)
ch2 := make(chan string)
ch3 := make(chan string)
// Producers
go func() {
time.Sleep(100 * time.Millisecond)
ch1 <- "from channel 1"
}()
go func() {
time.Sleep(200 * time.Millisecond)
ch2 <- "from channel 2"
}()
go func() {
time.Sleep(150 * time.Millisecond)
ch3 <- "from channel 3"
}()
// Collect from all channels
for i := 0; i < 3; i++ {
select {
case msg1 := <-ch1:
fmt.Printf("Received: %s\n", msg1)
case msg2 := <-ch2:
fmt.Printf("Received: %s\n", msg2)
case msg3 := <-ch3:
fmt.Printf("Received: %s\n", msg3)
}
}
}
Advanced Channel Patterns
1. Channel of Channels (Dynamic Routing)
type Request struct {
Data string
Response chan string
}
func channelOfChannels() {
requests := make(chan Request)
// Server goroutine
go func() {
for req := range requests {
// Process request
result := fmt.Sprintf("Processed: %s", req.Data)
// Send response back through the request's channel
req.Response <- result
close(req.Response)
}
}()
// Client function
makeRequest := func(data string) string {
responseCh := make(chan string)
requests <- Request{
Data: data,
Response: responseCh,
}
return <-responseCh
}
// Make several requests
response1 := makeRequest("Hello")
response2 := makeRequest("World")
fmt.Printf("Response 1: %s\n", response1)
fmt.Printf("Response 2: %s\n", response2)
close(requests)
}
2. Channel Timeout Pattern
func channelTimeouts() {
slowOperation := func() <-chan string {
ch := make(chan string, 1) // buffered so the goroutine can send and exit even if we time out
go func() {
time.Sleep(2 * time.Second) // Simulate slow operation
ch <- "Operation completed"
}()
return ch
}
fmt.Println("Starting operation with timeout...")
select {
case result := <-slowOperation():
fmt.Printf("Success: %s\n", result)
case <-time.After(1 * time.Second):
fmt.Println("Operation timed out")
}
}
3. Channel Cancellation Pattern
func channelCancellation() {
done := make(chan bool)
// Long-running operation
go func() {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fmt.Println("Working...")
case <-done:
fmt.Println("Operation cancelled")
return
}
}
}()
// Let it run for a while
time.Sleep(2 * time.Second)
// Cancel the operation
close(done)
time.Sleep(100 * time.Millisecond) // Give the goroutine time to clean up
}
Channel Performance Considerations
1. Buffer Size Impact
func bufferSizePerformance() {
measureChannelPerformance := func(bufferSize int, numMessages int) time.Duration {
ch := make(chan int, bufferSize)
start := time.Now()
// Producer
go func() {
for i := 0; i < numMessages; i++ {
ch <- i
}
close(ch)
}()
// Consumer
for range ch {
// Minimal processing
}
return time.Since(start)
}
numMessages := 100000
// Test different buffer sizes
bufferSizes := []int{0, 1, 10, 100, 1000}
for _, size := range bufferSizes {
duration := measureChannelPerformance(size, numMessages)
fmt.Printf("Buffer size %d: %v\n", size, duration)
}
}
2. Channel vs Mutex Performance
func channelVsMutexPerformance() {
numOperations := 100000
// Channel-based counter
channelCounter := func() time.Duration {
ch := make(chan int, 1)
ch <- 0 // Initial value
start := time.Now()
var wg sync.WaitGroup
for i := 0; i < numOperations; i++ {
wg.Add(1)
go func() {
defer wg.Done()
current := <-ch
ch <- current + 1
}()
}
wg.Wait()
return time.Since(start)
}
// Mutex-based counter
mutexCounter := func() time.Duration {
var mu sync.Mutex
counter := 0
start := time.Now()
var wg sync.WaitGroup
for i := 0; i < numOperations; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
counter++
mu.Unlock()
}()
}
wg.Wait()
return time.Since(start)
}
fmt.Printf("Channel-based counter: %v\n", channelCounter())
fmt.Printf("Mutex-based counter: %v\n", mutexCounter())
}
Common Channel Pitfalls
1. Deadlock Prevention
func deadlockPrevention() {
// ❌ DEADLOCK: Sending to unbuffered channel with no receiver
deadlockExample := func() {
ch := make(chan int)
ch <- 1 // This will block forever!
fmt.Println("This will never print")
}
_ = deadlockExample // intentionally never called: running it would block forever
// ✅ SOLUTION 1: Use buffered channel
solution1 := func() {
ch := make(chan int, 1) // Buffer size 1
ch <- 1
value := <-ch
fmt.Printf("Solution 1: %d\n", value)
}
// ✅ SOLUTION 2: Use goroutine
solution2 := func() {
ch := make(chan int)
go func() {
ch <- 1
}()
value := <-ch
fmt.Printf("Solution 2: %d\n", value)
}
solution1()
solution2()
}
2. Channel Leaks
func channelLeakPrevention() {
// ❌ POTENTIAL LEAK: Goroutine blocked on channel send
leakyFunction := func() <-chan int {
ch := make(chan int)
go func() {
for i := 0; i < 10; i++ {
ch <- i
// If receiver stops early, this goroutine leaks!
}
}()
return ch
}
_ = leakyFunction // shown for contrast only; never called here
// ✅ LEAK PREVENTION: Use context for cancellation
safeFunction := func(ctx context.Context) <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
for i := 0; i < 10; i++ {
select {
case ch <- i:
// Successfully sent
case <-ctx.Done():
return // Cleanup on cancellation
}
}
}()
return ch
}
// Use the safe function
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := safeFunction(ctx)
// Read only the first 3 values
for i := 0; i < 3; i++ {
value := <-ch
fmt.Printf("Received: %d\n", value)
}
cancel() // This prevents the goroutine leak
}
Testing Channel-Based Code
func TestChannelCommunication(t *testing.T) {
// Test channel communication with timeout
ch := make(chan string, 1)
// Send in goroutine
go func() {
time.Sleep(50 * time.Millisecond)
ch <- "test message"
}()
// Receive with timeout
select {
case message := <-ch:
if message != "test message" {
t.Errorf("Expected 'test message', got '%s'", message)
}
case <-time.After(100 * time.Millisecond):
t.Error("Timeout waiting for message")
}
}
func TestChannelClosing(t *testing.T) {
ch := make(chan int, 3)
// Send values and close
go func() {
for i := 1; i <= 3; i++ {
ch <- i
}
close(ch)
}()
// Collect all values
var values []int
for value := range ch {
values = append(values, value)
}
expected := []int{1, 2, 3}
if !reflect.DeepEqual(values, expected) {
t.Errorf("Expected %v, got %v", expected, values)
}
}
Best Practices
1. Channel Design Guidelines
// ✅ GOOD: Clear channel ownership and direction
func goodChannelDesign() {
// Producer function returns receive-only channel
createNumbers := func(count int) <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
for i := 1; i <= count; i++ {
ch <- i
}
}()
return ch
}
// Consumer function accepts send-only channel
processNumbers := func(input <-chan int, output chan<- string) {
defer close(output)
for num := range input {
output <- fmt.Sprintf("Processed: %d", num)
}
}
// Usage
numbers := createNumbers(5)
results := make(chan string)
go processNumbers(numbers, results)
for result := range results {
fmt.Println(result)
}
}
2. Error Handling with Channels
type Result struct {
Value string
Error error
}
func errorHandlingWithChannels() {
processData := func(data string) <-chan Result {
ch := make(chan Result, 1)
go func() {
defer close(ch)
// Simulate processing that might fail
if data == "invalid" {
ch <- Result{Error: fmt.Errorf("invalid data: %s", data)}
return
}
// Simulate processing time
time.Sleep(100 * time.Millisecond)
ch <- Result{Value: fmt.Sprintf("processed: %s", data)}
}()
return ch
}
// Process multiple items
items := []string{"valid1", "invalid", "valid2"}
for _, item := range items {
result := <-processData(item)
if result.Error != nil {
fmt.Printf("Error processing %s: %v\n", item, result.Error)
} else {
fmt.Printf("Success: %s\n", result.Value)
}
}
}
Conclusion
Channels are the cornerstone of Go’s concurrency model, providing:
- Type-safe communication between goroutines
- Synchronization without explicit locks
- Composability for building complex concurrent systems
- Directional constraints for better API design
Key Takeaways
- Use unbuffered channels for synchronization and buffered channels for decoupling
- Follow ownership patterns - the creator should close the channel
- Use directional channels in function parameters for clarity
- Always handle channel closing to prevent goroutine leaks
- Prefer channels over shared memory for goroutine communication
Channel Selection Guide
- Unbuffered channels: When you need synchronization between goroutines
- Buffered channels: When you want to decouple producers and consumers
- Directional channels: In function parameters to enforce usage patterns
- Channel of channels: For dynamic request-response patterns
What’s Next?
Now that you understand channels, the next step is mastering the select statement - Go’s powerful tool for handling multiple channel operations simultaneously. The select statement enables non-blocking communication, timeouts, and elegant multiplexing patterns.
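For a taste of what that looks like, here is a tiny illustrative sketch (the function name is made up for this preview) of a non-blocking receive:

// With a default case, select polls the channel instead of blocking.
func tryReceive(ch <-chan string) {
    select {
    case msg := <-ch:
        fmt.Printf("got: %s\n", msg)
    default:
        fmt.Println("no message ready, moving on")
    }
}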
In the next post, we’ll explore Select Statement and learn how to build sophisticated channel-based control flow.
This post is part of the Go Concurrency Patterns series. Understanding channels is crucial for all advanced concurrency patterns we’ll cover later.