The Power of Streaming Pipelines
Imagine processing a million log entries. The naive approach loads everything into memory, processes it, then outputs results. But what if you don’t have enough RAM? What if you want results streamed in real time?
Pipeline patterns break complex processing into stages connected by channels. Data flows through the pipeline, with each stage transforming it concurrently. It’s Unix pipes meets goroutines—and it’s beautiful.
The Sequential Approach
Here’s what we’re moving away from:
// ProcessLogs is the sequential baseline that the pipeline versions replace.
// It reads the whole file named by filename, pushes the complete data set
// through every stage in turn, and returns the aggregated results. The error
// result is always nil.
func ProcessLogs(filename string) ([]Result, error) {
	// Everything is held in memory at once (uh oh, 10GB file!).
	raw := loadAllLogs(filename)

	// Each stage only starts once the previous one has finished with all data.
	kept := filterLogs(raw)
	reshaped := transformLogs(kept)
	detailed := enrichLogs(reshaped)

	return aggregateLogs(detailed), nil
}
// Problems:
// - Memory explosion with large datasets
// - No concurrency
// - No early results
// - Hard to add/remove stages
Building a Simple Pipeline
Let’s create a data processing pipeline:
package main
import (
"fmt"
"strings"
"time"
)
// generate is the source stage of the pipeline. It emits nums, in order, on
// the returned channel and closes the channel after the last value.
func generate(nums ...int) <-chan int {
	stream := make(chan int)
	emit := func() {
		defer close(stream)
		for i := 0; i < len(nums); i++ {
			stream <- nums[i]
		}
	}
	go emit()
	return stream
}
// square is a transformer stage. For every value received from in it sends
// the value multiplied by itself, and it closes the returned channel once in
// is closed and drained.
func square(in <-chan int) <-chan int {
	squares := make(chan int)
	go func() {
		defer close(squares)
		for {
			v, open := <-in
			if !open {
				return
			}
			squares <- v * v
		}
	}()
	return squares
}
// filterEven is a filter stage. It forwards only the even values received
// from in and closes the returned channel once in is closed and drained.
func filterEven(in <-chan int) <-chan int {
	evens := make(chan int)
	go func() {
		defer close(evens)
		for v := range in {
			if v%2 != 0 {
				continue // odd: drop it
			}
			evens <- v
		}
	}()
	return evens
}
// consume is the sink stage. It prints every value received from in, each
// followed by a space, and ends the line once in is closed. It blocks until
// the channel is closed.
func consume(in <-chan int) {
	for {
		v, open := <-in
		if !open {
			break
		}
		fmt.Printf("%d ", v)
	}
	fmt.Println()
}
// main wires the demo pipeline together (generate -> square -> filterEven ->
// consume) and runs it to completion on the numbers 1 through 10.
func main() {
	// Header followed by a blank line. Written as two calls because go vet
	// rejects a Println argument that ends in "\n".
	fmt.Println("=== Simple Pipeline Demo ===")
	fmt.Println()

	// Build pipeline: generate -> square -> filter -> consume
	numbers := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
	squared := square(numbers)
	evens := filterEven(squared)

	fmt.Print("Results: ")
	consume(evens) // blocks until every stage has finished and closed its channel

	fmt.Println("\n=== Demo Complete ===")
}
Real-World Example: Log Processing Pipeline
Let’s build a practical log processing system:
package main
import (
	"bufio"
	"fmt"
	"strings"
	"sync"
	"time"
)
// LogEntry represents a parsed log line.
// Values are produced by parseLog from lines of the form
// "timestamp|level|service|message".
type LogEntry struct {
// Timestamp is the time associated with the entry; it is populated by parseLog.
Timestamp time.Time
// Level is the whitespace-trimmed second field of the line (the demo data
// uses "INFO", "WARN", "ERROR" and "DEBUG"). filterByLevel matches it exactly.
Level string
// Message is the whitespace-trimmed fourth field: the free-text remainder of the line.
Message string
// Service is the whitespace-trimmed third field: the name of the emitting service.
Service string
}
// ProcessedLog is a LogEntry extended with the fields derived by enrichLog.
type ProcessedLog struct {
// LogEntry is embedded, so its fields (Level, Service, ...) are promoted.
LogEntry
// WordCount is the number of whitespace-separated words in Message.
WordCount int
// HasError is set by enrichLog to mark entries it classifies as errors;
// detectErrors only counts entries that have it set.
HasError bool
}
// Alert is emitted by detectErrors when a service's error count reaches a
// multiple of three.
type Alert struct {
// Service is the service whose errors triggered the alert.
Service string
// Count is the running number of errors seen for Service when the alert fired.
Count int
// Period is set to time.Minute by detectErrors.
// NOTE(review): nothing in the visible code measures a time window — confirm intent.
Period time.Duration
}
// Stage 1: source. readLines sends each element of lines, in order, on the
// returned channel and closes it after the last one. It sleeps 10ms after
// every send to imitate data trickling in from a live stream.
func readLines(lines []string) <-chan string {
	stream := make(chan string)
	go func() {
		defer close(stream)
		for i := range lines {
			stream <- lines[i]
			time.Sleep(10 * time.Millisecond) // Simulate streaming
		}
	}()
	return stream
}
// Stage 2: parseLog turns raw lines of the form
// "timestamp|level|service|message" into LogEntry values.
//
// A line that does not split into exactly four '|'-separated fields is
// skipped. Any further '|' characters stay part of the message. Level,
// Service and Message are whitespace-trimmed.
//
// The timestamp field is parsed with the layout "2006-01-02 15:04:05"; since
// the layout carries no zone, the result is in UTC. If the field does not
// match the layout, the entry is still emitted and stamped with the current
// time, so a line with an unusual timestamp is never dropped.
//
// The returned channel is closed once in is closed and drained.
func parseLog(in <-chan string) <-chan LogEntry {
	const timestampLayout = "2006-01-02 15:04:05"

	out := make(chan LogEntry)
	go func() {
		defer close(out)
		for line := range in {
			parts := strings.SplitN(line, "|", 4)
			if len(parts) != 4 {
				continue // Skip malformed lines
			}

			// Use the time recorded in the log line, not the time of parsing.
			ts, err := time.Parse(timestampLayout, strings.TrimSpace(parts[0]))
			if err != nil {
				ts = time.Now() // unparseable timestamp: fall back to arrival time
			}

			out <- LogEntry{
				Timestamp: ts,
				Level:     strings.TrimSpace(parts[1]),
				Service:   strings.TrimSpace(parts[2]),
				Message:   strings.TrimSpace(parts[3]),
			}
		}
	}()
	return out
}
// Stage 3: filterByLevel forwards only the entries whose Level is exactly
// equal to one of levels; everything else is dropped. With no levels given,
// nothing passes. The returned channel is closed once in is closed and
// drained.
func filterByLevel(in <-chan LogEntry, levels ...string) <-chan LogEntry {
	// Set of accepted levels, built once before the stage starts.
	wanted := make(map[string]bool)
	for i := 0; i < len(levels); i++ {
		wanted[levels[i]] = true
	}

	kept := make(chan LogEntry)
	go func() {
		defer close(kept)
		for entry := range in {
			if !wanted[entry.Level] {
				continue
			}
			kept <- entry
		}
	}()
	return kept
}
// Stage 4: enrichLog wraps every LogEntry in a ProcessedLog and fills in the
// derived fields:
//
//   - WordCount: number of whitespace-separated words in Message.
//   - HasError: true when the entry's Level is "ERROR" or its Message
//     mentions "error" (case-insensitive).
//
// Level is checked as well as the message text so that an ERROR entry such as
// "Database connection failed" still counts as an error downstream.
//
// The returned channel is closed once in is closed and drained.
func enrichLog(in <-chan LogEntry) <-chan ProcessedLog {
	out := make(chan ProcessedLog)
	go func() {
		defer close(out)
		for entry := range in {
			mentionsError := strings.Contains(strings.ToLower(entry.Message), "error")
			out <- ProcessedLog{
				LogEntry:  entry,
				WordCount: len(strings.Fields(entry.Message)),
				HasError:  entry.Level == "ERROR" || mentionsError,
			}
		}
	}()
	return out
}
// Stage 5: fanOut duplicates the input stream. It returns n channels and
// sends every value received from in to each of them, in slice order. All
// sends are unbuffered, so every returned channel must be read for the stage
// to make progress. All returned channels are closed once in is closed and
// drained.
func fanOut(in <-chan ProcessedLog, n int) []<-chan ProcessedLog {
	// The same channels are kept twice: bidirectional for the sender
	// goroutine, receive-only for the caller.
	senders := make([]chan ProcessedLog, n)
	receivers := make([]<-chan ProcessedLog, n)
	for i := 0; i < n; i++ {
		ch := make(chan ProcessedLog)
		senders[i] = ch
		receivers[i] = ch
	}

	closeAll := func() {
		for i := range senders {
			close(senders[i])
		}
	}

	go func() {
		defer closeAll()
		for entry := range in {
			// Send to all output channels
			for i := range senders {
				senders[i] <- entry
			}
		}
	}()

	return receivers
}
// Consumer 1: printLogs prints one line per received log, prefixed with name,
// and a summary line with the total once in is closed. The returned channel
// is closed after the summary has been printed, so callers can wait on it.
func printLogs(in <-chan ProcessedLog, name string) <-chan struct{} {
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		var seen int
		for entry := range in {
			seen++
			fmt.Printf("[%s] [%s] %s: %s (words: %d)\n",
				name, entry.Level, entry.Service, entry.Message, entry.WordCount)
		}
		fmt.Printf("[%s] Processed %d logs\n", name, seen)
	}()
	return finished
}
// Consumer 2: detectErrors keeps a running error count per service, counting
// only logs with HasError set. It emits an Alert each time a service's count
// reaches a multiple of three. The returned channel is unbuffered and is
// closed once in is closed and drained.
func detectErrors(in <-chan ProcessedLog) <-chan Alert {
	const alertEvery = 3

	out := make(chan Alert)
	go func() {
		defer close(out)
		perService := map[string]int{}
		for entry := range in {
			if !entry.HasError {
				continue
			}
			svc := entry.Service
			perService[svc]++
			total := perService[svc]
			if total%alertEvery != 0 {
				continue
			}
			out <- Alert{
				Service: svc,
				Count:   total,
				Period:  time.Minute,
			}
		}
	}()
	return out
}
// Consumer 3: calculateStats counts logs per level and per service. Once in
// is closed it sends a single map on the returned channel and then closes
// it. Keys are "level_<Level>" and "service_<Service>"; values are the
// number of logs seen for that level or service.
func calculateStats(in <-chan ProcessedLog) <-chan map[string]int {
	out := make(chan map[string]int)
	go func() {
		defer close(out)
		// The two key prefixes are distinct, so both tallies can share one
		// map without colliding.
		summary := map[string]int{}
		for entry := range in {
			summary["level_"+entry.Level]++
			summary["service_"+entry.Service]++
		}
		out <- summary
	}()
	return out
}
// Fan-in pattern: merge forwards every Alert from all of channels onto one
// returned channel. Ordering between different inputs is not defined. The
// returned channel is closed after every input has been closed and drained.
func merge(channels ...<-chan Alert) <-chan Alert {
	merged := make(chan Alert)

	var pending sync.WaitGroup
	forward := func(src <-chan Alert) {
		defer pending.Done()
		for a := range src {
			merged <- a
		}
	}

	pending.Add(len(channels))
	for i := range channels {
		go forward(channels[i])
	}

	// Close the output only after every forwarder has finished.
	go func() {
		pending.Wait()
		close(merged)
	}()

	return merged
}
// main runs the log-processing demo: it builds the pipeline
// readLines -> parseLog -> filterByLevel -> enrichLog, fans the result out to
// three consumers (printer, error detector, statistics) and waits for all of
// them before printing the final statistics.
func main() {
	// Header followed by a blank line. Written as two calls because go vet
	// rejects a Println argument that ends in "\n".
	fmt.Println("=== Log Processing Pipeline Demo ===")
	fmt.Println()

	// Simulate log data
	logData := []string{
		"2024-08-30 10:00:00|INFO|api-service|Request processed successfully",
		"2024-08-30 10:00:05|ERROR|api-service|Database connection failed",
		"2024-08-30 10:00:10|WARN|auth-service|Slow response detected",
		"2024-08-30 10:00:15|ERROR|api-service|Timeout error occurred",
		"2024-08-30 10:00:20|INFO|payment-service|Payment completed",
		"2024-08-30 10:00:25|ERROR|api-service|Internal server error",
		"2024-08-30 10:00:30|INFO|api-service|Health check passed",
		"2024-08-30 10:00:35|ERROR|auth-service|Invalid token error",
		"2024-08-30 10:00:40|DEBUG|api-service|Debug information",
		"2024-08-30 10:00:45|ERROR|payment-service|Payment processing error",
	}

	// Build pipeline
	lines := readLines(logData)
	parsed := parseLog(lines)
	filtered := filterByLevel(parsed, "INFO", "ERROR", "WARN")
	enriched := enrichLog(filtered)

	// Fan-out to multiple consumers
	streams := fanOut(enriched, 3)

	// Start consumers
	printer := printLogs(streams[0], "Printer")
	alerts := detectErrors(streams[1])
	stats := calculateStats(streams[2])

	// Process alerts. alertsDone is closed once the alert stream is drained,
	// so main can wait for the last alert to be printed before exiting.
	alertsDone := make(chan struct{})
	go func() {
		defer close(alertsDone)
		for alert := range alerts {
			fmt.Printf("\n🚨 ALERT: %s has %d errors!\n\n", alert.Service, alert.Count)
		}
	}()

	// Wait for printer to finish
	<-printer

	// Get final statistics
	finalStats := <-stats

	// Wait for the alert consumer too; otherwise a late alert could be lost
	// when main returns.
	<-alertsDone

	fmt.Println("\n=== Statistics ===")
	for key, count := range finalStats {
		fmt.Printf("%s: %d\n", key, count)
	}

	fmt.Println("\n=== Pipeline Complete ===")
}
Advanced: Buffered Stages
Control pipeline flow with buffered channels:
// bufferedTransform doubles every value received from in. The output channel
// has room for bufferSize values, so the stage can run ahead of a slow
// consumer until the buffer fills up. The 10ms sleep per value stands in for
// heavy processing. The returned channel is closed once in is closed and
// drained.
func bufferedTransform(in <-chan int, bufferSize int) <-chan int {
	doubled := make(chan int, bufferSize) // Buffered channel
	heavyDouble := func(v int) int {
		time.Sleep(10 * time.Millisecond) // Heavy processing
		return v * 2
	}
	go func() {
		defer close(doubled)
		for v := range in {
			doubled <- heavyDouble(v)
		}
	}()
	return doubled
}
Advanced: Parallel Processing
Process data in parallel within a stage:
// parallelSquare squares the values received from in using a pool of workers
// goroutines that all read from the same input channel. Because workers run
// concurrently, results may arrive in a different order than the inputs.
//
// A workers value below 1 is treated as 1. Without that, zero workers would
// close the output immediately, silently drop every input and leave the
// upstream sender blocked, and a negative count would panic in WaitGroup.Add.
//
// The returned channel is closed after in is closed and every worker has
// finished.
func parallelSquare(in <-chan int, workers int) <-chan int {
	if workers < 1 {
		workers = 1
	}

	out := make(chan int)
	var wg sync.WaitGroup

	// Start worker pool
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for n := range in {
				// Process in parallel; the sleep stands in for real work.
				time.Sleep(10 * time.Millisecond)
				out <- n * n
			}
		}()
	}

	// Close output when all workers done
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}
Advanced: Error Handling
Propagate errors through the pipeline:
// Result carries either a computed value or the error that prevented it from
// being computed, so that failures can travel through the pipeline alongside
// ordinary results.
type Result struct {
	Value int   // the quotient; left at zero when Error is set
	Error error // non-nil when the value could not be computed
}

