{{series_nav(current_post=7)}}

Apache Kafka is a distributed event streaming platform designed for high-throughput, fault-tolerant, and scalable data pipelines. Originally developed at LinkedIn, Kafka combines messaging, storage, and stream processing into a single platform that can handle trillions of events per day.

What is Apache Kafka?

Kafka is built around a distributed commit log that allows producers to publish records to topics, which are partitioned and replicated across a cluster of brokers. Consumers read from these topics using consumer groups, enabling horizontal scaling and fault tolerance.
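
To make this concrete, here is a minimal produce-and-consume round trip using the segmentio/kafka-go client used throughout this post. It is an illustrative sketch: the broker address, topic name, and group ID are placeholder assumptions, not part of the project built later.

// cmd/quickstart/main.go (illustrative sketch, not part of the project layout below)
package main

import (
    "context"
    "fmt"
    "log"

    "github.com/segmentio/kafka-go"
)

func main() {
    ctx := context.Background()

    // Publish one record; the key determines which partition it lands on.
    w := &kafka.Writer{
        Addr:     kafka.TCP("localhost:9092"), // assumed local broker
        Topic:    "demo",                      // assumed topic
        Balancer: &kafka.Hash{},
    }
    if err := w.WriteMessages(ctx, kafka.Message{
        Key:   []byte("user-42"),
        Value: []byte(`{"hello":"kafka"}`),
    }); err != nil {
        log.Fatalf("produce: %v", err)
    }
    w.Close()

    // Read it back as part of a consumer group; offsets are committed automatically.
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "demo",
        GroupID: "demo-group",
    })
    defer r.Close()

    msg, err := r.ReadMessage(ctx)
    if err != nil {
        log.Fatalf("consume: %v", err)
    }
    fmt.Printf("partition=%d offset=%d key=%s value=%s\n",
        msg.Partition, msg.Offset, msg.Key, msg.Value)
}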

Core Concepts

Topics and Partitions: Topics are logical channels that organize messages. Each topic is divided into partitions for parallelism and scalability.

Producers: Applications that publish records to Kafka topics, with configurable delivery semantics.

Consumers: Applications that subscribe to topics and process records, organized into consumer groups for load distribution.

Brokers: Kafka servers that store data and serve client requests.

Offsets: Sequential IDs assigned to each record within a partition, used for tracking consumer progress.
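
Offsets are easy to see with kafka-go's low-level connection API. A small sketch (the broker address, topic, and partition number are assumptions, and it relies on the context, log, and kafka-go imports shown in later sections):

// Illustrative: inspect the offset range of partition 0 of the "orders" topic.
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", "orders", 0)
if err != nil {
    log.Fatalf("dial leader: %v", err)
}
defer conn.Close()

first, _ := conn.ReadFirstOffset() // oldest offset still retained
last, _ := conn.ReadLastOffset()   // offset the next record will receive
log.Printf("orders/0: first=%d last=%d (%d records retained)", first, last, last-first)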

Architecture

graph TB
    subgraph "Producers"
        P1[Producer 1]
        P2[Producer 2]
        P3[Producer 3]
    end

    subgraph "Kafka Cluster"
        subgraph "Broker 1"
            T1P0[Topic: orders<br/>Partition 0<br/>Leader]
            T1P1[Topic: orders<br/>Partition 1<br/>Replica]
        end
        subgraph "Broker 2"
            T1P1L[Topic: orders<br/>Partition 1<br/>Leader]
            T1P2[Topic: orders<br/>Partition 2<br/>Replica]
        end
        subgraph "Broker 3"
            T1P0R[Topic: orders<br/>Partition 0<br/>Replica]
            T1P2L[Topic: orders<br/>Partition 2<br/>Leader]
        end
        ZK[ZooKeeper/<br/>KRaft]
    end

    subgraph "Consumer Group A"
        C1[Consumer 1<br/>Partition 0]
        C2[Consumer 2<br/>Partition 1,2]
    end

    subgraph "Consumer Group B"
        C3[Consumer 3<br/>All Partitions]
    end

    P1 -->|Round Robin| T1P0
    P2 -->|Key Hash| T1P1L
    P3 -->|Custom Partitioner| T1P2L

    T1P0 -.->|Replicate| T1P0R
    T1P1L -.->|Replicate| T1P1
    T1P2L -.->|Replicate| T1P2

    T1P0 --> C1
    T1P1L --> C2
    T1P2L --> C2

    T1P0 --> C3
    T1P1L --> C3
    T1P2L --> C3

    ZK -.->|Coordinate| T1P0
    ZK -.->|Coordinate| T1P1L
    ZK -.->|Coordinate| T1P2L

Message Flow

sequenceDiagram
    participant P as Producer
    participant B1 as Broker (Leader)
    participant B2 as Broker (Follower)
    participant C as Consumer
    participant Z as ZooKeeper/KRaft

    Note over P,Z: Topic Creation
    P->>B1: Create Topic Request
    B1->>Z: Register Topic Metadata
    Z->>B1: Assign Partitions & Leaders
    B1->>B2: Replicate Metadata

    Note over P,C: Message Publishing
    P->>B1: Produce (key, value, headers)
    B1->>B1: Append to Partition Log
    B1->>B2: Replicate to ISR
    B2->>B1: ACK Replication
    B1->>P: ACK (offset)

    Note over C,B1: Message Consumption
    C->>B1: Join Consumer Group (group coordinator)
    B1->>C: Assign Partitions
    C->>B1: Fetch (offset, max_bytes)
    B1->>C: Records Batch
    C->>C: Process Messages
    C->>B1: Commit Offset
    B1->>B1: Store in __consumer_offsets

Real-World Use Cases

1. Event-Driven Microservices

Decouple services using Kafka as an event backbone for asynchronous communication.

Perfect for: Order processing, inventory updates, notification systems

2. Log Aggregation

Collect logs from multiple services into centralized storage for analysis.

Perfect for: Centralized logging, security monitoring, application debugging

3. Stream Processing

Process data in real-time using Kafka Streams or external processors.

Perfect for: Fraud detection, real-time analytics, alerting systems

4. Change Data Capture (CDC)

Capture database changes and propagate them to downstream systems.

Perfect for: Data replication, cache invalidation, search index updates

5. Metrics Collection

Aggregate metrics from distributed systems for monitoring and alerting.

Perfect for: APM systems, business intelligence, operational dashboards

6. Message Queue Replacement

Replace traditional message queues with Kafka’s durable, scalable architecture.

Perfect for: Task queues, job scheduling, async processing

7. Commit Log for Databases

Use Kafka as a durable write-ahead log for custom storage systems.

Perfect for: Event sourcing, distributed databases, state replication

8. IoT Data Pipelines

Ingest and process high-volume sensor data from IoT devices.

Perfect for: Smart cities, industrial monitoring, connected vehicles

Project Structure

kafka-service/
├── cmd/
│   ├── producer/
│   │   └── main.go           # Producer application
│   └── consumer/
│       └── main.go           # Consumer application
├── internal/
│   ├── producer/
│   │   ├── producer.go       # Producer implementation
│   │   └── partitioner.go    # Custom partitioner
│   ├── consumer/
│   │   ├── consumer.go       # Consumer implementation
│   │   └── handler.go        # Message handler
│   ├── models/
│   │   └── event.go          # Event models
│   └── config/
│       └── config.go         # Configuration
├── pkg/
│   ├── serializer/
│   │   ├── json.go           # JSON serialization
│   │   └── avro.go           # Avro serialization
│   └── interceptor/
│       ├── metrics.go        # Metrics interceptor
│       └── tracing.go        # Tracing interceptor
├── deployments/
│   ├── docker-compose.yml    # Kafka cluster setup
│   └── kafka/
│       └── server.properties # Kafka config
├── scripts/
│   ├── create-topics.sh      # Topic creation
│   └── setup.sh              # Environment setup
├── go.mod
└── go.sum

Implementation

1. Event Models

// internal/models/event.go
package models

import (
    "encoding/json"
    "time"
)

// Event represents a generic Kafka event
type Event struct {
    ID        string                 `json:"id"`
    Type      string                 `json:"type"`
    Timestamp time.Time              `json:"timestamp"`
    Source    string                 `json:"source"`
    Data      map[string]interface{} `json:"data"`
    Metadata  map[string]string      `json:"metadata"`
}

// OrderEvent represents an order-related event
type OrderEvent struct {
    OrderID    string    `json:"order_id"`
    UserID     string    `json:"user_id"`
    Status     string    `json:"status"`
    Amount     float64   `json:"amount"`
    Items      []Item    `json:"items"`
    CreatedAt  time.Time `json:"created_at"`
}

type Item struct {
    ProductID string  `json:"product_id"`
    Quantity  int     `json:"quantity"`
    Price     float64 `json:"price"`
}

// Serialize converts event to JSON bytes
func (e *Event) Serialize() ([]byte, error) {
    return json.Marshal(e)
}

// Deserialize converts JSON bytes to event
func (e *Event) Deserialize(data []byte) error {
    return json.Unmarshal(data, e)
}

// Key returns the partition key for the event
func (e *OrderEvent) Key() string {
    return e.UserID // Partition by user ID to maintain order
}

2. Producer Implementation

// internal/producer/producer.go
package producer

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
    "kafka-service/internal/models"
)

type Producer struct {
    writer *kafka.Writer
    topic  string
}

// Config holds producer configuration
type Config struct {
    Brokers          []string
    Topic            string
    Compression      kafka.Compression
    BatchSize        int
    BatchTimeout     time.Duration
    MaxAttempts      int
    RequiredAcks     int // -1 all, 0 none, 1 leader
    Async            bool
}

// NewProducer creates a new Kafka producer
func NewProducer(cfg Config) *Producer {
    writer := &kafka.Writer{
        Addr:         kafka.TCP(cfg.Brokers...),
        Topic:        cfg.Topic,
        Balancer:     &kafka.Hash{}, // Partition by message key
        Compression:  cfg.Compression,
        BatchSize:    cfg.BatchSize,
        BatchTimeout: cfg.BatchTimeout,
        MaxAttempts:  cfg.MaxAttempts,
        RequiredAcks: kafka.RequiredAcks(cfg.RequiredAcks),
        Async:        cfg.Async,

        // Error handling
        ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
            log.Printf("[PRODUCER ERROR] "+msg, args...)
        }),
    }

    return &Producer{
        writer: writer,
        topic:  cfg.Topic,
    }
}

// ProduceEvent sends a single event to Kafka
func (p *Producer) ProduceEvent(ctx context.Context, event *models.OrderEvent) error {
    data, err := event.Serialize()
    if err != nil {
        return fmt.Errorf("serialize event: %w", err)
    }

    msg := kafka.Message{
        Key:   []byte(event.Key()),
        Value: data,
        Headers: []kafka.Header{
            {Key: "event-type", Value: []byte("order")},
            {Key: "version", Value: []byte("v1")},
            {Key: "timestamp", Value: []byte(time.Now().Format(time.RFC3339))},
        },
        Time: time.Now(),
    }

    err = p.writer.WriteMessages(ctx, msg)
    if err != nil {
        return fmt.Errorf("write message: %w", err)
    }

    log.Printf("Produced message: key=%s, offset=%d, partition=%d",
        string(msg.Key), msg.Offset, msg.Partition)

    return nil
}

// ProduceBatch sends multiple events in a batch
func (p *Producer) ProduceBatch(ctx context.Context, events []*models.OrderEvent) error {
    messages := make([]kafka.Message, len(events))

    for i, event := range events {
        data, err := event.Serialize()
        if err != nil {
            return fmt.Errorf("serialize event %d: %w", i, err)
        }

        messages[i] = kafka.Message{
            Key:   []byte(event.Key()),
            Value: data,
            Time:  time.Now(),
        }
    }

    err := p.writer.WriteMessages(ctx, messages...)
    if err != nil {
        return fmt.Errorf("write batch: %w", err)
    }

    log.Printf("Produced batch of %d messages", len(messages))
    return nil
}

// ProduceWithPartition sends an event to a specific partition. kafka-go's
// Writer routes messages through its Balancer and treats Message.Partition as
// read-only on writes, so pinning a partition is done with a balancer that
// always returns the requested partition.
func (p *Producer) ProduceWithPartition(ctx context.Context, partition int, event *models.OrderEvent) error {
    data, err := event.Serialize()
    if err != nil {
        return fmt.Errorf("serialize event: %w", err)
    }

    w := &kafka.Writer{
        Addr:  p.writer.Addr,
        Topic: p.topic,
        Balancer: kafka.BalancerFunc(func(kafka.Message, ...int) int {
            return partition
        }),
    }
    defer w.Close()

    msg := kafka.Message{
        Key:   []byte(event.Key()),
        Value: data,
        Time:  time.Now(),
    }

    if err := w.WriteMessages(ctx, msg); err != nil {
        return fmt.Errorf("write message: %w", err)
    }

    return nil
}

// Stats returns producer statistics
func (p *Producer) Stats() kafka.WriterStats {
    return p.writer.Stats()
}

// Close closes the producer
func (p *Producer) Close() error {
    return p.writer.Close()
}

3. Custom Partitioner

// internal/producer/partitioner.go
package producer

import (
    "hash/fnv"
    "sync/atomic"

    "github.com/segmentio/kafka-go"
)

// CustomPartitioner implements custom partitioning logic
type CustomPartitioner struct {
    partitions int
    counter    uint64 // round-robin counter for keyless messages
}

func NewCustomPartitioner(partitions int) *CustomPartitioner {
    return &CustomPartitioner{partitions: partitions}
}

// Balance distributes messages across partitions. kafka-go passes the IDs of
// the available partitions and expects one of those IDs back.
func (p *CustomPartitioner) Balance(msg kafka.Message, partitions ...int) int {
    if len(partitions) == 0 {
        return 0
    }

    // If the message has a key, use consistent hashing so records with the
    // same key always land on the same partition.
    if len(msg.Key) > 0 {
        h := fnv.New32a()
        h.Write(msg.Key)
        return partitions[int(h.Sum32())%len(partitions)]
    }

    // No key: round-robin via an atomic counter (msg.Offset is not set when
    // producing, so it cannot be used for this).
    n := atomic.AddUint64(&p.counter, 1)
    return partitions[n%uint64(len(partitions))]
}

// PriorityPartitioner sends high-priority messages to dedicated partition
type PriorityPartitioner struct {
    highPriorityPartition int
    totalPartitions       int
}

func (p *PriorityPartitioner) Balance(msg kafka.Message, partitions ...int) int {
    // Check for priority header
    for _, header := range msg.Headers {
        if header.Key == "priority" && string(header.Value) == "high" {
            return p.highPriorityPartition
        }
    }

    // Regular messages go to other partitions
    h := fnv.New32a()
    h.Write(msg.Key)
    partition := int(h.Sum32()) % (p.totalPartitions - 1)

    // Skip high-priority partition
    if partition >= p.highPriorityPartition {
        partition++
    }

    return partition
}

4. Consumer Implementation

// internal/consumer/consumer.go
package consumer

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
    "kafka-service/internal/models"
)

type Consumer struct {
    reader  *kafka.Reader
    handler MessageHandler
}

// MessageHandler processes consumed messages
type MessageHandler interface {
    Handle(ctx context.Context, msg *models.Event) error
}

// Config holds consumer configuration
type Config struct {
    Brokers        []string
    Topic          string
    GroupID        string
    MinBytes       int
    MaxBytes       int
    MaxWait        time.Duration
    CommitInterval time.Duration
    StartOffset    int64 // kafka.FirstOffset or kafka.LastOffset
    Partition      int   // -1 for auto-assignment
}

// NewConsumer creates a new Kafka consumer
func NewConsumer(cfg Config, handler MessageHandler) *Consumer {
    readerConfig := kafka.ReaderConfig{
        Brokers:        cfg.Brokers,
        Topic:          cfg.Topic,
        GroupID:        cfg.GroupID,
        MinBytes:       cfg.MinBytes,
        MaxBytes:       cfg.MaxBytes,
        MaxWait:        cfg.MaxWait,
        CommitInterval: cfg.CommitInterval,
        StartOffset:    cfg.StartOffset,

        // Error handling
        ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
            log.Printf("[CONSUMER ERROR] "+msg, args...)
        }),
    }

    // Explicit partition assignment. kafka-go accepts either Partition or
    // GroupID, not both, so this only applies when no consumer group is used.
    if cfg.Partition >= 0 && cfg.GroupID == "" {
        readerConfig.Partition = cfg.Partition
    }

    reader := kafka.NewReader(readerConfig)

    return &Consumer{
        reader:  reader,
        handler: handler,
    }
}

// Start begins consuming messages
func (c *Consumer) Start(ctx context.Context) error {
    log.Printf("Starting consumer for topic: %s", c.reader.Config().Topic)

    for {
        select {
        case <-ctx.Done():
            log.Println("Consumer stopped")
            return ctx.Err()
        default:
            msg, err := c.reader.FetchMessage(ctx)
            if err != nil {
                if err == context.Canceled {
                    return nil
                }
                log.Printf("Error fetching message: %v", err)
                continue
            }

            if err := c.processMessage(ctx, msg); err != nil {
                log.Printf("Error processing message: %v", err)
                // Don't commit on error - will retry
                continue
            }

            // Commit offset after successful processing
            if err := c.reader.CommitMessages(ctx, msg); err != nil {
                log.Printf("Error committing message: %v", err)
            }
        }
    }
}

// processMessage handles a single message
func (c *Consumer) processMessage(ctx context.Context, msg kafka.Message) error {
    log.Printf("Processing message: partition=%d, offset=%d, key=%s",
        msg.Partition, msg.Offset, string(msg.Key))

    var event models.Event
    if err := event.Deserialize(msg.Value); err != nil {
        return fmt.Errorf("deserialize message: %w", err)
    }

    // Add metadata from Kafka message
    event.Metadata = make(map[string]string)
    for _, header := range msg.Headers {
        event.Metadata[header.Key] = string(header.Value)
    }
    event.Metadata["partition"] = fmt.Sprintf("%d", msg.Partition)
    event.Metadata["offset"] = fmt.Sprintf("%d", msg.Offset)

    return c.handler.Handle(ctx, &event)
}

// Stats returns consumer statistics
func (c *Consumer) Stats() kafka.ReaderStats {
    return c.reader.Stats()
}

// Close closes the consumer
func (c *Consumer) Close() error {
    return c.reader.Close()
}

// Seek moves to a specific offset. Note: kafka-go's SetOffset only works for
// readers without a GroupID; group readers manage offsets via the coordinator.
func (c *Consumer) Seek(offset int64) error {
    return c.reader.SetOffset(offset)
}

5. Message Handler

// internal/consumer/handler.go
package consumer

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "kafka-service/internal/models"
)

// OrderHandler processes order events
type OrderHandler struct {
    // Dependencies like database, cache, etc.
    retryLimit int
}

func NewOrderHandler(retryLimit int) *OrderHandler {
    return &OrderHandler{
        retryLimit: retryLimit,
    }
}

// Handle processes a single event
func (h *OrderHandler) Handle(ctx context.Context, event *models.Event) error {
    log.Printf("Handling event: type=%s, id=%s", event.Type, event.ID)

    switch event.Type {
    case "order.created":
        return h.handleOrderCreated(ctx, event)
    case "order.updated":
        return h.handleOrderUpdated(ctx, event)
    case "order.cancelled":
        return h.handleOrderCancelled(ctx, event)
    default:
        log.Printf("Unknown event type: %s", event.Type)
        return nil
    }
}

func (h *OrderHandler) handleOrderCreated(ctx context.Context, event *models.Event) error {
    var order models.OrderEvent
    data, _ := json.Marshal(event.Data)
    if err := json.Unmarshal(data, &order); err != nil {
        return fmt.Errorf("unmarshal order: %w", err)
    }

    log.Printf("Processing new order: id=%s, user=%s, amount=%.2f",
        order.OrderID, order.UserID, order.Amount)

    // Simulate processing
    time.Sleep(100 * time.Millisecond)

    // Business logic here:
    // - Save to database
    // - Send notifications
    // - Update inventory
    // - Process payment

    return nil
}

func (h *OrderHandler) handleOrderUpdated(ctx context.Context, event *models.Event) error {
    log.Printf("Order updated: %s", event.ID)
    // Handle update logic
    return nil
}

func (h *OrderHandler) handleOrderCancelled(ctx context.Context, event *models.Event) error {
    log.Printf("Order cancelled: %s", event.ID)
    // Handle cancellation logic
    return nil
}

// RetryHandler wraps another handler with retry logic
type RetryHandler struct {
    handler    MessageHandler
    maxRetries int
    backoff    time.Duration
}

func NewRetryHandler(handler MessageHandler, maxRetries int, backoff time.Duration) *RetryHandler {
    return &RetryHandler{
        handler:    handler,
        maxRetries: maxRetries,
        backoff:    backoff,
    }
}

func (h *RetryHandler) Handle(ctx context.Context, event *models.Event) error {
    var lastErr error

    for attempt := 0; attempt <= h.maxRetries; attempt++ {
        if attempt > 0 {
            wait := h.backoff * time.Duration(1<<uint(attempt-1)) // Exponential backoff
            log.Printf("Retry attempt %d/%d after %v", attempt, h.maxRetries, wait)
            time.Sleep(wait)
        }

        if err := h.handler.Handle(ctx, event); err != nil {
            lastErr = err
            log.Printf("Handler error (attempt %d): %v", attempt, err)
            continue
        }

        return nil
    }

    return fmt.Errorf("max retries exceeded: %w", lastErr)
}

6. Producer Application

// cmd/producer/main.go
package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/google/uuid"
    "github.com/segmentio/kafka-go"
    "kafka-service/internal/models"
    "kafka-service/internal/producer"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Handle graceful shutdown
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    // Create producer
    prod := producer.NewProducer(producer.Config{
        Brokers:      []string{"localhost:9092"},
        Topic:        "orders",
        Compression:  kafka.Snappy,
        BatchSize:    100,
        BatchTimeout: 10 * time.Millisecond,
        MaxAttempts:  3,
        RequiredAcks: -1, // All in-sync replicas
        Async:        false,
    })
    defer prod.Close()

    // Start producing
    go produceOrders(ctx, prod)

    // Wait for shutdown
    <-sigChan
    log.Println("Shutting down producer...")
    cancel()

    // Print stats
    stats := prod.Stats()
    log.Printf("Producer stats: Messages=%d, Bytes=%d, Errors=%d",
        stats.Messages, stats.Bytes, stats.Errors)
}

func produceOrders(ctx context.Context, prod *producer.Producer) {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            event := &models.OrderEvent{
                OrderID:   uuid.New().String(),
                UserID:    fmt.Sprintf("user-%d", time.Now().Unix()%100),
                Status:    "pending",
                Amount:    float64(time.Now().Unix()%1000) / 10,
                CreatedAt: time.Now(),
                Items: []models.Item{
                    {
                        ProductID: "prod-123",
                        Quantity:  2,
                        Price:     29.99,
                    },
                },
            }

            if err := prod.ProduceEvent(ctx, event); err != nil {
                log.Printf("Failed to produce event: %v", err)
            } else {
                log.Printf("Produced order: %s", event.OrderID)
            }
        }
    }
}

7. Consumer Application

// cmd/consumer/main.go
package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/segmentio/kafka-go"
    "kafka-service/internal/consumer"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Handle graceful shutdown
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    // Create handler with retry logic
    baseHandler := consumer.NewOrderHandler(3)
    handler := consumer.NewRetryHandler(baseHandler, 3, 1*time.Second)

    // Create consumer
    cons := consumer.NewConsumer(consumer.Config{
        Brokers:        []string{"localhost:9092"},
        Topic:          "orders",
        GroupID:        "order-processor",
        MinBytes:       1024,           // 1KB
        MaxBytes:       10 * 1024 * 1024, // 10MB
        MaxWait:        500 * time.Millisecond,
        CommitInterval: 1 * time.Second,
        StartOffset:    kafka.FirstOffset,
        Partition:      -1, // Auto-assign
    }, handler)
    defer cons.Close()

    // Start consuming in goroutine
    errChan := make(chan error, 1)
    go func() {
        errChan <- cons.Start(ctx)
    }()

    // Wait for shutdown or error
    select {
    case <-sigChan:
        log.Println("Shutting down consumer...")
        cancel()
    case err := <-errChan:
        if err != nil && err != context.Canceled {
            log.Printf("Consumer error: %v", err)
        }
    }

    // Print stats
    stats := cons.Stats()
    log.Printf("Consumer stats: Messages=%d, Bytes=%d, Lag=%d",
        stats.Messages, stats.Bytes, stats.Lag)
}

Advanced Patterns

1. Exactly-Once Semantics (EOS)

// Transactional producer for exactly-once delivery
type TransactionalProducer struct {
    transport *kafka.Transport
}

func NewTransactionalProducer(brokers []string, transactionalID string) *TransactionalProducer {
    transport := &kafka.Transport{
        TLS: nil, // Add TLS config if needed
    }

    // Initialize transactions
    // Note: kafka-go has limited transaction support
    // For production, consider confluent-kafka-go

    return &TransactionalProducer{transport: transport}
}

// In practice, use Confluent's library for full transaction support:
/*
import "github.com/confluentinc/confluent-kafka-go/v2/kafka"

func createTransactionalProducer() (*kafka.Producer, error) {
    return kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "transactional.id":  "txn-producer-1",
        "acks":              "all",
    })
}
*/

2. Offset Management

// Manual offset management for fine-grained control
type OffsetManager struct {
    reader    *kafka.Reader
    committed map[int]int64 // partition -> offset
}

func (om *OffsetManager) Process(ctx context.Context) error {
    msg, err := om.reader.FetchMessage(ctx)
    if err != nil {
        return err
    }

    // Process message
    if err := om.handleMessage(msg); err != nil {
        // Don't commit, will retry
        return err
    }

    // Track committed offset
    om.committed[msg.Partition] = msg.Offset + 1

    // Commit
    return om.reader.CommitMessages(ctx, msg)
}

// Reset to specific offset
func (om *OffsetManager) ResetToTimestamp(t time.Time) error {
    // Seek to timestamp
    return om.reader.SetOffsetAt(context.Background(), t)
}

// Get lag for monitoring
func (om *OffsetManager) GetLag() (int64, error) {
    stats := om.reader.Stats()
    return stats.Lag, nil
}

3. Consumer Rebalancing

// Custom rebalance listener. kafka-go's high-level Reader handles group
// rebalancing internally and does not expose assignment callbacks; hooks like
// these are wired up with the lower-level kafka.ConsumerGroup API (or another
// client) when per-partition setup and teardown is needed.
type RebalanceListener struct {
    reader *kafka.Reader
}

func (rl *RebalanceListener) OnPartitionsAssigned(partitions []kafka.Partition) {
    log.Printf("Partitions assigned: %v", partitions)

    // Perform initialization for new partitions
    for _, partition := range partitions {
        log.Printf("Initializing partition %d", partition.ID)
        // Load state, initialize resources, etc.
    }
}

func (rl *RebalanceListener) OnPartitionsRevoked(partitions []kafka.Partition) {
    log.Printf("Partitions revoked: %v", partitions)

    // Cleanup before rebalancing
    for _, partition := range partitions {
        log.Printf("Cleaning up partition %d", partition.ID)
        // Flush caches, commit offsets, etc.
    }
}

4. Dead Letter Queue (DLQ)

type DLQHandler struct {
    handler    MessageHandler
    dlqWriter  *kafka.Writer
    maxRetries int
}

func NewDLQHandler(handler MessageHandler, dlqTopic string, maxRetries int) *DLQHandler {
    return &DLQHandler{
        handler: handler,
        dlqWriter: &kafka.Writer{
            Addr:  kafka.TCP("localhost:9092"),
            Topic: dlqTopic,
        },
        maxRetries: maxRetries,
    }
}

func (h *DLQHandler) Handle(ctx context.Context, event *models.Event) error {
    retryCount := 0
    if val, ok := event.Metadata["retry_count"]; ok {
        fmt.Sscanf(val, "%d", &retryCount)
    }

    err := h.handler.Handle(ctx, event)
    if err == nil {
        return nil
    }

    retryCount++
    if retryCount >= h.maxRetries {
        // Send to DLQ
        log.Printf("Max retries exceeded, sending to DLQ: %s", event.ID)

        data, _ := event.Serialize()
        dlqMsg := kafka.Message{
            Key:   []byte(event.ID),
            Value: data,
            Headers: []kafka.Header{
                {Key: "error", Value: []byte(err.Error())},
                {Key: "retry_count", Value: []byte(fmt.Sprintf("%d", retryCount))},
                {Key: "original_topic", Value: []byte("orders")},
            },
        }

        return h.dlqWriter.WriteMessages(ctx, dlqMsg)
    }

    // Update retry count and return error to trigger retry
    event.Metadata["retry_count"] = fmt.Sprintf("%d", retryCount)
    return err
}

5. Message Filtering and Routing

// Filter messages before processing
type FilteringConsumer struct {
    consumer *Consumer
    filters  []MessageFilter
}

type MessageFilter func(*models.Event) bool

func (fc *FilteringConsumer) AddFilter(filter MessageFilter) {
    fc.filters = append(fc.filters, filter)
}

func (fc *FilteringConsumer) shouldProcess(event *models.Event) bool {
    for _, filter := range fc.filters {
        if !filter(event) {
            return false
        }
    }
    return true
}

// Example filters
func EventTypeFilter(allowedTypes ...string) MessageFilter {
    typeMap := make(map[string]bool)
    for _, t := range allowedTypes {
        typeMap[t] = true
    }

    return func(event *models.Event) bool {
        return typeMap[event.Type]
    }
}

func TimeRangeFilter(start, end time.Time) MessageFilter {
    return func(event *models.Event) bool {
        return event.Timestamp.After(start) && event.Timestamp.Before(end)
    }
}

6. Schema Registry Integration

// Avro schema with schema registry
type AvroProducer struct {
    writer         *kafka.Writer
    schemaRegistry string
    schemas        map[string]int // schema -> ID
}

func (ap *AvroProducer) ProduceAvro(ctx context.Context, schema string, data interface{}) error {
    // Get schema ID from registry
    schemaID, err := ap.getSchemaID(schema)
    if err != nil {
        return err
    }

    // Serialize with Avro
    avroData, err := ap.serializeAvro(data, schema)
    if err != nil {
        return err
    }

    // Prepend schema ID (Confluent wire format)
    // Format: [0x00, schemaID (4 bytes), avro data]
    payload := make([]byte, 5+len(avroData))
    payload[0] = 0
    binary.BigEndian.PutUint32(payload[1:5], uint32(schemaID))
    copy(payload[5:], avroData)

    return ap.writer.WriteMessages(ctx, kafka.Message{
        Value: payload,
    })
}

func (ap *AvroProducer) getSchemaID(schema string) (int, error) {
    // Check cache
    if id, ok := ap.schemas[schema]; ok {
        return id, nil
    }

    // Register schema with registry
    // Implementation depends on schema registry API
    return 0, nil
}

7. Metrics and Monitoring

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var (
    messagesProduced = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "kafka_messages_produced_total",
            Help: "Total number of messages produced",
        },
        []string{"topic", "status"},
    )

    messagesConsumed = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "kafka_messages_consumed_total",
            Help: "Total number of messages consumed",
        },
        []string{"topic", "partition", "consumer_group"},
    )

    consumerLag = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "kafka_consumer_lag",
            Help: "Current consumer lag",
        },
        []string{"topic", "partition", "consumer_group"},
    )

    processingDuration = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "kafka_message_processing_duration_seconds",
            Help:    "Message processing duration",
            Buckets: prometheus.DefBuckets,
        },
        []string{"topic", "event_type"},
    )
)

// MetricsHandler wraps handler with metrics
type MetricsHandler struct {
    handler MessageHandler
    topic   string
}

func (mh *MetricsHandler) Handle(ctx context.Context, event *models.Event) error {
    start := time.Now()

    err := mh.handler.Handle(ctx, event)

    duration := time.Since(start).Seconds()
    processingDuration.WithLabelValues(mh.topic, event.Type).Observe(duration)

    status := "success"
    if err != nil {
        status = "error"
    }
    messagesConsumed.WithLabelValues(mh.topic, event.Metadata["partition"], "default").Inc()

    return err
}

Dependencies

// go.mod
module kafka-service

go 1.21

require (
    github.com/segmentio/kafka-go v0.4.47
    github.com/google/uuid v1.6.0
    github.com/prometheus/client_golang v1.19.0
    github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 // For advanced features
)

Docker Compose Setup

# deployments/docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka-1:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: INTERNAL for broker-to-broker traffic inside the compose
      # network, EXTERNAL for clients connecting from the host.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_LOG_SEGMENT_BYTES: 1073741824
      KAFKA_COMPRESSION_TYPE: snappy

  kafka-2:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29092,EXTERNAL://localhost:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2

  kafka-3:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29092,EXTERNAL://localhost:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:29092,kafka-2:29092,kafka-3:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181

Best Practices

1. Topic Design

Partition Count: Choose based on target throughput and consumer parallelism.

// Rule of thumb: partitions = max(target_throughput_mb/s / 10, max_consumers)
// Example: 100 MB/s throughput, 20 consumers = max(10, 20) = 20 partitions

Naming Convention: Use hierarchical naming for organization.

<domain>.<entity>.<event_type>
Examples:
- commerce.orders.created
- user.profile.updated
- analytics.pageviews.tracked
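
A sketch of applying both guidelines programmatically with kafka-go's admin connection. The broker address, topic name, and sizing values are assumptions, and the snippet relies on the log, net, strconv, and kafka-go imports:

// Illustrative topic creation against the cluster controller.
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
    log.Fatalf("dial: %v", err)
}
defer conn.Close()

controller, err := conn.Controller()
if err != nil {
    log.Fatalf("controller: %v", err)
}

ctrl, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
    log.Fatalf("dial controller: %v", err)
}
defer ctrl.Close()

// 20 partitions per the rule of thumb above, replicated across all 3 brokers.
if err := ctrl.CreateTopics(kafka.TopicConfig{
    Topic:             "commerce.orders.created",
    NumPartitions:     20,
    ReplicationFactor: 3,
}); err != nil {
    log.Fatalf("create topic: %v", err)
}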

2. Message Serialization

Use Schema Registry: Ensures backward/forward compatibility.

Avro over JSON: For better performance and schema evolution.

3. Consumer Group Management

One Consumer Group per Application: Isolate applications for independent scaling.

Monitor Lag: Set up alerts for consumer lag exceeding thresholds.

func monitorLag(consumer *Consumer) {
    ticker := time.NewTicker(30 * time.Second)
    for range ticker.C {
        stats := consumer.Stats()
        if stats.Lag > 10000 {
            // alert is an application-specific notification hook (PagerDuty, Slack, etc.)
            alert("High consumer lag detected: %d", stats.Lag)
        }
    }
}

4. Error Handling

Idempotent Consumers: Design handlers to safely process duplicates.
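
A minimal sketch of such a wrapper around the MessageHandler interface from earlier. The in-memory map is an assumption for illustration; a real deployment would back deduplication with a shared store (Redis, a database unique constraint) and expire old IDs. It also needs sync in addition to the handler imports shown above.

// IdempotentHandler skips events whose IDs have already been processed.
type IdempotentHandler struct {
    handler MessageHandler
    mu      sync.Mutex
    seen    map[string]struct{} // assumption: in-memory only, unbounded
}

func NewIdempotentHandler(handler MessageHandler) *IdempotentHandler {
    return &IdempotentHandler{handler: handler, seen: make(map[string]struct{})}
}

func (h *IdempotentHandler) Handle(ctx context.Context, event *models.Event) error {
    h.mu.Lock()
    _, done := h.seen[event.ID]
    h.mu.Unlock()
    if done {
        return nil // duplicate delivery (e.g. redelivery after a rebalance); already handled
    }

    if err := h.handler.Handle(ctx, event); err != nil {
        return err
    }

    h.mu.Lock()
    h.seen[event.ID] = struct{}{}
    h.mu.Unlock()
    return nil
}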

Dead Letter Queue: Route failed messages for manual inspection.

Circuit Breaker: Prevent cascading failures in downstream systems.
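
A hedged sketch of a circuit-breaking wrapper; the failure threshold and cooldown are assumptions, and in practice a library such as sony/gobreaker handles this with more nuance (half-open probing, rolling windows).

// CircuitBreakerHandler stops calling the wrapped handler after a run of
// consecutive failures and rejects events until a cooldown has passed.
type CircuitBreakerHandler struct {
    handler     MessageHandler
    maxFailures int           // assumption: e.g. 5
    cooldown    time.Duration // assumption: e.g. 30 * time.Second

    mu        sync.Mutex
    failures  int
    openUntil time.Time
}

func (h *CircuitBreakerHandler) Handle(ctx context.Context, event *models.Event) error {
    h.mu.Lock()
    if time.Now().Before(h.openUntil) {
        h.mu.Unlock()
        return fmt.Errorf("circuit open, not processing event %s", event.ID)
    }
    h.mu.Unlock()

    err := h.handler.Handle(ctx, event)

    h.mu.Lock()
    defer h.mu.Unlock()
    if err != nil {
        h.failures++
        if h.failures >= h.maxFailures {
            h.openUntil = time.Now().Add(h.cooldown)
            h.failures = 0
        }
        return err
    }
    h.failures = 0
    return nil
}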

5. Performance Optimization

Batch Processing: Process messages in batches when possible.

func (c *Consumer) processBatch(ctx context.Context, size int) error {
    messages := make([]kafka.Message, 0, size)

    for i := 0; i < size; i++ {
        msg, err := c.reader.FetchMessage(ctx)
        if err != nil {
            break
        }
        messages = append(messages, msg)
    }

    if len(messages) == 0 {
        return nil
    }

    // Process batch. HandleBatch is assumed to exist on a batch-capable
    // handler; it is not part of the MessageHandler interface defined earlier.
    if err := c.handler.HandleBatch(ctx, messages); err != nil {
        return err
    }

    // Commit the whole batch in one call
    return c.reader.CommitMessages(ctx, messages...)
}

Compression: Use Snappy or LZ4 for network efficiency.

Partition Assignment: Prefer assignment strategies that minimize partition movement during rebalances, such as the cooperative sticky assignor where your client supports it (kafka-go's Reader defaults to range and round-robin balancers; confluent-kafka-go exposes partition.assignment.strategy).

6. Security

Enable TLS: Encrypt data in transit.

writer := &kafka.Writer{
    Addr: kafka.TCP("localhost:9092"),
    Transport: &kafka.Transport{
        TLS: &tls.Config{
            // TLS configuration
        },
    },
}

SASL Authentication: Secure client-broker communication.
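
A sketch of SASL/SCRAM with segmentio/kafka-go, using the github.com/segmentio/kafka-go/sasl/scram package; the broker address and credentials are placeholders, and SASL is normally combined with the TLS configuration shown above.

mechanism, err := scram.Mechanism(scram.SHA512, "service-user", "service-password")
if err != nil {
    log.Fatalf("sasl mechanism: %v", err)
}

// Producer side: authenticate via the writer's transport.
writer := &kafka.Writer{
    Addr:  kafka.TCP("broker-1:9093"),
    Topic: "orders",
    Transport: &kafka.Transport{
        SASL: mechanism,
        TLS:  &tls.Config{},
    },
}

// Consumer side: authenticate via the reader's dialer.
reader := kafka.NewReader(kafka.ReaderConfig{
    Brokers: []string{"broker-1:9093"},
    Topic:   "orders",
    GroupID: "order-processor",
    Dialer: &kafka.Dialer{
        SASLMechanism: mechanism,
        TLS:           &tls.Config{},
    },
})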

ACLs: Implement topic-level access control.

7. Monitoring

Key Metrics to Track:

  • Producer: throughput, latency, error rate
  • Consumer: lag, throughput, rebalance frequency
  • Broker: disk usage, network I/O, request latency

Set Up Alerts:

  • Consumer lag > threshold
  • Rebalancing frequency spike
  • Producer error rate increase

8. Capacity Planning

Estimate Storage: storage = daily_throughput × retention_days × replication_factor (for example, 50 GB/day × 7 days retention × 3 replicas ≈ 1 TB).

Sizing Brokers: Ensure disk I/O can handle peak write loads.

Network Bandwidth: Account for replication traffic (multiply by replication factor).

When to Use Kafka

Perfect for:

  • High-throughput event streaming (millions of events/sec)
  • Event sourcing and CQRS patterns
  • Log aggregation from distributed systems
  • Real-time analytics pipelines
  • Building event-driven microservices

Not ideal for:

  • Low-latency RPC (<10ms)
  • Small-scale applications (operational complexity)
  • Transient messaging without persistence needs
  • Simple pub-sub with few consumers

Conclusion

Apache Kafka provides a robust, scalable platform for event streaming that can handle massive throughput while maintaining ordering guarantees and fault tolerance. The Go ecosystem offers excellent Kafka client libraries that make it easy to build production-grade event-driven systems.

Key takeaways:

  • Use partitioning strategically for parallelism and ordering
  • Implement proper error handling with DLQs
  • Monitor consumer lag continuously
  • Design for idempotency
  • Leverage consumer groups for horizontal scaling

For additional patterns like Kafka Streams, Connect, and ksqlDB integration, explore the official Kafka documentation and consider the Confluent platform for enterprise features.