Go Architecture Patterns Series: ← Previous: Microservices Architecture | Series Overview | Next: CQRS Pattern →


What is Event-Driven Architecture?

Event-Driven Architecture (EDA) is a design pattern where components communicate by producing and consuming events. Events represent state changes or significant occurrences in the system, allowing components to react asynchronously without direct coupling.

Key Principles:

  • Event Producers: Components that generate events when state changes
  • Event Consumers: Components that react to events
  • Event Broker: Middleware that routes events from producers to consumers
  • Asynchronous Communication: Decoupled, non-blocking interactions
  • Event Immutability: Events represent past facts that cannot be changed
  • Eventual Consistency: Systems converge to consistent state over time

Architecture Overview

graph TB subgraph "Event Producers" UserService[User Service] OrderService[Order Service] PaymentService[Payment Service] end subgraph "Event Broker" EventBus[Event Bus / Message Broker] UserTopic[User Events Topic] OrderTopic[Order Events Topic] PaymentTopic[Payment Events Topic] end subgraph "Event Consumers" EmailService[Email Service] AnalyticsService[Analytics Service] NotificationService[Notification Service] InventoryService[Inventory Service] end subgraph "Event Store" EventLog[(Event Log DB)] end UserService -->|UserCreated| UserTopic OrderService -->|OrderPlaced| OrderTopic OrderService -->|OrderConfirmed| OrderTopic PaymentService -->|PaymentProcessed| PaymentTopic UserTopic --> EventBus OrderTopic --> EventBus PaymentTopic --> EventBus EventBus --> EmailService EventBus --> AnalyticsService EventBus --> NotificationService EventBus --> InventoryService EventBus --> EventLog style UserService fill:#e1f5ff style OrderService fill:#fff4e1 style PaymentService fill:#f3e5f5 style EventBus fill:#e8f5e9

Event Flow Pattern

sequenceDiagram participant User participant OrderService participant EventBroker participant EmailService participant InventoryService participant AnalyticsService User->>OrderService: Place Order OrderService->>OrderService: Validate Order OrderService->>EventBroker: Publish OrderPlaced Event Note over EventBroker: Event is distributed to all subscribers EventBroker->>EmailService: OrderPlaced Event EventBroker->>InventoryService: OrderPlaced Event EventBroker->>AnalyticsService: OrderPlaced Event EmailService->>EmailService: Send Confirmation Email InventoryService->>InventoryService: Update Stock AnalyticsService->>AnalyticsService: Record Metrics InventoryService->>EventBroker: Publish StockUpdated Event OrderService-->>User: Order Accepted (202)

Real-World Use Cases

  • E-commerce: Order processing, inventory updates, shipping notifications
  • Financial Systems: Transaction processing, fraud detection, account updates
  • IoT Platforms: Sensor data processing, device state changes
  • Social Media: Post updates, notifications, activity feeds
  • Streaming Services: Video processing, recommendation updates
  • Gaming: Player actions, leaderboard updates, achievements

Event-Driven Architecture Implementation

Project Structure

event-driven-app/
├── cmd/
│   ├── order-service/
│   │   └── main.go
│   ├── email-service/
│   │   └── main.go
│   └── inventory-service/
│       └── main.go
├── internal/
│   ├── events/
│   │   ├── event.go
│   │   ├── bus.go
│   │   └── types.go
│   ├── broker/
│   │   ├── rabbitmq.go
│   │   └── kafka.go
│   └── services/
│       ├── order/
│       ├── email/
│       └── inventory/
├── pkg/
│   └── messaging/
│       └── publisher.go
└── go.mod

Core Event System

// internal/events/event.go
package events

import (
    "encoding/json"
    "time"
)

// Event represents a domain event
type Event struct {
    ID          string                 `json:"id"`
    Type        string                 `json:"type"`
    AggregateID string                 `json:"aggregate_id"`
    Payload     map[string]interface{} `json:"payload"`
    Metadata    map[string]string      `json:"metadata"`
    Timestamp   time.Time              `json:"timestamp"`
    Version     int                    `json:"version"`
}

// NewEvent creates a new event
func NewEvent(eventType, aggregateID string, payload map[string]interface{}) *Event {
    return &Event{
        ID:          generateEventID(),
        Type:        eventType,
        AggregateID: aggregateID,
        Payload:     payload,
        Metadata:    make(map[string]string),
        Timestamp:   time.Now(),
        Version:     1,
    }
}

// ToJSON serializes event to JSON
func (e *Event) ToJSON() ([]byte, error) {
    return json.Marshal(e)
}

