Introduction

Apache Kafka is the backbone of event-driven architectures at companies like LinkedIn, Uber, and Airbnb. It lets services communicate asynchronously at massive scale — without coupling them together. Go is an excellent fit for Kafka workloads: lightweight goroutines, clean channel semantics, and minimal overhead make it easy to build high-throughput producers and consumers.

This guide skips the theory-heavy basics and focuses on what you actually build. We’ll cover:

  • Kafka’s core model in plain terms
  • Setting up producers and consumers with kafka-go
  • Three real-world use cases with full code and diagrams
  • Patterns for reliability: retries, dead-letter queues, and consumer groups

How Kafka Works (The Short Version)

Kafka organizes data into topics. Each topic is split into partitions — ordered, immutable logs. Producers append messages to partitions. Consumers read messages from partitions at their own pace, tracking their position with an offset.

flowchart LR
    P1[Producer A] --> T
    P2[Producer B] --> T
    subgraph T[Topic: orders]
        direction TB
        PA[Partition 0\n─────────\nmsg 0\nmsg 1\nmsg 2]
        PB[Partition 1\n─────────\nmsg 0\nmsg 1\nmsg 2]
        PC[Partition 2\n─────────\nmsg 0\nmsg 1\nmsg 2]
    end
    T --> C1[Consumer Group A]
    T --> C2[Consumer Group B]

Key properties to internalize:

| Concept | What it means for you |
| --- | --- |
| Offset | Each consumer tracks its own read position. Kafka doesn’t delete messages on read. |
| Consumer Group | Multiple consumers sharing a topic. Each partition is assigned to one consumer in the group. |
| Retention | Messages persist for a configurable duration (default 7 days), regardless of consumption. |
| Ordering | Guaranteed within a partition, not across partitions. Use a consistent message key to route related events to the same partition. |
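To make the ordering rule concrete, here is a sketch of how a hash balancer maps keys to partitions: hash the key, take it modulo the partition count. kafka-go's kafka.Hash balancer works this way (FNV-1a by default), though this illustration is not byte-for-byte the library's algorithm.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor sketches how a hash balancer routes a message key to a
// partition. Not kafka-go's exact implementation — an illustration of
// the idea: same key in, same partition out.
func partitionFor(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	// The same key always lands on the same partition — that is the
	// entire per-key ordering guarantee.
	for _, key := range []string{"customer-42", "customer-42", "customer-7"} {
		fmt.Printf("key=%q → partition %d\n", key, partitionFor(key, 3))
	}
}
```

Because routing is deterministic, all events for `customer-42` land on one partition and are consumed in order; events for different customers may interleave freely.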

Project Setup

We’ll use kafka-go — a pure Go library with a clean API and no CGo dependency.

go mod init kafka-guide
go get github.com/segmentio/kafka-go

For local development, spin up Kafka with Docker Compose:

# docker-compose.yml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

Then start the stack:

docker-compose up -d

A Minimal Producer and Consumer

Before diving into use cases, here’s the skeleton you’ll reuse throughout:

// producer.go
package main

import (
	"context"

	kafka "github.com/segmentio/kafka-go"
)

func newWriter(topic string) *kafka.Writer {
	return &kafka.Writer{
		Addr:                   kafka.TCP("localhost:9092"),
		Topic:                  topic,
		Balancer:               &kafka.Hash{}, // route same key to same partition
		RequiredAcks:           kafka.RequireAll, // wait for leader + replicas
		Async:                  false,
	}
}

func publish(w *kafka.Writer, key, value string) error {
	return w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte(key),
			Value: []byte(value),
		},
	)
}

// consumer.go
package main

import (
	"context"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

func newReader(topic, groupID string) *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"localhost:9092"},
		Topic:    topic,
		GroupID:  groupID,  // enables consumer group offset management
		MinBytes: 1,
		MaxBytes: 10e6,    // 10MB
	})
}

func consume(r *kafka.Reader) {
	for {
		msg, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Printf("read error: %v", err)
			continue
		}
		log.Printf("[%s] key=%s value=%s", r.Config().Topic, msg.Key, msg.Value)
	}
}

Note on RequiredAcks: RequireAll waits for the leader and all in-sync replicas to acknowledge the write. Slower, but safe. Use RequireOne if you need throughput over durability.


Use Case 1: E-Commerce Order Processing

The Problem

A user places an order. Immediately, several things need to happen in parallel:

  • Inventory must be reserved
  • A payment must be initiated
  • A confirmation email must be sent
  • Analytics must be updated

Doing these synchronously in a single HTTP handler means your checkout endpoint is only as fast as your slowest downstream service. One failure cascades into a failed order.

The Kafka Solution

The checkout service publishes a single order.placed event. Each downstream service consumes it independently, at its own pace, in its own consumer group.

sequenceDiagram
    actor User
    participant Checkout as Checkout Service
    participant Kafka as Kafka (order.placed)
    participant Inventory as Inventory Service
    participant Payment as Payment Service
    participant Email as Email Service
    participant Analytics as Analytics Service
    User->>Checkout: POST /checkout
    Checkout->>Kafka: publish order.placed {orderID, items, total}
    Checkout-->>User: 202 Accepted
    par Parallel consumers
        Kafka->>Inventory: reserve items
        Kafka->>Payment: charge card
        Kafka->>Email: send confirmation
        Kafka->>Analytics: record sale
    end

The checkout handler returns 202 Accepted immediately. Downstream services process the event on their own schedule.

The Code

Shared types:

// types.go
package main

import "encoding/json"

type OrderItem struct {
	ProductID string  `json:"product_id"`
	Quantity  int     `json:"quantity"`
	Price     float64 `json:"price"`
}

type OrderPlacedEvent struct {
	OrderID    string      `json:"order_id"`
	CustomerID string      `json:"customer_id"`
	Items      []OrderItem `json:"items"`
	Total      float64     `json:"total"`
	CreatedAt  string      `json:"created_at"`
}

func (e OrderPlacedEvent) Encode() ([]byte, error) {
	return json.Marshal(e)
}

func DecodeOrder(data []byte) (OrderPlacedEvent, error) {
	var e OrderPlacedEvent
	return e, json.Unmarshal(data, &e)
}

Checkout producer:

// checkout/main.go
package main

import (
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/google/uuid"
)

const orderTopic = "order.placed"

func main() {
	writer := newWriter(orderTopic)
	defer writer.Close()

	http.HandleFunc("/checkout", func(w http.ResponseWriter, r *http.Request) {
		order := OrderPlacedEvent{
			OrderID:    uuid.New().String(),
			CustomerID: r.Header.Get("X-Customer-ID"),
			Items: []OrderItem{
				{ProductID: "SKU-001", Quantity: 2, Price: 29.99},
			},
			Total:     59.98,
			CreatedAt: time.Now().UTC().Format(time.RFC3339),
		}

		payload, err := order.Encode()
		if err != nil {
			http.Error(w, "encode error", http.StatusInternalServerError)
			return
		}

		// Key by customerID → same customer's orders go to same partition
		if err := publish(writer, order.CustomerID, string(payload)); err != nil {
			http.Error(w, "publish error", http.StatusInternalServerError)
			return
		}

		w.WriteHeader(http.StatusAccepted)
		fmt.Fprintf(w, `{"order_id":%q}`, order.OrderID)
	})

	log.Fatal(http.ListenAndServe(":8080", nil))
}

Inventory consumer (same pattern for payment, email, analytics):

// inventory/main.go
package main

import (
	"context"
	"log"
)

func main() {
	reader := newReader("order.placed", "inventory-service")
	defer reader.Close()

	log.Println("Inventory service listening for orders...")

	for {
		msg, err := reader.FetchMessage(context.Background())
		if err != nil {
			log.Printf("fetch error: %v", err)
			continue
		}

		order, err := DecodeOrder(msg.Value)
		if err != nil {
			log.Printf("decode error: %v — skipping", err)
			// commit anyway to avoid infinite retry on bad message
			_ = reader.CommitMessages(context.Background(), msg)
			continue
		}

		if err := reserveInventory(order); err != nil {
			log.Printf("reserve error for order %s: %v", order.OrderID, err)
			// Don't commit — message will be re-delivered
			continue
		}

		// Only commit after successful processing
		if err := reader.CommitMessages(context.Background(), msg); err != nil {
			log.Printf("commit error: %v", err)
		}
	}
}

func reserveInventory(order OrderPlacedEvent) error {
	for _, item := range order.Items {
		log.Printf("Reserving %d x %s for order %s", item.Quantity, item.ProductID, order.OrderID)
		// ... call your inventory DB here
	}
	return nil
}

Why FetchMessage instead of ReadMessage?
ReadMessage auto-commits after each read. FetchMessage + CommitMessages gives you manual control — you only commit once processing succeeds. This prevents data loss on crashes.
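The flip side of manual commits is at-least-once delivery: a crash after processing but before committing means the same message arrives again. Handlers should therefore be idempotent. A minimal in-memory sketch (the processor type is hypothetical; in production the "seen" set would be durable, e.g. a unique constraint in your database):

```go
package main

import "fmt"

// processor illustrates idempotent handling: it remembers which order
// IDs it has already processed, so a redelivered message is a no-op.
type processor struct {
	seen    map[string]bool
	handled int
}

func (p *processor) handle(orderID string) {
	if p.seen[orderID] {
		return // duplicate delivery: already processed, skip
	}
	p.seen[orderID] = true
	p.handled++ // stand-in for real side effects (reserve stock, etc.)
}

func main() {
	p := &processor{seen: make(map[string]bool)}
	// Simulate a crash between processing and committing:
	// ord-1 is delivered twice, but its side effects run once.
	for _, id := range []string{"ord-1", "ord-2", "ord-1"} {
		p.handle(id)
	}
	fmt.Println("handled:", p.handled) // handled: 2
}
```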


Use Case 2: Real-Time Analytics Pipeline

The Problem

You want to track user behavior — page views, clicks, searches — and make it available for dashboards within seconds, not hours. Traditional batch ETL jobs run hourly at best.

The Kafka Solution

Events flow from browser → API → Kafka → stream processor → data store. Multiple processors can tap the same stream for different aggregations.

flowchart TD
    Browser[Browser / Mobile App]
    API[Events API\nGo HTTP Server]
    Browser -->|POST /events| API
    API -->|publish| K1[Kafka: user.events]
    K1 -->|consumer group: page-views| PV[Page View Counter]
    K1 -->|consumer group: search-agg| SA[Search Aggregator]
    K1 -->|consumer group: session-tracker| ST[Session Tracker]
    PV --> Redis[(Redis\nReal-time Counters)]
    SA --> PG[(PostgreSQL\nSearch Analytics)]
    ST --> PG
    Redis --> Dash[Live Dashboard]
    PG --> Dash

The Code

Event ingestion API:

// events-api/main.go
package main

import (
	"encoding/json"
	"io"
	"log"
	"net/http"
	"time"
)

type UserEvent struct {
	Type      string            `json:"type"`       // "page_view", "click", "search"
	UserID    string            `json:"user_id"`
	SessionID string            `json:"session_id"`
	Payload   map[string]string `json:"payload"`
	Timestamp string            `json:"timestamp"`
}

func main() {
	writer := newWriter("user.events")
	defer writer.Close()

	http.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
		body, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "bad request", http.StatusBadRequest)
			return
		}

		var event UserEvent
		if err := json.Unmarshal(body, &event); err != nil {
			http.Error(w, "invalid JSON", http.StatusBadRequest)
			return
		}

		event.Timestamp = time.Now().UTC().Format(time.RFC3339Nano)

		payload, _ := json.Marshal(event)

		// Key by session ID → all events in a session go to same partition
		if err := publish(writer, event.SessionID, string(payload)); err != nil {
			log.Printf("publish error: %v", err)
			http.Error(w, "internal error", http.StatusInternalServerError)
			return
		}

		w.WriteHeader(http.StatusNoContent)
	})

	log.Fatal(http.ListenAndServe(":8081", nil))
}

Page view counter (fan-out consumer):

// page-view-counter/main.go
package main

import (
	"context"
	"encoding/json"
	"log"
	"sync"
	"sync/atomic"
	"time"
)

type counter struct {
	mu     sync.Mutex // guards counts: increment and snapshot run on different goroutines
	counts map[string]*atomic.Int64
}

func newCounter() *counter {
	return &counter{counts: make(map[string]*atomic.Int64)}
}

func (c *counter) increment(page string) {
	c.mu.Lock()
	n, ok := c.counts[page]
	if !ok {
		n = &atomic.Int64{}
		c.counts[page] = n
	}
	c.mu.Unlock()
	n.Add(1)
}

func (c *counter) snapshot() map[string]int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	snap := make(map[string]int64, len(c.counts))
	for k, v := range c.counts {
		snap[k] = v.Load()
	}
	return snap
}

func main() {
	reader := newReader("user.events", "page-views")
	defer reader.Close()

	c := newCounter()

	// Flush counts to Redis every 5 seconds
	go func() {
		tick := time.NewTicker(5 * time.Second)
		for range tick.C {
			snap := c.snapshot()
			for page, count := range snap {
				log.Printf("[dashboard] %s → %d views", page, count)
				// rdb.IncrBy(ctx, "views:"+page, count)
			}
		}
	}()

	for {
		msg, err := reader.ReadMessage(context.Background())
		if err != nil {
			log.Printf("read error: %v", err)
			continue
		}

		var event struct {
			Type    string            `json:"type"`
			Payload map[string]string `json:"payload"`
		}

		if err := json.Unmarshal(msg.Value, &event); err != nil {
			continue
		}

		if event.Type == "page_view" {
			page := event.Payload["path"]
			c.increment(page)
		}
	}
}

How multiple consumer groups fan out:

flowchart LR
    subgraph Kafka [Kafka: user.events — 3 partitions]
        P0[Partition 0]
        P1[Partition 1]
        P2[Partition 2]
    end
    subgraph G1 [Consumer Group: page-views]
        C1A[Consumer 1A → P0]
        C1B[Consumer 1B → P1]
        C1C[Consumer 1C → P2]
    end
    subgraph G2 [Consumer Group: search-agg]
        C2A[Consumer 2A → P0, P1, P2]
    end
    P0 --> C1A
    P1 --> C1B
    P2 --> C1C
    P0 --> C2A
    P1 --> C2A
    P2 --> C2A

Each group gets a full copy of every message. Within a group, partitions are divided among consumers — three partitions means up to three consumers can process in parallel.
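The division of partitions within a group can be sketched as a simple round-robin assignment. Kafka's real assignors (range, round-robin, cooperative-sticky) are more sophisticated, but the invariant is the same: each partition is owned by exactly one consumer in the group, and extra consumers beyond the partition count sit idle.

```go
package main

import "fmt"

// assignPartitions sketches how partitions are divided among the
// members of one consumer group: partition p goes to consumer p mod N.
func assignPartitions(partitions, consumers int) map[int][]int {
	assignment := make(map[int][]int, consumers)
	for p := 0; p < partitions; p++ {
		assignment[p%consumers] = append(assignment[p%consumers], p)
	}
	return assignment
}

func main() {
	// 3 partitions, 2 consumers: one consumer owns two partitions.
	fmt.Println(assignPartitions(3, 2)) // map[0:[0 2] 1:[1]]
	// 3 partitions, 4 consumers: the fourth consumer is idle.
	fmt.Println(assignPartitions(3, 4))
}
```

This is why the partition count caps a group's parallelism: adding a fourth consumer to a three-partition topic gains nothing.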


Use Case 3: Microservices Event Coordination

The Problem

You have a distributed workflow: a user uploads a video → it must be transcoded, thumbnail-generated, and indexed for search. Each step depends on the previous one completing. How do you coordinate this without tight coupling or a central orchestrator?

The Kafka Solution

Each service publishes a completion event that the next service listens to. The workflow advances through the event log. No service calls another directly.

sequenceDiagram
    participant Upload as Upload Service
    participant K1 as Kafka: video.uploaded
    participant Transcode as Transcode Service
    participant K2 as Kafka: video.transcoded
    participant Thumb as Thumbnail Service
    participant Search as Search Indexer
    participant K3 as Kafka: video.ready
    participant User as User Notification
    Upload->>K1: video.uploaded {videoID, s3Key}
    K1->>Transcode: consume
    Transcode->>K2: video.transcoded {videoID, formats}
    par
        K2->>Thumb: generate thumbnail
        K2->>Search: index metadata
    end
    Thumb->>K3: video.ready {videoID}
    K3->>User: notify user

The Code

Shared event types:

// events/video.go
package events

type VideoUploaded struct {
	VideoID   string `json:"video_id"`
	S3Key     string `json:"s3_key"`
	UserID    string `json:"user_id"`
	SizeBytes int64  `json:"size_bytes"`
}

type VideoTranscoded struct {
	VideoID  string   `json:"video_id"`
	Formats  []string `json:"formats"`  // ["720p", "1080p", "4k"]
	Duration int      `json:"duration_seconds"`
}

type VideoReady struct {
	VideoID      string `json:"video_id"`
	UserID       string `json:"user_id"`
	ThumbnailURL string `json:"thumbnail_url"`
}

Transcode service — consumes video.uploaded, publishes video.transcoded:

// transcode/main.go
package main

import (
	"context"
	"encoding/json"
	"log"
	"time"

	"events"
)

func main() {
	reader := newReader("video.uploaded", "transcode-service")
	writer := newWriter("video.transcoded")
	defer reader.Close()
	defer writer.Close()

	for {
		msg, err := reader.FetchMessage(context.Background())
		if err != nil {
			log.Printf("fetch error: %v", err)
			continue
		}

		var uploaded events.VideoUploaded
		if err := json.Unmarshal(msg.Value, &uploaded); err != nil {
			log.Printf("decode error: %v", err)
			_ = reader.CommitMessages(context.Background(), msg)
			continue
		}

		formats, err := transcode(uploaded)
		if err != nil {
			log.Printf("transcode failed for %s: %v", uploaded.VideoID, err)
			// Retry by not committing. In production, add retry limit
			// and route to a dead-letter topic after N attempts.
			continue
		}

		readyEvent := events.VideoTranscoded{
			VideoID:  uploaded.VideoID,
			Formats:  formats,
			Duration: 142,
		}

		payload, _ := json.Marshal(readyEvent)
		if err := publish(writer, uploaded.VideoID, string(payload)); err != nil {
			log.Printf("publish error: %v", err)
			continue
		}

		_ = reader.CommitMessages(context.Background(), msg)
		log.Printf("Transcoded video %s → formats: %v", uploaded.VideoID, formats)
	}
}

func transcode(v events.VideoUploaded) ([]string, error) {
	log.Printf("Transcoding %s (%d bytes)...", v.VideoID, v.SizeBytes)
	time.Sleep(2 * time.Second) // simulate work
	return []string{"720p", "1080p"}, nil
}

The chain in one diagram:

flowchart LR
    U[Upload Service] -->|video.uploaded| K1[Kafka]
    K1 -->|consumes| T[Transcode Service]
    T -->|video.transcoded| K2[Kafka]
    K2 -->|consumes| TH[Thumbnail Service]
    K2 -->|consumes| SI[Search Indexer]
    TH -->|video.ready| K3[Kafka]
    K3 -->|consumes| N[Notification Service]

This is the choreography pattern: each service reacts to events without a central coordinator. Adding a new step (e.g., a virus scanner) means adding a consumer to video.uploaded — nothing else changes.
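To see why nothing else changes, here is a toy in-process pub/sub standing in for Kafka (the bus type is purely an illustration, not kafka-go code). Each "service" subscribes to a topic and publishes its own completion event; adding the virus scanner is one more subscribe call.

```go
package main

import "fmt"

// bus is a toy pub/sub: every subscriber to a topic receives every
// published event, mirroring how each consumer group gets the full stream.
type bus struct {
	subs map[string][]func(videoID string)
}

func (b *bus) subscribe(topic string, fn func(string)) {
	b.subs[topic] = append(b.subs[topic], fn)
}

func (b *bus) publish(topic, videoID string) {
	for _, fn := range b.subs[topic] {
		fn(videoID)
	}
}

func main() {
	b := &bus{subs: make(map[string][]func(string))}
	var trace []string

	// Transcode service: video.uploaded → video.transcoded
	b.subscribe("video.uploaded", func(id string) {
		trace = append(trace, "transcode "+id)
		b.publish("video.transcoded", id)
	})
	// Thumbnail service: video.transcoded → video.ready
	b.subscribe("video.transcoded", func(id string) {
		trace = append(trace, "thumbnail "+id)
	})
	// The new step: a virus scanner, added without touching the above.
	b.subscribe("video.uploaded", func(id string) {
		trace = append(trace, "virus-scan "+id)
	})

	b.publish("video.uploaded", "vid-1")
	fmt.Println(trace)
}
```

One caveat the toy hides: in Kafka the workflow survives crashes because events are durable in the log; an in-process bus loses everything on restart.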


Reliability Patterns

Dead-Letter Queue

When a message fails processing repeatedly, don’t block the partition forever. Route it to a dead-letter topic (DLQ) for investigation.

flowchart TD
    K[Kafka: orders] --> C[Consumer]
    C -->|success| Commit[Commit Offset]
    C -->|fail attempt 1-3| Retry[Retry with backoff]
    Retry --> C
    C -->|fail attempt 4| DLQ[Kafka: orders.dlq]
    DLQ --> Alert[Alert / Manual Review]

// retry with dead-letter queue
func processWithRetry(
	reader *kafka.Reader,
	dlqWriter *kafka.Writer,
	maxAttempts int,
) {
	for {
		msg, err := reader.FetchMessage(context.Background())
		if err != nil {
			log.Printf("fetch error: %v", err)
			continue
		}

		var lastErr error
		for attempt := 1; attempt <= maxAttempts; attempt++ {
			if lastErr = processOrder(msg.Value); lastErr == nil {
				break
			}
			backoff := time.Duration(attempt*attempt) * time.Second
			log.Printf("attempt %d failed: %v — retrying in %s", attempt, lastErr, backoff)
			time.Sleep(backoff)
		}

		if lastErr != nil {
			// Send to dead-letter topic with error metadata
			_ = dlqWriter.WriteMessages(context.Background(), kafka.Message{
				Key:   msg.Key,
				Value: msg.Value,
				Headers: []kafka.Header{
					{Key: "error", Value: []byte(lastErr.Error())},
					{Key: "original-topic", Value: []byte(reader.Config().Topic)},
				},
			})
		}

		_ = reader.CommitMessages(context.Background(), msg)
	}
}

func processOrder(data []byte) error {
	// your business logic here
	return nil
}

Graceful Shutdown

Always handle shutdown signals to avoid losing in-flight messages:

func main() {
	reader := newReader("orders", "payment-service")

	ctx, cancel := context.WithCancel(context.Background())

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		<-sigs
		log.Println("Shutting down...")
		cancel()
	}()

	for {
		msg, err := reader.FetchMessage(ctx)
		if err != nil {
			if ctx.Err() != nil {
				log.Println("Context cancelled, exiting cleanly")
				break
			}
			log.Printf("fetch error: %v", err)
			continue
		}

		if err := processOrder(msg.Value); err == nil {
			_ = reader.CommitMessages(ctx, msg)
		}
	}

	_ = reader.Close()
}

Choosing the Right Message Key

Your partition key determines ordering and parallelism. Pick it carefully:

flowchart TD
    Q{What ordering do you need?}
    Q -->|Events for the same user must be ordered| U[Key: UserID]
    Q -->|Events for the same order must be ordered| O[Key: OrderID]
    Q -->|Events for the same session must be ordered| S[Key: SessionID]
    Q -->|No ordering needed, maximize throughput| R[Key: Random / empty]
    U --> Note[Same key → same partition\nOrdering guaranteed]
    O --> Note
    S --> Note
    R --> Spread[Distributed across all partitions]

| Key choice | Parallelism | Ordering guarantee |
| --- | --- | --- |
| UserID | Up to partition count | Per-user |
| OrderID | Up to partition count | Per-order |
| Random / nil | Maximum | None |

Summary

Here’s what we built:

| Use Case | Topic(s) | Pattern |
| --- | --- | --- |
| E-commerce checkout | order.placed | Fan-out to independent services |
| Real-time analytics | user.events | Multiple consumer groups, same stream |
| Video processing | video.uploaded → .transcoded → .ready | Event chain / choreography |

The three ideas that make all of these work:

  1. Publish once, consume many times — different consumer groups get the full stream independently
  2. Manual commit for reliability — commit only after successful processing
  3. Key for ordering, not identity — route related messages to the same partition


Resources


Questions or corrections? Reach me at [email protected] or @colossus21.