## Introduction
Apache Kafka is the backbone of event-driven architectures at companies like LinkedIn, Uber, and Airbnb. It lets services communicate asynchronously at massive scale — without coupling them together. Go is an excellent fit for Kafka workloads: lightweight goroutines, clean channel semantics, and minimal overhead make it easy to build high-throughput producers and consumers.
This guide skips the theory-heavy basics and focuses on what you actually build. We’ll cover:
- Kafka’s core model in plain terms
- Setting up producers and consumers with `kafka-go`
- Three real-world use cases with full code and diagrams
- Patterns for reliability: retries, dead-letter queues, and consumer groups
## How Kafka Works (The Short Version)
Kafka organizes data into topics. Each topic is split into partitions — ordered, immutable logs. Producers append messages to partitions. Consumers read messages from partitions at their own pace, tracking their position with an offset.
Key properties to internalize:
| Concept | What it means for you |
|---|---|
| Offset | Each consumer tracks its own read position. Kafka doesn’t delete messages on read. |
| Consumer Group | Multiple consumers sharing a topic. Each partition is assigned to one consumer in the group. |
| Retention | Messages persist for a configurable duration (default 7 days), regardless of consumption. |
| Ordering | Guaranteed within a partition, not across partitions. Use a consistent message key to route related events to the same partition. |
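The ordering row is worth seeing concretely. Key-based routing means: hash the key, take it modulo the partition count, so equal keys always land on the same partition. The sketch below illustrates that property with a stdlib FNV hash — real clients use their own hash functions (e.g. murmur2 in the Java client), so treat this as the concept, not the exact algorithm:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor sketches key-based routing: hash the key, mod the partition
// count. Equal keys always map to the same partition, so per-key order holds.
func partitionFor(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32()) % numPartitions
}

func main() {
	for _, key := range []string{"customer-42", "customer-42", "customer-7"} {
		fmt.Printf("key=%s -> partition %d\n", key, partitionFor(key, 3))
	}
}
```

The two `customer-42` events land on the same partition every run, which is exactly the guarantee the table describes.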
## Project Setup

We'll use `kafka-go`, a pure Go library with a clean API and no CGo dependency.

```shell
go mod init kafka-guide
go get github.com/segmentio/kafka-go
```
For local development, spin up Kafka with Docker Compose:

```yaml
# docker-compose.yml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
```

```shell
docker-compose up -d
```
## A Minimal Producer and Consumer
Before diving into use cases, here’s the skeleton you’ll reuse throughout:
```go
// producer.go
package main

import (
	"context"

	kafka "github.com/segmentio/kafka-go"
)

func newWriter(topic string) *kafka.Writer {
	return &kafka.Writer{
		Addr:         kafka.TCP("localhost:9092"),
		Topic:        topic,
		Balancer:     &kafka.Hash{},    // route same key to same partition
		RequiredAcks: kafka.RequireAll, // wait for leader + replicas
		Async:        false,
	}
}

func publish(w *kafka.Writer, key, value string) error {
	return w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte(key),
			Value: []byte(value),
		},
	)
}
```
```go
// consumer.go
package main

import (
	"context"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

func newReader(topic, groupID string) *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"localhost:9092"},
		Topic:    topic,
		GroupID:  groupID, // enables consumer group offset management
		MinBytes: 1,
		MaxBytes: 10e6, // 10MB
	})
}

func consume(r *kafka.Reader) {
	for {
		msg, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Printf("read error: %v", err)
			continue
		}
		log.Printf("[%s] key=%s value=%s", r.Config().Topic, msg.Key, msg.Value)
	}
}
```
> **Note on `RequiredAcks`:** `RequireAll` waits for the leader and all in-sync replicas to acknowledge the write. Slower, but safe. Use `RequireOne` if you need throughput over durability.
## Use Case 1: E-Commerce Order Processing

### The Problem
A user places an order. Immediately, several things need to happen in parallel:
- Inventory must be reserved
- A payment must be initiated
- A confirmation email must be sent
- Analytics must be updated
Doing these synchronously in a single HTTP handler means your checkout endpoint is only as fast as your slowest downstream service. One failure cascades into a failed order.
### The Kafka Solution
The checkout service publishes a single order.placed event. Each downstream service consumes it independently, at its own pace, in its own consumer group.
The checkout handler returns 202 Accepted immediately. Downstream services process the event on their own schedule.
### The Code
Shared types:
```go
// types.go
package main

import "encoding/json"

type OrderItem struct {
	ProductID string  `json:"product_id"`
	Quantity  int     `json:"quantity"`
	Price     float64 `json:"price"`
}

type OrderPlacedEvent struct {
	OrderID    string      `json:"order_id"`
	CustomerID string      `json:"customer_id"`
	Items      []OrderItem `json:"items"`
	Total      float64     `json:"total"`
	CreatedAt  string      `json:"created_at"`
}

func (e OrderPlacedEvent) Encode() ([]byte, error) {
	return json.Marshal(e)
}

func DecodeOrder(data []byte) (OrderPlacedEvent, error) {
	var e OrderPlacedEvent
	return e, json.Unmarshal(data, &e)
}
```
Checkout producer:
```go
// checkout/main.go
package main

import (
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/google/uuid"
)

const orderTopic = "order.placed"

func main() {
	writer := newWriter(orderTopic)
	defer writer.Close()

	http.HandleFunc("/checkout", func(w http.ResponseWriter, r *http.Request) {
		order := OrderPlacedEvent{
			OrderID:    uuid.New().String(),
			CustomerID: r.Header.Get("X-Customer-ID"),
			Items: []OrderItem{
				{ProductID: "SKU-001", Quantity: 2, Price: 29.99},
			},
			Total:     59.98,
			CreatedAt: time.Now().UTC().Format(time.RFC3339),
		}

		payload, err := order.Encode()
		if err != nil {
			http.Error(w, "encode error", http.StatusInternalServerError)
			return
		}

		// Key by customerID → same customer's orders go to same partition
		if err := publish(writer, order.CustomerID, string(payload)); err != nil {
			http.Error(w, "publish error", http.StatusInternalServerError)
			return
		}

		w.WriteHeader(http.StatusAccepted)
		fmt.Fprintf(w, `{"order_id":%q}`, order.OrderID)
	})

	log.Fatal(http.ListenAndServe(":8080", nil))
}
```
Inventory consumer (same pattern for payment, email, analytics):
```go
// inventory/main.go
package main

import (
	"context"
	"log"
)

func main() {
	reader := newReader("order.placed", "inventory-service")
	defer reader.Close()

	log.Println("Inventory service listening for orders...")
	for {
		msg, err := reader.FetchMessage(context.Background())
		if err != nil {
			log.Printf("fetch error: %v", err)
			continue
		}

		order, err := DecodeOrder(msg.Value)
		if err != nil {
			log.Printf("decode error: %v — skipping", err)
			// commit anyway to avoid infinite retry on a bad message
			_ = reader.CommitMessages(context.Background(), msg)
			continue
		}

		if err := reserveInventory(order); err != nil {
			log.Printf("reserve error for order %s: %v", order.OrderID, err)
			// Don't commit — message will be re-delivered
			continue
		}

		// Only commit after successful processing
		if err := reader.CommitMessages(context.Background(), msg); err != nil {
			log.Printf("commit error: %v", err)
		}
	}
}

func reserveInventory(order OrderPlacedEvent) error {
	for _, item := range order.Items {
		log.Printf("Reserving %d x %s for order %s", item.Quantity, item.ProductID, order.OrderID)
		// ... call your inventory DB here
	}
	return nil
}
```
> **Why `FetchMessage` instead of `ReadMessage`?** When a `GroupID` is set, `ReadMessage` auto-commits after each read. `FetchMessage` + `CommitMessages` gives you manual control: you only commit once processing succeeds, which prevents message loss on crashes.
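The difference is easiest to see in a simulation. Below, a toy consumer tracks a committed offset by hand, commits only after processing, and "crashes" midway; on restart it resumes from the last committed offset, so the crashed message is re-delivered instead of lost. Pure stdlib, no Kafka involved — just the offset bookkeeping:

```go
package main

import "fmt"

// consume processes entries starting at the committed offset, committing only
// after each successful step; it stops early at crashAt to simulate a crash.
func consume(log []string, committed int, crashAt string) (processed []string, newCommitted int) {
	newCommitted = committed
	for off := committed; off < len(log); off++ {
		if log[off] == crashAt {
			return processed, newCommitted // crash: offset was never advanced
		}
		processed = append(processed, log[off])
		newCommitted = off + 1 // commit only after successful processing
	}
	return processed, newCommitted
}

func main() {
	log := []string{"msg-0", "msg-1", "msg-2"}

	done, committed := consume(log, 0, "msg-1") // first run crashes at msg-1
	fmt.Println("before crash:", done, "committed offset:", committed)

	done, committed = consume(log, committed, "") // restart: msg-1 is re-delivered
	fmt.Println("after restart:", done, "committed offset:", committed)
}
```

Had the consumer committed on read instead, the offset would already be past `msg-1` when it crashed, and the restart would skip it silently.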
## Use Case 2: Real-Time Analytics Pipeline

### The Problem
You want to track user behavior — page views, clicks, searches — and make it available for dashboards within seconds, not hours. Traditional batch ETL jobs run hourly at best.
### The Kafka Solution
Events flow from browser → API → Kafka → stream processor → data store. Multiple processors can tap the same stream for different aggregations.
### The Code
Event ingestion API:
```go
// events-api/main.go
package main

import (
	"encoding/json"
	"io"
	"log"
	"net/http"
	"time"
)

type UserEvent struct {
	Type      string            `json:"type"` // "page_view", "click", "search"
	UserID    string            `json:"user_id"`
	SessionID string            `json:"session_id"`
	Payload   map[string]string `json:"payload"`
	Timestamp string            `json:"timestamp"`
}

func main() {
	writer := newWriter("user.events")
	defer writer.Close()

	http.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
		body, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "bad request", http.StatusBadRequest)
			return
		}

		var event UserEvent
		if err := json.Unmarshal(body, &event); err != nil {
			http.Error(w, "invalid JSON", http.StatusBadRequest)
			return
		}
		event.Timestamp = time.Now().UTC().Format(time.RFC3339Nano)

		payload, _ := json.Marshal(event)

		// Key by session ID → all events in a session go to same partition
		if err := publish(writer, event.SessionID, string(payload)); err != nil {
			log.Printf("publish error: %v", err)
			http.Error(w, "internal error", http.StatusInternalServerError)
			return
		}

		w.WriteHeader(http.StatusNoContent)
	})

	log.Fatal(http.ListenAndServe(":8081", nil))
}
```
Page view counter (fan-out consumer):
```go
// page-view-counter/main.go
package main

import (
	"context"
	"encoding/json"
	"log"
	"sync"
	"time"
)

// counter guards its map with a mutex: increments happen on the consumer
// goroutine while snapshots are taken from the flush goroutine.
type counter struct {
	mu     sync.Mutex
	counts map[string]int64
}

func newCounter() *counter {
	return &counter{counts: make(map[string]int64)}
}

func (c *counter) increment(page string) {
	c.mu.Lock()
	c.counts[page]++
	c.mu.Unlock()
}

func (c *counter) snapshot() map[string]int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	snap := make(map[string]int64, len(c.counts))
	for k, v := range c.counts {
		snap[k] = v
	}
	return snap
}

func main() {
	reader := newReader("user.events", "page-views")
	defer reader.Close()

	c := newCounter()

	// Flush counts to Redis every 5 seconds
	go func() {
		tick := time.NewTicker(5 * time.Second)
		for range tick.C {
			for page, count := range c.snapshot() {
				log.Printf("[dashboard] %s → %d views", page, count)
				// rdb.IncrBy(ctx, "views:"+page, count)
			}
		}
	}()

	for {
		msg, err := reader.ReadMessage(context.Background())
		if err != nil {
			log.Printf("read error: %v", err)
			continue
		}

		var event struct {
			Type    string            `json:"type"`
			Payload map[string]string `json:"payload"`
		}
		if err := json.Unmarshal(msg.Value, &event); err != nil {
			continue
		}
		if event.Type == "page_view" {
			c.increment(event.Payload["path"])
		}
	}
}
```
How multiple consumer groups fan out:
Each group gets a full copy of every message. Within a group, partitions are divided among consumers — three partitions means up to three consumers can process in parallel.
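The delivery rule — every group sees every message, and within a group each message goes to exactly one member — can be sketched with plain channels. This models the semantics only, not Kafka itself; round-robin stands in for partition assignment:

```go
package main

import "fmt"

// deliver fans each message out to every group; within a group it picks one
// member round-robin, a rough stand-in for partition assignment.
func deliver(messages []string, groups map[string][]chan string) {
	next := make(map[string]int)
	for _, msg := range messages {
		for name, members := range groups {
			members[next[name]%len(members)] <- msg // exactly one member per group
			next[name]++
		}
	}
	for _, members := range groups {
		for _, ch := range members {
			close(ch)
		}
	}
}

func main() {
	msgs := []string{"m0", "m1", "m2", "m3"}
	groups := map[string][]chan string{
		"inventory": {make(chan string, 4), make(chan string, 4)}, // 2 members split the stream
		"analytics": {make(chan string, 4)},                       // 1 member gets it all
	}
	deliver(msgs, groups)

	for name, members := range groups {
		total := 0
		for i, ch := range members {
			n := 0
			for range ch {
				n++
			}
			fmt.Printf("group=%s member=%d received %d messages\n", name, i, n)
			total += n
		}
		fmt.Printf("group=%s total=%d (a full copy of the stream)\n", name, total)
	}
}
```

Both groups end up with all four messages; within `inventory`, the two members split them two apiece.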
## Use Case 3: Microservices Event Coordination

### The Problem
You have a distributed workflow: a user uploads a video → it must be transcoded, thumbnail-generated, and indexed for search. Each step depends on the previous one completing. How do you coordinate this without tight coupling or a central orchestrator?
### The Kafka Solution
Each service publishes a completion event that the next service listens to. The workflow advances through the event log. No service calls another directly.
### The Code
Shared event types:
```go
// events/video.go
package events

type VideoUploaded struct {
	VideoID   string `json:"video_id"`
	S3Key     string `json:"s3_key"`
	UserID    string `json:"user_id"`
	SizeBytes int64  `json:"size_bytes"`
}

type VideoTranscoded struct {
	VideoID  string   `json:"video_id"`
	Formats  []string `json:"formats"` // ["720p", "1080p", "4k"]
	Duration int      `json:"duration_seconds"`
}

type VideoReady struct {
	VideoID      string `json:"video_id"`
	UserID       string `json:"user_id"`
	ThumbnailURL string `json:"thumbnail_url"`
}
```
Transcode service consumes `video.uploaded` and publishes `video.transcoded`:
```go
// transcode/main.go
package main

import (
	"context"
	"encoding/json"
	"log"
	"time"

	"kafka-guide/events"
)

func main() {
	reader := newReader("video.uploaded", "transcode-service")
	writer := newWriter("video.transcoded")
	defer reader.Close()
	defer writer.Close()

	for {
		msg, err := reader.FetchMessage(context.Background())
		if err != nil {
			log.Printf("fetch error: %v", err)
			continue
		}

		var uploaded events.VideoUploaded
		if err := json.Unmarshal(msg.Value, &uploaded); err != nil {
			log.Printf("decode error: %v", err)
			_ = reader.CommitMessages(context.Background(), msg)
			continue
		}

		formats, err := transcode(uploaded)
		if err != nil {
			log.Printf("transcode failed for %s: %v", uploaded.VideoID, err)
			// Retry by not committing. In production, add a retry limit
			// and route to a dead-letter topic after N attempts.
			continue
		}

		readyEvent := events.VideoTranscoded{
			VideoID:  uploaded.VideoID,
			Formats:  formats,
			Duration: 142,
		}
		payload, _ := json.Marshal(readyEvent)
		if err := publish(writer, uploaded.VideoID, string(payload)); err != nil {
			log.Printf("publish error: %v", err)
			continue
		}

		_ = reader.CommitMessages(context.Background(), msg)
		log.Printf("Transcoded video %s → formats: %v", uploaded.VideoID, formats)
	}
}

func transcode(v events.VideoUploaded) ([]string, error) {
	log.Printf("Transcoding %s (%d bytes)...", v.VideoID, v.SizeBytes)
	time.Sleep(2 * time.Second) // simulate work
	return []string{"720p", "1080p"}, nil
}
```
The chain, in one line: `video.uploaded` → transcode service → `video.transcoded` → thumbnail + search indexing → `video.ready`.
This is the choreography pattern: each service reacts to events without a central coordinator. Adding a new step (e.g., a virus scanner) means adding a consumer to `video.uploaded`; nothing else changes.
## Reliability Patterns

### Dead-Letter Queue
When a message fails processing repeatedly, don’t block the partition forever. Route it to a dead-letter topic (DLQ) for investigation.
```go
// retry with dead-letter queue
package main

import (
	"context"
	"log"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func processWithRetry(
	reader *kafka.Reader,
	dlqWriter *kafka.Writer,
	maxAttempts int,
) {
	for {
		msg, err := reader.FetchMessage(context.Background())
		if err != nil {
			log.Printf("fetch error: %v", err)
			continue
		}

		var lastErr error
		for attempt := 1; attempt <= maxAttempts; attempt++ {
			if lastErr = processOrder(msg.Value); lastErr == nil {
				break
			}
			backoff := time.Duration(attempt*attempt) * time.Second
			log.Printf("attempt %d failed: %v — retrying in %s", attempt, lastErr, backoff)
			time.Sleep(backoff)
		}

		if lastErr != nil {
			// Send to dead-letter topic with error metadata
			_ = dlqWriter.WriteMessages(context.Background(), kafka.Message{
				Key:   msg.Key,
				Value: msg.Value,
				Headers: []kafka.Header{
					{Key: "error", Value: []byte(lastErr.Error())},
					{Key: "original-topic", Value: []byte(reader.Config().Topic)},
				},
			})
		}
		_ = reader.CommitMessages(context.Background(), msg)
	}
}

func processOrder(data []byte) error {
	// your business logic here
	return nil
}
```
### Graceful Shutdown
Always handle shutdown signals to avoid losing in-flight messages:
```go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	reader := newReader("orders", "payment-service")

	ctx, cancel := context.WithCancel(context.Background())
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		log.Println("Shutting down...")
		cancel()
	}()

	for {
		msg, err := reader.FetchMessage(ctx)
		if err != nil {
			if ctx.Err() != nil {
				log.Println("Context cancelled, exiting cleanly")
				break
			}
			log.Printf("fetch error: %v", err)
			continue
		}
		if err := processOrder(msg.Value); err == nil {
			_ = reader.CommitMessages(ctx, msg)
		}
	}

	_ = reader.Close()
}
```
## Choosing the Right Message Key
Your partition key determines ordering and parallelism. Pick it carefully:
| Key choice | Parallelism | Ordering guarantee |
|---|---|---|
| UserID | Up to partition count | Per-user |
| OrderID | Up to partition count | Per-order |
| Random / nil | Maximum | None |
## Summary
Here’s what we built:
| Use Case | Topic(s) | Pattern |
|---|---|---|
| E-commerce checkout | `order.placed` | Fan-out to independent services |
| Real-time analytics | `user.events` | Multiple consumer groups, same stream |
| Video processing | `video.uploaded` → `.transcoded` → `.ready` | Event chain / choreography |
The three ideas that make all of these work:
- Publish once, consume many times — different consumer groups get the full stream independently
- Manual commit for reliability — commit only after successful processing
- Key for ordering, not identity — route related messages to the same partition
## What to Read Next
- Go Worker Pool Pattern — combine with Kafka consumers for bounded processing concurrency
- Circuit Breaker Pattern — protect downstream services called inside consumer handlers
- Context Pattern — propagate deadlines and cancellation through your Kafka processing pipeline
Questions or corrections? Reach me at [email protected] or @colossus21.