Building Scalable Event-Driven Microservices in Go: A User and Notes Service Example

    In the world of modern software development, the question isn’t whether you’ll need to scale; it’s when. If you’ve ever watched a monolithic application groan under increasing load, fought to deploy a single feature without breaking everything else, or felt trapped by technology choices made years ago, you’re not alone. Let’s explore how event-driven microservices in Go can address these challenges and help you build systems that scale gracefully with your ambitions. The Pain of the Monolith. Picture this: your application has grown from a simple CRUD app into a complex beast handling users, notes, notifications, analytics, and more. Every deployment is a nail-biting experience because changing one module might break three others. Your database has become a bottleneck, and adding more servers doesn’t help because everything shares the same database connection pool. Different teams step on each other’s toes, and that cool new technology? Sorry, the entire stack is locked into decisions made in 2015. ...

    October 15, 2025 · 11 min · Rafiul Alam

    Event-Driven Architecture in Go: Building Reactive Systems

    What is Event-Driven Architecture? Event-Driven Architecture (EDA) is a design pattern where components communicate by producing and consuming events. Events represent state changes or significant occurrences in the system, allowing components to react asynchronously without direct coupling. Key Principles: Event Producers: Components that generate events when state changes Event Consumers: Components that react to events Event Broker: Middleware that routes events from producers to consumers Asynchronous Communication: Decoupled, non-blocking interactions Event Immutability: Events represent past facts that cannot be changed Eventual Consistency: Systems converge to consistent state over time Architecture Overview %%{init: {'theme':'dark', 'themeVariables': {'primaryTextColor':'#e5e7eb','secondaryTextColor':'#e5e7eb','tertiaryTextColor':'#e5e7eb','textColor':'#e5e7eb','nodeTextColor':'#e5e7eb','edgeLabelText':'#e5e7eb','clusterTextColor':'#e5e7eb','actorTextColor':'#e5e7eb'}}}%% graph TB subgraph "Event Producers" UserService[User Service] OrderService[Order Service] PaymentService[Payment Service] end subgraph "Event Broker" EventBus[Event Bus / Message Broker] UserTopic[User Events Topic] OrderTopic[Order Events Topic] PaymentTopic[Payment Events Topic] end subgraph "Event Consumers" EmailService[Email Service] AnalyticsService[Analytics Service] NotificationService[Notification Service] InventoryService[Inventory Service] end subgraph "Event Store" EventLog[(Event Log DB)] end UserService -->|UserCreated| UserTopic OrderService -->|OrderPlaced| OrderTopic OrderService -->|OrderConfirmed| OrderTopic PaymentService -->|PaymentProcessed| PaymentTopic UserTopic --> EventBus OrderTopic --> EventBus PaymentTopic --> EventBus EventBus --> EmailService EventBus --> AnalyticsService EventBus --> NotificationService EventBus --> InventoryService EventBus 
