Event Sourcing in Go: Building Audit-Trail Applications
Go Architecture Patterns Series: ← Previous: CQRS Pattern | Series Overview | Next: Saga Pattern → What is Event Sourcing? Event Sourcing is an architectural pattern where state changes are stored as a sequence of immutable events rather than updating records in place. The current state is derived by replaying all events from the beginning. Key Principles: Event Store: All state changes stored as events Immutable Events: Events are never modified or deleted Event Replay: Current state reconstructed by replaying events Complete Audit Trail: Every change is recorded with full context Temporal Queries: Query state at any point in time Event Versioning: Events can evolve while maintaining history Architecture Overview graph TB subgraph "Command Side" Command[Command] Aggregate[Aggregate Root] EventStore[(Event Store)] end subgraph "Event Processing" EventBus[Event Bus] Projector[Event Projectors] end subgraph "Query Side" ReadModel1[(Current State View)] ReadModel2[(Analytics View)] ReadModel3[(Audit Log View)] end subgraph "Time Travel" Snapshot[(Snapshots)] Replay[Event Replay] end Command --> Aggregate Aggregate -->|Load Events| EventStore Aggregate -->|Append Events| EventStore EventStore -->|Publish| EventBus EventBus --> Projector Projector --> ReadModel1 Projector --> ReadModel2 Projector --> ReadModel3 EventStore --> Snapshot Snapshot --> Replay Replay --> Aggregate style Aggregate fill:#fff4e1 style EventStore fill:#e1f5ff style Projector fill:#e8f5e9 style ReadModel1 fill:#f3e5f5 Event Sourcing Flow sequenceDiagram participant Client participant CommandHandler participant Aggregate participant EventStore participant EventBus participant Projection Client->>CommandHandler: Execute Command CommandHandler->>EventStore: Load Events for Aggregate EventStore-->>CommandHandler: Event Stream CommandHandler->>Aggregate: Replay Events Aggregate->>Aggregate: Rebuild State CommandHandler->>Aggregate: Execute Command Aggregate->>Aggregate: Validate Business Rules Aggregate->>Aggregate: Generate Events CommandHandler->>EventStore: Append New Events EventStore-->>CommandHandler: Events Persisted EventStore->>EventBus: Publish Events EventBus->>Projection: Project Events Projection->>Projection: Update Read Model CommandHandler-->>Client: Command Accepted Real-World Use Cases Banking Systems: Complete transaction history and audit trails E-commerce: Order lifecycle tracking and returns processing Healthcare: Patient medical history and treatment records Trading Systems: Trade execution history and reconciliation Gaming: Player action history and replay functionality Workflow Systems: Process execution history and debugging Event Sourcing Implementation Project Structure event-sourcing-app/ ├── cmd/ │ └── api/ │ └── main.go ├── internal/ │ ├── domain/ │ │ ├── account.go │ │ └── events.go │ ├── eventstore/ │ │ ├── store.go │ │ ├── postgres_store.go │ │ └── snapshot.go │ ├── commands/ │ │ └── handlers.go │ ├── projections/ │ │ └── account_projection.go │ └── queries/ │ └── handlers.go └── go.mod Domain Events // internal/domain/events.go package domain import ( "encoding/json" "time" ) // Event represents a domain event type Event interface { EventType() string AggregateID() string EventVersion() int Timestamp() time.Time } // BaseEvent provides common event fields type BaseEvent struct { Type string `json:"type"` AggrID string `json:"aggregate_id"` Version int `json:"version"` OccurredAt time.Time `json:"occurred_at"` Metadata map[string]string `json:"metadata"` } func (e BaseEvent) EventType() string { return e.Type } func (e BaseEvent) AggregateID() string { return e.AggrID } func (e BaseEvent) EventVersion() int { return e.Version } func (e BaseEvent) Timestamp() time.Time { return e.OccurredAt } // Account Events type AccountCreatedEvent struct { BaseEvent AccountID string `json:"account_id"` Owner string `json:"owner"` Currency string `json:"currency"` } type MoneyDepositedEvent struct { BaseEvent AccountID string `json:"account_id"` Amount float64 `json:"amount"` Balance float64 `json:"balance"` } type MoneyWithdrawnEvent struct { BaseEvent AccountID string `json:"account_id"` Amount float64 `json:"amount"` Balance float64 `json:"balance"` } type AccountClosedEvent struct { BaseEvent AccountID string `json:"account_id"` Reason string `json:"reason"` } // EventEnvelope wraps events for storage type EventEnvelope struct { EventID string `json:"event_id"` EventType string `json:"event_type"` AggregateID string `json:"aggregate_id"` Version int `json:"version"` Data json.RawMessage `json:"data"` Metadata map[string]string `json:"metadata"` CreatedAt time.Time `json:"created_at"` } // Serialize serializes event to envelope func SerializeEvent(event Event) (*EventEnvelope, error) { data, err := json.Marshal(event) if err != nil { return nil, err } return &EventEnvelope{ EventID: generateEventID(), EventType: event.EventType(), AggregateID: event.AggregateID(), Version: event.EventVersion(), Data: data, CreatedAt: event.Timestamp(), }, nil } // DeserializeEvent deserializes envelope to event func DeserializeEvent(envelope *EventEnvelope) (Event, error) { var event Event switch envelope.EventType { case "AccountCreated": event = &AccountCreatedEvent{} case "MoneyDeposited": event = &MoneyDepositedEvent{} case "MoneyWithdrawn": event = &MoneyWithdrawnEvent{} case "AccountClosed": event = &AccountClosedEvent{} default: return nil, fmt.Errorf("unknown event type: %s", envelope.EventType) } if err := json.Unmarshal(envelope.Data, event); err != nil { return nil, err } return event, nil } func generateEventID() string { return fmt.Sprintf("evt_%d", time.Now().UnixNano()) } Aggregate Root // internal/domain/account.go package domain import ( "errors" "fmt" ) var ( ErrInsufficientFunds = errors.New("insufficient funds") ErrAccountClosed = errors.New("account is closed") ErrInvalidAmount = errors.New("invalid amount") ) // Account is an aggregate root that uses event sourcing type Account struct { id string owner string currency string balance float64 isClosed bool version int uncommittedEvents []Event } // NewAccount creates a new account func NewAccount(id, owner, currency string) (*Account, error) { if id == "" || owner == "" || currency == "" { return nil, errors.New("invalid account parameters") } account := &Account{ uncommittedEvents: make([]Event, 0), } // Raise domain event event := AccountCreatedEvent{ BaseEvent: BaseEvent{ Type: "AccountCreated", AggrID: id, Version: 1, OccurredAt: time.Now(), }, AccountID: id, Owner: owner, Currency: currency, } account.raiseEvent(&event) return account, nil } // LoadFromHistory rebuilds account state from events func LoadFromHistory(events []Event) (*Account, error) { if len(events) == 0 { return nil, errors.New("no events to load") } account := &Account{ uncommittedEvents: make([]Event, 0), } for _, event := range events { if err := account.apply(event); err != nil { return nil, err } account.version = event.EventVersion() } return account, nil } // Deposit adds money to account func (a *Account) Deposit(amount float64) error { if a.isClosed { return ErrAccountClosed } if amount <= 0 { return ErrInvalidAmount } newBalance := a.balance + amount event := MoneyDepositedEvent{ BaseEvent: BaseEvent{ Type: "MoneyDeposited", AggrID: a.id, Version: a.version + 1, OccurredAt: time.Now(), }, AccountID: a.id, Amount: amount, Balance: newBalance, } a.raiseEvent(&event) return nil } // Withdraw removes money from account func (a *Account) Withdraw(amount float64) error { if a.isClosed { return ErrAccountClosed } if amount <= 0 { return ErrInvalidAmount } if a.balance < amount { return ErrInsufficientFunds } newBalance := a.balance - amount event := MoneyWithdrawnEvent{ BaseEvent: BaseEvent{ Type: "MoneyWithdrawn", AggrID: a.id, Version: a.version + 1, OccurredAt: time.Now(), }, AccountID: a.id, Amount: amount, Balance: newBalance, } a.raiseEvent(&event) return nil } // Close closes the account func (a *Account) Close(reason string) error { if a.isClosed { return ErrAccountClosed } event := AccountClosedEvent{ BaseEvent: BaseEvent{ Type: "AccountClosed", AggrID: a.id, Version: a.version + 1, OccurredAt: time.Now(), }, AccountID: a.id, Reason: reason, } a.raiseEvent(&event) return nil } // GetUncommittedEvents returns uncommitted events func (a *Account) GetUncommittedEvents() []Event { return a.uncommittedEvents } // MarkEventsAsCommitted clears uncommitted events func (a *Account) MarkEventsAsCommitted() { a.uncommittedEvents = make([]Event, 0) } // raiseEvent adds event to uncommitted and applies it func (a *Account) raiseEvent(event Event) { a.uncommittedEvents = append(a.uncommittedEvents, event) a.apply(event) } // apply applies an event to the aggregate state func (a *Account) apply(event Event) error { switch e := event.(type) { case *AccountCreatedEvent: a.id = e.AccountID a.owner = e.Owner a.currency = e.Currency a.balance = 0 a.isClosed = false case *MoneyDepositedEvent: a.balance = e.Balance case *MoneyWithdrawnEvent: a.balance = e.Balance case *AccountClosedEvent: a.isClosed = true default: return fmt.Errorf("unknown event type: %T", event) } return nil } // Getters func (a *Account) ID() string { return a.id } func (a *Account) Balance() float64 { return a.balance } func (a *Account) Version() int { return a.version } func (a *Account) IsClosed() bool { return a.isClosed } Event Store // internal/eventstore/store.go package eventstore import ( "context" "app/internal/domain" ) // EventStore interface type EventStore interface { SaveEvents(ctx context.Context, aggregateID string, events []domain.Event, expectedVersion int) error GetEvents(ctx context.Context, aggregateID string) ([]domain.Event, error) GetEventsSince(ctx context.Context, aggregateID string, version int) ([]domain.Event, error) GetAllEvents(ctx context.Context) ([]domain.Event, error) } // internal/eventstore/postgres_store.go package eventstore import ( "context" "database/sql" "fmt" "app/internal/domain" ) type PostgresEventStore struct { db *sql.DB } func NewPostgresEventStore(db *sql.DB) *PostgresEventStore { return &PostgresEventStore{db: db} } // SaveEvents appends events to the event store with optimistic concurrency func (s *PostgresEventStore) SaveEvents(ctx context.Context, aggregateID string, events []domain.Event, expectedVersion int) error { tx, err := s.db.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() // Check current version for optimistic locking var currentVersion int err = tx.QueryRowContext(ctx, "SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1", aggregateID, ).Scan(¤tVersion) if err != nil && err != sql.ErrNoRows { return err } if currentVersion != expectedVersion { return fmt.Errorf("concurrency conflict: expected version %d, got %d", expectedVersion, currentVersion) } // Insert events query := ` INSERT INTO events (event_id, event_type, aggregate_id, version, data, metadata, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7) ` for _, event := range events { envelope, err := domain.SerializeEvent(event) if err != nil { return err } _, err = tx.ExecContext(ctx, query, envelope.EventID, envelope.EventType, envelope.AggregateID, envelope.Version, envelope.Data, envelope.Metadata, envelope.CreatedAt, ) if err != nil { return err } } return tx.Commit() } // GetEvents retrieves all events for an aggregate func (s *PostgresEventStore) GetEvents(ctx context.Context, aggregateID string) ([]domain.Event, error) { query := ` SELECT event_id, event_type, aggregate_id, version, data, metadata, created_at FROM events WHERE aggregate_id = $1 ORDER BY version ASC ` rows, err := s.db.QueryContext(ctx, query, aggregateID) if err != nil { return nil, err } defer rows.Close() var events []domain.Event for rows.Next() { envelope := &domain.EventEnvelope{} err := rows.Scan( &envelope.EventID, &envelope.EventType, &envelope.AggregateID, &envelope.Version, &envelope.Data, &envelope.Metadata, &envelope.CreatedAt, ) if err != nil { return nil, err } event, err := domain.DeserializeEvent(envelope) if err != nil { return nil, err } events = append(events, event) } return events, nil } // GetEventsSince retrieves events after a specific version func (s *PostgresEventStore) GetEventsSince(ctx context.Context, aggregateID string, version int) ([]domain.Event, error) { query := ` SELECT event_id, event_type, aggregate_id, version, data, metadata, created_at FROM events WHERE aggregate_id = $1 AND version > $2 ORDER BY version ASC ` rows, err := s.db.QueryContext(ctx, query, aggregateID, version) if err != nil { return nil, err } defer rows.Close() var events []domain.Event for rows.Next() { envelope := &domain.EventEnvelope{} err := rows.Scan( &envelope.EventID, &envelope.EventType, &envelope.AggregateID, &envelope.Version, &envelope.Data, &envelope.Metadata, &envelope.CreatedAt, ) if err != nil { return nil, err } event, err := domain.DeserializeEvent(envelope) if err != nil { return nil, err } events = append(events, event) } return events, nil } // GetAllEvents retrieves all events from the store func (s *PostgresEventStore) GetAllEvents(ctx context.Context) ([]domain.Event, error) { query := ` SELECT event_id, event_type, aggregate_id, version, data, metadata, created_at FROM events ORDER BY created_at ASC ` rows, err := s.db.QueryContext(ctx, query) if err != nil { return nil, err } defer rows.Close() var events []domain.Event for rows.Next() { envelope := &domain.EventEnvelope{} err := rows.Scan( &envelope.EventID, &envelope.EventType, &envelope.AggregateID, &envelope.Version, &envelope.Data, &envelope.Metadata, &envelope.CreatedAt, ) if err != nil { return nil, err } event, err := domain.DeserializeEvent(envelope) if err != nil { return nil, err } events = append(events, event) } return events, nil } Snapshots for Performance // internal/eventstore/snapshot.go package eventstore import ( "context" "database/sql" "encoding/json" "time" ) // Snapshot represents a point-in-time state type Snapshot struct { AggregateID string Version int State json.RawMessage CreatedAt time.Time } type SnapshotStore struct { db *sql.DB } func NewSnapshotStore(db *sql.DB) *SnapshotStore { return &SnapshotStore{db: db} } // SaveSnapshot saves a snapshot func (s *SnapshotStore) SaveSnapshot(ctx context.Context, snapshot *Snapshot) error { query := ` INSERT INTO snapshots (aggregate_id, version, state, created_at) VALUES ($1, $2, $3, $4) ON CONFLICT (aggregate_id) DO UPDATE SET version = $2, state = $3, created_at = $4 ` _, err := s.db.ExecContext(ctx, query, snapshot.AggregateID, snapshot.Version, snapshot.State, snapshot.CreatedAt, ) return err } // GetSnapshot retrieves the latest snapshot func (s *SnapshotStore) GetSnapshot(ctx context.Context, aggregateID string) (*Snapshot, error) { query := ` SELECT aggregate_id, version, state, created_at FROM snapshots WHERE aggregate_id = $1 ` snapshot := &Snapshot{} err := s.db.QueryRowContext(ctx, query, aggregateID).Scan( &snapshot.AggregateID, &snapshot.Version, &snapshot.State, &snapshot.CreatedAt, ) if err == sql.ErrNoRows { return nil, nil } return snapshot, err } Command Handlers // internal/commands/handlers.go package commands import ( "context" "fmt" "app/internal/domain" "app/internal/eventstore" ) // CreateAccountCommand type CreateAccountCommand struct { AccountID string Owner string Currency string } type CreateAccountHandler struct { eventStore eventstore.EventStore } func NewCreateAccountHandler(store eventstore.EventStore) *CreateAccountHandler { return &CreateAccountHandler{eventStore: store} } func (h *CreateAccountHandler) Handle(ctx context.Context, cmd *CreateAccountCommand) error { // Create new aggregate account, err := domain.NewAccount(cmd.AccountID, cmd.Owner, cmd.Currency) if err != nil { return err } // Save events events := account.GetUncommittedEvents() if err := h.eventStore.SaveEvents(ctx, cmd.AccountID, events, 0); err != nil { return fmt.Errorf("failed to save events: %w", err) } account.MarkEventsAsCommitted() return nil } // DepositMoneyCommand type DepositMoneyCommand struct { AccountID string Amount float64 } type DepositMoneyHandler struct { eventStore eventstore.EventStore } func NewDepositMoneyHandler(store eventstore.EventStore) *DepositMoneyHandler { return &DepositMoneyHandler{eventStore: store} } func (h *DepositMoneyHandler) Handle(ctx context.Context, cmd *DepositMoneyCommand) error { // Load aggregate from events events, err := h.eventStore.GetEvents(ctx, cmd.AccountID) if err != nil { return err } account, err := domain.LoadFromHistory(events) if err != nil { return err } // Execute command if err := account.Deposit(cmd.Amount); err != nil { return err } // Save new events newEvents := account.GetUncommittedEvents() if err := h.eventStore.SaveEvents(ctx, cmd.AccountID, newEvents, account.Version()); err != nil { return fmt.Errorf("failed to save events: %w", err) } account.MarkEventsAsCommitted() return nil } // WithdrawMoneyCommand type WithdrawMoneyCommand struct { AccountID string Amount float64 } type WithdrawMoneyHandler struct { eventStore eventstore.EventStore } func NewWithdrawMoneyHandler(store eventstore.EventStore) *WithdrawMoneyHandler { return &WithdrawMoneyHandler{eventStore: store} } func (h *WithdrawMoneyHandler) Handle(ctx context.Context, cmd *WithdrawMoneyCommand) error { events, err := h.eventStore.GetEvents(ctx, cmd.AccountID) if err != nil { return err } account, err := domain.LoadFromHistory(events) if err != nil { return err } if err := account.Withdraw(cmd.Amount); err != nil { return err } newEvents := account.GetUncommittedEvents() if err := h.eventStore.SaveEvents(ctx, cmd.AccountID, newEvents, account.Version()); err != nil { return fmt.Errorf("failed to save events: %w", err) } account.MarkEventsAsCommitted() return nil } Projections // internal/projections/account_projection.go package projections import ( "context" "database/sql" "app/internal/domain" ) // AccountReadModel represents the current state type AccountReadModel struct { ID string Owner string Currency string Balance float64 IsClosed bool TotalDeposits float64 TotalWithdrawals float64 TransactionCount int } type AccountProjection struct { db *sql.DB } func NewAccountProjection(db *sql.DB) *AccountProjection { return &AccountProjection{db: db} } // Project processes events and updates read model func (p *AccountProjection) Project(ctx context.Context, event domain.Event) error { switch e := event.(type) { case *domain.AccountCreatedEvent: return p.handleAccountCreated(ctx, e) case *domain.MoneyDepositedEvent: return p.handleMoneyDeposited(ctx, e) case *domain.MoneyWithdrawnEvent: return p.handleMoneyWithdrawn(ctx, e) case *domain.AccountClosedEvent: return p.handleAccountClosed(ctx, e) } return nil } func (p *AccountProjection) handleAccountCreated(ctx context.Context, event *domain.AccountCreatedEvent) error { query := ` INSERT INTO account_read_model (id, owner, currency, balance, is_closed, total_deposits, total_withdrawals, transaction_count) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ` _, err := p.db.ExecContext(ctx, query, event.AccountID, event.Owner, event.Currency, 0, false, 0, 0, 0) return err } func (p *AccountProjection) handleMoneyDeposited(ctx context.Context, event *domain.MoneyDepositedEvent) error { query := ` UPDATE account_read_model SET balance = $2, total_deposits = total_deposits + $3, transaction_count = transaction_count + 1 WHERE id = $1 ` _, err := p.db.ExecContext(ctx, query, event.AccountID, event.Balance, event.Amount) return err } func (p *AccountProjection) handleMoneyWithdrawn(ctx context.Context, event *domain.MoneyWithdrawnEvent) error { query := ` UPDATE account_read_model SET balance = $2, total_withdrawals = total_withdrawals + $3, transaction_count = transaction_count + 1 WHERE id = $1 ` _, err := p.db.ExecContext(ctx, query, event.AccountID, event.Balance, event.Amount) return err } func (p *AccountProjection) handleAccountClosed(ctx context.Context, event *domain.AccountClosedEvent) error { query := `UPDATE account_read_model SET is_closed = true WHERE id = $1` _, err := p.db.ExecContext(ctx, query, event.AccountID) return err } // RebuildProjections rebuilds all projections from events func (p *AccountProjection) RebuildProjections(ctx context.Context, eventStore eventstore.EventStore) error { // Clear existing projections if _, err := p.db.ExecContext(ctx, "TRUNCATE account_read_model"); err != nil { return err } // Replay all events events, err := eventStore.GetAllEvents(ctx) if err != nil { return err } for _, event := range events { if err := p.Project(ctx, event); err != nil { return err } } return nil } Main Application // cmd/api/main.go package main import ( "context" "database/sql" "encoding/json" "log" "net/http" "github.com/gorilla/mux" _ "github.com/lib/pq" "app/internal/commands" "app/internal/eventstore" "app/internal/projections" ) func main() { db, err := sql.Open("postgres", "postgres://user:pass@localhost/eventsourcing?sslmode=disable") if err != nil { log.Fatal(err) } defer db.Close() // Initialize stores eventStore := eventstore.NewPostgresEventStore(db) projection := projections.NewAccountProjection(db) // Initialize command handlers createAccountHandler := commands.NewCreateAccountHandler(eventStore) depositHandler := commands.NewDepositMoneyHandler(eventStore) withdrawHandler := commands.NewWithdrawMoneyHandler(eventStore) router := mux.NewRouter() // Command endpoints router.HandleFunc("/accounts", func(w http.ResponseWriter, r *http.Request) { var cmd commands.CreateAccountCommand if err := json.NewDecoder(r.Body).Decode(&cmd); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } if err := createAccountHandler.Handle(r.Context(), &cmd); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } w.WriteHeader(http.StatusCreated) }).Methods("POST") router.HandleFunc("/accounts/{id}/deposit", func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) var req struct { Amount float64 `json:"amount"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } cmd := commands.DepositMoneyCommand{ AccountID: vars["id"], Amount: req.Amount, } if err := depositHandler.Handle(r.Context(), &cmd); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } w.WriteHeader(http.StatusOK) }).Methods("POST") // Event replay endpoint router.HandleFunc("/admin/rebuild-projections", func(w http.ResponseWriter, r *http.Request) { if err := projection.RebuildProjections(r.Context(), eventStore); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusOK) w.Write([]byte("Projections rebuilt successfully")) }).Methods("POST") log.Println("Server starting on :8080") log.Fatal(http.ListenAndServe(":8080", router)) } Best Practices Immutable Events: Never modify or delete events Event Versioning: Version events for schema evolution Snapshots: Use snapshots to optimize replay performance Idempotency: Ensure event handlers are idempotent Event Schema: Keep events small and focused Testing: Test event replay and projections Monitoring: Track event store size and replay performance Documentation: Document all event types and their purpose Common Pitfalls Mutable Events: Modifying or deleting events Missing Events: Not capturing all state changes Large Events: Storing too much data in events No Snapshots: Performance degradation from replaying all events Tight Coupling: Events containing implementation details No Versioning: Breaking changes to event schemas Synchronous Projections: Slowing down command processing When to Use Event Sourcing Use When: ...