Backend Communication
Current: Server-Sent Events

What are Server-Sent Events (SSE)?

Server-Sent Events (SSE) is a server push technology that enables servers to push real-time updates to clients over a single HTTP connection. Unlike WebSockets, SSE is uni-directional (server → client only) and uses the standard HTTP protocol, making it simpler and more reliable through proxies and firewalls.

SSE is part of the HTML5 specification and provides built-in features like:

  • Automatic reconnection with configurable retry intervals
  • Event IDs for resuming from the last received event
  • Named event types for different message categories
  • Simple text-based protocol easy to debug and implement

Architecture Overview

sequenceDiagram participant C as Client (EventSource) participant S as Server participant R as Redis/Message Broker Note over C,S: Initial Connection C->>S: GET /events (Accept: text/event-stream) S-->>C: 200 OK (Content-Type: text/event-stream) Note over S: Connection established rect rgb(200, 220, 240) Note over R,S: Message Flow R->>S: New event published S->>C: data: message
id: 123
event: update

end rect rgb(240, 220, 200) Note over C,S: Automatic Reconnection C-xS: Connection lost Note over C: Wait retry interval C->>S: GET /events
Last-Event-ID: 123 S-->>C: 200 OK (resume from 123) end

SSE Message Format

SSE uses a simple text-based protocol:

event: message
id: 123
data: {"user": "Alice", "text": "Hello"}
retry: 5000

event: notification
id: 124
data: {"type": "alert", "message": "System update"}
data: {"priority": "high"}

: This is a comment (heartbeat)

Fields:

  • event: - Event type (default: “message”)
  • id: - Event identifier for reconnection
  • data: - Message payload (can be multi-line)
  • retry: - Reconnection interval in milliseconds
  • : - Comment (for heartbeats)

Real-World Use Cases

  • Live Notifications - Social media updates, email alerts, system notifications
  • Real-Time Dashboards - Metrics, analytics, server stats, user activity
  • Live News Feeds - Breaking news, stock prices, sports scores
  • Collaboration Tools - Document changes, presence updates, activity feeds
  • Progress Monitoring - Build status, deployment progress, task completion
  • IoT Data Streams - Sensor readings, device status, telemetry data
  • Chat Applications - One-way announcements, broadcast messages

Implementation in Go

Project Structure

sse-server/
├── main.go
├── sse/
│   ├── broker.go
│   ├── client.go
│   └── event.go
├── handlers/
│   └── stream.go
├── redis/
│   └── pubsub.go
└── go.mod

1. Core SSE Types

sse/event.go

package sse

import (
    "fmt"
    "strings"
)

// Event represents an SSE event
type Event struct {
    ID    string
    Event string
    Data  string
    Retry int
}

// String formats the event according to SSE specification
func (e *Event) String() string {
    var sb strings.Builder

    if e.Event != "" {
        fmt.Fprintf(&sb, "event: %s\n", e.Event)
    }

    if e.ID != "" {
        fmt.Fprintf(&sb, "id: %s\n", e.ID)
    }

    if e.Retry > 0 {
        fmt.Fprintf(&sb, "retry: %d\n", e.Retry)
    }

    // Handle multi-line data
    lines := strings.Split(e.Data, "\n")
    for _, line := range lines {
        fmt.Fprintf(&sb, "data: %s\n", line)
    }

    // SSE messages must end with double newline
    sb.WriteString("\n")

    return sb.String()
}

// Heartbeat creates a comment event (keepalive)
func Heartbeat() string {
    return ": heartbeat\n\n"
}

2. Client Connection Management

sse/client.go

package sse

import (
    "context"
    "fmt"
    "log"
)

// Client represents a connected SSE client
type Client struct {
    ID       string
    Channel  chan *Event
    LastID   string
    ctx      context.Context
    cancel   context.CancelFunc
}

