Event-Driven Architecture in Go: Building Reactive Systems

    Go Architecture Patterns Series: ← Previous: Microservices Architecture | Series Overview | Next: CQRS Pattern → What is Event-Driven Architecture? Event-Driven Architecture (EDA) is a design pattern where components communicate by producing and consuming events. Events represent state changes or significant occurrences in the system, allowing components to react asynchronously without direct coupling. Key Principles: Event Producers: Components that generate events when state changes Event Consumers: Components that react to events Event Broker: Middleware that routes events from producers to consumers Asynchronous Communication: Decoupled, non-blocking interactions Event Immutability: Events represent past facts that cannot be changed Eventual Consistency: Systems converge to consistent state over time Architecture Overview graph TB subgraph "Event Producers" UserService[User Service] OrderService[Order Service] PaymentService[Payment Service] end subgraph "Event Broker" EventBus[Event Bus / Message Broker] UserTopic[User Events Topic] OrderTopic[Order Events Topic] PaymentTopic[Payment Events Topic] end subgraph "Event Consumers" EmailService[Email Service] AnalyticsService[Analytics Service] NotificationService[Notification Service] InventoryService[Inventory Service] end subgraph "Event Store" EventLog[(Event Log DB)] end UserService -->|UserCreated| UserTopic OrderService -->|OrderPlaced| OrderTopic OrderService -->|OrderConfirmed| OrderTopic PaymentService -->|PaymentProcessed| PaymentTopic UserTopic --> EventBus OrderTopic --> EventBus PaymentTopic --> EventBus EventBus --> EmailService EventBus --> AnalyticsService EventBus --> NotificationService EventBus --> InventoryService EventBus --> EventLog style UserService fill:#e1f5ff style OrderService fill:#fff4e1 style PaymentService fill:#f3e5f5 style EventBus fill:#e8f5e9 Event Flow Pattern sequenceDiagram participant User participant OrderService participant EventBroker participant EmailService participant InventoryService participant AnalyticsService User->>OrderService: Place Order OrderService->>OrderService: Validate Order OrderService->>EventBroker: Publish OrderPlaced Event Note over EventBroker: Event is distributed to all subscribers EventBroker->>EmailService: OrderPlaced Event EventBroker->>InventoryService: OrderPlaced Event EventBroker->>AnalyticsService: OrderPlaced Event EmailService->>EmailService: Send Confirmation Email InventoryService->>InventoryService: Update Stock AnalyticsService->>AnalyticsService: Record Metrics InventoryService->>EventBroker: Publish StockUpdated Event OrderService-->>User: Order Accepted (202) Real-World Use Cases E-commerce: Order processing, inventory updates, shipping notifications Financial Systems: Transaction processing, fraud detection, account updates IoT Platforms: Sensor data processing, device state changes Social Media: Post updates, notifications, activity feeds Streaming Services: Video processing, recommendation updates Gaming: Player actions, leaderboard updates, achievements Event-Driven Architecture Implementation Project Structure event-driven-app/ ├── cmd/ │ ├── order-service/ │ │ └── main.go │ ├── email-service/ │ │ └── main.go │ └── inventory-service/ │ └── main.go ├── internal/ │ ├── events/ │ │ ├── event.go │ │ ├── bus.go │ │ └── types.go │ ├── broker/ │ │ ├── rabbitmq.go │ │ └── kafka.go │ └── services/ │ ├── order/ │ ├── email/ │ └── inventory/ ├── pkg/ │ └── messaging/ │ └── publisher.go └── go.mod Core Event System // internal/events/event.go package events import ( "encoding/json" "time" ) // Event represents a domain event type Event struct { ID string `json:"id"` Type string `json:"type"` AggregateID string `json:"aggregate_id"` Payload map[string]interface{} `json:"payload"` Metadata map[string]string `json:"metadata"` Timestamp time.Time `json:"timestamp"` Version int `json:"version"` } // NewEvent creates a new event func NewEvent(eventType, aggregateID string, payload map[string]interface{}) *Event { return &Event{ ID: generateEventID(), Type: eventType, AggregateID: aggregateID, Payload: payload, Metadata: make(map[string]string), Timestamp: time.Now(), Version: 1, } } // ToJSON serializes event to JSON func (e *Event) ToJSON() ([]byte, error) { return json.Marshal(e) } // FromJSON deserializes event from JSON func FromJSON(data []byte) (*Event, error) { var event Event if err := json.Unmarshal(data, &event); err != nil { return nil, err } return &event, nil } func generateEventID() string { return fmt.Sprintf("evt_%d", time.Now().UnixNano()) } // internal/events/types.go package events const ( // User Events UserCreated = "user.created" UserUpdated = "user.updated" UserDeleted = "user.deleted" // Order Events OrderPlaced = "order.placed" OrderConfirmed = "order.confirmed" OrderCancelled = "order.cancelled" OrderShipped = "order.shipped" OrderDelivered = "order.delivered" // Payment Events PaymentProcessed = "payment.processed" PaymentFailed = "payment.failed" PaymentRefunded = "payment.refunded" // Inventory Events StockUpdated = "inventory.stock_updated" StockDepleted = "inventory.stock_depleted" StockRestocked = "inventory.stock_restocked" ) // internal/events/bus.go package events import ( "context" "fmt" "sync" ) // Handler is a function that processes events type Handler func(ctx context.Context, event *Event) error // EventBus manages event publishing and subscriptions type EventBus struct { mu sync.RWMutex handlers map[string][]Handler middleware []Middleware } // Middleware wraps event handlers type Middleware func(Handler) Handler // NewEventBus creates a new event bus func NewEventBus() *EventBus { return &EventBus{ handlers: make(map[string][]Handler), middleware: make([]Middleware, 0), } } // Subscribe registers a handler for an event type func (b *EventBus) Subscribe(eventType string, handler Handler) { b.mu.Lock() defer b.mu.Unlock() // Apply middleware for i := len(b.middleware) - 1; i >= 0; i-- { handler = b.middleware[i](handler) } b.handlers[eventType] = append(b.handlers[eventType], handler) } // Publish publishes an event to all subscribers func (b *EventBus) Publish(ctx context.Context, event *Event) error { b.mu.RLock() handlers, exists := b.handlers[event.Type] b.mu.RUnlock() if !exists { return nil // No handlers registered } var wg sync.WaitGroup errors := make(chan error, len(handlers)) for _, handler := range handlers { wg.Add(1) go func(h Handler) { defer wg.Done() if err := h(ctx, event); err != nil { errors <- fmt.Errorf("handler error: %w", err) } }(handler) } wg.Wait() close(errors) // Collect errors var handlerErrors []error for err := range errors { handlerErrors = append(handlerErrors, err) } if len(handlerErrors) > 0 { return fmt.Errorf("some handlers failed: %v", handlerErrors) } return nil } // Use adds middleware to the event bus func (b *EventBus) Use(middleware Middleware) { b.mu.Lock() defer b.mu.Unlock() b.middleware = append(b.middleware, middleware) } Message Broker Integration (RabbitMQ) // internal/broker/rabbitmq.go package broker import ( "context" "fmt" "log" "github.com/rabbitmq/amqp091-go" "app/internal/events" ) // RabbitMQBroker wraps RabbitMQ for event publishing/consuming type RabbitMQBroker struct { conn *amqp091.Connection channel *amqp091.Channel } // NewRabbitMQBroker creates a new RabbitMQ broker func NewRabbitMQBroker(url string) (*RabbitMQBroker, error) { conn, err := amqp091.Dial(url) if err != nil { return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err) } channel, err := conn.Channel() if err != nil { return nil, fmt.Errorf("failed to open channel: %w", err) } return &RabbitMQBroker{ conn: conn, channel: channel, }, nil } // DeclareExchange declares an exchange func (b *RabbitMQBroker) DeclareExchange(name, kind string) error { return b.channel.ExchangeDeclare( name, // name kind, // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) } // Publish publishes an event to RabbitMQ func (b *RabbitMQBroker) Publish(ctx context.Context, exchange, routingKey string, event *events.Event) error { body, err := event.ToJSON() if err != nil { return fmt.Errorf("failed to serialize event: %w", err) } err = b.channel.PublishWithContext( ctx, exchange, // exchange routingKey, // routing key false, // mandatory false, // immediate amqp091.Publishing{ ContentType: "application/json", Body: body, DeliveryMode: amqp091.Persistent, MessageId: event.ID, Timestamp: event.Timestamp, }, ) if err != nil { return fmt.Errorf("failed to publish event: %w", err) } return nil } // Subscribe subscribes to events from a queue func (b *RabbitMQBroker) Subscribe(queueName, exchange, routingKey string, handler events.Handler) error { // Declare queue queue, err := b.channel.QueueDeclare( queueName, // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) if err != nil { return fmt.Errorf("failed to declare queue: %w", err) } // Bind queue to exchange err = b.channel.QueueBind( queue.Name, // queue name routingKey, // routing key exchange, // exchange false, nil, ) if err != nil { return fmt.Errorf("failed to bind queue: %w", err) } // Consume messages msgs, err := b.channel.Consume( queue.Name, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { return fmt.Errorf("failed to register consumer: %w", err) } // Process messages go func() { for msg := range msgs { event, err := events.FromJSON(msg.Body) if err != nil { log.Printf("Failed to deserialize event: %v", err) msg.Nack(false, false) continue } if err := handler(context.Background(), event); err != nil { log.Printf("Handler error: %v", err) msg.Nack(false, true) // Requeue on error continue } msg.Ack(false) } }() return nil } // Close closes the broker connection func (b *RabbitMQBroker) Close() error { if err := b.channel.Close(); err != nil { return err } return b.conn.Close() } Order Service (Event Producer) // internal/services/order/service.go package order import ( "context" "fmt" "time" "app/internal/events" ) type Order struct { ID string `json:"id"` UserID string `json:"user_id"` Total float64 `json:"total"` Status string `json:"status"` CreatedAt time.Time `json:"created_at"` } type Service struct { eventBus *events.EventBus } func NewService(eventBus *events.EventBus) *Service { return &Service{ eventBus: eventBus, } } func (s *Service) PlaceOrder(ctx context.Context, userID string, total float64) (*Order, error) { order := &Order{ ID: generateOrderID(), UserID: userID, Total: total, Status: "pending", CreatedAt: time.Now(), } // Publish OrderPlaced event event := events.NewEvent(events.OrderPlaced, order.ID, map[string]interface{}{ "order_id": order.ID, "user_id": order.UserID, "total": order.Total, "status": order.Status, }) if err := s.eventBus.Publish(ctx, event); err != nil { return nil, fmt.Errorf("failed to publish event: %w", err) } return order, nil } func (s *Service) ConfirmOrder(ctx context.Context, orderID string) error { // Publish OrderConfirmed event event := events.NewEvent(events.OrderConfirmed, orderID, map[string]interface{}{ "order_id": orderID, "status": "confirmed", }) if err := s.eventBus.Publish(ctx, event); err != nil { return fmt.Errorf("failed to publish event: %w", err) } return nil } func (s *Service) CancelOrder(ctx context.Context, orderID string) error { // Publish OrderCancelled event event := events.NewEvent(events.OrderCancelled, orderID, map[string]interface{}{ "order_id": orderID, "status": "cancelled", }) if err := s.eventBus.Publish(ctx, event); err != nil { return fmt.Errorf("failed to publish event: %w", err) } return nil } func generateOrderID() string { return fmt.Sprintf("order_%d", time.Now().UnixNano()) } Email Service (Event Consumer) // internal/services/email/service.go package email import ( "context" "fmt" "log" "app/internal/events" ) type Service struct { // SMTP client or email service client } func NewService() *Service { return &Service{} } // HandleOrderPlaced handles order placed events func (s *Service) HandleOrderPlaced(ctx context.Context, event *events.Event) error { orderID := event.Payload["order_id"].(string) userID := event.Payload["user_id"].(string) total := event.Payload["total"].(float64) log.Printf("Sending order confirmation email for order %s to user %s", orderID, userID) // Send email if err := s.sendEmail(userID, "Order Confirmation", fmt.Sprintf( "Your order %s for $%.2f has been placed successfully.", orderID, total, )); err != nil { return fmt.Errorf("failed to send email: %w", err) } return nil } // HandleOrderShipped handles order shipped events func (s *Service) HandleOrderShipped(ctx context.Context, event *events.Event) error { orderID := event.Payload["order_id"].(string) trackingNumber := event.Payload["tracking_number"].(string) log.Printf("Sending shipping notification for order %s with tracking %s", orderID, trackingNumber) // Send email return nil } func (s *Service) sendEmail(recipient, subject, body string) error { // Implementation of email sending log.Printf("Email sent to %s: %s - %s", recipient, subject, body) return nil } // RegisterHandlers registers event handlers func (s *Service) RegisterHandlers(bus *events.EventBus) { bus.Subscribe(events.OrderPlaced, s.HandleOrderPlaced) bus.Subscribe(events.OrderShipped, s.HandleOrderShipped) } Inventory Service (Event Consumer) // internal/services/inventory/service.go package inventory import ( "context" "fmt" "log" "app/internal/events" ) type Service struct { eventBus *events.EventBus } func NewService(eventBus *events.EventBus) *Service { return &Service{ eventBus: eventBus, } } // HandleOrderPlaced handles order placed events func (s *Service) HandleOrderPlaced(ctx context.Context, event *events.Event) error { orderID := event.Payload["order_id"].(string) log.Printf("Updating inventory for order %s", orderID) // Update stock levels if err := s.updateStock(orderID); err != nil { return fmt.Errorf("failed to update stock: %w", err) } // Publish StockUpdated event stockEvent := events.NewEvent(events.StockUpdated, orderID, map[string]interface{}{ "order_id": orderID, "stock_level": 100, "updated_at": time.Now(), }) if err := s.eventBus.Publish(ctx, stockEvent); err != nil { return fmt.Errorf("failed to publish stock event: %w", err) } return nil } // HandleOrderCancelled handles order cancelled events func (s *Service) HandleOrderCancelled(ctx context.Context, event *events.Event) error { orderID := event.Payload["order_id"].(string) log.Printf("Restoring inventory for cancelled order %s", orderID) // Restore stock if err := s.restoreStock(orderID); err != nil { return fmt.Errorf("failed to restore stock: %w", err) } // Publish StockRestocked event stockEvent := events.NewEvent(events.StockRestocked, orderID, map[string]interface{}{ "order_id": orderID, "restored": true, }) return s.eventBus.Publish(ctx, stockEvent) } func (s *Service) updateStock(orderID string) error { // Implementation of stock update return nil } func (s *Service) restoreStock(orderID string) error { // Implementation of stock restoration return nil } // RegisterHandlers registers event handlers func (s *Service) RegisterHandlers(bus *events.EventBus) { bus.Subscribe(events.OrderPlaced, s.HandleOrderPlaced) bus.Subscribe(events.OrderCancelled, s.HandleOrderCancelled) } Main Application // cmd/order-service/main.go package main import ( "context" "encoding/json" "log" "net/http" "os" "os/signal" "syscall" "time" "github.com/gorilla/mux" "app/internal/broker" "app/internal/events" "app/internal/services/email" "app/internal/services/inventory" "app/internal/services/order" ) func main() { // Initialize event bus eventBus := events.NewEventBus() // Add logging middleware eventBus.Use(loggingMiddleware) // Initialize RabbitMQ broker rabbitURL := os.Getenv("RABBITMQ_URL") if rabbitURL == "" { rabbitURL = "amqp://guest:guest@localhost:5672/" } mqBroker, err := broker.NewRabbitMQBroker(rabbitURL) if err != nil { log.Fatal(err) } defer mqBroker.Close() // Declare exchange if err := mqBroker.DeclareExchange("events", "topic"); err != nil { log.Fatal(err) } // Initialize services orderService := order.NewService(eventBus) emailService := email.NewService() inventoryService := inventory.NewService(eventBus) // Register event handlers emailService.RegisterHandlers(eventBus) inventoryService.RegisterHandlers(eventBus) // Bridge event bus to RabbitMQ eventBus.Subscribe(events.OrderPlaced, func(ctx context.Context, event *events.Event) error { return mqBroker.Publish(ctx, "events", "order.placed", event) }) // Setup HTTP server router := mux.NewRouter() router.HandleFunc("/orders", func(w http.ResponseWriter, r *http.Request) { var req struct { UserID string `json:"user_id"` Total float64 `json:"total"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } order, err := orderService.PlaceOrder(r.Context(), req.UserID, req.Total) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusCreated) json.NewEncoder(w).Encode(order) }).Methods("POST") router.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }) server := &http.Server{ Addr: ":8080", Handler: router, } // Start server go func() { log.Println("Server starting on :8080") if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatal(err) } }() // Graceful shutdown quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit log.Println("Shutting down server...") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := server.Shutdown(ctx); err != nil { log.Fatal(err) } } func loggingMiddleware(next events.Handler) events.Handler { return func(ctx context.Context, event *events.Event) error { start := time.Now() log.Printf("Handling event: %s (ID: %s)", event.Type, event.ID) err := next(ctx, event) log.Printf("Event %s handled in %v", event.ID, time.Since(start)) return err } } Best Practices Event Schema Versioning: Version your events to handle schema evolution Idempotent Handlers: Ensure handlers can process same event multiple times Error Handling: Implement retry logic and dead letter queues Event Ordering: Be aware that events may not arrive in order Event Store: Persist events for audit trail and replay Monitoring: Track event processing metrics and failures Testing: Test event handlers independently Documentation: Document all event types and their payloads Common Pitfalls Event Coupling: Events containing too much internal implementation details Lost Messages: Not ensuring message delivery guarantees Duplicate Processing: Not handling duplicate events No Versioning: Breaking changes to event schemas Synchronous Expectations: Expecting immediate consistency Overusing Events: Using events for simple request-response patterns Poor Error Handling: Not handling failures gracefully When to Use Event-Driven Architecture Use When: ...

    January 21, 2025 · 12 min · Rafiul Alam