Go Architecture Patterns Series: ← Previous: CQRS Pattern | Series Overview | Next: Saga Pattern →


What is Event Sourcing?

Event Sourcing is an architectural pattern where state changes are stored as a sequence of immutable events rather than updating records in place. The current state is derived by replaying all events from the beginning.

Key Principles:

  • Event Store: All state changes stored as events
  • Immutable Events: Events are never modified or deleted
  • Event Replay: Current state reconstructed by replaying events
  • Complete Audit Trail: Every change is recorded with full context
  • Temporal Queries: Query state at any point in time
  • Event Versioning: Events can evolve while maintaining history

Architecture Overview

graph TB subgraph "Command Side" Command[Command] Aggregate[Aggregate Root] EventStore[(Event Store)] end subgraph "Event Processing" EventBus[Event Bus] Projector[Event Projectors] end subgraph "Query Side" ReadModel1[(Current State View)] ReadModel2[(Analytics View)] ReadModel3[(Audit Log View)] end subgraph "Time Travel" Snapshot[(Snapshots)] Replay[Event Replay] end Command --> Aggregate Aggregate -->|Load Events| EventStore Aggregate -->|Append Events| EventStore EventStore -->|Publish| EventBus EventBus --> Projector Projector --> ReadModel1 Projector --> ReadModel2 Projector --> ReadModel3 EventStore --> Snapshot Snapshot --> Replay Replay --> Aggregate style Aggregate fill:#fff4e1 style EventStore fill:#e1f5ff style Projector fill:#e8f5e9 style ReadModel1 fill:#f3e5f5

Event Sourcing Flow

sequenceDiagram participant Client participant CommandHandler participant Aggregate participant EventStore participant EventBus participant Projection Client->>CommandHandler: Execute Command CommandHandler->>EventStore: Load Events for Aggregate EventStore-->>CommandHandler: Event Stream CommandHandler->>Aggregate: Replay Events Aggregate->>Aggregate: Rebuild State CommandHandler->>Aggregate: Execute Command Aggregate->>Aggregate: Validate Business Rules Aggregate->>Aggregate: Generate Events CommandHandler->>EventStore: Append New Events EventStore-->>CommandHandler: Events Persisted EventStore->>EventBus: Publish Events EventBus->>Projection: Project Events Projection->>Projection: Update Read Model CommandHandler-->>Client: Command Accepted

Real-World Use Cases

  • Banking Systems: Complete transaction history and audit trails
  • E-commerce: Order lifecycle tracking and returns processing
  • Healthcare: Patient medical history and treatment records
  • Trading Systems: Trade execution history and reconciliation
  • Gaming: Player action history and replay functionality
  • Workflow Systems: Process execution history and debugging

Event Sourcing Implementation

Project Structure

event-sourcing-app/
├── cmd/
│   └── api/
│       └── main.go
├── internal/
│   ├── domain/
│   │   ├── account.go
│   │   └── events.go
│   ├── eventstore/
│   │   ├── store.go
│   │   ├── postgres_store.go
│   │   └── snapshot.go
│   ├── commands/
│   │   └── handlers.go
│   ├── projections/
│   │   └── account_projection.go
│   └── queries/
│       └── handlers.go
└── go.mod

Domain Events

// internal/domain/events.go
package domain

import (
    "encoding/json"
    "time"
)

// Event represents a domain event
type Event interface {
    EventType() string
    AggregateID() string
    EventVersion() int
    Timestamp() time.Time
}

// BaseEvent provides common event fields
type BaseEvent struct {
    Type        string    `json:"type"`
    AggrID      string    `json:"aggregate_id"`
    Version     int       `json:"version"`
    OccurredAt  time.Time `json:"occurred_at"`
    Metadata    map[string]string `json:"metadata"`
}

func (e BaseEvent) EventType() string     { return e.Type }
func (e BaseEvent) AggregateID() string   { return e.AggrID }
func (e BaseEvent) EventVersion() int     { return e.Version }
func (e BaseEvent) Timestamp() time.Time  { return e.OccurredAt }