// NewClient creates a new SSE client
func NewClient(id string, lastEventID string) *Client {
    ctx, cancel := context.WithCancel(context.Background())

    return &Client{
        ID:      id,
        Channel: make(chan *Event, 10), // Buffered to prevent blocking
        LastID:  lastEventID,
        ctx:     ctx,
        cancel:  cancel,
    }
}

// Send sends an event to the client
func (c *Client) Send(event *Event) error {
    select {
    case c.Channel <- event:
        return nil
    case <-c.ctx.Done():
        return fmt.Errorf("client disconnected")
    default:
        // Channel full - log and drop message or disconnect client
        log.Printf("Client %s buffer full, dropping message", c.ID)
        return fmt.Errorf("client buffer full")
    }
}

// Close closes the client connection
func (c *Client) Close() {
    c.cancel()
    close(c.Channel)
}

// Context returns the client context
func (c *Client) Context() context.Context {
    return c.ctx
}

3. Event Broker

sse/broker.go

package sse

import (
    "log"
    "sync"
    "time"
)

// Broker manages SSE clients and broadcasts events
type Broker struct {
    clients    map[string]*Client
    mu         sync.RWMutex
    register   chan *Client
    unregister chan *Client
    broadcast  chan *Event
    history    *EventHistory
}

// EventHistory stores recent events for replay
type EventHistory struct {
    events []Event
    mu     sync.RWMutex
    maxSize int
}

func NewEventHistory(maxSize int) *EventHistory {
    return &EventHistory{
        events:  make([]Event, 0, maxSize),
        maxSize: maxSize,
    }
}

func (eh *EventHistory) Add(event Event) {
    eh.mu.Lock()
    defer eh.mu.Unlock()

    eh.events = append(eh.events, event)
    if len(eh.events) > eh.maxSize {
        eh.events = eh.events[1:] // Remove oldest
    }
}

func (eh *EventHistory) GetAfter(lastID string) []Event {
    eh.mu.RLock()
    defer eh.mu.RUnlock()

    if lastID == "" {
        return nil
    }

    var result []Event
    foundLast := false

    for _, event := range eh.events {
        if foundLast {
            result = append(result, event)
        }
        if event.ID == lastID {
            foundLast = true
        }
    }

    return result
}

// NewBroker creates a new SSE broker
func NewBroker() *Broker {
    return &Broker{
        clients:    make(map[string]*Client),
        register:   make(chan *Client),
        unregister: make(chan *Client),
        broadcast:  make(chan *Event, 100),
        history:    NewEventHistory(100), // Keep last 100 events
    }
}

// Start starts the broker event loop
func (b *Broker) Start() {
    go b.run()
    go b.heartbeat()
}

func (b *Broker) run() {
    for {
        select {
        case client := <-b.register:
            b.mu.Lock()
            b.clients[client.ID] = client
            b.mu.Unlock()
            log.Printf("Client %s connected (total: %d)", client.ID, len(b.clients))

            // Send missed events if client reconnected
            if client.LastID != "" {
                missedEvents := b.history.GetAfter(client.LastID)
                for _, event := range missedEvents {
                    evt := event // Create copy
                    client.Send(&evt)
                }
                log.Printf("Replayed %d events to client %s", len(missedEvents), client.ID)
            }

        case client := <-b.unregister:
            b.mu.Lock()
            if _, exists := b.clients[client.ID]; exists {
                delete(b.clients, client.ID)
                client.Close()
                log.Printf("Client %s disconnected (total: %d)", client.ID, len(b.clients))
            }
            b.mu.Unlock()

        case event := <-b.broadcast:
            // Store in history
            if event.ID != "" {
                b.history.Add(*event)
            }

            // Broadcast to all connected clients
            b.mu.RLock()
            for _, client := range b.clients {
                if err := client.Send(event); err != nil {
                    log.Printf("Error sending to client %s: %v", client.ID, err)
                }
            }
            b.mu.RUnlock()

            log.Printf("Broadcast event %s to %d clients", event.ID, len(b.clients))
        }
    }
}