--> EventLog style UserService fill:#1e3a5f,color:#fff style OrderService fill:#78350f,color:#fff style PaymentService fill:#4c1d95,color:#fff style EventBus fill:#134e4a,color:#fff Event Flow Pattern %%{init: {'theme':'dark', 'themeVariables': {'primaryTextColor':'#e5e7eb','secondaryTextColor':'#e5e7eb','tertiaryTextColor':'#e5e7eb','textColor':'#e5e7eb','nodeTextColor':'#e5e7eb','edgeLabelText':'#e5e7eb','clusterTextColor':'#e5e7eb','actorTextColor':'#e5e7eb'}}}%% sequenceDiagram participant User participant OrderService participant EventBroker participant EmailService participant InventoryService participant AnalyticsService User->>OrderService: Place Order OrderService->>OrderService: Validate Order OrderService->>EventBroker: Publish OrderPlaced Event Note over EventBroker: Event is distributed to all subscribers EventBroker->>EmailService: OrderPlaced Event EventBroker->>InventoryService: OrderPlaced Event EventBroker->>AnalyticsService: OrderPlaced Event EmailService->>EmailService: Send Confirmation Email InventoryService->>InventoryService: Update Stock AnalyticsService->>AnalyticsService: Record Metrics InventoryService->>EventBroker: Publish StockUpdated Event OrderService-->>User: Order Accepted (202) Real-World Use Cases E-commerce: Order processing, inventory updates, shipping notifications Financial Systems: Transaction processing, fraud detection, account updates IoT Platforms: Sensor data processing, device state changes Social Media: Post updates, notifications, activity feeds Streaming Services: Video processing, recommendation updates Gaming: Player actions, leaderboard updates, achievements Event-Driven Architecture Implementation Project Structure event-driven-app/ ├── cmd/ │ ├── order-service/ │ │ └── main.go │ ├── email-service/ │ │ └── main.go │ └── inventory-service/ │ └── main.go ├── internal/ │ ├── events/ │ │ ├── event.go │ │ ├── bus.go │ │ └── types.go │ ├── broker/ │ │ ├── rabbitmq.go │ │ └── kafka.go │ └── services/ │ ├── order/ │ 
├── email/ │ └── inventory/ ├── pkg/ │ └── messaging/ │ └── publisher.go └── go.mod Core Event System // internal/events/event.go package events import ( "encoding/json" "fmt" "time" ) // Event represents a domain event type Event struct { ID string `json:"id"` Type string `json:"type"` AggregateID string `json:"aggregate_id"` Payload map[string]interface{} `json:"payload"` Metadata map[string]string `json:"metadata"` Timestamp time.Time `json:"timestamp"` Version int `json:"version"` } // NewEvent creates a new event func NewEvent(eventType, aggregateID string, payload map[string]interface{}) *Event { return &Event{ ID: generateEventID(), Type: eventType, AggregateID: aggregateID, Payload: payload, Metadata: make(map[string]string), Timestamp: time.Now(), Version: 1, } } // ToJSON serializes event to JSON func (e *Event) ToJSON() ([]byte, error) { return json.Marshal(e) } // FromJSON deserializes event from JSON func FromJSON(data []byte) (*Event, error) { var event Event if err := json.Unmarshal(data, &event); err != nil { return nil, err } return &event, nil } func generateEventID() string { return fmt.Sprintf("evt_%d", time.Now().UnixNano()) } // internal/events/types.go package events const ( // User Events UserCreated = "user.created" UserUpdated = "user.updated" UserDeleted = "user.deleted" // Order Events OrderPlaced = "order.placed" OrderConfirmed = "order.confirmed" OrderCancelled = "order.cancelled" OrderShipped = "order.shipped" OrderDelivered = "order.delivered" // Payment Events PaymentProcessed = "payment.processed" PaymentFailed = "payment.failed" PaymentRefunded = "payment.refunded" // Inventory Events StockUpdated = "inventory.stock_updated" StockDepleted = "inventory.stock_depleted" StockRestocked = "inventory.stock_restocked" ) // internal/events/bus.go package events import ( "context" "fmt" "sync" ) // Handler is a function that processes events type Handler func(ctx context.Context, event *Event) error // EventBus manages event publishing and 
subscriptions type EventBus struct { mu sync.RWMutex handlers map[string][]Handler middleware []Middleware } // Middleware wraps event handlers type Middleware func(Handler) Handler // NewEventBus creates a new event bus func NewEventBus() *EventBus { return &EventBus{ handlers: make(map[string][]Handler), middleware: make([]Middleware, 0), } } // Subscribe registers a handler for an event type func (b *EventBus) Subscribe(eventType string, handler Handler) { b.mu.Lock() defer b.mu.Unlock() // Apply middleware for i := len(b.middleware) - 1; i >= 0; i-- { handler = b.middleware[i](handler) } b.handlers[eventType] = append(b.handlers[eventType], handler) } // Publish publishes an event to all subscribers func (b *EventBus) Publish(ctx context.Context, event *Event) error { b.mu.RLock() handlers, exists := b.handlers[event.Type] b.mu.RUnlock() if !exists { return nil // No handlers registered } var wg sync.WaitGroup errors := make(chan error, len(handlers)) for _, handler := range handlers { wg.Add(1) go func(h Handler) { defer wg.Done() if err := h(ctx, event); err != nil { errors <- fmt.Errorf("handler error: %w", err) } }(handler) } wg.Wait() close(errors) // Collect errors var handlerErrors []error for err := range errors { handlerErrors = append(handlerErrors, err) } if len(handlerErrors) > 0 { return fmt.Errorf("some handlers failed: %v", handlerErrors) } return nil } // Use adds middleware to the event bus func (b *EventBus) Use(middleware Middleware) { b.mu.Lock() defer b.mu.Unlock() b.middleware = append(b.middleware, middleware) } Message Broker Integration (RabbitMQ) // internal/broker/rabbitmq.go package broker import ( "context" "fmt" "log" "github.com/rabbitmq/amqp091-go" "app/internal/events" ) // RabbitMQBroker wraps RabbitMQ for event publishing/consuming type RabbitMQBroker struct { conn *amqp091.Connection channel *amqp091.Channel } // NewRabbitMQBroker creates a new RabbitMQ broker func NewRabbitMQBroker(url string) (*RabbitMQBroker, error) { 
conn, err := amqp091.Dial(url) if err != nil { return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err) } channel, err := conn.Channel() if err != nil { return nil, fmt.Errorf("failed to open channel: %w", err) } return &RabbitMQBroker{ conn: conn, channel: channel, }, nil } // DeclareExchange declares an exchange func (b *RabbitMQBroker) DeclareExchange(name, kind string) error { return b.channel.ExchangeDeclare( name, // name kind, // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) } // Publish publishes an event to RabbitMQ func (b *RabbitMQBroker) Publish(ctx context.Context, exchange, routingKey string, event *events.Event) error { body, err := event.ToJSON() if err != nil { return fmt.Errorf("failed to serialize event: %w", err) } err = b.channel.PublishWithContext( ctx, exchange, // exchange routingKey, // routing key false, // mandatory false, // immediate amqp091.Publishing{ ContentType: "application/json", Body: body, DeliveryMode: amqp091.Persistent, MessageId: event.ID, Timestamp: event.Timestamp, }, ) if err != nil { return fmt.Errorf("failed to publish event: %w", err) } return nil } // Subscribe subscribes to events from a queue func (b *RabbitMQBroker) Subscribe(queueName, exchange, routingKey string, handler events.Handler) error { // Declare queue queue, err := b.channel.QueueDeclare( queueName, // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) if err != nil { return fmt.Errorf("failed to declare queue: %w", err) } // Bind queue to exchange err = b.channel.QueueBind( queue.Name, // queue name routingKey, // routing key exchange, // exchange false, nil, ) if err != nil { return fmt.Errorf("failed to bind queue: %w", err) } // Consume messages msgs, err := b.channel.Consume( queue.Name, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { return 
fmt.Errorf("failed to register consumer: %w", err) } // Process messages go func() { for msg := range msgs { event, err := events.FromJSON(msg.Body) if err != nil { log.Printf("Failed to deserialize event: %v", err) msg.Nack(false, false) continue } if err := handler(context.Background(), event); err != nil { log.Printf("Handler error: %v", err) msg.Nack(false, true) // Requeue on error continue } msg.Ack(false) } }() return nil } // Close closes the broker connection func (b *RabbitMQBroker) Close() error { if err := b.channel.Close(); err != nil { return err } return b.conn.Close() } Order Service (Event Producer) // internal/services/order/service.go package order import ( "context" "fmt" "time" "app/internal/events" ) type Order struct { ID string `json:"id"` UserID string `json:"user_id"` Total float64 `json:"total"` Status string `json:"status"` CreatedAt time.Time `json:"created_at"` } type Service struct { eventBus *events.EventBus } func NewService(eventBus *events.EventBus) *Service { return &Service{ eventBus: eventBus, } } func (s *Service) PlaceOrder(ctx context.Context, userID string, total float64) (*Order, error) { order := &Order{ ID: generateOrderID(), UserID: userID, Total: total, Status: "pending", CreatedAt: time.Now(), } // Publish OrderPlaced event event := events.NewEvent(events.OrderPlaced, order.ID, map[string]interface{}{ "order_id": order.ID, "user_id": order.UserID, "total": order.Total, "status": order.Status, }) if err := s.eventBus.Publish(ctx, event); err != nil { return nil, fmt.Errorf("failed to publish event: %w", err) } return order, nil } func (s *Service) ConfirmOrder(ctx context.Context, orderID string) error { // Publish OrderConfirmed event event := events.NewEvent(events.OrderConfirmed, orderID, map[string]interface{}{ "order_id": orderID, "status": "confirmed", }) if err := s.eventBus.Publish(ctx, event); err != nil { return fmt.Errorf("failed to publish event: %w", err) } return nil } func (s *Service) CancelOrder(ctx 
context.Context, orderID string) error { // Publish OrderCancelled event event := events.NewEvent(events.OrderCancelled, orderID, map[string]interface{}{ "order_id": orderID, "status": "cancelled", }) if err := s.eventBus.Publish(ctx, event); err != nil { return fmt.Errorf("failed to publish event: %w", err) } return nil } func generateOrderID() string { return fmt.Sprintf("order_%d", time.Now().UnixNano()) } Email Service (Event Consumer) // internal/services/email/service.go package email import ( "context" "fmt" "log" "app/internal/events" ) type Service struct { // SMTP client or email service client } func NewService() *Service { return &Service{} } // HandleOrderPlaced handles order placed events func (s *Service) HandleOrderPlaced(ctx context.Context, event *events.Event) error { orderID := event.Payload["order_id"].(string) userID := event.Payload["user_id"].(string) total := event.Payload["total"].(float64) log.Printf("Sending order confirmation email for order %s to user %s", orderID, userID) // Send email if err := s.sendEmail(userID, "Order Confirmation", fmt.Sprintf( "Your order %s for $%.2f has been placed successfully.", orderID, total, )); err != nil { return fmt.Errorf("failed to send email: %w", err) } return nil } // HandleOrderShipped handles order shipped events func (s *Service) HandleOrderShipped(ctx context.Context, event *events.Event) error { orderID := event.Payload["order_id"].(string) trackingNumber := event.Payload["tracking_number"].(string) log.Printf("Sending shipping notification for order %s with tracking %s", orderID, trackingNumber) // Send email return nil } func (s *Service) sendEmail(recipient, subject, body string) error { // Implementation of email sending log.Printf("Email sent to %s: %s - %s", recipient, subject, body) return nil } // RegisterHandlers registers event handlers func (s *Service) RegisterHandlers(bus *events.EventBus) { bus.Subscribe(events.OrderPlaced, s.HandleOrderPlaced) bus.Subscribe(events.OrderShipped, 
s.HandleOrderShipped) } Inventory Service (Event Consumer) // internal/services/inventory/service.go package inventory import ( "context" "fmt" "log" "time" "app/internal/events" ) type Service struct { eventBus *events.EventBus } func NewService(eventBus *events.EventBus) *Service { return &Service{ eventBus: eventBus, } } // HandleOrderPlaced handles order placed events func (s *Service) HandleOrderPlaced(ctx context.Context, event *events.Event) error { orderID := event.Payload["order_id"].(string) log.Printf("Updating inventory for order %s", orderID) // Update stock levels if err := s.updateStock(orderID); err != nil { return fmt.Errorf("failed to update stock: %w", err) } // Publish StockUpdated event stockEvent := events.NewEvent(events.StockUpdated, orderID, map[string]interface{}{ "order_id": orderID, "stock_level": 100, "updated_at": time.Now(), }) if err := s.eventBus.Publish(ctx, stockEvent); err != nil { return fmt.Errorf("failed to publish stock event: %w", err) } return nil } // HandleOrderCancelled handles order cancelled events func (s *Service) HandleOrderCancelled(ctx context.Context, event *events.Event) error { orderID := event.Payload["order_id"].(string) log.Printf("Restoring inventory for cancelled order %s", orderID) // Restore stock if err := s.restoreStock(orderID); err != nil { return fmt.Errorf("failed to restore stock: %w", err) } // Publish StockRestocked event stockEvent := events.NewEvent(events.StockRestocked, orderID, map[string]interface{}{ "order_id": orderID, "restored": true, }) return s.eventBus.Publish(ctx, stockEvent) } func (s *Service) updateStock(orderID string) error { // Implementation of stock update return nil } func (s *Service) restoreStock(orderID string) error { // Implementation of stock restoration return nil } // RegisterHandlers registers event handlers func (s *Service) RegisterHandlers(bus *events.EventBus) { bus.Subscribe(events.OrderPlaced, s.HandleOrderPlaced) bus.Subscribe(events.OrderCancelled, 
s.HandleOrderCancelled) } Main Application // cmd/order-service/main.go package main import ( "context" "encoding/json" "log" "net/http" "os" "os/signal" "syscall" "time" "github.com/gorilla/mux" "app/internal/broker" "app/internal/events" "app/internal/services/email" "app/internal/services/inventory" "app/internal/services/order" ) func main() { // Initialize event bus eventBus := events.NewEventBus() // Add logging middleware eventBus.Use(loggingMiddleware) // Initialize RabbitMQ broker rabbitURL := os.Getenv("RABBITMQ_URL") if rabbitURL == "" { rabbitURL = "amqp://guest:guest@localhost:5672/" } mqBroker, err := broker.NewRabbitMQBroker(rabbitURL) if err != nil { log.Fatal(err) } defer mqBroker.Close() // Declare exchange if err := mqBroker.DeclareExchange("events", "topic"); err != nil { log.Fatal(err) } // Initialize services orderService := order.NewService(eventBus) emailService := email.NewService() inventoryService := inventory.NewService(eventBus) // Register event handlers emailService.RegisterHandlers(eventBus) inventoryService.RegisterHandlers(eventBus) // Bridge event bus to RabbitMQ eventBus.Subscribe(events.OrderPlaced, func(ctx context.Context, event *events.Event) error { return mqBroker.Publish(ctx, "events", "order.placed", event) }) // Setup HTTP server router := mux.NewRouter() router.HandleFunc("/orders", func(w http.ResponseWriter, r *http.Request) { var req struct { UserID string `json:"user_id"` Total float64 `json:"total"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } order, err := orderService.PlaceOrder(r.Context(), req.UserID, req.Total) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusCreated) json.NewEncoder(w).Encode(order) }).Methods("POST") router.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }) 
server := &http.Server{ Addr: ":8080", Handler: router, } // Start server go func() { log.Println("Server starting on :8080") if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatal(err) } }() // Graceful shutdown quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit log.Println("Shutting down server...") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := server.Shutdown(ctx); err != nil { log.Fatal(err) } } func loggingMiddleware(next events.Handler) events.Handler { return func(ctx context.Context, event *events.Event) error { start := time.Now() log.Printf("Handling event: %s (ID: %s)", event.Type, event.ID) err := next(ctx, event) log.Printf("Event %s handled in %v", event.ID, time.Since(start)) return err } } Best Practices Event Schema Versioning: Version your events to handle schema evolution Idempotent Handlers: Ensure handlers can process the same event multiple times Error Handling: Implement retry logic and dead letter queues Event Ordering: Be aware that events may not arrive in order Event Store: Persist events for audit trail and replay Monitoring: Track event processing metrics and failures Testing: Test event handlers independently Documentation: Document all event types and their payloads Common Pitfalls Event Coupling: Events containing too many internal implementation details Lost Messages: Not ensuring message delivery guarantees Duplicate Processing: Not handling duplicate events No Versioning: Making breaking changes to event schemas Synchronous Expectations: Expecting immediate consistency Overusing Events: Using events for simple request-response patterns Poor Error Handling: Not handling failures gracefully When to Use Event-Driven Architecture Use When: ...

    February 12, 2025 · 12 min · Rafiul Alam

    Event-Driven Note Sharing: Building Real-Time Microservices with NATS, Go, and Vue.js

    Building a real-time note-sharing application is a perfect use case for exploring event-driven architecture. In this comprehensive guide, we’ll build a production-ready system with two microservices (User and Note services) that communicate through NATS, delivering instant updates to a Vue.js frontend. Why Event-Driven Architecture? Traditional request-response patterns create tight coupling between services. When your Note service needs to notify users about changes, you don’t want to make synchronous HTTP calls to every service that cares about notes. Event-driven architecture solves this with loose coupling: ...

    January 27, 2025 · 22 min · Rafiul Alam