// Account Events
type AccountCreatedEvent struct {
    BaseEvent
    AccountID   string  `json:"account_id"`
    Owner       string  `json:"owner"`
    Currency    string  `json:"currency"`
}

type MoneyDepositedEvent struct {
    BaseEvent
    AccountID string  `json:"account_id"`
    Amount    float64 `json:"amount"`
    Balance   float64 `json:"balance"`
}

type MoneyWithdrawnEvent struct {
    BaseEvent
    AccountID string  `json:"account_id"`
    Amount    float64 `json:"amount"`
    Balance   float64 `json:"balance"`
}

type AccountClosedEvent struct {
    BaseEvent
    AccountID string `json:"account_id"`
    Reason    string `json:"reason"`
}

// EventEnvelope wraps events for storage
type EventEnvelope struct {
    EventID     string          `json:"event_id"`
    EventType   string          `json:"event_type"`
    AggregateID string          `json:"aggregate_id"`
    Version     int             `json:"version"`
    Data        json.RawMessage `json:"data"`
    Metadata    map[string]string `json:"metadata"`
    CreatedAt   time.Time       `json:"created_at"`
}

// Serialize serializes event to envelope
func SerializeEvent(event Event) (*EventEnvelope, error) {
    data, err := json.Marshal(event)
    if err != nil {
        return nil, err
    }

    return &EventEnvelope{
        EventID:     generateEventID(),
        EventType:   event.EventType(),
        AggregateID: event.AggregateID(),
        Version:     event.EventVersion(),
        Data:        data,
        CreatedAt:   event.Timestamp(),
    }, nil
}

// DeserializeEvent deserializes envelope to event
func DeserializeEvent(envelope *EventEnvelope) (Event, error) {
    var event Event

    switch envelope.EventType {
    case "AccountCreated":
        event = &AccountCreatedEvent{}
    case "MoneyDeposited":
        event = &MoneyDepositedEvent{}
    case "MoneyWithdrawn":
        event = &MoneyWithdrawnEvent{}
    case "AccountClosed":
        event = &AccountClosedEvent{}
    default:
        return nil, fmt.Errorf("unknown event type: %s", envelope.EventType)
    }

    if err := json.Unmarshal(envelope.Data, event); err != nil {
        return nil, err
    }

    return event, nil
}

func generateEventID() string {
    return fmt.Sprintf("evt_%d", time.Now().UnixNano())
}

Aggregate Root

// internal/domain/account.go
package domain

import (
    "errors"
    "fmt"
)

var (
    ErrInsufficientFunds = errors.New("insufficient funds")
    ErrAccountClosed     = errors.New("account is closed")
    ErrInvalidAmount     = errors.New("invalid amount")
)

// Account is an aggregate root that uses event sourcing
type Account struct {
    id            string
    owner         string
    currency      string
    balance       float64
    isClosed      bool
    version       int
    uncommittedEvents []Event
}

// NewAccount creates a new account
func NewAccount(id, owner, currency string) (*Account, error) {
    if id == "" || owner == "" || currency == "" {
        return nil, errors.New("invalid account parameters")
    }

    account := &Account{
        uncommittedEvents: make([]Event, 0),
    }

    // Raise domain event
    event := AccountCreatedEvent{
        BaseEvent: BaseEvent{
            Type:       "AccountCreated",
            AggrID:     id,
            Version:    1,
            OccurredAt: time.Now(),
        },
        AccountID: id,
        Owner:     owner,
        Currency:  currency,
    }

    account.raiseEvent(&event)
    return account, nil
}

// LoadFromHistory rebuilds account state from events
func LoadFromHistory(events []Event) (*Account, error) {
    if len(events) == 0 {
        return nil, errors.New("no events to load")
    }

    account := &Account{
        uncommittedEvents: make([]Event, 0),
    }

    for _, event := range events {
        if err := account.apply(event); err != nil {
            return nil, err
        }
        account.version = event.EventVersion()
    }

    return account, nil
}

