Go Architecture Patterns Series: ← Previous: CQRS Pattern | Series Overview | Next: Saga Pattern →
What is Event Sourcing?
Event Sourcing is an architectural pattern in which every state change is stored as an immutable event in an append-only sequence, rather than by updating records in place. Current state is derived by replaying an aggregate's events in order, either from the beginning or from the most recent snapshot.
Key Principles:
- Event Store: All state changes stored as events
- Immutable Events: Events are never modified or deleted
- Event Replay: Current state reconstructed by replaying events
- Complete Audit Trail: Every change is recorded with full context
- Temporal Queries: Query state at any point in time
- Event Versioning: Event schemas can evolve while previously stored events remain readable
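The replay and temporal-query principles above can be illustrated with a minimal, self-contained sketch. The `Event` type here is a deliberately reduced stand-in (a signed amount in minor units plus a timestamp), not the event model used later in this article, and `sampleEvents` is a hypothetical helper that only supplies demo data.

```go
package main

import (
	"fmt"
	"time"
)

// Event is a reduced stand-in for a domain event: a signed amount
// (deposit > 0, withdrawal < 0) in minor units and the time it occurred.
type Event struct {
	Amount     int64
	OccurredAt time.Time
}

// Replay folds the events, in order, into the current balance.
func Replay(events []Event) int64 {
	var balance int64
	for _, e := range events {
		balance += e.Amount
	}
	return balance
}

// ReplayUntil answers a temporal query: the balance as of the given instant.
// It assumes the events are sorted by OccurredAt.
func ReplayUntil(events []Event, at time.Time) int64 {
	var balance int64
	for _, e := range events {
		if e.OccurredAt.After(at) {
			break
		}
		balance += e.Amount
	}
	return balance
}

// sampleEvents returns demo data: a deposit, a withdrawal, and another deposit.
func sampleEvents() []Event {
	t0 := time.Date(2024, 1, 1, 9, 0, 0, 0, time.UTC)
	return []Event{
		{Amount: 100, OccurredAt: t0},
		{Amount: -30, OccurredAt: t0.Add(time.Hour)},
		{Amount: 50, OccurredAt: t0.Add(2 * time.Hour)},
	}
}

func main() {
	events := sampleEvents()
	fmt.Println("current balance:", Replay(events))
	fmt.Println("balance as of the second event:", ReplayUntil(events, events[1].OccurredAt))
}
```

Nothing is ever updated in place here: both answers come from the same immutable slice, and the temporal query simply stops the fold early.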
Architecture Overview
graph TB
subgraph "Command Side"
Command[Command]
Aggregate[Aggregate Root]
EventStore[(Event Store)]
end
subgraph "Event Processing"
EventBus[Event Bus]
Projector[Event Projectors]
end
subgraph "Query Side"
ReadModel1[(Current State View)]
ReadModel2[(Analytics View)]
ReadModel3[(Audit Log View)]
end
subgraph "Time Travel"
Snapshot[(Snapshots)]
Replay[Event Replay]
end
Command --> Aggregate
Aggregate -->|Load Events| EventStore
Aggregate -->|Append Events| EventStore
EventStore -->|Publish| EventBus
EventBus --> Projector
Projector --> ReadModel1
Projector --> ReadModel2
Projector --> ReadModel3
EventStore --> Snapshot
Snapshot --> Replay
Replay --> Aggregate
style Aggregate fill:#fff4e1
style EventStore fill:#e1f5ff
style Projector fill:#e8f5e9
style ReadModel1 fill:#f3e5f5
Event Sourcing Flow
sequenceDiagram
participant Client
participant CommandHandler
participant Aggregate
participant EventStore
participant EventBus
participant Projection
Client->>CommandHandler: Execute Command
CommandHandler->>EventStore: Load Events for Aggregate
EventStore-->>CommandHandler: Event Stream
CommandHandler->>Aggregate: Replay Events
Aggregate->>Aggregate: Rebuild State
CommandHandler->>Aggregate: Execute Command
Aggregate->>Aggregate: Validate Business Rules
Aggregate->>Aggregate: Generate Events
CommandHandler->>EventStore: Append New Events
EventStore-->>CommandHandler: Events Persisted
EventStore->>EventBus: Publish Events
EventBus->>Projection: Project Events
Projection->>Projection: Update Read Model
CommandHandler-->>Client: Command Accepted
Real-World Use Cases
- Banking Systems: Complete transaction history and audit trails
- E-commerce: Order lifecycle tracking and returns processing
- Healthcare: Patient medical history and treatment records
- Trading Systems: Trade execution history and reconciliation
- Gaming: Player action history and replay functionality
- Workflow Systems: Process execution history and debugging
Event Sourcing Implementation
Project Structure
event-sourcing-app/
├── cmd/
│ └── api/
│ └── main.go
├── internal/
│ ├── domain/
│ │ ├── account.go
│ │ └── events.go
│ ├── eventstore/
│ │ ├── store.go
│ │ ├── postgres_store.go
│ │ └── snapshot.go
│ ├── commands/
│ │ └── handlers.go
│ ├── projections/
│ │ └── account_projection.go
│ └── queries/
│ └── handlers.go
└── go.mod
Domain Events
// internal/domain/events.go
package domain
import (
"encoding/json"
"fmt"
"time"
)
// Event represents a domain event
type Event interface {
EventType() string
AggregateID() string
EventVersion() int
Timestamp() time.Time
}
// BaseEvent provides common event fields
type BaseEvent struct {
Type string `json:"type"`
AggrID string `json:"aggregate_id"`
Version int `json:"version"`
OccurredAt time.Time `json:"occurred_at"`
Metadata map[string]string `json:"metadata"`
}
func (e BaseEvent) EventType() string { return e.Type }
func (e BaseEvent) AggregateID() string { return e.AggrID }
func (e BaseEvent) EventVersion() int { return e.Version }
func (e BaseEvent) Timestamp() time.Time { return e.OccurredAt }
// Account Events
type AccountCreatedEvent struct {
BaseEvent
AccountID string `json:"account_id"`
Owner string `json:"owner"`
Currency string `json:"currency"`
}
type MoneyDepositedEvent struct {
BaseEvent
AccountID string `json:"account_id"`
Amount float64 `json:"amount"`
Balance float64 `json:"balance"`
}
type MoneyWithdrawnEvent struct {
BaseEvent
AccountID string `json:"account_id"`
Amount float64 `json:"amount"`
Balance float64 `json:"balance"`
}
type AccountClosedEvent struct {
BaseEvent
AccountID string `json:"account_id"`
Reason string `json:"reason"`
}
// EventEnvelope wraps events for storage
type EventEnvelope struct {
EventID string `json:"event_id"`
EventType string `json:"event_type"`
AggregateID string `json:"aggregate_id"`
Version int `json:"version"`
Data json.RawMessage `json:"data"`
// Metadata is kept as raw JSON because database/sql cannot bind or scan a Go map directly.
Metadata json.RawMessage `json:"metadata"`
CreatedAt time.Time `json:"created_at"`
}
// SerializeEvent serializes an event into a storage envelope
func SerializeEvent(event Event) (*EventEnvelope, error) {
data, err := json.Marshal(event)
if err != nil {
return nil, err
}
return &EventEnvelope{
EventID: generateEventID(),
EventType: event.EventType(),
AggregateID: event.AggregateID(),
Version: event.EventVersion(),
Data: data,
// The event's own metadata is already serialized inside Data (via BaseEvent),
// so the envelope column gets an empty JSON object.
Metadata: json.RawMessage(`{}`),
CreatedAt: event.Timestamp(),
}, nil
}
// DeserializeEvent deserializes envelope to event
func DeserializeEvent(envelope *EventEnvelope) (Event, error) {
var event Event
switch envelope.EventType {
case "AccountCreated":
event = &AccountCreatedEvent{}
case "MoneyDeposited":
event = &MoneyDepositedEvent{}
case "MoneyWithdrawn":
event = &MoneyWithdrawnEvent{}
case "AccountClosed":
event = &AccountClosedEvent{}
default:
return nil, fmt.Errorf("unknown event type: %s", envelope.EventType)
}
if err := json.Unmarshal(envelope.Data, event); err != nil {
return nil, err
}
return event, nil
}
func generateEventID() string {
return fmt.Sprintf("evt_%d", time.Now().UnixNano())
}
Aggregate Root
// internal/domain/account.go
package domain
import (
"errors"
"fmt"
"time"
)
var (
ErrInsufficientFunds = errors.New("insufficient funds")
ErrAccountClosed = errors.New("account is closed")
ErrInvalidAmount = errors.New("invalid amount")
)
// Account is an aggregate root that uses event sourcing
type Account struct {
id string
owner string
currency string
balance float64
isClosed bool
version int
uncommittedEvents []Event
}
// NewAccount creates a new account
func NewAccount(id, owner, currency string) (*Account, error) {
if id == "" || owner == "" || currency == "" {
return nil, errors.New("invalid account parameters")
}
account := &Account{
uncommittedEvents: make([]Event, 0),
}
// Raise domain event
event := AccountCreatedEvent{
BaseEvent: BaseEvent{
Type: "AccountCreated",
AggrID: id,
Version: 1,
OccurredAt: time.Now(),
},
AccountID: id,
Owner: owner,
Currency: currency,
}
account.raiseEvent(&event)
return account, nil
}
// LoadFromHistory rebuilds account state from events
func LoadFromHistory(events []Event) (*Account, error) {
if len(events) == 0 {
return nil, errors.New("no events to load")
}
account := &Account{
uncommittedEvents: make([]Event, 0),
}
for _, event := range events {
if err := account.apply(event); err != nil {
return nil, err
}
account.version = event.EventVersion()
}
return account, nil
}
// Deposit adds money to account
func (a *Account) Deposit(amount float64) error {
if a.isClosed {
return ErrAccountClosed
}
if amount <= 0 {
return ErrInvalidAmount
}
newBalance := a.balance + amount
event := MoneyDepositedEvent{
BaseEvent: BaseEvent{
Type: "MoneyDeposited",
AggrID: a.id,
Version: a.version + len(a.uncommittedEvents) + 1, // next version, counting events raised but not yet saved
OccurredAt: time.Now(),
},
AccountID: a.id,
Amount: amount,
Balance: newBalance,
}
a.raiseEvent(&event)
return nil
}
// Withdraw removes money from account
func (a *Account) Withdraw(amount float64) error {
if a.isClosed {
return ErrAccountClosed
}
if amount <= 0 {
return ErrInvalidAmount
}
if a.balance < amount {
return ErrInsufficientFunds
}
newBalance := a.balance - amount
event := MoneyWithdrawnEvent{
BaseEvent: BaseEvent{
Type: "MoneyWithdrawn",
AggrID: a.id,
Version: a.version + len(a.uncommittedEvents) + 1, // next version, counting events raised but not yet saved
OccurredAt: time.Now(),
},
AccountID: a.id,
Amount: amount,
Balance: newBalance,
}
a.raiseEvent(&event)
return nil
}
// Close closes the account
func (a *Account) Close(reason string) error {
if a.isClosed {
return ErrAccountClosed
}
event := AccountClosedEvent{
BaseEvent: BaseEvent{
Type: "AccountClosed",
AggrID: a.id,
Version: a.version + len(a.uncommittedEvents) + 1, // next version, counting events raised but not yet saved
OccurredAt: time.Now(),
},
AccountID: a.id,
Reason: reason,
}
a.raiseEvent(&event)
return nil
}
// GetUncommittedEvents returns uncommitted events
func (a *Account) GetUncommittedEvents() []Event {
return a.uncommittedEvents
}
// MarkEventsAsCommitted clears uncommitted events
func (a *Account) MarkEventsAsCommitted() {
a.uncommittedEvents = make([]Event, 0)
}
// raiseEvent adds event to uncommitted and applies it
func (a *Account) raiseEvent(event Event) {
a.uncommittedEvents = append(a.uncommittedEvents, event)
a.apply(event)
}
// apply applies an event to the aggregate state
func (a *Account) apply(event Event) error {
switch e := event.(type) {
case *AccountCreatedEvent:
a.id = e.AccountID
a.owner = e.Owner
a.currency = e.Currency
a.balance = 0
a.isClosed = false
case *MoneyDepositedEvent:
a.balance = e.Balance
case *MoneyWithdrawnEvent:
a.balance = e.Balance
case *AccountClosedEvent:
a.isClosed = true
default:
return fmt.Errorf("unknown event type: %T", event)
}
return nil
}
// Getters
func (a *Account) ID() string { return a.id }
func (a *Account) Balance() float64 { return a.balance }
func (a *Account) Version() int { return a.version }
func (a *Account) IsClosed() bool { return a.isClosed }
Event Store
// internal/eventstore/store.go
package eventstore
import (
"context"
"app/internal/domain"
)
// EventStore interface
type EventStore interface {
SaveEvents(ctx context.Context, aggregateID string, events []domain.Event, expectedVersion int) error
GetEvents(ctx context.Context, aggregateID string) ([]domain.Event, error)
GetEventsSince(ctx context.Context, aggregateID string, version int) ([]domain.Event, error)
GetAllEvents(ctx context.Context) ([]domain.Event, error)
}
// internal/eventstore/postgres_store.go
package eventstore
import (
"context"
"database/sql"
"fmt"
"app/internal/domain"
)
type PostgresEventStore struct {
db *sql.DB
}
func NewPostgresEventStore(db *sql.DB) *PostgresEventStore {
return &PostgresEventStore{db: db}
}
// SaveEvents appends events to the event store with optimistic concurrency
func (s *PostgresEventStore) SaveEvents(ctx context.Context, aggregateID string, events []domain.Event, expectedVersion int) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// Check current version for optimistic locking
var currentVersion int
err = tx.QueryRowContext(ctx,
"SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1",
aggregateID,
).Scan(&currentVersion)
if err != nil && err != sql.ErrNoRows {
return err
}
if currentVersion != expectedVersion {
return fmt.Errorf("concurrency conflict: expected version %d, got %d", expectedVersion, currentVersion)
}
// Insert events
query := `
INSERT INTO events (event_id, event_type, aggregate_id, version, data, metadata, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)
`
for _, event := range events {
envelope, err := domain.SerializeEvent(event)
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, query,
envelope.EventID,
envelope.EventType,
envelope.AggregateID,
envelope.Version,
envelope.Data,
envelope.Metadata,
envelope.CreatedAt,
)
if err != nil {
return err
}
}
return tx.Commit()
}
// GetEvents retrieves all events for an aggregate
func (s *PostgresEventStore) GetEvents(ctx context.Context, aggregateID string) ([]domain.Event, error) {
query := `
SELECT event_id, event_type, aggregate_id, version, data, metadata, created_at
FROM events
WHERE aggregate_id = $1
ORDER BY version ASC
`
rows, err := s.db.QueryContext(ctx, query, aggregateID)
if err != nil {
return nil, err
}
defer rows.Close()
var events []domain.Event
for rows.Next() {
envelope := &domain.EventEnvelope{}
err := rows.Scan(
&envelope.EventID,
&envelope.EventType,
&envelope.AggregateID,
&envelope.Version,
&envelope.Data,
&envelope.Metadata,
&envelope.CreatedAt,
)
if err != nil {
return nil, err
}
event, err := domain.DeserializeEvent(envelope)
if err != nil {
return nil, err
}
events = append(events, event)
}
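// Surface any error that ended the iteration early
if err := rows.Err(); err != nil {
return nil, err
}
return events, nil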
}
// GetEventsSince retrieves events after a specific version
func (s *PostgresEventStore) GetEventsSince(ctx context.Context, aggregateID string, version int) ([]domain.Event, error) {
query := `
SELECT event_id, event_type, aggregate_id, version, data, metadata, created_at
FROM events
WHERE aggregate_id = $1 AND version > $2
ORDER BY version ASC
`
rows, err := s.db.QueryContext(ctx, query, aggregateID, version)
if err != nil {
return nil, err
}
defer rows.Close()
var events []domain.Event
for rows.Next() {
envelope := &domain.EventEnvelope{}
err := rows.Scan(
&envelope.EventID,
&envelope.EventType,
&envelope.AggregateID,
&envelope.Version,
&envelope.Data,
&envelope.Metadata,
&envelope.CreatedAt,
)
if err != nil {
return nil, err
}
event, err := domain.DeserializeEvent(envelope)
if err != nil {
return nil, err
}
events = append(events, event)
}
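// Surface any error that ended the iteration early
if err := rows.Err(); err != nil {
return nil, err
}
return events, nil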
}
// GetAllEvents retrieves all events from the store
func (s *PostgresEventStore) GetAllEvents(ctx context.Context) ([]domain.Event, error) {
query := `
SELECT event_id, event_type, aggregate_id, version, data, metadata, created_at
FROM events
ORDER BY created_at ASC, version ASC
`
rows, err := s.db.QueryContext(ctx, query)
if err != nil {
return nil, err
}
defer rows.Close()
var events []domain.Event
for rows.Next() {
envelope := &domain.EventEnvelope{}
err := rows.Scan(
&envelope.EventID,
&envelope.EventType,
&envelope.AggregateID,
&envelope.Version,
&envelope.Data,
&envelope.Metadata,
&envelope.CreatedAt,
)
if err != nil {
return nil, err
}
event, err := domain.DeserializeEvent(envelope)
if err != nil {
return nil, err
}
events = append(events, event)
}
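// Surface any error that ended the iteration early
if err := rows.Err(); err != nil {
return nil, err
}
return events, nil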
}
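The optimistic concurrency check in SaveEvents can be easier to reason about in an in-memory form. The sketch below is an illustration under assumptions, not the article's store: `MemoryStore`, `StoredEvent`, and `ErrConcurrencyConflict` are hypothetical names, and a mutex stands in for the database transaction. In Postgres, the same guarantee would typically also rely on a unique constraint on (aggregate_id, version), since two concurrent transactions can otherwise both read the same maximum version; the table schema is not shown in this article, so that constraint is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrConcurrencyConflict is a hypothetical sentinel error for this sketch.
var ErrConcurrencyConflict = errors.New("concurrency conflict")

// StoredEvent is a simplified event record; Version is per aggregate.
type StoredEvent struct {
	AggregateID string
	Version     int
	Type        string
}

// MemoryStore keeps each aggregate's events in an append-only slice.
type MemoryStore struct {
	mu      sync.Mutex
	streams map[string][]StoredEvent
}

func NewMemoryStore() *MemoryStore {
	return &MemoryStore{streams: make(map[string][]StoredEvent)}
}

// Append adds events only if the stream is still at expectedVersion.
func (s *MemoryStore) Append(aggregateID string, expectedVersion int, types ...string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	current := len(s.streams[aggregateID])
	if current != expectedVersion {
		return fmt.Errorf("%w: expected version %d, got %d",
			ErrConcurrencyConflict, expectedVersion, current)
	}
	for i, t := range types {
		s.streams[aggregateID] = append(s.streams[aggregateID], StoredEvent{
			AggregateID: aggregateID,
			Version:     expectedVersion + i + 1,
			Type:        t,
		})
	}
	return nil
}

// Load returns a copy of the aggregate's events in version order.
func (s *MemoryStore) Load(aggregateID string) []StoredEvent {
	s.mu.Lock()
	defer s.mu.Unlock()
	return append([]StoredEvent(nil), s.streams[aggregateID]...)
}

func main() {
	store := NewMemoryStore()
	fmt.Println(store.Append("acc-1", 0, "AccountCreated"))
	fmt.Println(store.Append("acc-1", 1, "MoneyDeposited"))
	// A writer that loaded the stream at version 1 is now stale.
	err := store.Append("acc-1", 1, "MoneyWithdrawn")
	fmt.Println("conflict detected:", errors.Is(err, ErrConcurrencyConflict))
	fmt.Println("events stored:", len(store.Load("acc-1")))
}
```

Wrapping a sentinel error with `%w` lets callers distinguish a version conflict from other failures using `errors.Is`, which a plain formatted error string does not allow.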
Snapshots for Performance
// internal/eventstore/snapshot.go
package eventstore
import (
"context"
"database/sql"
"encoding/json"
"time"
)
// Snapshot represents a point-in-time state
type Snapshot struct {
AggregateID string
Version int
State json.RawMessage
CreatedAt time.Time
}
type SnapshotStore struct {
db *sql.DB
}
func NewSnapshotStore(db *sql.DB) *SnapshotStore {
return &SnapshotStore{db: db}
}
// SaveSnapshot saves a snapshot
func (s *SnapshotStore) SaveSnapshot(ctx context.Context, snapshot *Snapshot) error {
query := `
INSERT INTO snapshots (aggregate_id, version, state, created_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (aggregate_id) DO UPDATE
SET version = $2, state = $3, created_at = $4
`
_, err := s.db.ExecContext(ctx, query,
snapshot.AggregateID,
snapshot.Version,
snapshot.State,
snapshot.CreatedAt,
)
return err
}
// GetSnapshot retrieves the latest snapshot
func (s *SnapshotStore) GetSnapshot(ctx context.Context, aggregateID string) (*Snapshot, error) {
query := `
SELECT aggregate_id, version, state, created_at
FROM snapshots
WHERE aggregate_id = $1
`
snapshot := &Snapshot{}
err := s.db.QueryRowContext(ctx, query, aggregateID).Scan(
&snapshot.AggregateID,
&snapshot.Version,
&snapshot.State,
&snapshot.CreatedAt,
)
if err == sql.ErrNoRows {
return nil, nil
}
return snapshot, err
}
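The snapshot store above persists state, but the article does not show how a snapshot is combined with later events. The following is a minimal sketch of that step under stated assumptions: `accountState`, `balanceEvent`, `restore`, and `takeSnapshot` are hypothetical names, and the events are simplified to carry only a version and the resulting balance.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// accountState is a hypothetical serializable form of the aggregate's state.
type accountState struct {
	Balance  float64 `json:"balance"`
	IsClosed bool    `json:"is_closed"`
}

// balanceEvent is a simplified event: the version it produced and the resulting balance.
type balanceEvent struct {
	Version int
	Balance float64
}

// snapshot holds serialized state as of a given aggregate version.
type snapshot struct {
	Version int
	State   json.RawMessage
}

// takeSnapshot serializes the state at the given version.
func takeSnapshot(state accountState, version int) (*snapshot, error) {
	raw, err := json.Marshal(state)
	if err != nil {
		return nil, err
	}
	return &snapshot{Version: version, State: raw}, nil
}

// restore rebuilds state from an optional snapshot plus the events recorded
// after it. It returns the state and the version reached.
func restore(snap *snapshot, eventsSince []balanceEvent) (accountState, int, error) {
	var state accountState
	version := 0
	if snap != nil {
		if err := json.Unmarshal(snap.State, &state); err != nil {
			return accountState{}, 0, fmt.Errorf("decode snapshot: %w", err)
		}
		version = snap.Version
	}
	for _, e := range eventsSince {
		if e.Version <= version {
			continue // already covered by the snapshot
		}
		state.Balance = e.Balance
		version = e.Version
	}
	return state, version, nil
}

func main() {
	snap, err := takeSnapshot(accountState{Balance: 120}, 100)
	if err != nil {
		panic(err)
	}
	tail := []balanceEvent{{Version: 101, Balance: 150}, {Version: 102, Balance: 90}}
	state, version, err := restore(snap, tail)
	if err != nil {
		panic(err)
	}
	fmt.Println(state.Balance, version)
}
```

With the stores in this article, the tail would come from GetEventsSince called with the snapshot's version, so only two events are replayed here instead of 102.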
Command Handlers
// internal/commands/handlers.go
package commands
import (
"context"
"fmt"
"app/internal/domain"
"app/internal/eventstore"
)
// CreateAccountCommand
type CreateAccountCommand struct {
AccountID string
Owner string
Currency string
}
type CreateAccountHandler struct {
eventStore eventstore.EventStore
}
func NewCreateAccountHandler(store eventstore.EventStore) *CreateAccountHandler {
return &CreateAccountHandler{eventStore: store}
}
func (h *CreateAccountHandler) Handle(ctx context.Context, cmd *CreateAccountCommand) error {
// Create new aggregate
account, err := domain.NewAccount(cmd.AccountID, cmd.Owner, cmd.Currency)
if err != nil {
return err
}
// Save events
events := account.GetUncommittedEvents()
if err := h.eventStore.SaveEvents(ctx, cmd.AccountID, events, 0); err != nil {
return fmt.Errorf("failed to save events: %w", err)
}
account.MarkEventsAsCommitted()
return nil
}
// DepositMoneyCommand
type DepositMoneyCommand struct {
AccountID string
Amount float64
}
type DepositMoneyHandler struct {
eventStore eventstore.EventStore
}
func NewDepositMoneyHandler(store eventstore.EventStore) *DepositMoneyHandler {
return &DepositMoneyHandler{eventStore: store}
}
func (h *DepositMoneyHandler) Handle(ctx context.Context, cmd *DepositMoneyCommand) error {
// Load aggregate from events
events, err := h.eventStore.GetEvents(ctx, cmd.AccountID)
if err != nil {
return err
}
account, err := domain.LoadFromHistory(events)
if err != nil {
return err
}
// Execute command
if err := account.Deposit(cmd.Amount); err != nil {
return err
}
// Save new events
newEvents := account.GetUncommittedEvents()
if err := h.eventStore.SaveEvents(ctx, cmd.AccountID, newEvents, account.Version()); err != nil {
return fmt.Errorf("failed to save events: %w", err)
}
account.MarkEventsAsCommitted()
return nil
}
// WithdrawMoneyCommand
type WithdrawMoneyCommand struct {
AccountID string
Amount float64
}
type WithdrawMoneyHandler struct {
eventStore eventstore.EventStore
}
func NewWithdrawMoneyHandler(store eventstore.EventStore) *WithdrawMoneyHandler {
return &WithdrawMoneyHandler{eventStore: store}
}
func (h *WithdrawMoneyHandler) Handle(ctx context.Context, cmd *WithdrawMoneyCommand) error {
events, err := h.eventStore.GetEvents(ctx, cmd.AccountID)
if err != nil {
return err
}
account, err := domain.LoadFromHistory(events)
if err != nil {
return err
}
if err := account.Withdraw(cmd.Amount); err != nil {
return err
}
newEvents := account.GetUncommittedEvents()
if err := h.eventStore.SaveEvents(ctx, cmd.AccountID, newEvents, account.Version()); err != nil {
return fmt.Errorf("failed to save events: %w", err)
}
account.MarkEventsAsCommitted()
return nil
}
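When SaveEvents reports a concurrency conflict, a handler can usually reload the aggregate and try the command again. The sketch below shows one possible retry loop. It assumes the store wraps a sentinel error (here the hypothetical `errConflict`), which the Postgres store in this article does not currently do, and `withRetry` is an illustrative helper rather than part of the article's code.

```go
package main

import (
	"errors"
	"fmt"
)

// errConflict is a hypothetical sentinel the event store would wrap on a version mismatch.
var errConflict = errors.New("concurrency conflict")

// withRetry runs attempt up to maxAttempts times, retrying only on errConflict.
// Any other error, or success, is returned immediately.
func withRetry(maxAttempts int, attempt func() error) error {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var err error
	for i := 0; i < maxAttempts; i++ {
		err = attempt()
		if err == nil || !errors.Is(err, errConflict) {
			return err
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}

func main() {
	calls := 0
	err := withRetry(3, func() error {
		calls++
		if calls < 3 {
			return errConflict // simulate another writer winning the race twice
		}
		return nil
	})
	fmt.Println(calls, err)
}
```

In a real handler, the `attempt` closure would contain the whole load, execute, save sequence, so that business rules are re-validated against the latest events on every try.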
Projections
// internal/projections/account_projection.go
package projections
import (
"context"
"database/sql"
"app/internal/domain"
"app/internal/eventstore"
)
// AccountReadModel represents the current state
type AccountReadModel struct {
ID string
Owner string
Currency string
Balance float64
IsClosed bool
TotalDeposits float64
TotalWithdrawals float64
TransactionCount int
}
type AccountProjection struct {
db *sql.DB
}
func NewAccountProjection(db *sql.DB) *AccountProjection {
return &AccountProjection{db: db}
}
// Project processes events and updates read model
func (p *AccountProjection) Project(ctx context.Context, event domain.Event) error {
switch e := event.(type) {
case *domain.AccountCreatedEvent:
return p.handleAccountCreated(ctx, e)
case *domain.MoneyDepositedEvent:
return p.handleMoneyDeposited(ctx, e)
case *domain.MoneyWithdrawnEvent:
return p.handleMoneyWithdrawn(ctx, e)
case *domain.AccountClosedEvent:
return p.handleAccountClosed(ctx, e)
}
return nil
}
func (p *AccountProjection) handleAccountCreated(ctx context.Context, event *domain.AccountCreatedEvent) error {
query := `
INSERT INTO account_read_model (id, owner, currency, balance, is_closed, total_deposits, total_withdrawals, transaction_count)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`
_, err := p.db.ExecContext(ctx, query, event.AccountID, event.Owner, event.Currency, 0, false, 0, 0, 0)
return err
}
func (p *AccountProjection) handleMoneyDeposited(ctx context.Context, event *domain.MoneyDepositedEvent) error {
query := `
UPDATE account_read_model
SET balance = $2,
total_deposits = total_deposits + $3,
transaction_count = transaction_count + 1
WHERE id = $1
`
_, err := p.db.ExecContext(ctx, query, event.AccountID, event.Balance, event.Amount)
return err
}
func (p *AccountProjection) handleMoneyWithdrawn(ctx context.Context, event *domain.MoneyWithdrawnEvent) error {
query := `
UPDATE account_read_model
SET balance = $2,
total_withdrawals = total_withdrawals + $3,
transaction_count = transaction_count + 1
WHERE id = $1
`
_, err := p.db.ExecContext(ctx, query, event.AccountID, event.Balance, event.Amount)
return err
}
func (p *AccountProjection) handleAccountClosed(ctx context.Context, event *domain.AccountClosedEvent) error {
query := `UPDATE account_read_model SET is_closed = true WHERE id = $1`
_, err := p.db.ExecContext(ctx, query, event.AccountID)
return err
}
// RebuildProjections rebuilds all projections from events
func (p *AccountProjection) RebuildProjections(ctx context.Context, eventStore eventstore.EventStore) error {
// Clear existing projections
if _, err := p.db.ExecContext(ctx, "TRUNCATE account_read_model"); err != nil {
return err
}
// Replay all events
events, err := eventStore.GetAllEvents(ctx)
if err != nil {
return err
}
for _, event := range events {
if err := p.Project(ctx, event); err != nil {
return err
}
}
return nil
}
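The architecture diagram routes persisted events through an event bus to the projectors, a step the code in this article does not wire up. A minimal in-process sketch of that fan-out might look like the following. `Bus`, `Projector`, and `countingProjector` are hypothetical names, and delivery is synchronous purely to keep the example short; a production setup would more likely deliver asynchronously, for example through a queue.

```go
package main

import "fmt"

// Event is a simplified stand-in for a persisted domain event.
type Event struct {
	Type        string
	AggregateID string
}

// Projector is anything that can update a read model from an event.
type Projector interface {
	Project(e Event) error
}

// Bus fans each event out to every registered projector, in registration order.
type Bus struct {
	projectors []Projector
}

// Subscribe registers a projector with the bus.
func (b *Bus) Subscribe(p Projector) {
	b.projectors = append(b.projectors, p)
}

// Publish delivers events synchronously and stops at the first projector error.
func (b *Bus) Publish(events ...Event) error {
	for _, e := range events {
		for _, p := range b.projectors {
			if err := p.Project(e); err != nil {
				return fmt.Errorf("project %s for %s: %w", e.Type, e.AggregateID, err)
			}
		}
	}
	return nil
}

// countingProjector is a toy read model: number of events seen per aggregate.
type countingProjector struct {
	counts map[string]int
}

func (c *countingProjector) Project(e Event) error {
	c.counts[e.AggregateID]++
	return nil
}

func main() {
	projector := &countingProjector{counts: make(map[string]int)}
	bus := &Bus{}
	bus.Subscribe(projector)

	err := bus.Publish(
		Event{Type: "AccountCreated", AggregateID: "acc-1"},
		Event{Type: "MoneyDeposited", AggregateID: "acc-1"},
	)
	fmt.Println(err, projector.counts["acc-1"])
}
```

A command handler would call `Publish` only after the events have been saved successfully, so read models never see events that were rejected by the store.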
Main Application
// cmd/api/main.go
package main
import (
"database/sql"
"encoding/json"
"log"
"net/http"
"github.com/gorilla/mux"
_ "github.com/lib/pq"
"app/internal/commands"
"app/internal/eventstore"
"app/internal/projections"
)
func main() {
db, err := sql.Open("postgres", "postgres://user:pass@localhost/eventsourcing?sslmode=disable")
if err != nil {
log.Fatal(err)
}
defer db.Close()
// Initialize stores
eventStore := eventstore.NewPostgresEventStore(db)
projection := projections.NewAccountProjection(db)
// Initialize command handlers
createAccountHandler := commands.NewCreateAccountHandler(eventStore)
depositHandler := commands.NewDepositMoneyHandler(eventStore)
withdrawHandler := commands.NewWithdrawMoneyHandler(eventStore)
router := mux.NewRouter()
// Command endpoints
router.HandleFunc("/accounts", func(w http.ResponseWriter, r *http.Request) {
var cmd commands.CreateAccountCommand
if err := json.NewDecoder(r.Body).Decode(&cmd); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := createAccountHandler.Handle(r.Context(), &cmd); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
w.WriteHeader(http.StatusCreated)
}).Methods("POST")
router.HandleFunc("/accounts/{id}/deposit", func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
var req struct {
Amount float64 `json:"amount"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
cmd := commands.DepositMoneyCommand{
AccountID: vars["id"],
Amount: req.Amount,
}
if err := depositHandler.Handle(r.Context(), &cmd); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
w.WriteHeader(http.StatusOK)
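}).Methods("POST")
router.HandleFunc("/accounts/{id}/withdraw", func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
var req struct {
Amount float64 `json:"amount"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
cmd := commands.WithdrawMoneyCommand{
AccountID: vars["id"],
Amount: req.Amount,
}
if err := withdrawHandler.Handle(r.Context(), &cmd); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
w.WriteHeader(http.StatusOK)
}).Methods("POST")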
// Event replay endpoint
router.HandleFunc("/admin/rebuild-projections", func(w http.ResponseWriter, r *http.Request) {
if err := projection.RebuildProjections(r.Context(), eventStore); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("Projections rebuilt successfully"))
}).Methods("POST")
log.Println("Server starting on :8080")
log.Fatal(http.ListenAndServe(":8080", router))
}
Best Practices
- Immutable Events: Never modify or delete events
- Event Versioning: Version events for schema evolution
- Snapshots: Use snapshots to optimize replay performance
- Idempotency: Ensure event handlers are idempotent
- Event Schema: Keep events small and focused
- Testing: Test event replay and projections
- Monitoring: Track event store size and replay performance
- Documentation: Document all event types and their purpose
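The idempotency practice matters because an additive update such as `total_deposits = total_deposits + $3` double-counts if the same event is delivered twice. One common way to guard against that is to remember the last event version applied per aggregate. The sketch below is a simplified in-memory illustration: `readModel` and `depositEvent` are hypothetical types, and the approach assumes events for a given aggregate arrive in version order.

```go
package main

import "fmt"

// depositEvent is a simplified event carrying its per-aggregate version.
type depositEvent struct {
	AccountID string
	Version   int
	Amount    int64 // minor units, e.g. cents
}

// readModel tracks deposit totals plus the last event version applied per account.
type readModel struct {
	totalDeposits map[string]int64
	lastVersion   map[string]int
}

func newReadModel() *readModel {
	return &readModel{
		totalDeposits: make(map[string]int64),
		lastVersion:   make(map[string]int),
	}
}

// Apply is idempotent: an event at or below the last applied version is skipped,
// so redelivery does not double-count. It reports whether the event was applied.
func (m *readModel) Apply(e depositEvent) bool {
	if e.Version <= m.lastVersion[e.AccountID] {
		return false
	}
	m.totalDeposits[e.AccountID] += e.Amount
	m.lastVersion[e.AccountID] = e.Version
	return true
}

func main() {
	m := newReadModel()
	e := depositEvent{AccountID: "acc-1", Version: 2, Amount: 500}
	fmt.Println(m.Apply(e)) // first delivery is applied
	fmt.Println(m.Apply(e)) // redelivery is skipped
	fmt.Println(m.totalDeposits["acc-1"])
}
```

In a SQL-backed projection, the equivalent would be a last-applied version column that is checked and updated in the same statement or transaction as the read model row.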
Common Pitfalls
- Mutable Events: Modifying or deleting events
- Missing Events: Not capturing all state changes
- Large Events: Storing too much data in events
- No Snapshots: Performance degradation from replaying all events
- Tight Coupling: Events containing implementation details
- No Versioning: Breaking changes to event schemas
- Synchronous Projections: Slowing down command processing
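The "No Versioning" pitfall is often addressed by upcasting: old payloads are converted to the newest schema as they are read, so the stored events themselves stay untouched. The sketch below assumes a hypothetical schema history in which `AccountCreated` originally had no currency field. The `schemaVersion` argument is a separate, hypothetical marker and is not the aggregate sequence `Version` used elsewhere in this article; the "USD" default is likewise an assumption made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// accountCreatedV1 is a hypothetical original schema without a currency.
type accountCreatedV1 struct {
	AccountID string `json:"account_id"`
	Owner     string `json:"owner"`
}

// accountCreatedV2 is a hypothetical later schema that adds Currency.
type accountCreatedV2 struct {
	AccountID string `json:"account_id"`
	Owner     string `json:"owner"`
	Currency  string `json:"currency"`
}

// upcastAccountCreated decodes a stored payload of either schema version
// and returns it in the V2 shape.
func upcastAccountCreated(schemaVersion int, data []byte) (accountCreatedV2, error) {
	switch schemaVersion {
	case 1:
		var v1 accountCreatedV1
		if err := json.Unmarshal(data, &v1); err != nil {
			return accountCreatedV2{}, err
		}
		// Fill the field that did not exist in V1 with an assumed default.
		return accountCreatedV2{AccountID: v1.AccountID, Owner: v1.Owner, Currency: "USD"}, nil
	case 2:
		var v2 accountCreatedV2
		if err := json.Unmarshal(data, &v2); err != nil {
			return accountCreatedV2{}, err
		}
		return v2, nil
	default:
		return accountCreatedV2{}, fmt.Errorf("unsupported schema version %d", schemaVersion)
	}
}

func main() {
	old := []byte(`{"account_id":"acc-1","owner":"Ada"}`)
	event, err := upcastAccountCreated(1, old)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", event)
}
```

Because the conversion happens on read, aggregates and projections only ever deal with the latest shape, and the original events remain immutable.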
When to Use Event Sourcing
Use When:
- Need complete audit trail of all changes
- Temporal queries required (state at any point in time)
- Complex business logic with state transitions
- Need to replay events for debugging or analytics
- Building event-driven systems
- Regulatory requirements for audit trails
Avoid When:
- Simple CRUD applications
- No need for audit trails
- Team lacks experience with event sourcing
- Performance overhead not acceptable
- Eventual consistency is problematic
- Storage costs are prohibitive
Advantages
- Complete Audit Trail: Every change is recorded
- Time Travel: Query state at any point in time
- Event Replay: Rebuild state or create new projections
- Debugging: Trace exact sequence of events
- Event-Driven: Natural fit for event-driven architecture
- Immutability: Events are immutable and append-only
- Business Insights: Rich data for analytics
Disadvantages
- Complexity: More complex than traditional CRUD
- Storage: Events accumulate over time
- Performance: Replaying events can be slow
- Learning Curve: Requires mindset shift
- Eventual Consistency: Read models may lag
- Event Evolution: Managing event schema changes
- Query Complexity: Need separate read models
Event Sourcing provides powerful capabilities for audit trails and temporal queries but requires careful consideration of trade-offs and operational complexity.