Backend Communication
Current: NATS & JetStream

What is NATS?

NATS is a high-performance, cloud-native messaging system designed for microservices, IoT, and edge computing. It provides a simple yet powerful pub/sub model with subject-based addressing, making it ideal for building distributed systems that require fast, reliable communication.

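JetStream is the persistence layer built into the NATS server (available since NATS Server 2.2). It adds streaming, message replay, and at-least-once delivery on top of NATS Core’s at-most-once delivery, with exactly-once semantics available through message deduplication and double acknowledgements.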

Key characteristics:

  • Subject-based addressing - Hierarchical topic naming (e.g., orders.us.east.123)
  • High performance - Millions of messages per second
  • Multiple delivery modes - At-most-once (Core) and at-least-once (JetStream)
  • Lightweight - Single binary, minimal resource usage
  • Built-in clustering - High availability and scalability

Architecture Overview

graph TB
    subgraph "NATS Core (At-Most-Once)"
        P1[Publisher] -->|orders.created| NC[NATS Core Server]
        NC -->|orders.created| S1[Subscriber 1]
        NC -->|orders.created| S2[Subscriber 2]
        NC -->|orders.>| S3[Wildcard Sub]
    end
    subgraph "JetStream (Persistence)"
        P2[Publisher] -->|events.user.*| JS[JetStream]
        JS -->|Persist| DISK[Disk Storage]
        JS -->|Replay| C1[Consumer 1]
        JS -->|Replay| C2[Consumer 2]
    end
    subgraph "Queue Groups (Load Balancing)"
        P3[Publisher] -->|tasks.process| QG[Queue Group]
        QG -->|Round Robin| W1[Worker 1]
        QG -->|Round Robin| W2[Worker 2]
        QG -->|Round Robin| W3[Worker 3]
    end
    style NC fill:#e1f5ff
    style JS fill:#ffe1e1
    style QG fill:#fff4e1

Subject Hierarchies and Wildcards

NATS uses a dot-separated subject hierarchy:

orders.created         # Exact subject
orders.*.east          # Single token wildcard (*)
orders.>               # Multi-token wildcard (>)

Examples:

  • sensors.temperature.us.west.1 - Specific sensor
  • sensors.*.us.> - All US sensors of any type
  • sensors.> - All sensors
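
The matching rules above can be sketched in a few lines of Go. This is an illustrative model of the wildcard semantics only, not the server's actual implementation; the function name matchSubject is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject reports whether a concrete subject matches a subscription
// pattern. "*" matches exactly one token; ">" matches one or more trailing
// tokens and is only honored as the last token of the pattern.
func matchSubject(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be last and needs at least one token left to consume.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	subject := "sensors.temperature.us.west.1"
	patterns := []string{"sensors.>", "sensors.*.us.>", "sensors.*", "sensors.temperature.eu.>"}
	for _, pattern := range patterns {
		fmt.Printf("%-26s matches %s: %v\n", pattern, subject, matchSubject(pattern, subject))
	}
}
```

Note that "sensors.*" does not match the five-token subject, because "*" stands for exactly one token, and "sensors.>" does not match the bare subject "sensors", because ">" needs at least one token.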

Real-World Use Cases

NATS Core:

  • Service mesh communication - Fast RPC between microservices
  • Real-time notifications - Fire-and-forget events
  • Metrics collection - High-frequency telemetry data
  • Command distribution - Request-response patterns

JetStream:

  • Event sourcing - Persistent event log
  • Message queuing - Reliable background jobs
  • Audit logging - Compliance and traceability
  • Data replication - Multi-region event streaming

Implementation in Go

Project Structure

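nats-demo/
├── core/
│   ├── pub_sub.go
│   ├── request_reply.go
│   └── queue_groups.go
├── jetstream/
│   ├── producer.go
│   ├── consumer.go
│   ├── push_consumer.go
│   ├── kv_store.go
│   └── streams.go
└── go.mod

Each file is a standalone program with its own main function, so run them one at a time (for example, go run core/pub_sub.go) rather than building a directory as a single package.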

Dependencies

go.mod

module nats-demo

go 1.21

require (
    github.com/nats-io/nats.go v1.31.0
)

1. NATS Core - Publish/Subscribe

core/pub_sub.go

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

func main() {
    // Connect to NATS server
    nc, err := nats.Connect(nats.DefaultURL,
        nats.Name("PubSub Demo"),
        nats.MaxReconnects(-1),
        nats.ReconnectWait(2*time.Second),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    // Multi-token wildcard subscriber: receives every subject under "orders."
    sub1, err := nc.Subscribe("orders.>", func(msg *nats.Msg) {
        log.Printf("[Sub1] Received on %s: %s", msg.Subject, string(msg.Data))
    })
    if err != nil {
        log.Fatal(err)
    }
    defer sub1.Unsubscribe()

    // Single-token wildcard subscriber: receives only "orders.<region>.created"
    sub2, err := nc.Subscribe("orders.*.created", func(msg *nats.Msg) {
        log.Printf("[Sub2] New order on %s: %s", msg.Subject, string(msg.Data))
    })
    if err != nil {
        log.Fatal(err)
    }
    defer sub2.Unsubscribe()

    // Ensure subscriptions are active
    if err := nc.Flush(); err != nil {
        log.Fatal(err)
    }

    // Publish messages
    subjects := []string{
        "orders.us.created",
        "orders.eu.created",
        "orders.us.updated",
        "orders.us.deleted",
    }

    for i, subject := range subjects {
        data := fmt.Sprintf("Order #%d", i+1)
        if err := nc.Publish(subject, []byte(data)); err != nil {
            log.Printf("Error publishing: %v", err)
        }
        log.Printf("[Publisher] Sent to %s: %s", subject, data)
        time.Sleep(500 * time.Millisecond)
    }

    // Wait for messages to be processed
    time.Sleep(2 * time.Second)

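    // Print stats (Delivered and Pending are methods that also return an error)
    d1, _ := sub1.Delivered()
    p1, _, _ := sub1.Pending()
    d2, _ := sub2.Delivered()
    p2, _, _ := sub2.Pending()
    log.Printf("\nSubscription Stats:")
    log.Printf("Sub1: %d messages, %d pending", d1, p1)
    log.Printf("Sub2: %d messages, %d pending", d2, p2)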
}

2. Request-Reply Pattern

core/request_reply.go

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

type Request struct {
    OrderID string `json:"order_id"`
    Action  string `json:"action"`
}

type Response struct {
    Status  string `json:"status"`
    Message string `json:"message"`
}

func main() {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

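    // Start responder; Subscribe returns once the subscription is registered
    // locally, and Flush confirms the server has processed it before the
    // first request is sent
    startResponder(nc)
    if err := nc.Flush(); err != nil {
        log.Fatal(err)
    }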

    // Make requests
    for i := 1; i <= 5; i++ {
        req := Request{
            OrderID: fmt.Sprintf("ORD-%03d", i),
            Action:  "process",
        }

        reqData, _ := json.Marshal(req)

        // Request with timeout
        msg, err := nc.Request("orders.process", reqData, 2*time.Second)
        if err != nil {
            log.Printf("Request failed: %v", err)
            continue
        }

        var resp Response
        if err := json.Unmarshal(msg.Data, &resp); err != nil {
            log.Printf("Invalid response: %v", err)
            continue
        }

        log.Printf("Request %s: %s - %s", req.OrderID, resp.Status, resp.Message)
        time.Sleep(500 * time.Millisecond)
    }

    time.Sleep(1 * time.Second)
}

func startResponder(nc *nats.Conn) {
    nc.Subscribe("orders.process", func(msg *nats.Msg) {
        var req Request
        if err := json.Unmarshal(msg.Data, &req); err != nil {
            log.Printf("Invalid request: %v", err)
            return
        }

        log.Printf("[Responder] Processing %s", req.OrderID)

        // Simulate processing
        time.Sleep(200 * time.Millisecond)

        resp := Response{
            Status:  "success",
            Message: fmt.Sprintf("Order %s processed", req.OrderID),
        }

        respData, _ := json.Marshal(resp)
        msg.Respond(respData)
    })

    log.Println("[Responder] Ready to handle requests")
}

3. Queue Groups (Load Balancing)

core/queue_groups.go

package main

import (
    "fmt"
    "log"
    "math/rand"
    "sync"
    "time"

    "github.com/nats-io/nats.go"
)

func main() {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    // Start multiple workers in same queue group
    var wg sync.WaitGroup
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go startWorker(nc, i, &wg)
    }

    // Give workers time to subscribe
    time.Sleep(100 * time.Millisecond)

    // Publish tasks
    for i := 1; i <= 20; i++ {
        task := fmt.Sprintf("Task #%d", i)
        if err := nc.Publish("tasks.process", []byte(task)); err != nil {
            log.Printf("Error publishing: %v", err)
        }
        log.Printf("[Publisher] Sent: %s", task)
        time.Sleep(100 * time.Millisecond)
    }

    // Wait for all workers to finish and unsubscribe
    wg.Wait()
}

func startWorker(nc *nats.Conn, id int, wg *sync.WaitGroup) {
    defer wg.Done()

    // Queue subscription - only one subscriber in group receives each message
    sub, err := nc.QueueSubscribe("tasks.process", "workers", func(msg *nats.Msg) {
        log.Printf("[Worker %d] Received: %s", id, string(msg.Data))

        // Simulate work
        processingTime := time.Duration(rand.Intn(500)+500) * time.Millisecond
        time.Sleep(processingTime)

        log.Printf("[Worker %d] Completed: %s (took %v)", id, string(msg.Data), processingTime)
    })

    if err != nil {
        log.Printf("Worker %d subscribe error: %v", id, err)
        return
    }
    defer sub.Unsubscribe()

    log.Printf("[Worker %d] Started", id)

    // Keep running
    time.Sleep(10 * time.Second)
}

4. JetStream - Creating Streams

jetstream/streams.go

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

func main() {
    // Connect to NATS with JetStream enabled
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    // Create JetStream context
    js, err := nc.JetStream()
    if err != nil {
        log.Fatal(err)
    }

    // Create stream
    streamName := "ORDERS"
    stream, err := js.AddStream(&nats.StreamConfig{
        Name:      streamName,
        Subjects:  []string{"orders.*"},
        Storage:   nats.FileStorage,
        MaxAge:    24 * time.Hour,
        MaxBytes:  1024 * 1024 * 1024, // 1GB
        Retention: nats.LimitsPolicy,
    })
    if err != nil {
        log.Printf("Stream may already exist: %v", err)

        // Get existing stream info
        stream, err = js.StreamInfo(streamName)
        if err != nil {
            log.Fatal(err)
        }
    }

    log.Printf("Stream %s created/exists", stream.Config.Name)
    log.Printf("  Subjects: %v", stream.Config.Subjects)
    log.Printf("  Storage: %v", stream.Config.Storage)
    log.Printf("  Current messages: %d", stream.State.Msgs)
    log.Printf("  Current bytes: %d", stream.State.Bytes)

    // Publish messages to stream
    for i := 1; i <= 10; i++ {
        subject := "orders.created"
        if i%2 == 0 {
            subject = "orders.updated"
        }

        msg := fmt.Sprintf("Order event #%d", i)
        ack, err := js.Publish(subject, []byte(msg))
        if err != nil {
            log.Printf("Error publishing: %v", err)
            continue
        }

        log.Printf("Published to %s (seq: %d, stream: %s)", subject, ack.Sequence, ack.Stream)
        time.Sleep(200 * time.Millisecond)
    }

    // Stream info after publishing
    stream, err = js.StreamInfo(streamName)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("\nStream stats after publishing:")
    log.Printf("  Messages: %d", stream.State.Msgs)
    log.Printf("  Bytes: %d", stream.State.Bytes)
    log.Printf("  First Seq: %d", stream.State.FirstSeq)
    log.Printf("  Last Seq: %d", stream.State.LastSeq)
}

5. JetStream - Pull Consumer

jetstream/consumer.go

package main

import (
    "context"
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

func main() {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    js, err := nc.JetStream()
    if err != nil {
        log.Fatal(err)
    }

    streamName := "ORDERS"
    consumerName := "order-processor"

    // Create durable pull consumer
    consumer, err := js.AddConsumer(streamName, &nats.ConsumerConfig{
        Durable:       consumerName,
        AckPolicy:     nats.AckExplicitPolicy,
        AckWait:       30 * time.Second,
        MaxDeliver:    3,
        FilterSubject: "orders.>",
        DeliverPolicy: nats.DeliverAllPolicy,
    })
    if err != nil {
        log.Printf("Consumer may already exist: %v", err)
    } else {
        log.Printf("Consumer %s created", consumer.Name)
    }

    // Pull-based consumer
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    sub, err := js.PullSubscribe("orders.>", consumerName,
        nats.Bind(streamName, consumerName),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer sub.Unsubscribe()

    log.Printf("Pull consumer started, processing messages...")

    processed := 0
    for {
        select {
        case <-ctx.Done():
            log.Printf("Context done, processed %d messages", processed)
            return
        default:
            // Fetch batch of messages
            msgs, err := sub.Fetch(10, nats.MaxWait(2*time.Second))
            if err != nil {
                if err == nats.ErrTimeout {
                    log.Println("No new messages, waiting...")
                    continue
                }
                log.Printf("Fetch error: %v", err)
                continue
            }

            for _, msg := range msgs {
                log.Printf("Processing: %s (subject: %s, seq: %d)",
                    string(msg.Data), msg.Subject, mustGetSequence(msg))

                // Simulate processing
                time.Sleep(100 * time.Millisecond)

                // Acknowledge message
                if err := msg.Ack(); err != nil {
                    log.Printf("Ack error: %v", err)
                } else {
                    processed++
                }
            }
        }
    }
}

func mustGetSequence(msg *nats.Msg) uint64 {
    meta, _ := msg.Metadata()
    if meta != nil {
        return meta.Sequence.Stream
    }
    return 0
}

6. JetStream - Push Consumer

jetstream/push_consumer.go

package main

import (
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

func main() {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    js, err := nc.JetStream()
    if err != nil {
        log.Fatal(err)
    }

    // Subscribe with push consumer (auto-created)
    sub, err := js.Subscribe("orders.>", func(msg *nats.Msg) {
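        // Metadata is only available on JetStream messages; skip anything else
        meta, err := msg.Metadata()
        if err != nil {
            log.Printf("Metadata error: %v", err)
            return
        }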

        log.Printf("Received: %s", string(msg.Data))
        log.Printf("  Subject: %s", msg.Subject)
        log.Printf("  Stream Seq: %d", meta.Sequence.Stream)
        log.Printf("  Consumer Seq: %d", meta.Sequence.Consumer)
        log.Printf("  Delivered: %d", meta.NumDelivered)

        // Simulate processing
        time.Sleep(100 * time.Millisecond)

        // Acknowledge
        if err := msg.Ack(); err != nil {
            log.Printf("Ack error: %v", err)
        }
    },
        nats.Durable("push-consumer"),
        nats.ManualAck(), // Disable the client's automatic ack; the handler acks explicitly
        nats.AckExplicit(),
        nats.MaxDeliver(3),
        nats.AckWait(30*time.Second),
    )

    if err != nil {
        log.Fatal(err)
    }
    defer sub.Unsubscribe()

    log.Println("Push consumer started, press Ctrl+C to exit")

    // Keep running
    select {}
}

7. Key-Value Store (Built on JetStream)

jetstream/kv_store.go

package main

import (
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

func main() {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    js, err := nc.JetStream()
    if err != nil {
        log.Fatal(err)
    }

    // Create key-value bucket
    kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
        Bucket:      "configs",
        Description: "Application configuration",
        MaxBytes:    1024 * 1024, // 1MB
        TTL:         1 * time.Hour,
        Storage:     nats.FileStorage,
        Replicas:    1,
    })
    if err != nil {
        // Bucket may exist
        kv, err = js.KeyValue("configs")
        if err != nil {
            log.Fatal(err)
        }
    }

    log.Printf("KV bucket 'configs' ready")

    // Put values
    keys := map[string]string{
        "app.name":    "OrderService",
        "app.version": "1.0.0",
        "app.env":     "production",
        "db.host":     "localhost",
        "db.port":     "5432",
    }

    for key, value := range keys {
        seq, err := kv.Put(key, []byte(value))
        if err != nil {
            log.Printf("Error putting %s: %v", key, err)
            continue
        }
        log.Printf("Put: %s = %s (revision: %d)", key, value, seq)
    }

    // Get values
    for key := range keys {
        entry, err := kv.Get(key)
        if err != nil {
            log.Printf("Error getting %s: %v", key, err)
            continue
        }
        log.Printf("Get: %s = %s (revision: %d)", key, string(entry.Value()), entry.Revision())
    }

    // Watch for changes
    watcher, err := kv.WatchAll()
    if err != nil {
        log.Fatal(err)
    }
    defer watcher.Stop()

    go func() {
        for entry := range watcher.Updates() {
            if entry == nil {
                // nil marks the end of the initial values; keep watching for updates
                continue
            }
            log.Printf("Watch: %s changed to %s (revision: %d)",
                entry.Key(), string(entry.Value()), entry.Revision())
        }
    }()

    // Update a value
    time.Sleep(1 * time.Second)
    kv.Put("app.version", []byte("1.1.0"))

    // Delete a key
    time.Sleep(1 * time.Second)
    if err := kv.Delete("db.port"); err != nil {
        log.Printf("Error deleting: %v", err)
    }

    time.Sleep(2 * time.Second)

    // List all keys (a new variable, since keys above is the input map)
    allKeys, err := kv.Keys()
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("\nAll keys: %v", allKeys)

    // Get bucket status
    status, err := kv.Status()
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("\nBucket status:")
    log.Printf("  Bucket: %s", status.Bucket())
    log.Printf("  Values: %d", status.Values())
    log.Printf("  Bytes: %d", status.Bytes())
}

Delivery Guarantees

| Mode | Guarantee | Use Case | Performance |
|------|-----------|----------|-------------|
| NATS Core | At-most-once | High-speed, fire-and-forget | Highest |
| JetStream (no ack) | At-most-once | Fast, with persistence | High |
| JetStream (ack) | At-least-once | Reliable messaging | Medium |
| JetStream (dedup) | Exactly-once | Critical transactions | Lower |

Best Practices

1. Connection Resilience

nc, err := nats.Connect(
    nats.DefaultURL,
    nats.Name("MyService"),
    nats.MaxReconnects(-1),           // Reconnect forever
    nats.ReconnectWait(2*time.Second),
    nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
        log.Printf("Disconnected: %v", err)
    }),
    nats.ReconnectHandler(func(nc *nats.Conn) {
        log.Printf("Reconnected to %s", nc.ConnectedUrl())
    }),
    nats.ClosedHandler(func(nc *nats.Conn) {
        log.Printf("Connection closed: %v", nc.LastError())
    }),
)

2. Structured Subjects

// Good: hierarchical and predictable
"orders.us.east.created"
"sensors.temp.building1.floor2"
"metrics.cpu.server01"

// Bad: flat and hard to filter
"order_created_us_east"
"temp_sensor_b1_f2"
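
One way to keep subjects consistent is to build them from tokens in a single helper instead of formatting strings ad hoc. The sketch below is a hypothetical helper (buildSubject is not part of nats.go) that rejects tokens which would silently change the hierarchy.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// buildSubject joins tokens into a dot-separated subject. It rejects tokens
// that would alter the structure: empty tokens, whitespace, ".", "*" or ">".
func buildSubject(tokens ...string) (string, error) {
	if len(tokens) == 0 {
		return "", errors.New("subject needs at least one token")
	}
	for _, tok := range tokens {
		if tok == "" || strings.ContainsAny(tok, ". \t\r\n*>") {
			return "", fmt.Errorf("invalid subject token %q", tok)
		}
	}
	return strings.Join(tokens, "."), nil
}

func main() {
	subject, err := buildSubject("orders", "us", "east", "created")
	fmt.Println(subject, err)

	// A token containing a dot would add an unintended hierarchy level.
	_, err = buildSubject("orders", "us.east", "created")
	fmt.Println(err)
}
```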

3. Consumer ACK Policy

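// Explicit ack: every message must be acknowledged individually
nats.AckExplicit()

// No ack: messages count as delivered as soon as they are sent (fire-and-forget)
nats.AckNone()

// Ack all: acknowledging a message also acknowledges every earlier message
nats.AckAll()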

4. Error Handling

sub, err := js.Subscribe("orders.>", func(msg *nats.Msg) {
    if err := processMessage(msg); err != nil {
        log.Printf("Processing error: %v", err)
        // Negative ack - redeliver
        msg.Nak()
        return
    }
    msg.Ack()
},
    nats.ManualAck(),      // The handler acks or naks explicitly
    nats.MaxDeliver(3),    // Deliver at most 3 times in total
    nats.AckWait(30*time.Second),
)

5. Monitoring

// Stream stats
stream, _ := js.StreamInfo("ORDERS")
log.Printf("Messages: %d, Bytes: %d, Consumers: %d",
    stream.State.Msgs,
    stream.State.Bytes,
    stream.State.Consumers)

// Consumer stats
info, _ := sub.ConsumerInfo()
log.Printf("Pending: %d, Delivered: %d, AckPending: %d",
    info.NumPending,
    info.Delivered.Consumer,
    info.NumAckPending)

Common Pitfalls

1. Not Handling Reconnections

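// Bad: assumes connection is always valid and ignores the returned error
nc.Publish("subject", data)

// Good: check the connection state and the publish error
if !nc.IsConnected() {
    return errors.New("not connected")
}
if err := nc.Publish("subject", data); err != nil {
    return fmt.Errorf("publish failed: %w", err)
}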

2. Forgetting to Ack Messages

// Bad: with manual acks (or a pull consumer) the message is never acked
// and is redelivered after AckWait
js.Subscribe("orders.>", func(msg *nats.Msg) {
    processMessage(msg)
    // Missing msg.Ack()!
}, nats.ManualAck())

// Good: always ack or nak
js.Subscribe("orders.>", func(msg *nats.Msg) {
    if err := processMessage(msg); err != nil {
        msg.Nak()
    } else {
        msg.Ack()
    }
}, nats.ManualAck())

3. Not Setting Stream Limits

// Bad: unlimited growth
js.AddStream(&nats.StreamConfig{
    Name: "LOGS",
})

// Good: set limits
js.AddStream(&nats.StreamConfig{
    Name:      "LOGS",
    MaxAge:    24 * time.Hour,
    MaxBytes:  10 * 1024 * 1024 * 1024, // 10GB
    Retention: nats.LimitsPolicy,
})

When to Use NATS/JetStream

✅ Use NATS When:

  • Microservices communication - Fast, lightweight messaging
  • High throughput required - Millions of messages/second
  • Fire-and-forget ok - At-most-once delivery sufficient
  • Cloud-native architecture - Kubernetes-friendly
  • Low resource usage - Minimal memory/CPU footprint

✅ Use JetStream When:

  • Persistence required - Message durability
  • At-least-once delivery - Guaranteed processing
  • Message replay - Event sourcing, auditing
  • Consumer groups - Load balancing with acknowledgments
  • Stream processing - Time-series data

❌ Avoid When:

  • Complex routing - Use RabbitMQ instead
  • Long-term archival - Use Kafka instead
  • Transactions - Use traditional message queues
  • Legacy protocols - AMQP required (the NATS server can speak MQTT 3.1.1, but not AMQP)

Advantages

  1. High Performance - Millions of messages per second
  2. Lightweight - Single binary, minimal resources
  3. Simple API - Easy to use and understand
  4. Built-in Clustering - High availability
  5. Cloud Native - Kubernetes integration
  6. Multiple Patterns - Pub/sub, request/reply, queue groups
  7. JetStream - Adds persistence without complexity

Disadvantages

  1. No Complex Routing - Unlike RabbitMQ exchanges
  2. Limited Query - Can’t query message history easily
  3. No Transactions - No distributed transactions
  4. Newer Technology - JetStream in particular is younger than Kafka/RabbitMQ, with a smaller ecosystem
  5. No Persistence in Core - Core NATS does not store messages; durability requires JetStream

Performance Characteristics

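NATS Core (indicative figures; actual results depend on hardware, payload size, and topology):
- Latency: < 1ms
- Throughput: 10M+ msg/sec
- Delivery: At-most-once

JetStream (indicative figures; storage type and replica count add overhead):
- Latency: < 10ms
- Throughput: 1M+ msg/sec
- Delivery: At-least-once / Exactly-once (with deduplication)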

Running NATS Server

Docker:

# NATS Core
docker run -p 4222:4222 nats:latest

# NATS with JetStream
docker run -p 4222:4222 nats:latest -js

# With monitoring
docker run -p 4222:4222 -p 8222:8222 nats:latest -js -m 8222

Docker Compose:

version: '3'
services:
  nats:
    image: nats:latest
    ports:
      - "4222:4222"  # Client
      - "8222:8222"  # Monitoring
    command: ["-js", "-m", "8222"]