// Deposit adds money to account
func (a *Account) Deposit(amount float64) error {
    if a.isClosed {
        return ErrAccountClosed
    }

    if amount <= 0 {
        return ErrInvalidAmount
    }

    newBalance := a.balance + amount

    event := MoneyDepositedEvent{
        BaseEvent: BaseEvent{
            Type:       "MoneyDeposited",
            AggrID:     a.id,
            Version:    a.version + 1,
            OccurredAt: time.Now(),
        },
        AccountID: a.id,
        Amount:    amount,
        Balance:   newBalance,
    }

    a.raiseEvent(&event)
    return nil
}

// Withdraw removes money from account
func (a *Account) Withdraw(amount float64) error {
    if a.isClosed {
        return ErrAccountClosed
    }

    if amount <= 0 {
        return ErrInvalidAmount
    }

    if a.balance < amount {
        return ErrInsufficientFunds
    }

    newBalance := a.balance - amount

    event := MoneyWithdrawnEvent{
        BaseEvent: BaseEvent{
            Type:       "MoneyWithdrawn",
            AggrID:     a.id,
            Version:    a.version + 1,
            OccurredAt: time.Now(),
        },
        AccountID: a.id,
        Amount:    amount,
        Balance:   newBalance,
    }

    a.raiseEvent(&event)
    return nil
}

// Close closes the account
func (a *Account) Close(reason string) error {
    if a.isClosed {
        return ErrAccountClosed
    }

    event := AccountClosedEvent{
        BaseEvent: BaseEvent{
            Type:       "AccountClosed",
            AggrID:     a.id,
            Version:    a.version + 1,
            OccurredAt: time.Now(),
        },
        AccountID: a.id,
        Reason:    reason,
    }

    a.raiseEvent(&event)
    return nil
}

// GetUncommittedEvents returns uncommitted events
func (a *Account) GetUncommittedEvents() []Event {
    return a.uncommittedEvents
}

// MarkEventsAsCommitted clears uncommitted events
func (a *Account) MarkEventsAsCommitted() {
    a.uncommittedEvents = make([]Event, 0)
}

// raiseEvent adds event to uncommitted and applies it
func (a *Account) raiseEvent(event Event) {
    a.uncommittedEvents = append(a.uncommittedEvents, event)
    a.apply(event)
}

// apply applies an event to the aggregate state
func (a *Account) apply(event Event) error {
    switch e := event.(type) {
    case *AccountCreatedEvent:
        a.id = e.AccountID
        a.owner = e.Owner
        a.currency = e.Currency
        a.balance = 0
        a.isClosed = false

    case *MoneyDepositedEvent:
        a.balance = e.Balance

    case *MoneyWithdrawnEvent:
        a.balance = e.Balance

    case *AccountClosedEvent:
        a.isClosed = true

    default:
        return fmt.Errorf("unknown event type: %T", event)
    }

    return nil
}

// Getters
func (a *Account) ID() string       { return a.id }
func (a *Account) Balance() float64 { return a.balance }
func (a *Account) Version() int     { return a.version }
func (a *Account) IsClosed() bool   { return a.isClosed }

Event Store

// internal/eventstore/store.go
package eventstore

import (
    "context"

    "app/internal/domain"
)

// EventStore interface
type EventStore interface {
    SaveEvents(ctx context.Context, aggregateID string, events []domain.Event, expectedVersion int) error
    GetEvents(ctx context.Context, aggregateID string) ([]domain.Event, error)
    GetEventsSince(ctx context.Context, aggregateID string, version int) ([]domain.Event, error)
    GetAllEvents(ctx context.Context) ([]domain.Event, error)
}

// internal/eventstore/postgres_store.go
package eventstore

import (
    "context"
    "database/sql"
    "fmt"

    "app/internal/domain"
)

type PostgresEventStore struct {
    db *sql.DB
}

func NewPostgresEventStore(db *sql.DB) *PostgresEventStore {
    return &PostgresEventStore{db: db}
}

