{{series_nav(current_post=10)}}

MQTT (Message Queuing Telemetry Transport) is a lightweight publish-subscribe messaging protocol designed for constrained devices and unreliable networks. Originally developed by IBM for oil pipeline monitoring, MQTT has become the de facto standard for IoT communication, powering everything from smart homes to industrial sensors.

What is MQTT?

MQTT is a binary protocol built on TCP/IP that provides publish-subscribe messaging with minimal overhead. It’s designed for scenarios where bandwidth is limited, network connectivity is unreliable, and devices have constrained resources.
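To make "minimal overhead" concrete, here is a minimal sketch of how a QoS 0 PUBLISH packet is framed in MQTT 3.1.1. The function names are hypothetical and belong to no library; a real client also handles QoS flags, packet identifiers, and length limits.

```go
package main

import "fmt"

// encodeRemainingLength encodes n with MQTT's variable-length scheme:
// seven data bits per byte, with the high bit set while more bytes follow.
func encodeRemainingLength(n int) []byte {
	var out []byte
	for {
		b := byte(n % 128)
		n /= 128
		if n > 0 {
			b |= 0x80
		}
		out = append(out, b)
		if n == 0 {
			return out
		}
	}
}

// encodePublishQoS0 builds a QoS 0, non-retained PUBLISH packet.
// Hypothetical helper, for illustration only.
func encodePublishQoS0(topic string, payload []byte) []byte {
	remaining := 2 + len(topic) + len(payload) // 2-byte topic length prefix
	pkt := []byte{0x30}                        // packet type 3 (PUBLISH), all flags 0
	pkt = append(pkt, encodeRemainingLength(remaining)...)
	pkt = append(pkt, byte(len(topic)>>8), byte(len(topic)))
	pkt = append(pkt, topic...)
	pkt = append(pkt, payload...)
	return pkt
}

func main() {
	topic, payload := "home/bedroom/temperature", []byte("21.5")
	pkt := encodePublishQoS0(topic, payload)
	overhead := len(pkt) - len(topic) - len(payload)
	fmt.Printf("packet: %d bytes, protocol overhead: %d bytes\n", len(pkt), overhead)
}
```

For this topic and payload the framing adds four bytes: one for the packet type and flags, one for the remaining length, and two for the topic length prefix.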

Core Concepts

Broker: Central server that receives all messages and distributes them to subscribers.

Topics: Hierarchical message routing addresses (e.g., home/bedroom/temperature).

QoS Levels: Three quality-of-service levels (0, 1, and 2) that trade delivery guarantees against protocol overhead.

Retained Messages: The broker stores the last retained message on each topic and sends it to new subscribers.

Last Will & Testament (LWT): A message the client registers when it connects; the broker publishes it if the client disconnects unexpectedly.

Clean/Persistent Sessions: Control whether subscriptions and queued messages survive disconnects.

Architecture

graph TB
    subgraph "IoT Devices"
        D1[Temperature Sensor]
        D2[Motion Detector]
        D3[Smart Light]
        D4[Door Sensor]
    end

    subgraph "MQTT Broker"
        B[Broker<br/>mosquitto/EMQX]

        subgraph "Topic Tree"
            T1["home/bedroom/temp"]
            T2["home/bedroom/motion"]
            T3["home/living/light"]
            T4["home/front/door"]
        end

        subgraph "Session Store"
            S1[QoS 1/2 Messages]
            S2[Retained Messages]
            S3[Client Subscriptions]
        end
    end

    subgraph "Backend Services"
        C1[Analytics Service]
        C2[Alert Service]
        C3[Dashboard]
        C4[Automation Engine]
    end

    D1 -->|Publish QoS 1| B
    D2 -->|Publish QoS 0| B
    D3 -.->|LWT| B
    D4 -->|Publish QoS 2| B

    B --> T1
    B --> T2
    B --> T3
    B --> T4

    T1 --> S2
    T2 --> S1

    B -->|Subscribe home/#| C1
    B -->|Subscribe +/+/motion| C2
    B -->|Subscribe home/+/+| C3
    B -->|Subscribe home/bedroom/#| C4

QoS Levels Explained

sequenceDiagram
    participant P as Publisher
    participant B as Broker
    participant S as Subscriber

    rect rgb(255, 230, 230)
        Note over P,S: QoS 0: At Most Once (Fire & Forget)
        P->>B: PUBLISH
        B->>S: PUBLISH
        Note over P,S: No acknowledgment, fastest, may lose messages
    end

    rect rgb(230, 255, 230)
        Note over P,S: QoS 1: At Least Once
        P->>B: PUBLISH
        B->>P: PUBACK
        B->>S: PUBLISH
        S->>B: PUBACK
        Note over P,S: Acknowledged, guaranteed delivery, may duplicate
    end

    rect rgb(230, 230, 255)
        Note over P,S: QoS 2: Exactly Once
        P->>B: PUBLISH
        B->>P: PUBREC
        P->>B: PUBREL
        B->>P: PUBCOMP
        B->>S: PUBLISH
        S->>B: PUBREC
        B->>S: PUBREL
        S->>B: PUBCOMP
        Note over P,S: 4-way handshake, guaranteed exactly once, slowest
    end
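The QoS 2 flow avoids duplicates because the receiver remembers packet identifiers between PUBLISH and PUBREL. The sketch below is a toy model of that receiver-side bookkeeping, not a protocol implementation: the type and method names are hypothetical, and it delivers the message as soon as the first PUBLISH arrives, which is only one of the approaches the specification allows.

```go
package main

import "fmt"

// qos2Receiver is a toy model of the receiving side of a QoS 2 exchange.
type qos2Receiver struct {
	inFlight  map[uint16]bool // packet IDs seen in PUBLISH but not yet released
	delivered []string        // payloads handed to the application
}

func newQoS2Receiver() *qos2Receiver {
	return &qos2Receiver{inFlight: make(map[uint16]bool)}
}

// onPublish handles PUBLISH and returns the reply packet type.
// A retransmitted PUBLISH with a known packet ID is acknowledged again
// but not delivered a second time.
func (r *qos2Receiver) onPublish(id uint16, payload string) string {
	if !r.inFlight[id] {
		r.inFlight[id] = true
		r.delivered = append(r.delivered, payload)
	}
	return "PUBREC"
}

// onPubrel handles PUBREL: the sender is done with this ID, so it may be reused.
func (r *qos2Receiver) onPubrel(id uint16) string {
	delete(r.inFlight, id)
	return "PUBCOMP"
}

func main() {
	r := newQoS2Receiver()
	fmt.Println(r.onPublish(7, "door=open")) // first transmission
	fmt.Println(r.onPublish(7, "door=open")) // retransmission after a lost PUBREC
	fmt.Println(r.onPubrel(7))
	fmt.Println("messages delivered:", len(r.delivered))
}
```

QoS 1 has no such bookkeeping, which is why a retransmitted PUBLISH there can reach the application twice.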

Topic Wildcards

graph TD
    A["home/#"] -->|Matches| B[home/bedroom/temp]
    A -->|Matches| C[home/living/light]
    A -->|Matches| D[home/front/door/status]

    E["+/bedroom/+"] -->|Matches| F[home/bedroom/temp]
    E -->|Matches| G[home/bedroom/humidity]
    E -->|Matches| H[apartment/bedroom/light]

    I["home/+/temp"] -->|Matches| J[home/bedroom/temp]
    I -->|Matches| K[home/living/temp]
    I -->|NOT Matches| L[home/bedroom/humidity]

    style A fill:#ffe6e6
    style E fill:#e6ffe6
    style I fill:#e6e6ff
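Wildcards are only legal in certain positions: `+` must fill a whole level, and `#` must fill the last level. A small validator along these lines can catch malformed filters before they reach the broker. This is a sketch under those two rules only; `validateFilter` is a hypothetical helper, and it does not check every constraint in the specification (for example, length limits).

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// validateFilter checks the wildcard placement rules for a topic filter.
func validateFilter(filter string) error {
	if filter == "" {
		return errors.New("filter must not be empty")
	}
	levels := strings.Split(filter, "/")
	for i, level := range levels {
		switch {
		case level == "#":
			if i != len(levels)-1 {
				return errors.New("# is only allowed as the last level")
			}
		case level == "+":
			// A single-level wildcard that fills the whole level is always valid.
		case strings.ContainsAny(level, "#+"):
			return fmt.Errorf("wildcard must occupy a whole level: %q", level)
		}
	}
	return nil
}

func main() {
	for _, f := range []string{"home/#", "+/bedroom/+", "home/#/temp", "home/temp+"} {
		if err := validateFilter(f); err != nil {
			fmt.Printf("%-14s invalid: %v\n", f, err)
			continue
		}
		fmt.Printf("%-14s ok\n", f)
	}
}
```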

Real-World Use Cases

1. Smart Home Automation

Monitor and control smart devices with minimal bandwidth and power consumption.

Perfect for: Temperature sensors, smart lights, door locks, security cameras

2. Industrial IoT (IIoT)

Connect machinery, sensors, and control systems in manufacturing environments.

Perfect for: Equipment monitoring, predictive maintenance, supply chain tracking

3. Connected Vehicles

Transmit telemetry data from vehicles to cloud platforms.

Perfect for: Fleet management, vehicle diagnostics, usage analytics

4. Remote Monitoring

Monitor remote assets in areas with poor connectivity.

Perfect for: Oil pipelines, wind farms, agricultural sensors, environmental monitoring

5. Healthcare Devices

Connect medical devices and wearables for patient monitoring.

Perfect for: Heart rate monitors, glucose meters, patient tracking systems

6. Smart Cities

Collect data from city infrastructure and environmental sensors.

Perfect for: Traffic lights, parking sensors, air quality monitors, waste management

7. Mobile Applications

Push notifications and real-time updates with minimal battery drain.

Perfect for: Chat apps, location tracking, delivery notifications

8. Edge Computing

Coordinate between edge devices and cloud services.

Perfect for: Edge analytics, distributed AI inference, local automation

Project Structure

mqtt-service/
├── cmd/
│   ├── publisher/
│   │   └── main.go           # Publisher application
│   ├── subscriber/
│   │   └── main.go           # Subscriber application
│   └── bridge/
│       └── main.go           # MQTT-to-Kafka bridge
├── internal/
│   ├── mqtt/
│   │   ├── client.go         # MQTT client wrapper
│   │   ├── publisher.go      # Publisher logic
│   │   ├── subscriber.go     # Subscriber logic
│   │   └── options.go        # Client options
│   ├── sensor/
│   │   ├── sensor.go         # Sensor simulator
│   │   └── types.go          # Sensor data models
│   ├── handler/
│   │   ├── handler.go        # Message handler interface
│   │   └── processors.go     # Message processors
│   └── config/
│       └── config.go         # Configuration
├── pkg/
│   ├── patterns/
│   │   ├── lwt.go            # Last Will & Testament
│   │   ├── retained.go       # Retained messages
│   │   └── qos.go            # QoS examples
│   └── monitoring/
│       ├── metrics.go        # Prometheus metrics
│       └── health.go         # Health checks
├── deployments/
│   └── docker-compose.yml    # MQTT broker setup
├── go.mod
└── go.sum

Implementation

1. Sensor Data Models

// internal/sensor/types.go
package sensor

import (
	"encoding/json"
	"time"
)

// SensorReading represents a sensor measurement
type SensorReading struct {
	SensorID  string    `json:"sensor_id"`
	Type      string    `json:"type"`
	Value     float64   `json:"value"`
	Unit      string    `json:"unit"`
	Location  string    `json:"location"`
	Timestamp time.Time `json:"timestamp"`
	Battery   int       `json:"battery_percent,omitempty"`
	Quality   string    `json:"quality,omitempty"` // good, fair, poor
}

// ToJSON converts reading to JSON bytes
func (sr *SensorReading) ToJSON() ([]byte, error) {
	return json.Marshal(sr)
}

// FromJSON parses JSON bytes to reading
func FromJSON(data []byte) (*SensorReading, error) {
	var reading SensorReading
	if err := json.Unmarshal(data, &reading); err != nil {
		return nil, err
	}
	return &reading, nil
}

// DeviceStatus represents device health
type DeviceStatus struct {
	DeviceID       string    `json:"device_id"`
	Online         bool      `json:"online"`
	LastSeen       time.Time `json:"last_seen"`
	FirmwareVer    string    `json:"firmware_version"`
	SignalStrength int       `json:"signal_strength"` // RSSI
	Uptime         int64     `json:"uptime_seconds"`
}

// ToJSON converts status to JSON bytes (called by Simulator.PublishStatus)
func (ds *DeviceStatus) ToJSON() ([]byte, error) {
	return json.Marshal(ds)
}

// AlertMessage represents an alert/notification
type AlertMessage struct {
	AlertID   string            `json:"alert_id"`
	Severity  string            `json:"severity"` // critical, warning, info
	Message   string            `json:"message"`
	SensorID  string            `json:"sensor_id"`
	Timestamp time.Time         `json:"timestamp"`
	Metadata  map[string]string `json:"metadata,omitempty"`
}
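One detail of the `omitempty` tags on SensorReading is worth seeing in action: a battery level of exactly 0 is indistinguishable from "not reported" once encoded. The sketch below repeats the struct so it runs on its own; `hasField` is a hypothetical helper used only for the demonstration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// SensorReading mirrors the struct defined above.
type SensorReading struct {
	SensorID  string    `json:"sensor_id"`
	Type      string    `json:"type"`
	Value     float64   `json:"value"`
	Unit      string    `json:"unit"`
	Location  string    `json:"location"`
	Timestamp time.Time `json:"timestamp"`
	Battery   int       `json:"battery_percent,omitempty"`
	Quality   string    `json:"quality,omitempty"`
}

// ToJSON converts reading to JSON bytes.
func (sr *SensorReading) ToJSON() ([]byte, error) {
	return json.Marshal(sr)
}

// hasField reports whether the encoded JSON object contains the given key.
func hasField(data []byte, key string) bool {
	var fields map[string]json.RawMessage
	if err := json.Unmarshal(data, &fields); err != nil {
		return false
	}
	_, ok := fields[key]
	return ok
}

func main() {
	reading := &SensorReading{
		SensorID:  "temp-sensor-001",
		Type:      "temperature",
		Value:     21.5,
		Unit:      "celsius",
		Location:  "bedroom",
		Timestamp: time.Now(),
		Battery:   0, // a genuinely empty battery
	}
	data, err := reading.ToJSON()
	if err != nil {
		fmt.Println("marshal:", err)
		return
	}
	fmt.Println(string(data))
	fmt.Println("battery_percent present:", hasField(data, "battery_percent"))
}
```

If a zero reading must survive the round trip, one option is to change the field to `*int` so that only a nil pointer is omitted.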

2. MQTT Client Wrapper

// internal/mqtt/client.go
package mqtt

import (
	"crypto/tls"
	"fmt"
	"log"
	"time"

	MQTT "github.com/eclipse/paho.mqtt.golang"
)

// Client wraps the Paho MQTT client
type Client struct {
	client MQTT.Client
	config *Config
}

// Config holds MQTT client configuration
type Config struct {
	Broker            string
	Port              int
	ClientID          string
	Username          string
	Password          string
	KeepAlive         int           // seconds
	PingTimeout       time.Duration
	ConnectTimeout    time.Duration
	MaxReconnectDelay time.Duration
	AutoReconnect     bool
	CleanSession      bool
	UseTLS            bool
	TLSConfig         *tls.Config

	// Last Will & Testament
	WillEnabled  bool
	WillTopic    string
	WillPayload  string
	WillQoS      byte
	WillRetained bool

	// Callbacks
	OnConnect        func(MQTT.Client)
	OnConnectionLost func(MQTT.Client, error)
}

// NewClient creates a new MQTT client
func NewClient(cfg *Config) (*Client, error) {
	opts := MQTT.NewClientOptions()

	// Broker configuration
	brokerURL := fmt.Sprintf("tcp://%s:%d", cfg.Broker, cfg.Port)
	if cfg.UseTLS {
		brokerURL = fmt.Sprintf("ssl://%s:%d", cfg.Broker, cfg.Port)
		if cfg.TLSConfig != nil {
			opts.SetTLSConfig(cfg.TLSConfig)
		}
	}
	opts.AddBroker(brokerURL)

	// Client configuration
	opts.SetClientID(cfg.ClientID)
	if cfg.Username != "" {
		opts.SetUsername(cfg.Username)
		opts.SetPassword(cfg.Password)
	}

	// Connection parameters
	opts.SetKeepAlive(time.Duration(cfg.KeepAlive) * time.Second)
	opts.SetPingTimeout(cfg.PingTimeout)
	opts.SetConnectTimeout(cfg.ConnectTimeout)
	opts.SetMaxReconnectInterval(cfg.MaxReconnectDelay)
	opts.SetAutoReconnect(cfg.AutoReconnect)
	opts.SetCleanSession(cfg.CleanSession)

	// Last Will & Testament
	if cfg.WillEnabled {
		opts.SetWill(cfg.WillTopic, cfg.WillPayload, cfg.WillQoS, cfg.WillRetained)
	}

	// Callbacks
	opts.SetOnConnectHandler(func(c MQTT.Client) {
		log.Printf("Connected to MQTT broker: %s", brokerURL)
		if cfg.OnConnect != nil {
			cfg.OnConnect(c)
		}
	})

	opts.SetConnectionLostHandler(func(c MQTT.Client, err error) {
		log.Printf("Connection lost: %v", err)
		if cfg.OnConnectionLost != nil {
			cfg.OnConnectionLost(c, err)
		}
	})

	// Default message handler (for debugging)
	opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
		log.Printf("Unexpected message on topic %s: %s", msg.Topic(), string(msg.Payload()))
	})

	// Create and connect client
	client := MQTT.NewClient(opts)

	if token := client.Connect(); token.Wait() && token.Error() != nil {
		return nil, fmt.Errorf("connect: %w", token.Error())
	}

	return &Client{
		client: client,
		config: cfg,
	}, nil
}

// Disconnect closes the connection
func (c *Client) Disconnect(quiesce uint) {
	c.client.Disconnect(quiesce)
	log.Println("Disconnected from MQTT broker")
}

// IsConnected returns connection status
func (c *Client) IsConnected() bool {
	return c.client.IsConnected()
}

// GetClient returns the underlying Paho client
func (c *Client) GetClient() MQTT.Client {
	return c.client
}

3. Publisher Implementation

// internal/mqtt/publisher.go
package mqtt

import (
	"fmt"
	"log"
	"time"

	MQTT "github.com/eclipse/paho.mqtt.golang"
)

// Publisher publishes messages to MQTT topics
type Publisher struct {
	client *Client
}

// NewPublisher creates a new publisher
func NewPublisher(client *Client) *Publisher {
	return &Publisher{client: client}
}

// Publish publishes a message to a topic
func (p *Publisher) Publish(topic string, qos byte, retained bool, payload interface{}) error {
	if !p.client.IsConnected() {
		return fmt.Errorf("client not connected")
	}

	token := p.client.client.Publish(topic, qos, retained, payload)
	token.Wait()

	if token.Error() != nil {
		return fmt.Errorf("publish: %w", token.Error())
	}

	// Log metadata only: %v on a []byte payload would print raw byte values
	log.Printf("Published to %s (QoS %d, retained: %v)", topic, qos, retained)
	return nil
}

// PublishAsync publishes without waiting for acknowledgment
func (p *Publisher) PublishAsync(topic string, qos byte, retained bool, payload interface{}, callback func(error)) {
	if !p.client.IsConnected() {
		callback(fmt.Errorf("client not connected"))
		return
	}

	token := p.client.client.Publish(topic, qos, retained, payload)

	go func() {
		token.Wait()
		if token.Error() != nil {
			callback(token.Error())
		} else {
			callback(nil)
		}
	}()
}

// PublishJSON publishes JSON data
func (p *Publisher) PublishJSON(topic string, qos byte, retained bool, data []byte) error {
	return p.Publish(topic, qos, retained, data)
}

// PublishWithRetry publishes with automatic retry on failure
func (p *Publisher) PublishWithRetry(topic string, qos byte, retained bool, payload interface{}, maxRetries int) error {
	var err error
	backoff := 100 * time.Millisecond

	for i := 0; i <= maxRetries; i++ {
		err = p.Publish(topic, qos, retained, payload)
		if err == nil {
			return nil
		}

		if i < maxRetries {
			// maxRetries retries means maxRetries+1 attempts in total
			log.Printf("Publish failed (attempt %d/%d): %v. Retrying in %v", i+1, maxRetries+1, err, backoff)
			time.Sleep(backoff)
			backoff *= 2 // Exponential backoff
		}
	}

	return fmt.Errorf("publish failed after %d retries: %w", maxRetries, err)
}
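PublishWithRetry doubles its delay on every attempt with no upper bound, so a large maxRetries can lead to very long sleeps. A capped schedule is a common refinement. The helper below is a hypothetical sketch that only computes the delays; wiring it into the retry loop is left out.

```go
package main

import (
	"fmt"
	"time"
)

// backoffSchedule returns the wait before each retry: base, 2*base, 4*base, ...
// with every delay capped at max.
func backoffSchedule(base, max time.Duration, retries int) []time.Duration {
	delays := make([]time.Duration, 0, retries)
	d := base
	for i := 0; i < retries; i++ {
		if d > max {
			d = max
		}
		delays = append(delays, d)
		d *= 2
	}
	return delays
}

func main() {
	for i, d := range backoffSchedule(100*time.Millisecond, time.Second, 6) {
		fmt.Printf("retry %d after %v\n", i+1, d)
	}
}
```

Adding random jitter to each delay would further reduce the chance that many devices retry in lockstep after a broker outage.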

4. Subscriber Implementation

// internal/mqtt/subscriber.go
package mqtt

import (
	"fmt"
	"log"

	MQTT "github.com/eclipse/paho.mqtt.golang"
)

// MessageHandler handles incoming messages
type MessageHandler func(topic string, payload []byte) error

// Subscriber subscribes to MQTT topics
type Subscriber struct {
	client *Client
}

// NewSubscriber creates a new subscriber
func NewSubscriber(client *Client) *Subscriber {
	return &Subscriber{client: client}
}

// Subscribe subscribes to a topic with a handler
func (s *Subscriber) Subscribe(topic string, qos byte, handler MessageHandler) error {
	if !s.client.IsConnected() {
		return fmt.Errorf("client not connected")
	}

	callback := func(client MQTT.Client, msg MQTT.Message) {
		log.Printf("Received on %s (QoS %d, retained: %v): %s",
			msg.Topic(), msg.Qos(), msg.Retained(), string(msg.Payload()))

		if err := handler(msg.Topic(), msg.Payload()); err != nil {
			log.Printf("Handler error for topic %s: %v", msg.Topic(), err)
		}
	}

	token := s.client.client.Subscribe(topic, qos, callback)
	token.Wait()

	if token.Error() != nil {
		return fmt.Errorf("subscribe: %w", token.Error())
	}

	log.Printf("Subscribed to %s (QoS %d)", topic, qos)
	return nil
}

// SubscribeMultiple subscribes to multiple topics with one shared handler
func (s *Subscriber) SubscribeMultiple(topics map[string]byte, handler MessageHandler) error {
	if !s.client.IsConnected() {
		return fmt.Errorf("client not connected")
	}

	callback := func(client MQTT.Client, msg MQTT.Message) {
		log.Printf("Received on %s: %s", msg.Topic(), string(msg.Payload()))
		if err := handler(msg.Topic(), msg.Payload()); err != nil {
			log.Printf("Handler error: %v", err)
		}
	}

	// Pass the callback explicitly. With a nil callback, Paho routes these
	// messages to the default publish handler and handler is never called.
	token := s.client.client.SubscribeMultiple(topics, callback)
	token.Wait()

	if token.Error() != nil {
		return fmt.Errorf("subscribe multiple: %w", token.Error())
	}

	log.Printf("Subscribed to %d topics", len(topics))
	return nil
}

// Unsubscribe unsubscribes from topics
func (s *Subscriber) Unsubscribe(topics ...string) error {
	if !s.client.IsConnected() {
		return fmt.Errorf("client not connected")
	}

	token := s.client.client.Unsubscribe(topics...)
	token.Wait()

	if token.Error() != nil {
		return fmt.Errorf("unsubscribe: %w", token.Error())
	}

	log.Printf("Unsubscribed from %d topics", len(topics))
	return nil
}

5. Sensor Simulator

// internal/sensor/sensor.go
package sensor

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"time"

	"mqtt-service/internal/mqtt"
)

// Simulator simulates IoT sensors
type Simulator struct {
	publisher *mqtt.Publisher
	sensorID  string
	location  string
}

// NewSimulator creates a new sensor simulator
func NewSimulator(publisher *mqtt.Publisher, sensorID, location string) *Simulator {
	return &Simulator{
		publisher: publisher,
		sensorID:  sensorID,
		location:  location,
	}
}

// SimulateTemperature simulates temperature sensor
func (s *Simulator) SimulateTemperature(ctx context.Context, interval time.Duration, qos byte) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	baseTemp := 20.0 + rand.Float64()*10 // 20-30°C base

	for {
		select {
		case <-ctx.Done():
			log.Printf("Temperature sensor %s stopped", s.sensorID)
			return

		case <-ticker.C:
			// Simulate temperature with random fluctuation
			temp := baseTemp + (rand.Float64()-0.5)*2 // ±1°C variation

			reading := &SensorReading{
				SensorID:  s.sensorID,
				Type:      "temperature",
				Value:     temp,
				Unit:      "celsius",
				Location:  s.location,
				Timestamp: time.Now(),
				Battery:   80 + rand.Intn(21), // 80-100%
				Quality:   "good",
			}

			data, err := reading.ToJSON()
			if err != nil {
				log.Printf("Error marshaling reading: %v", err)
				continue
			}

			topic := fmt.Sprintf("sensors/%s/temperature", s.location)
			if err := s.publisher.PublishJSON(topic, qos, false, data); err != nil {
				log.Printf("Error publishing temperature: %v", err)
			}
		}
	}
}

// SimulateMotion simulates motion detector
func (s *Simulator) SimulateMotion(ctx context.Context, qos byte) {
	for {
		// Random motion detection (every 10-59 seconds)
		delay := time.Duration(10+rand.Intn(50)) * time.Second

		// Wait on a timer instead of time.Sleep so cancellation is noticed immediately
		select {
		case <-ctx.Done():
			log.Printf("Motion sensor %s stopped", s.sensorID)
			return

		case <-time.After(delay):
			reading := &SensorReading{
				SensorID:  s.sensorID,
				Type:      "motion",
				Value:     1, // motion detected
				Unit:      "boolean",
				Location:  s.location,
				Timestamp: time.Now(),
				Quality:   "good",
			}

			data, err := reading.ToJSON()
			if err != nil {
				log.Printf("Error marshaling motion reading: %v", err)
				continue
			}
			topic := fmt.Sprintf("sensors/%s/motion", s.location)

			// The caller picks the QoS; QoS 0 (fire and forget) suits motion events
			if err := s.publisher.PublishJSON(topic, qos, false, data); err != nil {
				log.Printf("Error publishing motion: %v", err)
			}
		}
	}
}

// SimulateDoorSensor simulates door open/close events
func (s *Simulator) SimulateDoorSensor(ctx context.Context, qos byte) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	isOpen := false

	for {
		select {
		case <-ctx.Done():
			return

		case <-ticker.C:
			isOpen = !isOpen // Toggle door state

			value := 0.0
			if isOpen {
				value = 1.0
			}

			reading := &SensorReading{
				SensorID:  s.sensorID,
				Type:      "door",
				Value:     value,
				Unit:      "boolean",
				Location:  s.location,
				Timestamp: time.Now(),
				Battery:   90,
				Quality:   "good",
			}

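			data, err := reading.ToJSON()
			if err != nil {
				log.Printf("Error marshaling door reading: %v", err)
				continue
			}
			topic := fmt.Sprintf("sensors/%s/door", s.location)

			// The caller picks the QoS; QoS 2 (exactly once) suits door events.
			// Retained message so new subscribers get current state
			if err := s.publisher.PublishJSON(topic, qos, true, data); err != nil {
				log.Printf("Error publishing door state: %v", err)
			}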
		}
	}
}

// PublishStatus publishes device status
func (s *Simulator) PublishStatus(qos byte) error {
	status := &DeviceStatus{
		DeviceID:       s.sensorID,
		Online:         true,
		LastSeen:       time.Now(),
		FirmwareVer:    "1.2.3",
		SignalStrength: -50 + rand.Intn(21), // -50 to -30 dBm
		Uptime:         int64(rand.Intn(86400)),
	}

	data, err := status.ToJSON()
	if err != nil {
		return err
	}

	topic := fmt.Sprintf("devices/%s/status", s.sensorID)
	return s.publisher.PublishJSON(topic, qos, true, data)
}

6. Message Handler

// internal/handler/handler.go
package handler

import (
	"encoding/json"
	"fmt"
	"log"

	"mqtt-service/internal/sensor"
)

// SensorHandler processes sensor messages
type SensorHandler struct {
	// Could connect to database, cache, or other services
}

// NewSensorHandler creates a new sensor handler
func NewSensorHandler() *SensorHandler {
	return &SensorHandler{}
}

// HandleMessage processes incoming MQTT messages
func (h *SensorHandler) HandleMessage(topic string, payload []byte) error {
	log.Printf("Processing message from topic: %s", topic)

	// Parse sensor reading
	reading, err := sensor.FromJSON(payload)
	if err != nil {
		return fmt.Errorf("parse reading: %w", err)
	}

	// Process based on sensor type
	switch reading.Type {
	case "temperature":
		return h.handleTemperature(reading)
	case "motion":
		return h.handleMotion(reading)
	case "door":
		return h.handleDoor(reading)
	default:
		log.Printf("Unknown sensor type: %s", reading.Type)
		return nil
	}
}

func (h *SensorHandler) handleTemperature(reading *sensor.SensorReading) error {
	log.Printf("Temperature: %.2f°C at %s (sensor: %s)",
		reading.Value, reading.Location, reading.SensorID)

	// Check for alerts
	if reading.Value > 30 {
		log.Printf("⚠️  High temperature alert: %.2f°C", reading.Value)
		// Could send alert via email, SMS, push notification
	} else if reading.Value < 15 {
		log.Printf("❄️  Low temperature alert: %.2f°C", reading.Value)
	}

	// Store in time-series database
	// e.g., InfluxDB, TimescaleDB, Prometheus

	return nil
}

func (h *SensorHandler) handleMotion(reading *sensor.SensorReading) error {
	if reading.Value > 0 {
		log.Printf("🚶 Motion detected at %s (sensor: %s)",
			reading.Location, reading.SensorID)

		// Trigger automation:
		// - Turn on lights
		// - Send security alert
		// - Start recording camera
	}

	return nil
}

func (h *SensorHandler) handleDoor(reading *sensor.SensorReading) error {
	status := "closed"
	if reading.Value > 0 {
		status = "open"
	}

	log.Printf("🚪 Door %s at %s (sensor: %s)",
		status, reading.Location, reading.SensorID)

	// Security logic:
	// - Log access
	// - Check authorization
	// - Send notifications

	return nil
}

// AlertHandler processes alert messages
type AlertHandler struct{}

func NewAlertHandler() *AlertHandler {
	return &AlertHandler{}
}

func (h *AlertHandler) HandleMessage(topic string, payload []byte) error {
	var alert sensor.AlertMessage
	if err := json.Unmarshal(payload, &alert); err != nil {
		return err
	}

	log.Printf("🚨 Alert [%s]: %s (sensor: %s)",
		alert.Severity, alert.Message, alert.SensorID)

	// Route alert based on severity
	switch alert.Severity {
	case "critical":
		// Send immediate notification (SMS, phone call)
		log.Println("Sending critical alert notification")
	case "warning":
		// Send email or push notification
		log.Println("Sending warning notification")
	case "info":
		// Log to dashboard
		log.Println("Logging info alert")
	}

	return nil
}

7. Publisher Application

// cmd/publisher/main.go
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	MQTT "github.com/eclipse/paho.mqtt.golang"

	"mqtt-service/internal/mqtt"
	"mqtt-service/internal/sensor"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Handle graceful shutdown
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	// Device offline status for LWT
	deviceID := "sensor-001"
	lwtPayload := fmt.Sprintf(`{"device_id":"%s","online":false,"timestamp":"%s"}`,
		deviceID, time.Now().Format(time.RFC3339))

	// Create MQTT client with Last Will & Testament
	client, err := mqtt.NewClient(&mqtt.Config{
		Broker:            "localhost",
		Port:              1883,
		ClientID:          deviceID,
		Username:          "",
		Password:          "",
		KeepAlive:         60,
		PingTimeout:       10 * time.Second,
		ConnectTimeout:    30 * time.Second,
		MaxReconnectDelay: 60 * time.Second,
		AutoReconnect:     true,
		CleanSession:      false, // Persistent session

		// Last Will & Testament
		WillEnabled:  true,
		WillTopic:    fmt.Sprintf("devices/%s/status", deviceID),
		WillPayload:  lwtPayload,
		WillQoS:      1,
		WillRetained: true,

		OnConnect: func(c MQTT.Client) {
			log.Println("Publisher connected to broker")
			// Publish online status directly on the connected Paho client;
			// the mqtt.Client wrapper does not exist until NewClient returns.
			onlinePayload := fmt.Sprintf(`{"device_id":"%s","online":true,"timestamp":"%s"}`,
				deviceID, time.Now().Format(time.RFC3339))
			token := c.Publish(fmt.Sprintf("devices/%s/status", deviceID), 1, true, onlinePayload)
			if token.Wait() && token.Error() != nil {
				log.Printf("Publishing online status failed: %v", token.Error())
			}
		},
		OnConnectionLost: func(c MQTT.Client, err error) {
			log.Printf("Connection lost: %v. Will auto-reconnect...", err)
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Disconnect(1000)

	// Create publisher
	publisher := mqtt.NewPublisher(client)

	// Create sensor simulators
	tempSensor := sensor.NewSimulator(publisher, "temp-sensor-001", "bedroom")
	motionSensor := sensor.NewSimulator(publisher, "motion-sensor-001", "bedroom")
	doorSensor := sensor.NewSimulator(publisher, "door-sensor-001", "front")

	// Start simulators
	go tempSensor.SimulateTemperature(ctx, 5*time.Second, 1)  // QoS 1
	go motionSensor.SimulateMotion(ctx, 0)                     // QoS 0
	go doorSensor.SimulateDoorSensor(ctx, 2)                   // QoS 2

	// Publish device status periodically
	go func() {
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				tempSensor.PublishStatus(1)
			}
		}
	}()

	log.Println("Publisher started. Publishing sensor data...")
	log.Println("Press Ctrl+C to stop")

	// Wait for shutdown signal
	<-sigChan
	log.Println("Shutting down publisher...")
	cancel()

	// Give time for graceful shutdown
	time.Sleep(2 * time.Second)
}

8. Subscriber Application

// cmd/subscriber/main.go
package main

import (
	"log"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	MQTT "github.com/eclipse/paho.mqtt.golang"

	"mqtt-service/internal/handler"
	"mqtt-service/internal/mqtt"
)

func main() {
	// Handle graceful shutdown
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	// Create MQTT client
	client, err := mqtt.NewClient(&mqtt.Config{
		Broker:            "localhost",
		Port:              1883,
		ClientID:          "subscriber-001",
		Username:          "",
		Password:          "",
		KeepAlive:         60,
		PingTimeout:       10 * time.Second,
		ConnectTimeout:    30 * time.Second,
		MaxReconnectDelay: 60 * time.Second,
		AutoReconnect:     true,
		CleanSession:      false, // Persistent session to receive offline messages

		OnConnect: func(c MQTT.Client) {
			log.Println("Subscriber connected to broker")
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Disconnect(1000)

	// Create subscriber
	subscriber := mqtt.NewSubscriber(client)

	// Create message handlers
	sensorHandler := handler.NewSensorHandler()
	alertHandler := handler.NewAlertHandler()

	// Subscribe to multiple topics with wildcards
	subscriptions := map[string]byte{
		"sensors/+/temperature": 1, // All temperature sensors, QoS 1
		"sensors/+/motion":      0, // All motion sensors, QoS 0
		"sensors/+/door":        2, // All door sensors, QoS 2
		"devices/+/status":      1, // All device status, QoS 1
		"alerts/#":              1, // All alerts, QoS 1
	}

	// Subscribe to sensor topics
	if err := subscriber.SubscribeMultiple(subscriptions, func(topic string, payload []byte) error {
		// Route to appropriate handler based on topic prefix.
		// strings.HasPrefix is safe for short topics such as "alerts",
		// which "alerts/#" also matches; slicing topic[:7] would panic there.
		switch {
		case strings.HasPrefix(topic, "sensors/"):
			return sensorHandler.HandleMessage(topic, payload)
		case strings.HasPrefix(topic, "alerts"):
			return alertHandler.HandleMessage(topic, payload)
		case strings.HasPrefix(topic, "devices/"):
			log.Printf("Device status update: %s", string(payload))
		}
		return nil
	}); err != nil {
		log.Fatal(err)
	}

	// Also subscribe to specific critical alerts with higher QoS
	if err := subscriber.Subscribe("alerts/critical/#", 2, func(topic string, payload []byte) error {
		log.Printf("🚨 CRITICAL ALERT on %s: %s", topic, string(payload))
		// Send immediate notification
		return nil
	}); err != nil {
		log.Fatal(err)
	}

	log.Println("Subscriber started. Listening for messages...")
	log.Printf("Subscribed to %d topic patterns", len(subscriptions)+1)
	log.Println("Press Ctrl+C to stop")

	// Wait for shutdown signal
	<-sigChan
	log.Println("Shutting down subscriber...")
}

Advanced Patterns

1. Last Will & Testament (LWT)

// Detecting device disconnects
config := &mqtt.Config{
	// ... other config ...
	WillEnabled:  true,
	WillTopic:    "devices/sensor-001/status",
	WillPayload:  `{"online":false,"reason":"unexpected_disconnect"}`,
	WillQoS:      1,
	WillRetained: true, // New subscribers get last known state
}

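// Monitor device status
subscriber.Subscribe("devices/+/status", 1, func(topic string, payload []byte) error {
	var status sensor.DeviceStatus
	if err := json.Unmarshal(payload, &status); err != nil {
		return fmt.Errorf("parse status on %s: %w", topic, err)
	}

	if !status.Online {
		// The will payload above carries no device_id, so identify the device by topic
		log.Printf("Device went offline (topic %s)", topic)
		// Trigger alerts, failover, etc.
	}

	return nil
})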

2. Retained Messages

// Publish current state as retained
publisher.Publish("home/thermostat/setpoint", 1, true, "22.5")

// New subscribers immediately receive the last retained message
subscriber.Subscribe("home/thermostat/setpoint", 1, func(topic string, payload []byte) error {
	log.Printf("Current setpoint: %s°C", string(payload))
	return nil
})

// Clear retained message
publisher.Publish("home/thermostat/setpoint", 1, true, "")
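The retained-message rules are easy to model: the broker keeps at most one retained message per topic, a new retained publish replaces it, and an empty retained payload deletes it. The in-memory sketch below illustrates only those rules; `retainedStore` is a hypothetical type, not how any particular broker is implemented, and it ignores wildcard subscriptions.

```go
package main

import "fmt"

// retainedStore models a broker keeping at most one retained message per topic.
type retainedStore struct {
	messages map[string][]byte
}

func newRetainedStore() *retainedStore {
	return &retainedStore{messages: make(map[string][]byte)}
}

// publishRetained applies the rule for a PUBLISH with the retain flag set:
// a non-empty payload replaces the stored message, an empty one deletes it.
func (s *retainedStore) publishRetained(topic string, payload []byte) {
	if len(payload) == 0 {
		delete(s.messages, topic)
		return
	}
	s.messages[topic] = append([]byte(nil), payload...) // store a copy
}

// lookup returns the message a new subscriber to this exact topic would receive.
func (s *retainedStore) lookup(topic string) ([]byte, bool) {
	msg, ok := s.messages[topic]
	return msg, ok
}

func main() {
	store := newRetainedStore()
	store.publishRetained("home/thermostat/setpoint", []byte("22.5"))
	if msg, ok := store.lookup("home/thermostat/setpoint"); ok {
		fmt.Printf("new subscriber receives: %s\n", msg)
	}

	store.publishRetained("home/thermostat/setpoint", nil) // empty payload clears it
	_, ok := store.lookup("home/thermostat/setpoint")
	fmt.Println("retained after clear:", ok)
}
```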

3. Request-Response Pattern

// Request-response using MQTT
type RequestHandler struct {
	publisher *mqtt.Publisher
	responses map[string]chan []byte
	mu        sync.Mutex
}

func (h *RequestHandler) Request(ctx context.Context, topic string, payload []byte) ([]byte, error) {
	// Generate correlation ID
	correlationID := uuid.New().String()
	responseTopic := fmt.Sprintf("responses/%s", correlationID)

	// Create response channel
	respChan := make(chan []byte, 1)
	h.mu.Lock()
	h.responses[correlationID] = respChan
	h.mu.Unlock()

	defer func() {
		h.mu.Lock()
		delete(h.responses, correlationID)
		h.mu.Unlock()
	}()

	// Publish request with response topic in message.
	// Note: encoding/json serializes the []byte payload as a base64 string.
	requestMsg := map[string]interface{}{
		"correlation_id": correlationID,
		"response_topic": responseTopic,
		"payload":        payload,
	}
	data, err := json.Marshal(requestMsg)
	if err != nil {
		return nil, fmt.Errorf("marshal request: %w", err)
	}

	if err := h.publisher.PublishJSON(topic, 1, false, data); err != nil {
		return nil, err
	}

	// Wait for response
	select {
	case response := <-respChan:
		return response, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-time.After(30 * time.Second):
		return nil, fmt.Errorf("request timeout")
	}
}
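The Request method waits on a channel stored in the responses map, but the snippet does not show what fills that channel. One way to close the loop is a subscription on `responses/+` whose handler takes the correlation ID from the last topic level and hands the payload to the waiting request. The sketch below models just that hand-off with the standard library; `pendingResponses` and its methods are hypothetical names, and the MQTT subscription itself is assumed rather than shown.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingResponses tracks requests that are waiting for a reply.
type pendingResponses struct {
	mu      sync.Mutex
	waiters map[string]chan []byte
}

func newPendingResponses() *pendingResponses {
	return &pendingResponses{waiters: make(map[string]chan []byte)}
}

// register creates the channel a request will wait on.
func (p *pendingResponses) register(correlationID string) <-chan []byte {
	ch := make(chan []byte, 1)
	p.mu.Lock()
	p.waiters[correlationID] = ch
	p.mu.Unlock()
	return ch
}

// deliver hands a response to the waiting request. It returns false when
// nobody is waiting, for example after a timeout or for a duplicate reply.
func (p *pendingResponses) deliver(correlationID string, payload []byte) bool {
	p.mu.Lock()
	ch, ok := p.waiters[correlationID]
	delete(p.waiters, correlationID)
	p.mu.Unlock()
	if !ok {
		return false
	}
	ch <- payload // capacity 1 and a single send per channel, so this never blocks
	return true
}

func main() {
	pending := newPendingResponses()
	wait := pending.register("req-42")

	// In a real client this call would sit inside the "responses/+" handler.
	pending.deliver("req-42", []byte(`{"status":"ok"}`))

	fmt.Printf("response: %s\n", <-wait)
	fmt.Println("late duplicate accepted:", pending.deliver("req-42", nil))
}
```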

4. Topic-based Routing

// Smart routing based on topic hierarchy
type Router struct {
	handlers map[string]mqtt.MessageHandler
	mu       sync.RWMutex
}

func (r *Router) Register(pattern string, handler mqtt.MessageHandler) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.handlers[pattern] = handler
}

func (r *Router) Route(topic string, payload []byte) error {
	r.mu.RLock()
	defer r.mu.RUnlock()

	// Match exact topics
	if handler, ok := r.handlers[topic]; ok {
		return handler(topic, payload)
	}

	// Match wildcard patterns
	for pattern, handler := range r.handlers {
		if matchTopic(pattern, topic) {
			return handler(topic, payload)
		}
	}

	return fmt.Errorf("no handler for topic: %s", topic)
}

func matchTopic(pattern, topic string) bool {
	// Implement MQTT topic matching with + and # wildcards
	// + matches single level, # matches multiple levels
	// Implementation left as exercise
	return false
}
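One possible way to fill in the matchTopic stub is a level-by-level comparison. The sketch below assumes the pattern is already a valid filter (so `#` only appears as the last level) and follows the MQTT rule that filters starting with a wildcard do not match topics beginning with `$`.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether topic matches the MQTT filter pattern.
func matchTopic(pattern, topic string) bool {
	// Topics such as $SYS/... are not matched by filters that start with a wildcard.
	if strings.HasPrefix(topic, "$") &&
		(strings.HasPrefix(pattern, "#") || strings.HasPrefix(pattern, "+")) {
		return false
	}

	p := strings.Split(pattern, "/")
	t := strings.Split(topic, "/")

	for i, level := range p {
		if level == "#" {
			return true // matches the parent level and any number of child levels
		}
		if i >= len(t) {
			return false // pattern has more levels than the topic
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	return len(p) == len(t) // no trailing topic levels left unmatched
}

func main() {
	fmt.Println(matchTopic("home/#", "home/front/door/status"))
	fmt.Println(matchTopic("+/bedroom/+", "apartment/bedroom/light"))
	fmt.Println(matchTopic("home/+/temp", "home/bedroom/humidity"))
}
```

Because Route iterates over a map, the order in which overlapping patterns are tried is not deterministic; if two registered patterns can match the same topic, an ordered slice of patterns is a safer structure.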

5. Shared Subscriptions (MQTT 5.0)

// Load balancing across multiple subscribers
// Filter format: $share/{group}/{topic filter}
// Shared subscriptions are an MQTT 5.0 feature; whether a broker honors
// them for older protocol versions depends on the broker.

subscriber1.Subscribe("$share/workers/sensors/+/data", 1, handler1)
subscriber2.Subscribe("$share/workers/sensors/+/data", 1, handler2)
subscriber3.Subscribe("$share/workers/sensors/+/data", 1, handler3)

// Broker distributes messages across subscriber1, subscriber2, subscriber3
// Each message is delivered to only one subscriber in the "workers" group
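A shared-subscription filter has three parts: the literal `$share` prefix, a group name, and an ordinary topic filter. The sketch below splits one apart, which can help when logging or validating subscriptions; `parseSharedFilter` is a hypothetical helper, not part of the Paho client.

```go
package main

import (
	"fmt"
	"strings"
)

// parseSharedFilter splits "$share/{group}/{filter}" into group and filter.
// The boolean is false when the input is not a well-formed shared subscription.
func parseSharedFilter(filter string) (string, string, bool) {
	const prefix = "$share/"
	if !strings.HasPrefix(filter, prefix) {
		return "", "", false
	}
	group, topicFilter, found := strings.Cut(strings.TrimPrefix(filter, prefix), "/")
	if !found || group == "" || topicFilter == "" {
		return "", "", false
	}
	return group, topicFilter, true
}

func main() {
	group, topicFilter, ok := parseSharedFilter("$share/workers/sensors/+/data")
	fmt.Println(group, topicFilter, ok)
}
```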

Dependencies

// go.mod
module mqtt-service

go 1.21

require (
	github.com/eclipse/paho.mqtt.golang v1.4.3
	github.com/google/uuid v1.6.0
	github.com/prometheus/client_golang v1.19.0
)

Docker Compose Setup

# deployments/docker-compose.yml
version: '3.8'

services:
  # Mosquitto MQTT Broker
  mosquitto:
    image: eclipse-mosquitto:2.0
    ports:
      - "1883:1883"  # MQTT
      - "9001:9001"  # WebSocket
    volumes:
      - ./mosquitto/config:/mosquitto/config
      - ./mosquitto/data:/mosquitto/data
      - ./mosquitto/log:/mosquitto/log
    restart: unless-stopped

  # EMQX MQTT Broker (alternative - more features)
  # Host port 1884 avoids a clash with Mosquitto on 1883 when both brokers run
  emqx:
    image: emqx/emqx:5.3.2
    ports:
      - "1884:1883"   # MQTT (container port 1883)
      - "8083:8083"   # WebSocket
      - "8084:8084"   # WSS
      - "8883:8883"   # MQTT/SSL
      - "18083:18083" # Dashboard
    environment:
      - EMQX_NAME=emqx
      - EMQX_HOST=127.0.0.1
    volumes:
      - emqx_data:/opt/emqx/data
      - emqx_log:/opt/emqx/log
    restart: unless-stopped

  # MQTT Explorer (GUI for debugging)
  mqtt-explorer:
    image: smeagolworms4/mqtt-explorer
    ports:
      - "4000:4000"
    restart: unless-stopped

volumes:
  emqx_data:
  emqx_log:

Mosquitto Config:

# mosquitto/config/mosquitto.conf
listener 1883
# Development only: in production, require authentication (password_file or client certificates)
allow_anonymous true

# WebSocket support
listener 9001
protocol websockets

# Logging
log_dest stdout
log_type all

# Persistence
persistence true
persistence_location /mosquitto/data/

Best Practices

1. QoS Selection

QoS 0 (At Most Once): Use for high-frequency, non-critical data

// Examples: Motion detection, heartbeats, metrics
publisher.Publish("sensors/motion", 0, false, payload)

QoS 1 (At Least Once): Use for important data where duplicates are acceptable

// Examples: Temperature readings, status updates
publisher.Publish("sensors/temperature", 1, false, payload)

QoS 2 (Exactly Once): Use for critical data where duplicates are unacceptable and the extra handshake round trips are affordable

// Examples: Financial transactions, access control, alarms
publisher.Publish("alerts/critical", 2, false, payload)
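
QoS 1 can deliver the same message more than once, so "duplicates are acceptable" usually means the consumer has to be idempotent. A minimal sketch of one approach follows. It assumes each payload carries an application-level message ID (that ID scheme is not part of the original examples), and `deduper` is a hypothetical helper that remembers only a bounded window of recent IDs.

```go
package main

import "fmt"

// deduper remembers the most recent message IDs in a fixed-size window.
// It is not safe for concurrent use; guard it with a mutex if handlers
// run on multiple goroutines. IDs are assumed to be non-empty.
type deduper struct {
	seen  map[string]struct{}
	order []string // ring buffer of IDs, oldest entry at index next
	next  int
}

func newDeduper(capacity int) *deduper {
	if capacity < 1 {
		capacity = 1
	}
	return &deduper{
		seen:  make(map[string]struct{}, capacity),
		order: make([]string, capacity),
	}
}

// firstTime reports true the first time an ID is observed within the window.
func (d *deduper) firstTime(id string) bool {
	if _, dup := d.seen[id]; dup {
		return false
	}
	if old := d.order[d.next]; old != "" {
		delete(d.seen, old) // evict the oldest ID to bound memory
	}
	d.order[d.next] = id
	d.seen[id] = struct{}{}
	d.next = (d.next + 1) % len(d.order)
	return true
}

func main() {
	d := newDeduper(1000)
	for _, id := range []string{"msg-1", "msg-2", "msg-1"} {
		if d.firstTime(id) {
			fmt.Println("processing", id)
		} else {
			fmt.Println("skipping duplicate", id)
		}
	}
}
```

The window size trades memory for protection: a duplicate that arrives after its ID has been evicted is processed again, so handlers that must never repeat an action still need a durable idempotency check.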

2. Topic Design

Good Hierarchy:

sensors/
  ├── location/
  │   ├── bedroom/
  │   │   ├── temperature
  │   │   ├── humidity
  │   │   └── motion
  │   └── kitchen/
  │       └── temperature
  └── type/
      ├── temperature
      └── motion

devices/
  └── {device-id}/
      ├── status
      ├── config
      └── telemetry

alerts/
  ├── critical/
  ├── warning/
  └── info/

Avoid:

  • Overly deep hierarchies (more than 7 levels)
  • Generic topics like “data” or “message”
  • Mixing different data types in the same topic
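
These guidelines can be enforced in code before publishing. The sketch below is illustrative: `validatePublishTopic` and `deviceTopic` are hypothetical helpers, and the 7-level cap is the guideline from the list above, not a protocol limit. Rejecting empty levels is a style choice as well, since MQTT itself permits them.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// maxTopicLevels mirrors the guideline above; it is not an MQTT limit.
const maxTopicLevels = 7

// validatePublishTopic checks a topic name against the conventions above.
func validatePublishTopic(topic string) error {
	if topic == "" {
		return errors.New("topic is empty")
	}
	if strings.ContainsAny(topic, "+#") {
		return errors.New("wildcards are not allowed when publishing")
	}
	levels := strings.Split(topic, "/")
	if len(levels) > maxTopicLevels {
		return fmt.Errorf("topic has %d levels, want at most %d", len(levels), maxTopicLevels)
	}
	for _, level := range levels {
		if level == "" {
			return errors.New("topic contains an empty level")
		}
	}
	return nil
}

// deviceTopic builds devices/{device-id}/{channel} and validates the result.
func deviceTopic(deviceID, channel string) (string, error) {
	topic := "devices/" + deviceID + "/" + channel
	return topic, validatePublishTopic(topic)
}

func main() {
	topic, err := deviceTopic("dev-001", "telemetry")
	fmt.Println(topic, err)

	fmt.Println(validatePublishTopic("sensors/+/temperature"))
}
```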

3. Connection Management

// Enable automatic reconnect and cap the backoff delay between attempts
config := &mqtt.Config{
	AutoReconnect:     true,
	MaxReconnectDelay: 60 * time.Second,
	OnConnectionLost: func(c MQTT.Client, err error) {
		log.Printf("Connection lost: %v", err)
		// Metrics, alerts, etc.
	},
}

// Use persistent sessions for critical clients (requires a stable client ID across reconnects)
config.CleanSession = false

// Set appropriate keep-alive
config.KeepAlive = 60 // seconds
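
For readers who manage reconnects themselves, the delay calculation behind exponential backoff is small enough to show in full. This is a sketch under stated assumptions, not the client library's implementation: `backoffDelay` is a hypothetical helper, and the base and cap values are examples.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelay returns the wait before reconnect attempt n (0-based):
// base doubled once per previous attempt, never exceeding max.
func backoffDelay(attempt int, base, max time.Duration) time.Duration {
	if base <= 0 {
		return 0
	}
	d := base
	for i := 0; i < attempt && d < max; i++ {
		d *= 2
	}
	if d > max {
		return max
	}
	return d
}

func main() {
	// Delays grow 1s, 2s, 4s, ... until they reach the 60s cap.
	for attempt := 0; attempt < 8; attempt++ {
		fmt.Println(attempt, backoffDelay(attempt, time.Second, 60*time.Second))
	}
}
```

In a fleet of devices it is common to add random jitter to each delay so that clients do not all reconnect at the same instant after a broker restart.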

4. Security

// Use TLS
config := &mqtt.Config{
	UseTLS: true,
	TLSConfig: &tls.Config{
		RootCAs:            caCertPool,
		Certificates:       []tls.Certificate{cert},
		InsecureSkipVerify: false,
	},
}

// Use authentication
config.Username = "device-001"
config.Password = os.Getenv("MQTT_PASSWORD")

// For per-device authentication, use client certificates (the Certificates field above)
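
The `caCertPool` and `cert` values used above have to come from somewhere. The sketch below shows one way to build the `tls.Config` from PEM data using only the standard library. `newTLSConfig` is a hypothetical helper, and setting `MinVersion` to TLS 1.2 is an assumption made for this example, not something the original configuration specifies.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
)

// newTLSConfig builds a client TLS config that trusts only the given CA
// bundle. clientCert may be nil when the client authenticates by password.
func newTLSConfig(caPEM []byte, clientCert *tls.Certificate) (*tls.Config, error) {
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no valid CA certificates found in PEM data")
	}
	cfg := &tls.Config{
		RootCAs:    pool,
		MinVersion: tls.VersionTLS12, // assumption: refuse older protocol versions
	}
	if clientCert != nil {
		cfg.Certificates = []tls.Certificate{*clientCert}
	}
	return cfg, nil
}

func main() {
	// Invalid PEM data is rejected instead of silently trusting nothing.
	_, err := newTLSConfig([]byte("not a certificate"), nil)
	fmt.Println("error:", err)
}
```

In a real client the CA bundle would typically be read with `os.ReadFile` and the client certificate loaded with `tls.LoadX509KeyPair`, with the result assigned to the `TLSConfig` field shown above.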

5. Monitoring

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	messagesPublished = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "mqtt_messages_published_total",
			Help: "Total messages published",
		},
		[]string{"topic", "qos"},
	)

	messagesReceived = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "mqtt_messages_received_total",
			Help: "Total messages received",
		},
		[]string{"topic"},
	)

	connectionStatus = promauto.NewGauge(
		prometheus.GaugeOpts{
			Name: "mqtt_connection_status",
			Help: "MQTT connection status (1=connected, 0=disconnected)",
		},
	)
)

// Track metrics in callbacks
config.OnConnect = func(c MQTT.Client) {
	connectionStatus.Set(1)
}

config.OnConnectionLost = func(c MQTT.Client, err error) {
	connectionStatus.Set(0)
}
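
One caveat with the `topic` label: every distinct label value creates a separate time series, so topics that embed device IDs can grow without bound. A possible mitigation, sketched here with a hypothetical `topicLabel` helper and assuming the `devices/{device-id}/...` layout from the topic design section, is to collapse the variable level before recording the metric.

```go
package main

import (
	"fmt"
	"strings"
)

// topicLabel replaces the device-id level of devices/{device-id}/...
// topics with "+" so all devices share one metric series per channel.
// Other topics are returned unchanged.
func topicLabel(topic string) string {
	levels := strings.Split(topic, "/")
	if len(levels) >= 2 && levels[0] == "devices" {
		levels[1] = "+"
	}
	return strings.Join(levels, "/")
}

func main() {
	fmt.Println(topicLabel("devices/dev-42/status")) // devices/+/status
	fmt.Println(topicLabel("sensors/motion"))        // sensors/motion
}
```

The result can then be passed to the counters defined above, for example `messagesReceived.WithLabelValues(topicLabel(topic)).Inc()`.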

When to Use MQTT

Perfect for:

  • IoT devices with constrained resources
  • Unreliable networks (mobile, satellite)
  • Battery-powered devices
  • Publish-subscribe messaging
  • Scenarios requiring QoS guarantees
  • Simple device-to-cloud communication
  • M2M (machine-to-machine) communication

Not ideal for:

  • High-throughput event streaming (use Kafka)
  • Complex request-response patterns (use gRPC)
  • Browser-only applications with no devices involved (plain WebSockets or SSE are simpler, although MQTT over WebSockets does work)
  • Video/audio streaming (use WebRTC)
  • When you need message ordering across topics
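  • Large message payloads (> 256 KB): the protocol allows up to 256 MB, but brokers and constrained devices often enforce much lower limits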

Conclusion

MQTT is the de facto standard for IoT communication, offering lightweight messaging with features such as QoS levels, retained messages, and Last Will & Testament. Its small footprint, low bandwidth requirements, and resilience to network issues make it well suited to connecting large fleets of devices.

Key takeaways:

  • Choose appropriate QoS levels for different data types
  • Design hierarchical topic structures for scalability
  • Leverage Last Will & Testament for device monitoring
  • Use retained messages for state synchronization
  • Implement persistent sessions for critical subscribers
  • Monitor connection health and message delivery

MQTT shines when connecting resource-constrained devices over unreliable networks, which makes it a strong choice for IoT, industrial automation, and mobile applications.


{{series_nav(current_post=10)}}