MQTT (Message Queuing Telemetry Transport) is a lightweight publish-subscribe messaging protocol designed for constrained devices and unreliable networks. Originally developed in 1999 by Andy Stanford-Clark of IBM and Arlen Nipper to monitor oil pipelines over satellite links, MQTT has become the de facto standard for IoT communication, powering everything from smart homes to industrial sensors.
What is MQTT?
MQTT is a binary protocol built on TCP/IP that provides publish-subscribe messaging with minimal overhead. It’s designed for scenarios where bandwidth is limited, network connectivity is unreliable, and devices have constrained resources.
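To make "minimal overhead" concrete, here is a rough sketch of how a QoS 0 PUBLISH packet is laid out on the wire, assuming the MQTT 3.1.1 packet format. The function names `encodeRemainingLength` and `encodePublishQoS0` are illustrative and not part of any library, and the sketch does not check the 65,535-byte topic length limit.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeRemainingLength encodes n with MQTT's variable-length scheme:
// 7 bits of data per byte, top bit set when more bytes follow.
func encodeRemainingLength(n int) []byte {
	var out []byte
	for {
		b := byte(n % 128)
		n /= 128
		if n > 0 {
			b |= 0x80 // continuation bit
		}
		out = append(out, b)
		if n == 0 {
			return out
		}
	}
}

// encodePublishQoS0 builds a QoS 0 PUBLISH packet (illustrative helper).
// QoS 0 packets carry no packet identifier, so the layout is:
// fixed header | remaining length | topic length | topic | payload.
func encodePublishQoS0(topic string, payload []byte, retain bool) []byte {
	header := byte(0x30) // packet type 3 (PUBLISH) in the high nibble, QoS 0
	if retain {
		header |= 0x01
	}
	remaining := 2 + len(topic) + len(payload)

	pkt := []byte{header}
	pkt = append(pkt, encodeRemainingLength(remaining)...)
	pkt = binary.BigEndian.AppendUint16(pkt, uint16(len(topic)))
	pkt = append(pkt, topic...)
	pkt = append(pkt, payload...)
	return pkt
}

func main() {
	topic, payload := "home/bedroom/temp", []byte("21.5")
	pkt := encodePublishQoS0(topic, payload, false)
	overhead := len(pkt) - len(topic) - len(payload)
	fmt.Printf("%d bytes on the wire, %d bytes of protocol overhead\n", len(pkt), overhead)
}
```

For this 17-byte topic and 4-byte payload the packet is 25 bytes, so the protocol itself adds only 4 bytes.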
Core Concepts
Broker: Central server that receives all messages and distributes them to subscribers.
Topics: Hierarchical message routing addresses (e.g., home/bedroom/temperature).
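QoS Levels: Three quality-of-service levels with different delivery semantics: 0 (at most once), 1 (at least once), and 2 (exactly once).
Retained Messages: The broker stores the last message published to a topic with the retain flag set and sends it to new subscribers.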
Last Will & Testament (LWT): Message sent by broker if client disconnects unexpectedly.
Clean/Persistent Sessions: Control whether subscriptions and queued messages survive disconnects.
Architecture
[Architecture diagram: devices publish to an MQTT broker (Mosquitto/EMQX) at QoS 0, 1, and 2, and one device registers an LWT. The broker holds a topic tree (home/bedroom/temp, home/bedroom/motion, home/living/light, home/front/door) and a session store (QoS 1/2 messages, retained messages, client subscriptions). Backend services subscribe with topic filters: Analytics Service (home/#), Alert Service (+/+/motion), Dashboard (home/+/+), Automation Engine (home/bedroom/#).]
QoS Levels Explained
Topic Wildcards
Real-World Use Cases
1. Smart Home Automation
Monitor and control smart devices with minimal bandwidth and power consumption.
Perfect for: Temperature sensors, smart lights, door locks, security cameras
2. Industrial IoT (IIoT)
Connect machinery, sensors, and control systems in manufacturing environments.
Perfect for: Equipment monitoring, predictive maintenance, supply chain tracking
3. Connected Vehicles
Transmit telemetry data from vehicles to cloud platforms.
Perfect for: Fleet management, vehicle diagnostics, usage analytics
4. Remote Monitoring
Monitor remote assets in areas with poor connectivity.
Perfect for: Oil pipelines, wind farms, agricultural sensors, environmental monitoring
5. Healthcare Devices
Connect medical devices and wearables for patient monitoring.
Perfect for: Heart rate monitors, glucose meters, patient tracking systems
6. Smart Cities
Collect data from city infrastructure and environmental sensors.
Perfect for: Traffic lights, parking sensors, air quality monitors, waste management
7. Mobile Applications
Push notifications and real-time updates with minimal battery drain.
Perfect for: Chat apps, location tracking, delivery notifications
8. Edge Computing
Coordinate between edge devices and cloud services.
Perfect for: Edge analytics, distributed AI inference, local automation
Project Structure
mqtt-service/
├── cmd/
│   ├── publisher/
│   │   └── main.go              # Publisher application
│   ├── subscriber/
│   │   └── main.go              # Subscriber application
│   └── bridge/
│       └── main.go              # MQTT-to-Kafka bridge
├── internal/
│   ├── mqtt/
│   │   ├── client.go            # MQTT client wrapper
│   │   ├── publisher.go         # Publisher logic
│   │   ├── subscriber.go        # Subscriber logic
│   │   └── options.go           # Client options
│   ├── sensor/
│   │   ├── sensor.go            # Sensor simulator
│   │   └── types.go             # Sensor data models
│   ├── handler/
│   │   ├── handler.go           # Message handler interface
│   │   └── processors.go        # Message processors
│   └── config/
│       └── config.go            # Configuration
├── pkg/
│   ├── patterns/
│   │   ├── lwt.go               # Last Will & Testament
│   │   ├── retained.go          # Retained messages
│   │   └── qos.go               # QoS examples
│   └── monitoring/
│       ├── metrics.go           # Prometheus metrics
│       └── health.go            # Health checks
├── deployments/
│   └── docker-compose.yml       # MQTT broker setup
├── go.mod
└── go.sum
Implementation
1. Sensor Data Models
// internal/sensor/types.go
package sensor
import (
"encoding/json"
"time"
)
// SensorReading represents a sensor measurement
type SensorReading struct {
SensorID string `json:"sensor_id"`
Type string `json:"type"`
Value float64 `json:"value"`
Unit string `json:"unit"`
Location string `json:"location"`
Timestamp time.Time `json:"timestamp"`
Battery int `json:"battery_percent,omitempty"`
Quality string `json:"quality,omitempty"` // good, fair, poor
}
// ToJSON converts reading to JSON bytes
func (sr *SensorReading) ToJSON() ([]byte, error) {
return json.Marshal(sr)
}
// FromJSON parses JSON bytes into a reading; it returns nil on a parse error
func FromJSON(data []byte) (*SensorReading, error) {
var reading SensorReading
if err := json.Unmarshal(data, &reading); err != nil {
return nil, err
}
return &reading, nil
}
// DeviceStatus represents device health
type DeviceStatus struct {
DeviceID string `json:"device_id"`
Online bool `json:"online"`
LastSeen time.Time `json:"last_seen"`
FirmwareVer string `json:"firmware_version"`
SignalStrength int `json:"signal_strength"` // RSSI
Uptime int64 `json:"uptime_seconds"`
}
// ToJSON converts status to JSON bytes (used by Simulator.PublishStatus)
func (ds *DeviceStatus) ToJSON() ([]byte, error) {
return json.Marshal(ds)
}
// AlertMessage represents an alert/notification
type AlertMessage struct {
AlertID string `json:"alert_id"`
Severity string `json:"severity"` // critical, warning, info
Message string `json:"message"`
SensorID string `json:"sensor_id"`
Timestamp time.Time `json:"timestamp"`
Metadata map[string]string `json:"metadata,omitempty"`
}
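One detail of these models is worth a quick check: `omitempty` on an `int` field drops the value 0, so a sensor reporting a genuinely empty battery would serialize without `battery_percent` at all. The sketch below shows the effect with cut-down stand-ins for `SensorReading`; the types `reading` and `readingPtr` and the helper `encode` are hypothetical names used only for this illustration. A pointer field is one possible way to tell "unknown" apart from "0%".

```go
package main

import (
	"encoding/json"
	"fmt"
)

// reading mirrors two fields of SensorReading (illustration only).
type reading struct {
	SensorID string `json:"sensor_id"`
	Battery  int    `json:"battery_percent,omitempty"`
}

// readingPtr uses a pointer so nil means "unknown" and 0 means "empty".
type readingPtr struct {
	SensorID string `json:"sensor_id"`
	Battery  *int   `json:"battery_percent,omitempty"`
}

// encode marshals v and returns the JSON text, or "" on error.
func encode(v any) string {
	b, err := json.Marshal(v)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	zero := 0
	// The plain int field is omitted when it is 0.
	fmt.Println(encode(reading{SensorID: "s1", Battery: 0}))
	// The pointer field is kept, because only a nil pointer counts as empty.
	fmt.Println(encode(readingPtr{SensorID: "s1", Battery: &zero}))
}
```

Whether the distinction matters depends on the consumer; if a missing battery value is treated the same as 0%, the original field definition is fine.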
2. MQTT Client Wrapper
// internal/mqtt/client.go
package mqtt
import (
"crypto/tls"
"fmt"
"log"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
// Client wraps the Paho MQTT client
type Client struct {
client MQTT.Client
config *Config
}
// Config holds MQTT client configuration
type Config struct {
Broker string
Port int
ClientID string
Username string
Password string
KeepAlive int // seconds
PingTimeout time.Duration
ConnectTimeout time.Duration
MaxReconnectDelay time.Duration
AutoReconnect bool
CleanSession bool
UseTLS bool
TLSConfig *tls.Config
// Last Will & Testament
WillEnabled bool
WillTopic string
WillPayload string
WillQoS byte
WillRetained bool
// Callbacks
OnConnect func(MQTT.Client)
OnConnectionLost func(MQTT.Client, error)
}
// NewClient creates a new MQTT client
func NewClient(cfg *Config) (*Client, error) {
opts := MQTT.NewClientOptions()
// Broker configuration
brokerURL := fmt.Sprintf("tcp://%s:%d", cfg.Broker, cfg.Port)
if cfg.UseTLS {
brokerURL = fmt.Sprintf("ssl://%s:%d", cfg.Broker, cfg.Port)
if cfg.TLSConfig != nil {
opts.SetTLSConfig(cfg.TLSConfig)
}
}
opts.AddBroker(brokerURL)
// Client configuration
opts.SetClientID(cfg.ClientID)
if cfg.Username != "" {
opts.SetUsername(cfg.Username)
opts.SetPassword(cfg.Password)
}
// Connection parameters
opts.SetKeepAlive(time.Duration(cfg.KeepAlive) * time.Second)
opts.SetPingTimeout(cfg.PingTimeout)
opts.SetConnectTimeout(cfg.ConnectTimeout)
opts.SetMaxReconnectInterval(cfg.MaxReconnectDelay)
opts.SetAutoReconnect(cfg.AutoReconnect)
opts.SetCleanSession(cfg.CleanSession)
// Last Will & Testament
if cfg.WillEnabled {
opts.SetWill(cfg.WillTopic, cfg.WillPayload, cfg.WillQoS, cfg.WillRetained)
}
// Callbacks
opts.SetOnConnectHandler(func(c MQTT.Client) {
log.Printf("Connected to MQTT broker: %s", brokerURL)
if cfg.OnConnect != nil {
cfg.OnConnect(c)
}
})
opts.SetConnectionLostHandler(func(c MQTT.Client, err error) {
log.Printf("Connection lost: %v", err)
if cfg.OnConnectionLost != nil {
cfg.OnConnectionLost(c, err)
}
})
// Default message handler (for debugging)
opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
log.Printf("Unexpected message on topic %s: %s", msg.Topic(), string(msg.Payload()))
})
// Create and connect client
client := MQTT.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
return nil, fmt.Errorf("connect: %w", token.Error())
}
return &Client{
client: client,
config: cfg,
}, nil
}
// Disconnect closes the connection
func (c *Client) Disconnect(quiesce uint) {
c.client.Disconnect(quiesce)
log.Println("Disconnected from MQTT broker")
}
// IsConnected returns connection status
func (c *Client) IsConnected() bool {
return c.client.IsConnected()
}
// GetClient returns the underlying Paho client
func (c *Client) GetClient() MQTT.Client {
return c.client
}
3. Publisher Implementation
// internal/mqtt/publisher.go
package mqtt
import (
"fmt"
"log"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
// Publisher publishes messages to MQTT topics
type Publisher struct {
client *Client
}
// NewPublisher creates a new publisher
func NewPublisher(client *Client) *Publisher {
return &Publisher{client: client}
}
// Publish publishes a message to a topic
func (p *Publisher) Publish(topic string, qos byte, retained bool, payload interface{}) error {
if !p.client.IsConnected() {
return fmt.Errorf("client not connected")
}
token := p.client.client.Publish(topic, qos, retained, payload)
token.Wait()
if token.Error() != nil {
return fmt.Errorf("publish: %w", token.Error())
}
// %s prints string and []byte payloads as text; %v would print []byte as a list of numbers
log.Printf("Published to %s (QoS %d, retained: %v): %s", topic, qos, retained, payload)
return nil
}
// PublishAsync publishes without waiting for acknowledgment
func (p *Publisher) PublishAsync(topic string, qos byte, retained bool, payload interface{}, callback func(error)) {
if !p.client.IsConnected() {
callback(fmt.Errorf("client not connected"))
return
}
token := p.client.client.Publish(topic, qos, retained, payload)
go func() {
token.Wait()
if token.Error() != nil {
callback(token.Error())
} else {
callback(nil)
}
}()
}
// PublishJSON publishes JSON data
func (p *Publisher) PublishJSON(topic string, qos byte, retained bool, data []byte) error {
return p.Publish(topic, qos, retained, data)
}
// PublishWithRetry publishes with automatic retry on failure
func (p *Publisher) PublishWithRetry(topic string, qos byte, retained bool, payload interface{}, maxRetries int) error {
var err error
backoff := 100 * time.Millisecond
for i := 0; i <= maxRetries; i++ {
err = p.Publish(topic, qos, retained, payload)
if err == nil {
return nil
}
if i < maxRetries {
log.Printf("Publish failed (attempt %d/%d): %v. Retrying in %v", i+1, maxRetries+1, err, backoff)
time.Sleep(backoff)
backoff *= 2 // Exponential backoff
}
}
return fmt.Errorf("publish failed after %d retries: %w", maxRetries, err)
}
4. Subscriber Implementation
// internal/mqtt/subscriber.go
package mqtt
import (
"fmt"
"log"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
// MessageHandler handles incoming messages
type MessageHandler func(topic string, payload []byte) error
// Subscriber subscribes to MQTT topics
type Subscriber struct {
client *Client
}
// NewSubscriber creates a new subscriber
func NewSubscriber(client *Client) *Subscriber {
return &Subscriber{client: client}
}
// Subscribe subscribes to a topic with a handler
func (s *Subscriber) Subscribe(topic string, qos byte, handler MessageHandler) error {
if !s.client.IsConnected() {
return fmt.Errorf("client not connected")
}
callback := func(client MQTT.Client, msg MQTT.Message) {
log.Printf("Received on %s (QoS %d, retained: %v): %s",
msg.Topic(), msg.Qos(), msg.Retained(), string(msg.Payload()))
if err := handler(msg.Topic(), msg.Payload()); err != nil {
log.Printf("Handler error for topic %s: %v", msg.Topic(), err)
}
}
token := s.client.client.Subscribe(topic, qos, callback)
token.Wait()
if token.Error() != nil {
return fmt.Errorf("subscribe: %w", token.Error())
}
log.Printf("Subscribed to %s (QoS %d)", topic, qos)
return nil
}
// SubscribeMultiple subscribes to multiple topics
func (s *Subscriber) SubscribeMultiple(topics map[string]byte, handler MessageHandler) error {
if !s.client.IsConnected() {
return fmt.Errorf("client not connected")
}
// One callback serves every filter in this call
callback := func(client MQTT.Client, msg MQTT.Message) {
log.Printf("Received on %s: %s", msg.Topic(), string(msg.Payload()))
if err := handler(msg.Topic(), msg.Payload()); err != nil {
log.Printf("Handler error: %v", err)
}
}
// Pass the callback explicitly: with a nil callback, Paho routes these
// messages to the default publish handler and handler is never called
token := s.client.client.SubscribeMultiple(topics, callback)
token.Wait()
if token.Error() != nil {
return fmt.Errorf("subscribe multiple: %w", token.Error())
}
log.Printf("Subscribed to %d topics", len(topics))
return nil
}
// Unsubscribe unsubscribes from topics
func (s *Subscriber) Unsubscribe(topics ...string) error {
if !s.client.IsConnected() {
return fmt.Errorf("client not connected")
}
token := s.client.client.Unsubscribe(topics...)
token.Wait()
if token.Error() != nil {
return fmt.Errorf("unsubscribe: %w", token.Error())
}
log.Printf("Unsubscribed from %d topics", len(topics))
return nil
}
5. Sensor Simulator
// internal/sensor/sensor.go
package sensor
import (
"context"
"fmt"
"log"
"math/rand"
"time"
"mqtt-service/internal/mqtt"
)
// Simulator simulates IoT sensors
type Simulator struct {
publisher *mqtt.Publisher
sensorID string
location string
}
// NewSimulator creates a new sensor simulator
func NewSimulator(publisher *mqtt.Publisher, sensorID, location string) *Simulator {
return &Simulator{
publisher: publisher,
sensorID: sensorID,
location: location,
}
}
// SimulateTemperature simulates temperature sensor
func (s *Simulator) SimulateTemperature(ctx context.Context, interval time.Duration, qos byte) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
baseTemp := 20.0 + rand.Float64()*10 // 20-30°C base
for {
select {
case <-ctx.Done():
log.Printf("Temperature sensor %s stopped", s.sensorID)
return
case <-ticker.C:
// Simulate temperature with random fluctuation
temp := baseTemp + (rand.Float64()-0.5)*2 // ±1°C variation
reading := &SensorReading{
SensorID: s.sensorID,
Type: "temperature",
Value: temp,
Unit: "celsius",
Location: s.location,
Timestamp: time.Now(),
Battery: 80 + rand.Intn(21), // 80-100%
Quality: "good",
}
data, err := reading.ToJSON()
if err != nil {
log.Printf("Error marshaling reading: %v", err)
continue
}
topic := fmt.Sprintf("sensors/%s/temperature", s.location)
if err := s.publisher.PublishJSON(topic, qos, false, data); err != nil {
log.Printf("Error publishing temperature: %v", err)
}
}
}
}
// SimulateMotion simulates a motion detector that fires every 10-60 seconds
func (s *Simulator) SimulateMotion(ctx context.Context, qos byte) {
for {
// Wait a random 10-60 seconds, but return immediately on cancellation
// (a bare time.Sleep would delay shutdown by up to a minute)
delay := time.Duration(10+rand.Intn(51)) * time.Second
select {
case <-ctx.Done():
log.Printf("Motion sensor %s stopped", s.sensorID)
return
case <-time.After(delay):
}
reading := &SensorReading{
SensorID: s.sensorID,
Type: "motion",
Value: 1, // motion detected
Unit: "boolean",
Location: s.location,
Timestamp: time.Now(),
Quality: "good",
}
data, err := reading.ToJSON()
if err != nil {
log.Printf("Error marshaling motion reading: %v", err)
continue
}
topic := fmt.Sprintf("sensors/%s/motion", s.location)
// The caller chooses the QoS; QoS 0 (fire and forget) suits motion events
if err := s.publisher.PublishJSON(topic, qos, false, data); err != nil {
log.Printf("Error publishing motion: %v", err)
}
}
}
// SimulateDoorSensor simulates door open/close events
func (s *Simulator) SimulateDoorSensor(ctx context.Context, qos byte) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
isOpen := false
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
isOpen = !isOpen // Toggle door state
value := 0.0
if isOpen {
value = 1.0
}
reading := &SensorReading{
SensorID: s.sensorID,
Type: "door",
Value: value,
Unit: "boolean",
Location: s.location,
Timestamp: time.Now(),
Battery: 90,
Quality: "good",
}
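data, err := reading.ToJSON()
if err != nil {
log.Printf("Error marshaling door reading: %v", err)
continue
}
topic := fmt.Sprintf("sensors/%s/door", s.location)
// The caller chooses the QoS; QoS 2 (exactly once) suits door events
// Retained message so new subscribers get current state
if err := s.publisher.PublishJSON(topic, qos, true, data); err != nil {
log.Printf("Error publishing door state: %v", err)
}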
}
}
}
// PublishStatus publishes device status
func (s *Simulator) PublishStatus(qos byte) error {
status := &DeviceStatus{
DeviceID: s.sensorID,
Online: true,
LastSeen: time.Now(),
FirmwareVer: "1.2.3",
SignalStrength: -50 + rand.Intn(21), // -50 to -30 dBm
Uptime: int64(rand.Intn(86400)),
}
data, err := status.ToJSON()
if err != nil {
return err
}
topic := fmt.Sprintf("devices/%s/status", s.sensorID)
return s.publisher.PublishJSON(topic, qos, true, data)
}
6. Message Handler
// internal/handler/handler.go
package handler
import (
"encoding/json"
"fmt"
"log"
"mqtt-service/internal/sensor"
)
// SensorHandler processes sensor messages
type SensorHandler struct {
// Could connect to database, cache, or other services
}
// NewSensorHandler creates a new sensor handler
func NewSensorHandler() *SensorHandler {
return &SensorHandler{}
}
// HandleMessage processes incoming MQTT messages
func (h *SensorHandler) HandleMessage(topic string, payload []byte) error {
log.Printf("Processing message from topic: %s", topic)
// Parse sensor reading
reading, err := sensor.FromJSON(payload)
if err != nil {
return fmt.Errorf("parse reading: %w", err)
}
// Process based on sensor type
switch reading.Type {
case "temperature":
return h.handleTemperature(reading)
case "motion":
return h.handleMotion(reading)
case "door":
return h.handleDoor(reading)
default:
log.Printf("Unknown sensor type: %s", reading.Type)
return nil
}
}
func (h *SensorHandler) handleTemperature(reading *sensor.SensorReading) error {
log.Printf("Temperature: %.2f°C at %s (sensor: %s)",
reading.Value, reading.Location, reading.SensorID)
// Check for alerts
if reading.Value > 30 {
log.Printf("⚠️ High temperature alert: %.2f°C", reading.Value)
// Could send alert via email, SMS, push notification
} else if reading.Value < 15 {
log.Printf("❄️ Low temperature alert: %.2f°C", reading.Value)
}
// Store in time-series database
// e.g., InfluxDB, TimescaleDB, Prometheus
return nil
}
func (h *SensorHandler) handleMotion(reading *sensor.SensorReading) error {
if reading.Value > 0 {
log.Printf("🚶 Motion detected at %s (sensor: %s)",
reading.Location, reading.SensorID)
// Trigger automation:
// - Turn on lights
// - Send security alert
// - Start recording camera
}
return nil
}
func (h *SensorHandler) handleDoor(reading *sensor.SensorReading) error {
status := "closed"
if reading.Value > 0 {
status = "open"
}
log.Printf("🚪 Door %s at %s (sensor: %s)",
status, reading.Location, reading.SensorID)
// Security logic:
// - Log access
// - Check authorization
// - Send notifications
return nil
}
// AlertHandler processes alert messages
type AlertHandler struct{}
func NewAlertHandler() *AlertHandler {
return &AlertHandler{}
}
func (h *AlertHandler) HandleMessage(topic string, payload []byte) error {
var alert sensor.AlertMessage
if err := json.Unmarshal(payload, &alert); err != nil {
return err
}
log.Printf("🚨 Alert [%s]: %s (sensor: %s)",
alert.Severity, alert.Message, alert.SensorID)
// Route alert based on severity
switch alert.Severity {
case "critical":
// Send immediate notification (SMS, phone call)
log.Println("Sending critical alert notification")
case "warning":
// Send email or push notification
log.Println("Sending warning notification")
case "info":
// Log to dashboard
log.Println("Logging info alert")
}
return nil
}
7. Publisher Application
// cmd/publisher/main.go
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang" // needed for the MQTT.Client callback types below
"mqtt-service/internal/mqtt"
"mqtt-service/internal/sensor"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Handle graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
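// Device offline status for LWT. The payload is fixed at connect time, so its
// timestamp records when the will was registered, not when the device dropped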
deviceID := "sensor-001"
lwtPayload := fmt.Sprintf(`{"device_id":"%s","online":false,"timestamp":"%s"}`,
deviceID, time.Now().Format(time.RFC3339))
// Create MQTT client with Last Will & Testament
client, err := mqtt.NewClient(&mqtt.Config{
Broker: "localhost",
Port: 1883,
ClientID: deviceID,
Username: "",
Password: "",
KeepAlive: 60,
PingTimeout: 10 * time.Second,
ConnectTimeout: 30 * time.Second,
MaxReconnectDelay: 60 * time.Second,
AutoReconnect: true,
CleanSession: false, // Persistent session
// Last Will & Testament
WillEnabled: true,
WillTopic: fmt.Sprintf("devices/%s/status", deviceID),
WillPayload: lwtPayload,
WillQoS: 1,
WillRetained: true,
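OnConnect: func(c MQTT.Client) {
log.Println("Publisher connected to broker")
// Publish online status through the raw Paho client passed to this callback;
// the wrapper returned by NewClient does not exist yet at this point
onlinePayload := fmt.Sprintf(`{"device_id":"%s","online":true,"timestamp":"%s"}`,
deviceID, time.Now().Format(time.RFC3339))
token := c.Publish(fmt.Sprintf("devices/%s/status", deviceID), 1, true, onlinePayload)
// Wait in a goroutine so the connect callback itself never blocks
go func() {
if token.Wait() && token.Error() != nil {
log.Printf("Publishing online status failed: %v", token.Error())
}
}()
},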
OnConnectionLost: func(c MQTT.Client, err error) {
log.Printf("Connection lost: %v. Will auto-reconnect...", err)
},
})
if err != nil {
log.Fatal(err)
}
defer client.Disconnect(1000)
// Create publisher
publisher := mqtt.NewPublisher(client)
// Create sensor simulators
tempSensor := sensor.NewSimulator(publisher, "temp-sensor-001", "bedroom")
motionSensor := sensor.NewSimulator(publisher, "motion-sensor-001", "bedroom")
doorSensor := sensor.NewSimulator(publisher, "door-sensor-001", "front")
// Start simulators
go tempSensor.SimulateTemperature(ctx, 5*time.Second, 1) // QoS 1
go motionSensor.SimulateMotion(ctx, 0) // QoS 0
go doorSensor.SimulateDoorSensor(ctx, 2) // QoS 2
// Publish device status periodically
go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := tempSensor.PublishStatus(1); err != nil {
log.Printf("Error publishing device status: %v", err)
}
}
}
}()
log.Println("Publisher started. Publishing sensor data...")
log.Println("Press Ctrl+C to stop")
// Wait for shutdown signal
<-sigChan
log.Println("Shutting down publisher...")
cancel()
// Give time for graceful shutdown
time.Sleep(2 * time.Second)
}
8. Subscriber Application
// cmd/subscriber/main.go
package main
import (
"log"
"os"
"os/signal"
"strings"
"syscall"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang" // needed for the MQTT.Client callback type below
"mqtt-service/internal/handler"
"mqtt-service/internal/mqtt"
)
func main() {
// Handle graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// Create MQTT client
client, err := mqtt.NewClient(&mqtt.Config{
Broker: "localhost",
Port: 1883,
ClientID: "subscriber-001",
Username: "",
Password: "",
KeepAlive: 60,
PingTimeout: 10 * time.Second,
ConnectTimeout: 30 * time.Second,
MaxReconnectDelay: 60 * time.Second,
AutoReconnect: true,
CleanSession: false, // Persistent session to receive offline messages
OnConnect: func(c MQTT.Client) {
log.Println("Subscriber connected to broker")
},
})
if err != nil {
log.Fatal(err)
}
defer client.Disconnect(1000)
// Create subscriber
subscriber := mqtt.NewSubscriber(client)
// Create message handlers
sensorHandler := handler.NewSensorHandler()
alertHandler := handler.NewAlertHandler()
// Subscribe to multiple topics with wildcards
subscriptions := map[string]byte{
"sensors/+/temperature": 1, // All temperature sensors, QoS 1
"sensors/+/motion": 0, // All motion sensors, QoS 0
"sensors/+/door": 2, // All door sensors, QoS 2
"devices/+/status": 1, // All device status, QoS 1
"alerts/#": 1, // All alerts, QoS 1
}
// Subscribe to sensor topics
if err := subscriber.SubscribeMultiple(subscriptions, func(topic string, payload []byte) error {
// Route to the appropriate handler based on the first topic level.
// strings.HasPrefix is safe for short topics; slicing topic[:7] panics on
// the 6-character topic "alerts", which the filter alerts/# also matches.
switch {
case strings.HasPrefix(topic, "sensors/"):
return sensorHandler.HandleMessage(topic, payload)
case strings.HasPrefix(topic, "alerts"):
return alertHandler.HandleMessage(topic, payload)
case strings.HasPrefix(topic, "devices/"):
log.Printf("Device status update: %s", string(payload))
return nil
}
return nil
}); err != nil {
log.Fatal(err)
}
// Also subscribe to specific critical alerts with higher QoS
if err := subscriber.Subscribe("alerts/critical/#", 2, func(topic string, payload []byte) error {
log.Printf("🚨 CRITICAL ALERT on %s: %s", topic, string(payload))
// Send immediate notification
return nil
}); err != nil {
log.Fatal(err)
}
log.Println("Subscriber started. Listening for messages...")
log.Printf("Subscribed to %d topic patterns", len(subscriptions)+1)
log.Println("Press Ctrl+C to stop")
// Wait for shutdown signal
<-sigChan
log.Println("Shutting down subscriber...")
}
Advanced Patterns
1. Last Will & Testament (LWT)
// Detecting device disconnects
config := &mqtt.Config{
// ... other config ...
WillEnabled: true,
WillTopic: "devices/sensor-001/status",
// Include device_id so subscribers can tell which device went offline
WillPayload: `{"device_id":"sensor-001","online":false,"reason":"unexpected_disconnect"}`,
WillQoS: 1,
WillRetained: true, // New subscribers get last known state
}
// Monitor device status
subscriber.Subscribe("devices/+/status", 1, func(topic string, payload []byte) error {
var status sensor.DeviceStatus
if err := json.Unmarshal(payload, &status); err != nil {
return fmt.Errorf("parse status on %s: %w", topic, err)
}
if !status.Online {
log.Printf("Device %s went offline!", status.DeviceID)
// Trigger alerts, failover, etc.
}
return nil
})
2. Retained Messages
// Publish current state as retained
publisher.Publish("home/thermostat/setpoint", 1, true, "22.5")
// New subscribers immediately receive the last retained message
subscriber.Subscribe("home/thermostat/setpoint", 1, func(topic string, payload []byte) error {
log.Printf("Current setpoint: %s°C", string(payload))
return nil
})
// Clear retained message
publisher.Publish("home/thermostat/setpoint", 1, true, "")
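The retained-message rules used above can be modelled in a few lines. What follows is a toy, in-memory sketch of the broker-side behaviour, not how Mosquitto or EMQX implement it; `retainedStore` and its methods are hypothetical names, and the sketch handles exact topics only (no wildcard filters).

```go
package main

import (
	"fmt"
	"sync"
)

// retainedStore is a toy model of a broker's retained-message table.
type retainedStore struct {
	mu   sync.Mutex
	last map[string][]byte // topic -> last retained payload
}

func newRetainedStore() *retainedStore {
	return &retainedStore{last: make(map[string][]byte)}
}

// publish applies the retain rules: a non-retained publish leaves the table
// untouched, a retained publish replaces the stored message, and a retained
// publish with an empty payload clears it.
func (s *retainedStore) publish(topic string, payload []byte, retain bool) {
	if !retain {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(payload) == 0 {
		delete(s.last, topic)
		return
	}
	s.last[topic] = append([]byte(nil), payload...) // store a copy
}

// onSubscribe returns what a new subscriber to topic would receive right away.
func (s *retainedStore) onSubscribe(topic string) ([]byte, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	p, ok := s.last[topic]
	return p, ok
}

func main() {
	store := newRetainedStore()
	topic := "home/thermostat/setpoint"

	store.publish(topic, []byte("22.5"), true)
	store.publish(topic, []byte("23.0"), false) // not retained: stored value stays 22.5
	if p, ok := store.onSubscribe(topic); ok {
		fmt.Printf("new subscriber receives %s\n", p)
	}

	store.publish(topic, nil, true) // empty retained payload clears the entry
	_, ok := store.onSubscribe(topic)
	fmt.Println("retained after clear:", ok)
}
```

The point of the model is that only one message per topic is kept, so retained messages suit "current state" topics rather than event histories.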
3. Request-Response Pattern
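// Request-response using MQTT
// (fragment: needs context, encoding/json, fmt, strings, sync, time, and github.com/google/uuid)
type RequestHandler struct {
publisher *mqtt.Publisher
responses map[string]chan []byte
mu sync.Mutex
}
func (h *RequestHandler) Request(ctx context.Context, topic string, payload []byte) ([]byte, error) {
// Generate correlation ID
correlationID := uuid.New().String()
responseTopic := fmt.Sprintf("responses/%s", correlationID)
// Create response channel
respChan := make(chan []byte, 1)
h.mu.Lock()
h.responses[correlationID] = respChan
h.mu.Unlock()
defer func() {
h.mu.Lock()
delete(h.responses, correlationID)
h.mu.Unlock()
}()
// Publish request with response topic in message
// (encoding/json base64-encodes the []byte payload)
requestMsg := map[string]interface{}{
"correlation_id": correlationID,
"response_topic": responseTopic,
"payload": payload,
}
data, err := json.Marshal(requestMsg)
if err != nil {
return nil, fmt.Errorf("marshal request: %w", err)
}
if err := h.publisher.PublishJSON(topic, 1, false, data); err != nil {
return nil, err
}
// Wait for response
select {
case response := <-respChan:
return response, nil
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(30 * time.Second):
return nil, fmt.Errorf("request timeout")
}
}
// HandleResponse delivers a reply to the waiting Request call. Register it once
// before sending requests, e.g. subscriber.Subscribe("responses/+", 1, h.HandleResponse)
func (h *RequestHandler) HandleResponse(topic string, payload []byte) error {
correlationID := strings.TrimPrefix(topic, "responses/")
h.mu.Lock()
respChan, ok := h.responses[correlationID]
h.mu.Unlock()
if !ok {
return fmt.Errorf("no pending request for %s", correlationID)
}
select {
case respChan <- payload:
default: // a reply was already delivered; drop the duplicate
}
return nil
}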
4. Topic-based Routing
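// Smart routing based on topic hierarchy
type Router struct {
handlers map[string]mqtt.MessageHandler
mu sync.RWMutex
}
// NewRouter returns a Router with an initialized handler map
// (Register would panic on a zero-value Router, whose map is nil)
func NewRouter() *Router {
return &Router{handlers: make(map[string]mqtt.MessageHandler)}
}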
func (r *Router) Register(pattern string, handler mqtt.MessageHandler) {
r.mu.Lock()
defer r.mu.Unlock()
r.handlers[pattern] = handler
}
func (r *Router) Route(topic string, payload []byte) error {
r.mu.RLock()
defer r.mu.RUnlock()
// Match exact topics
if handler, ok := r.handlers[topic]; ok {
return handler(topic, payload)
}
// Match wildcard patterns
for pattern, handler := range r.handlers {
if matchTopic(pattern, topic) {
return handler(topic, payload)
}
}
return fmt.Errorf("no handler for topic: %s", topic)
}
func matchTopic(pattern, topic string) bool {
// Implement MQTT topic matching with + and # wildcards
// + matches single level, # matches multiple levels
// Implementation left as exercise
return false
}
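One possible body for the `matchTopic` stub is sketched below. It follows the MQTT matching rules as I understand them: `+` matches exactly one level, `#` must be the last level and matches the parent plus everything beneath it, and topics beginning with `$` are not matched by filters that start with a wildcard. Treat it as a starting point rather than a reference implementation; it does not validate that the filter itself is well formed beyond the position of `#`.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether topic matches the MQTT topic filter pattern.
func matchTopic(pattern, topic string) bool {
	// Topics such as $SYS/... are hidden from filters that begin with a wildcard.
	if strings.HasPrefix(topic, "$") &&
		(strings.HasPrefix(pattern, "+") || strings.HasPrefix(pattern, "#")) {
		return false
	}

	filterLevels := strings.Split(pattern, "/")
	topicLevels := strings.Split(topic, "/")

	for i, level := range filterLevels {
		if level == "#" {
			// '#' is only valid as the final level; it matches the parent
			// level and any number of levels below it.
			return i == len(filterLevels)-1
		}
		if i >= len(topicLevels) {
			return false // filter is deeper than the topic
		}
		if level != "+" && level != topicLevels[i] {
			return false
		}
	}
	// No '#' seen: every level matched, so the depths must be equal.
	return len(filterLevels) == len(topicLevels)
}

func main() {
	cases := [][2]string{
		{"home/#", "home/bedroom/temp"},
		{"home/+/+", "home/bedroom/temp"},
		{"+/+/motion", "home/bedroom/motion"},
		{"home/+", "home/bedroom/temp"},
		{"#", "$SYS/broker/uptime"},
	}
	for _, c := range cases {
		fmt.Printf("%-12s vs %-22s -> %v\n", c[0], c[1], matchTopic(c[0], c[1]))
	}
}
```

Note that `Route` iterates over a map, so when several wildcard patterns match the same topic, which handler runs is not deterministic; a sorted slice of patterns would be one way to make the priority explicit.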
5. Shared Subscriptions (MQTT 5.0)
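// Load balancing across multiple subscribers
// Filter format: $share/<group>/<topic filter>
// Note: shared subscriptions are defined in MQTT 5.0, while paho.mqtt.golang
// speaks MQTT 3.1/3.1.1; whether this works depends on the broker accepting
// $share filters from 3.1.1 clients
subscriber1.Subscribe("$share/workers/sensors/+/data", 1, handler1)
subscriber2.Subscribe("$share/workers/sensors/+/data", 1, handler2)
subscriber3.Subscribe("$share/workers/sensors/+/data", 1, handler3)
// The broker distributes messages across subscriber1, subscriber2, and subscriber3
// Each message is delivered to only one subscriber in the "workers" group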
Dependencies
// go.mod
module mqtt-service
go 1.21
require (
github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/google/uuid v1.6.0
github.com/prometheus/client_golang v1.19.0
)
Docker Compose Setup
# deployments/docker-compose.yml
version: '3.8'
services:
  # Mosquitto MQTT Broker
  mosquitto:
    image: eclipse-mosquitto:2.0
    ports:
      - "1883:1883" # MQTT
      - "9001:9001" # WebSocket
    volumes:
      - ./mosquitto/config:/mosquitto/config
      - ./mosquitto/data:/mosquitto/data
      - ./mosquitto/log:/mosquitto/log
    restart: unless-stopped
  # EMQX MQTT Broker (alternative - more features)
  # Host port 1884 avoids a clash with Mosquitto on 1883 when both services run
  emqx:
    image: emqx/emqx:5.3.2
    ports:
      - "1884:1883" # MQTT
      - "8083:8083" # WebSocket
      - "8084:8084" # WSS
      - "8883:8883" # MQTT/SSL
      - "18083:18083" # Dashboard
    environment:
      - EMQX_NAME=emqx
      - EMQX_HOST=127.0.0.1
    volumes:
      - emqx_data:/opt/emqx/data
      - emqx_log:/opt/emqx/log
    restart: unless-stopped
  # MQTT Explorer (GUI for debugging)
  mqtt-explorer:
    image: smeagolworms4/mqtt-explorer
    ports:
      - "4000:4000"
    restart: unless-stopped
volumes:
  emqx_data:
  emqx_log:
Mosquitto Config:
# mosquitto/config/mosquitto.conf
listener 1883
# Development only: require authentication before exposing this broker
allow_anonymous true
# WebSocket support
listener 9001
protocol websockets
# Logging
log_dest stdout
log_type all
# Persistence
persistence true
persistence_location /mosquitto/data/
Best Practices
1. QoS Selection
QoS 0 (At Most Once): Use for high-frequency, non-critical data
// Examples: Motion detection, heartbeats, metrics
publisher.Publish("sensors/motion", 0, false, payload)
QoS 1 (At Least Once): Use for important data where duplicates are acceptable
// Examples: Temperature readings, status updates
publisher.Publish("sensors/temperature", 1, false, payload)
QoS 2 (Exactly Once): Use for critical data requiring exactly-once delivery
// Examples: Financial transactions, access control, alarms
publisher.Publish("alerts/critical", 2, false, payload)
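Because QoS 1 can deliver the same message more than once, handlers for QoS 1 topics are easier to reason about when they are idempotent. A minimal sketch of one approach follows: remember a bounded window of recently seen message keys and skip repeats. It assumes each payload carries an application-level unique ID (a hypothetical `event_id`, not a field in the models above), since MQTT packet identifiers are reused and do not work as deduplication keys. The `deduper` type is illustrative only.

```go
package main

import (
	"fmt"
	"sync"
)

// deduper remembers the most recent keys in FIFO order, up to capacity.
type deduper struct {
	mu       sync.Mutex
	seen     map[string]struct{}
	order    []string
	capacity int
}

func newDeduper(capacity int) *deduper {
	if capacity < 1 {
		capacity = 1
	}
	return &deduper{seen: make(map[string]struct{}), capacity: capacity}
}

// firstTime reports true the first time key is observed and false for
// repeats that are still inside the remembered window.
func (d *deduper) firstTime(key string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()

	if _, dup := d.seen[key]; dup {
		return false
	}
	if len(d.order) >= d.capacity {
		// Evict the oldest key to keep memory bounded.
		oldest := d.order[0]
		d.order = d.order[1:]
		delete(d.seen, oldest)
	}
	d.seen[key] = struct{}{}
	d.order = append(d.order, key)
	return true
}

func main() {
	d := newDeduper(1000)
	// The second "evt-1" simulates a QoS 1 redelivery.
	for _, eventID := range []string{"evt-1", "evt-2", "evt-1"} {
		if d.firstTime(eventID) {
			fmt.Println("processing", eventID)
		} else {
			fmt.Println("skipping duplicate", eventID)
		}
	}
}
```

The window size is a trade-off: a duplicate that arrives after its key has been evicted is processed again, so the capacity should comfortably exceed the number of messages that can arrive during a reconnect.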
2. Topic Design
Good Hierarchy:
sensors/
├── location/
│   ├── bedroom/
│   │   ├── temperature
│   │   ├── humidity
│   │   └── motion
│   └── kitchen/
│       └── temperature
└── type/
    ├── temperature
    └── motion
devices/
└── {device-id}/
    ├── status
    ├── config
    └── telemetry
alerts/
├── critical/
├── warning/
└── info/
Avoid:
- Too deep hierarchies (> 7 levels)
- Generic topics like “data” or “message”
- Mixing different data types in same topic
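These guidelines can be enforced at publish time with a small check. The sketch below is one possible version; `validatePublishTopic` is a hypothetical helper, and the rejection of empty levels is a style choice (MQTT permits them, but a leading or doubled slash is usually a mistake). Rejecting `+` and `#` is not a style choice: wildcards are only valid in subscription filters, never in the topic of a published message.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// validatePublishTopic checks a topic name before publishing.
// maxLevels is the deepest hierarchy the caller wants to allow.
func validatePublishTopic(topic string, maxLevels int) error {
	if topic == "" {
		return errors.New("topic is empty")
	}
	if strings.ContainsAny(topic, "+#") {
		return errors.New("wildcards + and # are only valid in subscription filters")
	}
	levels := strings.Split(topic, "/")
	if len(levels) > maxLevels {
		return fmt.Errorf("topic has %d levels, limit is %d", len(levels), maxLevels)
	}
	for _, level := range levels {
		if level == "" {
			return errors.New("topic contains an empty level")
		}
	}
	return nil
}

func main() {
	topics := []string{
		"sensors/bedroom/temperature",
		"sensors/+/temperature",
		"/sensors/bedroom",
	}
	for _, topic := range topics {
		fmt.Printf("%q -> %v\n", topic, validatePublishTopic(topic, 7))
	}
}
```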
3. Connection Management
// Auto-reconnect retries with growing delays, capped at MaxReconnectDelay
config := &mqtt.Config{
AutoReconnect: true,
MaxReconnectDelay: 60 * time.Second,
OnConnectionLost: func(c MQTT.Client, err error) {
log.Printf("Connection lost: %v", err)
// Metrics, alerts, etc.
},
}
// Use persistent sessions for critical clients
config.CleanSession = false
// Set appropriate keep-alive
config.KeepAlive = 60 // seconds
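For retries that the application controls itself, such as the first connection attempt or a custom publish retry, exponential backoff with jitter is a common pattern. The sketch below is one way to compute the delays. `backoffDelay` and `withJitter` are hypothetical helpers, not part of Paho, and the random source is passed in as a function so the behaviour can be checked deterministically.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoffDelay returns base doubled once per attempt, capped at max.
// Attempt 0 returns base itself.
func backoffDelay(attempt int, base, max time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt && d < max; i++ {
		d *= 2
	}
	if d > max {
		d = max
	}
	return d
}

// withJitter picks a delay in [d/2, d]. int63n must return a value in [0, n),
// like rand.Int63n. Spreading the delays keeps a fleet of devices from
// reconnecting in lockstep after a broker restart.
func withJitter(d time.Duration, int63n func(n int64) int64) time.Duration {
	if d <= 0 {
		return 0
	}
	half := d / 2
	return half + time.Duration(int63n(int64(d-half)+1))
}

func main() {
	for attempt := 0; attempt < 8; attempt++ {
		d := backoffDelay(attempt, time.Second, 60*time.Second)
		fmt.Printf("attempt %d: up to %v, jittered %v\n", attempt, d, withJitter(d, rand.Int63n))
	}
}
```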
4. Security
// Use TLS
config := &mqtt.Config{
UseTLS: true,
TLSConfig: &tls.Config{
RootCAs: caCertPool,
Certificates: []tls.Certificate{cert},
InsecureSkipVerify: false,
},
}
// Use authentication
config.Username = "device-001"
config.Password = os.Getenv("MQTT_PASSWORD")
// Use client certificates for device authentication
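The `caCertPool` and `cert` values in the snippet above have to come from somewhere. A minimal sketch of building them with the standard library follows; `newTLSConfig` and the file paths in `main` are placeholders of my own, and the minimum TLS version is an assumption you may want to adjust for your broker.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// newTLSConfig loads a CA bundle and, when both certFile and keyFile are
// given, a client certificate for mutual TLS.
func newTLSConfig(caFile, certFile, keyFile string) (*tls.Config, error) {
	caPEM, err := os.ReadFile(caFile)
	if err != nil {
		return nil, fmt.Errorf("read CA file: %w", err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no certificates found in CA file")
	}

	cfg := &tls.Config{
		RootCAs:    pool,
		MinVersion: tls.VersionTLS12, // assumption: broker supports TLS 1.2+
	}
	if certFile != "" && keyFile != "" {
		cert, err := tls.LoadX509KeyPair(certFile, keyFile)
		if err != nil {
			return nil, fmt.Errorf("load client key pair: %w", err)
		}
		cfg.Certificates = []tls.Certificate{cert}
	}
	return cfg, nil
}

func main() {
	// Placeholder paths; substitute the files issued for your device.
	cfg, err := newTLSConfig("certs/ca.pem", "certs/device.pem", "certs/device.key")
	if err != nil {
		fmt.Println("TLS setup failed:", err)
		return
	}
	fmt.Println("client certificates loaded:", len(cfg.Certificates))
}
```

The returned value can be assigned to the `TLSConfig` field of the `Config` struct defined earlier, together with `UseTLS: true`.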
5. Monitoring
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
messagesPublished = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "mqtt_messages_published_total",
Help: "Total messages published",
},
[]string{"topic", "qos"},
)
messagesReceived = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "mqtt_messages_received_total",
Help: "Total messages received",
},
[]string{"topic"},
)
connectionStatus = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "mqtt_connection_status",
Help: "MQTT connection status (1=connected, 0=disconnected)",
},
)
)
// Track metrics in callbacks
config.OnConnect = func(c MQTT.Client) {
connectionStatus.Set(1)
}
config.OnConnectionLost = func(c MQTT.Client, err error) {
connectionStatus.Set(0)
}
When to Use MQTT
✅ Perfect for:
- IoT devices with constrained resources
- Unreliable networks (mobile, satellite)
- Battery-powered devices
- Publish-subscribe messaging
- Scenarios requiring QoS guarantees
- Simple device-to-cloud communication
- M2M (machine-to-machine) communication
❌ Not ideal for:
- High-throughput event streaming (use Kafka)
- Complex request-response patterns (use gRPC)
- Browser-based applications, unless the broker exposes MQTT over WebSockets (plain WebSockets or SSE are often simpler)
- Video/audio streaming (use WebRTC)
- When you need message ordering across topics
- Large message payloads (> 256 KB); the protocol allows up to 256 MB per message, but many brokers cap message size far lower
Conclusion
MQTT is the gold standard for IoT communication, offering lightweight messaging with sophisticated features like QoS levels, retained messages, and Last Will & Testament. Its small footprint, low bandwidth requirements, and resilience to network issues make it ideal for connecting millions of devices.
Key takeaways:
- Choose appropriate QoS levels for different data types
- Design hierarchical topic structures for scalability
- Leverage Last Will & Testament for device monitoring
- Use retained messages for state synchronization
- Implement persistent sessions for critical subscribers
- Monitor connection health and message delivery
MQTT shines when connecting resource-constrained devices over unreliable networks, making it the perfect choice for IoT, industrial automation, and mobile applications.