// heartbeat sends periodic heartbeat comments to keep connections alive
func (b *Broker) heartbeat() {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        b.mu.RLock()
        for _, client := range b.clients {
            // Send heartbeat as comment (doesn't trigger event in client)
            select {
            case client.Channel <- &Event{Data: ""}: // Empty event acts as heartbeat
            default:
                // Channel full, skip heartbeat
            }
        }
        count := len(b.clients)
        b.mu.RUnlock()

        if count > 0 {
            log.Printf("Sent heartbeat to %d clients", count)
        }
    }
}

// Register registers a new client
func (b *Broker) Register(client *Client) {
    b.register <- client
}

// Unregister unregisters a client
func (b *Broker) Unregister(client *Client) {
    b.unregister <- client
}

// Broadcast broadcasts an event to all clients
func (b *Broker) Broadcast(event *Event) {
    b.broadcast <- event
}

// ClientCount returns the number of connected clients
func (b *Broker) ClientCount() int {
    b.mu.RLock()
    defer b.mu.RUnlock()
    return len(b.clients)
}

4. HTTP Handler

handlers/stream.go

package handlers

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "time"

    "sse-server/sse"
)

// SSEHandler handles SSE connections
func SSEHandler(broker *sse.Broker) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        // Verify request accepts event-stream
        if r.Header.Get("Accept") != "text/event-stream" {
            http.Error(w, "Invalid Accept header", http.StatusBadRequest)
            return
        }

        // Set SSE headers
        w.Header().Set("Content-Type", "text/event-stream")
        w.Header().Set("Cache-Control", "no-cache")
        w.Header().Set("Connection", "keep-alive")
        w.Header().Set("Access-Control-Allow-Origin", "*")
        w.Header().Set("X-Accel-Buffering", "no") // Disable nginx buffering

        // Check if we can flush
        flusher, ok := w.(http.Flusher)
        if !ok {
            http.Error(w, "Streaming not supported", http.StatusInternalServerError)
            return
        }

        // Get Last-Event-ID header for reconnection
        lastEventID := r.Header.Get("Last-Event-ID")

        // Create client
        clientID := generateClientID()
        client := sse.NewClient(clientID, lastEventID)

        // Register client with broker
        broker.Register(client)
        defer broker.Unregister(client)

        // Send initial connection event
        initialEvent := &sse.Event{
            Event: "connected",
            Data:  fmt.Sprintf(`{"client_id": "%s", "time": "%s"}`, clientID, time.Now().Format(time.RFC3339)),
        }
        fmt.Fprint(w, initialEvent.String())
        flusher.Flush()

        // Stream events to client
        ctx := r.Context()
        for {
            select {
            case <-ctx.Done():
                log.Printf("Client %s: context done", clientID)
                return

            case <-client.Context().Done():
                log.Printf("Client %s: client context done", clientID)
                return

            case event, ok := <-client.Channel:
                if !ok {
                    log.Printf("Client %s: channel closed", clientID)
                    return
                }

                // Write event
                if event.Data == "" {
                    // Heartbeat
                    fmt.Fprint(w, sse.Heartbeat())
                } else {
                    fmt.Fprint(w, event.String())
                }

                // Flush immediately
                flusher.Flush()

                // Check for write errors (client disconnect)
                if ctx.Err() != nil {
                    log.Printf("Client %s: write error", clientID)
                    return
                }
            }
        }
    }
}

// Helper to generate unique client IDs
func generateClientID() string {
    return fmt.Sprintf("client_%d", time.Now().UnixNano())
}

5. Main Application

main.go

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "strconv"
    "sync/atomic"
    "time"

    "sse-server/handlers"
    "sse-server/sse"
)

var (
    broker   *sse.Broker
    eventID  atomic.Int64
)

