Go Architecture Patterns Series: ← Previous: Event-Driven Architecture | Series Overview | Next: Event Sourcing →
What is CQRS Pattern?
Command Query Responsibility Segregation (CQRS) is an architectural pattern that separates read operations (queries) from write operations (commands). This separation allows you to optimize each side independently for better performance, scalability, and maintainability.
Key Principles:
- Command Model: Handles writes, updates, and business logic validation
- Query Model: Handles reads, optimized for data retrieval
- Separate Data Stores: Optional use of different databases for reads and writes
- Eventual Consistency: Query model may lag behind command model
- Single Responsibility: Each model focused on its specific task
- Independent Scaling: Scale read and write operations separately
Architecture Overview
CQRS Flow Pattern
Real-World Use Cases
- E-commerce Platforms: Separate product catalog writes from fast product searches
- Banking Systems: Transaction processing vs. account balance queries
- Analytics Platforms: Data ingestion vs. complex reporting queries
- Social Media: Post creation vs. feed generation
- Inventory Management: Stock updates vs. availability queries
- Audit Systems: Event recording vs. audit log queries
CQRS Pattern Implementation
Project Structure
cqrs-app/
├── cmd/
│ └── api/
│ └── main.go
├── internal/
│ ├── commands/
│ │ ├── create_order.go
│ │ ├── update_order.go
│ │ └── handler.go
│ ├── queries/
│ │ ├── get_order.go
│ │ ├── list_orders.go
│ │ └── handler.go
│ ├── domain/
│ │ └── order.go
│ ├── write/
│ │ ├── repository.go
│ │ └── postgres_repository.go
│ ├── read/
│ │ ├── repository.go
│ │ └── postgres_repository.go
│ ├── projections/
│ │ └── order_projection.go
│ └── events/
│ └── events.go
└── go.mod
Domain Model
// internal/domain/order.go
package domain
import (
"errors"
"time"
)
type OrderID string
type OrderStatus string
const (
OrderStatusPending OrderStatus = "pending"
OrderStatusConfirmed OrderStatus = "confirmed"
OrderStatusShipped OrderStatus = "shipped"
OrderStatusDelivered OrderStatus = "delivered"
OrderStatusCancelled OrderStatus = "cancelled"
)
type OrderItem struct {
ProductID string `json:"product_id"`
Quantity int `json:"quantity"`
Price float64 `json:"price"`
}
// Write Model - Rich domain model with business logic
type Order struct {
ID OrderID
UserID string
Items []OrderItem
Total float64
Status OrderStatus
CreatedAt time.Time
UpdatedAt time.Time
Version int // For optimistic locking
}
var (
ErrInvalidOrder = errors.New("invalid order")
ErrOrderNotFound = errors.New("order not found")
ErrInvalidStatus = errors.New("invalid status transition")
ErrConcurrentUpdate = errors.New("concurrent update detected")
)
// NewOrder creates a new order with validation
func NewOrder(userID string, items []OrderItem) (*Order, error) {
if userID == "" {
return nil, ErrInvalidOrder
}
if len(items) == 0 {
return nil, ErrInvalidOrder
}
var total float64
for _, item := range items {
if item.Quantity <= 0 || item.Price < 0 {
return nil, ErrInvalidOrder
}
total += item.Price * float64(item.Quantity)
}
return &Order{
ID: OrderID(generateID()),
UserID: userID,
Items: items,
Total: total,
Status: OrderStatusPending,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
Version: 1,
}, nil
}
// Confirm transitions order to confirmed status
func (o *Order) Confirm() error {
if o.Status != OrderStatusPending {
return ErrInvalidStatus
}
o.Status = OrderStatusConfirmed
o.UpdatedAt = time.Now()
o.Version++
return nil
}
// Ship transitions order to shipped status
func (o *Order) Ship() error {
if o.Status != OrderStatusConfirmed {
return ErrInvalidStatus
}
o.Status = OrderStatusShipped
o.UpdatedAt = time.Now()
o.Version++
return nil
}
// Cancel cancels the order
func (o *Order) Cancel() error {
if o.Status == OrderStatusDelivered || o.Status == OrderStatusCancelled {
return ErrInvalidStatus
}
o.Status = OrderStatusCancelled
o.UpdatedAt = time.Now()
o.Version++
return nil
}
func generateID() string {
return fmt.Sprintf("order_%d", time.Now().UnixNano())
}
Commands
// internal/commands/create_order.go
package commands
import (
"context"
"fmt"
"app/internal/domain"
"app/internal/events"
)
// CreateOrderCommand represents a command to create an order
type CreateOrderCommand struct {
UserID string `json:"user_id"`
Items []domain.OrderItem `json:"items"`
}
// CreateOrderHandler handles order creation
type CreateOrderHandler struct {
writeRepo domain.WriteRepository
eventBus events.EventBus
}
func NewCreateOrderHandler(repo domain.WriteRepository, bus events.EventBus) *CreateOrderHandler {
return &CreateOrderHandler{
writeRepo: repo,
eventBus: bus,
}
}
func (h *CreateOrderHandler) Handle(ctx context.Context, cmd *CreateOrderCommand) (*domain.Order, error) {
// Create domain model with business logic validation
order, err := domain.NewOrder(cmd.UserID, cmd.Items)
if err != nil {
return nil, fmt.Errorf("invalid command: %w", err)
}
// Persist to write store
if err := h.writeRepo.Save(ctx, order); err != nil {
return nil, fmt.Errorf("failed to save order: %w", err)
}
// Publish event for read model synchronization
event := events.OrderCreatedEvent{
OrderID: string(order.ID),
UserID: order.UserID,
Items: order.Items,
Total: order.Total,
Status: string(order.Status),
CreatedAt: order.CreatedAt,
}
if err := h.eventBus.Publish(ctx, "order.created", event); err != nil {
// Log error but don't fail the command - eventual consistency
fmt.Printf("Failed to publish event: %v\n", err)
}
return order, nil
}
// internal/commands/update_order.go
package commands
import (
"context"
"fmt"
"app/internal/domain"
"app/internal/events"
)
// ConfirmOrderCommand confirms an order
type ConfirmOrderCommand struct {
OrderID string `json:"order_id"`
}
type ConfirmOrderHandler struct {
writeRepo domain.WriteRepository
eventBus events.EventBus
}
func NewConfirmOrderHandler(repo domain.WriteRepository, bus events.EventBus) *ConfirmOrderHandler {
return &ConfirmOrderHandler{
writeRepo: repo,
eventBus: bus,
}
}
func (h *ConfirmOrderHandler) Handle(ctx context.Context, cmd *ConfirmOrderCommand) error {
// Load from write store
order, err := h.writeRepo.GetByID(ctx, domain.OrderID(cmd.OrderID))
if err != nil {
return fmt.Errorf("order not found: %w", err)
}
// Execute domain logic
if err := order.Confirm(); err != nil {
return fmt.Errorf("cannot confirm order: %w", err)
}
// Save with optimistic locking
if err := h.writeRepo.Save(ctx, order); err != nil {
return fmt.Errorf("failed to save order: %w", err)
}
// Publish event
event := events.OrderConfirmedEvent{
OrderID: string(order.ID),
Status: string(order.Status),
ConfirmedAt: order.UpdatedAt,
}
if err := h.eventBus.Publish(ctx, "order.confirmed", event); err != nil {
fmt.Printf("Failed to publish event: %v\n", err)
}
return nil
}
// CancelOrderCommand cancels an order
type CancelOrderCommand struct {
OrderID string `json:"order_id"`
Reason string `json:"reason"`
}
type CancelOrderHandler struct {
writeRepo domain.WriteRepository
eventBus events.EventBus
}
func NewCancelOrderHandler(repo domain.WriteRepository, bus events.EventBus) *CancelOrderHandler {
return &CancelOrderHandler{
writeRepo: repo,
eventBus: bus,
}
}
func (h *CancelOrderHandler) Handle(ctx context.Context, cmd *CancelOrderCommand) error {
order, err := h.writeRepo.GetByID(ctx, domain.OrderID(cmd.OrderID))
if err != nil {
return fmt.Errorf("order not found: %w", err)
}
if err := order.Cancel(); err != nil {
return fmt.Errorf("cannot cancel order: %w", err)
}
if err := h.writeRepo.Save(ctx, order); err != nil {
return fmt.Errorf("failed to save order: %w", err)
}
event := events.OrderCancelledEvent{
OrderID: string(order.ID),
Status: string(order.Status),
Reason: cmd.Reason,
CancelledAt: order.UpdatedAt,
}
if err := h.eventBus.Publish(ctx, "order.cancelled", event); err != nil {
fmt.Printf("Failed to publish event: %v\n", err)
}
return nil
}
Queries
// internal/queries/get_order.go
package queries
import (
"context"
"time"
)
// OrderReadModel - Optimized for reading, denormalized
type OrderReadModel struct {
ID string `json:"id"`
UserID string `json:"user_id"`
UserName string `json:"user_name"` // Denormalized
UserEmail string `json:"user_email"` // Denormalized
Items []OrderItemReadModel `json:"items"`
ItemCount int `json:"item_count"` // Computed
Total float64 `json:"total"`
Status string `json:"status"`
StatusDisplay string `json:"status_display"` // Formatted for display
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
type OrderItemReadModel struct {
ProductID string `json:"product_id"`
ProductName string `json:"product_name"` // Denormalized
Quantity int `json:"quantity"`
Price float64 `json:"price"`
Subtotal float64 `json:"subtotal"` // Computed
}
// GetOrderQuery retrieves a single order
type GetOrderQuery struct {
OrderID string `json:"order_id"`
}
type GetOrderHandler struct {
readRepo ReadRepository
}
func NewGetOrderHandler(repo ReadRepository) *GetOrderHandler {
return &GetOrderHandler{readRepo: repo}
}
func (h *GetOrderHandler) Handle(ctx context.Context, query *GetOrderQuery) (*OrderReadModel, error) {
return h.readRepo.GetByID(ctx, query.OrderID)
}
// internal/queries/list_orders.go
package queries
import (
"context"
)
// ListOrdersQuery lists orders with filters
type ListOrdersQuery struct {
UserID string `json:"user_id"`
Status string `json:"status"`
Page int `json:"page"`
PageSize int `json:"page_size"`
}
type OrderListResult struct {
Orders []*OrderReadModel `json:"orders"`
Total int `json:"total"`
Page int `json:"page"`
PageSize int `json:"page_size"`
TotalPages int `json:"total_pages"`
}
type ListOrdersHandler struct {
readRepo ReadRepository
}
func NewListOrdersHandler(repo ReadRepository) *ListOrdersHandler {
return &ListOrdersHandler{readRepo: repo}
}
func (h *ListOrdersHandler) Handle(ctx context.Context, query *ListOrdersQuery) (*OrderListResult, error) {
// Set defaults
if query.Page < 1 {
query.Page = 1
}
if query.PageSize < 1 || query.PageSize > 100 {
query.PageSize = 20
}
offset := (query.Page - 1) * query.PageSize
orders, total, err := h.readRepo.List(ctx, query.UserID, query.Status, query.PageSize, offset)
if err != nil {
return nil, err
}
totalPages := (total + query.PageSize - 1) / query.PageSize
return &OrderListResult{
Orders: orders,
Total: total,
Page: query.Page,
PageSize: query.PageSize,
TotalPages: totalPages,
}, nil
}
// ReadRepository interface for query side
type ReadRepository interface {
GetByID(ctx context.Context, id string) (*OrderReadModel, error)
List(ctx context.Context, userID, status string, limit, offset int) ([]*OrderReadModel, int, error)
}
Projections
// internal/projections/order_projection.go
package projections
import (
"context"
"database/sql"
"fmt"
"app/internal/events"
"app/internal/queries"
)
// OrderProjection builds read models from events
type OrderProjection struct {
db *sql.DB
}
func NewOrderProjection(db *sql.DB) *OrderProjection {
return &OrderProjection{db: db}
}
// HandleOrderCreated projects OrderCreated event to read model
func (p *OrderProjection) HandleOrderCreated(ctx context.Context, event events.OrderCreatedEvent) error {
tx, err := p.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// Insert denormalized order
query := `
INSERT INTO order_read_model
(id, user_id, user_name, user_email, total, status, status_display, item_count, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
`
// Fetch user details for denormalization
userName, userEmail := p.getUserDetails(ctx, event.UserID)
_, err = tx.ExecContext(ctx, query,
event.OrderID,
event.UserID,
userName,
userEmail,
event.Total,
event.Status,
formatStatus(event.Status),
len(event.Items),
event.CreatedAt,
event.CreatedAt,
)
if err != nil {
return fmt.Errorf("failed to insert order: %w", err)
}
// Insert order items with denormalized product info
itemQuery := `
INSERT INTO order_item_read_model
(order_id, product_id, product_name, quantity, price, subtotal)
VALUES ($1, $2, $3, $4, $5, $6)
`
for _, item := range event.Items {
productName := p.getProductName(ctx, item.ProductID)
subtotal := item.Price * float64(item.Quantity)
_, err = tx.ExecContext(ctx, itemQuery,
event.OrderID,
item.ProductID,
productName,
item.Quantity,
item.Price,
subtotal,
)
if err != nil {
return fmt.Errorf("failed to insert order item: %w", err)
}
}
return tx.Commit()
}
// HandleOrderConfirmed updates read model when order is confirmed
func (p *OrderProjection) HandleOrderConfirmed(ctx context.Context, event events.OrderConfirmedEvent) error {
query := `
UPDATE order_read_model
SET status = $2, status_display = $3, updated_at = $4
WHERE id = $1
`
_, err := p.db.ExecContext(ctx, query,
event.OrderID,
event.Status,
formatStatus(event.Status),
event.ConfirmedAt,
)
return err
}
// HandleOrderCancelled updates read model when order is cancelled
func (p *OrderProjection) HandleOrderCancelled(ctx context.Context, event events.OrderCancelledEvent) error {
query := `
UPDATE order_read_model
SET status = $2, status_display = $3, updated_at = $4
WHERE id = $1
`
_, err := p.db.ExecContext(ctx, query,
event.OrderID,
event.Status,
formatStatus(event.Status)+" ("+event.Reason+")",
event.CancelledAt,
)
return err
}
func (p *OrderProjection) getUserDetails(ctx context.Context, userID string) (name, email string) {
// Query user service or cache
query := `SELECT name, email FROM users WHERE id = $1`
p.db.QueryRowContext(ctx, query, userID).Scan(&name, &email)
return
}
func (p *OrderProjection) getProductName(ctx context.Context, productID string) string {
// Query product service or cache
var name string
query := `SELECT name FROM products WHERE id = $1`
p.db.QueryRowContext(ctx, query, productID).Scan(&name)
return name
}
func formatStatus(status string) string {
// Format status for display
switch status {
case "pending":
return "Pending"
case "confirmed":
return "Confirmed"
case "shipped":
return "Shipped"
case "delivered":
return "Delivered"
case "cancelled":
return "Cancelled"
default:
return status
}
}
// RegisterHandlers registers projection handlers with event bus
func (p *OrderProjection) RegisterHandlers(bus events.EventBus) {
bus.Subscribe("order.created", func(ctx context.Context, data interface{}) error {
event := data.(events.OrderCreatedEvent)
return p.HandleOrderCreated(ctx, event)
})
bus.Subscribe("order.confirmed", func(ctx context.Context, data interface{}) error {
event := data.(events.OrderConfirmedEvent)
return p.HandleOrderConfirmed(ctx, event)
})
bus.Subscribe("order.cancelled", func(ctx context.Context, data interface{}) error {
event := data.(events.OrderCancelledEvent)
return p.HandleOrderCancelled(ctx, event)
})
}
Main Application
// cmd/api/main.go
package main
import (
"context"
"database/sql"
"encoding/json"
"log"
"net/http"
"github.com/gorilla/mux"
_ "github.com/lib/pq"
"app/internal/commands"
"app/internal/events"
"app/internal/projections"
"app/internal/queries"
"app/internal/read"
"app/internal/write"
)
func main() {
// Connect to write database
writeDB, err := sql.Open("postgres", "postgres://user:pass@localhost/orders_write?sslmode=disable")
if err != nil {
log.Fatal(err)
}
defer writeDB.Close()
// Connect to read database (could be same or different DB)
readDB, err := sql.Open("postgres", "postgres://user:pass@localhost/orders_read?sslmode=disable")
if err != nil {
log.Fatal(err)
}
defer readDB.Close()
// Initialize event bus
eventBus := events.NewInMemoryEventBus()
// Initialize repositories
writeRepo := write.NewPostgresRepository(writeDB)
readRepo := read.NewPostgresRepository(readDB)
// Initialize projections
orderProjection := projections.NewOrderProjection(readDB)
orderProjection.RegisterHandlers(eventBus)
// Initialize command handlers
createOrderHandler := commands.NewCreateOrderHandler(writeRepo, eventBus)
confirmOrderHandler := commands.NewConfirmOrderHandler(writeRepo, eventBus)
cancelOrderHandler := commands.NewCancelOrderHandler(writeRepo, eventBus)
// Initialize query handlers
getOrderHandler := queries.NewGetOrderHandler(readRepo)
listOrdersHandler := queries.NewListOrdersHandler(readRepo)
// Setup HTTP routes
router := mux.NewRouter()
// Command endpoints
router.HandleFunc("/commands/orders", func(w http.ResponseWriter, r *http.Request) {
var cmd commands.CreateOrderCommand
if err := json.NewDecoder(r.Body).Decode(&cmd); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
order, err := createOrderHandler.Handle(r.Context(), &cmd)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusAccepted)
json.NewEncoder(w).Encode(map[string]string{"order_id": string(order.ID)})
}).Methods("POST")
router.HandleFunc("/commands/orders/{id}/confirm", func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
cmd := commands.ConfirmOrderCommand{OrderID: vars["id"]}
if err := confirmOrderHandler.Handle(r.Context(), &cmd); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
w.WriteHeader(http.StatusAccepted)
}).Methods("POST")
// Query endpoints
router.HandleFunc("/queries/orders/{id}", func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
query := queries.GetOrderQuery{OrderID: vars["id"]}
order, err := getOrderHandler.Handle(r.Context(), &query)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(order)
}).Methods("GET")
router.HandleFunc("/queries/orders", func(w http.ResponseWriter, r *http.Request) {
query := queries.ListOrdersQuery{
UserID: r.URL.Query().Get("user_id"),
Status: r.URL.Query().Get("status"),
}
result, err := listOrdersHandler.Handle(r.Context(), &query)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(result)
}).Methods("GET")
log.Println("Server starting on :8080")
log.Fatal(http.ListenAndServe(":8080", router))
}
Best Practices
- Clear Command/Query Separation: Never mix command and query logic
- Eventual Consistency: Accept and handle eventual consistency gracefully
- Idempotent Projections: Ensure projections can be replayed safely
- Versioning: Version commands and events for evolution
- Validation: Validate commands before processing
- Optimistic Locking: Use versioning to handle concurrent updates
- Monitoring: Track projection lag and command processing times
- Testing: Test commands and queries independently
Common Pitfalls
- Over-Engineering: Using CQRS for simple CRUD applications
- Synchronous Projections: Making projections synchronous defeats the purpose
- Shared Models: Reusing write models in read side
- Complex Queries in Write Model: Adding query logic to command side
- No Monitoring: Not tracking eventual consistency lag
- Poor Error Handling: Not handling projection failures
- Inconsistent State: Not handling projection rebuild scenarios
When to Use CQRS Pattern
Use When:
- Read and write workloads have different scaling requirements
- Need to optimize read performance with denormalized views
- Complex business logic on write side, simple queries on read side
- Different teams work on read and write sides
- Need multiple read models for different use cases
- System benefits from eventual consistency
Avoid When:
- Simple CRUD applications with balanced read/write
- Team lacks experience with eventual consistency
- Strong consistency is required for all operations
- Additional complexity is not justified
- Single database can handle both reads and writes efficiently
Advantages
- Independent Scaling: Scale reads and writes independently
- Optimized Performance: Optimize each side for its specific needs
- Simplified Queries: Denormalized read models simplify queries
- Better Security: Separate models reduce attack surface
- Flexible Read Models: Multiple read models for different needs
- Clear Separation: Commands and queries have distinct responsibilities
- Evolution: Easier to evolve read and write models independently
Disadvantages
- Increased Complexity: More moving parts to manage
- Eventual Consistency: Read model may lag behind write model
- Data Duplication: Same data stored in multiple formats
- Synchronization Overhead: Need to keep read models in sync
- Testing Complexity: More components to test
- Operational Overhead: Multiple databases to maintain
- Learning Curve: Team needs to understand CQRS concepts
CQRS Pattern provides powerful benefits for systems with different read and write characteristics, enabling independent optimization and scaling of each side while maintaining clear separation of concerns.
Go Architecture Patterns Series: ← Previous: Event-Driven Architecture | Series Overview | Next: Event Sourcing →