// FromJSON deserializes event from JSON
func FromJSON(data []byte) (*Event, error) {
    var event Event
    if err := json.Unmarshal(data, &event); err != nil {
        return nil, err
    }
    return &event, nil
}

func generateEventID() string {
    return fmt.Sprintf("evt_%d", time.Now().UnixNano())
}

// internal/events/types.go
package events

const (
    // User Events
    UserCreated = "user.created"
    UserUpdated = "user.updated"
    UserDeleted = "user.deleted"

    // Order Events
    OrderPlaced     = "order.placed"
    OrderConfirmed  = "order.confirmed"
    OrderCancelled  = "order.cancelled"
    OrderShipped    = "order.shipped"
    OrderDelivered  = "order.delivered"

    // Payment Events
    PaymentProcessed = "payment.processed"
    PaymentFailed    = "payment.failed"
    PaymentRefunded  = "payment.refunded"

    // Inventory Events
    StockUpdated   = "inventory.stock_updated"
    StockDepleted  = "inventory.stock_depleted"
    StockRestocked = "inventory.stock_restocked"
)

// internal/events/bus.go
package events

import (
    "context"
    "fmt"
    "sync"
)

// Handler is a function that processes events
type Handler func(ctx context.Context, event *Event) error

// EventBus manages event publishing and subscriptions
type EventBus struct {
    mu         sync.RWMutex
    handlers   map[string][]Handler
    middleware []Middleware
}

// Middleware wraps event handlers
type Middleware func(Handler) Handler

// NewEventBus creates a new event bus
func NewEventBus() *EventBus {
    return &EventBus{
        handlers:   make(map[string][]Handler),
        middleware: make([]Middleware, 0),
    }
}

// Subscribe registers a handler for an event type
func (b *EventBus) Subscribe(eventType string, handler Handler) {
    b.mu.Lock()
    defer b.mu.Unlock()

    // Apply middleware
    for i := len(b.middleware) - 1; i >= 0; i-- {
        handler = b.middleware[i](handler)
    }

    b.handlers[eventType] = append(b.handlers[eventType], handler)
}

// Publish publishes an event to all subscribers
func (b *EventBus) Publish(ctx context.Context, event *Event) error {
    b.mu.RLock()
    handlers, exists := b.handlers[event.Type]
    b.mu.RUnlock()

    if !exists {
        return nil // No handlers registered
    }

    var wg sync.WaitGroup
    errors := make(chan error, len(handlers))

    for _, handler := range handlers {
        wg.Add(1)
        go func(h Handler) {
            defer wg.Done()
            if err := h(ctx, event); err != nil {
                errors <- fmt.Errorf("handler error: %w", err)
            }
        }(handler)
    }

    wg.Wait()
    close(errors)

    // Collect errors
    var handlerErrors []error
    for err := range errors {
        handlerErrors = append(handlerErrors, err)
    }

    if len(handlerErrors) > 0 {
        return fmt.Errorf("some handlers failed: %v", handlerErrors)
    }

    return nil
}

// Use adds middleware to the event bus
func (b *EventBus) Use(middleware Middleware) {
    b.mu.Lock()
    defer b.mu.Unlock()
    b.middleware = append(b.middleware, middleware)
}

Message Broker Integration (RabbitMQ)

// internal/broker/rabbitmq.go
package broker

import (
    "context"
    "fmt"
    "log"

    "github.com/rabbitmq/amqp091-go"

    "app/internal/events"
)

// RabbitMQBroker wraps RabbitMQ for event publishing/consuming
type RabbitMQBroker struct {
    conn    *amqp091.Connection
    channel *amqp091.Channel
}

// NewRabbitMQBroker creates a new RabbitMQ broker
func NewRabbitMQBroker(url string) (*RabbitMQBroker, error) {
    conn, err := amqp091.Dial(url)
    if err != nil {
        return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err)
    }

    channel, err := conn.Channel()
    if err != nil {
        return nil, fmt.Errorf("failed to open channel: %w", err)
    }

    return &RabbitMQBroker{
        conn:    conn,
        channel: channel,
    }, nil
}

// DeclareExchange declares an exchange
func (b *RabbitMQBroker) DeclareExchange(name, kind string) error {
    return b.channel.ExchangeDeclare(
        name,   // name
        kind,   // type
        true,   // durable
        false,  // auto-deleted
        false,  // internal
        false,  // no-wait
        nil,    // arguments
    )
}

