The Power of Streaming Pipelines

Imagine processing a million log entries. The naive approach loads everything into memory, processes it, then outputs the results. But what if you don’t have enough RAM? What if you want results streaming in real time?

Pipeline patterns break complex processing into stages connected by channels. Data flows through the pipeline, with each stage transforming it concurrently. It’s Unix pipes meets goroutines—and it’s beautiful.

The Sequential Approach

Here’s what we’re moving away from:

func ProcessLogs(filename string) ([]Result, error) {
    // Load everything into memory
    logs := loadAllLogs(filename) // Uh oh, 10GB file!

    // Process sequentially
    filtered := filterLogs(logs)
    transformed := transformLogs(filtered)
    enriched := enrichLogs(transformed)
    aggregated := aggregateLogs(enriched)

    return aggregated, nil
}

// Problems:
// - Memory explosion with large datasets
// - No concurrency
// - No early results
// - Hard to add/remove stages

Building a Simple Pipeline

Let’s create a data processing pipeline:

package main

import "fmt"

// Generator stage: produces data
func generate(nums ...int) <-chan int {
    out := make(chan int)

    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()

    return out
}

// Transformer stage: squares numbers
func square(in <-chan int) <-chan int {
    out := make(chan int)

    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()

    return out
}

// Filter stage: only even numbers
func filterEven(in <-chan int) <-chan int {
    out := make(chan int)

    go func() {
        defer close(out)
        for n := range in {
            if n%2 == 0 {
                out <- n
            }
        }
    }()

    return out
}

// Consumer stage: prints results
func consume(in <-chan int) {
    for n := range in {
        fmt.Printf("%d ", n)
    }
    fmt.Println()
}

func main() {
    fmt.Println("=== Simple Pipeline Demo ===\n")

    // Build pipeline: generate -> square -> filter -> consume
    numbers := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    squared := square(numbers)
    evens := filterEven(squared)

    fmt.Print("Results: ")
    consume(evens)

    fmt.Println("\n=== Demo Complete ===")
}
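
Running this prints "Results: 4 16 36 64 100": the squares of 1 through 10 stream through, and only the even ones survive the filter. Because every stage takes and returns a channel, the same pipeline also composes inline as consume(filterEven(square(generate(1, 2, 3)))).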
graph LR
    Gen[Generator]:::lightBlue
    Square[Square]:::lightGreen
    Filter[Filter Even]:::lightYellow
    Consume[Consumer]:::lightPurple

    Gen -->|chan int| Square
    Square -->|chan int| Filter
    Filter -->|chan int| Consume

    classDef lightBlue fill:#87CEEB,stroke:#4682B4,stroke-width:2px,color:#000
    classDef lightGreen fill:#90EE90,stroke:#228B22,stroke-width:2px,color:#000
    classDef lightYellow fill:#FFFFE0,stroke:#FFD700,stroke-width:2px,color:#000
    classDef lightPurple fill:#DDA0DD,stroke:#9370DB,stroke-width:2px,color:#000

Real-World Example: Log Processing Pipeline

Let’s build a practical log processing system:

package main

import (
    "fmt"
    "strings"
    "sync"
    "time"
)

// LogEntry represents a parsed log line
type LogEntry struct {
    Timestamp time.Time
    Level     string
    Message   string
    Service   string
}

type ProcessedLog struct {
    LogEntry
    WordCount int
    HasError  bool
}

type Alert struct {
    Service string
    Count   int
    Period  time.Duration
}

// Stage 1: Read lines from source
func readLines(lines []string) <-chan string {
    out := make(chan string)

    go func() {
        defer close(out)
        for _, line := range lines {
            out <- line
            time.Sleep(10 * time.Millisecond) // Simulate streaming
        }
    }()

    return out
}

// Stage 2: Parse log lines into structured data
func parseLog(in <-chan string) <-chan LogEntry {
    out := make(chan LogEntry)

    go func() {
        defer close(out)
        for line := range in {
            parts := strings.SplitN(line, "|", 4)
            if len(parts) != 4 {
                continue // Skip malformed lines
            }

            // Parse the timestamp field, falling back to now on bad input
            ts, err := time.Parse("2006-01-02 15:04:05", strings.TrimSpace(parts[0]))
            if err != nil {
                ts = time.Now()
            }

            entry := LogEntry{
                Timestamp: ts,
                Level:     strings.TrimSpace(parts[1]),
                Service:   strings.TrimSpace(parts[2]),
                Message:   strings.TrimSpace(parts[3]),
            }

            out <- entry
        }
    }()

    return out
}

// Stage 3: Filter logs by level
func filterByLevel(in <-chan LogEntry, levels ...string) <-chan LogEntry {
    out := make(chan LogEntry)
    levelMap := make(map[string]bool)

    for _, level := range levels {
        levelMap[level] = true
    }

    go func() {
        defer close(out)
        for entry := range in {
            if levelMap[entry.Level] {
                out <- entry
            }
        }
    }()

    return out
}

// Stage 4: Enrich logs with additional data
func enrichLog(in <-chan LogEntry) <-chan ProcessedLog {
    out := make(chan ProcessedLog)

    go func() {
        defer close(out)
        for entry := range in {
            processed := ProcessedLog{
                LogEntry:  entry,
                WordCount: len(strings.Fields(entry.Message)),
                HasError:  strings.Contains(strings.ToLower(entry.Message), "error"),
            }
            out <- processed
        }
    }()

    return out
}

// Stage 5: Fan-out pattern - duplicate stream for multiple consumers
func fanOut(in <-chan ProcessedLog, n int) []<-chan ProcessedLog {
    outs := make([]chan ProcessedLog, n)
    for i := range outs {
        outs[i] = make(chan ProcessedLog)
    }

    go func() {
        defer func() {
            for _, out := range outs {
                close(out)
            }
        }()

        for log := range in {
            // Send to all output channels
            for _, out := range outs {
                out <- log
            }
        }
    }()

    // Convert to receive-only channels
    result := make([]<-chan ProcessedLog, n)
    for i, out := range outs {
        result[i] = out
    }

    return result
}
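
One caveat: fanOut sends each log to every output in turn, so a single slow consumer stalls the whole broadcast. Buffered output channels, or a dedicated goroutine per output, are common ways to relax that coupling.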

// Consumer 1: Print logs
func printLogs(in <-chan ProcessedLog, name string) <-chan struct{} {
    done := make(chan struct{})

    go func() {
        defer close(done)
        count := 0
        for log := range in {
            fmt.Printf("[%s] [%s] %s: %s (words: %d)\n",
                name, log.Level, log.Service, log.Message, log.WordCount)
            count++
        }
        fmt.Printf("[%s] Processed %d logs\n", name, count)
    }()

    return done
}

// Consumer 2: Detect error patterns
func detectErrors(in <-chan ProcessedLog) <-chan Alert {
    alerts := make(chan Alert)

    go func() {
        defer close(alerts)
        errorCount := make(map[string]int)

        for log := range in {
            if log.HasError {
                errorCount[log.Service]++

                if errorCount[log.Service]%3 == 0 {
                    alerts <- Alert{
                        Service: log.Service,
                        Count:   errorCount[log.Service],
                        Period:  time.Minute,
                    }
                }
            }
        }
    }()

    return alerts
}

// Consumer 3: Calculate statistics
func calculateStats(in <-chan ProcessedLog) <-chan map[string]int {
    stats := make(chan map[string]int)

    go func() {
        defer close(stats)

        levelCounts := make(map[string]int)
        serviceCounts := make(map[string]int)

        for log := range in {
            levelCounts[log.Level]++
            serviceCounts[log.Service]++
        }

        // Combine stats
        allStats := make(map[string]int)
        for k, v := range levelCounts {
            allStats["level_"+k] = v
        }
        for k, v := range serviceCounts {
            allStats["service_"+k] = v
        }

        stats <- allStats
    }()

    return stats
}

// Fan-in pattern: merge multiple channels
func merge(channels ...<-chan Alert) <-chan Alert {
    out := make(chan Alert)
    var wg sync.WaitGroup

    wg.Add(len(channels))

    for _, ch := range channels {
        go func(c <-chan Alert) {
            defer wg.Done()
            for alert := range c {
                out <- alert
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}
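
merge is the fan-in counterpart to fanOut. It is not wired into the demo below, but a hypothetical variation would run two error detectors on separate fanned-out streams and merge their alerts into one channel:

// Hypothetical sketch: two detectors, one merged alert stream
streams := fanOut(enriched, 2)
alerts := merge(detectErrors(streams[0]), detectErrors(streams[1]))
for alert := range alerts {
    fmt.Printf("ALERT: %s has %d errors\n", alert.Service, alert.Count)
}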

func main() {
    fmt.Println("=== Log Processing Pipeline Demo ===\n")

    // Simulate log data
    logData := []string{
        "2024-08-30 10:00:00|INFO|api-service|Request processed successfully",
        "2024-08-30 10:00:05|ERROR|api-service|Database connection failed",
        "2024-08-30 10:00:10|WARN|auth-service|Slow response detected",
        "2024-08-30 10:00:15|ERROR|api-service|Timeout error occurred",
        "2024-08-30 10:00:20|INFO|payment-service|Payment completed",
        "2024-08-30 10:00:25|ERROR|api-service|Internal server error",
        "2024-08-30 10:00:30|INFO|api-service|Health check passed",
        "2024-08-30 10:00:35|ERROR|auth-service|Invalid token error",
        "2024-08-30 10:00:40|DEBUG|api-service|Debug information",
        "2024-08-30 10:00:45|ERROR|payment-service|Payment processing error",
    }

    // Build pipeline
    lines := readLines(logData)
    parsed := parseLog(lines)
    filtered := filterByLevel(parsed, "INFO", "ERROR", "WARN")
    enriched := enrichLog(filtered)

    // Fan-out to multiple consumers
    streams := fanOut(enriched, 3)

    // Start consumers
    printer := printLogs(streams[0], "Printer")
    errors := detectErrors(streams[1])
    stats := calculateStats(streams[2])

    // Process alerts; close alertsDone so main can wait for the final alert
    alertsDone := make(chan struct{})
    go func() {
        defer close(alertsDone)
        for alert := range errors {
            fmt.Printf("\n🚨 ALERT: %s has %d errors!\n\n", alert.Service, alert.Count)
        }
    }()

    // Wait for the printer and the alert consumer to finish
    <-printer
    <-alertsDone

    // Get final statistics
    finalStats := <-stats
    fmt.Println("\n=== Statistics ===")
    for key, count := range finalStats {
        fmt.Printf("%s: %d\n", key, count)
    }

    fmt.Println("\n=== Pipeline Complete ===")
}
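
Output from the printer and the alert goroutine interleaves nondeterministically from run to run; the channel-close and wait choreography guarantees completeness, not print order.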

Advanced: Buffered Stages

Control pipeline flow with buffered channels. A buffer lets a fast producer run ahead of a slow stage, absorbing bursts instead of blocking on every send:

// Buffered stage prevents blocking
func bufferedTransform(in <-chan int, bufferSize int) <-chan int {
    out := make(chan int, bufferSize) // Buffered channel

    go func() {
        defer close(out)
        for n := range in {
            // Heavy processing
            time.Sleep(10 * time.Millisecond)
            out <- n * 2
        }
    }()

    return out
}

Advanced: Parallel Processing

Process data in parallel within a stage:

func parallelSquare(in <-chan int, workers int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup

    // Start worker pool
    wg.Add(workers)
    for i := 0; i < workers; i++ {
        go func() {
            defer wg.Done()
            for n := range in {
                // Process in parallel
                time.Sleep(10 * time.Millisecond)
                out <- n * n
            }
        }()
    }

    // Close output when all workers done
    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}
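
With several workers pulling from the same input channel, output order no longer matches input order; the orderedProcess pattern under Performance Patterns below shows one way to restore it.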

Advanced: Error Handling

Propagate errors through the pipeline:

type Result struct {
    Value int
    Error error
}

func safeDivide(in <-chan int, divisor int) <-chan Result {
    out := make(chan Result)

    go func() {
        defer close(out)
        for n := range in {
            if divisor == 0 {
                out <- Result{Error: fmt.Errorf("division by zero")}
                continue
            }
            out <- Result{Value: n / divisor}
        }
    }()

    return out
}
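
Downstream consumers then branch on the Error field instead of aborting the whole pipeline. A minimal sketch, assuming the generate stage from the simple pipeline:

// Drain the stage, logging failures and keeping successes
for res := range safeDivide(generate(10, 20, 30), 0) {
    if res.Error != nil {
        fmt.Println("skipping item:", res.Error)
        continue
    }
    fmt.Println("value:", res.Value)
}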

Advanced: Cancellation with Context

Gracefully shutdown pipelines:

func cancellableStage(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)

    go func() {
        defer close(out)
        for {
            select {
            case n, ok := <-in:
                if !ok {
                    return
                }
                // Guard the send too: if the consumer is gone after
                // cancellation, an unguarded send would block forever
                select {
                case out <- n * 2:
                case <-ctx.Done():
                    return
                }

            case <-ctx.Done():
                fmt.Println("Stage cancelled")
                return
            }
        }
    }()

    return out
}

// Usage
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

stage := cancellableStage(ctx, input)

Performance Patterns

// Ordered processing: restore input order when results arrive out of
// order (e.g., from a parallel worker pool). Items carry their index.
type indexed struct {
    Index int
    Value int
}

func orderedProcess(in <-chan indexed) <-chan int {
    out := make(chan int)

    go func() {
        defer close(out)
        pending := make(map[int]int)
        nextIndex := 0

        for item := range in {
            pending[item.Index] = item.Value

            // Flush everything that is now contiguous from nextIndex
            for {
                val, ok := pending[nextIndex]
                if !ok {
                    break
                }
                out <- val
                delete(pending, nextIndex)
                nextIndex++
            }
        }
    }()

    return out
}
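
A quick sketch of the reordering in action (hypothetical demoOrdered helper; indices arrive out of order, values come out in input order):

func demoOrdered() {
    in := make(chan indexed)

    go func() {
        defer close(in)
        // Simulate a worker pool finishing items out of order
        in <- indexed{Index: 2, Value: 30}
        in <- indexed{Index: 0, Value: 10}
        in <- indexed{Index: 1, Value: 20}
    }()

    for v := range orderedProcess(in) {
        fmt.Println(v) // prints 10, 20, 30
    }
}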

Benefits of Pipeline Pattern

  1. Memory Efficiency: Process data as it arrives; no need to load the full dataset
  2. Concurrency: Every stage runs concurrently with the others
  3. Composability: Easy to add, remove, or reorder stages
  4. Testability: Test each stage independently (see the test sketch after this list)
  5. Scalability: Add workers to bottleneck stages
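
Point 4 is worth showing. Because each stage is an ordinary function from channel to channel, it can be exercised in isolation. A minimal sketch, assuming generate and square from the simple pipeline live in the same package:

import "testing"

func TestSquare(t *testing.T) {
    out := square(generate(2, 3))

    want := []int{4, 9}
    for i, w := range want {
        if got := <-out; got != w {
            t.Errorf("result %d: got %d, want %d", i, got, w)
        }
    }
    if _, ok := <-out; ok {
        t.Error("expected the output channel to be closed")
    }
}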

When to Use Pipelines

Pipeline patterns excel when:

  • Processing large datasets that don’t fit in memory
  • You want streaming/real-time results
  • Different processing stages have different speeds
  • You need composable, reusable processing stages
  • Building ETL (Extract, Transform, Load) systems

Common Pipeline Patterns

Pattern       Description                           Use Case
Linear        Simple chain of stages                Log processing
Fan-out       Split data to multiple consumers      Load balancing
Fan-in        Merge results from multiple sources   Aggregation
Worker Pool   Parallel processing within a stage    CPU-heavy tasks
Buffered      Add buffers between stages            Rate matching

Thank you

Pipeline patterns transform complex data processing into elegant, concurrent flows. By breaking work into stages connected by channels, you leverage Go’s concurrency primitives to build scalable, maintainable systems. It’s the Unix philosophy brought to life with goroutines.

Please drop an email at [email protected] if you would like to share any feedback or suggestions. Peace!