{{series_nav(current_post=9)}}

Redis Streams is a powerful data structure that enables log-style data storage and messaging. Introduced in Redis 5.0, it combines the simplicity of Redis with sophisticated stream processing capabilities, making it ideal for building real-time data pipelines, activity feeds, and event sourcing systems without the operational overhead of dedicated streaming platforms.

What are Redis Streams?

Redis Streams is an append-only log data structure that allows producers to add entries and multiple consumers to read from them. It supports consumer groups for distributed processing, automatic ID generation, and a pending entry list (PEL) for tracking message acknowledgments.

Core Concepts

Stream: An ordered sequence of entries, each with a unique ID and field-value pairs.

Entry ID: Auto-generated identifier of the form millisecondTimestamp-sequence (e.g., 1609459200000-0), giving every entry a unique, totally ordered position in the stream.

Consumer Groups: Allow multiple consumers to cooperatively process stream entries, similar to Kafka consumer groups.

Pending Entry List (PEL): Tracks messages delivered but not acknowledged, enabling retry mechanisms.

XREAD: Read entries from streams (simple consumer).

XREADGROUP: Read as part of a consumer group (distributed processing).
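Because each entry ID embeds its creation time, IDs can be unpacked with ordinary string parsing. A minimal self-contained sketch (the helper name is illustrative, not part of the project code):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseEntryID splits a stream entry ID like "1609459200000-0" into its
// millisecond timestamp and sequence number.
func parseEntryID(id string) (ms int64, seq int64, err error) {
	parts := strings.SplitN(id, "-", 2)
	if len(parts) != 2 {
		return 0, 0, fmt.Errorf("malformed entry ID: %q", id)
	}
	if ms, err = strconv.ParseInt(parts[0], 10, 64); err != nil {
		return 0, 0, err
	}
	if seq, err = strconv.ParseInt(parts[1], 10, 64); err != nil {
		return 0, 0, err
	}
	return ms, seq, nil
}

func main() {
	ms, seq, _ := parseEntryID("1609459200000-0")
	fmt.Println(ms, seq) // 1609459200000 0
}
```

The sequence number disambiguates entries created within the same millisecond.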

Architecture

graph TB
    subgraph "Producers"
        P1[Producer 1]
        P2[Producer 2]
        P3[Producer 3]
    end
    subgraph "Redis Server"
        subgraph "Streams"
            S1["Stream: events<br/>1-0: {user: alice, action: login}<br/>2-0: {user: bob, action: purchase}<br/>3-0: {user: alice, action: logout}"]
            S2["Stream: metrics<br/>1-0: {cpu: 45, mem: 2048}<br/>2-0: {cpu: 52, mem: 2156}"]
        end
        subgraph "Consumer Group: processors"
            CG1["Consumer: worker-1<br/>PEL: [2-0]<br/>Last ID: 2-0"]
            CG2["Consumer: worker-2<br/>PEL: []<br/>Last ID: 1-0"]
        end
        subgraph "Consumer Group: analytics"
            CG3["Consumer: analyzer<br/>PEL: []<br/>Last ID: 3-0"]
        end
    end
    subgraph "Consumers"
        C1[Worker 1]
        C2[Worker 2]
        C3[Analyzer]
    end
    P1 -->|XADD events| S1
    P2 -->|XADD events| S1
    P3 -->|XADD metrics| S2
    S1 -->|XREADGROUP processors| CG1
    S1 -->|XREADGROUP processors| CG2
    S1 -->|XREADGROUP analytics| CG3
    CG1 --> C1
    CG2 --> C2
    CG3 --> C3
    C1 -.->|XACK| S1
    C2 -.->|XACK| S1
    C3 -.->|XACK| S1

Message Flow

sequenceDiagram
    participant P as Producer
    participant S as Redis Stream
    participant CG as Consumer Group
    participant C1 as Consumer 1
    participant C2 as Consumer 2

    Note over P,C2: Stream Creation & Publishing
    P->>S: XADD events * user alice action login
    S->>P: Return ID: 1609459200000-0
    P->>S: XADD events * user bob action purchase
    S->>P: Return ID: 1609459201000-0

    Note over CG,C2: Consumer Group Setup
    C1->>S: XGROUP CREATE events processors 0
    S->>C1: OK
    C1->>S: XREADGROUP GROUP processors worker1 COUNT 1
    S->>C1: Entry: 1609459200000-0
    Note over CG: Entry added to PEL (worker1)
    C2->>S: XREADGROUP GROUP processors worker2 COUNT 1
    S->>C2: Entry: 1609459201000-0
    Note over CG: Entry added to PEL (worker2)

    Note over C1,C2: Processing & Acknowledgment
    C1->>C1: Process entry
    C1->>S: XACK events processors 1609459200000-0
    Note over CG: Entry removed from PEL (worker1)
    Note over C2: Failure (no ACK)
    Note over CG: Entry remains in PEL (worker2)
    Note over C1: Claim abandoned message
    C1->>S: XCLAIM events processors worker1 3600000 1609459201000-0
    S->>C1: Entry: 1609459201000-0
    C1->>C1: Process entry
    C1->>S: XACK events processors 1609459201000-0

Real-World Use Cases

1. Activity Feeds

Build social media-style activity feeds with real-time updates and pagination.

Perfect for: Social networks, collaboration tools, notification systems

2. Event Sourcing

Store domain events as a stream, enabling event replay and temporal queries.

Perfect for: CQRS systems, audit logs, state reconstruction

3. Real-time Analytics

Process streaming data for dashboards, metrics, and monitoring.

Perfect for: Application telemetry, user behavior tracking, IoT sensors

4. Message Queues

Lightweight message queue with consumer groups and delivery guarantees.

Perfect for: Task distribution, background jobs, async processing

5. Time-Series Data

Store and query time-ordered data efficiently.

Perfect for: Metrics collection, sensor data, financial ticks

6. Chat and Messaging

Build chat systems with message history and presence tracking.

Perfect for: Chat applications, collaboration tools, customer support

7. Change Data Capture (CDC)

Capture and stream database changes to downstream consumers.

Perfect for: Cache invalidation, search indexing, data replication

8. Rate Limiting and Throttling

Track API requests over time windows for rate limiting.

Perfect for: API gateways, DDoS protection, quota management
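For the rate-limiting case, the window check maps naturally onto entry IDs, since each ID embeds its creation time in milliseconds. A pure-logic sketch of the counting step (hypothetical helper, not tied to the project code); in practice the entries would come from XRANGE with a time-derived start ID:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// countInWindow returns how many entry IDs ("<ms>-<seq>") fall inside the
// last windowMs milliseconds relative to nowMs.
func countInWindow(ids []string, nowMs, windowMs int64) int {
	cutoff := nowMs - windowMs
	n := 0
	for _, id := range ids {
		msPart := strings.SplitN(id, "-", 2)[0]
		ms, err := strconv.ParseInt(msPart, 10, 64)
		if err != nil {
			continue // skip malformed IDs
		}
		if ms >= cutoff {
			n++
		}
	}
	return n
}

func main() {
	ids := []string{"1000-0", "2000-0", "3000-0", "9000-0"}
	// Only "9000-0" falls in the 1-second window ending at t=9500ms.
	fmt.Println(countInWindow(ids, 9500, 1000))
}
```

Combined with XADD (to record a request) and XTRIM (to discard old entries), this gives a compact sliding-window limiter.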

Project Structure

redis-streams-service/
├── cmd/
│   ├── producer/
│   │   └── main.go           # Producer application
│   └── consumer/
│       └── main.go           # Consumer application
├── internal/
│   ├── redis/
│   │   ├── client.go         # Redis client
│   │   └── streams.go        # Stream utilities
│   ├── producer/
│   │   ├── producer.go       # Stream producer
│   │   └── batch.go          # Batch publishing
│   ├── consumer/
│   │   ├── consumer.go       # Stream consumer
│   │   ├── group.go          # Consumer group
│   │   └── handler.go        # Message handler
│   ├── models/
│   │   └── event.go          # Event models
│   └── config/
│       └── config.go         # Configuration
├── pkg/
│   ├── patterns/
│   │   ├── fanout.go         # Fan-out pattern
│   │   ├── pipeline.go       # Pipeline pattern
│   │   └── cdc.go            # CDC pattern
│   └── monitoring/
│       ├── metrics.go        # Prometheus metrics
│       └── health.go         # Health checks
├── deployments/
│   └── docker-compose.yml    # Redis setup
├── go.mod
└── go.sum

Implementation

1. Event Models

// internal/models/event.go
package models

import (
    "encoding/json"
    "fmt"
    "time"

    "github.com/google/uuid"
)

// Event represents a stream event
type Event struct {
    ID        string                 `json:"id"`
    Type      string                 `json:"type"`
    Timestamp time.Time              `json:"timestamp"`
    Data      map[string]interface{} `json:"data"`
    Metadata  map[string]string      `json:"metadata"`
}

// NewEvent creates a new event
func NewEvent(eventType string) *Event {
    return &Event{
        ID:        uuid.New().String(),
        Type:      eventType,
        Timestamp: time.Now(),
        Data:      make(map[string]interface{}),
        Metadata:  make(map[string]string),
    }
}

// ToFields converts event to Redis fields
func (e *Event) ToFields() map[string]interface{} {
    data, _ := json.Marshal(e.Data)
    metadata, _ := json.Marshal(e.Metadata)

    return map[string]interface{}{
        "id":        e.ID,
        "type":      e.Type,
        "timestamp": e.Timestamp.Unix(),
        "data":      string(data),
        "metadata":  string(metadata),
    }
}

// FromFields constructs event from Redis fields
func FromFields(fields map[string]interface{}) (*Event, error) {
    event := &Event{
        Data:     make(map[string]interface{}),
        Metadata: make(map[string]string),
    }

    if id, ok := fields["id"].(string); ok {
        event.ID = id
    }

    if typ, ok := fields["type"].(string); ok {
        event.Type = typ
    }

    if ts, ok := fields["timestamp"].(string); ok {
        var timestamp int64
        fmt.Sscanf(ts, "%d", &timestamp)
        event.Timestamp = time.Unix(timestamp, 0)
    }

    if data, ok := fields["data"].(string); ok {
        json.Unmarshal([]byte(data), &event.Data)
    }

    if metadata, ok := fields["metadata"].(string); ok {
        json.Unmarshal([]byte(metadata), &event.Metadata)
    }

    return event, nil
}

// ActivityEvent represents a user activity
type ActivityEvent struct {
    UserID     string    `json:"user_id"`
    Action     string    `json:"action"`
    ResourceID string    `json:"resource_id"`
    Details    string    `json:"details"`
    Timestamp  time.Time `json:"timestamp"`
}

2. Redis Client

// internal/redis/client.go
package redis

import (
    "context"
    "fmt"
    "time"

    "github.com/redis/go-redis/v9"
)

// Client wraps Redis client with streams support
type Client struct {
    rdb *redis.Client
}

// Config holds Redis configuration
type Config struct {
    Addr         string
    Password     string
    DB           int
    MaxRetries   int
    DialTimeout  time.Duration
    ReadTimeout  time.Duration
    WriteTimeout time.Duration
    PoolSize     int
}

// NewClient creates a new Redis client
func NewClient(cfg Config) (*Client, error) {
    rdb := redis.NewClient(&redis.Options{
        Addr:         cfg.Addr,
        Password:     cfg.Password,
        DB:           cfg.DB,
        MaxRetries:   cfg.MaxRetries,
        DialTimeout:  cfg.DialTimeout,
        ReadTimeout:  cfg.ReadTimeout,
        WriteTimeout: cfg.WriteTimeout,
        PoolSize:     cfg.PoolSize,
    })

    // Test connection
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    if err := rdb.Ping(ctx).Err(); err != nil {
        return nil, fmt.Errorf("ping: %w", err)
    }

    return &Client{rdb: rdb}, nil
}

// GetClient returns the underlying Redis client
func (c *Client) GetClient() *redis.Client {
    return c.rdb
}

// Close closes the Redis connection
func (c *Client) Close() error {
    return c.rdb.Close()
}

// StreamInfo returns information about a stream
func (c *Client) StreamInfo(ctx context.Context, stream string) (*redis.XInfoStream, error) {
    return c.rdb.XInfoStream(ctx, stream).Result()
}

// StreamLength returns the number of entries in a stream
func (c *Client) StreamLength(ctx context.Context, stream string) (int64, error) {
    return c.rdb.XLen(ctx, stream).Result()
}

// TrimStream trims stream to maximum length
func (c *Client) TrimStream(ctx context.Context, stream string, maxLen int64) (int64, error) {
    return c.rdb.XTrimMaxLen(ctx, stream, maxLen).Result()
}

// TrimStreamByTime trims entries older than minID
func (c *Client) TrimStreamByTime(ctx context.Context, stream, minID string) (int64, error) {
    return c.rdb.XTrimMinID(ctx, stream, minID).Result()
}

3. Producer Implementation

// internal/producer/producer.go
package producer

import (
    "context"
    "fmt"
    "log"

    "github.com/redis/go-redis/v9"
    redisclient "redis-streams-service/internal/redis"
    "redis-streams-service/internal/models"
)

// Producer publishes events to Redis Streams
type Producer struct {
    client *redisclient.Client
    stream string
    maxLen int64
}

// Config holds producer configuration
type Config struct {
    Stream string
    MaxLen int64 // Max stream length (0 = unlimited)
}

// NewProducer creates a new producer
func NewProducer(client *redisclient.Client, cfg Config) *Producer {
    return &Producer{
        client: client,
        stream: cfg.Stream,
        maxLen: cfg.MaxLen,
    }
}

// Publish publishes an event to the stream, trimming to maxLen when set
func (p *Producer) Publish(ctx context.Context, event *models.Event) (string, error) {
    fields := event.ToFields()

    id, err := p.client.GetClient().XAdd(ctx, &redis.XAddArgs{
        Stream: p.stream,
        MaxLen: p.maxLen,     // 0 disables trimming
        Approx: p.maxLen > 0, // Approximate (~) trimming is much cheaper
        ID:     "*",          // Auto-generate ID
        Values: fields,
    }).Result()

    if err != nil {
        return "", fmt.Errorf("xadd: %w", err)
    }

    log.Printf("Published event: stream=%s, id=%s, type=%s", p.stream, id, event.Type)
    return id, nil
}

// PublishWithID publishes an event with specific ID
func (p *Producer) PublishWithID(ctx context.Context, id string, event *models.Event) error {
    fields := event.ToFields()

    _, err := p.client.GetClient().XAdd(ctx, &redis.XAddArgs{
        Stream: p.stream,
        ID:     id,
        Values: fields,
    }).Result()

    if err != nil {
        return fmt.Errorf("xadd: %w", err)
    }

    return nil
}

// PublishBatch publishes multiple events
func (p *Producer) PublishBatch(ctx context.Context, events []*models.Event) ([]string, error) {
    pipe := p.client.GetClient().Pipeline()

    for _, event := range events {
        fields := event.ToFields()
        pipe.XAdd(ctx, &redis.XAddArgs{
            Stream: p.stream,
            Values: fields,
        })
    }

    cmds, err := pipe.Exec(ctx)
    if err != nil {
        return nil, fmt.Errorf("pipeline exec: %w", err)
    }

    ids := make([]string, len(cmds))
    for i, cmd := range cmds {
        if xAddCmd, ok := cmd.(*redis.StringCmd); ok {
            ids[i], _ = xAddCmd.Result()
        }
    }

    log.Printf("Published batch: stream=%s, count=%d", p.stream, len(events))
    return ids, nil
}

// PublishWithMaxLen publishes event and trims stream
func (p *Producer) PublishWithMaxLen(ctx context.Context, event *models.Event, maxLen int64) (string, error) {
    fields := event.ToFields()

    id, err := p.client.GetClient().XAdd(ctx, &redis.XAddArgs{
        Stream: p.stream,
        MaxLen: maxLen,
        Approx: true, // Use approximate trimming for performance
        Values: fields,
    }).Result()

    if err != nil {
        return "", fmt.Errorf("xadd: %w", err)
    }

    return id, nil
}

4. Consumer Implementation

// internal/consumer/consumer.go
package consumer

import (
    "context"
    "fmt"
    "log"
    "strings"
    "time"

    "github.com/redis/go-redis/v9"
    redisclient "redis-streams-service/internal/redis"
    "redis-streams-service/internal/models"
)

// Consumer consumes events from Redis Streams
type Consumer struct {
    client   *redisclient.Client
    stream   string
    group    string
    consumer string
    handler  MessageHandler
}

// MessageHandler processes stream messages
type MessageHandler interface {
    Handle(ctx context.Context, event *models.Event, id string) error
}

// Config holds consumer configuration
type Config struct {
    Stream    string
    Group     string
    Consumer  string
    BlockTime time.Duration
    Count     int64
    StartID   string // "0" to replay the full history, "$" to deliver only new entries
}

// NewConsumer creates a new consumer
func NewConsumer(client *redisclient.Client, cfg Config, handler MessageHandler) (*Consumer, error) {
    startID := cfg.StartID
    if startID == "" {
        startID = "$" // Default: deliver only entries added after group creation
    }

    // Create consumer group (and the stream itself) if they don't exist
    err := client.GetClient().XGroupCreateMkStream(
        context.Background(),
        cfg.Stream,
        cfg.Group,
        startID,
    ).Err()

    // Ignore the error if the group already exists
    if err != nil && !strings.Contains(err.Error(), "BUSYGROUP") {
        return nil, fmt.Errorf("create group: %w", err)
    }

    return &Consumer{
        client:   client,
        stream:   cfg.Stream,
        group:    cfg.Group,
        consumer: cfg.Consumer,
        handler:  handler,
    }, nil
}

// Start starts consuming messages
func (c *Consumer) Start(ctx context.Context, blockTime time.Duration, count int64) error {
    log.Printf("Starting consumer: stream=%s, group=%s, consumer=%s", c.stream, c.group, c.consumer)

    // Periodically reclaim pending messages abandoned by other consumers
    go c.claimPending(ctx, 30*time.Second)

    lastID := ">"

    for {
        select {
        case <-ctx.Done():
            log.Println("Consumer stopped")
            return ctx.Err()
        default:
            streams, err := c.client.GetClient().XReadGroup(ctx, &redis.XReadGroupArgs{
                Group:    c.group,
                Consumer: c.consumer,
                Streams:  []string{c.stream, lastID},
                Count:    count,
                Block:    blockTime,
            }).Result()

            if err != nil {
                if err == redis.Nil {
                    continue
                }
                log.Printf("XReadGroup error: %v", err)
                time.Sleep(time.Second)
                continue
            }

            for _, stream := range streams {
                for _, message := range stream.Messages {
                    if err := c.processMessage(ctx, message); err != nil {
                        log.Printf("Error processing message %s: %v", message.ID, err)
                        // Don't ACK on error - will be retried
                        continue
                    }

                    // Acknowledge message
                    if err := c.ack(ctx, message.ID); err != nil {
                        log.Printf("Error acknowledging message %s: %v", message.ID, err)
                    }
                }
            }
        }
    }
}

func (c *Consumer) processMessage(ctx context.Context, msg redis.XMessage) error {
    event, err := models.FromFields(msg.Values)
    if err != nil {
        return fmt.Errorf("parse event: %w", err)
    }

    log.Printf("Processing message: id=%s, type=%s", msg.ID, event.Type)

    return c.handler.Handle(ctx, event, msg.ID)
}

func (c *Consumer) ack(ctx context.Context, id string) error {
    return c.client.GetClient().XAck(ctx, c.stream, c.group, id).Err()
}

// claimPending claims pending messages that haven't been acknowledged
func (c *Consumer) claimPending(ctx context.Context, interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            // Get pending messages
            pending, err := c.client.GetClient().XPendingExt(ctx, &redis.XPendingExtArgs{
                Stream: c.stream,
                Group:  c.group,
                Start:  "-",
                End:    "+",
                Count:  100,
            }).Result()

            if err != nil {
                log.Printf("Error fetching pending: %v", err)
                continue
            }

            // Claim messages idle for more than 1 minute
            idleTime := 60 * time.Second
            for _, msg := range pending {
                if msg.Idle >= idleTime {
                    claimed, err := c.client.GetClient().XClaim(ctx, &redis.XClaimArgs{
                        Stream:   c.stream,
                        Group:    c.group,
                        Consumer: c.consumer,
                        MinIdle:  idleTime,
                        Messages: []string{msg.ID},
                    }).Result()

                    if err != nil {
                        log.Printf("Error claiming message %s: %v", msg.ID, err)
                        continue
                    }

                    // Process claimed messages
                    for _, claimedMsg := range claimed {
                        if err := c.processMessage(ctx, claimedMsg); err != nil {
                            log.Printf("Error processing claimed message %s: %v", claimedMsg.ID, err)
                        } else {
                            c.ack(ctx, claimedMsg.ID)
                        }
                    }
                }
            }
        }
    }
}

// GetPending returns pending messages for this consumer
func (c *Consumer) GetPending(ctx context.Context) ([]redis.XPendingExt, error) {
    return c.client.GetClient().XPendingExt(ctx, &redis.XPendingExtArgs{
        Stream:   c.stream,
        Group:    c.group,
        Consumer: c.consumer,
        Start:    "-",
        End:      "+",
        Count:    100,
    }).Result()
}

5. Message Handler

// internal/consumer/handler.go
package consumer

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "sync"
    "time"

    "redis-streams-service/internal/models"
)

// ActivityHandler processes activity events
type ActivityHandler struct {
    processingTime time.Duration
}

func NewActivityHandler(processingTime time.Duration) *ActivityHandler {
    return &ActivityHandler{processingTime: processingTime}
}

// Handle processes an event
func (h *ActivityHandler) Handle(ctx context.Context, event *models.Event, id string) error {
    log.Printf("Handling event: type=%s, id=%s, stream_id=%s", event.Type, event.ID, id)

    switch event.Type {
    case "user.login":
        return h.handleUserLogin(ctx, event)
    case "user.purchase":
        return h.handleUserPurchase(ctx, event)
    case "user.logout":
        return h.handleUserLogout(ctx, event)
    default:
        log.Printf("Unknown event type: %s", event.Type)
        return nil
    }
}

func (h *ActivityHandler) handleUserLogin(ctx context.Context, event *models.Event) error {
    var activity models.ActivityEvent
    data, _ := json.Marshal(event.Data)
    if err := json.Unmarshal(data, &activity); err != nil {
        return fmt.Errorf("unmarshal activity: %w", err)
    }

    log.Printf("User logged in: user_id=%s", activity.UserID)

    // Simulate processing
    time.Sleep(h.processingTime)

    // Business logic:
    // - Update user session
    // - Track login analytics
    // - Send notifications

    return nil
}

func (h *ActivityHandler) handleUserPurchase(ctx context.Context, event *models.Event) error {
    log.Printf("User purchase: %s", event.ID)
    time.Sleep(h.processingTime)
    return nil
}

func (h *ActivityHandler) handleUserLogout(ctx context.Context, event *models.Event) error {
    log.Printf("User logout: %s", event.ID)
    time.Sleep(h.processingTime)
    return nil
}

// AggregatorHandler aggregates events
type AggregatorHandler struct {
    aggregates map[string]int
    mu         sync.Mutex
}

func NewAggregatorHandler() *AggregatorHandler {
    return &AggregatorHandler{
        aggregates: make(map[string]int),
    }
}

func (h *AggregatorHandler) Handle(ctx context.Context, event *models.Event, id string) error {
    h.mu.Lock()
    defer h.mu.Unlock()

    h.aggregates[event.Type]++

    log.Printf("Aggregate updated: %s = %d", event.Type, h.aggregates[event.Type])
    return nil
}

func (h *AggregatorHandler) GetAggregates() map[string]int {
    h.mu.Lock()
    defer h.mu.Unlock()

    result := make(map[string]int)
    for k, v := range h.aggregates {
        result[k] = v
    }

    return result
}

6. Producer Application

// cmd/producer/main.go
package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "redis-streams-service/internal/models"
    "redis-streams-service/internal/producer"
    redisclient "redis-streams-service/internal/redis"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Handle graceful shutdown
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    // Create Redis client
    client, err := redisclient.NewClient(redisclient.Config{
        Addr:         "localhost:6379",
        Password:     "",
        DB:           0,
        MaxRetries:   3,
        DialTimeout:  5 * time.Second,
        ReadTimeout:  3 * time.Second,
        WriteTimeout: 3 * time.Second,
        PoolSize:     10,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Create producer
    prod := producer.NewProducer(client, producer.Config{
        Stream: "activities",
        MaxLen: 10000,
    })

    // Start producing
    go produceActivities(ctx, prod)

    // Wait for shutdown
    <-sigChan
    log.Println("Shutting down producer...")
    cancel()
}

func produceActivities(ctx context.Context, prod *producer.Producer) {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    actions := []string{"login", "purchase", "logout", "view", "search"}
    users := []string{"alice", "bob", "charlie", "david", "eve"}

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            event := models.NewEvent(fmt.Sprintf("user.%s", actions[time.Now().Unix()%int64(len(actions))]))

            activity := models.ActivityEvent{
                UserID:     users[time.Now().Unix()%int64(len(users))],
                Action:     actions[time.Now().Unix()%int64(len(actions))],
                ResourceID: fmt.Sprintf("res-%d", time.Now().Unix()%1000),
                Details:    "Activity details here",
                Timestamp:  time.Now(),
            }

            event.Data["activity"] = activity

            id, err := prod.Publish(ctx, event)
            if err != nil {
                log.Printf("Failed to publish: %v", err)
            } else {
                log.Printf("Published activity: id=%s, user=%s, action=%s",
                    id, activity.UserID, activity.Action)
            }
        }
    }
}

7. Consumer Application

// cmd/consumer/main.go
package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "redis-streams-service/internal/consumer"
    redisclient "redis-streams-service/internal/redis"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Handle graceful shutdown
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    // Create Redis client
    client, err := redisclient.NewClient(redisclient.Config{
        Addr:         "localhost:6379",
        Password:     "",
        DB:           0,
        MaxRetries:   3,
        DialTimeout:  5 * time.Second,
        ReadTimeout:  3 * time.Second,
        WriteTimeout: 3 * time.Second,
        PoolSize:     10,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Create handler
    handler := consumer.NewActivityHandler(100 * time.Millisecond)

    // Create consumer
    cons, err := consumer.NewConsumer(client, consumer.Config{
        Stream:   "activities",
        Group:    "processors",
        Consumer: "worker-1",
    }, handler)
    if err != nil {
        log.Fatal(err)
    }

    // Start consuming
    go func() {
        if err := cons.Start(ctx, 2*time.Second, 10); err != nil {
            if err != context.Canceled {
                log.Printf("Consumer error: %v", err)
            }
        }
    }()

    // Wait for shutdown
    <-sigChan
    log.Println("Shutting down consumer...")
    cancel()

    time.Sleep(2 * time.Second) // Grace period
}

Advanced Patterns

1. Range Queries

// Read stream range
func readRange(ctx context.Context, rdb *redis.Client, stream, start, end string) ([]redis.XMessage, error) {
    return rdb.XRange(ctx, stream, start, end).Result()
}

// Read last N messages
func readLastN(ctx context.Context, rdb *redis.Client, stream string, count int64) ([]redis.XMessage, error) {
    return rdb.XRevRangeN(ctx, stream, "+", "-", count).Result()
}

// Read by time range
func readByTimeRange(ctx context.Context, rdb *redis.Client, stream string, start, end time.Time) ([]redis.XMessage, error) {
    startID := fmt.Sprintf("%d-0", start.UnixMilli())
    endID := fmt.Sprintf("%d-0", end.UnixMilli())

    return rdb.XRange(ctx, stream, startID, endID).Result()
}

2. Fan-out Pattern

// Fan-out to multiple consumer groups
type FanOutProcessor struct {
    client *redisclient.Client
    stream string
    groups []string
}

func (f *FanOutProcessor) StartAll(ctx context.Context, handler consumer.MessageHandler) error {
    var wg sync.WaitGroup

    for _, group := range f.groups {
        wg.Add(1)

        go func(grp string) {
            defer wg.Done()

            cons, err := consumer.NewConsumer(f.client, consumer.Config{
                Stream:   f.stream,
                Group:    grp,
                Consumer: fmt.Sprintf("worker-%s", grp),
            }, handler)

            if err != nil {
                log.Printf("Error creating consumer for group %s: %v", grp, err)
                return
            }

            cons.Start(ctx, 2*time.Second, 10)
        }(group)
    }

    wg.Wait()
    return nil
}

3. Time-based Retention

// Trim stream by time (keep last 24 hours)
func trimByTime(ctx context.Context, rdb *redis.Client, stream string, retention time.Duration) error {
    cutoff := time.Now().Add(-retention)
    minID := fmt.Sprintf("%d-0", cutoff.UnixMilli())

    deleted, err := rdb.XTrimMinID(ctx, stream, minID).Result()
    if err != nil {
        return err
    }

    log.Printf("Trimmed %d entries from %s", deleted, stream)
    return nil
}

// Background trimming
func startBackgroundTrimming(ctx context.Context, rdb *redis.Client, stream string, retention time.Duration) {
    ticker := time.NewTicker(1 * time.Hour)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if err := trimByTime(ctx, rdb, stream, retention); err != nil {
                log.Printf("Trimming error: %v", err)
            }
        }
    }
}

4. Activity Feed Pattern

// ActivityFeed implements a user activity feed
type ActivityFeed struct {
    client *redisclient.Client
}

// AddActivity adds activity to user's feed
func (a *ActivityFeed) AddActivity(ctx context.Context, userID string, activity *models.ActivityEvent) error {
    stream := fmt.Sprintf("feed:%s", userID)

    event := models.NewEvent("activity")
    event.Data["activity"] = activity

    fields := event.ToFields()

    _, err := a.client.GetClient().XAdd(ctx, &redis.XAddArgs{
        Stream: stream,
        MaxLen: 1000, // Keep last 1000 activities
        Approx: true,
        Values: fields,
    }).Result()

    return err
}

// GetActivities retrieves user's recent activities
func (a *ActivityFeed) GetActivities(ctx context.Context, userID string, count int64) ([]*models.Event, error) {
    stream := fmt.Sprintf("feed:%s", userID)

    messages, err := a.client.GetClient().XRevRangeN(ctx, stream, "+", "-", count).Result()
    if err != nil {
        return nil, err
    }

    events := make([]*models.Event, 0, len(messages))
    for _, msg := range messages {
        event, err := models.FromFields(msg.Values)
        if err != nil {
            continue // Skip undecodable entries instead of leaving nil holes
        }
        events = append(events, event)
    }

    return events, nil
}

// GetActivitiesPaginated retrieves activities with pagination
func (a *ActivityFeed) GetActivitiesPaginated(ctx context.Context, userID, cursor string, count int64) ([]*models.Event, string, error) {
    stream := fmt.Sprintf("feed:%s", userID)

    start := "+"
    if cursor != "" {
        // Exclusive start (Redis 6.2+) so the entry at the cursor is not returned twice
        start = "(" + cursor
    }

    messages, err := a.client.GetClient().XRevRangeN(ctx, stream, start, "-", count+1).Result()
    if err != nil {
        return nil, "", err
    }

    hasMore := len(messages) > int(count)
    if hasMore {
        messages = messages[:count]
    }

    events := make([]*models.Event, len(messages))
    for i, msg := range messages {
        event, _ := models.FromFields(msg.Values)
        events[i] = event
    }

    nextCursor := ""
    if hasMore && len(messages) > 0 {
        nextCursor = messages[len(messages)-1].ID
    }

    return events, nextCursor, nil
}
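The cursor arithmetic is worth isolating: the next page must begin strictly below the last ID already returned, or that entry appears twice. Redis 6.2+ accepts a "(" prefix for an exclusive range endpoint; a tiny helper capturing that rule (the name is illustrative):

```go
package main

import "fmt"

// nextPageStart converts the last-seen entry ID into the start argument for
// the next XREVRANGE call. An empty cursor means "start from the newest entry".
func nextPageStart(cursor string) string {
	if cursor == "" {
		return "+"
	}
	// "(" makes the endpoint exclusive (Redis 6.2+), so the cursor entry is skipped.
	return "(" + cursor
}

func main() {
	fmt.Println(nextPageStart(""))
	fmt.Println(nextPageStart("1609459200000-0"))
}
```

On servers older than 6.2 the same effect requires decrementing the ID's sequence number manually before passing it as the start.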

5. Change Data Capture (CDC)

// CDC captures database changes to stream
type CDCPublisher struct {
    producer *producer.Producer
}

func (c *CDCPublisher) PublishChange(ctx context.Context, table, operation string, oldData, newData map[string]interface{}) error {
    event := models.NewEvent(fmt.Sprintf("cdc.%s.%s", table, operation))

    event.Data["table"] = table
    event.Data["operation"] = operation
    event.Data["old"] = oldData
    event.Data["new"] = newData
    event.Metadata["source"] = "postgres"

    _, err := c.producer.Publish(ctx, event)
    return err
}

// CDC Consumer for cache invalidation
type CDCConsumer struct {
    cache Cache
}

func (c *CDCConsumer) Handle(ctx context.Context, event *models.Event, id string) error {
    table, _ := event.Data["table"].(string)
    operation, _ := event.Data["operation"].(string)

    switch operation {
    case "UPDATE", "DELETE":
        // Prefer the pre-change row image: on DELETE there is no "new" data
        data, _ := event.Data["old"].(map[string]interface{})
        if data == nil {
            data, _ = event.Data["new"].(map[string]interface{})
        }
        if pk, ok := data["id"]; ok {
            cacheKey := fmt.Sprintf("%s:%v", table, pk)
            return c.cache.Delete(ctx, cacheKey)
        }
    }

    return nil
}

6. Metrics and Monitoring

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var (
    streamLength = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "redis_stream_length",
            Help: "Current length of stream",
        },
        []string{"stream"},
    )

    consumerLag = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "redis_consumer_lag",
            Help: "Consumer lag (pending messages)",
        },
        []string{"stream", "group", "consumer"},
    )

    messagesProcessed = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "redis_messages_processed_total",
            Help: "Total messages processed",
        },
        []string{"stream", "group", "status"},
    )

    processingDuration = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "redis_message_processing_seconds",
            Help:    "Message processing duration",
            Buckets: prometheus.DefBuckets,
        },
        []string{"stream", "event_type"},
    )
)

// Monitor stream metrics
func monitorStream(ctx context.Context, client *redisclient.Client, stream string) {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            length, err := client.StreamLength(ctx, stream)
            if err != nil {
                log.Printf("Error getting stream length: %v", err)
                continue
            }

            streamLength.WithLabelValues(stream).Set(float64(length))
        }
    }
}

// Monitor consumer lag
func monitorConsumerLag(ctx context.Context, client *redisclient.Client, stream, group, consumer string) {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            pending, err := client.GetClient().XPending(ctx, stream, group).Result()
            if err != nil {
                log.Printf("Error getting pending: %v", err)
                continue
            }

            // The XPENDING summary reports per-consumer counts; use this
            // consumer's entry rather than the group-wide total.
            consumerLag.WithLabelValues(stream, group, consumer).Set(float64(pending.Consumers[consumer]))
        }
    }
}

Dependencies

// go.mod
module redis-streams-service

go 1.21

require (
    github.com/redis/go-redis/v9 v9.5.1
    github.com/google/uuid v1.6.0
    github.com/prometheus/client_golang v1.19.0
)

Docker Compose Setup

# deployments/docker-compose.yml
version: '3.8'

services:
  redis:
    image: redis:7.2-alpine
    ports:
      - "6379:6379"
    # noeviction: under memory pressure, fail writes instead of silently
    # evicting keys (allkeys-lru could drop whole streams)
    command: redis-server --appendonly yes --maxmemory 512mb --maxmemory-policy noeviction
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 3s
      retries: 3

  redis-insight:
    image: redislabs/redisinsight:latest
    ports:
      - "8001:8001"
    volumes:
      - redisinsight_data:/db

volumes:
  redis_data:
  redisinsight_data:

Best Practices

1. Stream Management

Trimming: Regularly trim streams to prevent unbounded growth.

// Trim by length
rdb.XTrimMaxLen(ctx, "stream", 10000)

// Trim by time (keep last 7 days)
minID := fmt.Sprintf("%d-0", time.Now().AddDate(0, 0, -7).UnixMilli())
rdb.XTrimMinID(ctx, "stream", minID)

Monitoring: Track stream length and consumer lag.

2. Consumer Groups

Use Consumer Groups for Distribution: Scale horizontally with multiple consumers.

Handle Pending Messages: Implement PEL claiming for fault tolerance.

Set Reasonable Block Times: Balance responsiveness and CPU usage.

3. Error Handling

Acknowledge Strategically: Only ACK after successful processing.

Implement Retry Logic: Use PEL claiming for automatic retries.

Dead Letter Streams: Move failed messages to separate stream after max retries.

4. Performance Optimization

Batch Operations: Use pipelining for bulk operations.

Prefetch with COUNT: Read multiple messages per request.

Approximate Trimming: Use MAXLEN ~ for better performance.

5. Data Modeling

Use Appropriate IDs: Let Redis auto-generate for simplicity.

Structured Fields: Use consistent field naming conventions.

Metadata: Include timestamps and types for filtering.

6. Operational Considerations

Memory Management: Set maxmemory and eviction policies.

Persistence: Enable AOF for durability if needed.

Monitoring: Track memory usage, command latency, and stream metrics.

When to Use Redis Streams

Perfect for:

  • Activity feeds and timelines
  • Real-time analytics and metrics
  • Lightweight message queues
  • Event sourcing with limited history
  • IoT data collection
  • Chat and messaging (with TTL)

Not ideal for:

  • Long-term data retention (use Kafka)
  • Complex routing (use RabbitMQ)
  • Transactional guarantees (use databases)
  • Large message payloads (>1MB)
  • Strict ordering across partitions

Conclusion

Redis Streams provides a powerful yet simple solution for real-time data streaming and messaging. With consumer groups, pending entry lists, and range queries, it offers sophisticated features while maintaining Redis’s legendary performance and ease of use.

Key takeaways:

  • Leverage consumer groups for scalable processing
  • Use PEL claiming for fault tolerance
  • Implement proper stream trimming strategies
  • Monitor stream length and consumer lag
  • Design events with clear field structures

Redis Streams shines when you need streaming capabilities without the operational overhead of dedicated platforms like Kafka, making it an excellent choice for many real-time use cases.