func main() {
    // Create and start broker
    broker = sse.NewBroker()
    broker.Start()

    // Setup routes
    mux := http.NewServeMux()

    // SSE endpoint
    mux.HandleFunc("/events", handlers.SSEHandler(broker))

    // Status endpoint
    mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(map[string]interface{}{
            "clients": broker.ClientCount(),
            "uptime":  time.Since(startTime).String(),
        })
    })

    // Test endpoints to publish events
    mux.HandleFunc("/publish", func(w http.ResponseWriter, r *http.Request) {
        if r.Method != http.MethodPost {
            http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
            return
        }

        message := r.URL.Query().Get("message")
        if message == "" {
            message = "Test message"
        }

        eventType := r.URL.Query().Get("type")
        if eventType == "" {
            eventType = "message"
        }

        id := eventID.Add(1)
        event := &sse.Event{
            ID:    strconv.FormatInt(id, 10),
            Event: eventType,
            Data:  fmt.Sprintf(`{"message": "%s", "timestamp": "%s"}`, message, time.Now().Format(time.RFC3339)),
        }

        broker.Broadcast(event)

        w.WriteHeader(http.StatusCreated)
        fmt.Fprintf(w, "Event %d published\n", id)
    })

    // Start demo publisher
    go demoPublisher()

    // Start server
    log.Println("SSE Server starting on :8080")
    log.Println("Endpoints:")
    log.Println("  - GET  /events              (SSE stream)")
    log.Println("  - GET  /status              (server status)")
    log.Println("  - POST /publish?message=X   (publish event)")
    log.Println("\nTest with: curl -N -H 'Accept: text/event-stream' http://localhost:8080/events")

    if err := http.ListenAndServe(":8080", mux); err != nil {
        log.Fatal(err)
    }
}

var startTime = time.Now()

// demoPublisher publishes demo events
func demoPublisher() {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        id := eventID.Add(1)
        event := &sse.Event{
            ID:    strconv.FormatInt(id, 10),
            Event: "update",
            Data:  fmt.Sprintf(`{"id": %d, "message": "Auto update", "time": "%s"}`, id, time.Now().Format(time.RFC3339)),
        }
        broker.Broadcast(event)
    }
}

6. Client Examples

JavaScript Client (EventSource API)

// Modern browser client
const eventSource = new EventSource('http://localhost:8080/events');

// Handle connection open
eventSource.addEventListener('open', () => {
    console.log('SSE connection opened');
});

// Handle default 'message' events
eventSource.addEventListener('message', (event) => {
    const data = JSON.parse(event.data);
    console.log('Message:', data);
});

// Handle custom 'update' events
eventSource.addEventListener('update', (event) => {
    const data = JSON.parse(event.data);
    console.log('Update:', data);
    console.log('Event ID:', event.lastEventId);
});

// Handle custom 'connected' events
eventSource.addEventListener('connected', (event) => {
    const data = JSON.parse(event.data);
    console.log('Connected:', data.client_id);
});

// Handle errors (automatic reconnection)
eventSource.addEventListener('error', (error) => {
    console.error('SSE error:', error);
    if (eventSource.readyState === EventSource.CLOSED) {
        console.log('Connection closed');
    } else {
        console.log('Connection error, will retry...');
    }
});

// Close connection (if needed)
// eventSource.close();

Go Client

package main

import (
    "bufio"
    "fmt"
    "log"
    "net/http"
    "strings"
)

func main() {
    req, err := http.NewRequest("GET", "http://localhost:8080/events", nil)
    if err != nil {
        log.Fatal(err)
    }

    req.Header.Set("Accept", "text/event-stream")
    req.Header.Set("Cache-Control", "no-cache")

    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        log.Fatalf("Bad status: %s", resp.Status)
    }

    fmt.Println("Connected to SSE stream")

    reader := bufio.NewReader(resp.Body)
    var event SSEEvent

    for {
        line, err := reader.ReadString('\n')
        if err != nil {
            log.Fatal(err)
        }

        line = strings.TrimSpace(line)

        if line == "" {
            // Empty line means end of event
            if event.Data != "" {
                fmt.Printf("Event: %+v\n", event)
                event = SSEEvent{} // Reset
            }
            continue
        }

        if strings.HasPrefix(line, ":") {
            // Comment (heartbeat)
            continue
        }

        parts := strings.SplitN(line, ":", 2)
        if len(parts) != 2 {
            continue
        }

        field := strings.TrimSpace(parts[0])
        value := strings.TrimSpace(parts[1])

        switch field {
        case "event":
            event.Event = value
        case "data":
            if event.Data != "" {
                event.Data += "\n"
            }
            event.Data += value
        case "id":
            event.ID = value
        case "retry":
            event.Retry = value
        }
    }
}

