Saga Pattern in Go: Managing Distributed Transactions

    What is the Saga Pattern?

    The Saga pattern is a design pattern for managing distributed transactions across multiple microservices. Instead of relying on a traditional ACID transaction, a saga breaks the work into a sequence of local transactions, each paired with a compensating transaction that undoes its changes if a later step fails.

    Key Principles:
    Local Transactions: Each service executes its own local transaction
    Compensating Transactions: Each step has an undo operation used for rollback
    Eventual Consistency: The system reaches a consistent state over time
    Choreography or Orchestration: Two approaches to coordinating sagas
    Idempotency: Operations can be safely retried
    Error Handling: Failures are handled gracefully

    Architecture Overview

    graph TB subgraph "Orchestration-Based Saga" Orchestrator[Saga Orchestrator] S1[Service 1] S2[Service 2] S3[Service 3] S4[Service 4] Orchestrator -->|1. Execute| S1 Orchestrator -->|2. Execute| S2 Orchestrator -->|3. Execute| S3 Orchestrator -->|4. 
Execute| S4 S1 -.->|Failed| Orchestrator S2 -.->|Success| Orchestrator S3 -.->|Failed| Orchestrator Orchestrator -.->|Compensate| S2 Orchestrator -.->|Compensate| S1 end subgraph "Choreography-Based Saga" EventBus[Event Bus] OrderSvc[Order Service] PaymentSvc[Payment Service] InventorySvc[Inventory Service] ShippingSvc[Shipping Service] OrderSvc -->|OrderCreated| EventBus EventBus -->|OrderCreated| PaymentSvc PaymentSvc -->|PaymentProcessed| EventBus EventBus -->|PaymentProcessed| InventorySvc InventorySvc -->|InventoryReserved| EventBus EventBus -->|InventoryReserved| ShippingSvc PaymentSvc -.->|PaymentFailed| EventBus EventBus -.->|PaymentFailed| OrderSvc end style Orchestrator fill:#fff4e1 style EventBus fill:#e1f5ff style S1 fill:#e8f5e9 style OrderSvc fill:#f3e5f5 Saga Execution Flow sequenceDiagram participant Client participant Orchestrator participant OrderSvc participant PaymentSvc participant InventorySvc participant ShippingSvc Note over Client,ShippingSvc: Happy Path Client->>Orchestrator: Create Order Orchestrator->>OrderSvc: Create Order OrderSvc-->>Orchestrator: Order Created Orchestrator->>PaymentSvc: Process Payment PaymentSvc-->>Orchestrator: Payment Success Orchestrator->>InventorySvc: Reserve Inventory InventorySvc-->>Orchestrator: Inventory Reserved Orchestrator->>ShippingSvc: Schedule Shipping ShippingSvc-->>Orchestrator: Shipping Scheduled Orchestrator-->>Client: Order Completed Note over Client,ShippingSvc: Failure with Compensation Client->>Orchestrator: Create Order Orchestrator->>OrderSvc: Create Order OrderSvc-->>Orchestrator: Order Created Orchestrator->>PaymentSvc: Process Payment PaymentSvc-->>Orchestrator: Payment Success Orchestrator->>InventorySvc: Reserve Inventory InventorySvc-->>Orchestrator: Insufficient Stock Note over Orchestrator: Trigger Compensation Orchestrator->>PaymentSvc: Refund Payment PaymentSvc-->>Orchestrator: Payment Refunded Orchestrator->>OrderSvc: Cancel Order OrderSvc-->>Orchestrator: Order Cancelled 
Orchestrator-->>Client: Order Failed (Compensated) Real-World Use Cases E-commerce Order Processing: Order, payment, inventory, shipping coordination Travel Booking Systems: Flight, hotel, car rental bookings Financial Transactions: Multi-step payment processing Supply Chain Management: Order fulfillment across warehouses Banking Systems: Money transfers across accounts Healthcare Systems: Patient admission, bed assignment, billing Saga Pattern Implementation Project Structure saga-app/ ├── cmd/ │ └── api/ │ └── main.go ├── internal/ │ ├── saga/ │ │ ├── orchestrator.go │ │ ├── step.go │ │ └── state.go │ ├── services/ │ │ ├── order/ │ │ ├── payment/ │ │ ├── inventory/ │ │ └── shipping/ │ └── models/ │ └── order.go └── go.mod Saga Step Definition // internal/saga/step.go package saga import ( "context" "fmt" ) // StepStatus represents the status of a saga step type StepStatus string const ( StepStatusPending StepStatus = "pending" StepStatusInProgress StepStatus = "in_progress" StepStatusCompleted StepStatus = "completed" StepStatusFailed StepStatus = "failed" StepStatusCompensated StepStatus = "compensated" ) // Step represents a single step in a saga type Step struct { Name string Execute ExecuteFunc CompensateFn CompensateFunc Status StepStatus RetryCount int MaxRetries int CompensationData interface{} } // ExecuteFunc is the function that executes a step type ExecuteFunc func(ctx context.Context, data interface{}) (interface{}, error) // CompensateFunc is the function that compensates a step type CompensateFunc func(ctx context.Context, data interface{}) error // NewStep creates a new saga step func NewStep(name string, execute ExecuteFunc, compensate CompensateFunc) *Step { return &Step{ Name: name, Execute: execute, CompensateFn: compensate, Status: StepStatusPending, MaxRetries: 3, } } // Run executes the step func (s *Step) Run(ctx context.Context, data interface{}) (interface{}, error) { s.Status = StepStatusInProgress result, err := s.Execute(ctx, data) if err 
!= nil { s.Status = StepStatusFailed return nil, fmt.Errorf("step %s failed: %w", s.Name, err) } s.Status = StepStatusCompleted s.CompensationData = result return result, nil } // Compensate executes the compensation func (s *Step) Compensate(ctx context.Context) error { if s.Status != StepStatusCompleted { return nil // Only compensate completed steps } if err := s.CompensateFn(ctx, s.CompensationData); err != nil { return fmt.Errorf("compensation for %s failed: %w", s.Name, err) } s.Status = StepStatusCompensated return nil } Saga Orchestrator // internal/saga/orchestrator.go package saga import ( "context" "fmt" "log" ) // SagaStatus represents the status of a saga type SagaStatus string const ( SagaStatusPending SagaStatus = "pending" SagaStatusInProgress SagaStatus = "in_progress" SagaStatusCompleted SagaStatus = "completed" SagaStatusFailed SagaStatus = "failed" SagaStatusCompensated SagaStatus = "compensated" ) // Saga represents a distributed transaction type Saga struct { ID string Steps []*Step Status SagaStatus Data interface{} } // Orchestrator manages saga execution type Orchestrator struct { sagas map[string]*Saga } // NewOrchestrator creates a new saga orchestrator func NewOrchestrator() *Orchestrator { return &Orchestrator{ sagas: make(map[string]*Saga), } } // NewSaga creates a new saga func (o *Orchestrator) NewSaga(id string, steps []*Step) *Saga { saga := &Saga{ ID: id, Steps: steps, Status: SagaStatusPending, } o.sagas[id] = saga return saga } // Execute runs the saga func (o *Orchestrator) Execute(ctx context.Context, saga *Saga, initialData interface{}) error { saga.Status = SagaStatusInProgress saga.Data = initialData completedSteps := make([]*Step, 0) var currentData interface{} = initialData // Execute steps sequentially for _, step := range saga.Steps { log.Printf("Executing step: %s", step.Name) result, err := step.Run(ctx, currentData) if err != nil { log.Printf("Step %s failed: %v", step.Name, err) saga.Status = SagaStatusFailed // 
Trigger compensation if compErr := o.compensate(ctx, completedSteps); compErr != nil { log.Printf("Compensation failed: %v", compErr) return fmt.Errorf("saga failed and compensation failed: %w", compErr) } saga.Status = SagaStatusCompensated return fmt.Errorf("saga failed at step %s: %w", step.Name, err) } completedSteps = append(completedSteps, step) currentData = result log.Printf("Step %s completed successfully", step.Name) } saga.Status = SagaStatusCompleted log.Printf("Saga %s completed successfully", saga.ID) return nil } // compensate executes compensation for completed steps in reverse order func (o *Orchestrator) compensate(ctx context.Context, steps []*Step) error { log.Println("Starting compensation...") // Compensate in reverse order for i := len(steps) - 1; i >= 0; i-- { step := steps[i] log.Printf("Compensating step: %s", step.Name) if err := step.Compensate(ctx); err != nil { return fmt.Errorf("compensation failed for step %s: %w", step.Name, err) } log.Printf("Step %s compensated successfully", step.Name) } log.Println("Compensation completed") return nil } // GetSaga retrieves a saga by ID func (o *Orchestrator) GetSaga(id string) (*Saga, error) { saga, exists := o.sagas[id] if !exists { return nil, fmt.Errorf("saga not found: %s", id) } return saga, nil } Service Implementations // internal/services/order/service.go package order import ( "context" "fmt" "time" ) type Order struct { ID string UserID string Items []OrderItem Total float64 Status string CreatedAt time.Time } type OrderItem struct { ProductID string Quantity int Price float64 } type Service struct { orders map[string]*Order } func NewService() *Service { return &Service{ orders: make(map[string]*Order), } } func (s *Service) CreateOrder(ctx context.Context, userID string, items []OrderItem) (*Order, error) { var total float64 for _, item := range items { total += item.Price * float64(item.Quantity) } order := &Order{ ID: generateID(), UserID: userID, Items: items, Total: total, 
Status: "pending", CreatedAt: time.Now(), } s.orders[order.ID] = order fmt.Printf("Order created: %s\n", order.ID) return order, nil } func (s *Service) CancelOrder(ctx context.Context, orderID string) error { order, exists := s.orders[orderID] if !exists { return fmt.Errorf("order not found") } order.Status = "cancelled" fmt.Printf("Order cancelled: %s\n", orderID) return nil } func generateID() string { return fmt.Sprintf("ord_%d", time.Now().UnixNano()) } // internal/services/payment/service.go package payment import ( "context" "fmt" "time" ) type Payment struct { ID string OrderID string Amount float64 Status string TransactionID string CreatedAt time.Time } type Service struct { payments map[string]*Payment } func NewService() *Service { return &Service{ payments: make(map[string]*Payment), } } func (s *Service) ProcessPayment(ctx context.Context, orderID string, amount float64) (*Payment, error) { // Simulate payment processing if amount > 10000 { return nil, fmt.Errorf("amount exceeds limit") } payment := &Payment{ ID: generateID(), OrderID: orderID, Amount: amount, Status: "completed", TransactionID: fmt.Sprintf("txn_%d", time.Now().UnixNano()), CreatedAt: time.Now(), } s.payments[payment.ID] = payment fmt.Printf("Payment processed: %s for order %s\n", payment.ID, orderID) return payment, nil } func (s *Service) RefundPayment(ctx context.Context, paymentID string) error { payment, exists := s.payments[paymentID] if !exists { return fmt.Errorf("payment not found") } payment.Status = "refunded" fmt.Printf("Payment refunded: %s\n", paymentID) return nil } func generateID() string { return fmt.Sprintf("pay_%d", time.Now().UnixNano()) } // internal/services/inventory/service.go package inventory import ( "context" "fmt" "time" ) type Reservation struct { ID string ProductID string Quantity int OrderID string } type Service struct { stock map[string]int reservations map[string]*Reservation } func NewService() *Service { return &Service{ stock: map[string]int{ 
"product_1": 100, "product_2": 50, "product_3": 200, }, reservations: make(map[string]*Reservation), } } func (s *Service) ReserveInventory(ctx context.Context, orderID, productID string, quantity int) (*Reservation, error) { currentStock, exists := s.stock[productID] if !exists { return nil, fmt.Errorf("product not found") } if currentStock < quantity { return nil, fmt.Errorf("insufficient stock: have %d, need %d", currentStock, quantity) } s.stock[productID] = currentStock - quantity reservation := &Reservation{ ID: generateID(), ProductID: productID, Quantity: quantity, OrderID: orderID, } s.reservations[reservation.ID] = reservation fmt.Printf("Inventory reserved: %d units of %s for order %s\n", quantity, productID, orderID) return reservation, nil } func (s *Service) ReleaseReservation(ctx context.Context, reservationID string) error { reservation, exists := s.reservations[reservationID] if !exists { return fmt.Errorf("reservation not found") } s.stock[reservation.ProductID] += reservation.Quantity delete(s.reservations, reservationID) fmt.Printf("Reservation released: %s\n", reservationID) return nil } func generateID() string { return fmt.Sprintf("res_%d", time.Now().UnixNano()) } // internal/services/shipping/service.go package shipping import ( "context" "fmt" "time" ) type Shipment struct { ID string OrderID string TrackingNumber string Status string ScheduledDate time.Time } type Service struct { shipments map[string]*Shipment } func NewService() *Service { return &Service{ shipments: make(map[string]*Shipment), } } func (s *Service) ScheduleShipment(ctx context.Context, orderID string) (*Shipment, error) { shipment := &Shipment{ ID: generateID(), OrderID: orderID, TrackingNumber: fmt.Sprintf("TRK%d", time.Now().UnixNano()), Status: "scheduled", ScheduledDate: time.Now().Add(24 * time.Hour), } s.shipments[shipment.ID] = shipment fmt.Printf("Shipment scheduled: %s for order %s\n", shipment.ID, orderID) return shipment, nil } func (s *Service) 
CancelShipment(ctx context.Context, shipmentID string) error { shipment, exists := s.shipments[shipmentID] if !exists { return fmt.Errorf("shipment not found") } shipment.Status = "cancelled" fmt.Printf("Shipment cancelled: %s\n", shipmentID) return nil } func generateID() string { return fmt.Sprintf("shp_%d", time.Now().UnixNano()) } Saga Workflow Implementation // internal/saga/order_saga.go package saga import ( "context" "fmt" "app/internal/services/inventory" "app/internal/services/order" "app/internal/services/payment" "app/internal/services/shipping" ) type OrderSagaData struct { UserID string Items []order.OrderItem OrderID string PaymentID string ReservationID string ShipmentID string } // CreateOrderSaga creates a saga for order processing func CreateOrderSaga( orderSvc *order.Service, paymentSvc *payment.Service, inventorySvc *inventory.Service, shippingSvc *shipping.Service, ) []*Step { // Step 1: Create Order createOrderStep := NewStep( "CreateOrder", func(ctx context.Context, data interface{}) (interface{}, error) { sagaData := data.(*OrderSagaData) order, err := orderSvc.CreateOrder(ctx, sagaData.UserID, sagaData.Items) if err != nil { return nil, err } sagaData.OrderID = order.ID return sagaData, nil }, func(ctx context.Context, data interface{}) error { sagaData := data.(*OrderSagaData) return orderSvc.CancelOrder(ctx, sagaData.OrderID) }, ) // Step 2: Process Payment processPaymentStep := NewStep( "ProcessPayment", func(ctx context.Context, data interface{}) (interface{}, error) { sagaData := data.(*OrderSagaData) var total float64 for _, item := range sagaData.Items { total += item.Price * float64(item.Quantity) } payment, err := paymentSvc.ProcessPayment(ctx, sagaData.OrderID, total) if err != nil { return nil, err } sagaData.PaymentID = payment.ID return sagaData, nil }, func(ctx context.Context, data interface{}) error { sagaData := data.(*OrderSagaData) return paymentSvc.RefundPayment(ctx, sagaData.PaymentID) }, ) // Step 3: Reserve Inventory 
reserveInventoryStep := NewStep( "ReserveInventory", func(ctx context.Context, data interface{}) (interface{}, error) { sagaData := data.(*OrderSagaData) // Reserve first item (simplified) if len(sagaData.Items) == 0 { return nil, fmt.Errorf("no items to reserve") } item := sagaData.Items[0] reservation, err := inventorySvc.ReserveInventory( ctx, sagaData.OrderID, item.ProductID, item.Quantity, ) if err != nil { return nil, err } sagaData.ReservationID = reservation.ID return sagaData, nil }, func(ctx context.Context, data interface{}) error { sagaData := data.(*OrderSagaData) return inventorySvc.ReleaseReservation(ctx, sagaData.ReservationID) }, ) // Step 4: Schedule Shipment scheduleShipmentStep := NewStep( "ScheduleShipment", func(ctx context.Context, data interface{}) (interface{}, error) { sagaData := data.(*OrderSagaData) shipment, err := shippingSvc.ScheduleShipment(ctx, sagaData.OrderID) if err != nil { return nil, err } sagaData.ShipmentID = shipment.ID return sagaData, nil }, func(ctx context.Context, data interface{}) error { sagaData := data.(*OrderSagaData) return shippingSvc.CancelShipment(ctx, sagaData.ShipmentID) }, ) return []*Step{ createOrderStep, processPaymentStep, reserveInventoryStep, scheduleShipmentStep, } } Main Application // cmd/api/main.go package main import ( "context" "encoding/json" "fmt" "log" "net/http" "time" "github.com/gorilla/mux" "app/internal/saga" "app/internal/services/inventory" "app/internal/services/order" "app/internal/services/payment" "app/internal/services/shipping" ) func main() { // Initialize services orderSvc := order.NewService() paymentSvc := payment.NewService() inventorySvc := inventory.NewService() shippingSvc := shipping.NewService() // Initialize saga orchestrator orchestrator := saga.NewOrchestrator() router := mux.NewRouter() // Create order endpoint (triggers saga) router.HandleFunc("/orders", func(w http.ResponseWriter, r *http.Request) { var req struct { UserID string `json:"user_id"` Items []order.OrderItem 
`json:"items"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } // Create saga steps := saga.CreateOrderSaga(orderSvc, paymentSvc, inventorySvc, shippingSvc) orderSaga := orchestrator.NewSaga(generateSagaID(), steps) // Execute saga sagaData := &saga.OrderSagaData{ UserID: req.UserID, Items: req.Items, } if err := orchestrator.Execute(context.Background(), orderSaga, sagaData); err != nil { http.Error(w, fmt.Sprintf("Saga failed: %v", err), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusCreated) json.NewEncoder(w).Encode(map[string]string{ "saga_id": orderSaga.ID, "order_id": sagaData.OrderID, "status": string(orderSaga.Status), }) }).Methods("POST") // Get saga status endpoint router.HandleFunc("/sagas/{id}", func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) sagaID := vars["id"] saga, err := orchestrator.GetSaga(sagaID) if err != nil { http.Error(w, err.Error(), http.StatusNotFound) return } response := map[string]interface{}{ "saga_id": saga.ID, "status": saga.Status, "steps": len(saga.Steps), } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(response) }).Methods("GET") router.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }) log.Println("Saga orchestrator server starting on :8080") log.Fatal(http.ListenAndServe(":8080", router)) } func generateSagaID() string { return fmt.Sprintf("saga_%d", time.Now().UnixNano()) } Testing Saga // internal/saga/order_saga_test.go package saga_test import ( "context" "testing" "app/internal/saga" "app/internal/services/inventory" "app/internal/services/order" "app/internal/services/payment" "app/internal/services/shipping" ) func TestOrderSaga_HappyPath(t *testing.T) { // Initialize services orderSvc := order.NewService() paymentSvc := payment.NewService() inventorySvc := inventory.NewService() 
shippingSvc := shipping.NewService() // Create saga orchestrator := saga.NewOrchestrator() steps := saga.CreateOrderSaga(orderSvc, paymentSvc, inventorySvc, shippingSvc) orderSaga := orchestrator.NewSaga("test-saga", steps) // Execute saga sagaData := &saga.OrderSagaData{ UserID: "user_123", Items: []order.OrderItem{ {ProductID: "product_1", Quantity: 2, Price: 100.0}, }, } err := orchestrator.Execute(context.Background(), orderSaga, sagaData) if err != nil { t.Fatalf("Saga execution failed: %v", err) } if orderSaga.Status != saga.SagaStatusCompleted { t.Errorf("Expected status %s, got %s", saga.SagaStatusCompleted, orderSaga.Status) } } func TestOrderSaga_PaymentFailure(t *testing.T) { // Initialize services orderSvc := order.NewService() paymentSvc := payment.NewService() inventorySvc := inventory.NewService() shippingSvc := shipping.NewService() // Create saga orchestrator := saga.NewOrchestrator() steps := saga.CreateOrderSaga(orderSvc, paymentSvc, inventorySvc, shippingSvc) orderSaga := orchestrator.NewSaga("test-saga-fail", steps) // Execute saga with amount that will fail sagaData := &saga.OrderSagaData{ UserID: "user_123", Items: []order.OrderItem{ {ProductID: "product_1", Quantity: 2, Price: 10000.0}, // Exceeds limit }, } err := orchestrator.Execute(context.Background(), orderSaga, sagaData) if err == nil { t.Fatal("Expected saga to fail, but it succeeded") } if orderSaga.Status != saga.SagaStatusCompensated { t.Errorf("Expected status %s, got %s", saga.SagaStatusCompensated, orderSaga.Status) } } Best Practices Idempotency: Ensure all operations can be safely retried Compensating Transactions: Design proper rollback logic Timeout Handling: Set timeouts for each saga step State Persistence: Store saga state for recovery Monitoring: Track saga execution and failures Error Handling: Handle partial failures gracefully Testing: Test both success and failure scenarios Documentation: Document saga workflows clearly Common Pitfalls Missing Compensations: Not 
implementing proper rollback
Non-Idempotent Operations: Operations that can’t be retried safely
Circular Dependencies: Services depending on each other in compensation
Long-Running Sagas: Sagas that hold resources for too long
No State Persistence: Losing saga state on failure
Ignoring Partial Failures: Not handling scenarios where compensation fails
Over-Compensation: Compensating steps that were never executed

When to Use Saga Pattern

Use When: ...

    January 24, 2025 · 13 min · Rafiul Alam

    Event Sourcing in Go: Building Audit-Trail Applications

    What is Event Sourcing?

    Event Sourcing is an architectural pattern where state changes are stored as a sequence of immutable events rather than updating records in place. The current state is derived by replaying all events from the beginning.

    Key Principles:
    Event Store: All state changes stored as events
    Immutable Events: Events are never modified or deleted
    Event Replay: Current state reconstructed by replaying events
    Complete Audit Trail: Every change is recorded with full context
    Temporal Queries: Query state at any point in time
    Event Versioning: Events can evolve while maintaining history

    Architecture Overview

    graph TB subgraph "Command Side" Command[Command] Aggregate[Aggregate Root] EventStore[(Event Store)] end subgraph "Event Processing" EventBus[Event Bus] Projector[Event Projectors] end subgraph "Query Side" ReadModel1[(Current State View)] ReadModel2[(Analytics View)] ReadModel3[(Audit Log View)] end subgraph "Time Travel" Snapshot[(Snapshots)] Replay[Event Replay] end Command --> Aggregate Aggregate -->|Load Events| EventStore Aggregate -->|Append Events| EventStore EventStore -->|Publish| EventBus EventBus --> Projector Projector --> ReadModel1 Projector --> ReadModel2 Projector --> ReadModel3 EventStore --> Snapshot Snapshot --> Replay Replay --> Aggregate style Aggregate fill:#fff4e1 style EventStore fill:#e1f5ff style Projector fill:#e8f5e9 style ReadModel1 fill:#f3e5f5

    Event Sourcing Flow

    sequenceDiagram participant Client participant CommandHandler participant Aggregate participant EventStore participant EventBus participant Projection Client->>CommandHandler: Execute Command CommandHandler->>EventStore: Load Events for Aggregate EventStore-->>CommandHandler: Event Stream CommandHandler->>Aggregate: Replay Events Aggregate->>Aggregate: Rebuild State CommandHandler->>Aggregate: Execute Command Aggregate->>Aggregate: Validate Business Rules 
Aggregate->>Aggregate: Generate Events CommandHandler->>EventStore: Append New Events EventStore-->>CommandHandler: Events Persisted EventStore->>EventBus: Publish Events EventBus->>Projection: Project Events Projection->>Projection: Update Read Model CommandHandler-->>Client: Command Accepted Real-World Use Cases Banking Systems: Complete transaction history and audit trails E-commerce: Order lifecycle tracking and returns processing Healthcare: Patient medical history and treatment records Trading Systems: Trade execution history and reconciliation Gaming: Player action history and replay functionality Workflow Systems: Process execution history and debugging Event Sourcing Implementation Project Structure event-sourcing-app/ ├── cmd/ │ └── api/ │ └── main.go ├── internal/ │ ├── domain/ │ │ ├── account.go │ │ └── events.go │ ├── eventstore/ │ │ ├── store.go │ │ ├── postgres_store.go │ │ └── snapshot.go │ ├── commands/ │ │ └── handlers.go │ ├── projections/ │ │ └── account_projection.go │ └── queries/ │ └── handlers.go └── go.mod Domain Events // internal/domain/events.go package domain import ( "encoding/json" "fmt" "time" ) // Event represents a domain event type Event interface { EventType() string AggregateID() string EventVersion() int Timestamp() time.Time } // BaseEvent provides common event fields type BaseEvent struct { Type string `json:"type"` AggrID string `json:"aggregate_id"` Version int `json:"version"` OccurredAt time.Time `json:"occurred_at"` Metadata map[string]string `json:"metadata"` } func (e BaseEvent) EventType() string { return e.Type } func (e BaseEvent) AggregateID() string { return e.AggrID } func (e BaseEvent) EventVersion() int { return e.Version } func (e BaseEvent) Timestamp() time.Time { return e.OccurredAt } // Account Events type AccountCreatedEvent struct { BaseEvent AccountID string `json:"account_id"` Owner string `json:"owner"` Currency string `json:"currency"` } type MoneyDepositedEvent struct { BaseEvent AccountID string 
`json:"account_id"` Amount float64 `json:"amount"` Balance float64 `json:"balance"` } type MoneyWithdrawnEvent struct { BaseEvent AccountID string `json:"account_id"` Amount float64 `json:"amount"` Balance float64 `json:"balance"` } type AccountClosedEvent struct { BaseEvent AccountID string `json:"account_id"` Reason string `json:"reason"` } // EventEnvelope wraps events for storage type EventEnvelope struct { EventID string `json:"event_id"` EventType string `json:"event_type"` AggregateID string `json:"aggregate_id"` Version int `json:"version"` Data json.RawMessage `json:"data"` Metadata map[string]string `json:"metadata"` CreatedAt time.Time `json:"created_at"` } // SerializeEvent serializes an event into an envelope func SerializeEvent(event Event) (*EventEnvelope, error) { data, err := json.Marshal(event) if err != nil { return nil, err } return &EventEnvelope{ EventID: generateEventID(), EventType: event.EventType(), AggregateID: event.AggregateID(), Version: event.EventVersion(), Data: data, CreatedAt: event.Timestamp(), }, nil } // DeserializeEvent deserializes envelope to event func DeserializeEvent(envelope *EventEnvelope) (Event, error) { var event Event switch envelope.EventType { case "AccountCreated": event = &AccountCreatedEvent{} case "MoneyDeposited": event = &MoneyDepositedEvent{} case "MoneyWithdrawn": event = &MoneyWithdrawnEvent{} case "AccountClosed": event = &AccountClosedEvent{} default: return nil, fmt.Errorf("unknown event type: %s", envelope.EventType) } if err := json.Unmarshal(envelope.Data, event); err != nil { return nil, err } return event, nil } func generateEventID() string { return fmt.Sprintf("evt_%d", time.Now().UnixNano()) } Aggregate Root // internal/domain/account.go package domain import ( "errors" "fmt" "time" ) var ( ErrInsufficientFunds = errors.New("insufficient funds") ErrAccountClosed = errors.New("account is closed") ErrInvalidAmount = errors.New("invalid amount") ) // Account is an aggregate root that uses event sourcing type Account 
struct { id string owner string currency string balance float64 isClosed bool version int uncommittedEvents []Event } // NewAccount creates a new account func NewAccount(id, owner, currency string) (*Account, error) { if id == "" || owner == "" || currency == "" { return nil, errors.New("invalid account parameters") } account := &Account{ uncommittedEvents: make([]Event, 0), } // Raise domain event event := AccountCreatedEvent{ BaseEvent: BaseEvent{ Type: "AccountCreated", AggrID: id, Version: 1, OccurredAt: time.Now(), }, AccountID: id, Owner: owner, Currency: currency, } account.raiseEvent(&event) return account, nil } // LoadFromHistory rebuilds account state from events func LoadFromHistory(events []Event) (*Account, error) { if len(events) == 0 { return nil, errors.New("no events to load") } account := &Account{ uncommittedEvents: make([]Event, 0), } for _, event := range events { if err := account.apply(event); err != nil { return nil, err } account.version = event.EventVersion() } return account, nil } // Deposit adds money to account func (a *Account) Deposit(amount float64) error { if a.isClosed { return ErrAccountClosed } if amount <= 0 { return ErrInvalidAmount } newBalance := a.balance + amount event := MoneyDepositedEvent{ BaseEvent: BaseEvent{ Type: "MoneyDeposited", AggrID: a.id, Version: a.version + 1, OccurredAt: time.Now(), }, AccountID: a.id, Amount: amount, Balance: newBalance, } a.raiseEvent(&event) return nil } // Withdraw removes money from account func (a *Account) Withdraw(amount float64) error { if a.isClosed { return ErrAccountClosed } if amount <= 0 { return ErrInvalidAmount } if a.balance < amount { return ErrInsufficientFunds } newBalance := a.balance - amount event := MoneyWithdrawnEvent{ BaseEvent: BaseEvent{ Type: "MoneyWithdrawn", AggrID: a.id, Version: a.version + 1, OccurredAt: time.Now(), }, AccountID: a.id, Amount: amount, Balance: newBalance, } a.raiseEvent(&event) return nil } // Close closes the account func (a *Account) 
Close(reason string) error { if a.isClosed { return ErrAccountClosed } event := AccountClosedEvent{ BaseEvent: BaseEvent{ Type: "AccountClosed", AggrID: a.id, Version: a.version + 1, OccurredAt: time.Now(), }, AccountID: a.id, Reason: reason, } a.raiseEvent(&event) return nil } // GetUncommittedEvents returns uncommitted events func (a *Account) GetUncommittedEvents() []Event { return a.uncommittedEvents } // MarkEventsAsCommitted clears uncommitted events func (a *Account) MarkEventsAsCommitted() { a.uncommittedEvents = make([]Event, 0) } // raiseEvent adds event to uncommitted and applies it func (a *Account) raiseEvent(event Event) { a.uncommittedEvents = append(a.uncommittedEvents, event) a.apply(event) } // apply applies an event to the aggregate state func (a *Account) apply(event Event) error { switch e := event.(type) { case *AccountCreatedEvent: a.id = e.AccountID a.owner = e.Owner a.currency = e.Currency a.balance = 0 a.isClosed = false case *MoneyDepositedEvent: a.balance = e.Balance case *MoneyWithdrawnEvent: a.balance = e.Balance case *AccountClosedEvent: a.isClosed = true default: return fmt.Errorf("unknown event type: %T", event) } return nil } // Getters func (a *Account) ID() string { return a.id } func (a *Account) Balance() float64 { return a.balance } func (a *Account) Version() int { return a.version } func (a *Account) IsClosed() bool { return a.isClosed } Event Store // internal/eventstore/store.go package eventstore import ( "context" "app/internal/domain" ) // EventStore interface type EventStore interface { SaveEvents(ctx context.Context, aggregateID string, events []domain.Event, expectedVersion int) error GetEvents(ctx context.Context, aggregateID string) ([]domain.Event, error) GetEventsSince(ctx context.Context, aggregateID string, version int) ([]domain.Event, error) GetAllEvents(ctx context.Context) ([]domain.Event, error) } // internal/eventstore/postgres_store.go package eventstore import ( "context" "database/sql" "fmt" 
"app/internal/domain" ) type PostgresEventStore struct { db *sql.DB } func NewPostgresEventStore(db *sql.DB) *PostgresEventStore { return &PostgresEventStore{db: db} } // SaveEvents appends events to the event store with optimistic concurrency func (s *PostgresEventStore) SaveEvents(ctx context.Context, aggregateID string, events []domain.Event, expectedVersion int) error { tx, err := s.db.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() // Check current version for optimistic locking var currentVersion int err = tx.QueryRowContext(ctx, "SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1", aggregateID, ).Scan(&currentVersion) if err != nil && err != sql.ErrNoRows { return err } if currentVersion != expectedVersion { return fmt.Errorf("concurrency conflict: expected version %d, got %d", expectedVersion, currentVersion) } // Insert events query := ` INSERT INTO events (event_id, event_type, aggregate_id, version, data, metadata, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7) ` for _, event := range events { envelope, err := domain.SerializeEvent(event) if err != nil { return err } _, err = tx.ExecContext(ctx, query, envelope.EventID, envelope.EventType, envelope.AggregateID, envelope.Version, envelope.Data, envelope.Metadata, envelope.CreatedAt, ) if err != nil { return err } } return tx.Commit() } // GetEvents retrieves all events for an aggregate func (s *PostgresEventStore) GetEvents(ctx context.Context, aggregateID string) ([]domain.Event, error) { query := ` SELECT event_id, event_type, aggregate_id, version, data, metadata, created_at FROM events WHERE aggregate_id = $1 ORDER BY version ASC ` rows, err := s.db.QueryContext(ctx, query, aggregateID) if err != nil { return nil, err } defer rows.Close() var events []domain.Event for rows.Next() { envelope := &domain.EventEnvelope{} err := rows.Scan( &envelope.EventID, &envelope.EventType, &envelope.AggregateID, &envelope.Version, &envelope.Data, &envelope.Metadata, 
&envelope.CreatedAt, ) if err != nil { return nil, err } event, err := domain.DeserializeEvent(envelope) if err != nil { return nil, err } events = append(events, event) } if err := rows.Err(); err != nil { return nil, err } return events, nil } // GetEventsSince retrieves events after a specific version func (s *PostgresEventStore) GetEventsSince(ctx context.Context, aggregateID string, version int) ([]domain.Event, error) { query := ` SELECT event_id, event_type, aggregate_id, version, data, metadata, created_at FROM events WHERE aggregate_id = $1 AND version > $2 ORDER BY version ASC ` rows, err := s.db.QueryContext(ctx, query, aggregateID, version) if err != nil { return nil, err } defer rows.Close() var events []domain.Event for rows.Next() { envelope := &domain.EventEnvelope{} err := rows.Scan( &envelope.EventID, &envelope.EventType, &envelope.AggregateID, &envelope.Version, &envelope.Data, &envelope.Metadata, &envelope.CreatedAt, ) if err != nil { return nil, err } event, err := domain.DeserializeEvent(envelope) if err != nil { return nil, err } events = append(events, event) } if err := rows.Err(); err != nil { return nil, err } return events, nil } // GetAllEvents retrieves all events from the store func (s *PostgresEventStore) GetAllEvents(ctx context.Context) ([]domain.Event, error) { query := ` SELECT event_id, event_type, aggregate_id, version, data, metadata, created_at FROM events ORDER BY created_at ASC ` rows, err := s.db.QueryContext(ctx, query) if err != nil { return nil, err } defer rows.Close() var events []domain.Event for rows.Next() { envelope := &domain.EventEnvelope{} err := rows.Scan( &envelope.EventID, &envelope.EventType, &envelope.AggregateID, &envelope.Version, &envelope.Data, &envelope.Metadata, &envelope.CreatedAt, ) if err != nil { return nil, err } event, err := domain.DeserializeEvent(envelope) if err != nil { return nil, err } events = append(events, event) } if err := rows.Err(); err != nil { return nil, err } return events, nil } Snapshots for Performance // internal/eventstore/snapshot.go package eventstore import ( "context" "database/sql" "encoding/json" "time" ) // Snapshot
represents a point-in-time state type Snapshot struct { AggregateID string Version int State json.RawMessage CreatedAt time.Time } type SnapshotStore struct { db *sql.DB } func NewSnapshotStore(db *sql.DB) *SnapshotStore { return &SnapshotStore{db: db} } // SaveSnapshot saves a snapshot func (s *SnapshotStore) SaveSnapshot(ctx context.Context, snapshot *Snapshot) error { query := ` INSERT INTO snapshots (aggregate_id, version, state, created_at) VALUES ($1, $2, $3, $4) ON CONFLICT (aggregate_id) DO UPDATE SET version = $2, state = $3, created_at = $4 ` _, err := s.db.ExecContext(ctx, query, snapshot.AggregateID, snapshot.Version, snapshot.State, snapshot.CreatedAt, ) return err } // GetSnapshot retrieves the latest snapshot func (s *SnapshotStore) GetSnapshot(ctx context.Context, aggregateID string) (*Snapshot, error) { query := ` SELECT aggregate_id, version, state, created_at FROM snapshots WHERE aggregate_id = $1 ` snapshot := &Snapshot{} err := s.db.QueryRowContext(ctx, query, aggregateID).Scan( &snapshot.AggregateID, &snapshot.Version, &snapshot.State, &snapshot.CreatedAt, ) if err == sql.ErrNoRows { return nil, nil } return snapshot, err } Command Handlers // internal/commands/handlers.go package commands import ( "context" "fmt" "app/internal/domain" "app/internal/eventstore" ) // CreateAccountCommand type CreateAccountCommand struct { AccountID string Owner string Currency string } type CreateAccountHandler struct { eventStore eventstore.EventStore } func NewCreateAccountHandler(store eventstore.EventStore) *CreateAccountHandler { return &CreateAccountHandler{eventStore: store} } func (h *CreateAccountHandler) Handle(ctx context.Context, cmd *CreateAccountCommand) error { // Create new aggregate account, err := domain.NewAccount(cmd.AccountID, cmd.Owner, cmd.Currency) if err != nil { return err } // Save events events := account.GetUncommittedEvents() if err := h.eventStore.SaveEvents(ctx, cmd.AccountID, events, 0); err != nil { return fmt.Errorf("failed to 
save events: %w", err) } account.MarkEventsAsCommitted() return nil } // DepositMoneyCommand type DepositMoneyCommand struct { AccountID string Amount float64 } type DepositMoneyHandler struct { eventStore eventstore.EventStore } func NewDepositMoneyHandler(store eventstore.EventStore) *DepositMoneyHandler { return &DepositMoneyHandler{eventStore: store} } func (h *DepositMoneyHandler) Handle(ctx context.Context, cmd *DepositMoneyCommand) error { // Load aggregate from events events, err := h.eventStore.GetEvents(ctx, cmd.AccountID) if err != nil { return err } account, err := domain.LoadFromHistory(events) if err != nil { return err } // Execute command if err := account.Deposit(cmd.Amount); err != nil { return err } // Save new events newEvents := account.GetUncommittedEvents() if err := h.eventStore.SaveEvents(ctx, cmd.AccountID, newEvents, account.Version()); err != nil { return fmt.Errorf("failed to save events: %w", err) } account.MarkEventsAsCommitted() return nil } // WithdrawMoneyCommand type WithdrawMoneyCommand struct { AccountID string Amount float64 } type WithdrawMoneyHandler struct { eventStore eventstore.EventStore } func NewWithdrawMoneyHandler(store eventstore.EventStore) *WithdrawMoneyHandler { return &WithdrawMoneyHandler{eventStore: store} } func (h *WithdrawMoneyHandler) Handle(ctx context.Context, cmd *WithdrawMoneyCommand) error { events, err := h.eventStore.GetEvents(ctx, cmd.AccountID) if err != nil { return err } account, err := domain.LoadFromHistory(events) if err != nil { return err } if err := account.Withdraw(cmd.Amount); err != nil { return err } newEvents := account.GetUncommittedEvents() if err := h.eventStore.SaveEvents(ctx, cmd.AccountID, newEvents, account.Version()); err != nil { return fmt.Errorf("failed to save events: %w", err) } account.MarkEventsAsCommitted() return nil } Projections // internal/projections/account_projection.go package projections import ( "context" "database/sql" "app/internal/domain" "app/internal/eventstore" ) //
AccountReadModel represents the current state type AccountReadModel struct { ID string Owner string Currency string Balance float64 IsClosed bool TotalDeposits float64 TotalWithdrawals float64 TransactionCount int } type AccountProjection struct { db *sql.DB } func NewAccountProjection(db *sql.DB) *AccountProjection { return &AccountProjection{db: db} } // Project processes events and updates read model func (p *AccountProjection) Project(ctx context.Context, event domain.Event) error { switch e := event.(type) { case *domain.AccountCreatedEvent: return p.handleAccountCreated(ctx, e) case *domain.MoneyDepositedEvent: return p.handleMoneyDeposited(ctx, e) case *domain.MoneyWithdrawnEvent: return p.handleMoneyWithdrawn(ctx, e) case *domain.AccountClosedEvent: return p.handleAccountClosed(ctx, e) } return nil } func (p *AccountProjection) handleAccountCreated(ctx context.Context, event *domain.AccountCreatedEvent) error { query := ` INSERT INTO account_read_model (id, owner, currency, balance, is_closed, total_deposits, total_withdrawals, transaction_count) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ` _, err := p.db.ExecContext(ctx, query, event.AccountID, event.Owner, event.Currency, 0, false, 0, 0, 0) return err } func (p *AccountProjection) handleMoneyDeposited(ctx context.Context, event *domain.MoneyDepositedEvent) error { query := ` UPDATE account_read_model SET balance = $2, total_deposits = total_deposits + $3, transaction_count = transaction_count + 1 WHERE id = $1 ` _, err := p.db.ExecContext(ctx, query, event.AccountID, event.Balance, event.Amount) return err } func (p *AccountProjection) handleMoneyWithdrawn(ctx context.Context, event *domain.MoneyWithdrawnEvent) error { query := ` UPDATE account_read_model SET balance = $2, total_withdrawals = total_withdrawals + $3, transaction_count = transaction_count + 1 WHERE id = $1 ` _, err := p.db.ExecContext(ctx, query, event.AccountID, event.Balance, event.Amount) return err } func (p *AccountProjection) 
handleAccountClosed(ctx context.Context, event *domain.AccountClosedEvent) error { query := `UPDATE account_read_model SET is_closed = true WHERE id = $1` _, err := p.db.ExecContext(ctx, query, event.AccountID) return err } // RebuildProjections rebuilds all projections from events func (p *AccountProjection) RebuildProjections(ctx context.Context, eventStore eventstore.EventStore) error { // Clear existing projections if _, err := p.db.ExecContext(ctx, "TRUNCATE account_read_model"); err != nil { return err } // Replay all events events, err := eventStore.GetAllEvents(ctx) if err != nil { return err } for _, event := range events { if err := p.Project(ctx, event); err != nil { return err } } return nil } Main Application // cmd/api/main.go package main import ( "database/sql" "encoding/json" "log" "net/http" "github.com/gorilla/mux" _ "github.com/lib/pq" "app/internal/commands" "app/internal/eventstore" "app/internal/projections" ) func main() { db, err := sql.Open("postgres", "postgres://user:pass@localhost/eventsourcing?sslmode=disable") if err != nil { log.Fatal(err) } defer db.Close() // Initialize stores eventStore := eventstore.NewPostgresEventStore(db) projection := projections.NewAccountProjection(db) // Initialize command handlers createAccountHandler := commands.NewCreateAccountHandler(eventStore) depositHandler := commands.NewDepositMoneyHandler(eventStore) withdrawHandler := commands.NewWithdrawMoneyHandler(eventStore) router := mux.NewRouter() // Command endpoints router.HandleFunc("/accounts", func(w http.ResponseWriter, r *http.Request) { var cmd commands.CreateAccountCommand if err := json.NewDecoder(r.Body).Decode(&cmd); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } if err := createAccountHandler.Handle(r.Context(), &cmd); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } w.WriteHeader(http.StatusCreated) }).Methods("POST") router.HandleFunc("/accounts/{id}/deposit", func(w
http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) var req struct { Amount float64 `json:"amount"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } cmd := commands.DepositMoneyCommand{ AccountID: vars["id"], Amount: req.Amount, } if err := depositHandler.Handle(r.Context(), &cmd); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } w.WriteHeader(http.StatusOK) }).Methods("POST") // Event replay endpoint router.HandleFunc("/admin/rebuild-projections", func(w http.ResponseWriter, r *http.Request) { if err := projection.RebuildProjections(r.Context(), eventStore); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusOK) w.Write([]byte("Projections rebuilt successfully")) }).Methods("POST") log.Println("Server starting on :8080") log.Fatal(http.ListenAndServe(":8080", router)) } Best Practices Immutable Events: Never modify or delete events Event Versioning: Version events for schema evolution Snapshots: Use snapshots to optimize replay performance Idempotency: Ensure event handlers are idempotent Event Schema: Keep events small and focused Testing: Test event replay and projections Monitoring: Track event store size and replay performance Documentation: Document all event types and their purpose Common Pitfalls Mutable Events: Modifying or deleting events Missing Events: Not capturing all state changes Large Events: Storing too much data in events No Snapshots: Performance degradation from replaying all events Tight Coupling: Events containing implementation details No Versioning: Breaking changes to event schemas Synchronous Projections: Slowing down command processing When to Use Event Sourcing Use When: ...
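The "Idempotency" practice listed above matters for the projection shown earlier, because an update such as total_deposits = total_deposits + $3 counts an event twice if it is delivered or replayed twice. The following is a minimal in-memory sketch of one way to guard against that, assuming each event carries a per-aggregate version. DepositedEvent and TotalsProjection are hypothetical names for illustration and are not part of the article's code.

```go
package main

import "fmt"

// DepositedEvent is a hypothetical, simplified stand-in for the article's
// MoneyDepositedEvent. Version is the event's position in the account's stream.
type DepositedEvent struct {
	AccountID string
	Version   int
	Amount    float64
}

// TotalsProjection keeps a running deposit total per account and remembers
// the last version applied, so redelivered events can be skipped.
type TotalsProjection struct {
	totals      map[string]float64
	lastVersion map[string]int
}

// NewTotalsProjection returns an empty projection.
func NewTotalsProjection() *TotalsProjection {
	return &TotalsProjection{
		totals:      make(map[string]float64),
		lastVersion: make(map[string]int),
	}
}

// Apply updates the read model and reports whether the event was applied.
// Events at or below the last applied version are treated as duplicates.
func (p *TotalsProjection) Apply(e DepositedEvent) bool {
	if e.Version <= p.lastVersion[e.AccountID] {
		return false
	}
	p.totals[e.AccountID] += e.Amount
	p.lastVersion[e.AccountID] = e.Version
	return true
}

// Total returns the accumulated deposits for an account.
func (p *TotalsProjection) Total(accountID string) float64 {
	return p.totals[accountID]
}

func main() {
	p := NewTotalsProjection()
	events := []DepositedEvent{
		{AccountID: "acc-1", Version: 1, Amount: 50},
		{AccountID: "acc-1", Version: 2, Amount: 25},
		{AccountID: "acc-1", Version: 2, Amount: 25}, // redelivered duplicate
	}
	for _, e := range events {
		p.Apply(e)
	}
	fmt.Println(p.Total("acc-1")) // prints 75
}
```

In a SQL-backed projection the same idea could be expressed by storing the last applied version in the read-model row and adding it to the UPDATE's WHERE clause, though that depends on a schema the excerpt does not show.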

    January 23, 2025 · 15 min · Rafiul Alam

    CQRS Pattern in Go: Separating Reads and Writes

    Go Architecture Patterns Series: ← Previous: Event-Driven Architecture | Series Overview | Next: Event Sourcing → What is CQRS Pattern? Command Query Responsibility Segregation (CQRS) is an architectural pattern that separates read operations (queries) from write operations (commands). This separation allows you to optimize each side independently for better performance, scalability, and maintainability. Key Principles: Command Model: Handles writes, updates, and business logic validation Query Model: Handles reads, optimized for data retrieval Separate Data Stores: Optional use of different databases for reads and writes Eventual Consistency: Query model may lag behind command model Single Responsibility: Each model focused on its specific task Independent Scaling: Scale read and write operations separately Architecture Overview graph TB subgraph "Client Layer" Client[Client Application] end subgraph "Command Side" CommandAPI[Command API] CommandHandler[Command Handlers] CommandModel[Command Model] WriteDB[(Write Database)] end subgraph "Query Side" QueryAPI[Query API] QueryHandler[Query Handlers] QueryModel[Read Model] ReadDB[(Read Database)] end subgraph "Synchronization" EventBus[Event Bus] Projections[Projections/Denormalizers] end Client -->|Commands| CommandAPI Client -->|Queries| QueryAPI CommandAPI --> CommandHandler CommandHandler --> CommandModel CommandModel --> WriteDB QueryAPI --> QueryHandler QueryHandler --> QueryModel QueryModel --> ReadDB CommandHandler -->|Events| EventBus EventBus --> Projections Projections --> ReadDB style CommandAPI fill:#fff4e1 style QueryAPI fill:#e1f5ff style EventBus fill:#e8f5e9 style Projections fill:#f3e5f5 CQRS Flow Pattern sequenceDiagram participant Client participant CommandAPI participant CommandHandler participant WriteDB participant EventBus participant Projection participant ReadDB participant QueryAPI Note over Client,QueryAPI: Write Operation Client->>CommandAPI: Create Order Command 
CommandAPI->>CommandHandler: Handle Command CommandHandler->>CommandHandler: Validate Business Rules CommandHandler->>WriteDB: Save Order WriteDB-->>CommandHandler: Success CommandHandler->>EventBus: Publish OrderCreated Event CommandHandler-->>Client: Command Accepted (202) Note over EventBus,ReadDB: Background Synchronization EventBus->>Projection: OrderCreated Event Projection->>Projection: Build Read Model Projection->>ReadDB: Update Denormalized View Note over Client,QueryAPI: Read Operation Client->>QueryAPI: Get Order Query QueryAPI->>ReadDB: Query Read Model ReadDB-->>QueryAPI: Order Data QueryAPI-->>Client: Order Details (200) Real-World Use Cases E-commerce Platforms: Separate product catalog writes from fast product searches Banking Systems: Transaction processing vs. account balance queries Analytics Platforms: Data ingestion vs. complex reporting queries Social Media: Post creation vs. feed generation Inventory Management: Stock updates vs. availability queries Audit Systems: Event recording vs. 
audit log queries CQRS Pattern Implementation Project Structure cqrs-app/ ├── cmd/ │ └── api/ │ └── main.go ├── internal/ │ ├── commands/ │ │ ├── create_order.go │ │ ├── update_order.go │ │ └── handler.go │ ├── queries/ │ │ ├── get_order.go │ │ ├── list_orders.go │ │ └── handler.go │ ├── domain/ │ │ └── order.go │ ├── write/ │ │ ├── repository.go │ │ └── postgres_repository.go │ ├── read/ │ │ ├── repository.go │ │ └── postgres_repository.go │ ├── projections/ │ │ └── order_projection.go │ └── events/ │ └── events.go └── go.mod Domain Model // internal/domain/order.go package domain import ( "errors" "fmt" "time" ) type OrderID string type OrderStatus string const ( OrderStatusPending OrderStatus = "pending" OrderStatusConfirmed OrderStatus = "confirmed" OrderStatusShipped OrderStatus = "shipped" OrderStatusDelivered OrderStatus = "delivered" OrderStatusCancelled OrderStatus = "cancelled" ) type OrderItem struct { ProductID string `json:"product_id"` Quantity int `json:"quantity"` Price float64 `json:"price"` } // Write Model - Rich domain model with business logic type Order struct { ID OrderID UserID string Items []OrderItem Total float64 Status OrderStatus CreatedAt time.Time UpdatedAt time.Time Version int // For optimistic locking } var ( ErrInvalidOrder = errors.New("invalid order") ErrOrderNotFound = errors.New("order not found") ErrInvalidStatus = errors.New("invalid status transition") ErrConcurrentUpdate = errors.New("concurrent update detected") ) // NewOrder creates a new order with validation func NewOrder(userID string, items []OrderItem) (*Order, error) { if userID == "" { return nil, ErrInvalidOrder } if len(items) == 0 { return nil, ErrInvalidOrder } var total float64 for _, item := range items { if item.Quantity <= 0 || item.Price < 0 { return nil, ErrInvalidOrder } total += item.Price * float64(item.Quantity) } return &Order{ ID: OrderID(generateID()), UserID: userID, Items: items, Total: total, Status: OrderStatusPending, CreatedAt: time.Now(), 
UpdatedAt: time.Now(), Version: 1, }, nil } // Confirm transitions order to confirmed status func (o *Order) Confirm() error { if o.Status != OrderStatusPending { return ErrInvalidStatus } o.Status = OrderStatusConfirmed o.UpdatedAt = time.Now() o.Version++ return nil } // Ship transitions order to shipped status func (o *Order) Ship() error { if o.Status != OrderStatusConfirmed { return ErrInvalidStatus } o.Status = OrderStatusShipped o.UpdatedAt = time.Now() o.Version++ return nil } // Cancel cancels the order func (o *Order) Cancel() error { if o.Status == OrderStatusDelivered || o.Status == OrderStatusCancelled { return ErrInvalidStatus } o.Status = OrderStatusCancelled o.UpdatedAt = time.Now() o.Version++ return nil } func generateID() string { return fmt.Sprintf("order_%d", time.Now().UnixNano()) } Commands // internal/commands/create_order.go package commands import ( "context" "fmt" "app/internal/domain" "app/internal/events" ) // CreateOrderCommand represents a command to create an order type CreateOrderCommand struct { UserID string `json:"user_id"` Items []domain.OrderItem `json:"items"` } // CreateOrderHandler handles order creation type CreateOrderHandler struct { writeRepo domain.WriteRepository eventBus events.EventBus } func NewCreateOrderHandler(repo domain.WriteRepository, bus events.EventBus) *CreateOrderHandler { return &CreateOrderHandler{ writeRepo: repo, eventBus: bus, } } func (h *CreateOrderHandler) Handle(ctx context.Context, cmd *CreateOrderCommand) (*domain.Order, error) { // Create domain model with business logic validation order, err := domain.NewOrder(cmd.UserID, cmd.Items) if err != nil { return nil, fmt.Errorf("invalid command: %w", err) } // Persist to write store if err := h.writeRepo.Save(ctx, order); err != nil { return nil, fmt.Errorf("failed to save order: %w", err) } // Publish event for read model synchronization event := events.OrderCreatedEvent{ OrderID: string(order.ID), UserID: order.UserID, Items: order.Items, Total: 
order.Total, Status: string(order.Status), CreatedAt: order.CreatedAt, } if err := h.eventBus.Publish(ctx, "order.created", event); err != nil { // Log error but don't fail the command - eventual consistency fmt.Printf("Failed to publish event: %v\n", err) } return order, nil } // internal/commands/update_order.go package commands import ( "context" "fmt" "app/internal/domain" "app/internal/events" ) // ConfirmOrderCommand confirms an order type ConfirmOrderCommand struct { OrderID string `json:"order_id"` } type ConfirmOrderHandler struct { writeRepo domain.WriteRepository eventBus events.EventBus } func NewConfirmOrderHandler(repo domain.WriteRepository, bus events.EventBus) *ConfirmOrderHandler { return &ConfirmOrderHandler{ writeRepo: repo, eventBus: bus, } } func (h *ConfirmOrderHandler) Handle(ctx context.Context, cmd *ConfirmOrderCommand) error { // Load from write store order, err := h.writeRepo.GetByID(ctx, domain.OrderID(cmd.OrderID)) if err != nil { return fmt.Errorf("order not found: %w", err) } // Execute domain logic if err := order.Confirm(); err != nil { return fmt.Errorf("cannot confirm order: %w", err) } // Save with optimistic locking if err := h.writeRepo.Save(ctx, order); err != nil { return fmt.Errorf("failed to save order: %w", err) } // Publish event event := events.OrderConfirmedEvent{ OrderID: string(order.ID), Status: string(order.Status), ConfirmedAt: order.UpdatedAt, } if err := h.eventBus.Publish(ctx, "order.confirmed", event); err != nil { fmt.Printf("Failed to publish event: %v\n", err) } return nil } // CancelOrderCommand cancels an order type CancelOrderCommand struct { OrderID string `json:"order_id"` Reason string `json:"reason"` } type CancelOrderHandler struct { writeRepo domain.WriteRepository eventBus events.EventBus } func NewCancelOrderHandler(repo domain.WriteRepository, bus events.EventBus) *CancelOrderHandler { return &CancelOrderHandler{ writeRepo: repo, eventBus: bus, } } func (h *CancelOrderHandler) Handle(ctx 
context.Context, cmd *CancelOrderCommand) error { order, err := h.writeRepo.GetByID(ctx, domain.OrderID(cmd.OrderID)) if err != nil { return fmt.Errorf("order not found: %w", err) } if err := order.Cancel(); err != nil { return fmt.Errorf("cannot cancel order: %w", err) } if err := h.writeRepo.Save(ctx, order); err != nil { return fmt.Errorf("failed to save order: %w", err) } event := events.OrderCancelledEvent{ OrderID: string(order.ID), Status: string(order.Status), Reason: cmd.Reason, CancelledAt: order.UpdatedAt, } if err := h.eventBus.Publish(ctx, "order.cancelled", event); err != nil { fmt.Printf("Failed to publish event: %v\n", err) } return nil } Queries // internal/queries/get_order.go package queries import ( "context" "time" ) // OrderReadModel - Optimized for reading, denormalized type OrderReadModel struct { ID string `json:"id"` UserID string `json:"user_id"` UserName string `json:"user_name"` // Denormalized UserEmail string `json:"user_email"` // Denormalized Items []OrderItemReadModel `json:"items"` ItemCount int `json:"item_count"` // Computed Total float64 `json:"total"` Status string `json:"status"` StatusDisplay string `json:"status_display"` // Formatted for display CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` } type OrderItemReadModel struct { ProductID string `json:"product_id"` ProductName string `json:"product_name"` // Denormalized Quantity int `json:"quantity"` Price float64 `json:"price"` Subtotal float64 `json:"subtotal"` // Computed } // GetOrderQuery retrieves a single order type GetOrderQuery struct { OrderID string `json:"order_id"` } type GetOrderHandler struct { readRepo ReadRepository } func NewGetOrderHandler(repo ReadRepository) *GetOrderHandler { return &GetOrderHandler{readRepo: repo} } func (h *GetOrderHandler) Handle(ctx context.Context, query *GetOrderQuery) (*OrderReadModel, error) { return h.readRepo.GetByID(ctx, query.OrderID) } // internal/queries/list_orders.go package queries 
import ( "context" ) // ListOrdersQuery lists orders with filters type ListOrdersQuery struct { UserID string `json:"user_id"` Status string `json:"status"` Page int `json:"page"` PageSize int `json:"page_size"` } type OrderListResult struct { Orders []*OrderReadModel `json:"orders"` Total int `json:"total"` Page int `json:"page"` PageSize int `json:"page_size"` TotalPages int `json:"total_pages"` } type ListOrdersHandler struct { readRepo ReadRepository } func NewListOrdersHandler(repo ReadRepository) *ListOrdersHandler { return &ListOrdersHandler{readRepo: repo} } func (h *ListOrdersHandler) Handle(ctx context.Context, query *ListOrdersQuery) (*OrderListResult, error) { // Set defaults if query.Page < 1 { query.Page = 1 } if query.PageSize < 1 || query.PageSize > 100 { query.PageSize = 20 } offset := (query.Page - 1) * query.PageSize orders, total, err := h.readRepo.List(ctx, query.UserID, query.Status, query.PageSize, offset) if err != nil { return nil, err } totalPages := (total + query.PageSize - 1) / query.PageSize return &OrderListResult{ Orders: orders, Total: total, Page: query.Page, PageSize: query.PageSize, TotalPages: totalPages, }, nil } // ReadRepository interface for query side type ReadRepository interface { GetByID(ctx context.Context, id string) (*OrderReadModel, error) List(ctx context.Context, userID, status string, limit, offset int) ([]*OrderReadModel, int, error) } Projections // internal/projections/order_projection.go package projections import ( "context" "database/sql" "fmt" "app/internal/events" ) // OrderProjection builds read models from events type OrderProjection struct { db *sql.DB } func NewOrderProjection(db *sql.DB) *OrderProjection { return &OrderProjection{db: db} } // HandleOrderCreated projects OrderCreated event to read model func (p *OrderProjection) HandleOrderCreated(ctx context.Context, event events.OrderCreatedEvent) error { tx, err := p.db.BeginTx(ctx, nil) if err != nil { return err } defer 
tx.Rollback() // Insert denormalized order query := ` INSERT INTO order_read_model (id, user_id, user_name, user_email, total, status, status_display, item_count, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ` // Fetch user details for denormalization userName, userEmail := p.getUserDetails(ctx, event.UserID) _, err = tx.ExecContext(ctx, query, event.OrderID, event.UserID, userName, userEmail, event.Total, event.Status, formatStatus(event.Status), len(event.Items), event.CreatedAt, event.CreatedAt, ) if err != nil { return fmt.Errorf("failed to insert order: %w", err) } // Insert order items with denormalized product info itemQuery := ` INSERT INTO order_item_read_model (order_id, product_id, product_name, quantity, price, subtotal) VALUES ($1, $2, $3, $4, $5, $6) ` for _, item := range event.Items { productName := p.getProductName(ctx, item.ProductID) subtotal := item.Price * float64(item.Quantity) _, err = tx.ExecContext(ctx, itemQuery, event.OrderID, item.ProductID, productName, item.Quantity, item.Price, subtotal, ) if err != nil { return fmt.Errorf("failed to insert order item: %w", err) } } return tx.Commit() } // HandleOrderConfirmed updates read model when order is confirmed func (p *OrderProjection) HandleOrderConfirmed(ctx context.Context, event events.OrderConfirmedEvent) error { query := ` UPDATE order_read_model SET status = $2, status_display = $3, updated_at = $4 WHERE id = $1 ` _, err := p.db.ExecContext(ctx, query, event.OrderID, event.Status, formatStatus(event.Status), event.ConfirmedAt, ) return err } // HandleOrderCancelled updates read model when order is cancelled func (p *OrderProjection) HandleOrderCancelled(ctx context.Context, event events.OrderCancelledEvent) error { query := ` UPDATE order_read_model SET status = $2, status_display = $3, updated_at = $4 WHERE id = $1 ` _, err := p.db.ExecContext(ctx, query, event.OrderID, event.Status, formatStatus(event.Status)+" ("+event.Reason+")", event.CancelledAt, ) 
return err } func (p *OrderProjection) getUserDetails(ctx context.Context, userID string) (name, email string) { // Query user service or cache; best-effort, a failed lookup leaves both fields empty query := `SELECT name, email FROM users WHERE id = $1` _ = p.db.QueryRowContext(ctx, query, userID).Scan(&name, &email) return } func (p *OrderProjection) getProductName(ctx context.Context, productID string) string { // Query product service or cache; best-effort, a failed lookup returns an empty name var name string query := `SELECT name FROM products WHERE id = $1` _ = p.db.QueryRowContext(ctx, query, productID).Scan(&name) return name } func formatStatus(status string) string { // Format status for display switch status { case "pending": return "Pending" case "confirmed": return "Confirmed" case "shipped": return "Shipped" case "delivered": return "Delivered" case "cancelled": return "Cancelled" default: return status } } // RegisterHandlers registers projection handlers with event bus func (p *OrderProjection) RegisterHandlers(bus events.EventBus) { bus.Subscribe("order.created", func(ctx context.Context, data interface{}) error { event, ok := data.(events.OrderCreatedEvent) if !ok { return fmt.Errorf("order.created: unexpected payload type %T", data) } return p.HandleOrderCreated(ctx, event) }) bus.Subscribe("order.confirmed", func(ctx context.Context, data interface{}) error { event, ok := data.(events.OrderConfirmedEvent) if !ok { return fmt.Errorf("order.confirmed: unexpected payload type %T", data) } return p.HandleOrderConfirmed(ctx, event) }) bus.Subscribe("order.cancelled", func(ctx context.Context, data interface{}) error { event, ok := data.(events.OrderCancelledEvent) if !ok { return fmt.Errorf("order.cancelled: unexpected payload type %T", data) } return p.HandleOrderCancelled(ctx, event) }) } Main Application // cmd/api/main.go package main import ( "database/sql" "encoding/json" "log" "net/http" "github.com/gorilla/mux" _ "github.com/lib/pq" "app/internal/commands" "app/internal/events" "app/internal/projections" "app/internal/queries" "app/internal/read" "app/internal/write" ) func main() { // Connect to write database writeDB, err := sql.Open("postgres", "postgres://user:pass@localhost/orders_write?sslmode=disable") if err != nil { log.Fatal(err) } defer writeDB.Close() // Connect to read database 
(could be same or different DB) readDB, err := sql.Open("postgres", "postgres://user:pass@localhost/orders_read?sslmode=disable") if err != nil { log.Fatal(err) } defer readDB.Close() // Initialize event bus eventBus := events.NewInMemoryEventBus() // Initialize repositories writeRepo := write.NewPostgresRepository(writeDB) readRepo := read.NewPostgresRepository(readDB) // Initialize projections orderProjection := projections.NewOrderProjection(readDB) orderProjection.RegisterHandlers(eventBus) // Initialize command handlers createOrderHandler := commands.NewCreateOrderHandler(writeRepo, eventBus) confirmOrderHandler := commands.NewConfirmOrderHandler(writeRepo, eventBus) cancelOrderHandler := commands.NewCancelOrderHandler(writeRepo, eventBus) // Initialize query handlers getOrderHandler := queries.NewGetOrderHandler(readRepo) listOrdersHandler := queries.NewListOrdersHandler(readRepo) // Setup HTTP routes router := mux.NewRouter() // Command endpoints router.HandleFunc("/commands/orders", func(w http.ResponseWriter, r *http.Request) { var cmd commands.CreateOrderCommand if err := json.NewDecoder(r.Body).Decode(&cmd); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } order, err := createOrderHandler.Handle(r.Context(), &cmd) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusAccepted) json.NewEncoder(w).Encode(map[string]string{"order_id": string(order.ID)}) }).Methods("POST") router.HandleFunc("/commands/orders/{id}/confirm", func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) cmd := commands.ConfirmOrderCommand{OrderID: vars["id"]} if err := confirmOrderHandler.Handle(r.Context(), &cmd); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } w.WriteHeader(http.StatusAccepted) }).Methods("POST") // Query endpoints router.HandleFunc("/queries/orders/{id}", func(w http.ResponseWriter, r *http.Request) { vars := 
mux.Vars(r) query := queries.GetOrderQuery{OrderID: vars["id"]} order, err := getOrderHandler.Handle(r.Context(), &query) if err != nil { http.Error(w, err.Error(), http.StatusNotFound) return } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(order) }).Methods("GET") router.HandleFunc("/queries/orders", func(w http.ResponseWriter, r *http.Request) { query := queries.ListOrdersQuery{ UserID: r.URL.Query().Get("user_id"), Status: r.URL.Query().Get("status"), } result, err := listOrdersHandler.Handle(r.Context(), &query) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(result) }).Methods("GET") log.Println("Server starting on :8080") log.Fatal(http.ListenAndServe(":8080", router)) } Best Practices Clear Command/Query Separation: Never mix command and query logic Eventual Consistency: Accept and handle eventual consistency gracefully Idempotent Projections: Ensure projections can be replayed safely Versioning: Version commands and events for evolution Validation: Validate commands before processing Optimistic Locking: Use versioning to handle concurrent updates Monitoring: Track projection lag and command processing times Testing: Test commands and queries independently Common Pitfalls Over-Engineering: Using CQRS for simple CRUD applications Synchronous Projections: Making projections synchronous defeats the purpose Shared Models: Reusing write models in read side Complex Queries in Write Model: Adding query logic to command side No Monitoring: Not tracking eventual consistency lag Poor Error Handling: Not handling projection failures Inconsistent State: Not handling projection rebuild scenarios When to Use CQRS Pattern Use When: ...

    January 22, 2025 · 13 min · Rafiul Alam

    Event-Driven Architecture in Go: Building Reactive Systems

    Go Architecture Patterns Series: ← Previous: Microservices Architecture | Series Overview | Next: CQRS Pattern → What is Event-Driven Architecture? Event-Driven Architecture (EDA) is a design pattern where components communicate by producing and consuming events. Events represent state changes or significant occurrences in the system, allowing components to react asynchronously without direct coupling. Key Principles: Event Producers: Components that generate events when state changes Event Consumers: Components that react to events Event Broker: Middleware that routes events from producers to consumers Asynchronous Communication: Decoupled, non-blocking interactions Event Immutability: Events represent past facts that cannot be changed Eventual Consistency: Systems converge to consistent state over time Architecture Overview graph TB subgraph "Event Producers" UserService[User Service] OrderService[Order Service] PaymentService[Payment Service] end subgraph "Event Broker" EventBus[Event Bus / Message Broker] UserTopic[User Events Topic] OrderTopic[Order Events Topic] PaymentTopic[Payment Events Topic] end subgraph "Event Consumers" EmailService[Email Service] AnalyticsService[Analytics Service] NotificationService[Notification Service] InventoryService[Inventory Service] end subgraph "Event Store" EventLog[(Event Log DB)] end UserService -->|UserCreated| UserTopic OrderService -->|OrderPlaced| OrderTopic OrderService -->|OrderConfirmed| OrderTopic PaymentService -->|PaymentProcessed| PaymentTopic UserTopic --> EventBus OrderTopic --> EventBus PaymentTopic --> EventBus EventBus --> EmailService EventBus --> AnalyticsService EventBus --> NotificationService EventBus --> InventoryService EventBus --> EventLog style UserService fill:#e1f5ff style OrderService fill:#fff4e1 style PaymentService fill:#f3e5f5 style EventBus fill:#e8f5e9 Event Flow Pattern sequenceDiagram participant User participant OrderService participant EventBroker participant EmailService 
participant InventoryService participant AnalyticsService User->>OrderService: Place Order OrderService->>OrderService: Validate Order OrderService->>EventBroker: Publish OrderPlaced Event Note over EventBroker: Event is distributed to all subscribers EventBroker->>EmailService: OrderPlaced Event EventBroker->>InventoryService: OrderPlaced Event EventBroker->>AnalyticsService: OrderPlaced Event EmailService->>EmailService: Send Confirmation Email InventoryService->>InventoryService: Update Stock AnalyticsService->>AnalyticsService: Record Metrics InventoryService->>EventBroker: Publish StockUpdated Event OrderService-->>User: Order Accepted (202) Real-World Use Cases E-commerce: Order processing, inventory updates, shipping notifications Financial Systems: Transaction processing, fraud detection, account updates IoT Platforms: Sensor data processing, device state changes Social Media: Post updates, notifications, activity feeds Streaming Services: Video processing, recommendation updates Gaming: Player actions, leaderboard updates, achievements Event-Driven Architecture Implementation Project Structure event-driven-app/ ├── cmd/ │ ├── order-service/ │ │ └── main.go │ ├── email-service/ │ │ └── main.go │ └── inventory-service/ │ └── main.go ├── internal/ │ ├── events/ │ │ ├── event.go │ │ ├── bus.go │ │ └── types.go │ ├── broker/ │ │ ├── rabbitmq.go │ │ └── kafka.go │ └── services/ │ ├── order/ │ ├── email/ │ └── inventory/ ├── pkg/ │ └── messaging/ │ └── publisher.go └── go.mod Core Event System // internal/events/event.go package events import ( "encoding/json" "fmt" "time" ) // Event represents a domain event type Event struct { ID string `json:"id"` Type string `json:"type"` AggregateID string `json:"aggregate_id"` Payload map[string]interface{} `json:"payload"` Metadata map[string]string `json:"metadata"` Timestamp time.Time `json:"timestamp"` Version int `json:"version"` } // NewEvent creates a new event func NewEvent(eventType, aggregateID string, payload
map[string]interface{}) *Event { return &Event{ ID: generateEventID(), Type: eventType, AggregateID: aggregateID, Payload: payload, Metadata: make(map[string]string), Timestamp: time.Now(), Version: 1, } } // ToJSON serializes event to JSON func (e *Event) ToJSON() ([]byte, error) { return json.Marshal(e) } // FromJSON deserializes event from JSON func FromJSON(data []byte) (*Event, error) { var event Event if err := json.Unmarshal(data, &event); err != nil { return nil, err } return &event, nil } func generateEventID() string { return fmt.Sprintf("evt_%d", time.Now().UnixNano()) } // internal/events/types.go package events const ( // User Events UserCreated = "user.created" UserUpdated = "user.updated" UserDeleted = "user.deleted" // Order Events OrderPlaced = "order.placed" OrderConfirmed = "order.confirmed" OrderCancelled = "order.cancelled" OrderShipped = "order.shipped" OrderDelivered = "order.delivered" // Payment Events PaymentProcessed = "payment.processed" PaymentFailed = "payment.failed" PaymentRefunded = "payment.refunded" // Inventory Events StockUpdated = "inventory.stock_updated" StockDepleted = "inventory.stock_depleted" StockRestocked = "inventory.stock_restocked" ) // internal/events/bus.go package events import ( "context" "fmt" "sync" ) // Handler is a function that processes events type Handler func(ctx context.Context, event *Event) error // EventBus manages event publishing and subscriptions type EventBus struct { mu sync.RWMutex handlers map[string][]Handler middleware []Middleware } // Middleware wraps event handlers type Middleware func(Handler) Handler // NewEventBus creates a new event bus func NewEventBus() *EventBus { return &EventBus{ handlers: make(map[string][]Handler), middleware: make([]Middleware, 0), } } // Subscribe registers a handler for an event type func (b *EventBus) Subscribe(eventType string, handler Handler) { b.mu.Lock() defer b.mu.Unlock() // Apply middleware for i := len(b.middleware) - 1; i >= 0; i-- { handler = 
b.middleware[i](handler) } b.handlers[eventType] = append(b.handlers[eventType], handler) } // Publish publishes an event to all subscribers func (b *EventBus) Publish(ctx context.Context, event *Event) error { b.mu.RLock() handlers, exists := b.handlers[event.Type] b.mu.RUnlock() if !exists { return nil // No handlers registered } var wg sync.WaitGroup errors := make(chan error, len(handlers)) for _, handler := range handlers { wg.Add(1) go func(h Handler) { defer wg.Done() if err := h(ctx, event); err != nil { errors <- fmt.Errorf("handler error: %w", err) } }(handler) } wg.Wait() close(errors) // Collect errors var handlerErrors []error for err := range errors { handlerErrors = append(handlerErrors, err) } if len(handlerErrors) > 0 { return fmt.Errorf("some handlers failed: %v", handlerErrors) } return nil } // Use adds middleware to the event bus func (b *EventBus) Use(middleware Middleware) { b.mu.Lock() defer b.mu.Unlock() b.middleware = append(b.middleware, middleware) } Message Broker Integration (RabbitMQ) // internal/broker/rabbitmq.go package broker import ( "context" "fmt" "log" "github.com/rabbitmq/amqp091-go" "app/internal/events" ) // RabbitMQBroker wraps RabbitMQ for event publishing/consuming type RabbitMQBroker struct { conn *amqp091.Connection channel *amqp091.Channel } // NewRabbitMQBroker creates a new RabbitMQ broker func NewRabbitMQBroker(url string) (*RabbitMQBroker, error) { conn, err := amqp091.Dial(url) if err != nil { return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err) } channel, err := conn.Channel() if err != nil { return nil, fmt.Errorf("failed to open channel: %w", err) } return &RabbitMQBroker{ conn: conn, channel: channel, }, nil } // DeclareExchange declares an exchange func (b *RabbitMQBroker) DeclareExchange(name, kind string) error { return b.channel.ExchangeDeclare( name, // name kind, // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) } // Publish publishes an 
event to RabbitMQ func (b *RabbitMQBroker) Publish(ctx context.Context, exchange, routingKey string, event *events.Event) error { body, err := event.ToJSON() if err != nil { return fmt.Errorf("failed to serialize event: %w", err) } err = b.channel.PublishWithContext( ctx, exchange, // exchange routingKey, // routing key false, // mandatory false, // immediate amqp091.Publishing{ ContentType: "application/json", Body: body, DeliveryMode: amqp091.Persistent, MessageId: event.ID, Timestamp: event.Timestamp, }, ) if err != nil { return fmt.Errorf("failed to publish event: %w", err) } return nil } // Subscribe subscribes to events from a queue func (b *RabbitMQBroker) Subscribe(queueName, exchange, routingKey string, handler events.Handler) error { // Declare queue queue, err := b.channel.QueueDeclare( queueName, // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) if err != nil { return fmt.Errorf("failed to declare queue: %w", err) } // Bind queue to exchange err = b.channel.QueueBind( queue.Name, // queue name routingKey, // routing key exchange, // exchange false, nil, ) if err != nil { return fmt.Errorf("failed to bind queue: %w", err) } // Consume messages msgs, err := b.channel.Consume( queue.Name, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { return fmt.Errorf("failed to register consumer: %w", err) } // Process messages go func() { for msg := range msgs { event, err := events.FromJSON(msg.Body) if err != nil { log.Printf("Failed to deserialize event: %v", err) msg.Nack(false, false) continue } if err := handler(context.Background(), event); err != nil { log.Printf("Handler error: %v", err) msg.Nack(false, true) // Requeue on error continue } msg.Ack(false) } }() return nil } // Close closes the broker connection func (b *RabbitMQBroker) Close() error { if err := b.channel.Close(); err != nil { return err } return 
b.conn.Close() } Order Service (Event Producer) // internal/services/order/service.go package order import ( "context" "fmt" "time" "app/internal/events" ) type Order struct { ID string `json:"id"` UserID string `json:"user_id"` Total float64 `json:"total"` Status string `json:"status"` CreatedAt time.Time `json:"created_at"` } type Service struct { eventBus *events.EventBus } func NewService(eventBus *events.EventBus) *Service { return &Service{ eventBus: eventBus, } } func (s *Service) PlaceOrder(ctx context.Context, userID string, total float64) (*Order, error) { order := &Order{ ID: generateOrderID(), UserID: userID, Total: total, Status: "pending", CreatedAt: time.Now(), } // Publish OrderPlaced event event := events.NewEvent(events.OrderPlaced, order.ID, map[string]interface{}{ "order_id": order.ID, "user_id": order.UserID, "total": order.Total, "status": order.Status, }) if err := s.eventBus.Publish(ctx, event); err != nil { return nil, fmt.Errorf("failed to publish event: %w", err) } return order, nil } func (s *Service) ConfirmOrder(ctx context.Context, orderID string) error { // Publish OrderConfirmed event event := events.NewEvent(events.OrderConfirmed, orderID, map[string]interface{}{ "order_id": orderID, "status": "confirmed", }) if err := s.eventBus.Publish(ctx, event); err != nil { return fmt.Errorf("failed to publish event: %w", err) } return nil } func (s *Service) CancelOrder(ctx context.Context, orderID string) error { // Publish OrderCancelled event event := events.NewEvent(events.OrderCancelled, orderID, map[string]interface{}{ "order_id": orderID, "status": "cancelled", }) if err := s.eventBus.Publish(ctx, event); err != nil { return fmt.Errorf("failed to publish event: %w", err) } return nil } func generateOrderID() string { return fmt.Sprintf("order_%d", time.Now().UnixNano()) } Email Service (Event Consumer) // internal/services/email/service.go package email import ( "context" "fmt" "log" "app/internal/events" ) type Service struct { // 
SMTP client or email service client } func NewService() *Service { return &Service{} } // HandleOrderPlaced handles order placed events func (s *Service) HandleOrderPlaced(ctx context.Context, event *events.Event) error { orderID := event.Payload["order_id"].(string) userID := event.Payload["user_id"].(string) total := event.Payload["total"].(float64) log.Printf("Sending order confirmation email for order %s to user %s", orderID, userID) // Send email if err := s.sendEmail(userID, "Order Confirmation", fmt.Sprintf( "Your order %s for $%.2f has been placed successfully.", orderID, total, )); err != nil { return fmt.Errorf("failed to send email: %w", err) } return nil } // HandleOrderShipped handles order shipped events func (s *Service) HandleOrderShipped(ctx context.Context, event *events.Event) error { orderID := event.Payload["order_id"].(string) trackingNumber := event.Payload["tracking_number"].(string) log.Printf("Sending shipping notification for order %s with tracking %s", orderID, trackingNumber) // Send email return nil } func (s *Service) sendEmail(recipient, subject, body string) error { // Implementation of email sending log.Printf("Email sent to %s: %s - %s", recipient, subject, body) return nil } // RegisterHandlers registers event handlers func (s *Service) RegisterHandlers(bus *events.EventBus) { bus.Subscribe(events.OrderPlaced, s.HandleOrderPlaced) bus.Subscribe(events.OrderShipped, s.HandleOrderShipped) } Inventory Service (Event Consumer) // internal/services/inventory/service.go package inventory import ( "context" "fmt" "log" "time" "app/internal/events" ) type Service struct { eventBus *events.EventBus } func NewService(eventBus *events.EventBus) *Service { return &Service{ eventBus: eventBus, } } // HandleOrderPlaced handles order placed events func (s *Service) HandleOrderPlaced(ctx context.Context, event *events.Event) error { orderID := event.Payload["order_id"].(string) log.Printf("Updating inventory for order %s", orderID) // Update stock
levels if err := s.updateStock(orderID); err != nil { return fmt.Errorf("failed to update stock: %w", err) } // Publish StockUpdated event stockEvent := events.NewEvent(events.StockUpdated, orderID, map[string]interface{}{ "order_id": orderID, "stock_level": 100, "updated_at": time.Now(), }) if err := s.eventBus.Publish(ctx, stockEvent); err != nil { return fmt.Errorf("failed to publish stock event: %w", err) } return nil } // HandleOrderCancelled handles order cancelled events func (s *Service) HandleOrderCancelled(ctx context.Context, event *events.Event) error { orderID := event.Payload["order_id"].(string) log.Printf("Restoring inventory for cancelled order %s", orderID) // Restore stock if err := s.restoreStock(orderID); err != nil { return fmt.Errorf("failed to restore stock: %w", err) } // Publish StockRestocked event stockEvent := events.NewEvent(events.StockRestocked, orderID, map[string]interface{}{ "order_id": orderID, "restored": true, }) return s.eventBus.Publish(ctx, stockEvent) } func (s *Service) updateStock(orderID string) error { // Implementation of stock update return nil } func (s *Service) restoreStock(orderID string) error { // Implementation of stock restoration return nil } // RegisterHandlers registers event handlers func (s *Service) RegisterHandlers(bus *events.EventBus) { bus.Subscribe(events.OrderPlaced, s.HandleOrderPlaced) bus.Subscribe(events.OrderCancelled, s.HandleOrderCancelled) } Main Application // cmd/order-service/main.go package main import ( "context" "encoding/json" "log" "net/http" "os" "os/signal" "syscall" "time" "github.com/gorilla/mux" "app/internal/broker" "app/internal/events" "app/internal/services/email" "app/internal/services/inventory" "app/internal/services/order" ) func main() { // Initialize event bus eventBus := events.NewEventBus() // Add logging middleware eventBus.Use(loggingMiddleware) // Initialize RabbitMQ broker rabbitURL := os.Getenv("RABBITMQ_URL") if rabbitURL == "" { rabbitURL = 
"amqp://guest:guest@localhost:5672/" } mqBroker, err := broker.NewRabbitMQBroker(rabbitURL) if err != nil { log.Fatal(err) } defer mqBroker.Close() // Declare exchange if err := mqBroker.DeclareExchange("events", "topic"); err != nil { log.Fatal(err) } // Initialize services orderService := order.NewService(eventBus) emailService := email.NewService() inventoryService := inventory.NewService(eventBus) // Register event handlers emailService.RegisterHandlers(eventBus) inventoryService.RegisterHandlers(eventBus) // Bridge event bus to RabbitMQ eventBus.Subscribe(events.OrderPlaced, func(ctx context.Context, event *events.Event) error { return mqBroker.Publish(ctx, "events", "order.placed", event) }) // Setup HTTP server router := mux.NewRouter() router.HandleFunc("/orders", func(w http.ResponseWriter, r *http.Request) { var req struct { UserID string `json:"user_id"` Total float64 `json:"total"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } order, err := orderService.PlaceOrder(r.Context(), req.UserID, req.Total) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusCreated) json.NewEncoder(w).Encode(order) }).Methods("POST") router.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }) server := &http.Server{ Addr: ":8080", Handler: router, } // Start server go func() { log.Println("Server starting on :8080") if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatal(err) } }() // Graceful shutdown quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit log.Println("Shutting down server...") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := server.Shutdown(ctx); err != nil { log.Fatal(err) } } func loggingMiddleware(next 
events.Handler) events.Handler { return func(ctx context.Context, event *events.Event) error { start := time.Now() log.Printf("Handling event: %s (ID: %s)", event.Type, event.ID) err := next(ctx, event) log.Printf("Event %s handled in %v", event.ID, time.Since(start)) return err } } Best Practices Event Schema Versioning: Version your events to handle schema evolution Idempotent Handlers: Ensure handlers can process the same event multiple times Error Handling: Implement retry logic and dead letter queues Event Ordering: Be aware that events may not arrive in order Event Store: Persist events for audit trail and replay Monitoring: Track event processing metrics and failures Testing: Test event handlers independently Documentation: Document all event types and their payloads Common Pitfalls Event Coupling: Events containing too many internal implementation details Lost Messages: Not ensuring message delivery guarantees Duplicate Processing: Not handling duplicate events No Versioning: Making breaking changes to event schemas Synchronous Expectations: Expecting immediate consistency Overusing Events: Using events for simple request-response patterns Poor Error Handling: Not handling failures gracefully When to Use Event-Driven Architecture Use When: ...

    January 21, 2025 · 12 min · Rafiul Alam

    Microservices Architecture in Go: Building Distributed Systems

    Go Architecture Patterns Series: ← Previous: Modular Monolith | Series Overview | Next: Event-Driven Architecture → What is Microservices Architecture? Microservices Architecture is an approach where an application is composed of small, independent services that communicate over a network. Each service is self-contained, owns its data, and can be deployed, scaled, and updated independently. Key Principles: Service Independence: Each service is deployed and scaled independently Single Responsibility: Each service handles a specific business capability Decentralized Data: Each service owns its database API-First Design: Services communicate through well-defined APIs Resilience: Services handle failures gracefully Technology Diversity: Services can use different technologies Architecture Overview graph TB subgraph "Client Layer" Client[Web/Mobile Client] end subgraph "API Gateway" Gateway[API Gateway / Load Balancer] end subgraph "Service Mesh" UserService[User Service] ProductService[Product Service] OrderService[Order Service] PaymentService[Payment Service] NotificationService[Notification Service] end subgraph "Data Layer" UserDB[(User DB)] ProductDB[(Product DB)] OrderDB[(Order DB)] PaymentDB[(Payment DB)] end subgraph "Infrastructure" MessageBroker[Message Broker] ServiceRegistry[Service Discovery] ConfigServer[Config Server] end Client --> Gateway Gateway --> UserService Gateway --> ProductService Gateway --> OrderService Gateway --> PaymentService UserService --> UserDB ProductService --> ProductDB OrderService --> OrderDB PaymentService --> PaymentDB OrderService -.->|HTTP/gRPC| UserService OrderService -.->|HTTP/gRPC| ProductService OrderService -.->|HTTP/gRPC| PaymentService OrderService -.->|Async| MessageBroker PaymentService -.->|Async| MessageBroker NotificationService -.->|Subscribe| MessageBroker UserService -.-> ServiceRegistry ProductService -.-> ServiceRegistry OrderService -.-> ServiceRegistry style UserService fill:#e1f5ff style 
ProductService fill:#fff4e1 style OrderService fill:#e8f5e9 style PaymentService fill:#f3e5f5 style NotificationService fill:#ffe1f5 Service Communication Patterns sequenceDiagram participant Client participant Gateway participant OrderSvc as Order Service participant UserSvc as User Service participant ProductSvc as Product Service participant PaymentSvc as Payment Service participant Queue as Message Queue participant NotifySvc as Notification Service Client->>Gateway: Create Order Request Gateway->>OrderSvc: POST /orders OrderSvc->>UserSvc: GET /users/{id} UserSvc-->>OrderSvc: User Data OrderSvc->>ProductSvc: GET /products/{id} ProductSvc-->>OrderSvc: Product Data OrderSvc->>ProductSvc: POST /products/reserve ProductSvc-->>OrderSvc: Stock Reserved OrderSvc->>PaymentSvc: POST /payments PaymentSvc-->>OrderSvc: Payment Success OrderSvc->>Queue: Publish OrderCreated Event Queue->>NotifySvc: OrderCreated Event NotifySvc->>NotifySvc: Send Email/SMS OrderSvc-->>Gateway: Order Created Gateway-->>Client: Response Real-World Use Cases E-commerce Platforms: Amazon, eBay with separate services for products, orders, payments Streaming Services: Netflix with services for recommendations, playback, billing Ride-Sharing Apps: Uber with services for riders, drivers, payments, routing Financial Systems: Banking apps with separate services for accounts, transactions, loans Social Media: Twitter with services for posts, timelines, notifications, messages Cloud Platforms: AWS-like platforms with independent service offerings Microservices Implementation Project Structure (Multi-Repository) microservices/ ├── user-service/ │ ├── cmd/ │ │ └── server/ │ │ └── main.go │ ├── internal/ │ │ ├── domain/ │ │ ├── handlers/ │ │ ├── repository/ │ │ └── service/ │ ├── proto/ │ │ └── user.proto │ └── go.mod ├── product-service/ │ ├── cmd/ │ │ └── server/ │ │ └── main.go │ ├── internal/ │ │ ├── domain/ │ │ ├── handlers/ │ │ ├── repository/ │ │ └── service/ │ └── go.mod ├── order-service/ │ ├── 
cmd/ │ │ └── server/ │ │ └── main.go │ ├── internal/ │ │ ├── domain/ │ │ ├── handlers/ │ │ ├── repository/ │ │ ├── service/ │ │ └── clients/ │ └── go.mod └── api-gateway/ ├── cmd/ │ └── server/ │ └── main.go └── go.mod Service 1: User Service // user-service/internal/domain/user.go package domain import ( "context" "errors" "time" ) type User struct { ID string `json:"id"` Email string `json:"email"` Name string `json:"name"` Active bool `json:"active"` CreatedAt time.Time `json:"created_at"` } var ( ErrUserNotFound = errors.New("user not found") ErrUserExists = errors.New("user already exists") ) type Repository interface { Create(ctx context.Context, user *User) error GetByID(ctx context.Context, id string) (*User, error) GetByEmail(ctx context.Context, email string) (*User, error) Update(ctx context.Context, user *User) error } // user-service/internal/service/user_service.go package service import ( "context" "fmt" "time" "user-service/internal/domain" ) type UserService struct { repo domain.Repository } func NewUserService(repo domain.Repository) *UserService { return &UserService{repo: repo} } func (s *UserService) CreateUser(ctx context.Context, email, name string) (*domain.User, error) { existing, _ := s.repo.GetByEmail(ctx, email) if existing != nil { return nil, domain.ErrUserExists } user := &domain.User{ ID: generateID(), Email: email, Name: name, Active: true, CreatedAt: time.Now(), } if err := s.repo.Create(ctx, user); err != nil { return nil, fmt.Errorf("failed to create user: %w", err) } return user, nil } func (s *UserService) GetUser(ctx context.Context, id string) (*domain.User, error) { return s.repo.GetByID(ctx, id) } func (s *UserService) ValidateUser(ctx context.Context, id string) (bool, error) { user, err := s.repo.GetByID(ctx, id) if err != nil { return false, err } return user.Active, nil } func generateID() string { return fmt.Sprintf("user_%d", time.Now().UnixNano()) } // user-service/internal/handlers/http_handler.go package handlers 
import ( "encoding/json" "net/http" "github.com/gorilla/mux" "user-service/internal/service" ) type HTTPHandler struct { service *service.UserService } func NewHTTPHandler(service *service.UserService) *HTTPHandler { return &HTTPHandler{service: service} } type CreateUserRequest struct { Email string `json:"email"` Name string `json:"name"` } func (h *HTTPHandler) CreateUser(w http.ResponseWriter, r *http.Request) { var req CreateUserRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { respondError(w, http.StatusBadRequest, "invalid request") return } user, err := h.service.CreateUser(r.Context(), req.Email, req.Name) if err != nil { respondError(w, http.StatusBadRequest, err.Error()) return } respondJSON(w, http.StatusCreated, user) } func (h *HTTPHandler) GetUser(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) id := vars["id"] user, err := h.service.GetUser(r.Context(), id) if err != nil { respondError(w, http.StatusNotFound, "user not found") return } respondJSON(w, http.StatusOK, user) } func (h *HTTPHandler) ValidateUser(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) id := vars["id"] valid, err := h.service.ValidateUser(r.Context(), id) if err != nil { respondError(w, http.StatusNotFound, "user not found") return } respondJSON(w, http.StatusOK, map[string]bool{"valid": valid}) } func respondJSON(w http.ResponseWriter, status int, data interface{}) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) json.NewEncoder(w).Encode(data) } func respondError(w http.ResponseWriter, status int, message string) { respondJSON(w, status, map[string]string{"error": message}) } // user-service/cmd/server/main.go package main import ( "database/sql" "log" "net/http" "os" "github.com/gorilla/mux" _ "github.com/lib/pq" "user-service/internal/handlers" "user-service/internal/repository" "user-service/internal/service" ) func main() { dbURL := os.Getenv("DATABASE_URL") if dbURL == "" { dbURL = 
"postgres://user:pass@localhost/users?sslmode=disable" } db, err := sql.Open("postgres", dbURL) if err != nil { log.Fatal(err) } defer db.Close() repo := repository.NewPostgresRepository(db) svc := service.NewUserService(repo) handler := handlers.NewHTTPHandler(svc) router := mux.NewRouter() router.HandleFunc("/users", handler.CreateUser).Methods("POST") router.HandleFunc("/users/{id}", handler.GetUser).Methods("GET") router.HandleFunc("/users/{id}/validate", handler.ValidateUser).Methods("GET") router.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("OK")) }) port := os.Getenv("PORT") if port == "" { port = "8081" } log.Printf("User service starting on port %s", port) if err := http.ListenAndServe(":"+port, router); err != nil { log.Fatal(err) } } Service 2: Product Service // product-service/internal/domain/product.go package domain import ( "context" "errors" "time" ) type Product struct { ID string `json:"id"` Name string `json:"name"` Description string `json:"description"` Price float64 `json:"price"` Stock int `json:"stock"` CreatedAt time.Time `json:"created_at"` } var ( ErrProductNotFound = errors.New("product not found") ErrInsufficientStock = errors.New("insufficient stock") ) type Repository interface { Create(ctx context.Context, product *Product) error GetByID(ctx context.Context, id string) (*Product, error) Update(ctx context.Context, product *Product) error ReserveStock(ctx context.Context, id string, quantity int) error } // product-service/internal/service/product_service.go package service import ( "context" "fmt" "time" "product-service/internal/domain" ) type ProductService struct { repo domain.Repository } func NewProductService(repo domain.Repository) *ProductService { return &ProductService{repo: repo} } func (s *ProductService) CreateProduct(ctx context.Context, name, desc string, price float64, stock int) (*domain.Product, error) { product := &domain.Product{ ID: 
generateID(), Name: name, Description: desc, Price: price, Stock: stock, CreatedAt: time.Now(), } if err := s.repo.Create(ctx, product); err != nil { return nil, fmt.Errorf("failed to create product: %w", err) } return product, nil } func (s *ProductService) GetProduct(ctx context.Context, id string) (*domain.Product, error) { return s.repo.GetByID(ctx, id) } func (s *ProductService) ReserveStock(ctx context.Context, id string, quantity int) error { product, err := s.repo.GetByID(ctx, id) if err != nil { return err } if product.Stock < quantity { return domain.ErrInsufficientStock } return s.repo.ReserveStock(ctx, id, quantity) } func generateID() string { return fmt.Sprintf("product_%d", time.Now().UnixNano()) } // product-service/cmd/server/main.go package main import ( "database/sql" "encoding/json" "log" "net/http" "os" "github.com/gorilla/mux" _ "github.com/lib/pq" "product-service/internal/repository" "product-service/internal/service" ) func main() { dbURL := os.Getenv("DATABASE_URL") if dbURL == "" { dbURL = "postgres://user:pass@localhost/products?sslmode=disable" } db, err := sql.Open("postgres", dbURL) if err != nil { log.Fatal(err) } defer db.Close() repo := repository.NewPostgresRepository(db) svc := service.NewProductService(repo) router := mux.NewRouter() router.HandleFunc("/products/{id}", func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) product, err := svc.GetProduct(r.Context(), vars["id"]) if err != nil { http.Error(w, err.Error(), http.StatusNotFound) return } json.NewEncoder(w).Encode(product) }).Methods("GET") router.HandleFunc("/products/reserve", func(w http.ResponseWriter, r *http.Request) { var req struct { ProductID string `json:"product_id"` Quantity int `json:"quantity"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } if err := svc.ReserveStock(r.Context(), req.ProductID, req.Quantity); err != nil { http.Error(w, err.Error(), 
http.StatusBadRequest) return } w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(map[string]string{"status": "reserved"}) }).Methods("POST") router.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }) port := os.Getenv("PORT") if port == "" { port = "8082" } log.Printf("Product service starting on port %s", port) if err := http.ListenAndServe(":"+port, router); err != nil { log.Fatal(err) } } Service 3: Order Service (Orchestrator) // order-service/internal/clients/user_client.go package clients import ( "context" "encoding/json" "fmt" "net/http" "time" ) type UserClient struct { baseURL string httpClient *http.Client } func NewUserClient(baseURL string) *UserClient { return &UserClient{ baseURL: baseURL, httpClient: &http.Client{ Timeout: 5 * time.Second, }, } } func (c *UserClient) ValidateUser(ctx context.Context, userID string) (bool, error) { url := fmt.Sprintf("%s/users/%s/validate", c.baseURL, userID) req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return false, err } resp, err := c.httpClient.Do(req) if err != nil { return false, fmt.Errorf("failed to call user service: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return false, fmt.Errorf("user service returned status %d", resp.StatusCode) } var result struct { Valid bool `json:"valid"` } if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return false, err } return result.Valid, nil } // order-service/internal/clients/product_client.go package clients import ( "bytes" "context" "encoding/json" "fmt" "net/http" "time" ) type Product struct { ID string `json:"id"` Name string `json:"name"` Price float64 `json:"price"` Stock int `json:"stock"` } type ProductClient struct { baseURL string httpClient *http.Client } func NewProductClient(baseURL string) *ProductClient { return &ProductClient{ baseURL: baseURL, httpClient: &http.Client{ Timeout: 5 * time.Second, }, } } func (c 
*ProductClient) GetProduct(ctx context.Context, productID string) (*Product, error) { url := fmt.Sprintf("%s/products/%s", c.baseURL, productID) req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return nil, err } resp, err := c.httpClient.Do(req) if err != nil { return nil, fmt.Errorf("failed to call product service: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("product not found") } var product Product if err := json.NewDecoder(resp.Body).Decode(&product); err != nil { return nil, err } return &product, nil } func (c *ProductClient) ReserveStock(ctx context.Context, productID string, quantity int) error { url := fmt.Sprintf("%s/products/reserve", c.baseURL) reqBody := map[string]interface{}{ "product_id": productID, "quantity": quantity, } body, err := json.Marshal(reqBody) if err != nil { return err } req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(body)) if err != nil { return err } req.Header.Set("Content-Type", "application/json") resp, err := c.httpClient.Do(req) if err != nil { return fmt.Errorf("failed to reserve stock: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("failed to reserve stock: status %d", resp.StatusCode) } return nil } // order-service/internal/service/order_service.go package service import ( "context" "fmt" "time" "order-service/internal/clients" "order-service/internal/domain" ) type OrderService struct { repo domain.Repository userClient *clients.UserClient productClient *clients.ProductClient } func NewOrderService( repo domain.Repository, userClient *clients.UserClient, productClient *clients.ProductClient, ) *OrderService { return &OrderService{ repo: repo, userClient: userClient, productClient: productClient, } } func (s *OrderService) CreateOrder(ctx context.Context, userID string, items []domain.OrderItem) (*domain.Order, error) { // Validate user via User Service valid, err := 
s.userClient.ValidateUser(ctx, userID) if err != nil { return nil, fmt.Errorf("failed to validate user: %w", err) } if !valid { return nil, fmt.Errorf("user is not valid") } // Validate products and calculate total var total float64 for i, item := range items { product, err := s.productClient.GetProduct(ctx, item.ProductID) if err != nil { return nil, fmt.Errorf("failed to get product: %w", err) } items[i].Price = product.Price total += product.Price * float64(item.Quantity) } // Reserve stock via Product Service for _, item := range items { if err := s.productClient.ReserveStock(ctx, item.ProductID, item.Quantity); err != nil { return nil, fmt.Errorf("failed to reserve stock: %w", err) } } order := &domain.Order{ ID: generateID(), UserID: userID, Items: items, Total: total, Status: "pending", CreatedAt: time.Now(), } if err := s.repo.Create(ctx, order); err != nil { return nil, fmt.Errorf("failed to create order: %w", err) } return order, nil } func generateID() string { return fmt.Sprintf("order_%d", time.Now().UnixNano()) } // order-service/cmd/server/main.go package main import ( "database/sql" "encoding/json" "log" "net/http" "os" "github.com/gorilla/mux" _ "github.com/lib/pq" "order-service/internal/clients" "order-service/internal/domain" "order-service/internal/repository" "order-service/internal/service" ) func main() { dbURL := os.Getenv("DATABASE_URL") if dbURL == "" { dbURL = "postgres://user:pass@localhost/orders?sslmode=disable" } userServiceURL := os.Getenv("USER_SERVICE_URL") if userServiceURL == "" { userServiceURL = "http://localhost:8081" } productServiceURL := os.Getenv("PRODUCT_SERVICE_URL") if productServiceURL == "" { productServiceURL = "http://localhost:8082" } db, err := sql.Open("postgres", dbURL) if err != nil { log.Fatal(err) } defer db.Close() repo := repository.NewPostgresRepository(db) userClient := clients.NewUserClient(userServiceURL) productClient := clients.NewProductClient(productServiceURL) svc := service.NewOrderService(repo, 
userClient, productClient) router := mux.NewRouter() router.HandleFunc("/orders", func(w http.ResponseWriter, r *http.Request) { var req struct { UserID string `json:"user_id"` Items []domain.OrderItem `json:"items"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } order, err := svc.CreateOrder(r.Context(), req.UserID, req.Items) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusCreated) json.NewEncoder(w).Encode(order) }).Methods("POST") router.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }) port := os.Getenv("PORT") if port == "" { port = "8083" } log.Printf("Order service starting on port %s", port) if err := http.ListenAndServe(":"+port, router); err != nil { log.Fatal(err) } } Docker Compose Setup version: '3.8' services: user-service: build: ./user-service ports: - "8081:8081" environment: - DATABASE_URL=postgres://user:pass@user-db:5432/users?sslmode=disable - PORT=8081 depends_on: - user-db product-service: build: ./product-service ports: - "8082:8082" environment: - DATABASE_URL=postgres://user:pass@product-db:5432/products?sslmode=disable - PORT=8082 depends_on: - product-db order-service: build: ./order-service ports: - "8083:8083" environment: - DATABASE_URL=postgres://user:pass@order-db:5432/orders?sslmode=disable - USER_SERVICE_URL=http://user-service:8081 - PRODUCT_SERVICE_URL=http://product-service:8082 - PORT=8083 depends_on: - order-db - user-service - product-service user-db: image: postgres:15 environment: - POSTGRES_USER=user - POSTGRES_PASSWORD=pass - POSTGRES_DB=users product-db: image: postgres:15 environment: - POSTGRES_USER=user - POSTGRES_PASSWORD=pass - POSTGRES_DB=products order-db: image: postgres:15 environment: - POSTGRES_USER=user - POSTGRES_PASSWORD=pass - POSTGRES_DB=orders Best Practices Service Boundaries: 
Define clear service boundaries based on business capabilities API Contracts: Use API versioning and maintain backward compatibility Service Discovery: Implement service registry for dynamic service location Circuit Breakers: Prevent cascading failures with circuit breaker pattern Distributed Tracing: Implement tracing to debug cross-service calls Health Checks: Provide health endpoints for monitoring Configuration Management: Externalize configuration Security: Implement service-to-service authentication Common Pitfalls Distributed Monolith: Services too tightly coupled, defeating the purpose Chatty Services: Too many synchronous calls between services Shared Database: Multiple services accessing the same database Ignoring Network Failures: Not handling network errors gracefully No Service Versioning: Breaking changes without versioning Data Consistency Issues: Not handling eventual consistency Over-Engineering: Creating too many small services When to Use Microservices Architecture Use When: ...

    January 20, 2025 · 13 min · Rafiul Alam

    Modular Monolith Architecture in Go: Scaling Without Microservices

    Go Architecture Patterns Series: ← Previous: Domain-Driven Design | Series Overview | Next: Microservices Architecture → What is Modular Monolith Architecture? Modular Monolith Architecture is an approach that combines the simplicity of monolithic deployment with the modularity of microservices. It organizes code into independent, loosely coupled modules with well-defined boundaries, all deployed as a single application. Key Principles: Module Independence: Each module is self-contained with its own domain logic Clear Boundaries: Modules communicate through well-defined interfaces Shared Deployment: All modules deployed together in a single process Domain Alignment: Modules organized around business capabilities Internal APIs: Modules expose APIs for inter-module communication Data Ownership: Each module owns its data and database schema Architecture Overview graph TD subgraph "Modular Monolith Application" API[API Gateway/Router] subgraph "User Module" U1[User Service] U2[User Repository] U3[(User DB Schema)] end subgraph "Order Module" O1[Order Service] O2[Order Repository] O3[(Order DB Schema)] end subgraph "Product Module" P1[Product Service] P2[Product Repository] P3[(Product DB Schema)] end subgraph "Payment Module" PA1[Payment Service] PA2[Payment Repository] PA3[(Payment DB Schema)] end API --> U1 API --> O1 API --> P1 API --> PA1 U1 --> U2 O1 --> O2 P1 --> P2 PA1 --> PA2 U2 --> U3 O2 --> O3 P2 --> P3 PA2 --> PA3 O1 -.->|Module API| U1 O1 -.->|Module API| P1 O1 -.->|Module API| PA1 end style U1 fill:#e1f5ff style O1 fill:#fff4e1 style P1 fill:#e8f5e9 style PA1 fill:#f3e5f5 Module Communication Patterns sequenceDiagram participant Client participant OrderModule participant UserModule participant ProductModule participant PaymentModule Client->>OrderModule: Create Order OrderModule->>UserModule: Validate User UserModule-->>OrderModule: User Valid OrderModule->>ProductModule: Check Stock ProductModule-->>OrderModule: Stock Available 
OrderModule->>ProductModule: Reserve Items ProductModule-->>OrderModule: Items Reserved OrderModule->>PaymentModule: Process Payment PaymentModule-->>OrderModule: Payment Success OrderModule->>ProductModule: Confirm Reservation ProductModule-->>OrderModule: Confirmed OrderModule-->>Client: Order Created Real-World Use Cases E-commerce Platforms: Product, order, inventory, and payment management SaaS Applications: Multi-tenant applications with distinct features Content Management Systems: Content, media, user, and workflow modules Banking Systems: Account, transaction, loan, and reporting modules Healthcare Systems: Patient, appointment, billing, and medical records Enterprise Applications: HR, finance, inventory, and CRM modules Modular Monolith Implementation Project Structure ├── cmd/ │ └── app/ │ └── main.go ├── internal/ │ ├── modules/ │ │ ├── user/ │ │ │ ├── domain/ │ │ │ │ ├── user.go │ │ │ │ └── repository.go │ │ │ ├── application/ │ │ │ │ └── service.go │ │ │ ├── infrastructure/ │ │ │ │ └── postgres_repository.go │ │ │ ├── api/ │ │ │ │ └── http_handler.go │ │ │ └── module.go │ │ ├── order/ │ │ │ ├── domain/ │ │ │ │ ├── order.go │ │ │ │ └── repository.go │ │ │ ├── application/ │ │ │ │ └── service.go │ │ │ ├── infrastructure/ │ │ │ │ └── postgres_repository.go │ │ │ ├── api/ │ │ │ │ └── http_handler.go │ │ │ └── module.go │ │ ├── product/ │ │ │ ├── domain/ │ │ │ │ ├── product.go │ │ │ │ └── repository.go │ │ │ ├── application/ │ │ │ │ └── service.go │ │ │ ├── infrastructure/ │ │ │ │ └── postgres_repository.go │ │ │ ├── api/ │ │ │ │ └── http_handler.go │ │ │ └── module.go │ │ └── payment/ │ │ ├── domain/ │ │ │ ├── payment.go │ │ │ └── repository.go │ │ ├── application/ │ │ │ └── service.go │ │ ├── infrastructure/ │ │ │ └── postgres_repository.go │ │ ├── api/ │ │ │ └── http_handler.go │ │ └── module.go │ └── shared/ │ ├── database/ │ │ └── postgres.go │ └── events/ │ └── event_bus.go └── go.mod Module 1: User Module // internal/modules/user/domain/user.go 
package domain import ( "context" "errors" "time" ) type UserID string type User struct { ID UserID Email string Name string Active bool CreatedAt time.Time UpdatedAt time.Time } var ( ErrUserNotFound = errors.New("user not found") ErrUserAlreadyExists = errors.New("user already exists") ErrInvalidEmail = errors.New("invalid email") ) // Repository defines the interface for user storage type Repository interface { Create(ctx context.Context, user *User) error GetByID(ctx context.Context, id UserID) (*User, error) GetByEmail(ctx context.Context, email string) (*User, error) Update(ctx context.Context, user *User) error Delete(ctx context.Context, id UserID) error } // internal/modules/user/application/service.go package application import ( "context" "errors" "fmt" "regexp" "time" "app/internal/modules/user/domain" ) type Service struct { repo domain.Repository } func NewService(repo domain.Repository) *Service { return &Service{repo: repo} } func (s *Service) CreateUser(ctx context.Context, email, name string) (*domain.User, error) { if !isValidEmail(email) { return nil, domain.ErrInvalidEmail } // Check if user exists (a not-found error means the email is free) existing, err := s.repo.GetByEmail(ctx, email) if err != nil && !errors.Is(err, domain.ErrUserNotFound) { return nil, fmt.Errorf("failed to check existing user: %w", err) } if existing != nil { return nil, domain.ErrUserAlreadyExists } user := &domain.User{ ID: domain.UserID(generateID()), Email: email, Name: name, Active: true, CreatedAt: time.Now(), UpdatedAt: time.Now(), } if err := s.repo.Create(ctx, user); err != nil { return nil, fmt.Errorf("failed to create user: %w", err) } return user, nil } func (s *Service) GetUser(ctx context.Context, id domain.UserID) (*domain.User, error) { return s.repo.GetByID(ctx, id) } func (s *Service) ValidateUser(ctx context.Context, id domain.UserID) (bool, error) { user, err := s.repo.GetByID(ctx, id) if err != nil { return false, err } return user.Active, nil } func isValidEmail(email string) bool { emailRegex := regexp.MustCompile(`^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$`) return emailRegex.MatchString(email) } func generateID() string { 
return fmt.Sprintf("user_%d", time.Now().UnixNano()) } // internal/modules/user/infrastructure/postgres_repository.go package infrastructure import ( "context" "database/sql" "fmt" "app/internal/modules/user/domain" ) type PostgresRepository struct { db *sql.DB } func NewPostgresRepository(db *sql.DB) *PostgresRepository { return &PostgresRepository{db: db} } func (r *PostgresRepository) Create(ctx context.Context, user *domain.User) error { query := ` INSERT INTO users.users (id, email, name, active, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6) ` _, err := r.db.ExecContext(ctx, query, user.ID, user.Email, user.Name, user.Active, user.CreatedAt, user.UpdatedAt) return err } func (r *PostgresRepository) GetByID(ctx context.Context, id domain.UserID) (*domain.User, error) { query := ` SELECT id, email, name, active, created_at, updated_at FROM users.users WHERE id = $1 ` user := &domain.User{} err := r.db.QueryRowContext(ctx, query, id).Scan( &user.ID, &user.Email, &user.Name, &user.Active, &user.CreatedAt, &user.UpdatedAt, ) if err == sql.ErrNoRows { return nil, domain.ErrUserNotFound } return user, err } func (r *PostgresRepository) GetByEmail(ctx context.Context, email string) (*domain.User, error) { query := ` SELECT id, email, name, active, created_at, updated_at FROM users.users WHERE email = $1 ` user := &domain.User{} err := r.db.QueryRowContext(ctx, query, email).Scan( &user.ID, &user.Email, &user.Name, &user.Active, &user.CreatedAt, &user.UpdatedAt, ) if err == sql.ErrNoRows { return nil, domain.ErrUserNotFound } return user, err } func (r *PostgresRepository) Update(ctx context.Context, user *domain.User) error { query := ` UPDATE users.users SET email = $2, name = $3, active = $4, updated_at = $5 WHERE id = $1 ` _, err := r.db.ExecContext(ctx, query, user.ID, user.Email, user.Name, user.Active, user.UpdatedAt) return err } func (r *PostgresRepository) Delete(ctx context.Context, id domain.UserID) error { query := `DELETE FROM users.users WHERE 
id = $1` _, err := r.db.ExecContext(ctx, query, id) return err } // internal/modules/user/module.go package user import ( "database/sql" "app/internal/modules/user/application" "app/internal/modules/user/infrastructure" ) type Module struct { Service *application.Service } func NewModule(db *sql.DB) *Module { repo := infrastructure.NewPostgresRepository(db) service := application.NewService(repo) return &Module{ Service: service, } } Module 2: Product Module // internal/modules/product/domain/product.go package domain import ( "context" "errors" "time" ) type ProductID string type Product struct { ID ProductID Name string Description string Price float64 Stock int CreatedAt time.Time UpdatedAt time.Time } var ( ErrProductNotFound = errors.New("product not found") ErrInsufficientStock = errors.New("insufficient stock") ErrInvalidPrice = errors.New("invalid price") ) type Repository interface { Create(ctx context.Context, product *Product) error GetByID(ctx context.Context, id ProductID) (*Product, error) Update(ctx context.Context, product *Product) error ReserveStock(ctx context.Context, id ProductID, quantity int) error ReleaseStock(ctx context.Context, id ProductID, quantity int) error } // internal/modules/product/application/service.go package application import ( "context" "fmt" "time" "app/internal/modules/product/domain" ) type Service struct { repo domain.Repository } func NewService(repo domain.Repository) *Service { return &Service{repo: repo} } func (s *Service) CreateProduct(ctx context.Context, name, description string, price float64, stock int) (*domain.Product, error) { if price <= 0 { return nil, domain.ErrInvalidPrice } product := &domain.Product{ ID: domain.ProductID(generateID()), Name: name, Description: description, Price: price, Stock: stock, CreatedAt: time.Now(), UpdatedAt: time.Now(), } if err := s.repo.Create(ctx, product); err != nil { return nil, fmt.Errorf("failed to create product: %w", err) } return product, nil } func (s *Service) 
GetProduct(ctx context.Context, id domain.ProductID) (*domain.Product, error) { return s.repo.GetByID(ctx, id) } func (s *Service) CheckStock(ctx context.Context, id domain.ProductID, quantity int) (bool, error) { product, err := s.repo.GetByID(ctx, id) if err != nil { return false, err } return product.Stock >= quantity, nil } func (s *Service) ReserveStock(ctx context.Context, id domain.ProductID, quantity int) error { available, err := s.CheckStock(ctx, id, quantity) if err != nil { return err } if !available { return domain.ErrInsufficientStock } return s.repo.ReserveStock(ctx, id, quantity) } func (s *Service) ConfirmReservation(ctx context.Context, id domain.ProductID, quantity int) error { // In a real implementation, this would mark the reservation as confirmed return nil } func generateID() string { return fmt.Sprintf("product_%d", time.Now().UnixNano()) } // internal/modules/product/infrastructure/postgres_repository.go package infrastructure import ( "context" "database/sql" "app/internal/modules/product/domain" ) type PostgresRepository struct { db *sql.DB } func NewPostgresRepository(db *sql.DB) *PostgresRepository { return &PostgresRepository{db: db} } func (r *PostgresRepository) Create(ctx context.Context, product *domain.Product) error { query := ` INSERT INTO products.products (id, name, description, price, stock, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7) ` _, err := r.db.ExecContext(ctx, query, product.ID, product.Name, product.Description, product.Price, product.Stock, product.CreatedAt, product.UpdatedAt) return err } func (r *PostgresRepository) GetByID(ctx context.Context, id domain.ProductID) (*domain.Product, error) { query := ` SELECT id, name, description, price, stock, created_at, updated_at FROM products.products WHERE id = $1 ` product := &domain.Product{} err := r.db.QueryRowContext(ctx, query, id).Scan( &product.ID, &product.Name, &product.Description, &product.Price, &product.Stock, &product.CreatedAt, 
&product.UpdatedAt, ) if err == sql.ErrNoRows { return nil, domain.ErrProductNotFound } return product, err } func (r *PostgresRepository) Update(ctx context.Context, product *domain.Product) error { query := ` UPDATE products.products SET name = $2, description = $3, price = $4, stock = $5, updated_at = $6 WHERE id = $1 ` _, err := r.db.ExecContext(ctx, query, product.ID, product.Name, product.Description, product.Price, product.Stock, product.UpdatedAt) return err } func (r *PostgresRepository) ReserveStock(ctx context.Context, id domain.ProductID, quantity int) error { query := ` UPDATE products.products SET stock = stock - $2 WHERE id = $1 AND stock >= $2 ` result, err := r.db.ExecContext(ctx, query, id, quantity) if err != nil { return err } rows, err := result.RowsAffected() if err != nil { return err } if rows == 0 { return domain.ErrInsufficientStock } return nil } func (r *PostgresRepository) ReleaseStock(ctx context.Context, id domain.ProductID, quantity int) error { query := ` UPDATE products.products SET stock = stock + $2 WHERE id = $1 ` _, err := r.db.ExecContext(ctx, query, id, quantity) return err } // internal/modules/product/module.go package product import ( "database/sql" "app/internal/modules/product/application" "app/internal/modules/product/infrastructure" ) type Module struct { Service *application.Service } func NewModule(db *sql.DB) *Module { repo := infrastructure.NewPostgresRepository(db) service := application.NewService(repo) return &Module{ Service: service, } } Module 3: Order Module (Coordinates Other Modules) // internal/modules/order/domain/order.go package domain import ( "context" "errors" "time" productdomain "app/internal/modules/product/domain" userdomain "app/internal/modules/user/domain" ) type OrderID string type OrderStatus string const ( OrderStatusPending OrderStatus = "pending" OrderStatusConfirmed OrderStatus = "confirmed" OrderStatusCancelled OrderStatus = "cancelled" ) type OrderItem struct { ProductID productdomain.ProductID Quantity int Price 
float64 } type Order struct { ID OrderID UserID userdomain.UserID Items []OrderItem Total float64 Status OrderStatus CreatedAt time.Time UpdatedAt time.Time } var ( ErrOrderNotFound = errors.New("order not found") ErrInvalidOrder = errors.New("invalid order") ) type Repository interface { Create(ctx context.Context, order *Order) error GetByID(ctx context.Context, id OrderID) (*Order, error) Update(ctx context.Context, order *Order) error } // internal/modules/order/application/service.go package application import ( "context" "fmt" "time" orderdomain "app/internal/modules/order/domain" productapp "app/internal/modules/product/application" userapp "app/internal/modules/user/application" userdomain "app/internal/modules/user/domain" ) // Service coordinates between modules type Service struct { repo orderdomain.Repository userService *userapp.Service productService *productapp.Service } func NewService( repo orderdomain.Repository, userService *userapp.Service, productService *productapp.Service, ) *Service { return &Service{ repo: repo, userService: userService, productService: productService, } } func (s *Service) CreateOrder(ctx context.Context, userID userdomain.UserID, items []orderdomain.OrderItem) (*orderdomain.Order, error) { // Validate user through User module valid, err := s.userService.ValidateUser(ctx, userID) if err != nil { return nil, fmt.Errorf("failed to validate user: %w", err) } if !valid { return nil, fmt.Errorf("user is not active") } // Calculate total and validate products var total float64 for i, item := range items { product, err := s.productService.GetProduct(ctx, item.ProductID) if err != nil { return nil, fmt.Errorf("failed to get product: %w", err) } // Check stock availability available, err := s.productService.CheckStock(ctx, item.ProductID, item.Quantity) if err != nil { return nil, fmt.Errorf("failed to check stock: %w", err) } if !available { return nil, fmt.Errorf("insufficient stock for product %s", item.ProductID) } items[i].Price = product.Price total += product.Price * 
float64(item.Quantity) } // Reserve stock for all items for _, item := range items { if err := s.productService.ReserveStock(ctx, item.ProductID, item.Quantity); err != nil { // NOTE: reservations already made for earlier items are not released here; rolling them back would need the product Service to expose the repository's ReleaseStock return nil, fmt.Errorf("failed to reserve stock: %w", err) } } order := &orderdomain.Order{ ID: orderdomain.OrderID(generateID()), UserID: userID, Items: items, Total: total, Status: orderdomain.OrderStatusPending, CreatedAt: time.Now(), UpdatedAt: time.Now(), } if err := s.repo.Create(ctx, order); err != nil { return nil, fmt.Errorf("failed to create order: %w", err) } return order, nil } func (s *Service) GetOrder(ctx context.Context, id orderdomain.OrderID) (*orderdomain.Order, error) { return s.repo.GetByID(ctx, id) } func (s *Service) ConfirmOrder(ctx context.Context, id orderdomain.OrderID) error { order, err := s.repo.GetByID(ctx, id) if err != nil { return err } // Confirm stock reservations for _, item := range order.Items { if err := s.productService.ConfirmReservation(ctx, item.ProductID, item.Quantity); err != nil { return fmt.Errorf("failed to confirm reservation: %w", err) } } order.Status = orderdomain.OrderStatusConfirmed order.UpdatedAt = time.Now() return s.repo.Update(ctx, order) } func generateID() string { return fmt.Sprintf("order_%d", time.Now().UnixNano()) } // internal/modules/order/module.go package order import ( "database/sql" "app/internal/modules/order/application" "app/internal/modules/order/infrastructure" productapp "app/internal/modules/product/application" userapp "app/internal/modules/user/application" ) type Module struct { Service *application.Service } func NewModule(db *sql.DB, userService *userapp.Service, productService *productapp.Service) *Module { repo := infrastructure.NewPostgresRepository(db) service := application.NewService(repo, userService, productService) return &Module{ Service: service, } } Main Application // cmd/app/main.go package main import ( "database/sql" "log" "net/http" _ "github.com/lib/pq" 
"app/internal/modules/user" "app/internal/modules/product" "app/internal/modules/order" ) func main() { // Initialize database db, err := sql.Open("postgres", "postgres://user:pass@localhost/modular_monolith?sslmode=disable") if err != nil { log.Fatal(err) } defer db.Close() // Initialize modules userModule := user.NewModule(db) productModule := product.NewModule(db) orderModule := order.NewModule(db, userModule.Service, productModule.Service) // Setup HTTP routes mux := http.NewServeMux() // User endpoints mux.HandleFunc("POST /users", func(w http.ResponseWriter, r *http.Request) { // Handle user creation }) // Product endpoints mux.HandleFunc("POST /products", func(w http.ResponseWriter, r *http.Request) { // Handle product creation }) // Order endpoints mux.HandleFunc("POST /orders", func(w http.ResponseWriter, r *http.Request) { _ = orderModule.Service // placeholder reference so the unused variable does not break compilation; handle order creation using orderModule.Service }) log.Println("Server starting on :8080") if err := http.ListenAndServe(":8080", mux); err != nil { log.Fatal(err) } } Best Practices Module Boundaries: Keep modules independent with clear interfaces Shared Database: Use schemas or table prefixes to separate module data Module APIs: Define explicit APIs for inter-module communication Dependency Direction: Modules should depend on interfaces, not implementations Event-Driven Communication: Use events for async inter-module communication Transaction Management: Handle cross-module transactions carefully Testing: Test modules independently with mocked dependencies Documentation: Document module APIs and boundaries clearly Common Pitfalls Shared Models: Sharing domain models between modules creates tight coupling Direct Database Access: Modules accessing other modules’ database tables Circular Dependencies: Modules depending on each other directly Anemic Modules: Modules with no business logic, just CRUD operations God Modules: Modules that know too much about other modules Ignoring Boundaries: Calling internal implementations instead of 
module APIs Synchronous Coupling: Over-reliance on synchronous inter-module calls When to Use Modular Monolith Use When: ...

    January 19, 2025 · 13 min · Rafiul Alam

    Layered Architecture in Go: Building Maintainable Applications

    Go Architecture Patterns Series: Series Overview | Next: Clean Architecture → What is Layered Architecture? Layered Architecture is one of the most common and fundamental architectural patterns in software development. It organizes code into horizontal layers, where each layer has a specific responsibility and only communicates with adjacent layers. Key Principles: Separation of Concerns: Each layer handles a specific aspect of the application Layer Independence: Layers are loosely coupled and can be changed independently Unidirectional Dependencies: Dependencies flow in one direction (top-down) Clear Boundaries: Well-defined interfaces between layers Testability: Each layer can be tested in isolation Architecture Overview graph TD A[Presentation Layer] --> B[Business Logic Layer] B --> C[Data Access Layer] C --> D[(Database)] style A fill:#e1f5ff style B fill:#fff4e1 style C fill:#e8f5e9 style D fill:#f3e5f5 Traditional 3-Tier Layered Architecture graph LR subgraph "Presentation Layer" A1[HTTP Handlers] A2[Templates/Views] A3[Request/Response DTOs] end subgraph "Business Logic Layer" B1[Services] B2[Business Rules] B3[Domain Models] end subgraph "Data Access Layer" C1[Repositories] C2[Database Access] C3[Data Models] end A1 --> B1 A2 --> B1 B1 --> C1 C1 --> C2 style A1 fill:#e1f5ff style B1 fill:#fff4e1 style C1 fill:#e8f5e9 Real-World Use Cases Web Applications: RESTful APIs and web services Enterprise Applications: Business management systems CRUD Applications: Standard create-read-update-delete operations Monolithic Applications: Traditional single-deployment applications Internal Tools: Admin panels and dashboards Legacy System Modernization: Refactoring existing codebases Basic Layered Architecture Implementation Project Structure ├── cmd/ │ └── api/ │ └── main.go ├── internal/ │ ├── handlers/ # Presentation Layer │ │ ├── user_handler.go │ │ └── product_handler.go │ ├── services/ # Business Logic Layer │ │ ├── user_service.go │ │ └── product_service.go │ 
├── repositories/ # Data Access Layer │ │ ├── user_repository.go │ │ └── product_repository.go │ └── models/ # Domain Models │ ├── user.go │ └── product.go └── go.mod Layer 1: Domain Models package models import "time" // User represents a user in the system type User struct { ID int64 `json:"id"` Email string `json:"email"` Name string `json:"name"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` } // Product represents a product in the system type Product struct { ID int64 `json:"id"` Name string `json:"name"` Description string `json:"description"` Price float64 `json:"price"` Stock int `json:"stock"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` } // Order represents an order in the system type Order struct { ID int64 `json:"id"` UserID int64 `json:"user_id"` ProductID int64 `json:"product_id"` Quantity int `json:"quantity"` Total float64 `json:"total"` Status string `json:"status"` CreatedAt time.Time `json:"created_at"` } Layer 2: Data Access Layer (Repository) package repositories import ( "context" "database/sql" "fmt" "time" "myapp/internal/models" ) // UserRepository handles user data access type UserRepository struct { db *sql.DB } // NewUserRepository creates a new user repository func NewUserRepository(db *sql.DB) *UserRepository { return &UserRepository{db: db} } // Create creates a new user func (r *UserRepository) Create(ctx context.Context, user *models.User) error { query := ` INSERT INTO users (email, name, created_at, updated_at) VALUES ($1, $2, $3, $4) RETURNING id ` now := time.Now() user.CreatedAt = now user.UpdatedAt = now err := r.db.QueryRowContext( ctx, query, user.Email, user.Name, user.CreatedAt, user.UpdatedAt, ).Scan(&user.ID) if err != nil { return fmt.Errorf("failed to create user: %w", err) } return nil } // GetByID retrieves a user by ID func (r *UserRepository) GetByID(ctx context.Context, id int64) (*models.User, error) { query := ` SELECT id, email, name, 
created_at, updated_at FROM users WHERE id = $1 ` user := &models.User{} err := r.db.QueryRowContext(ctx, query, id).Scan( &user.ID, &user.Email, &user.Name, &user.CreatedAt, &user.UpdatedAt, ) if err == sql.ErrNoRows { return nil, fmt.Errorf("user not found") } if err != nil { return nil, fmt.Errorf("failed to get user: %w", err) } return user, nil } // GetByEmail retrieves a user by email func (r *UserRepository) GetByEmail(ctx context.Context, email string) (*models.User, error) { query := ` SELECT id, email, name, created_at, updated_at FROM users WHERE email = $1 ` user := &models.User{} err := r.db.QueryRowContext(ctx, query, email).Scan( &user.ID, &user.Email, &user.Name, &user.CreatedAt, &user.UpdatedAt, ) if err == sql.ErrNoRows { return nil, fmt.Errorf("user not found") } if err != nil { return nil, fmt.Errorf("failed to get user: %w", err) } return user, nil } // Update updates a user func (r *UserRepository) Update(ctx context.Context, user *models.User) error { query := ` UPDATE users SET email = $1, name = $2, updated_at = $3 WHERE id = $4 ` user.UpdatedAt = time.Now() result, err := r.db.ExecContext( ctx, query, user.Email, user.Name, user.UpdatedAt, user.ID, ) if err != nil { return fmt.Errorf("failed to update user: %w", err) } rows, err := result.RowsAffected() if err != nil { return fmt.Errorf("failed to get affected rows: %w", err) } if rows == 0 { return fmt.Errorf("user not found") } return nil } // Delete deletes a user func (r *UserRepository) Delete(ctx context.Context, id int64) error { query := `DELETE FROM users WHERE id = $1` result, err := r.db.ExecContext(ctx, query, id) if err != nil { return fmt.Errorf("failed to delete user: %w", err) } rows, err := result.RowsAffected() if err != nil { return fmt.Errorf("failed to get affected rows: %w", err) } if rows == 0 { return fmt.Errorf("user not found") } return nil } // List retrieves all users func (r *UserRepository) List(ctx context.Context, limit, offset int) ([]*models.User, error) { 
        query := `
            SELECT id, email, name, created_at, updated_at
            FROM users
            ORDER BY id DESC
            LIMIT $1 OFFSET $2
        `

        rows, err := r.db.QueryContext(ctx, query, limit, offset)
        if err != nil {
            return nil, fmt.Errorf("failed to list users: %w", err)
        }
        defer rows.Close()

        var users []*models.User
        for rows.Next() {
            user := &models.User{}
            err := rows.Scan(
                &user.ID, &user.Email, &user.Name, &user.CreatedAt, &user.UpdatedAt,
            )
            if err != nil {
                return nil, fmt.Errorf("failed to scan user: %w", err)
            }
            users = append(users, user)
        }
        // rows.Next returns false on error as well as at end of data,
        // so check for an iteration error before returning.
        if err := rows.Err(); err != nil {
            return nil, fmt.Errorf("failed to iterate users: %w", err)
        }
        return users, nil
    }

    // ProductRepository handles product data access
    type ProductRepository struct {
        db *sql.DB
    }

    // NewProductRepository creates a new product repository
    func NewProductRepository(db *sql.DB) *ProductRepository {
        return &ProductRepository{db: db}
    }

    // UpdateStock adjusts product stock by quantity (negative values decrease it)
    func (r *ProductRepository) UpdateStock(ctx context.Context, productID int64, quantity int) error {
        query := `
            UPDATE products
            SET stock = stock + $1, updated_at = $2
            WHERE id = $3
        `

        result, err := r.db.ExecContext(ctx, query, quantity, time.Now(), productID)
        if err != nil {
            return fmt.Errorf("failed to update stock: %w", err)
        }

        rows, err := result.RowsAffected()
        if err != nil {
            return fmt.Errorf("failed to get affected rows: %w", err)
        }
        if rows == 0 {
            return fmt.Errorf("product not found")
        }
        return nil
    }

Layer 3: Business Logic Layer (Service)

    package services

    import (
        "context"
        "fmt"
        "regexp"

        "myapp/internal/models"
        "myapp/internal/repositories"
    )

    // UserService handles user business logic
    type UserService struct {
        userRepo *repositories.UserRepository
    }

    // NewUserService creates a new user service
    func NewUserService(userRepo *repositories.UserRepository) *UserService {
        return &UserService{
            userRepo: userRepo,
        }
    }

    // CreateUser creates a new user with validation
    func (s *UserService) CreateUser(ctx context.Context, email, name string) (*models.User, error) {
        // Business rule: Validate email format
        if !isValidEmail(email) {
            return nil, fmt.Errorf("invalid email format")
        }

        // Business rule: Name must not be empty
        if name == "" {
            return nil, fmt.Errorf("name cannot be empty")
        }

        // Business rule: Email must be unique.
        // Note: GetByEmail reports "not found" and real database failures
        // the same way, so a lookup failure is treated here as "email is free".
        existingUser, err := s.userRepo.GetByEmail(ctx, email)
        if err == nil && existingUser != nil {
            return nil, fmt.Errorf("email already exists")
        }

        user := &models.User{
            Email: email,
            Name:  name,
        }
        if err := s.userRepo.Create(ctx, user); err != nil {
            return nil, fmt.Errorf("failed to create user: %w", err)
        }
        return user, nil
    }

    // GetUser retrieves a user by ID
    func (s *UserService) GetUser(ctx context.Context, id int64) (*models.User, error) {
        return s.userRepo.GetByID(ctx, id)
    }

    // UpdateUser updates a user with validation
    func (s *UserService) UpdateUser(ctx context.Context, id int64, email, name string) (*models.User, error) {
        // Business rule: Validate email format
        if !isValidEmail(email) {
            return nil, fmt.Errorf("invalid email format")
        }

        // Business rule: Name must not be empty
        if name == "" {
            return nil, fmt.Errorf("name cannot be empty")
        }

        user, err := s.userRepo.GetByID(ctx, id)
        if err != nil {
            return nil, fmt.Errorf("user not found: %w", err)
        }

        // Business rule: If email is changing, check uniqueness
        if user.Email != email {
            existingUser, err := s.userRepo.GetByEmail(ctx, email)
            if err == nil && existingUser != nil {
                return nil, fmt.Errorf("email already exists")
            }
        }

        user.Email = email
        user.Name = name
        if err := s.userRepo.Update(ctx, user); err != nil {
            return nil, fmt.Errorf("failed to update user: %w", err)
        }
        return user, nil
    }

    // DeleteUser deletes a user
    func (s *UserService) DeleteUser(ctx context.Context, id int64) error {
        return s.userRepo.Delete(ctx, id)
    }

    // ListUsers retrieves users with pagination
    func (s *UserService) ListUsers(ctx context.Context, page, pageSize int) ([]*models.User, error) {
        // Business rule: Fall back to defaults for out-of-range pagination parameters
        if page < 1 {
            page = 1
        }
        if pageSize < 1 || pageSize > 100 {
            pageSize = 20
        }

        offset := (page - 1) * pageSize
        return s.userRepo.List(ctx, pageSize,
            offset)
    }

    // emailRegex is compiled once at package load instead of on every call
    var emailRegex = regexp.MustCompile(`^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$`)

    // isValidEmail validates email format
    func isValidEmail(email string) bool {
        return emailRegex.MatchString(email)
    }

    // OrderService handles order business logic
    type OrderService struct {
        userRepo    *repositories.UserRepository
        productRepo *repositories.ProductRepository
    }

    // NewOrderService creates a new order service
    func NewOrderService(
        userRepo *repositories.UserRepository,
        productRepo *repositories.ProductRepository,
    ) *OrderService {
        return &OrderService{
            userRepo:    userRepo,
            productRepo: productRepo,
        }
    }

    // PlaceOrder places a new order with business validation
    func (s *OrderService) PlaceOrder(ctx context.Context, userID, productID int64, quantity int) (*models.Order, error) {
        // Business rule: Validate user exists
        user, err := s.userRepo.GetByID(ctx, userID)
        if err != nil {
            return nil, fmt.Errorf("user not found: %w", err)
        }

        // Business rule: Validate quantity
        if quantity <= 0 {
            return nil, fmt.Errorf("quantity must be positive")
        }

        // Business rule: Check stock availability
        // (In a real implementation, this would be in ProductRepository)

        // Business rule: Calculate total
        // (In a real implementation, this would fetch product price)

        order := &models.Order{
            UserID:    user.ID,
            ProductID: productID,
            Quantity:  quantity,
            Status:    "pending",
        }
        return order, nil
    }

Layer 4: Presentation Layer (HTTP Handlers)

    package handlers

    import (
        "encoding/json"
        "net/http"
        "strconv"

        "myapp/internal/services"
    )

    // UserHandler handles HTTP requests for users
    type UserHandler struct {
        userService *services.UserService
    }

    // NewUserHandler creates a new user handler
    func NewUserHandler(userService *services.UserService) *UserHandler {
        return &UserHandler{
            userService: userService,
        }
    }

    // CreateUserRequest represents the request to create a user
    type CreateUserRequest struct {
        Email string `json:"email"`
        Name  string `json:"name"`
    }

    // CreateUser handles POST /users
    func (h *UserHandler) CreateUser(w http.ResponseWriter, r *http.Request) {
        var req CreateUserRequest
        if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
            respondError(w, http.StatusBadRequest, "invalid request body")
            return
        }

        user, err := h.userService.CreateUser(r.Context(), req.Email, req.Name)
        if err != nil {
            respondError(w, http.StatusBadRequest, err.Error())
            return
        }
        respondJSON(w, http.StatusCreated, user)
    }

    // GetUser handles GET /users/:id
    func (h *UserHandler) GetUser(w http.ResponseWriter, r *http.Request) {
        // Extract ID from URL (implementation depends on router)
        id, err := extractIDFromPath(r)
        if err != nil {
            respondError(w, http.StatusBadRequest, "invalid user ID")
            return
        }

        user, err := h.userService.GetUser(r.Context(), id)
        if err != nil {
            respondError(w, http.StatusNotFound, "user not found")
            return
        }
        respondJSON(w, http.StatusOK, user)
    }

    // UpdateUser handles PUT /users/:id
    func (h *UserHandler) UpdateUser(w http.ResponseWriter, r *http.Request) {
        id, err := extractIDFromPath(r)
        if err != nil {
            respondError(w, http.StatusBadRequest, "invalid user ID")
            return
        }

        var req CreateUserRequest
        if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
            respondError(w, http.StatusBadRequest, "invalid request body")
            return
        }

        user, err := h.userService.UpdateUser(r.Context(), id, req.Email, req.Name)
        if err != nil {
            respondError(w, http.StatusBadRequest, err.Error())
            return
        }
        respondJSON(w, http.StatusOK, user)
    }

    // DeleteUser handles DELETE /users/:id
    func (h *UserHandler) DeleteUser(w http.ResponseWriter, r *http.Request) {
        id, err := extractIDFromPath(r)
        if err != nil {
            respondError(w, http.StatusBadRequest, "invalid user ID")
            return
        }

        if err := h.userService.DeleteUser(r.Context(), id); err != nil {
            respondError(w, http.StatusNotFound, err.Error())
            return
        }
        w.WriteHeader(http.StatusNoContent)
    }

    // ListUsers handles GET /users
    func (h *UserHandler) ListUsers(w http.ResponseWriter, r *http.Request) {
        // Parse errors are ignored on purpose: the service layer
        // substitutes defaults for missing or invalid values.
        page, _ := strconv.Atoi(r.URL.Query().Get("page"))
        pageSize, _ :=
            strconv.Atoi(r.URL.Query().Get("page_size"))

        users, err := h.userService.ListUsers(r.Context(), page, pageSize)
        if err != nil {
            respondError(w, http.StatusInternalServerError, "failed to list users")
            return
        }
        respondJSON(w, http.StatusOK, users)
    }

    // Helper functions

    func respondJSON(w http.ResponseWriter, status int, data interface{}) {
        w.Header().Set("Content-Type", "application/json")
        w.WriteHeader(status)
        json.NewEncoder(w).Encode(data)
    }

    func respondError(w http.ResponseWriter, status int, message string) {
        respondJSON(w, status, map[string]string{"error": message})
    }

    func extractIDFromPath(r *http.Request) (int64, error) {
        // Assumes the Go 1.22+ net/http route patterns used in main ("/users/{id}").
        // With another router, use its own method, e.g. chi.URLParam(r, "id").
        return strconv.ParseInt(r.PathValue("id"), 10, 64)
    }

Main Application

    package main

    import (
        "database/sql"
        "log"
        "net/http"

        _ "github.com/lib/pq"

        "myapp/internal/handlers"
        "myapp/internal/repositories"
        "myapp/internal/services"
    )

    func main() {
        // Initialize database connection
        db, err := sql.Open("postgres", "postgres://user:pass@localhost/dbname?sslmode=disable")
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        // Initialize layers from bottom to top

        // Data Access Layer
        userRepo := repositories.NewUserRepository(db)
        productRepo := repositories.NewProductRepository(db)

        // Business Logic Layer
        userService := services.NewUserService(userRepo)
        orderService := services.NewOrderService(userRepo, productRepo)
        _ = orderService // not wired to a handler in this example; avoids an unused-variable compile error

        // Presentation Layer
        userHandler := handlers.NewUserHandler(userService)

        // Setup routes (method and {id} patterns require Go 1.22+)
        mux := http.NewServeMux()
        mux.HandleFunc("POST /users", userHandler.CreateUser)
        mux.HandleFunc("GET /users/{id}", userHandler.GetUser)
        mux.HandleFunc("PUT /users/{id}", userHandler.UpdateUser)
        mux.HandleFunc("DELETE /users/{id}", userHandler.DeleteUser)
        mux.HandleFunc("GET /users", userHandler.ListUsers)

        // Start server
        log.Println("Server starting on :8080")
        if err := http.ListenAndServe(":8080", mux); err != nil {
            log.Fatal(err)
        }
    }

Testing Layered Architecture

    package services_test

    import (
        "context"
        "fmt"
        "testing"

        "myapp/internal/models"
        "myapp/internal/services"
    )

    // MockUserRepository for testing.
    // Note: this test assumes NewUserService accepts an interface that the mock
    // satisfies. With the concrete *repositories.UserRepository parameter shown
    // in Layer 3, the service would first need to depend on such an interface.
    type MockUserRepository struct {
        users  map[int64]*models.User
        nextID int64
    }

    func NewMockUserRepository() *MockUserRepository {
        return &MockUserRepository{
            users:  make(map[int64]*models.User),
            nextID: 1,
        }
    }

    func (m *MockUserRepository) Create(ctx context.Context, user *models.User) error {
        user.ID = m.nextID
        m.nextID++
        m.users[user.ID] = user
        return nil
    }

    func (m *MockUserRepository) GetByID(ctx context.Context, id int64) (*models.User, error) {
        user, exists := m.users[id]
        if !exists {
            return nil, fmt.Errorf("user not found")
        }
        return user, nil
    }

    func (m *MockUserRepository) GetByEmail(ctx context.Context, email string) (*models.User, error) {
        for _, user := range m.users {
            if user.Email == email {
                return user, nil
            }
        }
        return nil, fmt.Errorf("user not found")
    }

    func TestUserService_CreateUser(t *testing.T) {
        mockRepo := NewMockUserRepository()
        service := services.NewUserService(mockRepo)

        tests := []struct {
            name     string
            email    string
            userName string
            wantErr  bool
        }{
            {
                name:     "valid user",
                email:    "test@example.com", // placeholder address
                userName: "Test User",
                wantErr:  false,
            },
            {
                name:     "invalid email",
                email:    "invalid-email",
                userName: "Test User",
                wantErr:  true,
            },
            {
                name:     "empty name",
                email:    "test@example.com",
                userName: "",
                wantErr:  true,
            },
        }

        for _, tt := range tests {
            t.Run(tt.name, func(t *testing.T) {
                user, err := service.CreateUser(context.Background(), tt.email, tt.userName)
                if tt.wantErr {
                    if err == nil {
                        t.Errorf("expected error but got none")
                    }
                    return
                }
                if err != nil {
                    t.Errorf("unexpected error: %v", err)
                    return
                }
                if user.Email != tt.email {
                    t.Errorf("expected email %s, got %s", tt.email, user.Email)
                }
            })
        }
    }

Best Practices

Clear Layer Boundaries: Define clear interfaces between layers
Dependency Direction: Always depend downward, never upward
Keep Layers Thin: Avoid bloated service layers with too much logic
Use Dependency Injection: Inject dependencies rather than creating them
Interface Segregation: Define minimal interfaces for layer communication
Error Handling: Handle errors appropriately at each layer
Testing: Test each layer independently with mocks

Common Pitfalls

Layer Violation: Skipping layers (e.g., handler directly accessing repository)
Anemic Domain Models: Models with no behavior, only data
Fat Service Layer: Putting all logic in the service layer
Tight Coupling: Layers knowing too much about each other
God Objects: Services or repositories handling too many responsibilities
Inconsistent Abstractions: Different patterns in different layers

When to Use Layered Architecture

Use When: ...

    January 15, 2025 · 13 min · Rafiul Alam