RabbitMQ is one of the most popular open-source message brokers and implements the Advanced Message Queuing Protocol (AMQP 0-9-1). It acts as an intermediary for asynchronous message passing between applications and provides queuing, flexible routing, and acknowledgement-based delivery guarantees.
What is RabbitMQ?
RabbitMQ is a message broker that accepts and forwards messages. Unlike direct communication between services, RabbitMQ provides a buffer that decouples producers from consumers, allowing them to operate independently and scale separately.
Core Concepts
Exchanges: Routing components that receive messages from producers and route them to queues based on routing rules.
Queues: Buffers that store messages until consumers retrieve them.
Bindings: Rules that link exchanges to queues, defining how messages should be routed.
Routing Keys: Message attributes used by exchanges to determine routing destinations.
Virtual Hosts (vhosts): Isolated namespaces for organizing exchanges, queues, and bindings.
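To make the relationship between exchanges, bindings, and queues concrete, here is a minimal in-memory sketch. It is not how the broker is implemented; the `exchange` and `binding` types and the `route` method are hypothetical names chosen for illustration, and only direct and fanout behavior is modeled.

```go
package main

import "fmt"

// binding links an exchange to a queue under a binding key.
type binding struct {
	queue string
	key   string
}

// exchange is a toy stand-in for a broker exchange (illustrative only).
type exchange struct {
	kind     string // "direct" or "fanout"
	bindings []binding
}

// route returns the queues that would receive a message published
// with the given routing key.
func (e exchange) route(routingKey string) []string {
	var queues []string
	for _, b := range e.bindings {
		// Fanout ignores the key; direct requires an exact match.
		if e.kind == "fanout" || b.key == routingKey {
			queues = append(queues, b.queue)
		}
	}
	return queues
}

func main() {
	orders := exchange{kind: "direct", bindings: []binding{
		{queue: "orders", key: "order"},
		{queue: "notifications", key: "notify"},
	}}
	logs := exchange{kind: "fanout", bindings: []binding{
		{queue: "notifications"},
		{queue: "analytics"},
	}}

	fmt.Println(orders.route("order")) // [orders]
	fmt.Println(logs.route("ignored")) // [notifications analytics]
}
```

The producer only ever names an exchange and a routing key; which queues receive the message is decided entirely by the bindings.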
Exchange Types
Direct: Routes messages with exact routing key matches.
Fanout: Broadcasts messages to all bound queues, ignoring routing keys.
Topic: Routes based on pattern matching with routing keys.
Headers: Routes based on message header attributes instead of routing keys.
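Topic matching is the least obvious of the four types. In a topic binding pattern, routing keys are split into dot-separated words, `*` matches exactly one word, and `#` matches zero or more words. The sketch below reimplements that rule in plain Go to show the semantics; `topicMatches` is a hypothetical helper, not part of any client library, and the broker's own matcher is implemented differently.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatches reports whether a dot-separated routing key matches a
// topic binding pattern ("*" = exactly one word, "#" = zero or more words).
func topicMatches(pattern, routingKey string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(routingKey, "."))
}

func matchWords(pattern, key []string) bool {
	if len(pattern) == 0 {
		return len(key) == 0
	}
	if pattern[0] == "#" {
		// Let "#" absorb 0, 1, 2, ... words of the key.
		for skip := 0; skip <= len(key); skip++ {
			if matchWords(pattern[1:], key[skip:]) {
				return true
			}
		}
		return false
	}
	if len(key) == 0 {
		return false
	}
	if pattern[0] == "*" || pattern[0] == key[0] {
		return matchWords(pattern[1:], key[1:])
	}
	return false
}

func main() {
	fmt.Println(topicMatches("order.#", "order.created"))     // true
	fmt.Println(topicMatches("error.*", "error.critical"))    // true
	fmt.Println(topicMatches("error.*", "error.critical.db")) // false
}
```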
Architecture
[Architecture diagram: producers publish to three exchanges. A direct exchange routes routing key "order" to the orders queue and "notify" to the notifications queue; the fanout exchange "logs" feeds the notifications and analytics queues; the topic exchange "events" routes error.critical and error.warning to the errors.critical and errors.warning queues. Rejected or expired messages from the orders queue pass through a dead letter exchange (DLX) to a DLQ. Consumers: Order Processor (orders), Notification Service (notifications), Analytics (analytics), Error Handler (errors.critical).]
Message Flow
Real-World Use Cases
1. Task Queues
Distribute time-consuming tasks across multiple workers for parallel processing.
Perfect for: Image processing, video encoding, report generation, email sending
2. Work Queue Patterns
Implement competing consumers pattern where multiple workers process tasks from a shared queue.
Perfect for: Background jobs, batch processing, data imports
3. Pub/Sub Messaging
Broadcast events to multiple subscribers without coupling producers to consumers.
Perfect for: Event notifications, real-time updates, activity feeds
4. Request/Reply (RPC)
Implement synchronous RPC-style communication over async messaging.
Perfect for: Microservice communication, API gateways, service orchestration
5. Priority Queues
Process high-priority messages before lower-priority ones.
Perfect for: SLA-sensitive operations, urgent notifications, critical alerts
6. Delayed Messages
Schedule message delivery for future processing.
Perfect for: Scheduled tasks, reminder systems, retry mechanisms
7. Message Routing
Route messages to different handlers based on content or routing keys.
Perfect for: Multi-tenant systems, workflow engines, event routing
8. Dead Letter Handling
Capture and analyze failed messages for debugging and manual intervention.
Perfect for: Error analysis, poison message handling, audit trails
Project Structure
rabbitmq-service/
├── cmd/
│   ├── producer/
│   │   └── main.go              # Producer application
│   └── consumer/
│       └── main.go              # Consumer application
├── internal/
│   ├── broker/
│   │   ├── connection.go        # Connection pool
│   │   └── channel.go           # Channel management
│   ├── publisher/
│   │   ├── publisher.go         # Message publisher
│   │   └── confirms.go          # Publisher confirms
│   ├── consumer/
│   │   ├── consumer.go          # Message consumer
│   │   ├── worker.go            # Worker pool
│   │   └── handler.go           # Message handler
│   ├── models/
│   │   └── message.go           # Message types
│   └── config/
│       └── config.go            # Configuration
├── pkg/
│   ├── middleware/
│   │   ├── retry.go             # Retry middleware
│   │   ├── circuit.go           # Circuit breaker
│   │   └── metrics.go           # Metrics
│   └── patterns/
│       ├── rpc.go               # RPC pattern
│       ├── priority.go          # Priority queue
│       └── delayed.go           # Delayed messaging
├── deployments/
│   ├── docker-compose.yml       # RabbitMQ setup
│   └── rabbitmq/
│       └── definitions.json     # Queue definitions
├── go.mod
└── go.sum
Implementation
1. Message Models
// internal/models/message.go
package models
import (
"encoding/json"
"time"
"github.com/google/uuid"
)
// Message represents a generic message
type Message struct {
ID string `json:"id"`
Type string `json:"type"`
Timestamp time.Time `json:"timestamp"`
Payload interface{} `json:"payload"`
Metadata map[string]interface{} `json:"metadata"`
Priority uint8 `json:"priority"` // 0-9
Retry int `json:"retry"`
MaxRetries int `json:"max_retries"`
}
// NewMessage creates a new message
func NewMessage(msgType string, payload interface{}) *Message {
return &Message{
ID: uuid.New().String(),
Type: msgType,
Timestamp: time.Now(),
Payload: payload,
Metadata: make(map[string]interface{}),
Priority: 0,
}
}
// ToJSON serializes message to JSON
func (m *Message) ToJSON() ([]byte, error) {
return json.Marshal(m)
}
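// FromJSON deserializes message from JSON
func FromJSON(data []byte) (*Message, error) {
var msg Message
if err := json.Unmarshal(data, &msg); err != nil {
return nil, err
}
// A missing or null "metadata" field leaves the map nil; initialize it
// so callers can add entries without panicking
if msg.Metadata == nil {
msg.Metadata = make(map[string]interface{})
}
return &msg, nil
}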
// OrderMessage represents an order
type OrderMessage struct {
OrderID string `json:"order_id"`
CustomerID string `json:"customer_id"`
Items []Item `json:"items"`
TotalAmount float64 `json:"total_amount"`
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
}
type Item struct {
ProductID string `json:"product_id"`
Quantity int `json:"quantity"`
Price float64 `json:"price"`
}
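One consequence of typing `Payload` as `interface{}` is worth spelling out: after a JSON round trip the payload is no longer an `OrderMessage` but a generic `map[string]interface{}`, so a consumer has to convert it back to a typed struct. The sketch below shows one way to do that by re-marshaling; the `envelope` and `order` types are trimmed-down stand-ins for the models above, and `decodeOrder` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope and order are reduced stand-ins for Message and OrderMessage.
type envelope struct {
	Type    string      `json:"type"`
	Payload interface{} `json:"payload"`
}

type order struct {
	OrderID     string  `json:"order_id"`
	TotalAmount float64 `json:"total_amount"`
}

// decodeOrder converts a generic decoded payload back into a typed order
// by marshaling it to JSON and unmarshaling into the struct.
func decodeOrder(payload interface{}) (order, error) {
	var o order
	raw, err := json.Marshal(payload)
	if err != nil {
		return o, fmt.Errorf("marshal payload: %w", err)
	}
	if err := json.Unmarshal(raw, &o); err != nil {
		return o, fmt.Errorf("unmarshal order: %w", err)
	}
	return o, nil
}

func main() {
	wire, err := json.Marshal(envelope{
		Type:    "order.created",
		Payload: order{OrderID: "o-1", TotalAmount: 59.98},
	})
	if err != nil {
		panic(err)
	}

	var received envelope
	if err := json.Unmarshal(wire, &received); err != nil {
		panic(err)
	}

	// The typed struct did not survive the round trip.
	_, isMap := received.Payload.(map[string]interface{})
	fmt.Println("payload is a generic map:", isMap)

	o, err := decodeOrder(received.Payload)
	fmt.Println(o.OrderID, o.TotalAmount, err)
}
```

An alternative design is to declare the payload as `json.RawMessage`, which defers decoding and avoids the extra marshal step.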
2. Connection Management
// internal/broker/connection.go
package broker
import (
"fmt"
"log"
"sync"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
// ConnectionPool manages RabbitMQ connections
type ConnectionPool struct {
url string
conn *amqp.Connection
mu sync.RWMutex
reconnect chan bool
closed bool
}
// Config holds connection configuration
type Config struct {
URL string
ReconnectDelay time.Duration
MaxReconnects int
}
// NewConnectionPool creates a new connection pool
func NewConnectionPool(cfg Config) (*ConnectionPool, error) {
pool := &ConnectionPool{
url: cfg.URL,
reconnect: make(chan bool),
}
conn, err := pool.connect()
if err != nil {
return nil, err
}
pool.conn = conn
// Monitor connection and reconnect if needed
go pool.monitorConnection(cfg.ReconnectDelay, cfg.MaxReconnects)
return pool, nil
}
func (p *ConnectionPool) connect() (*amqp.Connection, error) {
conn, err := amqp.Dial(p.url)
if err != nil {
return nil, fmt.Errorf("dial: %w", err)
}
log.Println("Connected to RabbitMQ")
return conn, nil
}
func (p *ConnectionPool) monitorConnection(delay time.Duration, maxRetries int) {
for {
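// Read the connection under the lock and use a buffered channel so the
// library never blocks when delivering the close notification
reason, ok := <-p.GetConnection().NotifyClose(make(chan *amqp.Error, 1))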
if !ok {
log.Println("Connection closed normally")
return
}
log.Printf("Connection closed: %v, reconnecting...", reason)
retries := 0
for retries < maxRetries {
time.Sleep(delay * time.Duration(1<<uint(retries)))
conn, err := p.connect()
if err == nil {
p.mu.Lock()
p.conn = conn
p.mu.Unlock()
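// Notify without blocking: the channel is unbuffered and nothing may
// be receiving, which would otherwise stall this goroutine forever
select {
case p.reconnect <- true:
default:
}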
break
}
retries++
log.Printf("Reconnection attempt %d/%d failed: %v", retries, maxRetries, err)
}
if retries >= maxRetries {
log.Fatal("Max reconnection attempts exceeded")
}
}
}
// GetConnection returns the current connection
func (p *ConnectionPool) GetConnection() *amqp.Connection {
p.mu.RLock()
defer p.mu.RUnlock()
return p.conn
}
// CreateChannel creates a new channel
func (p *ConnectionPool) CreateChannel() (*amqp.Channel, error) {
p.mu.RLock()
defer p.mu.RUnlock()
ch, err := p.conn.Channel()
if err != nil {
return nil, fmt.Errorf("create channel: %w", err)
}
return ch, nil
}
// Close closes the connection
func (p *ConnectionPool) Close() error {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
return nil
}
p.closed = true
if p.conn != nil {
return p.conn.Close()
}
return nil
}
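The reconnect loop above doubles its delay on every attempt with no upper bound, so with a large `MaxReconnects` the waits grow very long. A capped variant can be sketched as follows; `backoffDelay` is a hypothetical helper, and the 30-second cap used in the example is an arbitrary assumption rather than a recommended value.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelay returns base * 2^attempt, never exceeding max.
// The doubling stops early to avoid overflowing time.Duration.
func backoffDelay(base time.Duration, attempt int, max time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt; i++ {
		if d >= max/2 {
			return max
		}
		d *= 2
	}
	if d > max {
		return max
	}
	return d
}

func main() {
	for attempt := 0; attempt < 6; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, backoffDelay(2*time.Second, attempt, 30*time.Second))
	}
}
```

Adding random jitter to each delay is a common refinement when many clients reconnect at once; it is left out here to keep the arithmetic easy to follow.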
3. Publisher Implementation
// internal/publisher/publisher.go
package publisher
import (
"context"
"fmt"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"rabbitmq-service/internal/broker"
"rabbitmq-service/internal/models"
)
// Publisher publishes messages to RabbitMQ
type Publisher struct {
pool *broker.ConnectionPool
ch *amqp.Channel
confirms chan amqp.Confirmation
exchange string
}
// Config holds publisher configuration
type Config struct {
Exchange string
ExchangeType string
Durable bool
AutoDelete bool
Confirms bool // Publisher confirms
}
// NewPublisher creates a new publisher
func NewPublisher(pool *broker.ConnectionPool, cfg Config) (*Publisher, error) {
ch, err := pool.CreateChannel()
if err != nil {
return nil, err
}
// Declare exchange
err = ch.ExchangeDeclare(
cfg.Exchange,
cfg.ExchangeType,
cfg.Durable,
cfg.AutoDelete,
false, // internal
false, // no-wait
nil, // args
)
if err != nil {
return nil, fmt.Errorf("declare exchange: %w", err)
}
pub := &Publisher{
pool: pool,
ch: ch,
exchange: cfg.Exchange,
}
// Enable publisher confirms
if cfg.Confirms {
if err := ch.Confirm(false); err != nil {
return nil, fmt.Errorf("enable confirms: %w", err)
}
pub.confirms = ch.NotifyPublish(make(chan amqp.Confirmation, 100))
}
return pub, nil
}
// Publish publishes a message
func (p *Publisher) Publish(ctx context.Context, routingKey string, msg *models.Message) error {
body, err := msg.ToJSON()
if err != nil {
return fmt.Errorf("serialize message: %w", err)
}
publishing := amqp.Publishing{
ContentType: "application/json",
Body: body,
DeliveryMode: amqp.Persistent,
MessageId: msg.ID,
Timestamp: time.Now(),
Priority: msg.Priority,
Headers: amqp.Table{
"x-message-type": msg.Type,
},
}
// Set TTL if specified
if ttl, ok := msg.Metadata["ttl"].(time.Duration); ok {
publishing.Expiration = fmt.Sprintf("%d", ttl.Milliseconds())
}
err = p.ch.PublishWithContext(
ctx,
p.exchange,
routingKey,
false, // mandatory
false, // immediate
publishing,
)
if err != nil {
return fmt.Errorf("publish: %w", err)
}
// Wait for confirmation if enabled
if p.confirms != nil {
select {
case confirm := <-p.confirms:
if !confirm.Ack {
return fmt.Errorf("message not confirmed")
}
log.Printf("Message confirmed: %s", msg.ID)
case <-time.After(5 * time.Second):
return fmt.Errorf("confirmation timeout")
case <-ctx.Done():
return ctx.Err()
}
}
log.Printf("Published message: id=%s, routing_key=%s", msg.ID, routingKey)
return nil
}
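// PublishWithDelay publishes a message with a delay. It requires the
// rabbitmq_delayed_message_exchange plugin, and the target exchange must be
// declared with type "x-delayed-message" and an "x-delayed-type" argument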
func (p *Publisher) PublishWithDelay(ctx context.Context, routingKey string, msg *models.Message, delay time.Duration) error {
body, err := msg.ToJSON()
if err != nil {
return fmt.Errorf("serialize message: %w", err)
}
publishing := amqp.Publishing{
ContentType: "application/json",
Body: body,
DeliveryMode: amqp.Persistent,
MessageId: msg.ID,
Timestamp: time.Now(),
Headers: amqp.Table{
"x-delay": delay.Milliseconds(),
"x-message-type": msg.Type,
},
}
return p.ch.PublishWithContext(ctx, p.exchange, routingKey, false, false, publishing)
}
// Close closes the publisher
func (p *Publisher) Close() error {
if p.ch != nil {
return p.ch.Close()
}
return nil
}
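The `Publish` method above reads the next value from the confirms channel and assumes it belongs to the message just sent. If a wait times out, the late confirmation stays in the channel and the following call may read it instead. One way to avoid that mismatch is to key confirmations by delivery tag: on a channel in confirm mode, tags are sequence numbers that start at 1 and increase by one per publish. The sketch below models only the bookkeeping; `confirmTracker` and its methods are hypothetical, and it assumes `track` is called in the same order as the publishes on a single channel.

```go
package main

import (
	"fmt"
	"sync"
)

// confirmTracker maps delivery tags to message IDs for one channel
// in confirm mode (illustrative bookkeeping only).
type confirmTracker struct {
	mu      sync.Mutex
	nextTag uint64
	pending map[uint64]string
}

func newConfirmTracker() *confirmTracker {
	return &confirmTracker{nextTag: 1, pending: make(map[uint64]string)}
}

// track records a message just before it is published and returns
// the delivery tag the broker is expected to confirm it under.
func (t *confirmTracker) track(messageID string) uint64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	tag := t.nextTag
	t.nextTag++
	t.pending[tag] = messageID
	return tag
}

// resolve removes and returns the message ID for a confirmed tag.
// The boolean is false for unknown or already resolved tags.
func (t *confirmTracker) resolve(tag uint64) (string, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	id, ok := t.pending[tag]
	delete(t.pending, tag)
	return id, ok
}

func main() {
	tracker := newConfirmTracker()
	tracker.track("msg-a")
	tracker.track("msg-b")

	// Confirmations may be observed late or out of step with Publish calls;
	// the tag still identifies the right message.
	id, ok := tracker.resolve(2)
	fmt.Println(id, ok) // msg-b true
	id, ok = tracker.resolve(1)
	fmt.Println(id, ok) // msg-a true
}
```

In the publisher, a goroutine reading `amqp.Confirmation` values would call `resolve` with each `DeliveryTag` and then act on the `Ack` field.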
4. Consumer Implementation
// internal/consumer/consumer.go
package consumer
import (
"context"
"fmt"
"log"
"sync"
amqp "github.com/rabbitmq/amqp091-go"
"rabbitmq-service/internal/broker"
"rabbitmq-service/internal/models"
)
// Consumer consumes messages from RabbitMQ
type Consumer struct {
pool *broker.ConnectionPool
ch *amqp.Channel
queue string
handler MessageHandler
workers int
}
// MessageHandler processes messages
type MessageHandler interface {
Handle(ctx context.Context, msg *models.Message) error
}
// Config holds consumer configuration
type Config struct {
Queue string
Exchange string
RoutingKeys []string
Durable bool
AutoDelete bool
Exclusive bool
PrefetchCount int
Workers int
AutoAck bool
}
// NewConsumer creates a new consumer
func NewConsumer(pool *broker.ConnectionPool, cfg Config, handler MessageHandler) (*Consumer, error) {
ch, err := pool.CreateChannel()
if err != nil {
return nil, err
}
// Declare queue
_, err = ch.QueueDeclare(
cfg.Queue,
cfg.Durable,
cfg.AutoDelete,
cfg.Exclusive,
false, // no-wait
nil, // args
)
if err != nil {
return nil, fmt.Errorf("declare queue: %w", err)
}
// Bind queue to exchange
for _, routingKey := range cfg.RoutingKeys {
err = ch.QueueBind(
cfg.Queue,
routingKey,
cfg.Exchange,
false, // no-wait
nil, // args
)
if err != nil {
return nil, fmt.Errorf("bind queue: %w", err)
}
}
// Set QoS (prefetch)
err = ch.Qos(
cfg.PrefetchCount,
0, // prefetch size
false, // global
)
if err != nil {
return nil, fmt.Errorf("set qos: %w", err)
}
return &Consumer{
pool: pool,
ch: ch,
queue: cfg.Queue,
handler: handler,
workers: cfg.Workers,
}, nil
}
// Start starts consuming messages
func (c *Consumer) Start(ctx context.Context) error {
msgs, err := c.ch.Consume(
c.queue,
"", // consumer tag
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return fmt.Errorf("consume: %w", err)
}
log.Printf("Starting consumer with %d workers for queue: %s", c.workers, c.queue)
var wg sync.WaitGroup
for i := 0; i < c.workers; i++ {
wg.Add(1)
go c.worker(ctx, &wg, msgs, i)
}
wg.Wait()
return nil
}
func (c *Consumer) worker(ctx context.Context, wg *sync.WaitGroup, msgs <-chan amqp.Delivery, id int) {
defer wg.Done()
log.Printf("Worker %d started", id)
for {
select {
case <-ctx.Done():
log.Printf("Worker %d stopped", id)
return
case delivery, ok := <-msgs:
if !ok {
log.Printf("Worker %d: channel closed", id)
return
}
if err := c.processMessage(ctx, delivery); err != nil {
log.Printf("Worker %d: error processing message: %v", id, err)
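// Read the application-level retry counter from the headers
retryCount := int32(0)
if val, ok := delivery.Headers["x-retry-count"].(int32); ok {
retryCount = val
}
if retryCount < 3 {
// Nack with requeue=true redelivers the message unchanged, so the
// counter would never advance. Republish a copy with an incremented
// x-retry-count and ack the original instead. Note that the copy is
// routed again by the exchange, so every matching queue receives it.
headers := amqp.Table{}
for k, v := range delivery.Headers {
headers[k] = v
}
headers["x-retry-count"] = retryCount + 1
pubErr := c.ch.PublishWithContext(ctx, delivery.Exchange, delivery.RoutingKey, false, false, amqp.Publishing{
Headers: headers,
ContentType: delivery.ContentType,
DeliveryMode: delivery.DeliveryMode,
MessageId: delivery.MessageId,
Priority: delivery.Priority,
Body: delivery.Body,
})
if pubErr != nil {
// Could not republish; fall back to a plain requeue
delivery.Nack(false, true)
} else {
delivery.Ack(false)
}
} else {
// Max retries exceeded: reject without requeue so the message is
// dead-lettered if the queue has a DLX configured, or dropped otherwise
log.Printf("Max retries exceeded for message: %s", delivery.MessageId)
delivery.Nack(false, false)
}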
} else {
delivery.Ack(false)
log.Printf("Worker %d: message processed: %s", id, delivery.MessageId)
}
}
}
}
func (c *Consumer) processMessage(ctx context.Context, delivery amqp.Delivery) error {
msg, err := models.FromJSON(delivery.Body)
if err != nil {
return fmt.Errorf("deserialize message: %w", err)
}
// Add delivery metadata
msg.Metadata["routing_key"] = delivery.RoutingKey
msg.Metadata["exchange"] = delivery.Exchange
msg.Metadata["redelivered"] = delivery.Redelivered
return c.handler.Handle(ctx, msg)
}
// Close closes the consumer
func (c *Consumer) Close() error {
if c.ch != nil {
return c.ch.Close()
}
return nil
}
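The worker reads `x-retry-count` with a single `int32` type assertion. Header values arrive as `interface{}`, and the concrete integer type depends on how the publisher set the header, so an assertion on one type can silently fall back to zero. A more tolerant reader can be sketched as below; `retryCount` is a hypothetical helper, `x-retry-count` is the application-defined header used in this article, and a plain map stands in for the client's header table.

```go
package main

import "fmt"

// retryCount extracts the application-defined "x-retry-count" header,
// accepting the common integer types and defaulting to 0.
func retryCount(headers map[string]interface{}) int {
	switch v := headers["x-retry-count"].(type) {
	case int:
		return v
	case int16:
		return int(v)
	case int32:
		return int(v)
	case int64:
		return int(v)
	default:
		// Missing header or an unexpected type: treat as a first attempt.
		return 0
	}
}

func main() {
	fmt.Println(retryCount(map[string]interface{}{"x-retry-count": int32(2)})) // 2
	fmt.Println(retryCount(map[string]interface{}{"x-retry-count": int64(3)})) // 3
	fmt.Println(retryCount(nil))                                               // 0
}
```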
5. Message Handler
// internal/consumer/handler.go
package consumer
import (
"context"
"encoding/json"
"fmt"
"log"
"sync"
"time"
"rabbitmq-service/internal/models"
)
// OrderHandler processes order messages
type OrderHandler struct {
processingTime time.Duration
}
func NewOrderHandler(processingTime time.Duration) *OrderHandler {
return &OrderHandler{processingTime: processingTime}
}
// Handle processes a message
func (h *OrderHandler) Handle(ctx context.Context, msg *models.Message) error {
log.Printf("Handling message: type=%s, id=%s", msg.Type, msg.ID)
switch msg.Type {
case "order.created":
return h.handleOrderCreated(ctx, msg)
case "order.payment":
return h.handleOrderPayment(ctx, msg)
case "order.shipped":
return h.handleOrderShipped(ctx, msg)
default:
log.Printf("Unknown message type: %s", msg.Type)
return nil
}
}
func (h *OrderHandler) handleOrderCreated(ctx context.Context, msg *models.Message) error {
var order models.OrderMessage
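// The payload arrives as a generic map; re-marshal it to decode into the typed struct
data, err := json.Marshal(msg.Payload)
if err != nil {
return fmt.Errorf("marshal payload: %w", err)
}
if err := json.Unmarshal(data, &order); err != nil {
return fmt.Errorf("unmarshal order: %w", err)
}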
log.Printf("Processing order: id=%s, customer=%s, amount=%.2f",
order.OrderID, order.CustomerID, order.TotalAmount)
// Simulate processing
time.Sleep(h.processingTime)
// Business logic:
// - Validate order
// - Check inventory
// - Reserve stock
// - Create payment intent
return nil
}
func (h *OrderHandler) handleOrderPayment(ctx context.Context, msg *models.Message) error {
log.Printf("Processing payment for order: %s", msg.ID)
time.Sleep(h.processingTime)
return nil
}
func (h *OrderHandler) handleOrderShipped(ctx context.Context, msg *models.Message) error {
log.Printf("Order shipped: %s", msg.ID)
time.Sleep(h.processingTime)
return nil
}
// CircuitBreakerHandler wraps handler with circuit breaker
type CircuitBreakerHandler struct {
handler MessageHandler
failureCount int
failureLimit int
resetTimeout time.Duration
state string // "closed", "open", "half-open"
lastFailure time.Time
mu sync.Mutex
}
func NewCircuitBreakerHandler(handler MessageHandler, failureLimit int, resetTimeout time.Duration) *CircuitBreakerHandler {
return &CircuitBreakerHandler{
handler: handler,
failureLimit: failureLimit,
resetTimeout: resetTimeout,
state: "closed",
}
}
func (h *CircuitBreakerHandler) Handle(ctx context.Context, msg *models.Message) error {
// Check the state under the lock, then release it before calling the
// wrapped handler so concurrent workers are not serialized
h.mu.Lock()
if h.state == "open" {
if time.Since(h.lastFailure) <= h.resetTimeout {
h.mu.Unlock()
return fmt.Errorf("circuit breaker open")
}
h.state = "half-open"
h.failureCount = 0
}
h.mu.Unlock()
// Try to process message
err := h.handler.Handle(ctx, msg)
// Record the outcome under the lock
h.mu.Lock()
defer h.mu.Unlock()
if err != nil {
h.failureCount++
h.lastFailure = time.Now()
if h.failureCount >= h.failureLimit {
h.state = "open"
log.Printf("Circuit breaker opened after %d failures", h.failureCount)
}
return err
}
// Success - close circuit if half-open
if h.state == "half-open" {
h.state = "closed"
h.failureCount = 0
log.Println("Circuit breaker closed")
}
return nil
}
6. Producer Application
// cmd/producer/main.go
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/google/uuid"
"rabbitmq-service/internal/broker"
"rabbitmq-service/internal/models"
"rabbitmq-service/internal/publisher"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Handle graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// Create connection pool
pool, err := broker.NewConnectionPool(broker.Config{
URL: "amqp://guest:guest@localhost:5672/",
ReconnectDelay: 2 * time.Second,
MaxReconnects: 5,
})
if err != nil {
log.Fatal(err)
}
defer pool.Close()
// Create publisher
pub, err := publisher.NewPublisher(pool, publisher.Config{
Exchange: "orders",
ExchangeType: "topic",
Durable: true,
AutoDelete: false,
Confirms: true,
})
if err != nil {
log.Fatal(err)
}
defer pub.Close()
// Start producing
go produceOrders(ctx, pub)
// Wait for shutdown
<-sigChan
log.Println("Shutting down producer...")
cancel()
}
func produceOrders(ctx context.Context, pub *publisher.Publisher) {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
order := &models.OrderMessage{
OrderID: uuid.New().String(),
CustomerID: fmt.Sprintf("cust-%d", time.Now().Unix()%100),
TotalAmount: float64(time.Now().Unix()%500) + 9.99,
Status: "pending",
CreatedAt: time.Now(),
Items: []models.Item{
{
ProductID: "prod-001",
Quantity: 2,
Price: 49.99,
},
},
}
msg := models.NewMessage("order.created", order)
msg.Priority = 5
routingKey := "order.created"
if err := pub.Publish(ctx, routingKey, msg); err != nil {
log.Printf("Failed to publish: %v", err)
} else {
log.Printf("Published order: %s", order.OrderID)
}
}
}
}
7. Consumer Application
// cmd/consumer/main.go
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
"rabbitmq-service/internal/broker"
"rabbitmq-service/internal/consumer"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Handle graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// Create connection pool
pool, err := broker.NewConnectionPool(broker.Config{
URL: "amqp://guest:guest@localhost:5672/",
ReconnectDelay: 2 * time.Second,
MaxReconnects: 5,
})
if err != nil {
log.Fatal(err)
}
defer pool.Close()
// Create handler with circuit breaker
baseHandler := consumer.NewOrderHandler(100 * time.Millisecond)
handler := consumer.NewCircuitBreakerHandler(baseHandler, 5, 30*time.Second)
// Create consumer
cons, err := consumer.NewConsumer(pool, consumer.Config{
Queue: "order-processing",
Exchange: "orders",
RoutingKeys: []string{"order.#"},
Durable: true,
AutoDelete: false,
Exclusive: false,
PrefetchCount: 10,
Workers: 3,
AutoAck: false,
}, handler)
if err != nil {
log.Fatal(err)
}
defer cons.Close()
// Start consuming
go func() {
if err := cons.Start(ctx); err != nil {
log.Printf("Consumer error: %v", err)
}
}()
// Wait for shutdown
<-sigChan
log.Println("Shutting down consumer...")
cancel()
time.Sleep(2 * time.Second) // Grace period
}
Advanced Patterns
1. Dead Letter Exchange (DLX)
// Configure queue with DLX
func declareQueueWithDLX(ch *amqp.Channel, queueName, dlxName string) error {
args := amqp.Table{
"x-dead-letter-exchange": dlxName,
"x-dead-letter-routing-key": "dlq",
"x-message-ttl": 60000, // 60 seconds
}
_, err := ch.QueueDeclare(
queueName,
true, // durable
false, // auto-delete
false, // exclusive
false, // no-wait
args,
)
return err
}
// Declare DLX and DLQ
func setupDLX(ch *amqp.Channel) error {
// Declare DLX
err := ch.ExchangeDeclare(
"dlx",
"direct",
true,
false,
false,
false,
nil,
)
if err != nil {
return err
}
// Declare DLQ
_, err = ch.QueueDeclare(
"dlq",
true,
false,
false,
false,
nil,
)
if err != nil {
return err
}
// Bind DLQ to DLX
return ch.QueueBind("dlq", "dlq", "dlx", false, nil)
}
2. Priority Queues
// Declare priority queue
func declarePriorityQueue(ch *amqp.Channel, queueName string, maxPriority uint8) error {
args := amqp.Table{
"x-max-priority": maxPriority,
}
_, err := ch.QueueDeclare(
queueName,
true,
false,
false,
false,
args,
)
return err
}
// Publish with priority
func publishWithPriority(ch *amqp.Channel, exchange, routingKey string, body []byte, priority uint8) error {
return ch.Publish(
exchange,
routingKey,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: body,
DeliveryMode: amqp.Persistent,
Priority: priority, // 0-9 (higher is more important)
},
)
}
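As a rough mental model of what a priority queue aims for, messages that are waiting in the queue are handed out highest priority first, and in arrival order within the same priority. The sketch below reproduces that ordering with `container/heap`; it is a simplified model rather than the broker's implementation, and the `message`, `priorityQueue`, and `deliveryOrder` names are hypothetical. In a live system, ordering only shows when messages actually accumulate, which also depends on consumer prefetch.

```go
package main

import (
	"container/heap"
	"fmt"
)

// message is a queued item; seq records arrival order.
type message struct {
	body     string
	priority uint8
	seq      int
}

// priorityQueue implements heap.Interface: higher priority first,
// then earlier arrival first.
type priorityQueue []message

func (q priorityQueue) Len() int      { return len(q) }
func (q priorityQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] }
func (q priorityQueue) Less(i, j int) bool {
	if q[i].priority != q[j].priority {
		return q[i].priority > q[j].priority
	}
	return q[i].seq < q[j].seq
}
func (q *priorityQueue) Push(x interface{}) { *q = append(*q, x.(message)) }
func (q *priorityQueue) Pop() interface{} {
	old := *q
	n := len(old)
	m := old[n-1]
	*q = old[:n-1]
	return m
}

// deliveryOrder returns the bodies in the order they would be handed out
// if all messages were already waiting in the queue.
func deliveryOrder(msgs []message) []string {
	q := &priorityQueue{}
	for i, m := range msgs {
		m.seq = i
		heap.Push(q, m)
	}
	order := make([]string, 0, len(msgs))
	for q.Len() > 0 {
		order = append(order, heap.Pop(q).(message).body)
	}
	return order
}

func main() {
	fmt.Println(deliveryOrder([]message{
		{body: "report", priority: 1},
		{body: "alert", priority: 9},
		{body: "digest", priority: 1},
		{body: "invoice", priority: 5},
	})) // [alert invoice report digest]
}
```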
3. RPC Pattern
// RPC Client
type RPCClient struct {
ch *amqp.Channel
replyQueue string
pendingCalls map[string]chan []byte
mu sync.Mutex
}
func NewRPCClient(ch *amqp.Channel) (*RPCClient, error) {
// Declare exclusive reply queue
q, err := ch.QueueDeclare(
"", // generate name
false, // durable
true, // delete when unused
true, // exclusive
false, // no-wait
nil,
)
if err != nil {
return nil, err
}
client := &RPCClient{
ch: ch,
replyQueue: q.Name,
pendingCalls: make(map[string]chan []byte),
}
// Start consuming replies
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
return nil, err
}
go client.handleReplies(msgs)
return client, nil
}
func (c *RPCClient) Call(ctx context.Context, exchange, routingKey string, body []byte, timeout time.Duration) ([]byte, error) {
corrID := uuid.New().String()
replyChan := make(chan []byte, 1)
c.mu.Lock()
c.pendingCalls[corrID] = replyChan
c.mu.Unlock()
defer func() {
c.mu.Lock()
delete(c.pendingCalls, corrID)
c.mu.Unlock()
}()
err := c.ch.PublishWithContext(
ctx,
exchange,
routingKey,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: body,
ReplyTo: c.replyQueue,
CorrelationId: corrID,
Expiration: fmt.Sprintf("%d", timeout.Milliseconds()),
},
)
if err != nil {
return nil, err
}
select {
case reply := <-replyChan:
return reply, nil
case <-time.After(timeout):
return nil, fmt.Errorf("rpc timeout")
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (c *RPCClient) handleReplies(msgs <-chan amqp.Delivery) {
for msg := range msgs {
c.mu.Lock()
if ch, ok := c.pendingCalls[msg.CorrelationId]; ok {
ch <- msg.Body
}
c.mu.Unlock()
}
}
// RPC Server
func handleRPCRequest(ch *amqp.Channel, queueName string, handler func([]byte) ([]byte, error)) error {
msgs, err := ch.Consume(queueName, "", false, false, false, false, nil)
if err != nil {
return err
}
for msg := range msgs {
response, err := handler(msg.Body)
if err != nil {
log.Printf("RPC handler error: %v", err)
msg.Nack(false, false)
continue
}
err = ch.Publish(
"",
msg.ReplyTo,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: response,
CorrelationId: msg.CorrelationId,
},
)
if err != nil {
log.Printf("Failed to send RPC reply: %v", err)
}
msg.Ack(false)
}
return nil
}
4. Consistent Hashing Exchange
// Use consistent hashing for load distribution
func declareConsistentHashExchange(ch *amqp.Channel, exchangeName string) error {
return ch.ExchangeDeclare(
exchangeName,
"x-consistent-hash", // requires rabbitmq_consistent_hash_exchange plugin
true,
false,
false,
false,
nil,
)
}
// Bind queues with weights
func bindQueueWithWeight(ch *amqp.Channel, queueName, exchangeName string, weight int) error {
return ch.QueueBind(
queueName,
fmt.Sprintf("%d", weight), // routing key is the weight
exchangeName,
false,
nil,
)
}
// Publish with routing key for hashing
func publishToConsistentHash(ch *amqp.Channel, exchange, hashKey string, body []byte) error {
return ch.Publish(
exchange,
hashKey, // e.g., user ID for user-specific routing
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: body,
DeliveryMode: amqp.Persistent,
},
)
}
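The idea behind weighted consistent hashing can be sketched independently of the plugin: each queue owns a number of points on a hash ring proportional to its weight, and a routing key goes to the queue that owns the first point at or after the key's hash. The code below is an illustration of that idea under stated assumptions, not the plugin's actual algorithm; the `ring` type, the FNV hash, and the point naming scheme are all choices made for this example.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// ring is a toy consistent-hash ring (illustrative only).
type ring struct {
	points []uint32
	owner  map[uint32]string
}

func hashOf(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

// newRing gives each queue as many points on the ring as its weight.
func newRing(weights map[string]int) *ring {
	names := make([]string, 0, len(weights))
	for name := range weights {
		names = append(names, name)
	}
	sort.Strings(names) // deterministic construction

	r := &ring{owner: make(map[uint32]string)}
	for _, name := range names {
		for i := 0; i < weights[name]; i++ {
			p := hashOf(fmt.Sprintf("%s#%d", name, i))
			if _, taken := r.owner[p]; taken {
				continue // skip the rare hash collision
			}
			r.owner[p] = name
			r.points = append(r.points, p)
		}
	}
	sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
	return r
}

// queueFor returns the queue owning the first point at or after the
// key's hash, wrapping around at the end of the ring.
func (r *ring) queueFor(key string) string {
	if len(r.points) == 0 {
		return ""
	}
	h := hashOf(key)
	i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
	if i == len(r.points) {
		i = 0
	}
	return r.owner[r.points[i]]
}

func main() {
	r := newRing(map[string]int{"orders-1": 10, "orders-2": 20})
	for _, user := range []string{"user-1", "user-2", "user-1"} {
		fmt.Println(user, "->", r.queueFor(user))
	}
}
```

The property that matters for per-user ordering is that the same key always lands on the same queue while the set of queues is unchanged.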
5. Shovel Pattern (Message Forwarding)
// Configure shovel to forward messages between exchanges/queues
func configureShovel(managementURL, name, srcExchange, destExchange string) error {
shovelConfig := map[string]interface{}{
"value": map[string]interface{}{
"src-uri": "amqp://localhost",
"src-exchange": srcExchange,
"dest-uri": "amqp://remote-server",
"dest-exchange": destExchange,
},
}
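// Sending shovelConfig to the RabbitMQ management HTTP API is left out here;
// it depends on the HTTP client and credentials in use.
// Discard the value explicitly so the sketch compiles (Go rejects unused variables).
_ = shovelConfig
return nil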
}
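To give the stub above a more concrete shape, the following sketch builds the pieces of the request without sending it. It assumes the management plugin's runtime parameter endpoint for dynamic shovels, `PUT /api/parameters/shovel/{vhost}/{name}`, with the vhost URL-encoded; `shovelURL` and `shovelBody` are hypothetical helpers, and the host names are placeholders. Authentication and the HTTP call itself are intentionally omitted.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/url"
	"strings"
)

// shovelURL builds the management API URL for a dynamic shovel.
// The default vhost "/" must be escaped as %2F.
func shovelURL(managementURL, vhost, name string) string {
	return fmt.Sprintf("%s/api/parameters/shovel/%s/%s",
		strings.TrimRight(managementURL, "/"),
		url.PathEscape(vhost),
		url.PathEscape(name))
}

// shovelBody encodes the shovel definition as the JSON request body.
func shovelBody(srcURI, srcExchange, destURI, destExchange string) ([]byte, error) {
	return json.Marshal(map[string]interface{}{
		"value": map[string]interface{}{
			"src-uri":       srcURI,
			"src-exchange":  srcExchange,
			"dest-uri":      destURI,
			"dest-exchange": destExchange,
		},
	})
}

func main() {
	fmt.Println(shovelURL("http://localhost:15672", "/", "orders-shovel"))

	body, err := shovelBody("amqp://localhost", "orders", "amqp://remote-server", "orders-replica")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```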
6. Metrics Collection
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
messagesPublished = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "rabbitmq_messages_published_total",
Help: "Total messages published",
},
[]string{"exchange", "routing_key", "status"},
)
messagesConsumed = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "rabbitmq_messages_consumed_total",
Help: "Total messages consumed",
},
[]string{"queue", "status"},
)
messageProcessingTime = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "rabbitmq_message_processing_seconds",
Help: "Message processing time",
Buckets: prometheus.DefBuckets,
},
[]string{"queue", "message_type"},
)
queueDepth = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "rabbitmq_queue_depth",
Help: "Current queue depth",
},
[]string{"queue"},
)
)
// Track metrics in handler
type MetricsHandler struct {
handler MessageHandler
queueName string
}
func (h *MetricsHandler) Handle(ctx context.Context, msg *models.Message) error {
start := time.Now()
err := h.handler.Handle(ctx, msg)
duration := time.Since(start).Seconds()
messageProcessingTime.WithLabelValues(h.queueName, msg.Type).Observe(duration)
status := "success"
if err != nil {
status = "error"
}
messagesConsumed.WithLabelValues(h.queueName, status).Inc()
return err
}
Dependencies
// go.mod
module rabbitmq-service
go 1.21
require (
github.com/rabbitmq/amqp091-go v1.9.0
github.com/google/uuid v1.6.0
github.com/prometheus/client_golang v1.19.0
)
Docker Compose Setup
# deployments/docker-compose.yml
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3.13-management-alpine
ports:
- "5672:5672" # AMQP
- "15672:15672" # Management UI
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
volumes:
- rabbitmq_data:/var/lib/rabbitmq
- ./rabbitmq/definitions.json:/etc/rabbitmq/definitions.json
- ./rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 30s
timeout: 10s
retries: 5
volumes:
rabbitmq_data:
# deployments/rabbitmq/rabbitmq.conf
# Definitions import (load_definitions supersedes the older management.load_definitions key)
load_definitions = /etc/rabbitmq/definitions.json
# Performance
vm_memory_high_watermark.relative = 0.6
disk_free_limit.absolute = 2GB
# Clustering (if needed)
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
Best Practices
1. Message Durability
Persistent Messages: Mark messages as persistent for durability.
Durable Queues: Declare queues as durable to survive broker restarts.
Publisher Confirms: Enable confirms so the publisher learns when the broker has taken responsibility for a message and can republish when it has not.
2. Error Handling
Dead Letter Queues: Route failed messages to DLQ for analysis.
Retry Logic: Implement exponential backoff for transient failures.
Idempotency: Design handlers to safely process duplicate messages.
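Because retries and redeliveries mean a handler can see the same message more than once, idempotency usually comes down to remembering which message IDs have already been processed successfully. The sketch below shows the idea with an in-memory set; `deduper` and `processOnce` are hypothetical names, and a real service would need a durable, shared store and a way to handle two workers receiving the same ID at the same moment.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errTransient = errors.New("transient failure")

// deduper remembers message IDs that were processed successfully.
type deduper struct {
	mu   sync.Mutex
	done map[string]struct{}
}

func newDeduper() *deduper {
	return &deduper{done: make(map[string]struct{})}
}

// processOnce runs handle unless id was already processed successfully.
// It reports whether handle ran. Failed attempts are not recorded,
// so a retry of the same message is processed again.
func (d *deduper) processOnce(id string, handle func() error) (bool, error) {
	d.mu.Lock()
	_, seen := d.done[id]
	d.mu.Unlock()
	if seen {
		return false, nil
	}
	if err := handle(); err != nil {
		return true, err
	}
	d.mu.Lock()
	d.done[id] = struct{}{}
	d.mu.Unlock()
	return true, nil
}

func main() {
	d := newDeduper()
	attempts := 0
	flaky := func() error {
		attempts++
		if attempts == 1 {
			return errTransient // first delivery fails
		}
		return nil
	}

	// The same message is delivered three times.
	for i := 0; i < 3; i++ {
		ran, err := d.processOnce("msg-1", flaky)
		fmt.Println("ran:", ran, "err:", err)
	}
	fmt.Println("handler invocations:", attempts)
}
```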
3. Performance Optimization
Prefetch Count: Set appropriate prefetch to balance load.
ch.Qos(10, 0, false) // Prefetch 10 messages
Worker Pools: Use multiple workers for parallel processing.
Batch Operations: Batch publishes when possible.
4. Monitoring
Queue Metrics: Monitor queue depth, consumer count, message rates.
Consumer Performance: Track processing time, error rates.
Connection Health: Monitor connection status and reconnections.
5. Security
vhosts: Use virtual hosts to isolate environments.
User Permissions: Configure granular permissions per user.
TLS: Enable TLS for encrypted communication.
6. Scalability
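Clustering: Run a RabbitMQ cluster with a replicated queue type (such as quorum queues) for high availability; clustering alone does not replicate queue contents.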
Queue Sharding: Distribute load across multiple queues.
Federation/Shovel: Connect multiple RabbitMQ instances.
When to Use RabbitMQ
✅ Perfect for:
- Task queues and background job processing
- Request/reply patterns (RPC)
- Complex routing scenarios
- When you need message delivery guarantees
- Microservices communication
❌ Not ideal for:
- Ultra-high throughput streaming (use Kafka)
- Simple pub/sub with minimal features (use Redis)
- Stateful stream processing
- Real-time bidirectional communication (use WebSockets)
Conclusion
RabbitMQ with AMQP provides a flexible messaging system with acknowledgement-based delivery guarantees and rich routing capabilities. The amqp091-go client library is a solid foundation for building message-driven applications in Go.
Key takeaways:
- Use exchanges strategically for routing flexibility
- Implement proper error handling with DLQs
- Enable publisher confirms for reliability
- Monitor queue depth and consumer performance
- Design for idempotency and fault tolerance
For enterprise deployments, consider RabbitMQ clustering, federation, and plugins like delayed message exchange for advanced use cases.