Go Concurrency Patterns Series: ← Select Statement | Series Overview | Fan-Out/Fan-In →


What is the Pipeline Pattern?

The Pipeline pattern is a powerful way to structure concurrent data processing by breaking work into stages connected by channels. Each stage runs in its own goroutine, receives data from an input channel, processes it, and sends results to an output channel. This creates a chain of processing stages that can run concurrently, dramatically improving throughput.

Think of it like an assembly line in a factory - each worker (stage) performs a specific task and passes the work to the next worker. While one worker processes item N, the previous worker can already be working on item N+1, creating parallel processing across the entire pipeline.
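In code, every stage has the same shape: accept a receive-only channel, return a receive-only channel, and close the output when the input is exhausted. A minimal sketch of a single stage (the name double is purely illustrative):

// A minimal pipeline stage: read from in, process each value,
// send the result downstream, and close the output when done.
func double(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out) // signals the next stage that no more values are coming
        for v := range in {
            out <- v * 2
        }
    }()
    return out
}

Because every stage shares this signature, stages compose directly: double(double(source)) is itself a two-stage pipeline.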

The Problem: Sequential Processing Bottlenecks

Without pipelines, data processing is often sequential and inefficient:

// ❌ SEQUENTIAL: Inefficient processing
func sequentialProcessing(data []string) []string {
    var results []string
    
    for _, item := range data {
        // Stage 1: Validate
        if !validate(item) {
            continue
        }
        
        // Stage 2: Transform
        transformed := transform(item)
        
        // Stage 3: Enrich
        enriched := enrich(transformed)
        
        // Stage 4: Format
        formatted := format(enriched)
        
        results = append(results, formatted)
    }
    
    return results
}

This approach finishes one item completely before starting the next: while any single stage runs, every other stage sits idle, so the total time is the sum of every stage's latency for every item.
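For a rough sense of the cost, take the simulated delays used in the concurrent version below (50 ms to validate, 100 ms to transform, 75 ms to format): processing n items sequentially takes about n × 225 ms, while the pipeline, once full, emits a result roughly every 100 ms, the latency of its slowest stage. That is better than a 2x speedup here, and the gap widens as stages are balanced or parallelized further.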

Basic Pipeline Implementation

Here’s how to implement a concurrent pipeline:

package main

import (
    "fmt"
    "strings"
    "time"
)

// Stage 1: Generate data
func generator(data []string) <-chan string {
    out := make(chan string)
    
    go func() {
        defer close(out)
        for _, item := range data {
            out <- item
        }
    }()
    
    return out
}

// Stage 2: Validate data
func validator(in <-chan string) <-chan string {
    out := make(chan string)
    
    go func() {
        defer close(out)
        for item := range in {
            // Simulate validation work
            time.Sleep(50 * time.Millisecond)
            
            if len(item) > 0 && !strings.Contains(item, "invalid") {
                out <- item
            }
        }
    }()
    
    return out
}

// Stage 3: Transform data
func transformer(in <-chan string) <-chan string {
    out := make(chan string)
    
    go func() {
        defer close(out)
        for item := range in {
            // Simulate transformation work
            time.Sleep(100 * time.Millisecond)
            
            transformed := strings.ToUpper(item)
            out <- transformed
        }
    }()
    
    return out
}

// Stage 4: Format data
func formatter(in <-chan string) <-chan string {
    out := make(chan string)
    
    go func() {
        defer close(out)
        for item := range in {
            // Simulate formatting work
            time.Sleep(75 * time.Millisecond)
            
            formatted := fmt.Sprintf("[PROCESSED: %s]", item)
            out <- formatted
        }
    }()
    
    return out
}

func basicPipeline() {
    // Input data
    data := []string{
        "hello", "world", "invalid", "golang", 
        "pipeline", "concurrent", "processing",
    }
    
    fmt.Println("🚀 Starting pipeline processing...")
    start := time.Now()
    
    // Build the pipeline
    stage1 := generator(data)
    stage2 := validator(stage1)
    stage3 := transformer(stage2)
    stage4 := formatter(stage3)
    
    // Consume results
    var results []string
    for result := range stage4 {
        results = append(results, result)
        fmt.Printf("✅ %s\n", result)
    }
    
    duration := time.Since(start)
    fmt.Printf("\n📊 Processed %d items in %v\n", len(results), duration)
    fmt.Printf("📈 Average time per item: %v\n", duration/time.Duration(len(results)))
}
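With the delays above, the transformer is the bottleneck: after a brief ramp-up while the first items flow through, results arrive roughly every 100 ms rather than the 225 ms a fully sequential pass would need per item, which the timing printout makes easy to verify.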

Real-World Example: Image Processing Pipeline

Let’s build a more sophisticated pipeline for processing images:

package main

import (
    "crypto/md5"
    "fmt"
    "image"
    "image/jpeg"
    "image/png"
    "os"
    "path/filepath"
    "strings"
    "sync"
    "time"
)

type ImageJob struct {
    ID       int
    Filename string
    Path     string
}

type ProcessedImage struct {
    Job      ImageJob
    Image    image.Image
    Metadata map[string]interface{}
    Error    error
    Duration time.Duration
}

type ImagePipeline struct {
    maxWorkers int
    bufferSize int
}

func NewImagePipeline(maxWorkers, bufferSize int) *ImagePipeline {
    return &ImagePipeline{
        maxWorkers: maxWorkers,
        bufferSize: bufferSize,
    }
}

// Stage 1: File Discovery
func (ip *ImagePipeline) fileDiscovery(directories []string) <-chan ImageJob {
    jobs := make(chan ImageJob, ip.bufferSize)
    
    go func() {
        defer close(jobs)
        
        jobID := 1
        for _, dir := range directories {
            err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
                if err != nil {
                    return nil // Skip errors
                }
                
                if !info.IsDir() && isImageFile(path) {
                    jobs <- ImageJob{
                        ID:       jobID,
                        Filename: info.Name(),
                        Path:     path,
                    }
                    jobID++
                }
                return nil
            })
            
            if err != nil {
                fmt.Printf("Error walking directory %s: %v\n", dir, err)
            }
        }
    }()
    
    return jobs
}

// Stage 2: Image Loading
func (ip *ImagePipeline) imageLoader(jobs <-chan ImageJob) <-chan ProcessedImage {
    results := make(chan ProcessedImage, ip.bufferSize)
    
    var wg sync.WaitGroup
    for i := 0; i < ip.maxWorkers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            
            for job := range jobs {
                start := time.Now()
                result := ProcessedImage{
                    Job:      job,
                    Metadata: make(map[string]interface{}),
                    Duration: 0,
                }
                
                // Load image
                file, err := os.Open(job.Path)
                if err != nil {
                    result.Error = fmt.Errorf("failed to open file: %v", err)
                    result.Duration = time.Since(start)
                    results <- result
                    continue
                }
                
                img, format, err := image.Decode(file)
                file.Close()
                
                if err != nil {
                    result.Error = fmt.Errorf("failed to decode image: %v", err)
                } else {
                    result.Image = img
                    result.Metadata["format"] = format
                    result.Metadata["dimensions"] = fmt.Sprintf("%dx%d", 
                        img.Bounds().Dx(), img.Bounds().Dy())
                }
                
                result.Duration = time.Since(start)
                results <- result
            }
        }(i)
    }
    
    go func() {
        wg.Wait()
        close(results)
    }()
    
    return results
}

// Stage 3: Image Analysis
func (ip *ImagePipeline) imageAnalyzer(images <-chan ProcessedImage) <-chan ProcessedImage {
    results := make(chan ProcessedImage, ip.bufferSize)
    
    var wg sync.WaitGroup
    for i := 0; i < ip.maxWorkers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            
            for processed := range images {
                if processed.Error != nil {
                    results <- processed
                    continue
                }
                
                start := time.Now()
                
                // Analyze image properties
                bounds := processed.Image.Bounds()
                width := bounds.Dx()
                height := bounds.Dy()
                
                processed.Metadata["width"] = width
                processed.Metadata["height"] = height
                processed.Metadata["aspect_ratio"] = float64(width) / float64(height)
                processed.Metadata["pixels"] = width * height
                
                // Calculate average color (simplified)
                processed.Metadata["avg_color"] = calculateAverageColor(processed.Image)
                
                // Generate hash for duplicate detection
                processed.Metadata["hash"] = generateImageHash(processed.Image)
                
                processed.Duration += time.Since(start)
                results <- processed
            }
        }(i)
    }
    
    go func() {
        wg.Wait()
        close(results)
    }()
    
    return results
}

// Stage 4: Result Aggregation
func (ip *ImagePipeline) resultAggregator(processed <-chan ProcessedImage) <-chan []ProcessedImage {
    results := make(chan []ProcessedImage, 1)
    
    go func() {
        defer close(results)
        
        var allResults []ProcessedImage
        duplicates := make(map[string][]ProcessedImage)
        
        for result := range processed {
            allResults = append(allResults, result)
            
            if result.Error == nil {
                if hash, ok := result.Metadata["hash"].(string); ok {
                    duplicates[hash] = append(duplicates[hash], result)
                }
            }
        }
        
        // Mark duplicates
        for hash, images := range duplicates {
            if len(images) > 1 {
                for i := range images {
                    for j := range allResults {
                        if allResults[j].Job.ID == images[i].Job.ID {
                            allResults[j].Metadata["is_duplicate"] = true
                            allResults[j].Metadata["duplicate_group"] = hash
                            break
                        }
                    }
                }
            }
        }
        
        results <- allResults
    }()
    
    return results
}

func (ip *ImagePipeline) Process(directories []string) []ProcessedImage {
    // Build the pipeline
    jobs := ip.fileDiscovery(directories)
    loaded := ip.imageLoader(jobs)
    analyzed := ip.imageAnalyzer(loaded)
    aggregated := ip.resultAggregator(analyzed)
    
    // Get final results
    results := <-aggregated
    return results
}

// Helper functions
func isImageFile(filename string) bool {
    ext := strings.ToLower(filepath.Ext(filename))
    return ext == ".jpg" || ext == ".jpeg" || ext == ".png" || ext == ".gif"
}

func calculateAverageColor(img image.Image) string {
    bounds := img.Bounds()
    var r, g, b uint32
    pixelCount := 0
    
    // Sample every 10th pixel for performance
    for y := bounds.Min.Y; y < bounds.Max.Y; y += 10 {
        for x := bounds.Min.X; x < bounds.Max.X; x += 10 {
            pr, pg, pb, _ := img.At(x, y).RGBA()
            r += pr >> 8
            g += pg >> 8
            b += pb >> 8
            pixelCount++
        }
    }
    
    if pixelCount == 0 {
        return "#000000"
    }
    
    return fmt.Sprintf("#%02x%02x%02x", 
        r/uint32(pixelCount), 
        g/uint32(pixelCount), 
        b/uint32(pixelCount))
}

func generateImageHash(img image.Image) string {
    // Simple hash based on image dimensions and sample pixels
    bounds := img.Bounds()
    hash := md5.New()
    
    // Include dimensions
    hash.Write([]byte(fmt.Sprintf("%dx%d", bounds.Dx(), bounds.Dy())))
    
    // Sample a few pixels, clamping the step to at least 1 so tiny
    // images don't produce a zero step and loop forever
    stepX, stepY := bounds.Dx()/8, bounds.Dy()/8
    if stepX < 1 {
        stepX = 1
    }
    if stepY < 1 {
        stepY = 1
    }
    for y := bounds.Min.Y; y < bounds.Max.Y; y += stepY {
        for x := bounds.Min.X; x < bounds.Max.X; x += stepX {
            r, g, b, a := img.At(x, y).RGBA()
            hash.Write([]byte{byte(r >> 8), byte(g >> 8), byte(b >> 8), byte(a >> 8)})
        }
    }
    
    return fmt.Sprintf("%x", hash.Sum(nil))[:16]
}

func createSampleImages() {
    // Create sample directory and images for demonstration
    os.MkdirAll("sample_images", 0755)
    
    // This would normally create actual image files
    // For demo purposes, we'll just create empty files with image extensions
    sampleFiles := []string{
        "sample_images/photo1.jpg",
        "sample_images/photo2.png", 
        "sample_images/photo3.jpg",
        "sample_images/duplicate.jpg",
    }
    
    for _, filename := range sampleFiles {
        file, _ := os.Create(filename)
        file.Close()
    }
}

func cleanupSampleImages() {
    os.RemoveAll("sample_images")
}

func main() {
    // Create sample images for demonstration
    createSampleImages()
    defer cleanupSampleImages()
    
    pipeline := NewImagePipeline(3, 10)
    
    fmt.Println("🖼️  Starting image processing pipeline...")
    start := time.Now()
    
    results := pipeline.Process([]string{"sample_images"})
    
    totalDuration := time.Since(start)
    
    // Display results
    fmt.Printf("\n📊 Pipeline Results (completed in %v):\n", totalDuration)
    fmt.Println(strings.Repeat("-", 60))
    
    successCount := 0
    errorCount := 0
    duplicateCount := 0
    
    for _, result := range results {
        if result.Error != nil {
            fmt.Printf("❌ %s: %v (processed in %v)\n", 
                result.Job.Filename, result.Error, result.Duration)
            errorCount++
        } else {
            isDuplicate := result.Metadata["is_duplicate"] == true
            if isDuplicate {
                duplicateCount++
            }
            
            fmt.Printf("✅ %s: %s, %s, avg_color: %s%s (processed in %v)\n",
                result.Job.Filename,
                result.Metadata["format"],
                result.Metadata["dimensions"],
                result.Metadata["avg_color"],
                func() string {
                    if isDuplicate {
                        return " [DUPLICATE]"
                    }
                    return ""
                }(),
                result.Duration)
            successCount++
        }
    }
    
    fmt.Println(strings.Repeat("-", 60))
    fmt.Printf("📈 Summary: %d successful, %d errors, %d duplicates\n", 
        successCount, errorCount, duplicateCount)
    fmt.Printf("⚡ Average processing time: %v per image\n", 
        totalDuration/time.Duration(len(results)))
}
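Note that createSampleImages only writes empty placeholder files, so running the demo as-is reports a decode error for every image; point Process at a directory of real JPEG or PNG files to exercise the success path.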

Pipeline Patterns and Variations

1. Buffered Pipeline for Throughput

func bufferedPipeline() {
    // Create pipeline with different buffer sizes for optimization
    stage1 := func(input []int) <-chan int {
        out := make(chan int, 10) // Large buffer for fast producer
        go func() {
            defer close(out)
            for _, val := range input {
                out <- val
            }
        }()
        return out
    }
    
    stage2 := func(in <-chan int) <-chan int {
        out := make(chan int, 5) // Medium buffer
        go func() {
            defer close(out)
            for val := range in {
                time.Sleep(10 * time.Millisecond) // Simulate work
                out <- val * 2
            }
        }()
        return out
    }
    
    stage3 := func(in <-chan int) <-chan int {
        out := make(chan int, 1) // Small buffer for final stage
        go func() {
            defer close(out)
            for val := range in {
                time.Sleep(5 * time.Millisecond) // Simulate work
                out <- val + 1
            }
        }()
        return out
    }
    
    // Run pipeline
    input := make([]int, 20)
    for i := range input {
        input[i] = i + 1
    }
    
    start := time.Now()
    results := stage3(stage2(stage1(input)))
    
    var output []int
    for result := range results {
        output = append(output, result)
    }
    
    fmt.Printf("Buffered pipeline processed %d items in %v\n", 
        len(output), time.Since(start))
}
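The buffers here absorb bursts between stages, but they do not raise steady-state throughput, which is still set by the slowest stage (stage2's 10 ms of work per item). Treat buffer sizes as a knob for smoothing latency spikes, not a substitute for balancing stage costs.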

2. Error Handling in Pipelines

type Result struct {
    Value int
    Error error
}

func errorHandlingPipeline() {
    // Stage that might fail
    riskyStage := func(in <-chan int) <-chan Result {
        out := make(chan Result)
        go func() {
            defer close(out)
            for val := range in {
                time.Sleep(10 * time.Millisecond)
                
                if val%7 == 0 {
                    out <- Result{Error: fmt.Errorf("unlucky number: %d", val)}
                } else {
                    out <- Result{Value: val * 2}
                }
            }
        }()
        return out
    }
    
    // Error filtering stage
    errorFilter := func(in <-chan Result) (<-chan int, <-chan error) {
        values := make(chan int)
        errors := make(chan error)
        
        go func() {
            defer close(values)
            defer close(errors)
            
            for result := range in {
                if result.Error != nil {
                    errors <- result.Error
                } else {
                    values <- result.Value
                }
            }
        }()
        
        return values, errors
    }
    
    // Input generator
    input := make(chan int)
    go func() {
        defer close(input)
        for i := 1; i <= 20; i++ {
            input <- i
        }
    }()
    
    // Build pipeline
    results := riskyStage(input)
    values, errors := errorFilter(results)
    
    // Collect results and errors
    var successValues []int
    var errorList []error
    
    done := make(chan bool, 2)
    
    // Collect successful values
    go func() {
        for val := range values {
            successValues = append(successValues, val)
        }
        done <- true
    }()
    
    // Collect errors
    go func() {
        for err := range errors {
            errorList = append(errorList, err)
        }
        done <- true
    }()
    
    // Wait for both collectors
    <-done
    <-done
    
    fmt.Printf("Pipeline completed: %d successes, %d errors\n", 
        len(successValues), len(errorList))
    
    for _, err := range errorList {
        fmt.Printf("Error: %v\n", err)
    }
}
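The split-channel approach works, but it forces the consumer to coordinate two collection goroutines. An alternative, the one the image pipeline above uses, is to keep the error inside the value and let later stages forward failures untouched. A minimal sketch reusing the same Result type:

// A stage that propagates failed Results downstream instead of
// splitting values and errors onto separate channels.
func passThrough(in <-chan Result) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for r := range in {
            if r.Error != nil {
                out <- r // forward the failure unchanged
                continue
            }
            out <- Result{Value: r.Value + 1} // normal processing
        }
    }()
    return out
}

With this style, only the final consumer needs to distinguish successes from failures.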

3. Cancellable Pipeline with Context

func cancellablePipeline(ctx context.Context) {
    // Context-aware stage
    contextStage := func(ctx context.Context, in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for {
                select {
                case val, ok := <-in:
                    if !ok {
                        return
                    }
                    
                    // Simulate work
                    select {
                    case <-time.After(100 * time.Millisecond):
                        select {
                        case out <- val * 2:
                        case <-ctx.Done():
                            return
                        }
                    case <-ctx.Done():
                        return
                    }
                    
                case <-ctx.Done():
                    return
                }
            }
        }()
        return out
    }
    
    // Input generator
    input := make(chan int)
    go func() {
        defer close(input)
        for i := 1; i <= 100; i++ {
            select {
            case input <- i:
            case <-ctx.Done():
                return
            }
        }
    }()
    
    // Build pipeline
    stage1 := contextStage(ctx, input)
    stage2 := contextStage(ctx, stage1)
    
    // Consume with cancellation
    count := 0
    for {
        select {
        case val, ok := <-stage2:
            if !ok {
                fmt.Printf("Pipeline completed normally, processed %d items\n", count)
                return
            }
            count++
            fmt.Printf("Processed: %d\n", val)
            
        case <-ctx.Done():
            fmt.Printf("Pipeline cancelled after processing %d items\n", count)
            return
        }
    }
}

func demonstrateCancellation() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    
    fmt.Println("Starting cancellable pipeline with 2-second timeout...")
    cancellablePipeline(ctx)
}
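The nested selects in contextStage are not decorative: a stage must watch ctx.Done() both while receiving and while sending, because a send into a channel whose consumer has already quit would otherwise block forever and leak the goroutine.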

Pipeline Performance Optimization

1. Benchmarking Pipeline Stages

func benchmarkPipeline() {
    measureStage := func(name string, stage func(<-chan int) <-chan int, input <-chan int) <-chan int {
        start := time.Now()
        output := stage(input)
        
        // Consume all output to measure complete stage time
        var results []int
        for val := range output {
            results = append(results, val)
        }
        
        duration := time.Since(start)
        fmt.Printf("Stage %s: processed %d items in %v (%.2f items/sec)\n", 
            name, len(results), duration, 
            float64(len(results))/duration.Seconds())
        
        // Return results as new channel for next stage
        out := make(chan int, len(results))
        go func() {
            defer close(out)
            for _, val := range results {
                out <- val
            }
        }()
        return out
    }
    
    // Define stages
    fastStage := func(in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for val := range in {
                out <- val * 2 // Fast operation
            }
        }()
        return out
    }
    
    slowStage := func(in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for val := range in {
                time.Sleep(1 * time.Millisecond) // Slow operation
                out <- val + 1
            }
        }()
        return out
    }
    
    // Create input
    input := make(chan int)
    go func() {
        defer close(input)
        for i := 1; i <= 1000; i++ {
            input <- i
        }
    }()
    
    // Benchmark each stage
    fmt.Println("Pipeline stage benchmarks:")
    stage1Out := measureStage("Fast", fastStage, input)
    stage2Out := measureStage("Slow", slowStage, stage1Out)
    
    // Final consumption
    count := 0
    for range stage2Out {
        count++
    }
    fmt.Printf("Final output: %d items\n", count)
}
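Note that measureStage drains each stage completely before feeding the next, so these numbers benchmark stages in isolation rather than true pipelined throughput. That is exactly what you want for finding the slowest stage, but the end-to-end pipelined time will usually be lower than the sum of the per-stage figures.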

2. Dynamic Pipeline Scaling

type ScalablePipeline struct {
    workerCount int
    bufferSize  int
}

func (sp *ScalablePipeline) adaptiveStage(in <-chan int) <-chan int {
    out := make(chan int, sp.bufferSize)
    
    var wg sync.WaitGroup
    for i := 0; i < sp.workerCount; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for val := range in {
                // Simulate variable processing time
                processingTime := time.Duration(val%10) * time.Millisecond
                time.Sleep(processingTime)
                out <- val * 2
            }
        }(i)
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

func scalablePipelineDemo() {
    // Test different scaling configurations
    configs := []struct {
        workers int
        buffer  int
    }{
        {1, 10},
        {2, 10},
        {4, 10},
        {8, 10},
    }
    
    for _, config := range configs {
        pipeline := &ScalablePipeline{
            workerCount: config.workers,
            bufferSize:  config.buffer,
        }
        
        // Create input
        input := make(chan int, 100)
        go func() {
            defer close(input)
            for i := 1; i <= 100; i++ {
                input <- i
            }
        }()
        
        start := time.Now()
        output := pipeline.adaptiveStage(input)
        
        count := 0
        for range output {
            count++
        }
        
        duration := time.Since(start)
        fmt.Printf("Workers: %d, Buffer: %d -> %d items in %v (%.2f items/sec)\n",
            config.workers, config.buffer, count, duration,
            float64(count)/duration.Seconds())
    }
}
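Adding workers inside a single stage like this is a small-scale preview of fan-out: throughput improves until that stage stops being the bottleneck, but output order is no longer guaranteed, a trade-off the next post examines in depth.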

Best Practices

1. Pipeline Design Guidelines

// ✅ GOOD: Well-designed pipeline stage
func goodPipelineStage(in <-chan Input) <-chan Output {
    out := make(chan Output, bufferSize) // Appropriate buffer
    
    go func() {
        defer close(out) // Always close output channel
        
        for input := range in { // Use range for clean channel reading
            // Process input
            result := processInput(input)
            
            // Handle backpressure: warn when the consumer is slow,
            // then fall back to a blocking send so no result is lost
            select {
            case out <- result:
                // Sent without blocking past the deadline
            case <-time.After(timeout):
                log.Printf("Warning: slow consumer detected")
                out <- result // blocking send; drop here instead if the result is expendable
            }
        }
    }()
    
    return out
}
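Here Input, Output, bufferSize, timeout, and processInput are placeholders standing in for your own types and tuning values; the shape of the stage, not the names, is what to copy.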

2. Resource Management

func resourceManagedPipeline() {
    // Use sync.Pool for expensive objects
    var bufferPool = sync.Pool{
        New: func() interface{} {
            return make([]byte, 1024)
        },
    }
    
    processStage := func(in <-chan []byte) <-chan []byte {
        out := make(chan []byte)
        
        go func() {
            defer close(out)
            
            for data := range in {
                // Get a scratch buffer from the pool
                buffer := bufferPool.Get().([]byte)
                
                // Process data using the buffer
                processed := processWithBuffer(data, buffer)
                
                // Copy the result out before recycling the buffer:
                // processed aliases buffer, and once the buffer is back
                // in the pool another goroutine may overwrite it
                result := make([]byte, len(processed))
                copy(result, processed)
                bufferPool.Put(buffer)
                
                out <- result
            }
        }()
        
        return out
    }
    
    // Use the stage...
}

func processWithBuffer(data, buffer []byte) []byte {
    // Simulate processing that uses the buffer; the returned slice
    // aliases buffer, which is why the caller copies it out before
    // putting the buffer back in the pool
    copy(buffer, data)
    return buffer[:len(data)]
}

Testing Pipeline Code

func TestPipelineStage(t *testing.T) {
    // Create test input
    input := make(chan int, 5)
    expected := []int{2, 4, 6, 8, 10}
    
    go func() {
        defer close(input)
        for i := 1; i <= 5; i++ {
            input <- i
        }
    }()
    
    // Test the stage
    doubler := func(in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for val := range in {
                out <- val * 2
            }
        }()
        return out
    }
    
    output := doubler(input)
    
    // Collect results
    var results []int
    for val := range output {
        results = append(results, val)
    }
    
    // Verify results
    if !reflect.DeepEqual(results, expected) {
        t.Errorf("Expected %v, got %v", expected, results)
    }
}

func TestPipelineTimeout(t *testing.T) {
    input := make(chan int)
    
    slowStage := func(in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for val := range in {
                time.Sleep(100 * time.Millisecond) // Slow processing
                out <- val
            }
        }()
        return out
    }
    
    go func() {
        input <- 1
        close(input)
    }()
    
    output := slowStage(input)
    
    // Test with timeout
    select {
    case result := <-output:
        if result != 1 {
            t.Errorf("Expected 1, got %d", result)
        }
    case <-time.After(200 * time.Millisecond):
        t.Error("stage did not produce a result within 200ms")
    }
}
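Because pipeline bugs tend to be races and deadlocks rather than wrong values, it pays to run these tests with go test -race, and to guard potentially-hanging assertions with timeouts the way TestPipelineTimeout does.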

Conclusion

The Pipeline pattern provides:

  • Concurrent processing across multiple stages
  • Improved throughput through parallelism
  • Modular design with composable stages
  • Efficient resource utilization with proper buffering

Key Takeaways

  1. Break work into stages - each stage should have a single responsibility
  2. Use appropriate buffer sizes - balance memory usage with throughput
  3. Handle errors gracefully - don’t let one failure stop the entire pipeline
  4. Support cancellation - use context for clean shutdown
  5. Monitor performance - identify and optimize bottleneck stages

Pipeline Use Cases

  • Data processing: ETL operations, log analysis, stream processing
  • Image/video processing: Multi-stage transformations and filters
  • Network operations: Request processing, data validation, response formatting
  • File operations: Reading, transforming, and writing large datasets

What’s Next?

Now that you understand pipelines, the next pattern builds upon this foundation: Fan-Out/Fan-In. This pattern allows you to distribute work across multiple workers and then collect results, providing even more parallelism and scalability.

In the next post, we’ll explore Fan-Out/Fan-In and learn how to scale pipeline stages horizontally for maximum performance.


This post is part of the Go Concurrency Patterns series. The Pipeline pattern is fundamental to many data processing architectures and forms the basis for more advanced patterns.