// Publish publishes an event to RabbitMQ
func (b *RabbitMQBroker) Publish(ctx context.Context, exchange, routingKey string, event *events.Event) error {
    body, err := event.ToJSON()
    if err != nil {
        return fmt.Errorf("failed to serialize event: %w", err)
    }

    err = b.channel.PublishWithContext(
        ctx,
        exchange,   // exchange
        routingKey, // routing key
        false,      // mandatory
        false,      // immediate
        amqp091.Publishing{
            ContentType:  "application/json",
            Body:         body,
            DeliveryMode: amqp091.Persistent,
            MessageId:    event.ID,
            Timestamp:    event.Timestamp,
        },
    )

    if err != nil {
        return fmt.Errorf("failed to publish event: %w", err)
    }

    return nil
}

// Subscribe subscribes to events from a queue
func (b *RabbitMQBroker) Subscribe(queueName, exchange, routingKey string, handler events.Handler) error {
    // Declare queue
    queue, err := b.channel.QueueDeclare(
        queueName, // name
        true,      // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // no-wait
        nil,       // arguments
    )
    if err != nil {
        return fmt.Errorf("failed to declare queue: %w", err)
    }

    // Bind queue to exchange
    err = b.channel.QueueBind(
        queue.Name, // queue name
        routingKey, // routing key
        exchange,   // exchange
        false,
        nil,
    )
    if err != nil {
        return fmt.Errorf("failed to bind queue: %w", err)
    }

    // Consume messages
    msgs, err := b.channel.Consume(
        queue.Name, // queue
        "",         // consumer
        false,      // auto-ack
        false,      // exclusive
        false,      // no-local
        false,      // no-wait
        nil,        // args
    )
    if err != nil {
        return fmt.Errorf("failed to register consumer: %w", err)
    }

    // Process messages
    go func() {
        for msg := range msgs {
            event, err := events.FromJSON(msg.Body)
            if err != nil {
                log.Printf("Failed to deserialize event: %v", err)
                msg.Nack(false, false)
                continue
            }

            if err := handler(context.Background(), event); err != nil {
                log.Printf("Handler error: %v", err)
                msg.Nack(false, true) // Requeue on error
                continue
            }

            msg.Ack(false)
        }
    }()

    return nil
}

// Close closes the broker connection
func (b *RabbitMQBroker) Close() error {
    if err := b.channel.Close(); err != nil {
        return err
    }
    return b.conn.Close()
}

Order Service (Event Producer)

// internal/services/order/service.go
package order

import (
    "context"
    "fmt"
    "time"

    "app/internal/events"
)

type Order struct {
    ID        string    `json:"id"`
    UserID    string    `json:"user_id"`
    Total     float64   `json:"total"`
    Status    string    `json:"status"`
    CreatedAt time.Time `json:"created_at"`
}

type Service struct {
    eventBus *events.EventBus
}

func NewService(eventBus *events.EventBus) *Service {
    return &Service{
        eventBus: eventBus,
    }
}

func (s *Service) PlaceOrder(ctx context.Context, userID string, total float64) (*Order, error) {
    order := &Order{
        ID:        generateOrderID(),
        UserID:    userID,
        Total:     total,
        Status:    "pending",
        CreatedAt: time.Now(),
    }

    // Publish OrderPlaced event
    event := events.NewEvent(events.OrderPlaced, order.ID, map[string]interface{}{
        "order_id": order.ID,
        "user_id":  order.UserID,
        "total":    order.Total,
        "status":   order.Status,
    })

    if err := s.eventBus.Publish(ctx, event); err != nil {
        return nil, fmt.Errorf("failed to publish event: %w", err)
    }

    return order, nil
}

func (s *Service) ConfirmOrder(ctx context.Context, orderID string) error {
    // Publish OrderConfirmed event
    event := events.NewEvent(events.OrderConfirmed, orderID, map[string]interface{}{
        "order_id": orderID,
        "status":   "confirmed",
    })

    if err := s.eventBus.Publish(ctx, event); err != nil {
        return fmt.Errorf("failed to publish event: %w", err)
    }

    return nil
}

func (s *Service) CancelOrder(ctx context.Context, orderID string) error {
    // Publish OrderCancelled event
    event := events.NewEvent(events.OrderCancelled, orderID, map[string]interface{}{
        "order_id": orderID,
        "status":   "cancelled",
    })

    if err := s.eventBus.Publish(ctx, event); err != nil {
        return fmt.Errorf("failed to publish event: %w", err)
    }

    return nil
}

func generateOrderID() string {
    return fmt.Sprintf("order_%d", time.Now().UnixNano())
}

Email Service (Event Consumer)

// internal/services/email/service.go
package email