// SaveEvents appends events to the event store with optimistic concurrency
func (s *PostgresEventStore) SaveEvents(ctx context.Context, aggregateID string, events []domain.Event, expectedVersion int) error {
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // Check current version for optimistic locking
    var currentVersion int
    err = tx.QueryRowContext(ctx,
        "SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1",
        aggregateID,
    ).Scan(&currentVersion)

    if err != nil && err != sql.ErrNoRows {
        return err
    }

    if currentVersion != expectedVersion {
        return fmt.Errorf("concurrency conflict: expected version %d, got %d", expectedVersion, currentVersion)
    }

    // Insert events
    query := `
        INSERT INTO events (event_id, event_type, aggregate_id, version, data, metadata, created_at)
        VALUES ($1, $2, $3, $4, $5, $6, $7)
    `

    for _, event := range events {
        envelope, err := domain.SerializeEvent(event)
        if err != nil {
            return err
        }

        _, err = tx.ExecContext(ctx, query,
            envelope.EventID,
            envelope.EventType,
            envelope.AggregateID,
            envelope.Version,
            envelope.Data,
            envelope.Metadata,
            envelope.CreatedAt,
        )

        if err != nil {
            return err
        }
    }

    return tx.Commit()
}

// GetEvents retrieves all events for an aggregate
func (s *PostgresEventStore) GetEvents(ctx context.Context, aggregateID string) ([]domain.Event, error) {
    query := `
        SELECT event_id, event_type, aggregate_id, version, data, metadata, created_at
        FROM events
        WHERE aggregate_id = $1
        ORDER BY version ASC
    `

    rows, err := s.db.QueryContext(ctx, query, aggregateID)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var events []domain.Event

    for rows.Next() {
        envelope := &domain.EventEnvelope{}
        err := rows.Scan(
            &envelope.EventID,
            &envelope.EventType,
            &envelope.AggregateID,
            &envelope.Version,
            &envelope.Data,
            &envelope.Metadata,
            &envelope.CreatedAt,
        )

        if err != nil {
            return nil, err
        }

        event, err := domain.DeserializeEvent(envelope)
        if err != nil {
            return nil, err
        }

        events = append(events, event)
    }

    return events, nil
}

// GetEventsSince retrieves events after a specific version
func (s *PostgresEventStore) GetEventsSince(ctx context.Context, aggregateID string, version int) ([]domain.Event, error) {
    query := `
        SELECT event_id, event_type, aggregate_id, version, data, metadata, created_at
        FROM events
        WHERE aggregate_id = $1 AND version > $2
        ORDER BY version ASC
    `

    rows, err := s.db.QueryContext(ctx, query, aggregateID, version)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var events []domain.Event

    for rows.Next() {
        envelope := &domain.EventEnvelope{}
        err := rows.Scan(
            &envelope.EventID,
            &envelope.EventType,
            &envelope.AggregateID,
            &envelope.Version,
            &envelope.Data,
            &envelope.Metadata,
            &envelope.CreatedAt,
        )

        if err != nil {
            return nil, err
        }

        event, err := domain.DeserializeEvent(envelope)
        if err != nil {
            return nil, err
        }

        events = append(events, event)
    }

    return events, nil
}

// GetAllEvents retrieves all events from the store
func (s *PostgresEventStore) GetAllEvents(ctx context.Context) ([]domain.Event, error) {
    query := `
        SELECT event_id, event_type, aggregate_id, version, data, metadata, created_at
        FROM events
        ORDER BY created_at ASC
    `

    rows, err := s.db.QueryContext(ctx, query)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var events []domain.Event

    for rows.Next() {
        envelope := &domain.EventEnvelope{}
        err := rows.Scan(
            &envelope.EventID,
            &envelope.EventType,
            &envelope.AggregateID,
            &envelope.Version,
            &envelope.Data,
            &envelope.Metadata,
            &envelope.CreatedAt,
        )

        if err != nil {
            return nil, err
        }

        event, err := domain.DeserializeEvent(envelope)
        if err != nil {
            return nil, err
        }

        events = append(events, event)
    }

    return events, nil
}

Snapshots for Performance

// internal/eventstore/snapshot.go
package eventstore

import (
    "context"
    "database/sql"
    "encoding/json"
    "time"
)

// Snapshot represents a point-in-time state
type Snapshot struct {
    AggregateID string
    Version     int
    State       json.RawMessage
    CreatedAt   time.Time
}