type SSEEvent struct {
    Event string
    Data  string
    ID    string
    Retry string
}

cURL Test

# Subscribe to events
curl -N -H "Accept: text/event-stream" http://localhost:8080/events

# Publish a test event (in another terminal)
curl -X POST "http://localhost:8080/publish?message=Hello&type=notification"

# Check server status
curl http://localhost:8080/status

Horizontal Scaling with Redis Pub/Sub

redis/pubsub.go

package redis

import (
    "context"
    "encoding/json"
    "log"

    "github.com/redis/go-redis/v9"
    "sse-server/sse"
)

type RedisBroker struct {
    client    *redis.Client
    pubsub    *redis.PubSub
    localBroker *sse.Broker
    channel   string
}

func NewRedisBroker(redisAddr string, channel string, localBroker *sse.Broker) *RedisBroker {
    client := redis.NewClient(&redis.Options{
        Addr: redisAddr,
    })

    return &RedisBroker{
        client:      client,
        localBroker: localBroker,
        channel:     channel,
    }
}

func (rb *RedisBroker) Start(ctx context.Context) error {
    // Subscribe to Redis channel
    rb.pubsub = rb.client.Subscribe(ctx, rb.channel)

    // Wait for subscription confirmation
    _, err := rb.pubsub.Receive(ctx)
    if err != nil {
        return err
    }

    log.Printf("Subscribed to Redis channel: %s", rb.channel)

    // Start listening
    go rb.listen(ctx)

    return nil
}

func (rb *RedisBroker) listen(ctx context.Context) {
    ch := rb.pubsub.Channel()

    for {
        select {
        case <-ctx.Done():
            rb.pubsub.Close()
            return

        case msg := <-ch:
            var event sse.Event
            if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
                log.Printf("Error unmarshaling event: %v", err)
                continue
            }

            // Broadcast to local clients
            rb.localBroker.Broadcast(&event)
        }
    }
}

func (rb *RedisBroker) Publish(event *sse.Event) error {
    data, err := json.Marshal(event)
    if err != nil {
        return err
    }

    return rb.client.Publish(context.Background(), rb.channel, data).Err()
}

Updated main.go with Redis

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"

    "sse-server/redis"
    "sse-server/sse"
)

func main() {
    // Create local broker
    localBroker := sse.NewBroker()
    localBroker.Start()

    // Create Redis broker for scaling
    redisAddr := os.Getenv("REDIS_ADDR")
    if redisAddr == "" {
        redisAddr = "localhost:6379"
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    redisBroker := redis.NewRedisBroker(redisAddr, "sse:events", localBroker)
    if err := redisBroker.Start(ctx); err != nil {
        log.Fatal(err)
    }

    // Modify publish endpoint to use Redis
    mux.HandleFunc("/publish", func(w http.ResponseWriter, r *http.Request) {
        // ... create event ...

        // Publish to Redis (will broadcast to all instances)
        if err := redisBroker.Publish(event); err != nil {
            http.Error(w, "Publish failed", http.StatusInternalServerError)
            return
        }

        w.WriteHeader(http.StatusCreated)
    })

    // ... rest of setup ...

    // Graceful shutdown
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)

    go func() {
        <-sigCh
        log.Println("Shutting down...")
        cancel()
        os.Exit(0)
    }()

    log.Fatal(http.ListenAndServe(":8080", mux))
}

Architecture Patterns