import (
    "context"
    "fmt"
    "log"

    "app/internal/events"
)

type Service struct {
    // SMTP client or email service client
}

func NewService() *Service {
    return &Service{}
}

// HandleOrderPlaced handles order placed events
func (s *Service) HandleOrderPlaced(ctx context.Context, event *events.Event) error {
    orderID := event.Payload["order_id"].(string)
    userID := event.Payload["user_id"].(string)
    total := event.Payload["total"].(float64)

    log.Printf("Sending order confirmation email for order %s to user %s", orderID, userID)

    // Send email
    if err := s.sendEmail(userID, "Order Confirmation", fmt.Sprintf(
        "Your order %s for $%.2f has been placed successfully.", orderID, total,
    )); err != nil {
        return fmt.Errorf("failed to send email: %w", err)
    }

    return nil
}

// HandleOrderShipped handles order shipped events
func (s *Service) HandleOrderShipped(ctx context.Context, event *events.Event) error {
    orderID := event.Payload["order_id"].(string)
    trackingNumber := event.Payload["tracking_number"].(string)

    log.Printf("Sending shipping notification for order %s with tracking %s", orderID, trackingNumber)

    // Send email
    return nil
}

func (s *Service) sendEmail(recipient, subject, body string) error {
    // Implementation of email sending
    log.Printf("Email sent to %s: %s - %s", recipient, subject, body)
    return nil
}

// RegisterHandlers registers event handlers
func (s *Service) RegisterHandlers(bus *events.EventBus) {
    bus.Subscribe(events.OrderPlaced, s.HandleOrderPlaced)
    bus.Subscribe(events.OrderShipped, s.HandleOrderShipped)
}

Inventory Service (Event Consumer)

// internal/services/inventory/service.go
package inventory

import (
    "context"
    "fmt"
    "log"

    "app/internal/events"
)

type Service struct {
    eventBus *events.EventBus
}

func NewService(eventBus *events.EventBus) *Service {
    return &Service{
        eventBus: eventBus,
    }
}

// HandleOrderPlaced handles order placed events
func (s *Service) HandleOrderPlaced(ctx context.Context, event *events.Event) error {
    orderID := event.Payload["order_id"].(string)

    log.Printf("Updating inventory for order %s", orderID)

    // Update stock levels
    if err := s.updateStock(orderID); err != nil {
        return fmt.Errorf("failed to update stock: %w", err)
    }

    // Publish StockUpdated event
    stockEvent := events.NewEvent(events.StockUpdated, orderID, map[string]interface{}{
        "order_id":     orderID,
        "stock_level":  100,
        "updated_at":   time.Now(),
    })

    if err := s.eventBus.Publish(ctx, stockEvent); err != nil {
        return fmt.Errorf("failed to publish stock event: %w", err)
    }

    return nil
}

// HandleOrderCancelled handles order cancelled events
func (s *Service) HandleOrderCancelled(ctx context.Context, event *events.Event) error {
    orderID := event.Payload["order_id"].(string)

    log.Printf("Restoring inventory for cancelled order %s", orderID)

    // Restore stock
    if err := s.restoreStock(orderID); err != nil {
        return fmt.Errorf("failed to restore stock: %w", err)
    }

    // Publish StockRestocked event
    stockEvent := events.NewEvent(events.StockRestocked, orderID, map[string]interface{}{
        "order_id":   orderID,
        "restored":   true,
    })

    return s.eventBus.Publish(ctx, stockEvent)
}

func (s *Service) updateStock(orderID string) error {
    // Implementation of stock update
    return nil
}

func (s *Service) restoreStock(orderID string) error {
    // Implementation of stock restoration
    return nil
}

// RegisterHandlers registers event handlers
func (s *Service) RegisterHandlers(bus *events.EventBus) {
    bus.Subscribe(events.OrderPlaced, s.HandleOrderPlaced)
    bus.Subscribe(events.OrderCancelled, s.HandleOrderCancelled)
}

Main Application

// cmd/order-service/main.go
package main

import (
    "context"
    "encoding/json"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/gorilla/mux"

    "app/internal/broker"
    "app/internal/events"
    "app/internal/services/email"
    "app/internal/services/inventory"
    "app/internal/services/order"
)