type SnapshotStore struct {
    db *sql.DB
}

func NewSnapshotStore(db *sql.DB) *SnapshotStore {
    return &SnapshotStore{db: db}
}

// SaveSnapshot saves a snapshot
func (s *SnapshotStore) SaveSnapshot(ctx context.Context, snapshot *Snapshot) error {
    query := `
        INSERT INTO snapshots (aggregate_id, version, state, created_at)
        VALUES ($1, $2, $3, $4)
        ON CONFLICT (aggregate_id) DO UPDATE
        SET version = $2, state = $3, created_at = $4
    `

    _, err := s.db.ExecContext(ctx, query,
        snapshot.AggregateID,
        snapshot.Version,
        snapshot.State,
        snapshot.CreatedAt,
    )

    return err
}

// GetSnapshot retrieves the latest snapshot
func (s *SnapshotStore) GetSnapshot(ctx context.Context, aggregateID string) (*Snapshot, error) {
    query := `
        SELECT aggregate_id, version, state, created_at
        FROM snapshots
        WHERE aggregate_id = $1
    `

    snapshot := &Snapshot{}
    err := s.db.QueryRowContext(ctx, query, aggregateID).Scan(
        &snapshot.AggregateID,
        &snapshot.Version,
        &snapshot.State,
        &snapshot.CreatedAt,
    )

    if err == sql.ErrNoRows {
        return nil, nil
    }

    return snapshot, err
}

Command Handlers

// internal/commands/handlers.go
package commands

import (
    "context"
    "fmt"

    "app/internal/domain"
    "app/internal/eventstore"
)

// CreateAccountCommand
type CreateAccountCommand struct {
    AccountID string
    Owner     string
    Currency  string
}

type CreateAccountHandler struct {
    eventStore eventstore.EventStore
}

func NewCreateAccountHandler(store eventstore.EventStore) *CreateAccountHandler {
    return &CreateAccountHandler{eventStore: store}
}

func (h *CreateAccountHandler) Handle(ctx context.Context, cmd *CreateAccountCommand) error {
    // Create new aggregate
    account, err := domain.NewAccount(cmd.AccountID, cmd.Owner, cmd.Currency)
    if err != nil {
        return err
    }

    // Save events
    events := account.GetUncommittedEvents()
    if err := h.eventStore.SaveEvents(ctx, cmd.AccountID, events, 0); err != nil {
        return fmt.Errorf("failed to save events: %w", err)
    }

    account.MarkEventsAsCommitted()
    return nil
}

// DepositMoneyCommand
type DepositMoneyCommand struct {
    AccountID string
    Amount    float64
}

type DepositMoneyHandler struct {
    eventStore eventstore.EventStore
}

func NewDepositMoneyHandler(store eventstore.EventStore) *DepositMoneyHandler {
    return &DepositMoneyHandler{eventStore: store}
}

func (h *DepositMoneyHandler) Handle(ctx context.Context, cmd *DepositMoneyCommand) error {
    // Load aggregate from events
    events, err := h.eventStore.GetEvents(ctx, cmd.AccountID)
    if err != nil {
        return err
    }

    account, err := domain.LoadFromHistory(events)
    if err != nil {
        return err
    }

    // Execute command
    if err := account.Deposit(cmd.Amount); err != nil {
        return err
    }

    // Save new events
    newEvents := account.GetUncommittedEvents()
    if err := h.eventStore.SaveEvents(ctx, cmd.AccountID, newEvents, account.Version()); err != nil {
        return fmt.Errorf("failed to save events: %w", err)
    }

    account.MarkEventsAsCommitted()
    return nil
}

// WithdrawMoneyCommand
type WithdrawMoneyCommand struct {
    AccountID string
    Amount    float64
}

type WithdrawMoneyHandler struct {
    eventStore eventstore.EventStore
}

func NewWithdrawMoneyHandler(store eventstore.EventStore) *WithdrawMoneyHandler {
    return &WithdrawMoneyHandler{eventStore: store}
}