graph TB subgraph "Single Instance" A[Client A] -->|SSE| S1[Server] B[Client B] -->|SSE| S1 C[Client C] -->|SSE| S1 S1 --> M1[Memory Broker] end subgraph "Scaled with Redis" A2[Client A] -->|SSE| S2[Server 1] B2[Client B] -->|SSE| S3[Server 2] C2[Client C] -->|SSE| S4[Server 3] S2 --> LB1[Local Broker] S3 --> LB2[Local Broker] S4 --> LB3[Local Broker] LB1 <-->|Pub/Sub| R[Redis] LB2 <-->|Pub/Sub| R LB3 <-->|Pub/Sub| R P[Publisher] -->|Publish| R end style M1 fill:#e1f5ff style R fill:#ffe1e1

Best Practices

1. Event ID Management

// Use monotonic IDs for proper ordering
var eventCounter atomic.Int64

func generateEventID() string {
    return strconv.FormatInt(eventCounter.Add(1), 10)
}

// Or use timestamp-based IDs
func generateTimestampID() string {
    return fmt.Sprintf("%d", time.Now().UnixNano())
}

2. Heartbeat for Connection Detection

// Send periodic heartbeats to detect dead connections
func (b *Broker) heartbeat() {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        // Send comment as heartbeat (doesn't trigger client event)
        b.mu.RLock()
        for _, client := range b.clients {
            select {
            case client.Channel <- nil: // nil triggers heartbeat comment
            default:
            }
        }
        b.mu.RUnlock()
    }
}

3. Buffer Management

// Use buffered channels to prevent slow clients from blocking
client := &Client{
    Channel: make(chan *Event, 100), // Adjust based on message rate
}

// Drop messages if buffer is full (or disconnect client)
func (c *Client) Send(event *Event) error {
    select {
    case c.Channel <- event:
        return nil
    default:
        return fmt.Errorf("buffer full")
    }
}

4. Connection Limits

// Limit connections per IP
type IPRateLimiter struct {
    connections map[string]int
    mu          sync.Mutex
    maxPerIP    int
}

func (rl *IPRateLimiter) Allow(ip string) bool {
    rl.mu.Lock()
    defer rl.mu.Unlock()

    if rl.connections[ip] >= rl.maxPerIP {
        return false
    }
    rl.connections[ip]++
    return true
}

5. Error Handling

// Graceful degradation on errors
func (b *Broker) Broadcast(event *Event) {
    b.mu.RLock()
    defer b.mu.RUnlock()

    var failedClients []string
    for id, client := range b.clients {
        if err := client.Send(event); err != nil {
            log.Printf("Failed to send to client %s: %v", id, err)
            failedClients = append(failedClients, id)
        }
    }

    // Clean up failed clients
    for _, id := range failedClients {
        b.Unregister(b.clients[id])
    }
}

Common Pitfalls

1. Not Handling Proxy Buffering

// Bad: Proxies may buffer responses
w.Header().Set("Content-Type", "text/event-stream")

// Good: Disable buffering explicitly
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("X-Accel-Buffering", "no") // Nginx
w.Header().Set("Connection", "keep-alive")

Nginx configuration:

location /events {
    proxy_pass http://backend;
    proxy_set_header Connection '';
    proxy_http_version 1.1;
    chunked_transfer_encoding off;
    proxy_buffering off;
    proxy_cache off;
}

2. Not Implementing Event Replay

// Bad: Clients lose messages on reconnect
func (b *Broker) Register(client *Client) {
    b.clients[client.ID] = client
}

// Good: Replay missed events
func (b *Broker) Register(client *Client) {
    b.clients[client.ID] = client

    if client.LastID != "" {
        missedEvents := b.history.GetAfter(client.LastID)
        for _, event := range missedEvents {
            client.Send(&event)
        }
    }
}

3. Memory Leaks from Orphaned Connections

// Bad: No cleanup on disconnect
func handler(w http.ResponseWriter, r *http.Request) {
    client := NewClient()
    broker.Register(client)
    // If request context cancelled, client never cleaned up!
}

// Good: Defer cleanup
func handler(w http.ResponseWriter, r *http.Request) {
    client := NewClient()
    broker.Register(client)
    defer broker.Unregister(client)

    // Monitor context
    <-r.Context().Done()
}

