What is NATS?
NATS is a high-performance, cloud-native messaging system designed for microservices, IoT, and edge computing. It provides a simple yet powerful pub/sub model with subject-based addressing, making it ideal for building distributed systems that require fast, reliable communication.
JetStream is the NATS server's built-in persistence layer (generally available since NATS Server 2.2). It adds streaming, message replay, and exactly-once semantics on top of NATS Core's at-most-once delivery.
Key characteristics:
- Subject-based addressing - Hierarchical topic naming (e.g., orders.us.east.123)
- High performance - Millions of messages per second
- Multiple delivery modes - At-most-once (Core) and at-least-once (JetStream)
- Lightweight - Single binary, minimal resource usage
- Built-in clustering - High availability and scalability
Architecture Overview
Subject Hierarchies and Wildcards
NATS uses a dot-separated subject hierarchy:
orders.created # Exact subject
orders.*.east # Single token wildcard (*)
orders.> # Multi-token wildcard (>)
Examples:
- sensors.temperature.us.west.1 - A specific sensor
- sensors.*.us.> - All US sensors of any type
- sensors.> - All sensors
Real-World Use Cases
NATS Core:
- Service mesh communication - Fast RPC between microservices
- Real-time notifications - Fire-and-forget events
- Metrics collection - High-frequency telemetry data
- Command distribution - Request-response patterns
JetStream:
- Event sourcing - Persistent event log
- Message queuing - Reliable background jobs
- Audit logging - Compliance and traceability
- Data replication - Multi-region event streaming
Implementation in Go
Project Structure
nats-demo/
├── core/
│ ├── pub_sub.go
│ ├── request_reply.go
│ └── queue_groups.go
├── jetstream/
│ ├── producer.go
│ ├── consumer.go
│ └── streams.go
└── go.mod
Dependencies
go.mod
module nats-demo
go 1.21
require (
github.com/nats-io/nats.go v1.31.0
)
1. NATS Core - Publish/Subscribe
core/pub_sub.go
package main
import (
"fmt"
"log"
"time"
"github.com/nats-io/nats.go"
)
func main() {
// Connect to NATS server
nc, err := nats.Connect(nats.DefaultURL,
nats.Name("PubSub Demo"),
nats.MaxReconnects(-1),
nats.ReconnectWait(2*time.Second),
)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Simple subscriber
sub1, err := nc.Subscribe("orders.>", func(msg *nats.Msg) {
log.Printf("[Sub1] Received on %s: %s", msg.Subject, string(msg.Data))
})
if err != nil {
log.Fatal(err)
}
defer sub1.Unsubscribe()
// Wildcard subscriber
sub2, err := nc.Subscribe("orders.*.created", func(msg *nats.Msg) {
log.Printf("[Sub2] New order on %s: %s", msg.Subject, string(msg.Data))
})
if err != nil {
log.Fatal(err)
}
defer sub2.Unsubscribe()
// Ensure subscriptions are active
if err := nc.Flush(); err != nil {
log.Fatal(err)
}
// Publish messages
subjects := []string{
"orders.us.created",
"orders.eu.created",
"orders.us.updated",
"orders.us.deleted",
}
for i, subject := range subjects {
data := fmt.Sprintf("Order #%d", i+1)
if err := nc.Publish(subject, []byte(data)); err != nil {
log.Printf("Error publishing: %v", err)
}
log.Printf("[Publisher] Sent to %s: %s", subject, data)
time.Sleep(500 * time.Millisecond)
}
// Wait for messages to be processed
time.Sleep(2 * time.Second)
// Print stats
log.Printf("\nSubscription Stats:")
delivered1, _ := sub1.Delivered()
pending1, _, _ := sub1.Pending()
log.Printf("Sub1: %d messages delivered, %d pending", delivered1, pending1)
delivered2, _ := sub2.Delivered()
pending2, _, _ := sub2.Pending()
log.Printf("Sub2: %d messages delivered, %d pending", delivered2, pending2)
}
2. Request-Reply Pattern
core/request_reply.go
package main
import (
"encoding/json"
"fmt"
"log"
"time"
"github.com/nats-io/nats.go"
)
type Request struct {
OrderID string `json:"order_id"`
Action string `json:"action"`
}
type Response struct {
Status string `json:"status"`
Message string `json:"message"`
}
func main() {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Start responder
go startResponder(nc)
// Give responder time to subscribe
time.Sleep(100 * time.Millisecond)
// Make requests
for i := 1; i <= 5; i++ {
req := Request{
OrderID: fmt.Sprintf("ORD-%03d", i),
Action: "process",
}
reqData, _ := json.Marshal(req)
// Request with timeout
msg, err := nc.Request("orders.process", reqData, 2*time.Second)
if err != nil {
log.Printf("Request failed: %v", err)
continue
}
var resp Response
if err := json.Unmarshal(msg.Data, &resp); err != nil {
log.Printf("Invalid response: %v", err)
continue
}
log.Printf("Request %s: %s - %s", req.OrderID, resp.Status, resp.Message)
time.Sleep(500 * time.Millisecond)
}
time.Sleep(1 * time.Second)
}
func startResponder(nc *nats.Conn) {
nc.Subscribe("orders.process", func(msg *nats.Msg) {
var req Request
if err := json.Unmarshal(msg.Data, &req); err != nil {
log.Printf("Invalid request: %v", err)
return
}
log.Printf("[Responder] Processing %s", req.OrderID)
// Simulate processing
time.Sleep(200 * time.Millisecond)
resp := Response{
Status: "success",
Message: fmt.Sprintf("Order %s processed", req.OrderID),
}
respData, _ := json.Marshal(resp)
msg.Respond(respData)
})
log.Println("[Responder] Ready to handle requests")
}
3. Queue Groups (Load Balancing)
core/queue_groups.go
package main
import (
"fmt"
"log"
"math/rand"
"sync"
"time"
"github.com/nats-io/nats.go"
)
func main() {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Start multiple workers in same queue group
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
go startWorker(nc, i, &wg)
}
// Give workers time to subscribe
time.Sleep(100 * time.Millisecond)
// Publish tasks
for i := 1; i <= 20; i++ {
task := fmt.Sprintf("Task #%d", i)
if err := nc.Publish("tasks.process", []byte(task)); err != nil {
log.Printf("Error publishing: %v", err)
}
log.Printf("[Publisher] Sent: %s", task)
time.Sleep(100 * time.Millisecond)
}
// Wait for all workers to finish
wg.Wait()
}
func startWorker(nc *nats.Conn, id int, wg *sync.WaitGroup) {
defer wg.Done()
// Queue subscription - only one subscriber in group receives each message
sub, err := nc.QueueSubscribe("tasks.process", "workers", func(msg *nats.Msg) {
log.Printf("[Worker %d] Received: %s", id, string(msg.Data))
// Simulate work
processingTime := time.Duration(rand.Intn(500)+500) * time.Millisecond
time.Sleep(processingTime)
log.Printf("[Worker %d] Completed: %s (took %v)", id, string(msg.Data), processingTime)
})
if err != nil {
log.Printf("Worker %d subscribe error: %v", id, err)
return
}
defer sub.Unsubscribe()
log.Printf("[Worker %d] Started", id)
// Keep running
time.Sleep(10 * time.Second)
}
4. JetStream - Creating Streams
jetstream/streams.go
package main
import (
"fmt"
"log"
"time"
"github.com/nats-io/nats.go"
)
func main() {
// Connect to NATS with JetStream enabled
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Create JetStream context
js, err := nc.JetStream()
if err != nil {
log.Fatal(err)
}
// Create stream
streamName := "ORDERS"
stream, err := js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{"orders.*"},
Storage: nats.FileStorage,
MaxAge: 24 * time.Hour,
MaxBytes: 1024 * 1024 * 1024, // 1GB
Retention: nats.LimitsPolicy,
})
if err != nil {
log.Printf("Stream may already exist: %v", err)
// Get existing stream info
stream, err = js.StreamInfo(streamName)
if err != nil {
log.Fatal(err)
}
}
log.Printf("Stream %s created/exists", stream.Config.Name)
log.Printf(" Subjects: %v", stream.Config.Subjects)
log.Printf(" Storage: %v", stream.Config.Storage)
log.Printf(" Current messages: %d", stream.State.Msgs)
log.Printf(" Current bytes: %d", stream.State.Bytes)
// Publish messages to stream
for i := 1; i <= 10; i++ {
subject := "orders.created"
if i%2 == 0 {
subject = "orders.updated"
}
msg := fmt.Sprintf("Order event #%d", i)
ack, err := js.Publish(subject, []byte(msg))
if err != nil {
log.Printf("Error publishing: %v", err)
continue
}
log.Printf("Published to %s (seq: %d, stream: %s)", subject, ack.Sequence, ack.Stream)
time.Sleep(200 * time.Millisecond)
}
// Stream info after publishing
stream, _ = js.StreamInfo(streamName)
log.Printf("\nStream stats after publishing:")
log.Printf(" Messages: %d", stream.State.Msgs)
log.Printf(" Bytes: %d", stream.State.Bytes)
log.Printf(" First Seq: %d", stream.State.FirstSeq)
log.Printf(" Last Seq: %d", stream.State.LastSeq)
}
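Streams can also be purged or removed once they are no longer needed. A minimal sketch, assuming the js context and ORDERS stream from the listing above:

// Remove all messages but keep the stream and its consumers
if err := js.PurgeStream("ORDERS"); err != nil {
    log.Printf("Purge error: %v", err)
}

// Delete the stream entirely (its consumers are removed with it)
if err := js.DeleteStream("ORDERS"); err != nil {
    log.Printf("Delete error: %v", err)
}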
5. JetStream - Pull Consumer
jetstream/consumer.go
package main
import (
"context"
"log"
"time"
"github.com/nats-io/nats.go"
)
func main() {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
js, err := nc.JetStream()
if err != nil {
log.Fatal(err)
}
streamName := "ORDERS"
consumerName := "order-processor"
// Create durable pull consumer
consumer, err := js.AddConsumer(streamName, &nats.ConsumerConfig{
Durable: consumerName,
AckPolicy: nats.AckExplicitPolicy,
AckWait: 30 * time.Second,
MaxDeliver: 3,
FilterSubject: "orders.*",
DeliverPolicy: nats.DeliverAllPolicy,
})
if err != nil {
log.Printf("Consumer may already exist: %v", err)
} else {
log.Printf("Consumer %s created", consumer.Name)
}
// Pull-based consumer
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
sub, err := js.PullSubscribe("orders.*", consumerName,
nats.Bind(streamName, consumerName),
)
if err != nil {
log.Fatal(err)
}
defer sub.Unsubscribe()
log.Printf("Pull consumer started, processing messages...")
processed := 0
for {
select {
case <-ctx.Done():
log.Printf("Context done, processed %d messages", processed)
return
default:
// Fetch batch of messages
msgs, err := sub.Fetch(10, nats.MaxWait(2*time.Second))
if err != nil {
if err == nats.ErrTimeout {
log.Println("No new messages, waiting...")
continue
}
log.Printf("Fetch error: %v", err)
continue
}
for _, msg := range msgs {
log.Printf("Processing: %s (subject: %s, seq: %d)",
string(msg.Data), msg.Subject, mustGetSequence(msg))
// Simulate processing
time.Sleep(100 * time.Millisecond)
// Acknowledge message
if err := msg.Ack(); err != nil {
log.Printf("Ack error: %v", err)
} else {
processed++
}
}
}
}
}
func mustGetSequence(msg *nats.Msg) uint64 {
meta, _ := msg.Metadata()
if meta != nil {
return meta.Sequence.Stream
}
return 0
}
6. JetStream - Push Consumer
jetstream/push_consumer.go
package main
import (
"log"
"time"
"github.com/nats-io/nats.go"
)
func main() {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
js, err := nc.JetStream()
if err != nil {
log.Fatal(err)
}
// Subscribe with push consumer (auto-created)
sub, err := js.Subscribe("orders.*", func(msg *nats.Msg) {
meta, err := msg.Metadata()
if err != nil {
log.Printf("Metadata error: %v", err)
return
}
log.Printf("Received: %s", string(msg.Data))
log.Printf(" Subject: %s", msg.Subject)
log.Printf(" Stream Seq: %d", meta.Sequence.Stream)
log.Printf(" Consumer Seq: %d", meta.Sequence.Consumer)
log.Printf(" Delivered: %d", meta.NumDelivered)
// Simulate processing
time.Sleep(100 * time.Millisecond)
// Acknowledge
if err := msg.Ack(); err != nil {
log.Printf("Ack error: %v", err)
}
},
nats.Durable("push-consumer"),
nats.AckExplicit(),
nats.MaxDeliver(3),
nats.AckWait(30*time.Second),
)
if err != nil {
log.Fatal(err)
}
defer sub.Unsubscribe()
log.Println("Push consumer started, press Ctrl+C to exit")
// Keep running
select {}
}
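Push consumers can also be load-balanced across several instances by adding a deliver (queue) group, mirroring the queue-group pattern from NATS Core. A minimal sketch, assuming the same ORDERS stream and the js, log, and time imports from the listing above; the queue and durable names are illustrative:

// Every instance subscribing with the same queue group shares one durable
// consumer; each message is delivered to exactly one member of the group.
handler := func(msg *nats.Msg) {
    log.Printf("Handled by this instance: %s", string(msg.Data))
    msg.Ack()
}
sub, err := js.QueueSubscribe("orders.*", "order-workers", handler,
    nats.Durable("push-workers"),
    nats.ManualAck(),
    nats.AckWait(30*time.Second),
)
if err != nil {
    log.Fatal(err)
}
defer sub.Unsubscribe()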
7. Key-Value Store (Built on JetStream)
jetstream/kv_store.go
package main
import (
"log"
"time"
"github.com/nats-io/nats.go"
)
func main() {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
js, err := nc.JetStream()
if err != nil {
log.Fatal(err)
}
// Create key-value bucket
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "configs",
Description: "Application configuration",
MaxBytes: 1024 * 1024, // 1MB
TTL: 1 * time.Hour,
Storage: nats.FileStorage,
Replicas: 1,
})
if err != nil {
// Bucket may exist
kv, err = js.KeyValue("configs")
if err != nil {
log.Fatal(err)
}
}
log.Printf("KV bucket 'configs' ready")
// Put values
keys := map[string]string{
"app.name": "OrderService",
"app.version": "1.0.0",
"app.env": "production",
"db.host": "localhost",
"db.port": "5432",
}
for key, value := range keys {
seq, err := kv.Put(key, []byte(value))
if err != nil {
log.Printf("Error putting %s: %v", key, err)
continue
}
log.Printf("Put: %s = %s (revision: %d)", key, value, seq)
}
// Get values
for key := range keys {
entry, err := kv.Get(key)
if err != nil {
log.Printf("Error getting %s: %v", key, err)
continue
}
log.Printf("Get: %s = %s (revision: %d)", key, string(entry.Value()), entry.Revision())
}
// Watch for changes
watcher, err := kv.WatchAll()
if err != nil {
log.Fatal(err)
}
defer watcher.Stop()
go func() {
for entry := range watcher.Updates() {
if entry == nil {
// nil marks the end of the initial values; keep watching for changes
continue
}
log.Printf("Watch: %s changed to %s (revision: %d)",
entry.Key(), string(entry.Value()), entry.Revision())
}
}()
// Update a value
time.Sleep(1 * time.Second)
kv.Put("app.version", []byte("1.1.0"))
// Delete a key
time.Sleep(1 * time.Second)
if err := kv.Delete("db.port"); err != nil {
log.Printf("Error deleting: %v", err)
}
time.Sleep(2 * time.Second)
// List all keys
allKeys, err := kv.Keys()
if err != nil {
log.Fatal(err)
}
log.Printf("\nAll keys: %v", allKeys)
// Get bucket status
status, err := kv.Status()
if err != nil {
log.Fatal(err)
}
log.Printf("\nBucket status:")
log.Printf(" Bucket: %s", status.Bucket())
log.Printf(" Values: %d", status.Values())
log.Printf(" Bytes: %d", status.Bytes())
}
Delivery Guarantees
| Mode | Guarantee | Use Case | Performance |
|---|---|---|---|
| NATS Core | At-most-once | High-speed, fire-and-forget | Highest |
| JetStream (no ack) | At-most-once | Fast, with persistence | High |
| JetStream (ack) | At-least-once | Reliable messaging | Medium |
| JetStream (dedup) | Exactly-once | Critical transactions | Lower |
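Exactly-once publishing relies on the stream's duplicate-tracking window: messages that carry the same Nats-Msg-Id within that window are stored only once. A minimal sketch, assuming the ORDERS stream from section 4 and a running JetStream-enabled server:

package main

import (
    "log"

    "github.com/nats-io/nats.go"
)

func main() {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    js, err := nc.JetStream()
    if err != nil {
        log.Fatal(err)
    }

    // Publishing the same message ID twice inside the duplicate window
    // stores the message once; the second ack reports Duplicate=true.
    for i := 0; i < 2; i++ {
        ack, err := js.Publish("orders.created", []byte("Order #42"),
            nats.MsgId("order-42-created"))
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("seq=%d duplicate=%v", ack.Sequence, ack.Duplicate)
    }
}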
Best Practices
1. Connection Resilience
nc, err := nats.Connect(
nats.DefaultURL,
nats.Name("MyService"),
nats.MaxReconnects(-1), // Reconnect forever
nats.ReconnectWait(2*time.Second),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
log.Printf("Disconnected: %v", err)
}),
nats.ReconnectHandler(func(nc *nats.Conn) {
log.Printf("Reconnected to %s", nc.ConnectedUrl())
}),
nats.ClosedHandler(func(nc *nats.Conn) {
log.Printf("Connection closed: %v", nc.LastError())
}),
)
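On shutdown, prefer Drain over Close: it stops new deliveries, lets in-flight handlers finish, flushes pending publishes, and then closes the connection. A short sketch, assuming the nc connection from above:

// Graceful shutdown: finish in-flight work before closing.
if err := nc.Drain(); err != nil {
    log.Printf("Drain error: %v", err)
}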
2. Structured Subjects
// Good: hierarchical and predictable
"orders.us.east.created"
"sensors.temp.building1.floor2"
"metrics.cpu.server01"
// Bad: flat and hard to filter
"order_created_us_east"
"temp_sensor_b1_f2"
3. Consumer ACK Policy
// Explicit ack for reliability
nats.AckExplicit()
// No acknowledgments required (fire-and-forget)
nats.AckNone()
// Acking a message also acks all previous messages (cumulative)
nats.AckAll()
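These policies are set on the consumer; inside a handler, the per-message counterparts on *nats.Msg look like this (a brief reference sketch):

msg.Ack()                          // processed successfully, do not redeliver
msg.Nak()                          // failed, ask for redelivery as soon as possible
msg.NakWithDelay(10 * time.Second) // failed, redeliver after a delay
msg.InProgress()                   // still working, reset the AckWait timer
msg.Term()                         // unprocessable, never redeliver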
4. Error Handling
sub, err := js.Subscribe("orders.>", func(msg *nats.Msg) {
if err := processMessage(msg); err != nil {
log.Printf("Processing error: %v", err)
// Negative ack - redeliver
msg.Nak()
return
}
msg.Ack()
},
nats.MaxDeliver(3), // Retry up to 3 times
nats.AckWait(30*time.Second),
)
5. Monitoring
// Stream stats
stream, _ := js.StreamInfo("ORDERS")
log.Printf("Messages: %d, Bytes: %d, Consumers: %d",
stream.State.Msgs,
stream.State.Bytes,
stream.State.Consumers)
// Consumer stats
info, _ := sub.ConsumerInfo()
log.Printf("Pending: %d, Delivered: %d, AckPending: %d",
info.NumPending,
info.Delivered.Consumer,
info.NumAckPending)
Common Pitfalls
1. Not Handling Reconnections
// Bad: assumes connection is always valid
nc.Publish("subject", data)
// Good: check if connected
if !nc.IsConnected() {
return errors.New("not connected")
}
nc.Publish("subject", data)
2. Forgetting to Ack Messages
// Bad: message will be redelivered
js.Subscribe("orders.>", func(msg *nats.Msg) {
processMessage(msg)
// Missing msg.Ack()!
})
// Good: always ack
js.Subscribe("orders.>", func(msg *nats.Msg) {
if err := processMessage(msg); err != nil {
msg.Nak()
} else {
msg.Ack()
}
})
3. Not Setting Stream Limits
// Bad: unlimited growth
js.AddStream(&nats.StreamConfig{
Name: "LOGS",
})
// Good: set limits
js.AddStream(&nats.StreamConfig{
Name: "LOGS",
MaxAge: 24 * time.Hour,
MaxBytes: 10 * 1024 * 1024 * 1024, // 10GB
Retention: nats.LimitsPolicy,
})
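Besides size and age limits, the retention policy controls when messages are removed. A brief sketch; the JOBS stream name is illustrative:

// LimitsPolicy    - keep messages until age/size/count limits are hit (default)
// InterestPolicy  - remove a message once every bound consumer has acked it
// WorkQueuePolicy - remove a message once any consumer has acked it (work-queue semantics)
js.AddStream(&nats.StreamConfig{
    Name:      "JOBS",
    Subjects:  []string{"jobs.>"},
    Retention: nats.WorkQueuePolicy,
    MaxAge:    48 * time.Hour,
})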
When to Use NATS/JetStream
✅ Use NATS When:
- Microservices communication - Fast, lightweight messaging
- High throughput required - Millions of messages/second
- Fire-and-forget ok - At-most-once delivery sufficient
- Cloud-native architecture - Kubernetes-friendly
- Low resource usage - Minimal memory/CPU footprint
✅ Use JetStream When:
- Persistence required - Message durability
- At-least-once delivery - Guaranteed processing
- Message replay - Event sourcing, auditing
- Consumer groups - Load balancing with acknowledgments
- Stream processing - Time-series data
❌ Avoid When:
- Complex routing - Use RabbitMQ instead
- Long-term archival - Use Kafka instead
- Transactions - Use traditional message queues
- Legacy protocols - AMQP, MQTT required
Advantages
- High Performance - Millions of messages per second
- Lightweight - Single binary, minimal resources
- Simple API - Easy to use and understand
- Built-in Clustering - High availability
- Cloud Native - Kubernetes integration
- Multiple Patterns - Pub/sub, request/reply, queue groups
- JetStream - Adds persistence without complexity
Disadvantages
- No Complex Routing - Unlike RabbitMQ exchanges
- Limited Query - Can’t query message history easily
- No Transactions - No distributed transactions
- Newer Technology - Less mature than Kafka/RabbitMQ
- No Core Persistence - Core NATS does not store messages; offline subscribers miss them (JetStream adds durability)
Performance Characteristics
NATS Core:
- Latency: < 1ms
- Throughput: 10M+ msg/sec
- Delivery: At-most-once
JetStream:
- Latency: < 10ms
- Throughput: 1M+ msg/sec
- Delivery: At-least-once / Exactly-once
Running NATS Server
Docker:
# NATS Core
docker run -p 4222:4222 nats:latest
# NATS with JetStream
docker run -p 4222:4222 nats:latest -js
# With monitoring
docker run -p 4222:4222 -p 8222:8222 nats:latest -js -m 8222
Docker Compose:
version: '3'
services:
nats:
image: nats:latest
ports:
- "4222:4222" # Client
- "8222:8222" # Monitoring
command: ["-js", "-m", "8222"]
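After starting the server, a quick way to verify that it is reachable and that JetStream is enabled is to request the JetStream account info. A hypothetical check program, assuming the default localhost URL:

package main

import (
    "log"

    "github.com/nats-io/nats.go"
)

func main() {
    nc, err := nats.Connect("nats://localhost:4222")
    if err != nil {
        log.Fatalf("cannot reach NATS: %v", err)
    }
    defer nc.Close()

    js, err := nc.JetStream()
    if err != nil {
        log.Fatal(err)
    }

    // AccountInfo fails if JetStream is not enabled on the server/account.
    info, err := js.AccountInfo()
    if err != nil {
        log.Fatalf("JetStream not available: %v", err)
    }
    log.Printf("JetStream enabled: %d streams, %d consumers", info.Streams, info.Consumers)
}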