func (h *WithdrawMoneyHandler) Handle(ctx context.Context, cmd *WithdrawMoneyCommand) error {
    events, err := h.eventStore.GetEvents(ctx, cmd.AccountID)
    if err != nil {
        return err
    }

    account, err := domain.LoadFromHistory(events)
    if err != nil {
        return err
    }

    if err := account.Withdraw(cmd.Amount); err != nil {
        return err
    }

    newEvents := account.GetUncommittedEvents()
    if err := h.eventStore.SaveEvents(ctx, cmd.AccountID, newEvents, account.Version()); err != nil {
        return fmt.Errorf("failed to save events: %w", err)
    }

    account.MarkEventsAsCommitted()
    return nil
}

Projections

// internal/projections/account_projection.go
package projections

import (
    "context"
    "database/sql"

    "app/internal/domain"
)

// AccountReadModel represents the current state
type AccountReadModel struct {
    ID            string
    Owner         string
    Currency      string
    Balance       float64
    IsClosed      bool
    TotalDeposits float64
    TotalWithdrawals float64
    TransactionCount int
}

type AccountProjection struct {
    db *sql.DB
}

func NewAccountProjection(db *sql.DB) *AccountProjection {
    return &AccountProjection{db: db}
}

// Project processes events and updates read model
func (p *AccountProjection) Project(ctx context.Context, event domain.Event) error {
    switch e := event.(type) {
    case *domain.AccountCreatedEvent:
        return p.handleAccountCreated(ctx, e)
    case *domain.MoneyDepositedEvent:
        return p.handleMoneyDeposited(ctx, e)
    case *domain.MoneyWithdrawnEvent:
        return p.handleMoneyWithdrawn(ctx, e)
    case *domain.AccountClosedEvent:
        return p.handleAccountClosed(ctx, e)
    }
    return nil
}

func (p *AccountProjection) handleAccountCreated(ctx context.Context, event *domain.AccountCreatedEvent) error {
    query := `
        INSERT INTO account_read_model (id, owner, currency, balance, is_closed, total_deposits, total_withdrawals, transaction_count)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
    `
    _, err := p.db.ExecContext(ctx, query, event.AccountID, event.Owner, event.Currency, 0, false, 0, 0, 0)
    return err
}

func (p *AccountProjection) handleMoneyDeposited(ctx context.Context, event *domain.MoneyDepositedEvent) error {
    query := `
        UPDATE account_read_model
        SET balance = $2,
            total_deposits = total_deposits + $3,
            transaction_count = transaction_count + 1
        WHERE id = $1
    `
    _, err := p.db.ExecContext(ctx, query, event.AccountID, event.Balance, event.Amount)
    return err
}

func (p *AccountProjection) handleMoneyWithdrawn(ctx context.Context, event *domain.MoneyWithdrawnEvent) error {
    query := `
        UPDATE account_read_model
        SET balance = $2,
            total_withdrawals = total_withdrawals + $3,
            transaction_count = transaction_count + 1
        WHERE id = $1
    `
    _, err := p.db.ExecContext(ctx, query, event.AccountID, event.Balance, event.Amount)
    return err
}

func (p *AccountProjection) handleAccountClosed(ctx context.Context, event *domain.AccountClosedEvent) error {
    query := `UPDATE account_read_model SET is_closed = true WHERE id = $1`
    _, err := p.db.ExecContext(ctx, query, event.AccountID)
    return err
}

// RebuildProjections rebuilds all projections from events
func (p *AccountProjection) RebuildProjections(ctx context.Context, eventStore eventstore.EventStore) error {
    // Clear existing projections
    if _, err := p.db.ExecContext(ctx, "TRUNCATE account_read_model"); err != nil {
        return err
    }

    // Replay all events
    events, err := eventStore.GetAllEvents(ctx)
    if err != nil {
        return err
    }

    for _, event := range events {
        if err := p.Project(ctx, event); err != nil {
            return err
        }
    }

    return nil
}

Main Application

// cmd/api/main.go
package main

import (
    "context"
    "database/sql"
    "encoding/json"
    "log"
    "net/http"

    "github.com/gorilla/mux"
    _ "github.com/lib/pq"

    "app/internal/commands"
    "app/internal/eventstore"
    "app/internal/projections"
)