func main() {
    // Initialize event bus
    eventBus := events.NewEventBus()

    // Add logging middleware
    eventBus.Use(loggingMiddleware)

    // Initialize RabbitMQ broker
    rabbitURL := os.Getenv("RABBITMQ_URL")
    if rabbitURL == "" {
        rabbitURL = "amqp://guest:guest@localhost:5672/"
    }

    mqBroker, err := broker.NewRabbitMQBroker(rabbitURL)
    if err != nil {
        log.Fatal(err)
    }
    defer mqBroker.Close()

    // Declare exchange
    if err := mqBroker.DeclareExchange("events", "topic"); err != nil {
        log.Fatal(err)
    }

    // Initialize services
    orderService := order.NewService(eventBus)
    emailService := email.NewService()
    inventoryService := inventory.NewService(eventBus)

    // Register event handlers
    emailService.RegisterHandlers(eventBus)
    inventoryService.RegisterHandlers(eventBus)

    // Bridge event bus to RabbitMQ
    eventBus.Subscribe(events.OrderPlaced, func(ctx context.Context, event *events.Event) error {
        return mqBroker.Publish(ctx, "events", "order.placed", event)
    })

    // Setup HTTP server
    router := mux.NewRouter()

    router.HandleFunc("/orders", func(w http.ResponseWriter, r *http.Request) {
        var req struct {
            UserID string  `json:"user_id"`
            Total  float64 `json:"total"`
        }

        if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }

        order, err := orderService.PlaceOrder(r.Context(), req.UserID, req.Total)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }

        w.Header().Set("Content-Type", "application/json")
        w.WriteHeader(http.StatusCreated)
        json.NewEncoder(w).Encode(order)
    }).Methods("POST")

    router.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
    })

    server := &http.Server{
        Addr:    ":8080",
        Handler: router,
    }

    // Start server
    go func() {
        log.Println("Server starting on :8080")
        if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatal(err)
        }
    }()

    // Graceful shutdown
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit

    log.Println("Shutting down server...")
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    if err := server.Shutdown(ctx); err != nil {
        log.Fatal(err)
    }
}

func loggingMiddleware(next events.Handler) events.Handler {
    return func(ctx context.Context, event *events.Event) error {
        start := time.Now()
        log.Printf("Handling event: %s (ID: %s)", event.Type, event.ID)

        err := next(ctx, event)

        log.Printf("Event %s handled in %v", event.ID, time.Since(start))
        return err
    }
}

Best Practices

  1. Event Schema Versioning: Version your events to handle schema evolution
  2. Idempotent Handlers: Ensure handlers can process same event multiple times
  3. Error Handling: Implement retry logic and dead letter queues
  4. Event Ordering: Be aware that events may not arrive in order
  5. Event Store: Persist events for audit trail and replay
  6. Monitoring: Track event processing metrics and failures
  7. Testing: Test event handlers independently
  8. Documentation: Document all event types and their payloads

Common Pitfalls

  1. Event Coupling: Events containing too much internal implementation details
  2. Lost Messages: Not ensuring message delivery guarantees
  3. Duplicate Processing: Not handling duplicate events
  4. No Versioning: Breaking changes to event schemas
  5. Synchronous Expectations: Expecting immediate consistency
  6. Overusing Events: Using events for simple request-response patterns
  7. Poor Error Handling: Not handling failures gracefully

When to Use Event-Driven Architecture

Use When:

  • Need to decouple services and reduce dependencies
  • Building reactive systems that respond to state changes
  • Need to scale components independently
  • Want to add new functionality without modifying existing code
  • Building real-time data processing pipelines
  • Need audit trail of all system changes

Avoid When:

  • Need strong consistency guarantees
  • System requires simple request-response patterns
  • Team lacks experience with async programming
  • Debugging asynchronous flows is too complex
  • Real-time responses are critical

Advantages

  • Loose Coupling: Services don’t need to know about each other
  • Scalability: Easy to scale event producers and consumers independently
  • Flexibility: Easy to add new event consumers without changing producers
  • Resilience: Failures in one component don’t cascade
  • Audit Trail: Events provide complete history of system changes
  • Real-time Processing: React to events as they happen
  • Better Performance: Asynchronous processing improves throughput

Disadvantages

  • Eventual Consistency: System state is not immediately consistent
  • Complexity: Harder to understand and debug than synchronous systems
  • Testing Challenges: Difficult to test asynchronous event flows
  • Operational Overhead: Need message broker infrastructure
  • Event Ordering: Hard to guarantee order of events
  • Duplicate Events: Must handle duplicate event delivery
  • Error Handling: More complex error handling and retry logic

Event-Driven Architecture enables building highly scalable, loosely coupled systems that can react to changes in real-time, making it ideal for modern distributed applications.


Go Architecture Patterns Series: ← Previous: Microservices Architecture | Series Overview | Next: CQRS Pattern →