// safeDivide divides every value received from in by divisor and sends the
// outcome as a Result. When divisor is zero, each input yields a Result whose
// Error is "division by zero" instead of a value. The returned channel is
// closed once in is closed and drained.
func safeDivide(in <-chan int, divisor int) <-chan Result {
	results := make(chan Result)
	go func() {
		defer close(results)
		for n := range in {
			var r Result
			if divisor != 0 {
				r.Value = n / divisor
			} else {
				r.Error = fmt.Errorf("division by zero")
			}
			results <- r
		}
	}()
	return results
}
Advanced: Cancellation with Context
Gracefully shut down pipelines:
// cancellableStage doubles every value received from in until in is closed
// or ctx is done, whichever comes first. The returned channel is closed when
// the stage stops.
//
// Cancellation is honoured both while waiting for input and while waiting
// for the consumer to accept a result. Without the second check, a consumer
// that stops reading after cancelling would leave the goroutine blocked on
// the send forever.
func cancellableStage(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				fmt.Println("Stage cancelled")
				return
			case n, ok := <-in:
				if !ok {
					return // input exhausted
				}
				// The send can block, so it must be cancellable as well.
				select {
				case out <- n * 2:
				case <-ctx.Done():
					fmt.Println("Stage cancelled")
					return
				}
			}
		}
	}()
	return out
}
// Usage: run the stage under a context that expires after five seconds.
// NOTE(review): input is not defined in this snippet — presumably an existing
// <-chan int produced by an earlier stage; confirm.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
// Call cancel even if the timeout fires first, so the context's resources are released.
defer cancel()
// cancellableStage watches ctx.Done() while waiting for input; read results from stage.
stage := cancellableStage(ctx, input)
Performance Patterns
// Ordered processing: maintain input order.
//
// orderedProcess applies processItem to every value received from in and
// emits the results in exactly the order the inputs arrived, while letting
// several items be processed at the same time.
//
// Each input gets its own one-slot result channel. Those channels are queued
// in arrival order, and the emitter reads them back in that order, so a slow
// item delays later results but never reorders them. The queue is bounded by
// maxInFlight, which limits how far processing may run ahead of the consumer.
//
// Results are tracked by arrival position, not by item value. Any input
// therefore works, including duplicates, negative numbers and sequences that
// do not start at zero, and nothing is dropped when in is closed.
//
// processItem is called from multiple goroutines and must be safe for
// concurrent use. The returned channel is closed once in is closed and every
// result has been emitted.
func orderedProcess(in <-chan int) <-chan int {
	const maxInFlight = 8

	out := make(chan int)
	pending := make(chan (<-chan int), maxInFlight)

	// Dispatcher: start processing each item and queue its result slot.
	go func() {
		defer close(pending)
		for item := range in {
			slot := make(chan int, 1) // buffered so the worker never blocks
			go func(v int) {
				slot <- processItem(v)
			}(item)
			pending <- slot
		}
	}()

	// Emitter: deliver results strictly in arrival order.
	go func() {
		defer close(out)
		for slot := range pending {
			out <- <-slot
		}
	}()

	return out
}
Benefits of Pipeline Pattern
- Memory Efficiency: Process data as it arrives, no full load needed
- Concurrency: Each stage runs concurrently in its own goroutine
- Composability: Easy to add/remove/reorder stages
- Testability: Test each stage independently
- Scalability: Add workers to bottleneck stages
When to Use Pipelines
Pipeline patterns excel when:
- Processing large datasets that don’t fit in memory
- You want streaming/real-time results
- Different processing stages have different speeds
- You need composable, reusable processing stages
- Building ETL (Extract, Transform, Load) systems
Common Pipeline Patterns
| Pattern | Description | Use Case |
|---|---|---|
| Linear | Simple chain of stages | Log processing |
| Fan-out | Distribute (or, as in the fanOut example above, duplicate) data to multiple consumers | Load balancing, broadcasting |
| Fan-in | Merge results from multiple sources | Aggregation |
| Worker Pool | Parallel processing in stage | CPU-heavy tasks |
| Buffered | Add buffers between stages | Rate matching |
Thank you
Pipeline patterns transform complex data processing into elegant, concurrent flows. By breaking work into stages connected by channels, you leverage Go’s concurrency primitives to build scalable, maintainable systems. It’s the Unix philosophy brought to life with goroutines.
Please drop an email at [email protected] if you would like to share any feedback or suggestions. Peace!