{{series_nav(current_post=7)}}
Apache Kafka is a distributed event streaming platform designed for high-throughput, fault-tolerant, and scalable data pipelines. Originally developed at LinkedIn, Kafka combines messaging, storage, and stream processing into a single platform that can handle trillions of events per day.
What is Apache Kafka?
Kafka is built around a distributed commit log that allows producers to publish records to topics, which are partitioned and replicated across a cluster of brokers. Consumers read from these topics using consumer groups, enabling horizontal scaling and fault tolerance.
Core Concepts
Topics and Partitions: Topics are logical channels that organize messages. Each topic is divided into partitions for parallelism and scalability (a topic-creation sketch follows this list).
Producers: Applications that publish records to Kafka topics, with configurable delivery semantics.
Consumers: Applications that subscribe to topics and process records, organized into consumer groups for load distribution.
Brokers: Kafka servers that store data and serve client requests.
Offsets: Sequential IDs assigned to each record within a partition, used for tracking consumer progress.
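To make these concepts concrete, here is a minimal sketch that creates an orders topic with three partitions, each replicated three times, using kafka-go's low-level Conn API. The broker address and counts are illustrative and match the local cluster used later in this post.
// create a topic programmatically (sketch)
package main

import (
	"log"
	"net"
	"strconv"

	"github.com/segmentio/kafka-go"
)

func main() {
	conn, err := kafka.Dial("tcp", "localhost:9092")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Topic creation has to go through the controller broker.
	controller, err := conn.Controller()
	if err != nil {
		log.Fatal(err)
	}
	ctrlConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		log.Fatal(err)
	}
	defer ctrlConn.Close()

	// Three partitions for parallelism, replication factor three for fault tolerance.
	err = ctrlConn.CreateTopics(kafka.TopicConfig{
		Topic:             "orders",
		NumPartitions:     3,
		ReplicationFactor: 3,
	})
	if err != nil {
		log.Fatal(err)
	}
}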
Architecture
The example deployment used throughout this post looks like this: the orders topic is split into three partitions spread across three brokers, with each partition having a leader on one broker and a replica on another. Producers route records to partitions via round-robin, key hashing, or a custom partitioner; partition leaders replicate writes to their followers, and ZooKeeper (or KRaft) coordinates the cluster. On the consuming side, consumer group A splits the three partitions across two consumers, while consumer group B uses a single consumer that reads all partitions.
Message Flow
A producer publishes a record to the leader of the target partition, the leader appends it to its log and replicates it to the follower replicas, and the write is acknowledged once the configured acks requirement is met. Consumers in a group then fetch records from their assigned partitions and commit offsets as they make progress.
Real-World Use Cases
1. Event-Driven Microservices
Decouple services using Kafka as an event backbone for asynchronous communication.
Perfect for: Order processing, inventory updates, notification systems
2. Log Aggregation
Collect logs from multiple services into centralized storage for analysis.
Perfect for: Centralized logging, security monitoring, application debugging
3. Stream Processing
Process data in real-time using Kafka Streams or external processors.
Perfect for: Fraud detection, real-time analytics, alerting systems
4. Change Data Capture (CDC)
Capture database changes and propagate them to downstream systems.
Perfect for: Data replication, cache invalidation, search index updates
5. Metrics Collection
Aggregate metrics from distributed systems for monitoring and alerting.
Perfect for: APM systems, business intelligence, operational dashboards
6. Message Queue Replacement
Replace traditional message queues with Kafka’s durable, scalable architecture.
Perfect for: Task queues, job scheduling, async processing
7. Commit Log for Databases
Use Kafka as a durable write-ahead log for custom storage systems.
Perfect for: Event sourcing, distributed databases, state replication
8. IoT Data Pipelines
Ingest and process high-volume sensor data from IoT devices.
Perfect for: Smart cities, industrial monitoring, connected vehicles
Project Structure
kafka-service/
├── cmd/
│ ├── producer/
│ │ └── main.go # Producer application
│ └── consumer/
│ └── main.go # Consumer application
├── internal/
│ ├── producer/
│ │ ├── producer.go # Producer implementation
│ │ └── partitioner.go # Custom partitioner
│ ├── consumer/
│ │ ├── consumer.go # Consumer implementation
│ │ └── handler.go # Message handler
│ ├── models/
│ │ └── event.go # Event models
│ └── config/
│ └── config.go # Configuration
├── pkg/
│ ├── serializer/
│ │ ├── json.go # JSON serialization
│ │ └── avro.go # Avro serialization
│ └── interceptor/
│ ├── metrics.go # Metrics interceptor
│ └── tracing.go # Tracing interceptor
├── deployments/
│ ├── docker-compose.yml # Kafka cluster setup
│ └── kafka/
│ └── server.properties # Kafka config
├── scripts/
│ ├── create-topics.sh # Topic creation
│ └── setup.sh # Environment setup
├── go.mod
└── go.sum
Implementation
1. Event Models
// internal/models/event.go
package models
import (
"encoding/json"
"time"
)
// Event represents a generic Kafka event
type Event struct {
ID string `json:"id"`
Type string `json:"type"`
Timestamp time.Time `json:"timestamp"`
Source string `json:"source"`
Data map[string]interface{} `json:"data"`
Metadata map[string]string `json:"metadata"`
}
// OrderEvent represents an order-related event
type OrderEvent struct {
OrderID string `json:"order_id"`
UserID string `json:"user_id"`
Status string `json:"status"`
Amount float64 `json:"amount"`
Items []Item `json:"items"`
CreatedAt time.Time `json:"created_at"`
}
type Item struct {
ProductID string `json:"product_id"`
Quantity int `json:"quantity"`
Price float64 `json:"price"`
}
// Serialize converts event to JSON bytes
func (e *Event) Serialize() ([]byte, error) {
return json.Marshal(e)
}
// Deserialize converts JSON bytes to event
func (e *Event) Deserialize(data []byte) error {
return json.Unmarshal(data, e)
}
// Key returns the partition key for the event
func (e *OrderEvent) Key() string {
return e.UserID // Partition by user ID to preserve per-user ordering
}
// Serialize converts the order event to JSON bytes (used by the producer below)
func (e *OrderEvent) Serialize() ([]byte, error) {
return json.Marshal(e)
}
2. Producer Implementation
// internal/producer/producer.go
package producer
import (
"context"
"fmt"
"log"
"time"
"github.com/segmentio/kafka-go"
"kafka-service/internal/models"
)
type Producer struct {
writer *kafka.Writer
topic string
}
// Config holds producer configuration
type Config struct {
Brokers []string
Topic string
Compression kafka.Compression
BatchSize int
BatchTimeout time.Duration
MaxAttempts int
RequiredAcks int // -1 all, 0 none, 1 leader
Async bool
}
// NewProducer creates a new Kafka producer
func NewProducer(cfg Config) *Producer {
writer := &kafka.Writer{
Addr: kafka.TCP(cfg.Brokers...),
Topic: cfg.Topic,
Balancer: &kafka.Hash{}, // Partition by message key
Compression: cfg.Compression,
BatchSize: cfg.BatchSize,
BatchTimeout: cfg.BatchTimeout,
MaxAttempts: cfg.MaxAttempts,
RequiredAcks: kafka.RequiredAcks(cfg.RequiredAcks),
Async: cfg.Async,
// Error handling
ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
log.Printf("[PRODUCER ERROR] "+msg, args...)
}),
}
return &Producer{
writer: writer,
topic: cfg.Topic,
}
}
// ProduceEvent sends a single event to Kafka
func (p *Producer) ProduceEvent(ctx context.Context, event *models.OrderEvent) error {
data, err := event.Serialize()
if err != nil {
return fmt.Errorf("serialize event: %w", err)
}
msg := kafka.Message{
Key: []byte(event.Key()),
Value: data,
Headers: []kafka.Header{
{Key: "event-type", Value: []byte("order")},
{Key: "version", Value: []byte("v1")},
{Key: "timestamp", Value: []byte(time.Now().Format(time.RFC3339))},
},
Time: time.Now(),
}
err = p.writer.WriteMessages(ctx, msg)
if err != nil {
return fmt.Errorf("write message: %w", err)
}
log.Printf("Produced message: key=%s, offset=%d, partition=%d",
string(msg.Key), msg.Offset, msg.Partition)
return nil
}
// ProduceBatch sends multiple events in a batch
func (p *Producer) ProduceBatch(ctx context.Context, events []*models.OrderEvent) error {
messages := make([]kafka.Message, len(events))
for i, event := range events {
data, err := event.Serialize()
if err != nil {
return fmt.Errorf("serialize event %d: %w", i, err)
}
messages[i] = kafka.Message{
Key: []byte(event.Key()),
Value: data,
Time: time.Now(),
}
}
err := p.writer.WriteMessages(ctx, messages...)
if err != nil {
return fmt.Errorf("write batch: %w", err)
}
log.Printf("Produced batch of %d messages", len(messages))
return nil
}
// ProduceWithPartition sends an event to a specific partition.
// kafka.Writer treats Message.Partition as read-only, so partition selection
// has to go through a Balancer; here a short-lived writer with a
// fixed-partition balancer does the routing.
func (p *Producer) ProduceWithPartition(ctx context.Context, partition int, event *models.OrderEvent) error {
data, err := event.Serialize()
if err != nil {
return fmt.Errorf("serialize event: %w", err)
}
w := &kafka.Writer{
Addr: p.writer.Addr,
Topic: p.topic,
Balancer: staticBalancer{partition: partition},
}
defer w.Close()
msg := kafka.Message{
Key: []byte(event.Key()),
Value: data,
Time: time.Now(),
}
if err := w.WriteMessages(ctx, msg); err != nil {
return fmt.Errorf("write message: %w", err)
}
return nil
}
// staticBalancer always routes messages to a single fixed partition
type staticBalancer struct {
partition int
}
func (b staticBalancer) Balance(kafka.Message, ...int) int {
return b.partition
}
// Stats returns producer statistics
func (p *Producer) Stats() kafka.WriterStats {
return p.writer.Stats()
}
// Close closes the producer
func (p *Producer) Close() error {
return p.writer.Close()
}
3. Custom Partitioner
// internal/producer/partitioner.go
package producer
import (
"hash/fnv"
"sync/atomic"
"github.com/segmentio/kafka-go"
)
// CustomPartitioner implements custom partitioning logic
type CustomPartitioner struct {
counter uint64 // round-robin counter for keyless messages
}
func NewCustomPartitioner() *CustomPartitioner {
return &CustomPartitioner{}
}
// Balance distributes messages across the available partitions
func (p *CustomPartitioner) Balance(msg kafka.Message, partitions ...int) int {
if len(partitions) == 0 {
return 0
}
// Keyed messages use consistent hashing so the same key always lands
// on the same partition
if len(msg.Key) > 0 {
h := fnv.New32a()
h.Write(msg.Key)
return partitions[int(h.Sum32())%len(partitions)]
}
// Keyless messages are spread round-robin via an atomic counter
// (Message.Offset is not set before writing, so it cannot be used here)
n := atomic.AddUint64(&p.counter, 1)
return partitions[int(n%uint64(len(partitions)))]
}
// PriorityPartitioner sends high-priority messages to dedicated partition
type PriorityPartitioner struct {
highPriorityPartition int
totalPartitions int
}
// Balance assumes partition IDs are contiguous (0..totalPartitions-1), with one
// partition reserved for high-priority traffic.
func (p *PriorityPartitioner) Balance(msg kafka.Message, partitions ...int) int {
// Check for the priority header first
for _, header := range msg.Headers {
if header.Key == "priority" && string(header.Value) == "high" {
return p.highPriorityPartition
}
}
// Regular messages go to other partitions
h := fnv.New32a()
h.Write(msg.Key)
partition := int(h.Sum32()) % (p.totalPartitions - 1)
// Skip high-priority partition
if partition >= p.highPriorityPartition {
partition++
}
return partition
}
4. Consumer Implementation
// internal/consumer/consumer.go
package consumer
import (
"context"
"fmt"
"log"
"time"
"github.com/segmentio/kafka-go"
"kafka-service/internal/models"
)
type Consumer struct {
reader *kafka.Reader
handler MessageHandler
}
// MessageHandler processes consumed messages
type MessageHandler interface {
Handle(ctx context.Context, msg *models.Event) error
}
// Config holds consumer configuration
type Config struct {
Brokers []string
Topic string
GroupID string
MinBytes int
MaxBytes int
MaxWait time.Duration
CommitInterval time.Duration
StartOffset int64 // kafka.FirstOffset or kafka.LastOffset
Partition int // -1 for auto-assignment
}
// NewConsumer creates a new Kafka consumer
func NewConsumer(cfg Config, handler MessageHandler) *Consumer {
readerConfig := kafka.ReaderConfig{
Brokers: cfg.Brokers,
Topic: cfg.Topic,
GroupID: cfg.GroupID,
MinBytes: cfg.MinBytes,
MaxBytes: cfg.MaxBytes,
MaxWait: cfg.MaxWait,
CommitInterval: cfg.CommitInterval,
StartOffset: cfg.StartOffset,
// Error handling
ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
log.Printf("[CONSUMER ERROR] "+msg, args...)
}),
}
// Explicit partition assignment. kafka-go does not allow GroupID and Partition
// to be set together, so pinning a partition disables consumer-group management.
if cfg.Partition >= 0 {
readerConfig.GroupID = ""
readerConfig.Partition = cfg.Partition
}
reader := kafka.NewReader(readerConfig)
return &Consumer{
reader: reader,
handler: handler,
}
}
// Start begins consuming messages
func (c *Consumer) Start(ctx context.Context) error {
log.Printf("Starting consumer for topic: %s", c.reader.Config().Topic)
for {
select {
case <-ctx.Done():
log.Println("Consumer stopped")
return ctx.Err()
default:
msg, err := c.reader.FetchMessage(ctx)
if err != nil {
if err == context.Canceled {
return nil
}
log.Printf("Error fetching message: %v", err)
continue
}
if err := c.processMessage(ctx, msg); err != nil {
log.Printf("Error processing message: %v", err)
// Don't commit on error - will retry
continue
}
// Commit offset after successful processing
if err := c.reader.CommitMessages(ctx, msg); err != nil {
log.Printf("Error committing message: %v", err)
}
}
}
}
// processMessage handles a single message
func (c *Consumer) processMessage(ctx context.Context, msg kafka.Message) error {
log.Printf("Processing message: partition=%d, offset=%d, key=%s",
msg.Partition, msg.Offset, string(msg.Key))
var event models.Event
if err := event.Deserialize(msg.Value); err != nil {
return fmt.Errorf("deserialize message: %w", err)
}
// Add metadata from Kafka message
event.Metadata = make(map[string]string)
for _, header := range msg.Headers {
event.Metadata[header.Key] = string(header.Value)
}
event.Metadata["partition"] = fmt.Sprintf("%d", msg.Partition)
event.Metadata["offset"] = fmt.Sprintf("%d", msg.Offset)
return c.handler.Handle(ctx, &event)
}
// Stats returns consumer statistics
func (c *Consumer) Stats() kafka.ReaderStats {
return c.reader.Stats()
}
// Close closes the consumer
func (c *Consumer) Close() error {
return c.reader.Close()
}
// Seek moves the reader to a specific offset. This only works for readers
// without a GroupID; SetOffset returns an error when consumer groups are used.
func (c *Consumer) Seek(offset int64) error {
return c.reader.SetOffset(offset)
}
5. Message Handler
// internal/consumer/handler.go
package consumer
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"kafka-service/internal/models"
)
// OrderHandler processes order events
type OrderHandler struct {
// Dependencies like database, cache, etc.
retryLimit int
}
func NewOrderHandler(retryLimit int) *OrderHandler {
return &OrderHandler{
retryLimit: retryLimit,
}
}
// Handle processes a single event
func (h *OrderHandler) Handle(ctx context.Context, event *models.Event) error {
log.Printf("Handling event: type=%s, id=%s", event.Type, event.ID)
switch event.Type {
case "order.created":
return h.handleOrderCreated(ctx, event)
case "order.updated":
return h.handleOrderUpdated(ctx, event)
case "order.cancelled":
return h.handleOrderCancelled(ctx, event)
default:
log.Printf("Unknown event type: %s", event.Type)
return nil
}
}
func (h *OrderHandler) handleOrderCreated(ctx context.Context, event *models.Event) error {
var order models.OrderEvent
data, err := json.Marshal(event.Data)
if err != nil {
return fmt.Errorf("marshal event data: %w", err)
}
if err := json.Unmarshal(data, &order); err != nil {
return fmt.Errorf("unmarshal order: %w", err)
}
log.Printf("Processing new order: id=%s, user=%s, amount=%.2f",
order.OrderID, order.UserID, order.Amount)
// Simulate processing
time.Sleep(100 * time.Millisecond)
// Business logic here:
// - Save to database
// - Send notifications
// - Update inventory
// - Process payment
return nil
}
func (h *OrderHandler) handleOrderUpdated(ctx context.Context, event *models.Event) error {
log.Printf("Order updated: %s", event.ID)
// Handle update logic
return nil
}
func (h *OrderHandler) handleOrderCancelled(ctx context.Context, event *models.Event) error {
log.Printf("Order cancelled: %s", event.ID)
// Handle cancellation logic
return nil
}
// RetryHandler wraps another handler with retry logic
type RetryHandler struct {
handler MessageHandler
maxRetries int
backoff time.Duration
}
func NewRetryHandler(handler MessageHandler, maxRetries int, backoff time.Duration) *RetryHandler {
return &RetryHandler{
handler: handler,
maxRetries: maxRetries,
backoff: backoff,
}
}
func (h *RetryHandler) Handle(ctx context.Context, event *models.Event) error {
var lastErr error
for attempt := 0; attempt <= h.maxRetries; attempt++ {
if attempt > 0 {
wait := h.backoff * time.Duration(1<<uint(attempt-1)) // Exponential backoff
log.Printf("Retry attempt %d/%d after %v", attempt, h.maxRetries, wait)
time.Sleep(wait)
}
if err := h.handler.Handle(ctx, event); err != nil {
lastErr = err
log.Printf("Handler error (attempt %d): %v", attempt, err)
continue
}
return nil
}
return fmt.Errorf("max retries exceeded: %w", lastErr)
}
6. Producer Application
// cmd/producer/main.go
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/google/uuid"
"github.com/segmentio/kafka-go"
"kafka-service/internal/models"
"kafka-service/internal/producer"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Handle graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// Create producer
prod := producer.NewProducer(producer.Config{
Brokers: []string{"localhost:9092"},
Topic: "orders",
Compression: kafka.Snappy,
BatchSize: 100,
BatchTimeout: 10 * time.Millisecond,
MaxAttempts: 3,
RequiredAcks: -1, // All in-sync replicas
Async: false,
})
defer prod.Close()
// Start producing
go produceOrders(ctx, prod)
// Wait for shutdown
<-sigChan
log.Println("Shutting down producer...")
cancel()
// Print stats
stats := prod.Stats()
log.Printf("Producer stats: Messages=%d, Bytes=%d, Errors=%d",
stats.Messages, stats.Bytes, stats.Errors)
}
func produceOrders(ctx context.Context, prod *producer.Producer) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
event := &models.OrderEvent{
OrderID: uuid.New().String(),
UserID: fmt.Sprintf("user-%d", time.Now().Unix()%100),
Status: "pending",
Amount: float64(time.Now().Unix()%1000) / 10,
CreatedAt: time.Now(),
Items: []models.Item{
{
ProductID: "prod-123",
Quantity: 2,
Price: 29.99,
},
},
}
if err := prod.ProduceEvent(ctx, event); err != nil {
log.Printf("Failed to produce event: %v", err)
} else {
log.Printf("Produced order: %s", event.OrderID)
}
}
}
}
7. Consumer Application
// cmd/consumer/main.go
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/segmentio/kafka-go"
"kafka-service/internal/consumer"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Handle graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// Create handler with retry logic
baseHandler := consumer.NewOrderHandler(3)
handler := consumer.NewRetryHandler(baseHandler, 3, 1*time.Second)
// Create consumer
cons := consumer.NewConsumer(consumer.Config{
Brokers: []string{"localhost:9092"},
Topic: "orders",
GroupID: "order-processor",
MinBytes: 1024, // 1KB
MaxBytes: 10 * 1024 * 1024, // 10MB
MaxWait: 500 * time.Millisecond,
CommitInterval: 1 * time.Second,
StartOffset: kafka.FirstOffset,
Partition: -1, // Auto-assign
}, handler)
defer cons.Close()
// Start consuming in goroutine
errChan := make(chan error, 1)
go func() {
errChan <- cons.Start(ctx)
}()
// Wait for shutdown or error
select {
case <-sigChan:
log.Println("Shutting down consumer...")
cancel()
case err := <-errChan:
if err != nil && err != context.Canceled {
log.Printf("Consumer error: %v", err)
}
}
// Print stats
stats := cons.Stats()
log.Printf("Consumer stats: Messages=%d, Bytes=%d, Lag=%d",
stats.Messages, stats.Bytes, stats.Lag)
}
Advanced Patterns
1. Exactly-Once Semantics (EOS)
// Transactional producer for exactly-once delivery
type TransactionalProducer struct {
transport *kafka.Transport
}
func NewTransactionalProducer(brokers []string, transactionalID string) *TransactionalProducer {
transport := &kafka.Transport{
TLS: nil, // Add TLS config if needed
}
// Initialize transactions
// Note: kafka-go has limited transaction support
// For production, consider confluent-kafka-go
return &TransactionalProducer{transport: transport}
}
// In practice, use Confluent's library for full transaction support:
/*
import "github.com/confluentinc/confluent-kafka-go/v2/kafka"
func createTransactionalProducer() (*kafka.Producer, error) {
return kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"transactional.id": "txn-producer-1",
"acks": "all",
})
}
*/
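Building on the snippet above, here is a minimal sketch of a transactional produce loop with confluent-kafka-go. The function name, topic, and error handling are illustrative, and in a real service InitTransactions would be called once at startup rather than per batch.
import (
	"context"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func produceInTransaction(ctx context.Context, p *kafka.Producer, topic string, records [][]byte) error {
	// Normally called once at startup; shown inline here for completeness.
	if err := p.InitTransactions(ctx); err != nil {
		return err
	}
	if err := p.BeginTransaction(); err != nil {
		return err
	}
	for _, value := range records {
		err := p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          value,
		}, nil)
		if err != nil {
			// Abort so no partial batch becomes visible to read_committed consumers.
			_ = p.AbortTransaction(ctx)
			return err
		}
	}
	// Commit atomically publishes every record in the transaction.
	return p.CommitTransaction(ctx)
}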
2. Offset Management
// Manual offset management for fine-grained control
type OffsetManager struct {
reader *kafka.Reader
committed map[int]int64 // partition -> offset
}
func (om *OffsetManager) Process(ctx context.Context) error {
msg, err := om.reader.FetchMessage(ctx)
if err != nil {
return err
}
// Process message
if err := om.handleMessage(msg); err != nil {
// Don't commit, will retry
return err
}
// Track committed offset
om.committed[msg.Partition] = msg.Offset + 1
// Commit
return om.reader.CommitMessages(ctx, msg)
}
// Reset to specific offset
func (om *OffsetManager) ResetToTimestamp(t time.Time) error {
// Seek to timestamp
return om.reader.SetOffsetAt(context.Background(), t)
}
// Get lag for monitoring
func (om *OffsetManager) GetLag() (int64, error) {
stats := om.reader.Stats()
return stats.Lag, nil
}
3. Consumer Rebalancing
// Custom rebalance listener. kafka-go's Reader manages group rebalancing
// internally and does not expose assignment/revocation hooks; the callbacks
// below show the shape such a listener takes in clients that do expose them
// (for example confluent-kafka-go's RebalanceCb).
type RebalanceListener struct {
reader *kafka.Reader
}
func (rl *RebalanceListener) OnPartitionsAssigned(partitions []kafka.Partition) {
log.Printf("Partitions assigned: %v", partitions)
// Perform initialization for new partitions
for _, partition := range partitions {
log.Printf("Initializing partition %d", partition.ID)
// Load state, initialize resources, etc.
}
}
func (rl *RebalanceListener) OnPartitionsRevoked(partitions []kafka.Partition) {
log.Printf("Partitions revoked: %v", partitions)
// Cleanup before rebalancing
for _, partition := range partitions {
log.Printf("Cleaning up partition %d", partition.ID)
// Flush caches, commit offsets, etc.
}
}
4. Dead Letter Queue (DLQ)
type DLQHandler struct {
handler MessageHandler
dlqWriter *kafka.Writer
maxRetries int
}
func NewDLQHandler(handler MessageHandler, dlqTopic string, maxRetries int) *DLQHandler {
return &DLQHandler{
handler: handler,
dlqWriter: &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: dlqTopic,
},
maxRetries: maxRetries,
}
}
func (h *DLQHandler) Handle(ctx context.Context, event *models.Event) error {
retryCount := 0
if val, ok := event.Metadata["retry_count"]; ok {
fmt.Sscanf(val, "%d", &retryCount)
}
err := h.handler.Handle(ctx, event)
if err == nil {
return nil
}
retryCount++
if retryCount >= h.maxRetries {
// Send to DLQ
log.Printf("Max retries exceeded, sending to DLQ: %s", event.ID)
data, _ := event.Serialize()
dlqMsg := kafka.Message{
Key: []byte(event.ID),
Value: data,
Headers: []kafka.Header{
{Key: "error", Value: []byte(err.Error())},
{Key: "retry_count", Value: []byte(fmt.Sprintf("%d", retryCount))},
{Key: "original_topic", Value: []byte("orders")},
},
}
return h.dlqWriter.WriteMessages(ctx, dlqMsg)
}
// Update retry count and return error to trigger retry
event.Metadata["retry_count"] = fmt.Sprintf("%d", retryCount)
return err
}
5. Message Filtering and Routing
// Filter messages before processing
type FilteringConsumer struct {
consumer *Consumer
filters []MessageFilter
}
type MessageFilter func(*models.Event) bool
func (fc *FilteringConsumer) AddFilter(filter MessageFilter) {
fc.filters = append(fc.filters, filter)
}
func (fc *FilteringConsumer) shouldProcess(event *models.Event) bool {
for _, filter := range fc.filters {
if !filter(event) {
return false
}
}
return true
}
// Example filters
func EventTypeFilter(allowedTypes ...string) MessageFilter {
typeMap := make(map[string]bool)
for _, t := range allowedTypes {
typeMap[t] = true
}
return func(event *models.Event) bool {
return typeMap[event.Type]
}
}
func TimeRangeFilter(start, end time.Time) MessageFilter {
return func(event *models.Event) bool {
return event.Timestamp.After(start) && event.Timestamp.Before(end)
}
}
6. Schema Registry Integration
// Avro schema with schema registry
type AvroProducer struct {
writer *kafka.Writer
schemaRegistry string
schemas map[string]int // schema -> ID
}
func (ap *AvroProducer) ProduceAvro(ctx context.Context, schema string, data interface{}) error {
// Get schema ID from registry
schemaID, err := ap.getSchemaID(schema)
if err != nil {
return err
}
// Serialize with Avro
avroData, err := ap.serializeAvro(data, schema)
if err != nil {
return err
}
// Prepend schema ID (Confluent wire format)
// Format: [0x00, schemaID (4 bytes), avro data]
payload := make([]byte, 5+len(avroData))
payload[0] = 0
binary.BigEndian.PutUint32(payload[1:5], uint32(schemaID))
copy(payload[5:], avroData)
return ap.writer.WriteMessages(ctx, kafka.Message{
Value: payload,
})
}
func (ap *AvroProducer) getSchemaID(schema string) (int, error) {
// Check cache
if id, ok := ap.schemas[schema]; ok {
return id, nil
}
// Not cached: register the schema with the registry
// (see the registerSchema sketch below for a REST-based approach)
return 0, nil
}
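The registration step could look like the following sketch against the Confluent Schema Registry REST API (POST /subjects/{subject}/versions). The method name, subject naming, and error handling are illustrative, and the fragment assumes bytes, encoding/json, fmt, and net/http are imported.
func (ap *AvroProducer) registerSchema(subject, schema string) (int, error) {
	body, err := json.Marshal(map[string]string{"schema": schema})
	if err != nil {
		return 0, err
	}
	resp, err := http.Post(
		ap.schemaRegistry+"/subjects/"+subject+"/versions",
		"application/vnd.schemaregistry.v1+json",
		bytes.NewReader(body),
	)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return 0, fmt.Errorf("schema registry returned %s", resp.Status)
	}
	var result struct {
		ID int `json:"id"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return 0, err
	}
	// Cache the ID so subsequent produces skip the HTTP round trip.
	ap.schemas[schema] = result.ID
	return result.ID, nil
}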
7. Metrics and Monitoring
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
messagesProduced = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "kafka_messages_produced_total",
Help: "Total number of messages produced",
},
[]string{"topic", "status"},
)
messagesConsumed = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "kafka_messages_consumed_total",
Help: "Total number of messages consumed",
},
[]string{"topic", "partition", "consumer_group", "status"},
)
consumerLag = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "kafka_consumer_lag",
Help: "Current consumer lag",
},
[]string{"topic", "partition", "consumer_group"},
)
processingDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "kafka_message_processing_duration_seconds",
Help: "Message processing duration",
Buckets: prometheus.DefBuckets,
},
[]string{"topic", "event_type"},
)
)
// MetricsHandler wraps handler with metrics
type MetricsHandler struct {
handler MessageHandler
topic string
}
func (mh *MetricsHandler) Handle(ctx context.Context, event *models.Event) error {
start := time.Now()
err := mh.handler.Handle(ctx, event)
duration := time.Since(start).Seconds()
processingDuration.WithLabelValues(mh.topic, event.Type).Observe(duration)
status := "success"
if err != nil {
status = "error"
}
messagesConsumed.WithLabelValues(mh.topic, event.Metadata["partition"], "default").Inc()
return err
}
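The project layout also lists a tracing interceptor (pkg/interceptor/tracing.go) that is not shown above. Here is a minimal sketch of what it could look like with OpenTelemetry; the tracer name and attributes are assumptions, and it requires adding go.opentelemetry.io/otel to go.mod.
import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"

	"kafka-service/internal/models"
)

// TracingHandler wraps another handler with an OpenTelemetry span per message.
type TracingHandler struct {
	handler MessageHandler
}

func (th *TracingHandler) Handle(ctx context.Context, event *models.Event) error {
	ctx, span := otel.Tracer("kafka-consumer").Start(ctx, "consume "+event.Type)
	defer span.End()

	span.SetAttributes(
		attribute.String("messaging.system", "kafka"),
		attribute.String("messaging.kafka.partition", event.Metadata["partition"]),
		attribute.String("event.id", event.ID),
	)

	err := th.handler.Handle(ctx, event)
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
	}
	return err
}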
Dependencies
// go.mod
module kafka-service
go 1.21
require (
github.com/segmentio/kafka-go v0.4.47
github.com/google/uuid v1.6.0
github.com/prometheus/client_golang v1.19.0
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 // For advanced features
)
Docker Compose Setup
# deployments/docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka-1:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Internal listener for broker-to-broker traffic and in-network clients,
      # external listener advertised to clients on the host.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_LOG_SEGMENT_BYTES: 1073741824
      KAFKA_COMPRESSION_TYPE: snappy

  kafka-2:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29092,EXTERNAL://localhost:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2

  kafka-3:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29092,EXTERNAL://localhost:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:29092,kafka-2:29092,kafka-3:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
Best Practices
1. Topic Design
Partition Count: Choose based on target throughput and consumer parallelism.
// Rule of thumb: partitions = max(target_throughput_mb/s / 10, max_consumers)
// Example: 100 MB/s throughput, 20 consumers = max(10, 20) = 20 partitions
Naming Convention: Use hierarchical naming for organization.
<domain>.<entity>.<event_type>
Examples:
- commerce.orders.created
- user.profile.updated
- analytics.pageviews.tracked
2. Message Serialization
Use Schema Registry: Ensures backward/forward compatibility.
Avro over JSON: Prefer Avro for better performance and schema evolution (see the serializer sketch below).
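The project layout keeps serialization behind pkg/serializer. Here is a minimal sketch of that abstraction with its JSON implementation; the interface and file name are assumptions, and the Avro variant would implement the same interface on top of a schema registry client.
// pkg/serializer/serializer.go (sketch)
package serializer

import "encoding/json"

// Serializer abstracts the wire format so producers and consumers can switch
// between JSON and Avro without touching business code.
type Serializer interface {
	Serialize(v interface{}) ([]byte, error)
	Deserialize(data []byte, v interface{}) error
}

// JSONSerializer is the simplest implementation.
type JSONSerializer struct{}

func (JSONSerializer) Serialize(v interface{}) ([]byte, error) {
	return json.Marshal(v)
}

func (JSONSerializer) Deserialize(data []byte, v interface{}) error {
	return json.Unmarshal(data, v)
}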
3. Consumer Group Management
One Consumer Group per Application: Isolate applications for independent scaling.
Monitor Lag: Set up alerts for consumer lag exceeding thresholds.
func monitorLag(consumer *Consumer) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
stats := consumer.Stats()
if stats.Lag > 10000 {
// alert is a placeholder for your alerting hook (Slack, PagerDuty, etc.)
alert("High consumer lag detected: %d", stats.Lag)
}
}
}
4. Error Handling
Idempotent Consumers: Design handlers to safely process duplicates (see the sketch after this list).
Dead Letter Queue: Route failed messages for manual inspection.
Circuit Breaker: Prevent cascading failures in downstream systems.
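A minimal idempotency sketch that wraps the MessageHandler interface from the consumer package. The in-memory map is for illustration only; a production consumer would typically back the dedupe check with Redis or a database keyed by event ID.
import (
	"context"
	"sync"

	"kafka-service/internal/models"
)

// IdempotentHandler skips events whose IDs have already been processed.
type IdempotentHandler struct {
	handler MessageHandler
	mu      sync.Mutex
	seen    map[string]struct{}
}

func NewIdempotentHandler(handler MessageHandler) *IdempotentHandler {
	return &IdempotentHandler{handler: handler, seen: make(map[string]struct{})}
}

func (h *IdempotentHandler) Handle(ctx context.Context, event *models.Event) error {
	h.mu.Lock()
	_, dup := h.seen[event.ID]
	h.mu.Unlock()
	if dup {
		return nil // Already processed; safe to commit and move on.
	}

	if err := h.handler.Handle(ctx, event); err != nil {
		return err
	}

	// Record the ID only after successful processing.
	h.mu.Lock()
	h.seen[event.ID] = struct{}{}
	h.mu.Unlock()
	return nil
}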
5. Performance Optimization
Batch Processing: Process messages in batches when possible.
// processBatch fetches up to size messages and hands them to a batch-aware
// handler; HandleBatch is assumed to extend the MessageHandler interface shown earlier.
func (c *Consumer) processBatch(ctx context.Context, size int) error {
messages := make([]kafka.Message, 0, size)
for i := 0; i < size; i++ {
msg, err := c.reader.FetchMessage(ctx)
if err != nil {
break
}
messages = append(messages, msg)
}
if len(messages) == 0 {
return nil
}
// Process the whole batch before committing anything
if err := c.handler.HandleBatch(ctx, messages); err != nil {
return err
}
// Commit the batch in a single call
return c.reader.CommitMessages(ctx, messages...)
}
Compression: Use Snappy or LZ4 for network efficiency.
Partition Assignment: Prefer assignors that minimize partition movement during rebalances; the Java client offers sticky and cooperative-sticky assignors, while kafka-go ships range, round-robin, and rack-affinity group balancers (see the sketch below).
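A small sketch of both settings with kafka-go; the broker address, topic, and group are placeholders.
// Compression is a single field on the writer.
writer := &kafka.Writer{
	Addr:        kafka.TCP("localhost:9092"),
	Topic:       "orders",
	Compression: kafka.Lz4, // or kafka.Snappy
}

// Group balancers are tried in order during assignment; kafka-go provides
// Range, RoundRobin, and RackAffinity balancers.
reader := kafka.NewReader(kafka.ReaderConfig{
	Brokers: []string{"localhost:9092"},
	Topic:   "orders",
	GroupID: "order-processor",
	GroupBalancers: []kafka.GroupBalancer{
		kafka.RangeGroupBalancer{},
		kafka.RoundRobinGroupBalancer{},
	},
})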
6. Security
Enable TLS: Encrypt data in transit.
writer := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Transport: &kafka.Transport{
TLS: &tls.Config{
// TLS configuration
},
},
}
SASL Authentication: Secure client-broker communication (see the sketch after this list).
ACLs: Implement topic-level access control.
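A sketch of SASL/PLAIN over TLS with kafka-go; usernames, passwords, and broker addresses are placeholders, and SCRAM is available via the sasl/scram subpackage.
import (
	"crypto/tls"

	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/plain"
)

func newAuthenticatedWriter() *kafka.Writer {
	mechanism := plain.Mechanism{
		Username: "service-user",
		Password: "service-password",
	}
	return &kafka.Writer{
		Addr:  kafka.TCP("broker:9093"),
		Topic: "orders",
		Transport: &kafka.Transport{
			SASL: mechanism,
			TLS:  &tls.Config{},
		},
	}
}

func newAuthenticatedReader() *kafka.Reader {
	mechanism := plain.Mechanism{Username: "service-user", Password: "service-password"}
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"broker:9093"},
		Topic:   "orders",
		GroupID: "order-processor",
		Dialer: &kafka.Dialer{
			SASLMechanism: mechanism,
			TLS:           &tls.Config{},
		},
	})
}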
7. Monitoring
Key Metrics to Track:
- Producer: throughput, latency, error rate
- Consumer: lag, throughput, rebalance frequency
- Broker: disk usage, network I/O, request latency
Set Up Alerts:
- Consumer lag > threshold (a lag-export sketch follows this list)
- Rebalancing frequency spike
- Producer error rate increase
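One way to feed the lag alert is to export reader stats into the consumerLag gauge defined in the metrics section; the interval and label values below are illustrative.
func exportLag(ctx context.Context, reader *kafka.Reader, topic, group string) {
	ticker := time.NewTicker(15 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// ReaderStats reports lag for the reader's current assignment.
			stats := reader.Stats()
			consumerLag.WithLabelValues(topic, stats.Partition, group).Set(float64(stats.Lag))
		}
	}
}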
8. Capacity Planning
Estimate Storage: storage = daily_throughput × retention_days × replication_factor (a worked example follows this list).
Sizing Brokers: Ensure disk I/O can handle peak write loads.
Network Bandwidth: Account for replication traffic (multiply by replication factor).
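A quick worked example of the storage formula (the numbers are illustrative):
// Example: 50 GB/day ingest × 7 days retention × 3 replicas ≈ 1 TB of raw log data
// Add 30-40% headroom for index files, inactive segments, and unexpected growth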
When to Use Kafka
✅ Perfect for:
- High-throughput event streaming (millions of events/sec)
- Event sourcing and CQRS patterns
- Log aggregation from distributed systems
- Real-time analytics pipelines
- Building event-driven microservices
❌ Not ideal for:
- Low-latency RPC (<10ms)
- Small-scale applications (operational complexity)
- Transient messaging without persistence needs
- Simple pub-sub with few consumers
Conclusion
Apache Kafka provides a robust, scalable platform for event streaming that can handle massive throughput while maintaining ordering guarantees and fault tolerance. The Go ecosystem offers excellent Kafka client libraries that make it easy to build production-grade event-driven systems.
Key takeaways:
- Use partitioning strategically for parallelism and ordering
- Implement proper error handling with DLQs
- Monitor consumer lag continuously
- Design for idempotency
- Leverage consumer groups for horizontal scaling
For additional patterns like Kafka Streams, Kafka Connect, and ksqlDB integration, explore the official Kafka documentation and consider Confluent Platform for enterprise features.