Saga Pattern in Go: Managing Distributed Transactions

    Go Architecture Patterns Series: ← Previous: Event Sourcing | Series Overview What is the Saga Pattern? The Saga pattern is a design pattern for managing distributed transactions across multiple microservices. Instead of using traditional ACID transactions, a saga breaks a transaction into a sequence of local transactions, each with a compensating transaction to undo changes if needed. Key Principles: Local Transactions: Each service executes its own local transaction Compensating Transactions: Undo operations for rollback Eventual Consistency: System reaches consistent state over time Choreography or Orchestration: Two approaches to coordinate sagas Idempotency: Operations can be safely retried Error Handling: Graceful handling of failures Architecture Overview graph TB subgraph "Orchestration-Based Saga" Orchestrator[Saga Orchestrator] S1[Service 1] S2[Service 2] S3[Service 3] S4[Service 4] Orchestrator -->|1. Execute| S1 Orchestrator -->|2. Execute| S2 Orchestrator -->|3. Execute| S3 Orchestrator -->|4. Execute| S4 S1 -.->|Failed| Orchestrator S2 -.->|Success| Orchestrator S3 -.->|Failed| Orchestrator Orchestrator -.->|Compensate| S2 Orchestrator -.->|Compensate| S1 end subgraph "Choreography-Based Saga" EventBus[Event Bus] OrderSvc[Order Service] PaymentSvc[Payment Service] InventorySvc[Inventory Service] ShippingSvc[Shipping Service] OrderSvc -->|OrderCreated| EventBus EventBus -->|OrderCreated| PaymentSvc PaymentSvc -->|PaymentProcessed| EventBus EventBus -->|PaymentProcessed| InventorySvc InventorySvc -->|InventoryReserved| EventBus EventBus -->|InventoryReserved| ShippingSvc PaymentSvc -.->|PaymentFailed| EventBus EventBus -.->|PaymentFailed| OrderSvc end style Orchestrator fill:#fff4e1 style EventBus fill:#e1f5ff style S1 fill:#e8f5e9 style OrderSvc fill:#f3e5f5 Saga Execution Flow sequenceDiagram participant Client participant Orchestrator participant OrderSvc participant PaymentSvc participant InventorySvc participant ShippingSvc Note over Client,ShippingSvc: Happy Path Client->>Orchestrator: Create Order Orchestrator->>OrderSvc: Create Order OrderSvc-->>Orchestrator: Order Created Orchestrator->>PaymentSvc: Process Payment PaymentSvc-->>Orchestrator: Payment Success Orchestrator->>InventorySvc: Reserve Inventory InventorySvc-->>Orchestrator: Inventory Reserved Orchestrator->>ShippingSvc: Schedule Shipping ShippingSvc-->>Orchestrator: Shipping Scheduled Orchestrator-->>Client: Order Completed Note over Client,ShippingSvc: Failure with Compensation Client->>Orchestrator: Create Order Orchestrator->>OrderSvc: Create Order OrderSvc-->>Orchestrator: Order Created Orchestrator->>PaymentSvc: Process Payment PaymentSvc-->>Orchestrator: Payment Success Orchestrator->>InventorySvc: Reserve Inventory InventorySvc-->>Orchestrator: Insufficient Stock Note over Orchestrator: Trigger Compensation Orchestrator->>PaymentSvc: Refund Payment PaymentSvc-->>Orchestrator: Payment Refunded Orchestrator->>OrderSvc: Cancel Order OrderSvc-->>Orchestrator: Order Cancelled Orchestrator-->>Client: Order Failed (Compensated) Real-World Use Cases E-commerce Order Processing: Order, payment, inventory, shipping coordination Travel Booking Systems: Flight, hotel, car rental bookings Financial Transactions: Multi-step payment processing Supply Chain Management: Order fulfillment across warehouses Banking Systems: Money transfers across accounts Healthcare Systems: Patient admission, bed assignment, billing Saga Pattern Implementation Project Structure saga-app/ ├── cmd/ │ └── api/ │ └── main.go ├── internal/ │ ├── saga/ │ │ ├── orchestrator.go │ │ ├── step.go │ │ └── state.go │ ├── services/ │ │ ├── order/ │ │ ├── payment/ │ │ ├── inventory/ │ │ └── shipping/ │ └── models/ │ └── order.go └── go.mod Saga Step Definition // internal/saga/step.go package saga import ( "context" "fmt" ) // StepStatus represents the status of a saga step type StepStatus string const ( StepStatusPending StepStatus = "pending" StepStatusInProgress StepStatus = "in_progress" StepStatusCompleted StepStatus = "completed" StepStatusFailed StepStatus = "failed" StepStatusCompensated StepStatus = "compensated" ) // Step represents a single step in a saga type Step struct { Name string Execute ExecuteFunc Compensate CompensateFunc Status StepStatus RetryCount int MaxRetries int CompensationData interface{} } // ExecuteFunc is the function that executes a step type ExecuteFunc func(ctx context.Context, data interface{}) (interface{}, error) // CompensateFunc is the function that compensates a step type CompensateFunc func(ctx context.Context, data interface{}) error // NewStep creates a new saga step func NewStep(name string, execute ExecuteFunc, compensate CompensateFunc) *Step { return &Step{ Name: name, Execute: execute, Compensate: compensate, Status: StepStatusPending, MaxRetries: 3, } } // Run executes the step func (s *Step) Run(ctx context.Context, data interface{}) (interface{}, error) { s.Status = StepStatusInProgress result, err := s.Execute(ctx, data) if err != nil { s.Status = StepStatusFailed return nil, fmt.Errorf("step %s failed: %w", s.Name, err) } s.Status = StepStatusCompleted s.CompensationData = result return result, nil } // Compensate executes the compensation func (s *Step) Compensate(ctx context.Context) error { if s.Status != StepStatusCompleted { return nil // Only compensate completed steps } if err := s.Compensate(ctx, s.CompensationData); err != nil { return fmt.Errorf("compensation for %s failed: %w", s.Name, err) } s.Status = StepStatusCompensated return nil } Saga Orchestrator // internal/saga/orchestrator.go package saga import ( "context" "fmt" "log" ) // SagaStatus represents the status of a saga type SagaStatus string const ( SagaStatusPending SagaStatus = "pending" SagaStatusInProgress SagaStatus = "in_progress" SagaStatusCompleted SagaStatus = "completed" SagaStatusFailed SagaStatus = "failed" SagaStatusCompensated SagaStatus = "compensated" ) // Saga represents a distributed transaction type Saga struct { ID string Steps []*Step Status SagaStatus Data interface{} } // Orchestrator manages saga execution type Orchestrator struct { sagas map[string]*Saga } // NewOrchestrator creates a new saga orchestrator func NewOrchestrator() *Orchestrator { return &Orchestrator{ sagas: make(map[string]*Saga), } } // NewSaga creates a new saga func (o *Orchestrator) NewSaga(id string, steps []*Step) *Saga { saga := &Saga{ ID: id, Steps: steps, Status: SagaStatusPending, } o.sagas[id] = saga return saga } // Execute runs the saga func (o *Orchestrator) Execute(ctx context.Context, saga *Saga, initialData interface{}) error { saga.Status = SagaStatusInProgress saga.Data = initialData completedSteps := make([]*Step, 0) var currentData interface{} = initialData // Execute steps sequentially for _, step := range saga.Steps { log.Printf("Executing step: %s", step.Name) result, err := step.Run(ctx, currentData) if err != nil { log.Printf("Step %s failed: %v", step.Name, err) saga.Status = SagaStatusFailed // Trigger compensation if compErr := o.compensate(ctx, completedSteps); compErr != nil { log.Printf("Compensation failed: %v", compErr) return fmt.Errorf("saga failed and compensation failed: %w", compErr) } saga.Status = SagaStatusCompensated return fmt.Errorf("saga failed at step %s: %w", step.Name, err) } completedSteps = append(completedSteps, step) currentData = result log.Printf("Step %s completed successfully", step.Name) } saga.Status = SagaStatusCompleted log.Printf("Saga %s completed successfully", saga.ID) return nil } // compensate executes compensation for completed steps in reverse order func (o *Orchestrator) compensate(ctx context.Context, steps []*Step) error { log.Println("Starting compensation...") // Compensate in reverse order for i := len(steps) - 1; i >= 0; i-- { step := steps[i] log.Printf("Compensating step: %s", step.Name) if err := step.Compensate(ctx); err != nil { return fmt.Errorf("compensation failed for step %s: %w", step.Name, err) } log.Printf("Step %s compensated successfully", step.Name) } log.Println("Compensation completed") return nil } // GetSaga retrieves a saga by ID func (o *Orchestrator) GetSaga(id string) (*Saga, error) { saga, exists := o.sagas[id] if !exists { return nil, fmt.Errorf("saga not found: %s", id) } return saga, nil } Service Implementations // internal/services/order/service.go package order import ( "context" "fmt" "time" ) type Order struct { ID string UserID string Items []OrderItem Total float64 Status string CreatedAt time.Time } type OrderItem struct { ProductID string Quantity int Price float64 } type Service struct { orders map[string]*Order } func NewService() *Service { return &Service{ orders: make(map[string]*Order), } } func (s *Service) CreateOrder(ctx context.Context, userID string, items []OrderItem) (*Order, error) { var total float64 for _, item := range items { total += item.Price * float64(item.Quantity) } order := &Order{ ID: generateID(), UserID: userID, Items: items, Total: total, Status: "pending", CreatedAt: time.Now(), } s.orders[order.ID] = order fmt.Printf("Order created: %s\n", order.ID) return order, nil } func (s *Service) CancelOrder(ctx context.Context, orderID string) error { order, exists := s.orders[orderID] if !exists { return fmt.Errorf("order not found") } order.Status = "cancelled" fmt.Printf("Order cancelled: %s\n", orderID) return nil } func generateID() string { return fmt.Sprintf("ord_%d", time.Now().UnixNano()) } // internal/services/payment/service.go package payment import ( "context" "fmt" "time" ) type Payment struct { ID string OrderID string Amount float64 Status string TransactionID string CreatedAt time.Time } type Service struct { payments map[string]*Payment } func NewService() *Service { return &Service{ payments: make(map[string]*Payment), } } func (s *Service) ProcessPayment(ctx context.Context, orderID string, amount float64) (*Payment, error) { // Simulate payment processing if amount > 10000 { return nil, fmt.Errorf("amount exceeds limit") } payment := &Payment{ ID: generateID(), OrderID: orderID, Amount: amount, Status: "completed", TransactionID: fmt.Sprintf("txn_%d", time.Now().UnixNano()), CreatedAt: time.Now(), } s.payments[payment.ID] = payment fmt.Printf("Payment processed: %s for order %s\n", payment.ID, orderID) return payment, nil } func (s *Service) RefundPayment(ctx context.Context, paymentID string) error { payment, exists := s.payments[paymentID] if !exists { return fmt.Errorf("payment not found") } payment.Status = "refunded" fmt.Printf("Payment refunded: %s\n", paymentID) return nil } func generateID() string { return fmt.Sprintf("pay_%d", time.Now().UnixNano()) } // internal/services/inventory/service.go package inventory import ( "context" "fmt" ) type Reservation struct { ID string ProductID string Quantity int OrderID string } type Service struct { stock map[string]int reservations map[string]*Reservation } func NewService() *Service { return &Service{ stock: map[string]int{ "product_1": 100, "product_2": 50, "product_3": 200, }, reservations: make(map[string]*Reservation), } } func (s *Service) ReserveInventory(ctx context.Context, orderID, productID string, quantity int) (*Reservation, error) { currentStock, exists := s.stock[productID] if !exists { return nil, fmt.Errorf("product not found") } if currentStock < quantity { return nil, fmt.Errorf("insufficient stock: have %d, need %d", currentStock, quantity) } s.stock[productID] = currentStock - quantity reservation := &Reservation{ ID: generateID(), ProductID: productID, Quantity: quantity, OrderID: orderID, } s.reservations[reservation.ID] = reservation fmt.Printf("Inventory reserved: %d units of %s for order %s\n", quantity, productID, orderID) return reservation, nil } func (s *Service) ReleaseReservation(ctx context.Context, reservationID string) error { reservation, exists := s.reservations[reservationID] if !exists { return fmt.Errorf("reservation not found") } s.stock[reservation.ProductID] += reservation.Quantity delete(s.reservations, reservationID) fmt.Printf("Reservation released: %s\n", reservationID) return nil } func generateID() string { return fmt.Sprintf("res_%d", time.Now().UnixNano()) } // internal/services/shipping/service.go package shipping import ( "context" "fmt" "time" ) type Shipment struct { ID string OrderID string TrackingNumber string Status string ScheduledDate time.Time } type Service struct { shipments map[string]*Shipment } func NewService() *Service { return &Service{ shipments: make(map[string]*Shipment), } } func (s *Service) ScheduleShipment(ctx context.Context, orderID string) (*Shipment, error) { shipment := &Shipment{ ID: generateID(), OrderID: orderID, TrackingNumber: fmt.Sprintf("TRK%d", time.Now().UnixNano()), Status: "scheduled", ScheduledDate: time.Now().Add(24 * time.Hour), } s.shipments[shipment.ID] = shipment fmt.Printf("Shipment scheduled: %s for order %s\n", shipment.ID, orderID) return shipment, nil } func (s *Service) CancelShipment(ctx context.Context, shipmentID string) error { shipment, exists := s.shipments[shipmentID] if !exists { return fmt.Errorf("shipment not found") } shipment.Status = "cancelled" fmt.Printf("Shipment cancelled: %s\n", shipmentID) return nil } func generateID() string { return fmt.Sprintf("shp_%d", time.Now().UnixNano()) } Saga Workflow Implementation // internal/saga/order_saga.go package saga import ( "context" "fmt" "app/internal/services/inventory" "app/internal/services/order" "app/internal/services/payment" "app/internal/services/shipping" ) type OrderSagaData struct { UserID string Items []order.OrderItem OrderID string PaymentID string ReservationID string ShipmentID string } // CreateOrderSaga creates a saga for order processing func CreateOrderSaga( orderSvc *order.Service, paymentSvc *payment.Service, inventorySvc *inventory.Service, shippingSvc *shipping.Service, ) []*Step { // Step 1: Create Order createOrderStep := NewStep( "CreateOrder", func(ctx context.Context, data interface{}) (interface{}, error) { sagaData := data.(*OrderSagaData) order, err := orderSvc.CreateOrder(ctx, sagaData.UserID, sagaData.Items) if err != nil { return nil, err } sagaData.OrderID = order.ID return sagaData, nil }, func(ctx context.Context, data interface{}) error { sagaData := data.(*OrderSagaData) return orderSvc.CancelOrder(ctx, sagaData.OrderID) }, ) // Step 2: Process Payment processPaymentStep := NewStep( "ProcessPayment", func(ctx context.Context, data interface{}) (interface{}, error) { sagaData := data.(*OrderSagaData) var total float64 for _, item := range sagaData.Items { total += item.Price * float64(item.Quantity) } payment, err := paymentSvc.ProcessPayment(ctx, sagaData.OrderID, total) if err != nil { return nil, err } sagaData.PaymentID = payment.ID return sagaData, nil }, func(ctx context.Context, data interface{}) error { sagaData := data.(*OrderSagaData) return paymentSvc.RefundPayment(ctx, sagaData.PaymentID) }, ) // Step 3: Reserve Inventory reserveInventoryStep := NewStep( "ReserveInventory", func(ctx context.Context, data interface{}) (interface{}, error) { sagaData := data.(*OrderSagaData) // Reserve first item (simplified) if len(sagaData.Items) == 0 { return nil, fmt.Errorf("no items to reserve") } item := sagaData.Items[0] reservation, err := inventorySvc.ReserveInventory( ctx, sagaData.OrderID, item.ProductID, item.Quantity, ) if err != nil { return nil, err } sagaData.ReservationID = reservation.ID return sagaData, nil }, func(ctx context.Context, data interface{}) error { sagaData := data.(*OrderSagaData) return inventorySvc.ReleaseReservation(ctx, sagaData.ReservationID) }, ) // Step 4: Schedule Shipment scheduleShipmentStep := NewStep( "ScheduleShipment", func(ctx context.Context, data interface{}) (interface{}, error) { sagaData := data.(*OrderSagaData) shipment, err := shippingSvc.ScheduleShipment(ctx, sagaData.OrderID) if err != nil { return nil, err } sagaData.ShipmentID = shipment.ID return sagaData, nil }, func(ctx context.Context, data interface{}) error { sagaData := data.(*OrderSagaData) return shippingSvc.CancelShipment(ctx, sagaData.ShipmentID) }, ) return []*Step{ createOrderStep, processPaymentStep, reserveInventoryStep, scheduleShipmentStep, } } Main Application // cmd/api/main.go package main import ( "context" "encoding/json" "log" "net/http" "github.com/gorilla/mux" "app/internal/saga" "app/internal/services/inventory" "app/internal/services/order" "app/internal/services/payment" "app/internal/services/shipping" ) func main() { // Initialize services orderSvc := order.NewService() paymentSvc := payment.NewService() inventorySvc := inventory.NewService() shippingSvc := shipping.NewService() // Initialize saga orchestrator orchestrator := saga.NewOrchestrator() router := mux.NewRouter() // Create order endpoint (triggers saga) router.HandleFunc("/orders", func(w http.ResponseWriter, r *http.Request) { var req struct { UserID string `json:"user_id"` Items []order.OrderItem `json:"items"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } // Create saga steps := saga.CreateOrderSaga(orderSvc, paymentSvc, inventorySvc, shippingSvc) orderSaga := orchestrator.NewSaga(generateSagaID(), steps) // Execute saga sagaData := &saga.OrderSagaData{ UserID: req.UserID, Items: req.Items, } if err := orchestrator.Execute(context.Background(), orderSaga, sagaData); err != nil { http.Error(w, fmt.Sprintf("Saga failed: %v", err), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusCreated) json.NewEncoder(w).Encode(map[string]string{ "saga_id": orderSaga.ID, "order_id": sagaData.OrderID, "status": string(orderSaga.Status), }) }).Methods("POST") // Get saga status endpoint router.HandleFunc("/sagas/{id}", func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) sagaID := vars["id"] saga, err := orchestrator.GetSaga(sagaID) if err != nil { http.Error(w, err.Error(), http.StatusNotFound) return } response := map[string]interface{}{ "saga_id": saga.ID, "status": saga.Status, "steps": len(saga.Steps), } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(response) }).Methods("GET") router.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }) log.Println("Saga orchestrator server starting on :8080") log.Fatal(http.ListenAndServe(":8080", router)) } func generateSagaID() string { return fmt.Sprintf("saga_%d", time.Now().UnixNano()) } Testing Saga // internal/saga/order_saga_test.go package saga_test import ( "context" "testing" "app/internal/saga" "app/internal/services/inventory" "app/internal/services/order" "app/internal/services/payment" "app/internal/services/shipping" ) func TestOrderSaga_HappyPath(t *testing.T) { // Initialize services orderSvc := order.NewService() paymentSvc := payment.NewService() inventorySvc := inventory.NewService() shippingSvc := shipping.NewService() // Create saga orchestrator := saga.NewOrchestrator() steps := saga.CreateOrderSaga(orderSvc, paymentSvc, inventorySvc, shippingSvc) orderSaga := orchestrator.NewSaga("test-saga", steps) // Execute saga sagaData := &saga.OrderSagaData{ UserID: "user_123", Items: []order.OrderItem{ {ProductID: "product_1", Quantity: 2, Price: 100.0}, }, } err := orchestrator.Execute(context.Background(), orderSaga, sagaData) if err != nil { t.Fatalf("Saga execution failed: %v", err) } if orderSaga.Status != saga.SagaStatusCompleted { t.Errorf("Expected status %s, got %s", saga.SagaStatusCompleted, orderSaga.Status) } } func TestOrderSaga_PaymentFailure(t *testing.T) { // Initialize services orderSvc := order.NewService() paymentSvc := payment.NewService() inventorySvc := inventory.NewService() shippingSvc := shipping.NewService() // Create saga orchestrator := saga.NewOrchestrator() steps := saga.CreateOrderSaga(orderSvc, paymentSvc, inventorySvc, shippingSvc) orderSaga := orchestrator.NewSaga("test-saga-fail", steps) // Execute saga with amount that will fail sagaData := &saga.OrderSagaData{ UserID: "user_123", Items: []order.OrderItem{ {ProductID: "product_1", Quantity: 2, Price: 10000.0}, // Exceeds limit }, } err := orchestrator.Execute(context.Background(), orderSaga, sagaData) if err == nil { t.Fatal("Expected saga to fail, but it succeeded") } if orderSaga.Status != saga.SagaStatusCompensated { t.Errorf("Expected status %s, got %s", saga.SagaStatusCompensated, orderSaga.Status) } } Best Practices Idempotency: Ensure all operations can be safely retried Compensating Transactions: Design proper rollback logic Timeout Handling: Set timeouts for each saga step State Persistence: Store saga state for recovery Monitoring: Track saga execution and failures Error Handling: Handle partial failures gracefully Testing: Test both success and failure scenarios Documentation: Document saga workflows clearly Common Pitfalls Missing Compensations: Not implementing proper rollback Non-Idempotent Operations: Operations that can’t be retried safely Circular Dependencies: Services depending on each other in compensation Long-Running Sagas: Sagas that hold resources for too long No State Persistence: Losing saga state on failure Ignoring Partial Failures: Not handling scenarios where compensation fails Over-Compensation: Compensating steps that were never executed When to Use Saga Pattern Use When: ...

    January 24, 2025 · 13 min · Rafiul Alam