func main() {
    db, err := sql.Open("postgres", "postgres://user:pass@localhost/eventsourcing?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Initialize stores
    eventStore := eventstore.NewPostgresEventStore(db)
    projection := projections.NewAccountProjection(db)

    // Initialize command handlers
    createAccountHandler := commands.NewCreateAccountHandler(eventStore)
    depositHandler := commands.NewDepositMoneyHandler(eventStore)
    withdrawHandler := commands.NewWithdrawMoneyHandler(eventStore)

    router := mux.NewRouter()

    // Command endpoints
    router.HandleFunc("/accounts", func(w http.ResponseWriter, r *http.Request) {
        var cmd commands.CreateAccountCommand
        if err := json.NewDecoder(r.Body).Decode(&cmd); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }

        if err := createAccountHandler.Handle(r.Context(), &cmd); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }

        w.WriteHeader(http.StatusCreated)
    }).Methods("POST")

    router.HandleFunc("/accounts/{id}/deposit", func(w http.ResponseWriter, r *http.Request) {
        vars := mux.Vars(r)
        var req struct {
            Amount float64 `json:"amount"`
        }

        if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }

        cmd := commands.DepositMoneyCommand{
            AccountID: vars["id"],
            Amount:    req.Amount,
        }

        if err := depositHandler.Handle(r.Context(), &cmd); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }

        w.WriteHeader(http.StatusOK)
    }).Methods("POST")

    // Event replay endpoint
    router.HandleFunc("/admin/rebuild-projections", func(w http.ResponseWriter, r *http.Request) {
        if err := projection.RebuildProjections(r.Context(), eventStore); err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }

        w.WriteHeader(http.StatusOK)
        w.Write([]byte("Projections rebuilt successfully"))
    }).Methods("POST")

    log.Println("Server starting on :8080")
    log.Fatal(http.ListenAndServe(":8080", router))
}

Best Practices

  1. Immutable Events: Never modify or delete events
  2. Event Versioning: Version events for schema evolution
  3. Snapshots: Use snapshots to optimize replay performance
  4. Idempotency: Ensure event handlers are idempotent
  5. Event Schema: Keep events small and focused
  6. Testing: Test event replay and projections
  7. Monitoring: Track event store size and replay performance
  8. Documentation: Document all event types and their purpose

Common Pitfalls

  1. Mutable Events: Modifying or deleting events
  2. Missing Events: Not capturing all state changes
  3. Large Events: Storing too much data in events
  4. No Snapshots: Performance degradation from replaying all events
  5. Tight Coupling: Events containing implementation details
  6. No Versioning: Breaking changes to event schemas
  7. Synchronous Projections: Slowing down command processing

When to Use Event Sourcing

Use When:

  • Need complete audit trail of all changes
  • Temporal queries required (state at any point in time)
  • Complex business logic with state transitions
  • Need to replay events for debugging or analytics
  • Building event-driven systems
  • Regulatory requirements for audit trails

Avoid When:

  • Simple CRUD applications
  • No need for audit trails
  • Team lacks experience with event sourcing
  • Performance overhead not acceptable
  • Eventual consistency is problematic
  • Storage costs are prohibitive

Advantages

  • Complete Audit Trail: Every change is recorded
  • Time Travel: Query state at any point in time
  • Event Replay: Rebuild state or create new projections
  • Debugging: Trace exact sequence of events
  • Event-Driven: Natural fit for event-driven architecture
  • Immutability: Events are immutable and append-only
  • Business Insights: Rich data for analytics

Disadvantages

  • Complexity: More complex than traditional CRUD
  • Storage: Events accumulate over time
  • Performance: Replaying events can be slow
  • Learning Curve: Requires mindset shift
  • Eventual Consistency: Read models may lag
  • Event Evolution: Managing event schema changes
  • Query Complexity: Need separate read models

Event Sourcing provides powerful capabilities for audit trails and temporal queries but requires careful consideration of trade-offs and operational complexity.


Go Architecture Patterns Series: ← Previous: CQRS Pattern | Series Overview | Next: Saga Pattern →