MQTT in Go: IoT & Low-Bandwidth Communication

    MQTT (Message Queuing Telemetry Transport) is a lightweight publish-subscribe messaging protocol designed for constrained devices and unreliable networks. Originally developed by IBM for oil pipeline monitoring, MQTT has become the de facto standard for IoT communication, powering everything from smart homes to industrial sensors. What is MQTT? MQTT is a binary protocol built on TCP/IP that provides publish-subscribe messaging with minimal overhead. It’s designed for scenarios where bandwidth is limited, network connectivity is unreliable, and devices have constrained resources. ...
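To make "minimal overhead" concrete: an MQTT control packet begins with a fixed header made of one type/flags byte followed by a variable-length "Remaining Length" field of one to four bytes, so the smallest packets carry only two bytes of header. The sketch below encodes and decodes that length field the way the MQTT 3.1.1 specification describes it. The function names `encodeRemainingLength` and `decodeRemainingLength` are illustrative assumptions and do not come from any client library.

```go
package main

import (
	"errors"
	"fmt"
)

// maxRemainingLength is the largest value the four-byte encoding can hold.
const maxRemainingLength = 268435455

// encodeRemainingLength packs n into 1-4 bytes: 7 data bits per byte,
// with the high bit set when another byte follows.
// (Hypothetical helper name, for illustration only.)
func encodeRemainingLength(n int) ([]byte, error) {
	if n < 0 || n > maxRemainingLength {
		return nil, errors.New("remaining length out of range")
	}
	var out []byte
	for {
		b := byte(n % 128)
		n /= 128
		if n > 0 {
			b |= 0x80 // continuation bit
		}
		out = append(out, b)
		if n == 0 {
			return out, nil
		}
	}
}

// decodeRemainingLength reads the field back and reports how many bytes it used.
func decodeRemainingLength(buf []byte) (value, used int, err error) {
	multiplier := 1
	for i, b := range buf {
		if i == 4 {
			return 0, 0, errors.New("malformed remaining length")
		}
		value += int(b&0x7F) * multiplier
		if b&0x80 == 0 {
			return value, i + 1, nil
		}
		multiplier *= 128
	}
	return 0, 0, errors.New("truncated remaining length")
}

func main() {
	for _, n := range []int{0, 127, 128, 16383, 16384} {
		enc, err := encodeRemainingLength(n)
		if err != nil {
			fmt.Println("encode failed:", err)
			continue
		}
		dec, used, err := decodeRemainingLength(enc)
		if err != nil {
			fmt.Println("decode failed:", err)
			continue
		}
		fmt.Printf("%d -> % x (%d byte(s)), decoded back to %d\n", n, enc, used, dec)
	}
}
```

Payloads up to 127 bytes need a single length byte, which is why a small sensor reading can travel with a two-byte fixed header.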

    February 15, 2025 · 20 min · Rafiul Alam

    CQRS Pattern in Go: Separating Reads and Writes

What is CQRS Pattern?

Command Query Responsibility Segregation (CQRS) is an architectural pattern that separates read operations (queries) from write operations (commands). This separation allows you to optimize each side independently for better performance, scalability, and maintainability.

Key Principles:
- Command Model: Handles writes, updates, and business logic validation
- Query Model: Handles reads, optimized for data retrieval
- Separate Data Stores: Optional use of different databases for reads and writes
- Eventual Consistency: Query model may lag behind command model
- Single Responsibility: Each model focused on its specific task
- Independent Scaling: Scale read and write operations separately

Architecture Overview

    %%{init: {'theme':'dark', 'themeVariables': {'primaryTextColor':'#e5e7eb','secondaryTextColor':'#e5e7eb','tertiaryTextColor':'#e5e7eb','textColor':'#e5e7eb','nodeTextColor':'#e5e7eb','edgeLabelText':'#e5e7eb','clusterTextColor':'#e5e7eb','actorTextColor':'#e5e7eb'}}}%%
    graph TB
        subgraph "Client Layer"
            Client[Client Application]
        end
        subgraph "Command Side"
            CommandAPI[Command API]
            CommandHandler[Command Handlers]
            CommandModel[Command Model]
            WriteDB[(Write Database)]
        end
        subgraph "Query Side"
            QueryAPI[Query API]
            QueryHandler[Query Handlers]
            QueryModel[Read Model]
            ReadDB[(Read Database)]
        end
        subgraph "Synchronization"
            EventBus[Event Bus]
            Projections[Projections/Denormalizers]
        end
        Client -->|Commands| CommandAPI
        Client -->|Queries| QueryAPI
        CommandAPI --> CommandHandler
        CommandHandler --> CommandModel
        CommandModel --> WriteDB
        QueryAPI --> QueryHandler
        QueryHandler --> QueryModel
        QueryModel --> ReadDB
        CommandHandler -->|Events| EventBus
        EventBus --> Projections
        Projections --> ReadDB
        style CommandAPI fill:#78350f,color:#fff
        style QueryAPI fill:#1e3a5f,color:#fff
        style EventBus fill:#134e4a,color:#fff
        style Projections fill:#4c1d95,color:#fff

CQRS Flow Pattern

    %%{init: {'theme':'dark', 'themeVariables': {'primaryTextColor':'#e5e7eb','secondaryTextColor':'#e5e7eb','tertiaryTextColor':'#e5e7eb','textColor':'#e5e7eb','nodeTextColor':'#e5e7eb','edgeLabelText':'#e5e7eb','clusterTextColor':'#e5e7eb','actorTextColor':'#e5e7eb'}}}%%
    sequenceDiagram
        participant Client
        participant CommandAPI
        participant CommandHandler
        participant WriteDB
        participant EventBus
        participant Projection
        participant ReadDB
        participant QueryAPI
        Note over Client,QueryAPI: Write Operation
        Client->>CommandAPI: Create Order Command
        CommandAPI->>CommandHandler: Handle Command
        CommandHandler->>CommandHandler: Validate Business Rules
        CommandHandler->>WriteDB: Save Order
        WriteDB-->>CommandHandler: Success
        CommandHandler->>EventBus: Publish OrderCreated Event
        CommandHandler-->>Client: Command Accepted (202)
        Note over EventBus,ReadDB: Background Synchronization
        EventBus->>Projection: OrderCreated Event
        Projection->>Projection: Build Read Model
        Projection->>ReadDB: Update Denormalized View
        Note over Client,QueryAPI: Read Operation
        Client->>QueryAPI: Get Order Query
        QueryAPI->>ReadDB: Query Read Model
        ReadDB-->>QueryAPI: Order Data
        QueryAPI-->>Client: Order Details (200)

Real-World Use Cases
- E-commerce Platforms: Separate product catalog writes from fast product searches
- Banking Systems: Transaction processing vs. account balance queries
- Analytics Platforms: Data ingestion vs. complex reporting queries
- Social Media: Post creation vs. feed generation
- Inventory Management: Stock updates vs. availability queries
- Audit Systems: Event recording vs. audit log queries

CQRS Pattern Implementation

Project Structure

    cqrs-app/
    ├── cmd/
    │   └── api/
    │       └── main.go
    ├── internal/
    │   ├── commands/
    │   │   ├── create_order.go
    │   │   ├── update_order.go
    │   │   └── handler.go
    │   ├── queries/
    │   │   ├── get_order.go
    │   │   ├── list_orders.go
    │   │   └── handler.go
    │   ├── domain/
    │   │   └── order.go
    │   ├── write/
    │   │   ├── repository.go
    │   │   └── postgres_repository.go
    │   ├── read/
    │   │   ├── repository.go
    │   │   └── postgres_repository.go
    │   ├── projections/
    │   │   └── order_projection.go
    │   └── events/
    │       └── events.go
    └── go.mod

Domain Model

    // internal/domain/order.go
    package domain

    import (
        "errors"
        "fmt" // needed by generateID
        "time"
    )

    type OrderID string

    type OrderStatus string

    const (
        OrderStatusPending   OrderStatus = "pending"
        OrderStatusConfirmed OrderStatus = "confirmed"
        OrderStatusShipped   OrderStatus = "shipped"
        OrderStatusDelivered OrderStatus = "delivered"
        OrderStatusCancelled OrderStatus = "cancelled"
    )

    type OrderItem struct {
        ProductID string  `json:"product_id"`
        Quantity  int     `json:"quantity"`
        Price     float64 `json:"price"`
    }

    // Write Model - Rich domain model with business logic
    type Order struct {
        ID        OrderID
        UserID    string
        Items     []OrderItem
        Total     float64
        Status    OrderStatus
        CreatedAt time.Time
        UpdatedAt time.Time
        Version   int // For optimistic locking
    }

    var (
        ErrInvalidOrder     = errors.New("invalid order")
        ErrOrderNotFound    = errors.New("order not found")
        ErrInvalidStatus    = errors.New("invalid status transition")
        ErrConcurrentUpdate = errors.New("concurrent update detected")
    )

    // NewOrder creates a new order with validation
    func NewOrder(userID string, items []OrderItem) (*Order, error) {
        if userID == "" {
            return nil, ErrInvalidOrder
        }
        if len(items) == 0 {
            return nil, ErrInvalidOrder
        }

        var total float64
        for _, item := range items {
            if item.Quantity <= 0 || item.Price < 0 {
                return nil, ErrInvalidOrder
            }
            total += item.Price * float64(item.Quantity)
        }

        return &Order{
            ID:        OrderID(generateID()),
            UserID:    userID,
            Items:     items,
            Total:     total,
            Status:    OrderStatusPending,
            CreatedAt: time.Now(),
            UpdatedAt: time.Now(),
            Version:   1,
        }, nil
    }

    // Confirm transitions order to confirmed status
    func (o *Order) Confirm() error {
        if o.Status != OrderStatusPending {
            return ErrInvalidStatus
        }
        o.Status = OrderStatusConfirmed
        o.UpdatedAt = time.Now()
        o.Version++
        return nil
    }

    // Ship transitions order to shipped status
    func (o *Order) Ship() error {
        if o.Status != OrderStatusConfirmed {
            return ErrInvalidStatus
        }
        o.Status = OrderStatusShipped
        o.UpdatedAt = time.Now()
        o.Version++
        return nil
    }

    // Cancel cancels the order
    func (o *Order) Cancel() error {
        if o.Status == OrderStatusDelivered || o.Status == OrderStatusCancelled {
            return ErrInvalidStatus
        }
        o.Status = OrderStatusCancelled
        o.UpdatedAt = time.Now()
        o.Version++
        return nil
    }

    func generateID() string {
        return fmt.Sprintf("order_%d", time.Now().UnixNano())
    }

Commands

    // internal/commands/create_order.go
    package commands

    import (
        "context"
        "fmt"

        "app/internal/domain"
        "app/internal/events"
    )

    // CreateOrderCommand represents a command to create an order
    type CreateOrderCommand struct {
        UserID string             `json:"user_id"`
        Items  []domain.OrderItem `json:"items"`
    }

    // CreateOrderHandler handles order creation
    type CreateOrderHandler struct {
        writeRepo domain.WriteRepository
        eventBus  events.EventBus
    }

    func NewCreateOrderHandler(repo domain.WriteRepository, bus events.EventBus) *CreateOrderHandler {
        return &CreateOrderHandler{
            writeRepo: repo,
            eventBus:  bus,
        }
    }

    func (h *CreateOrderHandler) Handle(ctx context.Context, cmd *CreateOrderCommand) (*domain.Order, error) {
        // Create domain model with business logic validation
        order, err := domain.NewOrder(cmd.UserID, cmd.Items)
        if err != nil {
            return nil, fmt.Errorf("invalid command: %w", err)
        }

        // Persist to write store
        if err := h.writeRepo.Save(ctx, order); err != nil {
            return nil, fmt.Errorf("failed to save order: %w", err)
        }

        // Publish event for read model synchronization
        event := events.OrderCreatedEvent{
            OrderID:   string(order.ID),
            UserID:    order.UserID,
            Items:     order.Items,
            Total:     order.Total,
            Status:    string(order.Status),
            CreatedAt: order.CreatedAt,
        }
        if err := h.eventBus.Publish(ctx, "order.created", event); err != nil {
            // Log error but don't fail the command - eventual consistency
            fmt.Printf("Failed to publish event: %v\n", err)
        }

        return order, nil
    }

    // internal/commands/update_order.go
    package commands

    import (
        "context"
        "fmt"

        "app/internal/domain"
        "app/internal/events"
    )

    // ConfirmOrderCommand confirms an order
    type ConfirmOrderCommand struct {
        OrderID string `json:"order_id"`
    }

    type ConfirmOrderHandler struct {
        writeRepo domain.WriteRepository
        eventBus  events.EventBus
    }

    func NewConfirmOrderHandler(repo domain.WriteRepository, bus events.EventBus) *ConfirmOrderHandler {
        return &ConfirmOrderHandler{
            writeRepo: repo,
            eventBus:  bus,
        }
    }

    func (h *ConfirmOrderHandler) Handle(ctx context.Context, cmd *ConfirmOrderCommand) error {
        // Load from write store
        order, err := h.writeRepo.GetByID(ctx, domain.OrderID(cmd.OrderID))
        if err != nil {
            return fmt.Errorf("order not found: %w", err)
        }

        // Execute domain logic
        if err := order.Confirm(); err != nil {
            return fmt.Errorf("cannot confirm order: %w", err)
        }

        // Save with optimistic locking
        if err := h.writeRepo.Save(ctx, order); err != nil {
            return fmt.Errorf("failed to save order: %w", err)
        }

        // Publish event
        event := events.OrderConfirmedEvent{
            OrderID:     string(order.ID),
            Status:      string(order.Status),
            ConfirmedAt: order.UpdatedAt,
        }
        if err := h.eventBus.Publish(ctx, "order.confirmed", event); err != nil {
            fmt.Printf("Failed to publish event: %v\n", err)
        }

        return nil
    }

    // CancelOrderCommand cancels an order
    type CancelOrderCommand struct {
        OrderID string `json:"order_id"`
        Reason  string `json:"reason"`
    }

    type CancelOrderHandler struct {
        writeRepo domain.WriteRepository
        eventBus  events.EventBus
    }

    func NewCancelOrderHandler(repo domain.WriteRepository, bus events.EventBus) *CancelOrderHandler {
        return &CancelOrderHandler{
            writeRepo: repo,
            eventBus:  bus,
        }
    }

    func (h *CancelOrderHandler) Handle(ctx context.Context, cmd *CancelOrderCommand) error {
        order, err := h.writeRepo.GetByID(ctx, domain.OrderID(cmd.OrderID))
        if err != nil {
            return fmt.Errorf("order not found: %w", err)
        }

        if err := order.Cancel(); err != nil {
            return fmt.Errorf("cannot cancel order: %w", err)
        }

        if err := h.writeRepo.Save(ctx, order); err != nil {
            return fmt.Errorf("failed to save order: %w", err)
        }

        event := events.OrderCancelledEvent{
            OrderID:     string(order.ID),
            Status:      string(order.Status),
            Reason:      cmd.Reason,
            CancelledAt: order.UpdatedAt,
        }
        if err := h.eventBus.Publish(ctx, "order.cancelled", event); err != nil {
            fmt.Printf("Failed to publish event: %v\n", err)
        }

        return nil
    }

Queries

    // internal/queries/get_order.go
    package queries

    import (
        "context"
        "time"
    )

    // OrderReadModel - Optimized for reading, denormalized
    type OrderReadModel struct {
        ID            string               `json:"id"`
        UserID        string               `json:"user_id"`
        UserName      string               `json:"user_name"`  // Denormalized
        UserEmail     string               `json:"user_email"` // Denormalized
        Items         []OrderItemReadModel `json:"items"`
        ItemCount     int                  `json:"item_count"` // Computed
        Total         float64              `json:"total"`
        Status        string               `json:"status"`
        StatusDisplay string               `json:"status_display"` // Formatted for display
        CreatedAt     time.Time            `json:"created_at"`
        UpdatedAt     time.Time            `json:"updated_at"`
    }

    type OrderItemReadModel struct {
        ProductID   string  `json:"product_id"`
        ProductName string  `json:"product_name"` // Denormalized
        Quantity    int     `json:"quantity"`
        Price       float64 `json:"price"`
        Subtotal    float64 `json:"subtotal"` // Computed
    }

    // GetOrderQuery retrieves a single order
    type GetOrderQuery struct {
        OrderID string `json:"order_id"`
    }

    type GetOrderHandler struct {
        readRepo ReadRepository
    }

    func NewGetOrderHandler(repo ReadRepository) *GetOrderHandler {
        return &GetOrderHandler{readRepo: repo}
    }

    func (h *GetOrderHandler) Handle(ctx context.Context, query *GetOrderQuery) (*OrderReadModel, error) {
        return h.readRepo.GetByID(ctx, query.OrderID)
    }

    // internal/queries/list_orders.go
    package queries

    import (
        "context"
    )

    // ListOrdersQuery lists orders with filters
    type ListOrdersQuery struct {
        UserID   string `json:"user_id"`
        Status   string `json:"status"`
        Page     int    `json:"page"`
        PageSize int    `json:"page_size"`
    }

    type OrderListResult struct {
        Orders     []*OrderReadModel `json:"orders"`
        Total      int               `json:"total"`
        Page       int               `json:"page"`
        PageSize   int               `json:"page_size"`
        TotalPages int               `json:"total_pages"`
    }

    type ListOrdersHandler struct {
        readRepo ReadRepository
    }

    func NewListOrdersHandler(repo ReadRepository) *ListOrdersHandler {
        return &ListOrdersHandler{readRepo: repo}
    }

    func (h *ListOrdersHandler) Handle(ctx context.Context, query *ListOrdersQuery) (*OrderListResult, error) {
        // Set defaults
        if query.Page < 1 {
            query.Page = 1
        }
        if query.PageSize < 1 || query.PageSize > 100 {
            query.PageSize = 20
        }

        offset := (query.Page - 1) * query.PageSize

        orders, total, err := h.readRepo.List(ctx, query.UserID, query.Status, query.PageSize, offset)
        if err != nil {
            return nil, err
        }

        // Round up so a partial last page still counts
        totalPages := (total + query.PageSize - 1) / query.PageSize

        return &OrderListResult{
            Orders:     orders,
            Total:      total,
            Page:       query.Page,
            PageSize:   query.PageSize,
            TotalPages: totalPages,
        }, nil
    }

    // ReadRepository interface for query side
    type ReadRepository interface {
        GetByID(ctx context.Context, id string) (*OrderReadModel, error)
        List(ctx context.Context, userID, status string, limit, offset int) ([]*OrderReadModel, int, error)
    }

Projections

    // internal/projections/order_projection.go
    package projections

    import (
        "context"
        "database/sql"
        "fmt"

        "app/internal/events"
        // "app/internal/queries" was imported here but never used, which Go rejects
    )

    // OrderProjection builds read models from events
    type OrderProjection struct {
        db *sql.DB
    }

    func NewOrderProjection(db *sql.DB) *OrderProjection {
        return &OrderProjection{db: db}
    }

    // HandleOrderCreated projects OrderCreated event to read model
    func (p *OrderProjection) HandleOrderCreated(ctx context.Context, event events.OrderCreatedEvent) error {
        tx, err := p.db.BeginTx(ctx, nil)
        if err != nil {
            return err
        }
        defer tx.Rollback() // no-op once Commit has succeeded

        // Insert denormalized order
        query := `
            INSERT INTO order_read_model
                (id, user_id, user_name, user_email, total, status, status_display, item_count, created_at, updated_at)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
        `

        // Fetch user details for denormalization
        userName, userEmail := p.getUserDetails(ctx, event.UserID)

        _, err = tx.ExecContext(ctx, query,
            event.OrderID,
            event.UserID,
            userName,
            userEmail,
            event.Total,
            event.Status,
            formatStatus(event.Status),
            len(event.Items),
            event.CreatedAt,
            event.CreatedAt,
        )
        if err != nil {
            return fmt.Errorf("failed to insert order: %w", err)
        }

        // Insert order items with denormalized product info
        itemQuery := `
            INSERT INTO order_item_read_model
                (order_id, product_id, product_name, quantity, price, subtotal)
            VALUES ($1, $2, $3, $4, $5, $6)
        `

        for _, item := range event.Items {
            productName := p.getProductName(ctx, item.ProductID)
            subtotal := item.Price * float64(item.Quantity)

            _, err = tx.ExecContext(ctx, itemQuery,
                event.OrderID,
                item.ProductID,
                productName,
                item.Quantity,
                item.Price,
                subtotal,
            )
            if err != nil {
                return fmt.Errorf("failed to insert order item: %w", err)
            }
        }

        return tx.Commit()
    }

    // HandleOrderConfirmed updates read model when order is confirmed
    func (p *OrderProjection) HandleOrderConfirmed(ctx context.Context, event events.OrderConfirmedEvent) error {
        query := `
            UPDATE order_read_model
            SET status = $2, status_display = $3, updated_at = $4
            WHERE id = $1
        `

        _, err := p.db.ExecContext(ctx, query,
            event.OrderID,
            event.Status,
            formatStatus(event.Status),
            event.ConfirmedAt,
        )
        return err
    }

    // HandleOrderCancelled updates read model when order is cancelled
    func (p *OrderProjection) HandleOrderCancelled(ctx context.Context, event events.OrderCancelledEvent) error {
        query := `
            UPDATE order_read_model
            SET status = $2, status_display = $3, updated_at = $4
            WHERE id = $1
        `

        _, err := p.db.ExecContext(ctx, query,
            event.OrderID,
            event.Status,
            formatStatus(event.Status)+" ("+event.Reason+")",
            event.CancelledAt,
        )
        return err
    }

    func (p *OrderProjection) getUserDetails(ctx context.Context, userID string) (name, email string) {
        // Query user service or cache.
        // Best effort: the Scan error is ignored, so a failed lookup leaves both fields empty.
        query := `SELECT name, email FROM users WHERE id = $1`
        p.db.QueryRowContext(ctx, query, userID).Scan(&name, &email)
        return
    }

    func (p *OrderProjection) getProductName(ctx context.Context, productID string) string {
        // Query product service or cache (best effort, as above)
        var name string
        query := `SELECT name FROM products WHERE id = $1`
        p.db.QueryRowContext(ctx, query, productID).Scan(&name)
        return name
    }

    func formatStatus(status string) string {
        // Format status for display
        switch status {
        case "pending":
            return "Pending"
        case "confirmed":
            return "Confirmed"
        case "shipped":
            return "Shipped"
        case "delivered":
            return "Delivered"
        case "cancelled":
            return "Cancelled"
        default:
            return status
        }
    }

    // RegisterHandlers registers projection handlers with event bus.
    // Each handler checks the payload type instead of panicking on a mismatch.
    func (p *OrderProjection) RegisterHandlers(bus events.EventBus) {
        bus.Subscribe("order.created", func(ctx context.Context, data interface{}) error {
            event, ok := data.(events.OrderCreatedEvent)
            if !ok {
                return fmt.Errorf("unexpected payload type %T for order.created", data)
            }
            return p.HandleOrderCreated(ctx, event)
        })

        bus.Subscribe("order.confirmed", func(ctx context.Context, data interface{}) error {
            event, ok := data.(events.OrderConfirmedEvent)
            if !ok {
                return fmt.Errorf("unexpected payload type %T for order.confirmed", data)
            }
            return p.HandleOrderConfirmed(ctx, event)
        })

        bus.Subscribe("order.cancelled", func(ctx context.Context, data interface{}) error {
            event, ok := data.(events.OrderCancelledEvent)
            if !ok {
                return fmt.Errorf("unexpected payload type %T for order.cancelled", data)
            }
            return p.HandleOrderCancelled(ctx, event)
        })
    }

Main Application

    // cmd/api/main.go
    package main

    import (
        "database/sql"
        "encoding/json"
        "log"
        "net/http"

        "github.com/gorilla/mux"
        _ "github.com/lib/pq"

        "app/internal/commands"
        "app/internal/events"
        "app/internal/projections"
        "app/internal/queries"
        "app/internal/read"
        "app/internal/write"
    )

    func main() {
        // Connect to write database
        writeDB, err := sql.Open("postgres", "postgres://user:pass@localhost/orders_write?sslmode=disable")
        if err != nil {
            log.Fatal(err)
        }
        defer writeDB.Close()

        // Connect to read database (could be same or different DB)
        readDB, err := sql.Open("postgres", "postgres://user:pass@localhost/orders_read?sslmode=disable")
        if err != nil {
            log.Fatal(err)
        }
        defer readDB.Close()

        // Initialize event bus
        eventBus := events.NewInMemoryEventBus()

        // Initialize repositories
        writeRepo := write.NewPostgresRepository(writeDB)
        readRepo := read.NewPostgresRepository(readDB)

        // Initialize projections
        orderProjection := projections.NewOrderProjection(readDB)
        orderProjection.RegisterHandlers(eventBus)

        // Initialize command handlers
        createOrderHandler := commands.NewCreateOrderHandler(writeRepo, eventBus)
        confirmOrderHandler := commands.NewConfirmOrderHandler(writeRepo, eventBus)
        cancelOrderHandler := commands.NewCancelOrderHandler(writeRepo, eventBus)

        // Initialize query handlers
        getOrderHandler := queries.NewGetOrderHandler(readRepo)
        listOrdersHandler := queries.NewListOrdersHandler(readRepo)

        // Setup HTTP routes
        router := mux.NewRouter()

        // Command endpoints
        router.HandleFunc("/commands/orders", func(w http.ResponseWriter, r *http.Request) {
            var cmd commands.CreateOrderCommand
            if err := json.NewDecoder(r.Body).Decode(&cmd); err != nil {
                http.Error(w, err.Error(), http.StatusBadRequest)
                return
            }

            order, err := createOrderHandler.Handle(r.Context(), &cmd)
            if err != nil {
                http.Error(w, err.Error(), http.StatusBadRequest)
                return
            }

            w.Header().Set("Content-Type", "application/json")
            w.WriteHeader(http.StatusAccepted)
            json.NewEncoder(w).Encode(map[string]string{"order_id": string(order.ID)})
        }).Methods("POST")

        router.HandleFunc("/commands/orders/{id}/confirm", func(w http.ResponseWriter, r *http.Request) {
            vars := mux.Vars(r)
            cmd := commands.ConfirmOrderCommand{OrderID: vars["id"]}

            if err := confirmOrderHandler.Handle(r.Context(), &cmd); err != nil {
                http.Error(w, err.Error(), http.StatusBadRequest)
                return
            }

            w.WriteHeader(http.StatusAccepted)
        }).Methods("POST")

        // The original listing created cancelOrderHandler without routing to it,
        // which does not compile; the reason is read from the ?reason= query parameter.
        router.HandleFunc("/commands/orders/{id}/cancel", func(w http.ResponseWriter, r *http.Request) {
            vars := mux.Vars(r)
            cmd := commands.CancelOrderCommand{OrderID: vars["id"], Reason: r.URL.Query().Get("reason")}

            if err := cancelOrderHandler.Handle(r.Context(), &cmd); err != nil {
                http.Error(w, err.Error(), http.StatusBadRequest)
                return
            }

            w.WriteHeader(http.StatusAccepted)
        }).Methods("POST")

        // Query endpoints
        router.HandleFunc("/queries/orders/{id}", func(w http.ResponseWriter, r *http.Request) {
            vars := mux.Vars(r)
            query := queries.GetOrderQuery{OrderID: vars["id"]}

            order, err := getOrderHandler.Handle(r.Context(), &query)
            if err != nil {
                http.Error(w, err.Error(), http.StatusNotFound)
                return
            }

            w.Header().Set("Content-Type", "application/json")
            json.NewEncoder(w).Encode(order)
        }).Methods("GET")

        router.HandleFunc("/queries/orders", func(w http.ResponseWriter, r *http.Request) {
            query := queries.ListOrdersQuery{
                UserID: r.URL.Query().Get("user_id"),
                Status: r.URL.Query().Get("status"),
            }

            result, err := listOrdersHandler.Handle(r.Context(), &query)
            if err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
            }

            w.Header().Set("Content-Type", "application/json")
            json.NewEncoder(w).Encode(result)
        }).Methods("GET")

        log.Println("Server starting on :8080")
        log.Fatal(http.ListenAndServe(":8080", router))
    }

Best Practices
- Clear Command/Query Separation: Never mix command and query logic
- Eventual Consistency: Accept and handle eventual consistency gracefully
- Idempotent Projections: Ensure projections can be replayed safely
- Versioning: Version commands and events for evolution
- Validation: Validate commands before processing
- Optimistic Locking: Use versioning to handle concurrent updates
- Monitoring: Track projection lag and command processing times
- Testing: Test commands and queries independently

Common Pitfalls
- Over-Engineering: Using CQRS for simple CRUD applications
- Synchronous Projections: Making projections synchronous defeats the purpose
- Shared Models: Reusing write models in read side
- Complex Queries in Write Model: Adding query logic to command side
- No Monitoring: Not tracking eventual consistency lag
- Poor Error Handling: Not handling projection failures
- Inconsistent State: Not handling projection rebuild scenarios

When to Use CQRS Pattern

Use When: ...

    February 14, 2025 · 13 min · Rafiul Alam

    RabbitMQ & AMQP in Go: Advanced Message Queueing Protocol

    RabbitMQ is one of the most popular open-source message brokers, implementing the Advanced Message Queueing Protocol (AMQP). It acts as a reliable intermediary for asynchronous message passing between applications, providing features like message routing, queuing, delivery guarantees, and complex routing patterns. What is RabbitMQ? RabbitMQ is a message broker that accepts and forwards messages. Unlike direct communication between services, RabbitMQ provides a buffer that decouples producers from consumers, allowing them to operate independently and scale separately. ...
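The buffering idea can be illustrated in-process with a Go buffered channel standing in for a queue. This is only an analogy under stated assumptions: it is not the RabbitMQ client API, it has no persistence or routing, and `runDemo` is a hypothetical helper written for this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// runDemo sends n messages through a buffered channel that plays the role of a
// queue and returns what the consumer received, in order.
func runDemo(n, buffer int) []string {
	queue := make(chan string, buffer) // the "broker": holds messages until consumed

	var received []string
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // consumer: drains the queue at its own pace
		defer wg.Done()
		for msg := range queue {
			received = append(received, msg)
		}
	}()

	// Producer: blocks only when the buffer is full, never on a specific consumer.
	for i := 1; i <= n; i++ {
		queue <- fmt.Sprintf("msg-%d", i)
	}
	close(queue)

	wg.Wait() // after this, reading received is safe
	return received
}

func main() {
	fmt.Println(runDemo(5, 2))
}
```

The producer never references the consumer directly; both only know the queue. A broker such as RabbitMQ provides the same decoupling across processes and machines.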

    February 13, 2025 · 19 min · Rafiul Alam

    Event-Driven Architecture in Go: Building Reactive Systems

What is Event-Driven Architecture?

Event-Driven Architecture (EDA) is a design pattern where components communicate by producing and consuming events. Events represent state changes or significant occurrences in the system, allowing components to react asynchronously without direct coupling.

Key Principles:
- Event Producers: Components that generate events when state changes
- Event Consumers: Components that react to events
- Event Broker: Middleware that routes events from producers to consumers
- Asynchronous Communication: Decoupled, non-blocking interactions
- Event Immutability: Events represent past facts that cannot be changed
- Eventual Consistency: Systems converge to consistent state over time

Architecture Overview

    %%{init: {'theme':'dark', 'themeVariables': {'primaryTextColor':'#e5e7eb','secondaryTextColor':'#e5e7eb','tertiaryTextColor':'#e5e7eb','textColor':'#e5e7eb','nodeTextColor':'#e5e7eb','edgeLabelText':'#e5e7eb','clusterTextColor':'#e5e7eb','actorTextColor':'#e5e7eb'}}}%%
    graph TB
        subgraph "Event Producers"
            UserService[User Service]
            OrderService[Order Service]
            PaymentService[Payment Service]
        end
        subgraph "Event Broker"
            EventBus[Event Bus / Message Broker]
            UserTopic[User Events Topic]
            OrderTopic[Order Events Topic]
            PaymentTopic[Payment Events Topic]
        end
        subgraph "Event Consumers"
            EmailService[Email Service]
            AnalyticsService[Analytics Service]
            NotificationService[Notification Service]
            InventoryService[Inventory Service]
        end
        subgraph "Event Store"
            EventLog[(Event Log DB)]
        end
        UserService -->|UserCreated| UserTopic
        OrderService -->|OrderPlaced| OrderTopic
        OrderService -->|OrderConfirmed| OrderTopic
        PaymentService -->|PaymentProcessed| PaymentTopic
        UserTopic --> EventBus
        OrderTopic --> EventBus
        PaymentTopic --> EventBus
        EventBus --> EmailService
        EventBus --> AnalyticsService
        EventBus --> NotificationService
        EventBus --> InventoryService
        EventBus --> EventLog
        style UserService fill:#1e3a5f,color:#fff
        style OrderService fill:#78350f,color:#fff
        style PaymentService fill:#4c1d95,color:#fff
        style EventBus fill:#134e4a,color:#fff

Event Flow Pattern

    %%{init: {'theme':'dark', 'themeVariables': {'primaryTextColor':'#e5e7eb','secondaryTextColor':'#e5e7eb','tertiaryTextColor':'#e5e7eb','textColor':'#e5e7eb','nodeTextColor':'#e5e7eb','edgeLabelText':'#e5e7eb','clusterTextColor':'#e5e7eb','actorTextColor':'#e5e7eb'}}}%%
    sequenceDiagram
        participant User
        participant OrderService
        participant EventBroker
        participant EmailService
        participant InventoryService
        participant AnalyticsService
        User->>OrderService: Place Order
        OrderService->>OrderService: Validate Order
        OrderService->>EventBroker: Publish OrderPlaced Event
        Note over EventBroker: Event is distributed to all subscribers
        EventBroker->>EmailService: OrderPlaced Event
        EventBroker->>InventoryService: OrderPlaced Event
        EventBroker->>AnalyticsService: OrderPlaced Event
        EmailService->>EmailService: Send Confirmation Email
        InventoryService->>InventoryService: Update Stock
        AnalyticsService->>AnalyticsService: Record Metrics
        InventoryService->>EventBroker: Publish StockUpdated Event
        OrderService-->>User: Order Accepted (202)

Real-World Use Cases
- E-commerce: Order processing, inventory updates, shipping notifications
- Financial Systems: Transaction processing, fraud detection, account updates
- IoT Platforms: Sensor data processing, device state changes
- Social Media: Post updates, notifications, activity feeds
- Streaming Services: Video processing, recommendation updates
- Gaming: Player actions, leaderboard updates, achievements

Event-Driven Architecture Implementation

Project Structure

    event-driven-app/
    ├── cmd/
    │   ├── order-service/
    │   │   └── main.go
    │   ├── email-service/
    │   │   └── main.go
    │   └── inventory-service/
    │       └── main.go
    ├── internal/
    │   ├── events/
    │   │   ├── event.go
    │   │   ├── bus.go
    │   │   └── types.go
    │   ├── broker/
    │   │   ├── rabbitmq.go
    │   │   └── kafka.go
    │   └── services/
    │       ├── order/
    │       ├── email/
    │       └── inventory/
    ├── pkg/
    │   └── messaging/
    │       └── publisher.go
    └── go.mod

Core Event System

    // internal/events/event.go
    package events

    import (
        "encoding/json"
        "fmt" // needed by generateEventID
        "time"
    )

    // Event represents a domain event
    type Event struct {
        ID          string                 `json:"id"`
        Type        string                 `json:"type"`
        AggregateID string                 `json:"aggregate_id"`
        Payload     map[string]interface{} `json:"payload"`
        Metadata    map[string]string      `json:"metadata"`
        Timestamp   time.Time              `json:"timestamp"`
        Version     int                    `json:"version"`
    }

    // NewEvent creates a new event
    func NewEvent(eventType, aggregateID string, payload map[string]interface{}) *Event {
        return &Event{
            ID:          generateEventID(),
            Type:        eventType,
            AggregateID: aggregateID,
            Payload:     payload,
            Metadata:    make(map[string]string),
            Timestamp:   time.Now(),
            Version:     1,
        }
    }

    // ToJSON serializes event to JSON
    func (e *Event) ToJSON() ([]byte, error) {
        return json.Marshal(e)
    }

    // FromJSON deserializes event from JSON
    func FromJSON(data []byte) (*Event, error) {
        var event Event
        if err := json.Unmarshal(data, &event); err != nil {
            return nil, err
        }
        return &event, nil
    }

    func generateEventID() string {
        return fmt.Sprintf("evt_%d", time.Now().UnixNano())
    }

    // internal/events/types.go
    package events

    const (
        // User Events
        UserCreated = "user.created"
        UserUpdated = "user.updated"
        UserDeleted = "user.deleted"

        // Order Events
        OrderPlaced    = "order.placed"
        OrderConfirmed = "order.confirmed"
        OrderCancelled = "order.cancelled"
        OrderShipped   = "order.shipped"
        OrderDelivered = "order.delivered"

        // Payment Events
        PaymentProcessed = "payment.processed"
        PaymentFailed    = "payment.failed"
        PaymentRefunded  = "payment.refunded"

        // Inventory Events
        StockUpdated   = "inventory.stock_updated"
        StockDepleted  = "inventory.stock_depleted"
        StockRestocked = "inventory.stock_restocked"
    )

    // internal/events/bus.go
    package events

    import (
        "context"
        "fmt"
        "sync"
    )

    // Handler is a function that processes events
    type Handler func(ctx context.Context, event *Event) error

    // EventBus manages event publishing and subscriptions
    type EventBus struct {
        mu         sync.RWMutex
        handlers   map[string][]Handler
        middleware []Middleware
    }

    // Middleware wraps event handlers
    type Middleware func(Handler) Handler

    // NewEventBus creates a new event bus
    func NewEventBus() *EventBus {
        return &EventBus{
            handlers:   make(map[string][]Handler),
            middleware: make([]Middleware, 0),
        }
    }

    // Subscribe registers a handler for an event type
    func (b *EventBus) Subscribe(eventType string, handler Handler) {
        b.mu.Lock()
        defer b.mu.Unlock()

        // Apply middleware in reverse so the first one registered runs outermost.
        // Only middleware added before this call wraps the handler.
        for i := len(b.middleware) - 1; i >= 0; i-- {
            handler = b.middleware[i](handler)
        }

        b.handlers[eventType] = append(b.handlers[eventType], handler)
    }

    // Publish publishes an event to all subscribers
    func (b *EventBus) Publish(ctx context.Context, event *Event) error {
        b.mu.RLock()
        handlers, exists := b.handlers[event.Type]
        b.mu.RUnlock()

        if !exists {
            return nil // No handlers registered
        }

        // Run handlers concurrently; the channel is buffered so no goroutine blocks
        var wg sync.WaitGroup
        errors := make(chan error, len(handlers))

        for _, handler := range handlers {
            wg.Add(1)
            go func(h Handler) {
                defer wg.Done()
                if err := h(ctx, event); err != nil {
                    errors <- fmt.Errorf("handler error: %w", err)
                }
            }(handler)
        }

        wg.Wait()
        close(errors)

        // Collect errors
        var handlerErrors []error
        for err := range errors {
            handlerErrors = append(handlerErrors, err)
        }

        if len(handlerErrors) > 0 {
            return fmt.Errorf("some handlers failed: %v", handlerErrors)
        }

        return nil
    }

    // Use adds middleware to the event bus
    func (b *EventBus) Use(middleware Middleware) {
        b.mu.Lock()
        defer b.mu.Unlock()
        b.middleware = append(b.middleware, middleware)
    }

Message Broker Integration (RabbitMQ)

    // internal/broker/rabbitmq.go
    package broker

    import (
        "context"
        "fmt"
        "log"

        "github.com/rabbitmq/amqp091-go"

        "app/internal/events"
    )

    // RabbitMQBroker wraps RabbitMQ for event publishing/consuming
    type RabbitMQBroker struct {
        conn    *amqp091.Connection
        channel *amqp091.Channel
    }

    // NewRabbitMQBroker creates a new RabbitMQ broker
    func NewRabbitMQBroker(url string) (*RabbitMQBroker, error) {
        conn, err := amqp091.Dial(url)
        if err != nil {
            return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err)
        }

        channel, err := conn.Channel()
        if err != nil {
            return nil, fmt.Errorf("failed to open channel: %w", err)
        }

        return &RabbitMQBroker{
            conn:    conn,
            channel: channel,
        }, nil
    }

    // DeclareExchange declares an exchange
    func (b *RabbitMQBroker) DeclareExchange(name, kind string) error {
        return b.channel.ExchangeDeclare(
            name,  // name
            kind,  // type
            true,  // durable
            false, // auto-deleted
            false, // internal
            false, // no-wait
            nil,   // arguments
        )
    }

    // Publish publishes an event to RabbitMQ
    func (b *RabbitMQBroker) Publish(ctx context.Context, exchange, routingKey string, event *events.Event) error {
        body, err := event.ToJSON()
        if err != nil {
            return fmt.Errorf("failed to serialize event: %w", err)
        }

        err = b.channel.PublishWithContext(
            ctx,
            exchange,   // exchange
            routingKey, // routing key
            false,      // mandatory
            false,      // immediate
            amqp091.Publishing{
                ContentType:  "application/json",
                Body:         body,
                DeliveryMode: amqp091.Persistent,
                MessageId:    event.ID,
                Timestamp:    event.Timestamp,
            },
        )
        if err != nil {
            return fmt.Errorf("failed to publish event: %w", err)
        }

        return nil
    }

    // Subscribe subscribes to events from a queue
    func (b *RabbitMQBroker) Subscribe(queueName, exchange, routingKey string, handler events.Handler) error {
        // Declare queue
        queue, err := b.channel.QueueDeclare(
            queueName, // name
            true,      // durable
            false,     // delete when unused
            false,     // exclusive
            false,     // no-wait
            nil,       // arguments
        )
        if err != nil {
            return fmt.Errorf("failed to declare queue: %w", err)
        }

        // Bind queue to exchange
        err = b.channel.QueueBind(
            queue.Name, // queue name
            routingKey, // routing key
            exchange,   // exchange
            false,
            nil,
        )
        if err != nil {
            return fmt.Errorf("failed to bind queue: %w", err)
        }

        // Consume messages
        msgs, err := b.channel.Consume(
            queue.Name, // queue
            "",         // consumer
            false,      // auto-ack
            false,      // exclusive
            false,      // no-local
            false,      // no-wait
            nil,        // args
        )
        if err != nil {
            return fmt.Errorf("failed to register consumer: %w", err)
        }

        // Process messages
        go func() {
            for msg := range msgs {
                event, err := events.FromJSON(msg.Body)
                if err != nil {
                    log.Printf("Failed to deserialize event: %v", err)
                    msg.Nack(false, false)
                    continue
                }

                if err := handler(context.Background(), event); err != nil {
                    log.Printf("Handler error: %v", err)
                    msg.Nack(false, true) // Requeue on error
                    continue
                }

                msg.Ack(false)
            }
        }()

        return nil
    }

    // Close closes the broker connection
    func (b *RabbitMQBroker) Close() error {
        if err := b.channel.Close(); err != nil {
            return err
        }
        return b.conn.Close()
    }

Order Service (Event Producer)

    // internal/services/order/service.go
    package order

    import (
        "context"
        "fmt"
        "time"

        "app/internal/events"
    )

    type Order struct {
        ID        string    `json:"id"`
        UserID    string    `json:"user_id"`
        Total     float64   `json:"total"`
        Status    string    `json:"status"`
        CreatedAt time.Time `json:"created_at"`
    }

    type Service struct {
        eventBus *events.EventBus
    }

    func NewService(eventBus *events.EventBus) *Service {
        return &Service{
            eventBus: eventBus,
        }
    }

    func (s *Service) PlaceOrder(ctx context.Context, userID string, total float64) (*Order, error) {
        order := &Order{
            ID:        generateOrderID(),
            UserID:    userID,
            Total:     total,
            Status:    "pending",
            CreatedAt: time.Now(),
        }

        // Publish OrderPlaced event
        event := events.NewEvent(events.OrderPlaced, order.ID, map[string]interface{}{
            "order_id": order.ID,
            "user_id":  order.UserID,
            "total":    order.Total,
            "status":   order.Status,
        })

        if err := s.eventBus.Publish(ctx, event); err != nil {
            return nil, fmt.Errorf("failed to publish event: %w", err)
        }

        return order, nil
    }

    func (s *Service) ConfirmOrder(ctx context.Context, orderID string) error {
        // Publish OrderConfirmed event
        event := events.NewEvent(events.OrderConfirmed, orderID, map[string]interface{}{
            "order_id": orderID,
            "status":   "confirmed",
        })

        if err := s.eventBus.Publish(ctx, event); err != nil {
            return fmt.Errorf("failed to publish event: %w", err)
        }

        return nil
    }

    func (s *Service) CancelOrder(ctx context.Context, orderID string) error {
        // Publish OrderCancelled event
        event := events.NewEvent(events.OrderCancelled, orderID, map[string]interface{}{
            "order_id": orderID,
            "status":   "cancelled",
        })

        if err := s.eventBus.Publish(ctx, event); err != nil {
            return fmt.Errorf("failed to publish event: %w", err)
        }

        return nil
    }

    func generateOrderID() string {
        return fmt.Sprintf("order_%d", time.Now().UnixNano())
    }

Email Service (Event Consumer)

    // internal/services/email/service.go
    package email

    import (
        "context"
        "fmt"
        "log"

        "app/internal/events"
    )

    type Service struct {
        // SMTP client or email service client
    }

    func NewService() *Service {
        return &Service{}
    }

    // HandleOrderPlaced handles order placed events
    func (s *Service) HandleOrderPlaced(ctx context.Context, event *events.Event) error {
        orderID := event.Payload["order_id"].(string)
        userID := event.Payload["user_id"].(string)
        total := event.Payload["total"].(float64)

        log.Printf("Sending order confirmation email for order %s to user %s", orderID, userID)

        // Send email
        if err := s.sendEmail(userID, "Order Confirmation", fmt.Sprintf(
            "Your order %s for $%.2f has been placed successfully.",
            orderID, total,
        )); err != nil {
            return fmt.Errorf("failed to send email: %w", err)
        }

        return nil
    }

    // HandleOrderShipped handles order shipped events
    func (s *Service) HandleOrderShipped(ctx context.Context, event *events.Event) error {
        orderID := event.Payload["order_id"].(string)
        trackingNumber := event.Payload["tracking_number"].(string)

        log.Printf("Sending shipping notification for order %s with tracking %s", orderID, trackingNumber)

        // Send email
        return nil
    }

    func (s *Service) sendEmail(recipient, subject, body string) error {
        // Implementation of email sending
        log.Printf("Email sent to %s: %s - %s", recipient, subject, body)
        return nil
    }

    // RegisterHandlers registers event handlers
    func (s *Service) RegisterHandlers(bus *events.EventBus) {
        bus.Subscribe(events.OrderPlaced, s.HandleOrderPlaced)
        bus.Subscribe(events.OrderShipped,
s.HandleOrderShipped) } Inventory Service (Event Consumer) // internal/services/inventory/service.go package inventory import ( "context" "fmt" "log" "app/internal/events" ) type Service struct { eventBus *events.EventBus } func NewService(eventBus *events.EventBus) *Service { return &Service{ eventBus: eventBus, } } // HandleOrderPlaced handles order placed events func (s *Service) HandleOrderPlaced(ctx context.Context, event *events.Event) error { orderID := event.Payload["order_id"].(string) log.Printf("Updating inventory for order %s", orderID) // Update stock levels if err := s.updateStock(orderID); err != nil { return fmt.Errorf("failed to update stock: %w", err) } // Publish StockUpdated event stockEvent := events.NewEvent(events.StockUpdated, orderID, map[string]interface{}{ "order_id": orderID, "stock_level": 100, "updated_at": time.Now(), }) if err := s.eventBus.Publish(ctx, stockEvent); err != nil { return fmt.Errorf("failed to publish stock event: %w", err) } return nil } // HandleOrderCancelled handles order cancelled events func (s *Service) HandleOrderCancelled(ctx context.Context, event *events.Event) error { orderID := event.Payload["order_id"].(string) log.Printf("Restoring inventory for cancelled order %s", orderID) // Restore stock if err := s.restoreStock(orderID); err != nil { return fmt.Errorf("failed to restore stock: %w", err) } // Publish StockRestocked event stockEvent := events.NewEvent(events.StockRestocked, orderID, map[string]interface{}{ "order_id": orderID, "restored": true, }) return s.eventBus.Publish(ctx, stockEvent) } func (s *Service) updateStock(orderID string) error { // Implementation of stock update return nil } func (s *Service) restoreStock(orderID string) error { // Implementation of stock restoration return nil } // RegisterHandlers registers event handlers func (s *Service) RegisterHandlers(bus *events.EventBus) { bus.Subscribe(events.OrderPlaced, s.HandleOrderPlaced) bus.Subscribe(events.OrderCancelled, 
s.HandleOrderCancelled) } Main Application // cmd/order-service/main.go package main import ( "context" "encoding/json" "log" "net/http" "os" "os/signal" "syscall" "time" "github.com/gorilla/mux" "app/internal/broker" "app/internal/events" "app/internal/services/email" "app/internal/services/inventory" "app/internal/services/order" ) func main() { // Initialize event bus eventBus := events.NewEventBus() // Add logging middleware eventBus.Use(loggingMiddleware) // Initialize RabbitMQ broker rabbitURL := os.Getenv("RABBITMQ_URL") if rabbitURL == "" { rabbitURL = "amqp://guest:guest@localhost:5672/" } mqBroker, err := broker.NewRabbitMQBroker(rabbitURL) if err != nil { log.Fatal(err) } defer mqBroker.Close() // Declare exchange if err := mqBroker.DeclareExchange("events", "topic"); err != nil { log.Fatal(err) } // Initialize services orderService := order.NewService(eventBus) emailService := email.NewService() inventoryService := inventory.NewService(eventBus) // Register event handlers emailService.RegisterHandlers(eventBus) inventoryService.RegisterHandlers(eventBus) // Bridge event bus to RabbitMQ eventBus.Subscribe(events.OrderPlaced, func(ctx context.Context, event *events.Event) error { return mqBroker.Publish(ctx, "events", "order.placed", event) }) // Setup HTTP server router := mux.NewRouter() router.HandleFunc("/orders", func(w http.ResponseWriter, r *http.Request) { var req struct { UserID string `json:"user_id"` Total float64 `json:"total"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } order, err := orderService.PlaceOrder(r.Context(), req.UserID, req.Total) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusCreated) json.NewEncoder(w).Encode(order) }).Methods("POST") router.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }) 
server := &http.Server{ Addr: ":8080", Handler: router, } // Start server go func() { log.Println("Server starting on :8080") if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatal(err) } }() // Graceful shutdown quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit log.Println("Shutting down server...") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := server.Shutdown(ctx); err != nil { log.Fatal(err) } } func loggingMiddleware(next events.Handler) events.Handler { return func(ctx context.Context, event *events.Event) error { start := time.Now() log.Printf("Handling event: %s (ID: %s)", event.Type, event.ID) err := next(ctx, event) log.Printf("Event %s handled in %v", event.ID, time.Since(start)) return err } } Best Practices Event Schema Versioning: Version your events to handle schema evolution Idempotent Handlers: Ensure handlers can process same event multiple times Error Handling: Implement retry logic and dead letter queues Event Ordering: Be aware that events may not arrive in order Event Store: Persist events for audit trail and replay Monitoring: Track event processing metrics and failures Testing: Test event handlers independently Documentation: Document all event types and their payloads Common Pitfalls Event Coupling: Events containing too much internal implementation details Lost Messages: Not ensuring message delivery guarantees Duplicate Processing: Not handling duplicate events No Versioning: Breaking changes to event schemas Synchronous Expectations: Expecting immediate consistency Overusing Events: Using events for simple request-response patterns Poor Error Handling: Not handling failures gracefully When to Use Event-Driven Architecture Use When: ...

    February 12, 2025 · 12 min · Rafiul Alam

    Apache Kafka in Go: Distributed Event Streaming at Scale

    Apache Kafka is a distributed event streaming platform designed for high-throughput, fault-tolerant, and scalable data pipelines. Originally developed at LinkedIn, Kafka combines messaging, storage, and stream processing into a single platform that can handle trillions of events per day. What is Apache Kafka? Kafka is built around a distributed commit log that allows producers to publish records to topics, which are partitioned and replicated across a cluster of brokers. Consumers read from these topics using consumer groups, enabling horizontal scaling and fault tolerance. ...
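
    To make the partition and consumer-group idea concrete, here is a small sketch with no Kafka client involved. It assumes a keyed record is placed by hashing its key modulo the partition count, and that a group spreads partitions across its members round-robin. `partitionFor` and `assign` are hypothetical helpers, and FNV-1a is a stand-in hash chosen for this example; real clients use their own hash functions and assignment strategies.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a record key to one of numPartitions partitions.
// numPartitions must be positive. FNV-1a is an assumption for this sketch.
func partitionFor(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(numPartitions))
}

// assign spreads partitions across the members of one consumer group
// round-robin. members must be non-empty.
func assign(numPartitions int, members []string) map[string][]int {
	out := make(map[string][]int)
	for p := 0; p < numPartitions; p++ {
		m := members[p%len(members)]
		out[m] = append(out[m], p)
	}
	return out
}

func main() {
	const partitions = 6

	// The same key always lands on the same partition, which is what
	// preserves per-key ordering.
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("%s -> partition %d\n", key, partitionFor(key, partitions))
	}

	// Each partition is owned by exactly one member of the group.
	fmt.Println(assign(partitions, []string{"consumer-a", "consumer-b"}))
}
```

    Adding a member to the group changes only the assignment map, not where records are written, which is why consumers can scale out without touching producers.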

    February 11, 2025 · 18 min · Rafiul Alam

    The 'System 2' LLM: How Models Learn to Reason (o1, R1)

    Introduction: Two Systems of Thinking In cognitive science, Nobel laureate Daniel Kahneman described human thinking as two distinct systems: System 1: Fast, automatic, intuitive (e.g., recognizing faces, reading emotions) System 2: Slow, deliberate, analytical (e.g., solving math problems, planning) Traditional LLMs operate almost entirely in System 1 mode: they generate responses instantly, token by token, with no deliberate planning or self-reflection. Ask GPT-4 a question, and it starts answering immediately, with no visible “thinking time.” ...

    February 10, 2025 · 11 min · Rafiul Alam

    Deconstructing the Mixture-of-Experts (MoE) Architecture

    Introduction: The Scaling Dilemma Traditional transformer models face a fundamental trade-off: to increase model capacity, you must scale all parameters proportionally. If you want a smarter model, every single token must pass through every single parameter. This is dense activation, and it’s extremely expensive. Enter Mixture-of-Experts (MoE): an architecture that achieves massive model capacity while keeping computational costs manageable through sparse activation. Models like Mixtral and Switch Transformer (and reportedly GPT-4) use MoE to grow total parameter count, in some cases to trillion-parameter scale, while using only a fraction of those parameters per token. ...

    February 9, 2025 · 9 min · Rafiul Alam

    From Choreography to Durable Execution: Why Temporal Changes Everything

    The Choreography Problem: Hope-Driven Development For years, we’ve built distributed systems using choreography: Service A fires an event into the void, hoping Service B hears it. Service B processes it and fires another event, hoping Service C is listening. When something fails (and it will), we’re left scrambling through logs across multiple services, trying to piece together what happened. This is hope-driven development, and it’s fundamentally broken. Enter Temporal and the concept of Durable Execution, a paradigm shift that replaces hope with guarantees. ...

    February 8, 2025 · 11 min · Rafiul Alam

    NATS & JetStream in Go: Cloud-Native Messaging at Scale

    What is NATS? NATS is a high-performance, cloud-native messaging system designed for microservices, IoT, and edge computing. It provides a simple yet powerful pub/sub model with subject-based addressing, making it ideal for building distributed systems that require fast, reliable communication. ...

    February 7, 2025 · 12 min · Rafiul Alam

    Visualizing LLM Embeddings: The Geometry of Meaning

    Introduction: Words as Numbers How do language models understand meaning? The answer lies in embeddings: representing words, sentences, and entire documents as vectors of numbers in high-dimensional space. In this space: Similar words cluster together Analogies emerge as geometric relationships Meaning becomes computable through vector arithmetic Let’s visualize this invisible geometry where meaning is distance. From Words to Vectors Traditional Approach: One-Hot Encoding Vocabulary: cat, dog, king, queen, apple. cat = [1,0,0,0,0], dog = [0,1,0,0,0], king = [0,0,1,0,0], queen = [0,0,0,1,0], apple = [0,0,0,0,1]. Problem: No semantic relationship! ...

    February 6, 2025 · 11 min · Rafiul Alam