4. Not Setting Retry Interval

// Guide client reconnection behavior
event := &Event{
    Retry: 5000, // 5 seconds
    Data:  "...",
}

When to Use SSE

✅ Use SSE When:

  • Uni-directional communication (server → client only)
  • Simple real-time updates (notifications, feeds, dashboards)
  • Working with HTTP/REST architecture
  • Need automatic reconnection with message replay
  • Clients are web browsers with EventSource API support
  • Operating through restrictive proxies/firewalls
  • Want simpler implementation than WebSockets

❌ Avoid SSE When:

  • Need bi-directional communication (client ↔ server)
  • High-frequency messages (> 10/second per client)
  • Binary data streaming (use WebSockets or gRPC)
  • Need guaranteed delivery (use message queues)
  • Complex messaging patterns (pub/sub, point-to-point)
  • Mobile apps requiring efficient battery usage

Advantages

  1. Simple Protocol - Text-based, easy to debug with curl
  2. Built-in Reconnection - Automatic with Last-Event-ID support
  3. HTTP Compatible - Works with existing infrastructure
  4. Firewall Friendly - Uses standard HTTP/HTTPS ports
  5. Efficient - Single long-lived connection
  6. Browser Native - EventSource API built into browsers
  7. Event Types - Multiple event types on same connection
  8. Stateless Server - Can resume from any instance

Disadvantages

  1. Uni-directional Only - Server to client only
  2. Text-Based - Binary data requires encoding
  3. Connection Limits - Browser limits per domain (6 connections)
  4. No Built-in Compression - Must compress data field
  5. HTTP/1.1 Overhead - Less efficient than WebSockets
  6. Proxy Issues - Some proxies buffer responses
  7. No Request Headers - Can’t send auth tokens in messages (only on connect)

Performance Considerations

Connection Limits

  • Browsers limit SSE connections to 6 per domain (HTTP/1.1)
  • Use HTTP/2 to allow unlimited connections
  • Consider domain sharding for HTTP/1.1

Memory Usage

// Monitor memory per client
type Broker struct {
    maxClients int
    // ...
}

func (b *Broker) Register(client *Client) error {
    if len(b.clients) >= b.maxClients {
        return fmt.Errorf("max clients reached")
    }
    // ...
}

Message Rate

  • Keep message frequency reasonable (< 10/second per client)
  • Batch updates when possible
  • Use compression for large messages

Testing

Unit Test Example

package sse

import (
    "testing"
    "time"
)

func TestBrokerBroadcast(t *testing.T) {
    broker := NewBroker()
    broker.Start()

    client1 := NewClient("test1", "")
    client2 := NewClient("test2", "")

    broker.Register(client1)
    broker.Register(client2)

    event := &Event{
        ID:    "1",
        Event: "test",
        Data:  "hello",
    }

    broker.Broadcast(event)

    // Wait for broadcast
    time.Sleep(100 * time.Millisecond)

    // Check both clients received event
    select {
    case evt := <-client1.Channel:
        if evt.Data != "hello" {
            t.Errorf("Client1 got wrong data: %s", evt.Data)
        }
    default:
        t.Error("Client1 didn't receive event")
    }

    select {
    case evt := <-client2.Channel:
        if evt.Data != "hello" {
            t.Errorf("Client2 got wrong data: %s", evt.Data)
        }
    default:
        t.Error("Client2 didn't receive event")
    }
}

Comparison: SSE vs Long Polling vs WebSockets

Feature SSE Long Polling WebSockets
Direction Server → Client Both (via requests) Both (full-duplex)
Protocol HTTP HTTP WebSocket (upgrade)
Reconnection Automatic Manual Manual
Message Replay Yes (Last-Event-ID) No No
Complexity Low Low Medium
Proxy Support Good Excellent Fair
Browser API EventSource Fetch/XHR WebSocket
Overhead Low Medium Very Low

Backend Communication
Current: Server-Sent Events