Go Architecture Patterns Series: ← Previous: Microservices Architecture | Series Overview | Next: CQRS Pattern →
What is Event-Driven Architecture?
Event-Driven Architecture (EDA) is a design pattern where components communicate by producing and consuming events. Events represent state changes or significant occurrences in the system, allowing components to react asynchronously without direct coupling.
Key Principles:
- Event Producers: Components that generate events when state changes
- Event Consumers: Components that react to events
- Event Broker: Middleware that routes events from producers to consumers
- Asynchronous Communication: Decoupled, non-blocking interactions
- Event Immutability: Events represent past facts that cannot be changed
- Eventual Consistency: Systems converge to consistent state over time
Architecture Overview
Event Flow Pattern
Real-World Use Cases
- E-commerce: Order processing, inventory updates, shipping notifications
- Financial Systems: Transaction processing, fraud detection, account updates
- IoT Platforms: Sensor data processing, device state changes
- Social Media: Post updates, notifications, activity feeds
- Streaming Services: Video processing, recommendation updates
- Gaming: Player actions, leaderboard updates, achievements
Event-Driven Architecture Implementation
Project Structure
event-driven-app/
├── cmd/
│ ├── order-service/
│ │ └── main.go
│ ├── email-service/
│ │ └── main.go
│ └── inventory-service/
│ └── main.go
├── internal/
│ ├── events/
│ │ ├── event.go
│ │ ├── bus.go
│ │ └── types.go
│ ├── broker/
│ │ ├── rabbitmq.go
│ │ └── kafka.go
│ └── services/
│ ├── order/
│ ├── email/
│ └── inventory/
├── pkg/
│ └── messaging/
│ └── publisher.go
└── go.mod
Core Event System
// internal/events/event.go
package events
import (
"encoding/json"
"time"
)
// Event represents a domain event
type Event struct {
ID string `json:"id"`
Type string `json:"type"`
AggregateID string `json:"aggregate_id"`
Payload map[string]interface{} `json:"payload"`
Metadata map[string]string `json:"metadata"`
Timestamp time.Time `json:"timestamp"`
Version int `json:"version"`
}
// NewEvent creates a new event
func NewEvent(eventType, aggregateID string, payload map[string]interface{}) *Event {
return &Event{
ID: generateEventID(),
Type: eventType,
AggregateID: aggregateID,
Payload: payload,
Metadata: make(map[string]string),
Timestamp: time.Now(),
Version: 1,
}
}
// ToJSON serializes event to JSON
func (e *Event) ToJSON() ([]byte, error) {
return json.Marshal(e)
}
// FromJSON deserializes event from JSON
func FromJSON(data []byte) (*Event, error) {
var event Event
if err := json.Unmarshal(data, &event); err != nil {
return nil, err
}
return &event, nil
}
func generateEventID() string {
return fmt.Sprintf("evt_%d", time.Now().UnixNano())
}
// internal/events/types.go
package events
const (
// User Events
UserCreated = "user.created"
UserUpdated = "user.updated"
UserDeleted = "user.deleted"
// Order Events
OrderPlaced = "order.placed"
OrderConfirmed = "order.confirmed"
OrderCancelled = "order.cancelled"
OrderShipped = "order.shipped"
OrderDelivered = "order.delivered"
// Payment Events
PaymentProcessed = "payment.processed"
PaymentFailed = "payment.failed"
PaymentRefunded = "payment.refunded"
// Inventory Events
StockUpdated = "inventory.stock_updated"
StockDepleted = "inventory.stock_depleted"
StockRestocked = "inventory.stock_restocked"
)
// internal/events/bus.go
package events
import (
"context"
"fmt"
"sync"
)
// Handler is a function that processes events
type Handler func(ctx context.Context, event *Event) error
// EventBus manages event publishing and subscriptions
type EventBus struct {
mu sync.RWMutex
handlers map[string][]Handler
middleware []Middleware
}
// Middleware wraps event handlers
type Middleware func(Handler) Handler
// NewEventBus creates a new event bus
func NewEventBus() *EventBus {
return &EventBus{
handlers: make(map[string][]Handler),
middleware: make([]Middleware, 0),
}
}
// Subscribe registers a handler for an event type
func (b *EventBus) Subscribe(eventType string, handler Handler) {
b.mu.Lock()
defer b.mu.Unlock()
// Apply middleware
for i := len(b.middleware) - 1; i >= 0; i-- {
handler = b.middleware[i](handler)
}
b.handlers[eventType] = append(b.handlers[eventType], handler)
}
// Publish publishes an event to all subscribers
func (b *EventBus) Publish(ctx context.Context, event *Event) error {
b.mu.RLock()
handlers, exists := b.handlers[event.Type]
b.mu.RUnlock()
if !exists {
return nil // No handlers registered
}
var wg sync.WaitGroup
errors := make(chan error, len(handlers))
for _, handler := range handlers {
wg.Add(1)
go func(h Handler) {
defer wg.Done()
if err := h(ctx, event); err != nil {
errors <- fmt.Errorf("handler error: %w", err)
}
}(handler)
}
wg.Wait()
close(errors)
// Collect errors
var handlerErrors []error
for err := range errors {
handlerErrors = append(handlerErrors, err)
}
if len(handlerErrors) > 0 {
return fmt.Errorf("some handlers failed: %v", handlerErrors)
}
return nil
}
// Use adds middleware to the event bus
func (b *EventBus) Use(middleware Middleware) {
b.mu.Lock()
defer b.mu.Unlock()
b.middleware = append(b.middleware, middleware)
}
Message Broker Integration (RabbitMQ)
// internal/broker/rabbitmq.go
package broker
import (
"context"
"fmt"
"log"
"github.com/rabbitmq/amqp091-go"
"app/internal/events"
)
// RabbitMQBroker wraps RabbitMQ for event publishing/consuming
type RabbitMQBroker struct {
conn *amqp091.Connection
channel *amqp091.Channel
}
// NewRabbitMQBroker creates a new RabbitMQ broker
func NewRabbitMQBroker(url string) (*RabbitMQBroker, error) {
conn, err := amqp091.Dial(url)
if err != nil {
return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err)
}
channel, err := conn.Channel()
if err != nil {
return nil, fmt.Errorf("failed to open channel: %w", err)
}
return &RabbitMQBroker{
conn: conn,
channel: channel,
}, nil
}
// DeclareExchange declares an exchange
func (b *RabbitMQBroker) DeclareExchange(name, kind string) error {
return b.channel.ExchangeDeclare(
name, // name
kind, // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
}
// Publish publishes an event to RabbitMQ
func (b *RabbitMQBroker) Publish(ctx context.Context, exchange, routingKey string, event *events.Event) error {
body, err := event.ToJSON()
if err != nil {
return fmt.Errorf("failed to serialize event: %w", err)
}
err = b.channel.PublishWithContext(
ctx,
exchange, // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp091.Publishing{
ContentType: "application/json",
Body: body,
DeliveryMode: amqp091.Persistent,
MessageId: event.ID,
Timestamp: event.Timestamp,
},
)
if err != nil {
return fmt.Errorf("failed to publish event: %w", err)
}
return nil
}
// Subscribe subscribes to events from a queue
func (b *RabbitMQBroker) Subscribe(queueName, exchange, routingKey string, handler events.Handler) error {
// Declare queue
queue, err := b.channel.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return fmt.Errorf("failed to declare queue: %w", err)
}
// Bind queue to exchange
err = b.channel.QueueBind(
queue.Name, // queue name
routingKey, // routing key
exchange, // exchange
false,
nil,
)
if err != nil {
return fmt.Errorf("failed to bind queue: %w", err)
}
// Consume messages
msgs, err := b.channel.Consume(
queue.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return fmt.Errorf("failed to register consumer: %w", err)
}
// Process messages
go func() {
for msg := range msgs {
event, err := events.FromJSON(msg.Body)
if err != nil {
log.Printf("Failed to deserialize event: %v", err)
msg.Nack(false, false)
continue
}
if err := handler(context.Background(), event); err != nil {
log.Printf("Handler error: %v", err)
msg.Nack(false, true) // Requeue on error
continue
}
msg.Ack(false)
}
}()
return nil
}
// Close closes the broker connection
func (b *RabbitMQBroker) Close() error {
if err := b.channel.Close(); err != nil {
return err
}
return b.conn.Close()
}
Order Service (Event Producer)
// internal/services/order/service.go
package order
import (
"context"
"fmt"
"time"
"app/internal/events"
)
type Order struct {
ID string `json:"id"`
UserID string `json:"user_id"`
Total float64 `json:"total"`
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
}
type Service struct {
eventBus *events.EventBus
}
func NewService(eventBus *events.EventBus) *Service {
return &Service{
eventBus: eventBus,
}
}
func (s *Service) PlaceOrder(ctx context.Context, userID string, total float64) (*Order, error) {
order := &Order{
ID: generateOrderID(),
UserID: userID,
Total: total,
Status: "pending",
CreatedAt: time.Now(),
}
// Publish OrderPlaced event
event := events.NewEvent(events.OrderPlaced, order.ID, map[string]interface{}{
"order_id": order.ID,
"user_id": order.UserID,
"total": order.Total,
"status": order.Status,
})
if err := s.eventBus.Publish(ctx, event); err != nil {
return nil, fmt.Errorf("failed to publish event: %w", err)
}
return order, nil
}
func (s *Service) ConfirmOrder(ctx context.Context, orderID string) error {
// Publish OrderConfirmed event
event := events.NewEvent(events.OrderConfirmed, orderID, map[string]interface{}{
"order_id": orderID,
"status": "confirmed",
})
if err := s.eventBus.Publish(ctx, event); err != nil {
return fmt.Errorf("failed to publish event: %w", err)
}
return nil
}
func (s *Service) CancelOrder(ctx context.Context, orderID string) error {
// Publish OrderCancelled event
event := events.NewEvent(events.OrderCancelled, orderID, map[string]interface{}{
"order_id": orderID,
"status": "cancelled",
})
if err := s.eventBus.Publish(ctx, event); err != nil {
return fmt.Errorf("failed to publish event: %w", err)
}
return nil
}
func generateOrderID() string {
return fmt.Sprintf("order_%d", time.Now().UnixNano())
}
Email Service (Event Consumer)
// internal/services/email/service.go
package email
import (
"context"
"fmt"
"log"
"app/internal/events"
)
type Service struct {
// SMTP client or email service client
}
func NewService() *Service {
return &Service{}
}
// HandleOrderPlaced handles order placed events
func (s *Service) HandleOrderPlaced(ctx context.Context, event *events.Event) error {
orderID := event.Payload["order_id"].(string)
userID := event.Payload["user_id"].(string)
total := event.Payload["total"].(float64)
log.Printf("Sending order confirmation email for order %s to user %s", orderID, userID)
// Send email
if err := s.sendEmail(userID, "Order Confirmation", fmt.Sprintf(
"Your order %s for $%.2f has been placed successfully.", orderID, total,
)); err != nil {
return fmt.Errorf("failed to send email: %w", err)
}
return nil
}
// HandleOrderShipped handles order shipped events
func (s *Service) HandleOrderShipped(ctx context.Context, event *events.Event) error {
orderID := event.Payload["order_id"].(string)
trackingNumber := event.Payload["tracking_number"].(string)
log.Printf("Sending shipping notification for order %s with tracking %s", orderID, trackingNumber)
// Send email
return nil
}
func (s *Service) sendEmail(recipient, subject, body string) error {
// Implementation of email sending
log.Printf("Email sent to %s: %s - %s", recipient, subject, body)
return nil
}
// RegisterHandlers registers event handlers
func (s *Service) RegisterHandlers(bus *events.EventBus) {
bus.Subscribe(events.OrderPlaced, s.HandleOrderPlaced)
bus.Subscribe(events.OrderShipped, s.HandleOrderShipped)
}
Inventory Service (Event Consumer)
// internal/services/inventory/service.go
package inventory
import (
"context"
"fmt"
"log"
"app/internal/events"
)
type Service struct {
eventBus *events.EventBus
}
func NewService(eventBus *events.EventBus) *Service {
return &Service{
eventBus: eventBus,
}
}
// HandleOrderPlaced handles order placed events
func (s *Service) HandleOrderPlaced(ctx context.Context, event *events.Event) error {
orderID := event.Payload["order_id"].(string)
log.Printf("Updating inventory for order %s", orderID)
// Update stock levels
if err := s.updateStock(orderID); err != nil {
return fmt.Errorf("failed to update stock: %w", err)
}
// Publish StockUpdated event
stockEvent := events.NewEvent(events.StockUpdated, orderID, map[string]interface{}{
"order_id": orderID,
"stock_level": 100,
"updated_at": time.Now(),
})
if err := s.eventBus.Publish(ctx, stockEvent); err != nil {
return fmt.Errorf("failed to publish stock event: %w", err)
}
return nil
}
// HandleOrderCancelled handles order cancelled events
func (s *Service) HandleOrderCancelled(ctx context.Context, event *events.Event) error {
orderID := event.Payload["order_id"].(string)
log.Printf("Restoring inventory for cancelled order %s", orderID)
// Restore stock
if err := s.restoreStock(orderID); err != nil {
return fmt.Errorf("failed to restore stock: %w", err)
}
// Publish StockRestocked event
stockEvent := events.NewEvent(events.StockRestocked, orderID, map[string]interface{}{
"order_id": orderID,
"restored": true,
})
return s.eventBus.Publish(ctx, stockEvent)
}
func (s *Service) updateStock(orderID string) error {
// Implementation of stock update
return nil
}
func (s *Service) restoreStock(orderID string) error {
// Implementation of stock restoration
return nil
}
// RegisterHandlers registers event handlers
func (s *Service) RegisterHandlers(bus *events.EventBus) {
bus.Subscribe(events.OrderPlaced, s.HandleOrderPlaced)
bus.Subscribe(events.OrderCancelled, s.HandleOrderCancelled)
}
Main Application
// cmd/order-service/main.go
package main
import (
"context"
"encoding/json"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/gorilla/mux"
"app/internal/broker"
"app/internal/events"
"app/internal/services/email"
"app/internal/services/inventory"
"app/internal/services/order"
)
func main() {
// Initialize event bus
eventBus := events.NewEventBus()
// Add logging middleware
eventBus.Use(loggingMiddleware)
// Initialize RabbitMQ broker
rabbitURL := os.Getenv("RABBITMQ_URL")
if rabbitURL == "" {
rabbitURL = "amqp://guest:guest@localhost:5672/"
}
mqBroker, err := broker.NewRabbitMQBroker(rabbitURL)
if err != nil {
log.Fatal(err)
}
defer mqBroker.Close()
// Declare exchange
if err := mqBroker.DeclareExchange("events", "topic"); err != nil {
log.Fatal(err)
}
// Initialize services
orderService := order.NewService(eventBus)
emailService := email.NewService()
inventoryService := inventory.NewService(eventBus)
// Register event handlers
emailService.RegisterHandlers(eventBus)
inventoryService.RegisterHandlers(eventBus)
// Bridge event bus to RabbitMQ
eventBus.Subscribe(events.OrderPlaced, func(ctx context.Context, event *events.Event) error {
return mqBroker.Publish(ctx, "events", "order.placed", event)
})
// Setup HTTP server
router := mux.NewRouter()
router.HandleFunc("/orders", func(w http.ResponseWriter, r *http.Request) {
var req struct {
UserID string `json:"user_id"`
Total float64 `json:"total"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
order, err := orderService.PlaceOrder(r.Context(), req.UserID, req.Total)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(order)
}).Methods("POST")
router.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
server := &http.Server{
Addr: ":8080",
Handler: router,
}
// Start server
go func() {
log.Println("Server starting on :8080")
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatal(err)
}
}()
// Graceful shutdown
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("Shutting down server...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
log.Fatal(err)
}
}
func loggingMiddleware(next events.Handler) events.Handler {
return func(ctx context.Context, event *events.Event) error {
start := time.Now()
log.Printf("Handling event: %s (ID: %s)", event.Type, event.ID)
err := next(ctx, event)
log.Printf("Event %s handled in %v", event.ID, time.Since(start))
return err
}
}
Best Practices
- Event Schema Versioning: Version your events to handle schema evolution
- Idempotent Handlers: Ensure handlers can process same event multiple times
- Error Handling: Implement retry logic and dead letter queues
- Event Ordering: Be aware that events may not arrive in order
- Event Store: Persist events for audit trail and replay
- Monitoring: Track event processing metrics and failures
- Testing: Test event handlers independently
- Documentation: Document all event types and their payloads
Common Pitfalls
- Event Coupling: Events containing too much internal implementation details
- Lost Messages: Not ensuring message delivery guarantees
- Duplicate Processing: Not handling duplicate events
- No Versioning: Breaking changes to event schemas
- Synchronous Expectations: Expecting immediate consistency
- Overusing Events: Using events for simple request-response patterns
- Poor Error Handling: Not handling failures gracefully
When to Use Event-Driven Architecture
Use When:
- Need to decouple services and reduce dependencies
- Building reactive systems that respond to state changes
- Need to scale components independently
- Want to add new functionality without modifying existing code
- Building real-time data processing pipelines
- Need audit trail of all system changes
Avoid When:
- Need strong consistency guarantees
- System requires simple request-response patterns
- Team lacks experience with async programming
- Debugging asynchronous flows is too complex
- Real-time responses are critical
Advantages
- Loose Coupling: Services don’t need to know about each other
- Scalability: Easy to scale event producers and consumers independently
- Flexibility: Easy to add new event consumers without changing producers
- Resilience: Failures in one component don’t cascade
- Audit Trail: Events provide complete history of system changes
- Real-time Processing: React to events as they happen
- Better Performance: Asynchronous processing improves throughput
Disadvantages
- Eventual Consistency: System state is not immediately consistent
- Complexity: Harder to understand and debug than synchronous systems
- Testing Challenges: Difficult to test asynchronous event flows
- Operational Overhead: Need message broker infrastructure
- Event Ordering: Hard to guarantee order of events
- Duplicate Events: Must handle duplicate event delivery
- Error Handling: More complex error handling and retry logic
Event-Driven Architecture enables building highly scalable, loosely coupled systems that can react to changes in real-time, making it ideal for modern distributed applications.
Go Architecture Patterns Series: ← Previous: Microservices Architecture | Series Overview | Next: CQRS Pattern →