{{series_nav(current_post=8)}}

RabbitMQ is one of the most popular open-source message brokers, implementing the Advanced Message Queuing Protocol (AMQP) 0-9-1. It acts as a reliable intermediary for asynchronous message passing between applications, providing queuing, delivery guarantees, and flexible routing patterns.

What is RabbitMQ?

RabbitMQ is a message broker that accepts and forwards messages. Unlike direct communication between services, RabbitMQ provides a buffer that decouples producers from consumers, allowing them to operate independently and scale separately.

Core Concepts

Exchanges: Routing components that receive messages from producers and route them to queues based on routing rules.

Queues: Buffers that store messages until consumers retrieve them.

Bindings: Rules that link exchanges to queues, defining how messages should be routed.

Routing Keys: Message attributes used by exchanges to determine routing destinations.

Virtual Hosts (vhosts): Isolated namespaces for organizing exchanges, queues, and bindings.

Exchange Types

Direct: Routes a message to every queue whose binding key exactly matches the message's routing key.

Fanout: Broadcasts messages to all bound queues, ignoring routing keys.

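Topic: Routes by matching the routing key against binding patterns made of dot-separated words, where * matches exactly one word and # matches zero or more words.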

Headers: Routes based on message header attributes instead of routing keys.
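To make the topic wildcard rules concrete, here is a minimal sketch of the matching logic in plain Go. It is an illustration of the semantics only, not RabbitMQ's actual implementation, and the function names topicMatch and matchWords are made up for this example.

```go
package main

import (
    "fmt"
    "strings"
)

// topicMatch reports whether routingKey matches a topic binding pattern.
// Words are separated by dots; "*" matches exactly one word and "#"
// matches zero or more words.
func topicMatch(pattern, routingKey string) bool {
    return matchWords(strings.Split(pattern, "."), strings.Split(routingKey, "."))
}

func matchWords(pattern, key []string) bool {
    if len(pattern) == 0 {
        return len(key) == 0
    }
    switch pattern[0] {
    case "#":
        // "#" either absorbs nothing, or absorbs one word and tries again.
        if matchWords(pattern[1:], key) {
            return true
        }
        return len(key) > 0 && matchWords(pattern, key[1:])
    case "*":
        return len(key) > 0 && matchWords(pattern[1:], key[1:])
    default:
        return len(key) > 0 && pattern[0] == key[0] && matchWords(pattern[1:], key[1:])
    }
}

func main() {
    fmt.Println(topicMatch("order.*", "order.created"))        // true
    fmt.Println(topicMatch("order.*", "order.payment.failed")) // false
    fmt.Println(topicMatch("order.#", "order.payment.failed")) // true
    fmt.Println(topicMatch("order.#", "order"))                // true
}
```

The consumer later in this post binds with order.#, which is why it receives order.created, order.payment, and order.shipped alike.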

Architecture

graph TB
    subgraph "Producers"
        P1[Producer 1]
        P2[Producer 2]
        P3[Producer 3]
    end

    subgraph "RabbitMQ Broker"
        subgraph "Virtual Host: /"
            EX1[Exchange: direct<br/>Type: Direct]
            EX2[Exchange: logs<br/>Type: Fanout]
            EX3[Exchange: events<br/>Type: Topic]

            Q1[Queue: orders]
            Q2[Queue: notifications]
            Q3[Queue: analytics]
            Q4[Queue: errors.critical]
            Q5[Queue: errors.warning]

            DLX[Exchange: DLX<br/>Dead Letter]
            DLQ[Queue: DLQ]

            EX1 -->|routing_key: order| Q1
            EX1 -->|routing_key: notify| Q2
            EX2 --> Q2
            EX2 --> Q3
            EX3 -->|error.critical| Q4
            EX3 -->|error.warning| Q5

            Q1 -.->|rejected/expired| DLX
            DLX --> DLQ
        end
    end

    subgraph "Consumers"
        C1[Consumer 1<br/>Order Processor]
        C2[Consumer 2<br/>Notification Service]
        C3[Consumer 3<br/>Analytics]
        C4[Consumer 4<br/>Error Handler]
    end

    P1 -->|Publish order| EX1
    P2 -->|Publish log| EX2
    P3 -->|Publish error| EX3

    Q1 --> C1
    Q2 --> C2
    Q3 --> C3
    Q4 --> C4

Message Flow

sequenceDiagram
    participant P as Producer
    participant EX as Exchange
    participant Q as Queue
    participant C as Consumer
    participant DLQ as Dead Letter Queue

    Note over P,DLQ: Message Publishing
    P->>EX: Publish (routing_key, message, properties)
    EX->>EX: Match Routing Rules
    EX->>Q: Route to Queue(s)
    Q->>Q: Persist Message
    EX->>P: ACK (publisher confirms)

    Note over C,Q: Message Consumption
    C->>Q: Basic.Consume (prefetch_count)
    Q->>C: Deliver Message
    C->>C: Process Message

    alt Success
        C->>Q: ACK (acknowledge)
        Q->>Q: Remove Message
    else Failure - Requeue
        C->>Q: NACK (requeue=true)
        Q->>Q: Requeue Message
    else Failure - Dead Letter
        C->>Q: NACK (requeue=false)
        Q->>DLQ: Send to DLQ
    end

    Note over Q: Message Expiration
    Q->>Q: TTL Exceeded
    Q->>DLQ: Send to DLQ

Real-World Use Cases

1. Task Queues

Distribute time-consuming tasks across multiple workers for parallel processing.

Perfect for: Image processing, video encoding, report generation, email sending

2. Work Queue Patterns

Implement the competing consumers pattern, where multiple workers process tasks from a shared queue and each message goes to exactly one of them.

Perfect for: Background jobs, batch processing, data imports

3. Pub/Sub Messaging

Broadcast events to multiple subscribers without coupling producers to consumers.

Perfect for: Event notifications, real-time updates, activity feeds

4. Request/Reply (RPC)

Implement synchronous RPC-style communication over async messaging.

Perfect for: Microservice communication, API gateways, service orchestration

5. Priority Queues

Process high-priority messages before lower-priority ones.

Perfect for: SLA-sensitive operations, urgent notifications, critical alerts

6. Delayed Messages

Schedule message delivery for future processing.

Perfect for: Scheduled tasks, reminder systems, retry mechanisms

7. Message Routing

Route messages to different handlers based on content or routing keys.

Perfect for: Multi-tenant systems, workflow engines, event routing

8. Dead Letter Handling

Capture and analyze failed messages for debugging and manual intervention.

Perfect for: Error analysis, poison message handling, audit trails

Project Structure

rabbitmq-service/
├── cmd/
│   ├── producer/
│   │   └── main.go           # Producer application
│   └── consumer/
│       └── main.go           # Consumer application
├── internal/
│   ├── broker/
│   │   ├── connection.go     # Connection pool
│   │   └── channel.go        # Channel management
│   ├── publisher/
│   │   ├── publisher.go      # Message publisher
│   │   └── confirms.go       # Publisher confirms
│   ├── consumer/
│   │   ├── consumer.go       # Message consumer
│   │   ├── worker.go         # Worker pool
│   │   └── handler.go        # Message handler
│   ├── models/
│   │   └── message.go        # Message types
│   └── config/
│       └── config.go         # Configuration
├── pkg/
│   ├── middleware/
│   │   ├── retry.go          # Retry middleware
│   │   ├── circuit.go        # Circuit breaker
│   │   └── metrics.go        # Metrics
│   └── patterns/
│       ├── rpc.go            # RPC pattern
│       ├── priority.go       # Priority queue
│       └── delayed.go        # Delayed messaging
├── deployments/
│   ├── docker-compose.yml    # RabbitMQ setup
│   └── rabbitmq/
│       └── definitions.json  # Queue definitions
├── go.mod
└── go.sum

Implementation

1. Message Models

// internal/models/message.go
package models

import (
    "encoding/json"
    "time"

    "github.com/google/uuid"
)

// Message represents a generic message
type Message struct {
    ID          string                 `json:"id"`
    Type        string                 `json:"type"`
    Timestamp   time.Time              `json:"timestamp"`
    Payload     interface{}            `json:"payload"`
    Metadata    map[string]interface{} `json:"metadata"`
    Priority    uint8                  `json:"priority"`    // 0-9
    Retry       int                    `json:"retry"`
    MaxRetries  int                    `json:"max_retries"`
}

// NewMessage creates a new message
func NewMessage(msgType string, payload interface{}) *Message {
    return &Message{
        ID:        uuid.New().String(),
        Type:      msgType,
        Timestamp: time.Now(),
        Payload:   payload,
        Metadata:  make(map[string]interface{}),
        Priority:  0,
    }
}

// ToJSON serializes message to JSON
func (m *Message) ToJSON() ([]byte, error) {
    return json.Marshal(m)
}

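// FromJSON deserializes message from JSON
func FromJSON(data []byte) (*Message, error) {
    var msg Message
    if err := json.Unmarshal(data, &msg); err != nil {
        return nil, err
    }
    // Guarantee a non-nil map so callers can add metadata safely
    if msg.Metadata == nil {
        msg.Metadata = make(map[string]interface{})
    }
    return &msg, nil
}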

// OrderMessage represents an order
type OrderMessage struct {
    OrderID     string    `json:"order_id"`
    CustomerID  string    `json:"customer_id"`
    Items       []Item    `json:"items"`
    TotalAmount float64   `json:"total_amount"`
    Status      string    `json:"status"`
    CreatedAt   time.Time `json:"created_at"`
}

type Item struct {
    ProductID string  `json:"product_id"`
    Quantity  int     `json:"quantity"`
    Price     float64 `json:"price"`
}
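Because Payload is declared as interface{}, its concrete type does not survive serialization: after decoding, it is a generic map rather than an OrderMessage. The sketch below shows that effect and one way to recover a typed value. It uses trimmed-down stand-in types (envelope, order) and helper names (roundTrip, decodeOrder) invented for this illustration, not the real models package.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// envelope is a trimmed-down stand-in for models.Message (illustration only).
type envelope struct {
    Type    string      `json:"type"`
    Payload interface{} `json:"payload"`
}

// order is a trimmed-down stand-in for models.OrderMessage.
type order struct {
    OrderID     string  `json:"order_id"`
    TotalAmount float64 `json:"total_amount"`
}

// roundTrip encodes and decodes an envelope, as a publish/consume cycle would.
func roundTrip(e envelope) (envelope, error) {
    var out envelope
    data, err := json.Marshal(e)
    if err != nil {
        return out, err
    }
    err = json.Unmarshal(data, &out)
    return out, err
}

// decodeOrder re-encodes the generic payload and decodes it into a typed struct.
func decodeOrder(payload interface{}) (order, error) {
    var o order
    data, err := json.Marshal(payload)
    if err != nil {
        return o, err
    }
    err = json.Unmarshal(data, &o)
    return o, err
}

func main() {
    in := envelope{Type: "order.created", Payload: order{OrderID: "o-1", TotalAmount: 59.98}}
    out, err := roundTrip(in)
    if err != nil {
        panic(err)
    }

    _, isMap := out.Payload.(map[string]interface{})
    fmt.Println(isMap) // true: the concrete type was lost on the wire

    o, err := decodeOrder(out.Payload)
    if err != nil {
        panic(err)
    }
    fmt.Println(o.OrderID, o.TotalAmount) // o-1 59.98
}
```

An alternative design would store the payload as json.RawMessage and decode it once, based on the message type, which avoids the second marshal step.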

2. Connection Management

// internal/broker/connection.go
package broker

import (
    "fmt"
    "log"
    "sync"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

// ConnectionPool manages RabbitMQ connections
type ConnectionPool struct {
    url        string
    conn       *amqp.Connection
    mu         sync.RWMutex
    reconnect  chan bool
    closed     bool
}

// Config holds connection configuration
type Config struct {
    URL             string
    ReconnectDelay  time.Duration
    MaxReconnects   int
}

// NewConnectionPool creates a new connection pool
func NewConnectionPool(cfg Config) (*ConnectionPool, error) {
    pool := &ConnectionPool{
        url:       cfg.URL,
        reconnect: make(chan bool),
    }

    conn, err := pool.connect()
    if err != nil {
        return nil, err
    }

    pool.conn = conn

    // Monitor connection and reconnect if needed
    go pool.monitorConnection(cfg.ReconnectDelay, cfg.MaxReconnects)

    return pool, nil
}

func (p *ConnectionPool) connect() (*amqp.Connection, error) {
    conn, err := amqp.Dial(p.url)
    if err != nil {
        return nil, fmt.Errorf("dial: %w", err)
    }

    log.Println("Connected to RabbitMQ")
    return conn, nil
}

func (p *ConnectionPool) monitorConnection(delay time.Duration, maxRetries int) {
    for {
        // Watch whichever connection is current; it changes after each reconnect
        reason, ok := <-p.GetConnection().NotifyClose(make(chan *amqp.Error, 1))
        if !ok {
            log.Println("Connection closed normally")
            return
        }

        log.Printf("Connection closed: %v, reconnecting...", reason)

        retries := 0
        for retries < maxRetries {
            // Exponential backoff: delay, 2*delay, 4*delay, ...
            time.Sleep(delay * time.Duration(1<<uint(retries)))

            conn, err := p.connect()
            if err == nil {
                p.mu.Lock()
                p.conn = conn
                p.mu.Unlock()

                // Non-blocking notify: nothing may be receiving on p.reconnect,
                // and a blocking send would stall this goroutine forever
                select {
                case p.reconnect <- true:
                default:
                }
                break
            }

            retries++
            log.Printf("Reconnection attempt %d/%d failed: %v", retries, maxRetries, err)
        }

        if retries >= maxRetries {
            log.Fatal("Max reconnection attempts exceeded")
        }
    }
}

// GetConnection returns the current connection
func (p *ConnectionPool) GetConnection() *amqp.Connection {
    p.mu.RLock()
    defer p.mu.RUnlock()
    return p.conn
}

// CreateChannel creates a new channel
func (p *ConnectionPool) CreateChannel() (*amqp.Channel, error) {
    p.mu.RLock()
    defer p.mu.RUnlock()

    ch, err := p.conn.Channel()
    if err != nil {
        return nil, fmt.Errorf("create channel: %w", err)
    }

    return ch, nil
}

// Close closes the connection
func (p *ConnectionPool) Close() error {
    p.mu.Lock()
    defer p.mu.Unlock()

    if p.closed {
        return nil
    }

    p.closed = true
    if p.conn != nil {
        return p.conn.Close()
    }

    return nil
}
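The reconnect loop in monitorConnection doubles its wait on every failed attempt. The sketch below isolates that schedule so it can be inspected and tested on its own. The upper bound (limit) is an addition for this example and is not part of the code above, which has no cap; backoffDelay and the two constants are hypothetical names.

```go
package main

import (
    "fmt"
    "time"
)

// Example values: the base matches the ReconnectDelay used later in this post;
// the cap is an assumption made for this sketch.
const (
    exampleBase = 2 * time.Second
    exampleCap  = 30 * time.Second
)

// backoffDelay returns the wait before reconnection attempt number `attempt`
// (zero-based). The delay doubles per attempt and never exceeds limit.
func backoffDelay(base time.Duration, attempt int, limit time.Duration) time.Duration {
    d := base
    // Stop doubling once the limit is reached, which also prevents overflow.
    for i := 0; i < attempt && d < limit; i++ {
        d *= 2
    }
    if d > limit {
        return limit
    }
    return d
}

func main() {
    var schedule []time.Duration
    for attempt := 0; attempt < 6; attempt++ {
        schedule = append(schedule, backoffDelay(exampleBase, attempt, exampleCap))
    }
    fmt.Println(schedule) // [2s 4s 8s 16s 30s 30s]
}
```

Without a cap, a large MaxReconnects value leads to very long sleeps; production code often also adds random jitter so that many clients do not reconnect at the same instant.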

3. Publisher Implementation

// internal/publisher/publisher.go
package publisher

import (
    "context"
    "fmt"
    "log"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
    "rabbitmq-service/internal/broker"
    "rabbitmq-service/internal/models"
)

// Publisher publishes messages to RabbitMQ
type Publisher struct {
    pool     *broker.ConnectionPool
    ch       *amqp.Channel
    confirms chan amqp.Confirmation
    exchange string
}

// Config holds publisher configuration
type Config struct {
    Exchange         string
    ExchangeType     string
    Durable          bool
    AutoDelete       bool
    Confirms         bool   // Publisher confirms
}

// NewPublisher creates a new publisher
func NewPublisher(pool *broker.ConnectionPool, cfg Config) (*Publisher, error) {
    ch, err := pool.CreateChannel()
    if err != nil {
        return nil, err
    }

    // Declare exchange
    err = ch.ExchangeDeclare(
        cfg.Exchange,
        cfg.ExchangeType,
        cfg.Durable,
        cfg.AutoDelete,
        false, // internal
        false, // no-wait
        nil,   // args
    )
    if err != nil {
        return nil, fmt.Errorf("declare exchange: %w", err)
    }

    pub := &Publisher{
        pool:     pool,
        ch:       ch,
        exchange: cfg.Exchange,
    }

    // Enable publisher confirms
    if cfg.Confirms {
        if err := ch.Confirm(false); err != nil {
            return nil, fmt.Errorf("enable confirms: %w", err)
        }

        pub.confirms = ch.NotifyPublish(make(chan amqp.Confirmation, 100))
    }

    return pub, nil
}

// Publish publishes a message
func (p *Publisher) Publish(ctx context.Context, routingKey string, msg *models.Message) error {
    body, err := msg.ToJSON()
    if err != nil {
        return fmt.Errorf("serialize message: %w", err)
    }

    publishing := amqp.Publishing{
        ContentType:  "application/json",
        Body:         body,
        DeliveryMode: amqp.Persistent,
        MessageId:    msg.ID,
        Timestamp:    time.Now(),
        Priority:     msg.Priority,
        Headers:      amqp.Table{
            "x-message-type": msg.Type,
        },
    }

    // Set TTL if specified
    if ttl, ok := msg.Metadata["ttl"].(time.Duration); ok {
        publishing.Expiration = fmt.Sprintf("%d", ttl.Milliseconds())
    }

    err = p.ch.PublishWithContext(
        ctx,
        p.exchange,
        routingKey,
        false, // mandatory
        false, // immediate
        publishing,
    )

    if err != nil {
        return fmt.Errorf("publish: %w", err)
    }

    // Wait for confirmation if enabled
    if p.confirms != nil {
        select {
        case confirm := <-p.confirms:
            if !confirm.Ack {
                return fmt.Errorf("message not confirmed")
            }
            log.Printf("Message confirmed: %s", msg.ID)
        case <-time.After(5 * time.Second):
            return fmt.Errorf("confirmation timeout")
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    log.Printf("Published message: id=%s, routing_key=%s", msg.ID, routingKey)
    return nil
}

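// PublishWithDelay publishes a message with delay (requires the rabbitmq_delayed_message_exchange
// plugin and an exchange declared with type "x-delayed-message")
func (p *Publisher) PublishWithDelay(ctx context.Context, routingKey string, msg *models.Message, delay time.Duration) error {
    body, err := msg.ToJSON()
    if err != nil {
        return fmt.Errorf("serialize message: %w", err)
    }

    publishing := amqp.Publishing{
        ContentType:  "application/json",
        Body:         body,
        DeliveryMode: amqp.Persistent,
        MessageId:    msg.ID,
        Timestamp:    time.Now(),
        Headers: amqp.Table{
            "x-delay":        delay.Milliseconds(),
            "x-message-type": msg.Type,
        },
    }

    if err := p.ch.PublishWithContext(ctx, p.exchange, routingKey, false, false, publishing); err != nil {
        return fmt.Errorf("publish delayed: %w", err)
    }

    // Drain this publish's confirmation; otherwise the next Publish call would
    // read it and attribute it to the wrong message
    if p.confirms != nil {
        select {
        case confirm := <-p.confirms:
            if !confirm.Ack {
                return fmt.Errorf("message not confirmed")
            }
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    return nil
}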

// Close closes the publisher
func (p *Publisher) Close() error {
    if p.ch != nil {
        return p.ch.Close()
    }
    return nil
}

4. Consumer Implementation

// internal/consumer/consumer.go
package consumer

import (
    "context"
    "fmt"
    "log"
    "sync"

    amqp "github.com/rabbitmq/amqp091-go"
    "rabbitmq-service/internal/broker"
    "rabbitmq-service/internal/models"
)

// Consumer consumes messages from RabbitMQ
type Consumer struct {
    pool     *broker.ConnectionPool
    ch       *amqp.Channel
    queue    string
    handler  MessageHandler
    workers  int
}

// MessageHandler processes messages
type MessageHandler interface {
    Handle(ctx context.Context, msg *models.Message) error
}

// Config holds consumer configuration
type Config struct {
    Queue          string
    Exchange       string
    RoutingKeys    []string
    Durable        bool
    AutoDelete     bool
    Exclusive      bool
    PrefetchCount  int
    Workers        int
    AutoAck        bool
}

// NewConsumer creates a new consumer
func NewConsumer(pool *broker.ConnectionPool, cfg Config, handler MessageHandler) (*Consumer, error) {
    ch, err := pool.CreateChannel()
    if err != nil {
        return nil, err
    }

    // Declare queue
    _, err = ch.QueueDeclare(
        cfg.Queue,
        cfg.Durable,
        cfg.AutoDelete,
        cfg.Exclusive,
        false, // no-wait
        nil,   // args
    )
    if err != nil {
        return nil, fmt.Errorf("declare queue: %w", err)
    }

    // Bind queue to exchange
    for _, routingKey := range cfg.RoutingKeys {
        err = ch.QueueBind(
            cfg.Queue,
            routingKey,
            cfg.Exchange,
            false, // no-wait
            nil,   // args
        )
        if err != nil {
            return nil, fmt.Errorf("bind queue: %w", err)
        }
    }

    // Set QoS (prefetch)
    err = ch.Qos(
        cfg.PrefetchCount,
        0,     // prefetch size
        false, // global
    )
    if err != nil {
        return nil, fmt.Errorf("set qos: %w", err)
    }

    return &Consumer{
        pool:    pool,
        ch:      ch,
        queue:   cfg.Queue,
        handler: handler,
        workers: cfg.Workers,
    }, nil
}

// Start starts consuming messages
func (c *Consumer) Start(ctx context.Context) error {
    msgs, err := c.ch.Consume(
        c.queue,
        "",    // consumer tag
        false, // auto-ack
        false, // exclusive
        false, // no-local
        false, // no-wait
        nil,   // args
    )
    if err != nil {
        return fmt.Errorf("consume: %w", err)
    }

    log.Printf("Starting consumer with %d workers for queue: %s", c.workers, c.queue)

    var wg sync.WaitGroup
    for i := 0; i < c.workers; i++ {
        wg.Add(1)
        go c.worker(ctx, &wg, msgs, i)
    }

    wg.Wait()
    return nil
}

func (c *Consumer) worker(ctx context.Context, wg *sync.WaitGroup, msgs <-chan amqp.Delivery, id int) {
    defer wg.Done()

    log.Printf("Worker %d started", id)

    for {
        select {
        case <-ctx.Done():
            log.Printf("Worker %d stopped", id)
            return
        case delivery, ok := <-msgs:
            if !ok {
                log.Printf("Worker %d: channel closed", id)
                return
            }

            if err := c.processMessage(ctx, delivery); err != nil {
                log.Printf("Worker %d: error processing message: %v", id, err)

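                // Check retry count
                retryCount := int32(0)
                if val, ok := delivery.Headers["x-retry-count"].(int32); ok {
                    retryCount = val
                }

                if retryCount < 3 {
                    // Nack with requeue=true redelivers the message unchanged, so the
                    // header would never grow. Republish a copy to this queue through
                    // the default exchange with the count incremented, then ack the original.
                    headers := amqp.Table{"x-retry-count": retryCount + 1}
                    for k, v := range delivery.Headers {
                        if k != "x-retry-count" {
                            headers[k] = v
                        }
                    }
                    retry := amqp.Publishing{
                        ContentType:  delivery.ContentType,
                        Body:         delivery.Body,
                        DeliveryMode: delivery.DeliveryMode,
                        MessageId:    delivery.MessageId,
                        Priority:     delivery.Priority,
                        Headers:      headers,
                    }
                    if pubErr := c.ch.PublishWithContext(ctx, "", c.queue, false, false, retry); pubErr != nil {
                        delivery.Nack(false, true) // could not republish; fall back to a plain requeue
                    } else {
                        delivery.Ack(false)
                    }
                } else {
                    // Max retries exceeded: reject without requeue. The message is
                    // dead-lettered only if the queue has a dead-letter exchange configured.
                    log.Printf("Max retries exceeded for message: %s", delivery.MessageId)
                    delivery.Nack(false, false)
                }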
            } else {
                delivery.Ack(false)
                log.Printf("Worker %d: message processed: %s", id, delivery.MessageId)
            }
        }
    }
}

func (c *Consumer) processMessage(ctx context.Context, delivery amqp.Delivery) error {
    msg, err := models.FromJSON(delivery.Body)
    if err != nil {
        return fmt.Errorf("deserialize message: %w", err)
    }

    // Add delivery metadata
    msg.Metadata["routing_key"] = delivery.RoutingKey
    msg.Metadata["exchange"] = delivery.Exchange
    msg.Metadata["redelivered"] = delivery.Redelivered

    return c.handler.Handle(ctx, msg)
}

// Close closes the consumer
func (c *Consumer) Close() error {
    if c.ch != nil {
        return c.ch.Close()
    }
    return nil
}
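The worker reads x-retry-count with a single int32 type assertion. Header integers can arrive as different Go types depending on how the publisher encoded them, so a more tolerant reader may be worth having. The following is a minimal sketch under that assumption; it uses a plain map[string]interface{} in place of amqp.Table, and the helper names retryCount and shouldRetry are hypothetical.

```go
package main

import "fmt"

// retryCount extracts "x-retry-count" from message headers, accepting the
// common signed integer types. Missing or non-integer values count as zero.
func retryCount(headers map[string]interface{}) int {
    switch v := headers["x-retry-count"].(type) {
    case int:
        return v
    case int8:
        return int(v)
    case int16:
        return int(v)
    case int32:
        return int(v)
    case int64:
        return int(v)
    default:
        return 0
    }
}

// shouldRetry reports whether a failed message is allowed another attempt.
func shouldRetry(headers map[string]interface{}, maxRetries int) bool {
    return retryCount(headers) < maxRetries
}

func main() {
    first := map[string]interface{}{}
    third := map[string]interface{}{"x-retry-count": int32(2)}
    spent := map[string]interface{}{"x-retry-count": int64(3)}

    fmt.Println(shouldRetry(first, 3)) // true
    fmt.Println(shouldRetry(third, 3)) // true
    fmt.Println(shouldRetry(spent, 3)) // false
}
```

Keeping the decision in a pure function also makes the retry limit easy to unit test without a running broker.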

5. Message Handler

// internal/consumer/handler.go
package consumer

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "sync"
    "time"

    "rabbitmq-service/internal/models"
)

// OrderHandler processes order messages
type OrderHandler struct {
    processingTime time.Duration
}

func NewOrderHandler(processingTime time.Duration) *OrderHandler {
    return &OrderHandler{processingTime: processingTime}
}

// Handle processes a message
func (h *OrderHandler) Handle(ctx context.Context, msg *models.Message) error {
    log.Printf("Handling message: type=%s, id=%s", msg.Type, msg.ID)

    switch msg.Type {
    case "order.created":
        return h.handleOrderCreated(ctx, msg)
    case "order.payment":
        return h.handleOrderPayment(ctx, msg)
    case "order.shipped":
        return h.handleOrderShipped(ctx, msg)
    default:
        log.Printf("Unknown message type: %s", msg.Type)
        return nil
    }
}

func (h *OrderHandler) handleOrderCreated(ctx context.Context, msg *models.Message) error {
    var order models.OrderMessage
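    // Payload is a generic value after decoding, so round-trip it through JSON
    data, err := json.Marshal(msg.Payload)
    if err != nil {
        return fmt.Errorf("marshal payload: %w", err)
    }
    if err := json.Unmarshal(data, &order); err != nil {
        return fmt.Errorf("unmarshal order: %w", err)
    }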

    log.Printf("Processing order: id=%s, customer=%s, amount=%.2f",
        order.OrderID, order.CustomerID, order.TotalAmount)

    // Simulate processing
    time.Sleep(h.processingTime)

    // Business logic:
    // - Validate order
    // - Check inventory
    // - Reserve stock
    // - Create payment intent

    return nil
}

func (h *OrderHandler) handleOrderPayment(ctx context.Context, msg *models.Message) error {
    log.Printf("Processing payment for order: %s", msg.ID)
    time.Sleep(h.processingTime)
    return nil
}

func (h *OrderHandler) handleOrderShipped(ctx context.Context, msg *models.Message) error {
    log.Printf("Order shipped: %s", msg.ID)
    time.Sleep(h.processingTime)
    return nil
}

// CircuitBreakerHandler wraps handler with circuit breaker
type CircuitBreakerHandler struct {
    handler       MessageHandler
    failureCount  int
    failureLimit  int
    resetTimeout  time.Duration
    state         string // "closed", "open", "half-open"
    lastFailure   time.Time
    mu            sync.Mutex
}

func NewCircuitBreakerHandler(handler MessageHandler, failureLimit int, resetTimeout time.Duration) *CircuitBreakerHandler {
    return &CircuitBreakerHandler{
        handler:      handler,
        failureLimit: failureLimit,
        resetTimeout: resetTimeout,
        state:        "closed",
    }
}

func (h *CircuitBreakerHandler) Handle(ctx context.Context, msg *models.Message) error {
    // Check if circuit is open
    h.mu.Lock()
    if h.state == "open" {
        if time.Since(h.lastFailure) > h.resetTimeout {
            h.state = "half-open"
            h.failureCount = 0
        } else {
            h.mu.Unlock()
            return fmt.Errorf("circuit breaker open")
        }
    }
    h.mu.Unlock()

    // Try to process message without holding the lock, so that
    // workers sharing this handler still run in parallel
    err := h.handler.Handle(ctx, msg)

    h.mu.Lock()
    defer h.mu.Unlock()

    if err != nil {
        h.failureCount++
        h.lastFailure = time.Now()

        if h.failureCount >= h.failureLimit {
            h.state = "open"
            log.Printf("Circuit breaker opened after %d failures", h.failureCount)
        }

        return err
    }

    // Success - reset the failure streak and close circuit if half-open
    h.failureCount = 0
    if h.state == "half-open" {
        h.state = "closed"
        log.Println("Circuit breaker closed")
    }

    return nil
}

6. Producer Application

// cmd/producer/main.go
package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/google/uuid"
    "rabbitmq-service/internal/broker"
    "rabbitmq-service/internal/models"
    "rabbitmq-service/internal/publisher"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Handle graceful shutdown
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    // Create connection pool
    pool, err := broker.NewConnectionPool(broker.Config{
        URL:            "amqp://guest:guest@localhost:5672/",
        ReconnectDelay: 2 * time.Second,
        MaxReconnects:  5,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer pool.Close()

    // Create publisher
    pub, err := publisher.NewPublisher(pool, publisher.Config{
        Exchange:     "orders",
        ExchangeType: "topic",
        Durable:      true,
        AutoDelete:   false,
        Confirms:     true,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer pub.Close()

    // Start producing
    go produceOrders(ctx, pub)

    // Wait for shutdown
    <-sigChan
    log.Println("Shutting down producer...")
    cancel()
}

func produceOrders(ctx context.Context, pub *publisher.Publisher) {
    ticker := time.NewTicker(2 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            order := &models.OrderMessage{
                OrderID:     uuid.New().String(),
                CustomerID:  fmt.Sprintf("cust-%d", time.Now().Unix()%100),
                TotalAmount: float64(time.Now().Unix()%500) + 9.99,
                Status:      "pending",
                CreatedAt:   time.Now(),
                Items: []models.Item{
                    {
                        ProductID: "prod-001",
                        Quantity:  2,
                        Price:     49.99,
                    },
                },
            }

            msg := models.NewMessage("order.created", order)
            msg.Priority = 5

            routingKey := "order.created"
            if err := pub.Publish(ctx, routingKey, msg); err != nil {
                log.Printf("Failed to publish: %v", err)
            } else {
                log.Printf("Published order: %s", order.OrderID)
            }
        }
    }
}

7. Consumer Application

// cmd/consumer/main.go
package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "rabbitmq-service/internal/broker"
    "rabbitmq-service/internal/consumer"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Handle graceful shutdown
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    // Create connection pool
    pool, err := broker.NewConnectionPool(broker.Config{
        URL:            "amqp://guest:guest@localhost:5672/",
        ReconnectDelay: 2 * time.Second,
        MaxReconnects:  5,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer pool.Close()

    // Create handler with circuit breaker
    baseHandler := consumer.NewOrderHandler(100 * time.Millisecond)
    handler := consumer.NewCircuitBreakerHandler(baseHandler, 5, 30*time.Second)

    // Create consumer
    cons, err := consumer.NewConsumer(pool, consumer.Config{
        Queue:         "order-processing",
        Exchange:      "orders",
        RoutingKeys:   []string{"order.#"},
        Durable:       true,
        AutoDelete:    false,
        Exclusive:     false,
        PrefetchCount: 10,
        Workers:       3,
        AutoAck:       false,
    }, handler)
    if err != nil {
        log.Fatal(err)
    }
    defer cons.Close()

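    // Start consuming
    done := make(chan struct{})
    go func() {
        defer close(done)
        if err := cons.Start(ctx); err != nil {
            log.Printf("Consumer error: %v", err)
        }
    }()

    // Wait for shutdown
    <-sigChan
    log.Println("Shutting down consumer...")
    cancel()

    // Wait for workers to finish in-flight messages, up to a grace period
    select {
    case <-done:
    case <-time.After(2 * time.Second):
        log.Println("Grace period elapsed, exiting")
    }
}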

Advanced Patterns

1. Dead Letter Exchange (DLX)

// Configure queue with DLX
func declareQueueWithDLX(ch *amqp.Channel, queueName, dlxName string) error {
    args := amqp.Table{
        "x-dead-letter-exchange":    dlxName,
        "x-dead-letter-routing-key": "dlq",
        "x-message-ttl":             60000, // 60 seconds
    }

    _, err := ch.QueueDeclare(
        queueName,
        true,  // durable
        false, // auto-delete
        false, // exclusive
        false, // no-wait
        args,
    )

    return err
}

// Declare DLX and DLQ
func setupDLX(ch *amqp.Channel) error {
    // Declare DLX
    err := ch.ExchangeDeclare(
        "dlx",
        "direct",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return err
    }

    // Declare DLQ
    _, err = ch.QueueDeclare(
        "dlq",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return err
    }

    // Bind DLQ to DLX
    return ch.QueueBind("dlq", "dlq", "dlx", false, nil)
}

2. Priority Queues

// Declare priority queue
func declarePriorityQueue(ch *amqp.Channel, queueName string, maxPriority uint8) error {
    args := amqp.Table{
        "x-max-priority": maxPriority,
    }

    _, err := ch.QueueDeclare(
        queueName,
        true,
        false,
        false,
        false,
        args,
    )

    return err
}

// Publish with priority
func publishWithPriority(ch *amqp.Channel, exchange, routingKey string, body []byte, priority uint8) error {
    return ch.Publish(
        exchange,
        routingKey,
        false,
        false,
        amqp.Publishing{
            ContentType:  "application/json",
            Body:         body,
            DeliveryMode: amqp.Persistent,
            Priority:     priority, // 0-9 (higher is more important)
        },
    )
}
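To illustrate the ordering a priority queue is meant to produce, here is a small in-memory model built on container/heap: higher priorities come out first, and messages with equal priority keep their arrival order. This is a sketch of the semantics only, not of how RabbitMQ stores messages, and all type and function names are invented for the example.

```go
package main

import (
    "container/heap"
    "fmt"
)

type item struct {
    priority uint8
    seq      int // arrival order, keeps FIFO order within one priority
    body     string
}

// priorityQueue implements heap.Interface.
type priorityQueue []item

func (q priorityQueue) Len() int { return len(q) }
func (q priorityQueue) Less(i, j int) bool {
    if q[i].priority != q[j].priority {
        return q[i].priority > q[j].priority // higher priority first
    }
    return q[i].seq < q[j].seq
}
func (q priorityQueue) Swap(i, j int)       { q[i], q[j] = q[j], q[i] }
func (q *priorityQueue) Push(x interface{}) { *q = append(*q, x.(item)) }
func (q *priorityQueue) Pop() interface{} {
    old := *q
    n := len(old)
    it := old[n-1]
    *q = old[:n-1]
    return it
}

// deliveryOrder enqueues msgs in slice order and returns the bodies in the
// order a single consumer would receive them.
func deliveryOrder(msgs []item) []string {
    q := &priorityQueue{}
    for i, m := range msgs {
        m.seq = i
        heap.Push(q, m)
    }
    var out []string
    for q.Len() > 0 {
        out = append(out, heap.Pop(q).(item).body)
    }
    return out
}

func main() {
    fmt.Println(deliveryOrder([]item{
        {priority: 1, body: "newsletter"},
        {priority: 9, body: "fraud-alert"},
        {priority: 5, body: "order-a"},
        {priority: 5, body: "order-b"},
    })) // [fraud-alert order-a order-b newsletter]
}
```

On a real broker, priority only reorders messages that are still waiting in the queue. Messages already handed to a consumer through a large prefetch window are not reordered, so a small prefetch count tends to make priorities more effective.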

3. RPC Pattern

// RPC Client
type RPCClient struct {
    ch          *amqp.Channel
    replyQueue  string
    pendingCalls map[string]chan []byte
    mu          sync.Mutex
}

func NewRPCClient(ch *amqp.Channel) (*RPCClient, error) {
    // Declare exclusive reply queue
    q, err := ch.QueueDeclare(
        "",    // generate name
        false, // durable
        true,  // delete when unused
        true,  // exclusive
        false, // no-wait
        nil,
    )
    if err != nil {
        return nil, err
    }

    client := &RPCClient{
        ch:          ch,
        replyQueue:  q.Name,
        pendingCalls: make(map[string]chan []byte),
    }

    // Start consuming replies
    msgs, err := ch.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return nil, err
    }

    go client.handleReplies(msgs)

    return client, nil
}

func (c *RPCClient) Call(ctx context.Context, exchange, routingKey string, body []byte, timeout time.Duration) ([]byte, error) {
    corrID := uuid.New().String()

    replyChan := make(chan []byte, 1)
    c.mu.Lock()
    c.pendingCalls[corrID] = replyChan
    c.mu.Unlock()

    defer func() {
        c.mu.Lock()
        delete(c.pendingCalls, corrID)
        c.mu.Unlock()
    }()

    err := c.ch.PublishWithContext(
        ctx,
        exchange,
        routingKey,
        false,
        false,
        amqp.Publishing{
            ContentType:   "application/json",
            Body:          body,
            ReplyTo:       c.replyQueue,
            CorrelationId: corrID,
            Expiration:    fmt.Sprintf("%d", timeout.Milliseconds()),
        },
    )
    if err != nil {
        return nil, err
    }

    select {
    case reply := <-replyChan:
        return reply, nil
    case <-time.After(timeout):
        return nil, fmt.Errorf("rpc timeout")
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}

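func (c *RPCClient) handleReplies(msgs <-chan amqp.Delivery) {
    for msg := range msgs {
        c.mu.Lock()
        if ch, ok := c.pendingCalls[msg.CorrelationId]; ok {
            // Non-blocking send: a duplicate reply must not stall this loop while it holds the lock
            select {
            case ch <- msg.Body:
            default:
            }
        }
        c.mu.Unlock()
    }
}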

// RPC Server
func handleRPCRequest(ch *amqp.Channel, queueName string, handler func([]byte) ([]byte, error)) error {
    msgs, err := ch.Consume(queueName, "", false, false, false, false, nil)
    if err != nil {
        return err
    }

    for msg := range msgs {
        response, err := handler(msg.Body)
        if err != nil {
            log.Printf("RPC handler error: %v", err)
            msg.Nack(false, false)
            continue
        }

        err = ch.Publish(
            "",
            msg.ReplyTo,
            false,
            false,
            amqp.Publishing{
                ContentType:   "application/json",
                Body:          response,
                CorrelationId: msg.CorrelationId,
            },
        )

        if err != nil {
            log.Printf("Failed to send RPC reply: %v", err)
        }

        msg.Ack(false)
    }

    return nil
}
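The heart of the RPC client is the table that maps correlation IDs to waiting callers. The sketch below isolates that bookkeeping without any broker, to show how late, duplicate, or unknown replies can be dropped safely. The pending type and its methods are hypothetical names for this illustration, not part of the client above.

```go
package main

import (
    "fmt"
    "sync"
)

// pending tracks in-flight calls by correlation ID.
type pending struct {
    mu    sync.Mutex
    calls map[string]chan []byte
}

func newPending() *pending {
    return &pending{calls: make(map[string]chan []byte)}
}

// register reserves a one-slot reply channel for a correlation ID.
func (p *pending) register(corrID string) <-chan []byte {
    ch := make(chan []byte, 1)
    p.mu.Lock()
    p.calls[corrID] = ch
    p.mu.Unlock()
    return ch
}

// resolve hands a reply to the waiting caller. It returns false when no call
// is waiting for that ID (late, duplicate, or unknown reply).
func (p *pending) resolve(corrID string, body []byte) bool {
    p.mu.Lock()
    ch, ok := p.calls[corrID]
    delete(p.calls, corrID) // at most one reply per call
    p.mu.Unlock()
    if !ok {
        return false
    }
    ch <- body // cannot block: buffer of one, and the entry is already removed
    return true
}

func main() {
    p := newPending()
    replies := p.register("call-1")

    fmt.Println(p.resolve("call-1", []byte("pong"))) // true
    fmt.Println(string(<-replies))                   // pong
    fmt.Println(p.resolve("call-1", []byte("dup")))  // false: already answered
    fmt.Println(p.resolve("unknown", nil))           // false: nobody is waiting
}
```

Removing the entry inside resolve means the lock is never held during a channel send, which is the property the reply loop depends on.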

4. Consistent Hashing Exchange

// Use consistent hashing for load distribution
func declareConsistentHashExchange(ch *amqp.Channel, exchangeName string) error {
    return ch.ExchangeDeclare(
        exchangeName,
        "x-consistent-hash", // requires rabbitmq_consistent_hash_exchange plugin
        true,
        false,
        false,
        false,
        nil,
    )
}

// Bind queues with weights
func bindQueueWithWeight(ch *amqp.Channel, queueName, exchangeName string, weight int) error {
    return ch.QueueBind(
        queueName,
        fmt.Sprintf("%d", weight), // routing key is the weight
        exchangeName,
        false,
        nil,
    )
}

// Publish with routing key for hashing
func publishToConsistentHash(ch *amqp.Channel, exchange, hashKey string, body []byte) error {
    return ch.Publish(
        exchange,
        hashKey, // e.g., user ID for user-specific routing
        false,
        false,
        amqp.Publishing{
            ContentType:  "application/json",
            Body:         body,
            DeliveryMode: amqp.Persistent,
        },
    )
}
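
The weight used as the binding key controls how large a share of the hash space each queue receives, and a given routing key always lands on the same queue while the bindings stay unchanged. The following is a simplified, standalone illustration of that idea, not the plugin's actual implementation: the `hashRing` type, the `newHashRing` and `queueFor` helpers, and the choice of FNV-1a are all assumptions made for the sketch.

```go
package main

import (
    "fmt"
    "hash/fnv"
    "sort"
)

// ringPoint is one bucket on the ring, owned by a queue.
type ringPoint struct {
    hash  uint32
    queue string
}

// hashRing is a hypothetical, simplified model of a weighted hash ring.
type hashRing struct {
    points []ringPoint
}

func hashOf(s string) uint32 {
    h := fnv.New32a()
    h.Write([]byte(s))
    return h.Sum32()
}

// newHashRing gives every queue as many points on the ring as its weight.
func newHashRing(weights map[string]int) *hashRing {
    r := &hashRing{}
    for queue, weight := range weights {
        for i := 0; i < weight; i++ {
            p := ringPoint{hash: hashOf(fmt.Sprintf("%s#%d", queue, i)), queue: queue}
            r.points = append(r.points, p)
        }
    }
    // Sort by hash, breaking ties by name so the layout is deterministic.
    sort.Slice(r.points, func(i, j int) bool {
        if r.points[i].hash != r.points[j].hash {
            return r.points[i].hash < r.points[j].hash
        }
        return r.points[i].queue < r.points[j].queue
    })
    return r
}

// queueFor returns the queue owning the first point at or after the key's hash,
// wrapping around to the start of the ring. It returns "" for an empty ring.
func (r *hashRing) queueFor(routingKey string) string {
    if len(r.points) == 0 {
        return ""
    }
    h := hashOf(routingKey)
    i := sort.Search(len(r.points), func(i int) bool { return r.points[i].hash >= h })
    if i == len(r.points) {
        i = 0
    }
    return r.points[i].queue
}
```

In this model, `newHashRing(map[string]int{"orders-a": 1, "orders-b": 2})` places three points on the ring, two of them owned by `orders-b`, so over many distinct keys `orders-b` would be expected to receive roughly twice the traffic.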

5. Shovel Pattern (Message Forwarding)

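// Configure a dynamic shovel that forwards messages between exchanges.
// Needs the rabbitmq_shovel plugin; imports bytes, encoding/json, fmt, net/http, net/url.
func configureShovel(managementURL, name, srcExchange, destExchange string) error {
    shovelConfig := map[string]interface{}{
        "value": map[string]interface{}{
            "src-uri":       "amqp://localhost",
            "src-exchange":  srcExchange,
            "dest-uri":      "amqp://remote-server",
            "dest-exchange": destExchange,
        },
    }
    body, _ := json.Marshal(shovelConfig) // cannot fail for a map of strings

    // Dynamic shovels are runtime parameters in the management HTTP API.
    // Assumes the default vhost "/" (encoded as %2F) and the default guest credentials.
    endpoint := fmt.Sprintf("%s/api/parameters/shovel/%%2F/%s", managementURL, url.PathEscape(name))
    req, err := http.NewRequest(http.MethodPut, endpoint, bytes.NewReader(body))
    if err != nil {
        return err
    }
    req.SetBasicAuth("guest", "guest")
    req.Header.Set("Content-Type", "application/json")
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    if resp.StatusCode >= 300 {
        return fmt.Errorf("configure shovel %q: %s", name, resp.Status)
    }
    return nil
}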

6. Metrics Collection

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var (
    messagesPublished = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "rabbitmq_messages_published_total",
            Help: "Total messages published",
        },
        []string{"exchange", "routing_key", "status"},
    )

    messagesConsumed = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "rabbitmq_messages_consumed_total",
            Help: "Total messages consumed",
        },
        []string{"queue", "status"},
    )

    messageProcessingTime = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "rabbitmq_message_processing_seconds",
            Help:    "Message processing time",
            Buckets: prometheus.DefBuckets,
        },
        []string{"queue", "message_type"},
    )

    queueDepth = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "rabbitmq_queue_depth",
            Help: "Current queue depth",
        },
        []string{"queue"},
    )
)

// Track metrics in handler
type MetricsHandler struct {
    handler   MessageHandler
    queueName string
}

func (h *MetricsHandler) Handle(ctx context.Context, msg *models.Message) error {
    start := time.Now()

    err := h.handler.Handle(ctx, msg)

    duration := time.Since(start).Seconds()
    messageProcessingTime.WithLabelValues(h.queueName, msg.Type).Observe(duration)

    status := "success"
    if err != nil {
        status = "error"
    }
    messagesConsumed.WithLabelValues(h.queueName, status).Inc()

    return err
}

Dependencies

// go.mod
module rabbitmq-service

go 1.21

require (
    github.com/rabbitmq/amqp091-go v1.9.0
    github.com/google/uuid v1.6.0
    github.com/prometheus/client_golang v1.19.0
)

Docker Compose Setup

# deployments/docker-compose.yml
version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3.13-management-alpine
    ports:
      - "5672:5672"   # AMQP
      - "15672:15672" # Management UI
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
      - ./rabbitmq/definitions.json:/etc/rabbitmq/definitions.json
      - ./rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
    healthcheck:
      test: rabbitmq-diagnostics -q ping
      interval: 30s
      timeout: 10s
      retries: 5

volumes:
  rabbitmq_data:

# deployments/rabbitmq/rabbitmq.conf
# Definitions (load_definitions supersedes the deprecated management.load_definitions)
load_definitions = /etc/rabbitmq/definitions.json

# Performance
vm_memory_high_watermark.relative = 0.6
disk_free_limit.absolute = 2GB

# Clustering (if needed)
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config

Best Practices

1. Message Durability

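Persistent Messages: Publish with DeliveryMode set to amqp.Persistent so the broker writes messages to disk. Persistence only helps when the message is routed to a durable queue.

Durable Queues: Declare queues as durable so that they survive broker restarts.

Publisher Confirms: Enable confirms so the publisher learns when the broker has accepted each message and can republish any that are nacked or never confirmed.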

2. Error Handling

Dead Letter Queues: Route failed messages to DLQ for analysis.

Retry Logic: Implement exponential backoff for transient failures.

Idempotency: Design handlers to safely process duplicate messages.

3. Performance Optimization

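Prefetch Count: Cap the number of unacknowledged messages per consumer so that one slow consumer does not hoard work while others sit idle. Start with a small value and tune it against measured throughput.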

ch.Qos(10, 0, false) // Prefetch 10 messages

Worker Pools: Use multiple workers for parallel processing.

Batch Operations: Batch publishes when possible.

4. Monitoring

Queue Metrics: Monitor queue depth, consumer count, message rates.
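
Queue depth and consumer count can be read from the management HTTP API, which returns a JSON document per queue. The sketch below assumes the management plugin is enabled and reachable at `baseURL`; `queueStats`, `parseQueueStats`, and `fetchQueueStats` are hypothetical names, and only a handful of the response fields are decoded.

```go
package main

import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "net/url"
)

// queueStats holds the subset of queue fields this sketch cares about.
type queueStats struct {
    Name      string `json:"name"`
    Messages  int    `json:"messages"`
    Ready     int    `json:"messages_ready"`
    Unacked   int    `json:"messages_unacknowledged"`
    Consumers int    `json:"consumers"`
}

// parseQueueStats decodes one queue object from the management API.
func parseQueueStats(body []byte) (queueStats, error) {
    var s queueStats
    if err := json.Unmarshal(body, &s); err != nil {
        return queueStats{}, fmt.Errorf("decode queue stats: %w", err)
    }
    return s, nil
}

// fetchQueueStats requests GET /api/queues/{vhost}/{queue} with basic auth.
func fetchQueueStats(client *http.Client, baseURL, vhost, queue, user, pass string) (queueStats, error) {
    endpoint := fmt.Sprintf("%s/api/queues/%s/%s",
        baseURL, url.PathEscape(vhost), url.PathEscape(queue))
    req, err := http.NewRequest(http.MethodGet, endpoint, nil)
    if err != nil {
        return queueStats{}, err
    }
    req.SetBasicAuth(user, pass)
    resp, err := client.Do(req)
    if err != nil {
        return queueStats{}, err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return queueStats{}, fmt.Errorf("queue %q: unexpected status %s", queue, resp.Status)
    }
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return queueStats{}, err
    }
    return parseQueueStats(body)
}
```

A periodic poller could feed `Messages` into the `queueDepth` gauge from the Metrics Collection section, for example `queueDepth.WithLabelValues(stats.Name).Set(float64(stats.Messages))`.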

Consumer Performance: Track processing time, error rates.

Connection Health: Monitor connection status and reconnections.

5. Security

vhosts: Use virtual hosts to isolate environments.

User Permissions: Configure granular permissions per user.

TLS: Enable TLS for encrypted communication.
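
On the client side, TLS mostly comes down to building a `tls.Config` that trusts the broker's certificate authority. The following is a minimal sketch using only the standard library; `newBrokerTLSConfig` is a hypothetical helper, and the CA bundle and server name are assumed to come from your own deployment.

```go
package main

import (
    "crypto/tls"
    "crypto/x509"
    "errors"
)

// newBrokerTLSConfig builds a client TLS configuration that trusts only the
// certificate authorities found in caPEM and verifies the given server name.
func newBrokerTLSConfig(caPEM []byte, serverName string) (*tls.Config, error) {
    pool := x509.NewCertPool()
    if !pool.AppendCertsFromPEM(caPEM) {
        return nil, errors.New("no CA certificates found in PEM data")
    }
    return &tls.Config{
        RootCAs:    pool,
        ServerName: serverName,
        MinVersion: tls.VersionTLS12,
    }, nil
}
```

The resulting configuration can be passed to `amqp.DialTLS` together with an `amqps://` URL. The broker also needs a TLS listener configured, conventionally on port 5671.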

6. Scalability

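Clustering: Run a RabbitMQ cluster together with a replicated queue type such as quorum queues for high availability. Clustering alone does not replicate queue contents across nodes.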

Queue Sharding: Distribute load across multiple queues.

Federation/Shovel: Connect multiple RabbitMQ instances.

When to Use RabbitMQ

Perfect for:

  • Task queues and background job processing
  • Request/reply patterns (RPC)
  • Complex routing scenarios
  • When you need message delivery guarantees
  • Microservices communication

Not ideal for:

  • Ultra-high throughput streaming (use Kafka)
  • Simple pub/sub with minimal features (use Redis)
  • Stateful stream processing
  • Real-time bidirectional communication (use WebSockets)

Conclusion

RabbitMQ with AMQP provides a powerful, flexible messaging system with strong delivery guarantees and rich routing capabilities. The amqp091-go client library offers a robust foundation for building production-ready, message-driven applications in Go.

Key takeaways:

  • Use exchanges strategically for routing flexibility
  • Implement proper error handling with DLQs
  • Enable publisher confirms for reliability
  • Monitor queue depth and consumer performance
  • Design for idempotency and fault tolerance

For enterprise deployments, consider RabbitMQ clustering, federation, and plugins like delayed message exchange for advanced use cases.