{{series_nav(current_post=11)}}

GraphQL Subscriptions bring real-time capabilities to GraphQL APIs, allowing clients to subscribe to specific events and receive updates when data changes. Unlike queries and mutations, subscriptions maintain a long-lived connection and push data from server to client, providing type-safe, schema-driven real-time communication.

What are GraphQL Subscriptions?

GraphQL Subscriptions are a GraphQL operation type (alongside Query and Mutation) that enables real-time data flow from server to client. They combine the declarative nature of GraphQL with event-driven architecture, allowing clients to specify exactly what data they want to receive when specific events occur.

Core Concepts

Subscription: A GraphQL operation that establishes a long-lived connection for receiving updates.

Schema: Type-safe definition of available subscriptions and their payloads.

Resolver: Server-side function that manages subscription lifecycle and emits events.

Transport: WebSocket protocol for bidirectional communication (typically graphql-ws).

Filtering: Server-side or client-side filtering of subscription events.

Event Source: Backend system generating events (database, message queue, service).

Architecture

graph TB
    subgraph "Clients"
        C1[Web App]
        C2[Mobile App]
        C3[Dashboard]
    end

    subgraph "GraphQL Layer"
        GQL[GraphQL Server<br/>gqlgen]

        subgraph "Schema"
            S1[Subscription Types]
            S2[Query Types]
            S3[Mutation Types]
        end

        subgraph "Resolvers"
            R1[Subscription Resolvers]
            R2[Query Resolvers]
            R3[Mutation Resolvers]
        end

        subgraph "WebSocket Manager"
            WS[WebSocket Connections]
            SUB[Active Subscriptions]
        end
    end

    subgraph "Event Sources"
        DB[(Database)]
        MQ[Message Queue<br/>NATS/Kafka]
        CACHE[Redis Pub/Sub]
        API[External APIs]
    end

    C1 <-->|WebSocket<br/>graphql-ws| WS
    C2 <-->|WebSocket| WS
    C3 <-->|WebSocket| WS

    WS --> SUB
    SUB --> R1
    R1 --> S1

    R1 -.->|Listen| DB
    R1 -.->|Subscribe| MQ
    R1 -.->|Subscribe| CACHE
    R1 -.->|Poll| API

    R3 -->|Trigger Event| R1

Subscription Flow

sequenceDiagram
    participant C as Client
    participant WS as WebSocket
    participant GQL as GraphQL Server
    participant R as Resolver
    participant E as Event Source

    Note over C,E: Connection Setup
    C->>WS: Connect (graphql-ws)
    WS->>C: connection_ack

    Note over C,E: Subscription
    C->>WS: subscribe(subscription { ... })
    WS->>GQL: Parse & Validate
    GQL->>R: Create Subscription
    R->>E: Register Listener
    E->>R: Listener Registered
    R->>WS: Subscription Active
    WS->>C: next (initial data)

    Note over C,E: Event Emission
    E->>R: Event Occurred
    R->>R: Filter & Transform
    R->>WS: next (event data)
    WS->>C: next (event data)
    E->>R: Another Event
    R->>WS: next (event data)
    WS->>C: next (event data)

    Note over C,E: Unsubscribe
    C->>WS: complete
    WS->>R: Stop Subscription
    R->>E: Unregister Listener
    WS->>C: complete

Transport Protocol (graphql-ws)

sequenceDiagram
    participant C as Client
    participant S as Server

    Note over C,S: Connection Phase
    C->>S: connection_init { payload }
    S->>C: connection_ack

    Note over C,S: Operation Phase
    C->>S: subscribe { id, payload }
    S->>C: next { id, payload }
    S->>C: next { id, payload }

    Note over C,S: Ping/Pong (Optional)
    C->>S: ping
    S->>C: pong

    Note over C,S: Completion (either side may send complete)
    C->>S: complete { id }
    S->>C: complete { id }

    Note over C,S: Termination
    C->>S: Close WebSocket (normal closure)
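The frames in the diagram above are small JSON envelopes. As a rough sketch (the type and function names `wsMessage`, `subscribePayload`, `newSubscribe`, and `encode` are illustrative and not part of gqlgen or any client library), they could be modelled in Go like this:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// wsMessage is the envelope shared by the protocol messages in the diagram.
// ID is only set on operation-scoped messages (subscribe, next, complete).
type wsMessage struct {
	ID      string          `json:"id,omitempty"`
	Type    string          `json:"type"`
	Payload json.RawMessage `json:"payload,omitempty"`
}

// subscribePayload is the payload carried by a "subscribe" message.
type subscribePayload struct {
	Query     string                 `json:"query"`
	Variables map[string]interface{} `json:"variables,omitempty"`
}

// newSubscribe builds the JSON frame a client would send to start a subscription.
func newSubscribe(id, query string, vars map[string]interface{}) ([]byte, error) {
	p, err := json.Marshal(subscribePayload{Query: query, Variables: vars})
	if err != nil {
		return nil, err
	}
	return json.Marshal(wsMessage{ID: id, Type: "subscribe", Payload: p})
}

// encode renders a message as a JSON string, or "" if it cannot be marshalled.
func encode(m wsMessage) string {
	b, err := json.Marshal(m)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	frame, err := newSubscribe("1", "subscription { ping }", nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(frame))
	fmt.Println(encode(wsMessage{Type: "connection_ack"}))
}
```

In practice the transport library handles this framing; the sketch is only meant to make the message shapes concrete when reading WebSocket traffic in browser dev tools.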

Real-World Use Cases

1. Real-Time Chat

Subscribe to new messages in chat rooms with typing indicators.

Perfect for: Team collaboration, customer support, social messaging

2. Live Dashboards

Monitor metrics, KPIs, and system health in real-time.

Perfect for: Analytics dashboards, monitoring tools, admin panels

3. Collaborative Editing

Sync document changes across multiple users.

Perfect for: Google Docs-style apps, code editors, design tools

4. Social Media Feeds

Receive new posts, comments, likes, and notifications instantly.

Perfect for: Social networks, activity feeds, news aggregators

5. E-commerce Updates

Track order status, inventory changes, and price updates.

Perfect for: Shopping carts, auction sites, stock trading platforms

6. Gaming

Sync game state, player positions, and scores in real-time.

Perfect for: Multiplayer games, leaderboards, tournaments

7. IoT Monitoring

Stream sensor data and device status updates.

Perfect for: Smart home dashboards, industrial monitoring, fleet tracking

8. Notifications

Deliver targeted notifications based on user preferences.

Perfect for: Alert systems, push notifications, activity streams

Project Structure

graphql-subscriptions/
├── cmd/
│   └── server/
│       └── main.go           # Server entry point
├── graph/
│   ├── schema.graphqls       # GraphQL schema
│   ├── schema.resolvers.go   # Resolver implementations (scaffolded by gqlgen)
│   ├── resolver.go           # Resolver root
│   ├── model/
│   │   └── models_gen.go     # Generated models
│   └── generated/
│       └── generated.go      # gqlgen generated code
├── internal/
│   ├── pubsub/
│   │   ├── pubsub.go         # Pub/Sub abstraction
│   │   └── redis.go          # Redis implementation
│   ├── service/
│   │   ├── chat.go           # Chat service
│   │   ├── notification.go   # Notification service
│   │   └── activity.go       # Activity service
│   └── middleware/
│       ├── auth.go           # Authentication
│       └── logging.go        # Logging
├── pkg/
│   └── ws/
│       └── transport.go      # WebSocket transport
├── gqlgen.yml                # gqlgen configuration
├── go.mod
└── go.sum

Implementation

1. GraphQL Schema

# graph/schema.graphqls

# Scalar types
scalar Time

# === Query Types ===

type Query {
  # Chat queries
  messages(roomID: ID!, limit: Int = 50): [Message!]!
  rooms: [Room!]!

  # Notification queries
  notifications(limit: Int = 20): [Notification!]!

  # User queries
  user(id: ID!): User
  onlineUsers: [User!]!
}

# === Mutation Types ===

type Mutation {
  # Chat mutations
  sendMessage(input: SendMessageInput!): Message!
  createRoom(input: CreateRoomInput!): Room!
  joinRoom(roomID: ID!): Boolean!
  leaveRoom(roomID: ID!): Boolean!

  # User mutations
  updateUserStatus(status: UserStatus!): User!

  # Notification mutations
  markNotificationRead(id: ID!): Notification!
}

# === Subscription Types ===

type Subscription {
  # Chat subscriptions
  messageAdded(roomID: ID!): Message!
  typingIndicator(roomID: ID!): TypingIndicator!
  roomUpdated(roomID: ID!): Room!

  # User subscriptions
  userStatusChanged(userID: ID): User!

  # Notification subscriptions
  notificationReceived: Notification!

  # Activity subscriptions
  activityFeed(filter: ActivityFilter): Activity!
}

# === Models ===

type Message {
  id: ID!
  roomID: ID!
  userID: ID!
  user: User!
  content: String!
  createdAt: Time!
  updatedAt: Time!
}

type Room {
  id: ID!
  name: String!
  description: String
  members: [User!]!
  createdAt: Time!
  messageCount: Int!
}

type User {
  id: ID!
  username: String!
  email: String!
  status: UserStatus!
  avatarURL: String
  lastSeen: Time!
}

type Notification {
  id: ID!
  userID: ID!
  type: NotificationType!
  title: String!
  message: String!
  read: Boolean!
  metadata: String
  createdAt: Time!
}

type Activity {
  id: ID!
  type: ActivityType!
  userID: ID!
  user: User!
  action: String!
  resourceID: String
  metadata: String
  createdAt: Time!
}

type TypingIndicator {
  roomID: ID!
  userID: ID!
  username: String!
  isTyping: Boolean!
}

# === Enums ===

enum UserStatus {
  ONLINE
  AWAY
  OFFLINE
}

enum NotificationType {
  MESSAGE
  MENTION
  SYSTEM
  ALERT
}

enum ActivityType {
  MESSAGE
  JOIN
  LEAVE
  STATUS_CHANGE
  REACTION
}

# === Input Types ===

input SendMessageInput {
  roomID: ID!
  content: String!
}

input CreateRoomInput {
  name: String!
  description: String
}

input ActivityFilter {
  types: [ActivityType!]
  userID: ID
}

2. PubSub Abstraction

// internal/pubsub/pubsub.go
package pubsub

import (
	"context"
	"sync"
)

// PubSub defines pub/sub interface
type PubSub interface {
	Publish(ctx context.Context, channel string, message interface{}) error
	Subscribe(ctx context.Context, channel string) (<-chan interface{}, error)
	Unsubscribe(ctx context.Context, channel string) error
	Close() error
}

// InMemoryPubSub is a simple in-memory pub/sub for single-server setups
type InMemoryPubSub struct {
	mu       sync.RWMutex
	channels map[string][]chan interface{}
}

func NewInMemoryPubSub() *InMemoryPubSub {
	return &InMemoryPubSub{
		channels: make(map[string][]chan interface{}),
	}
}

func (p *InMemoryPubSub) Publish(ctx context.Context, channel string, message interface{}) error {
	p.mu.RLock()
	defer p.mu.RUnlock()

	if subscribers, ok := p.channels[channel]; ok {
		for _, ch := range subscribers {
			select {
			case ch <- message:
			default:
				// Channel full, skip
			}
		}
	}

	return nil
}

func (p *InMemoryPubSub) Subscribe(ctx context.Context, channel string) (<-chan interface{}, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	ch := make(chan interface{}, 10)
	p.channels[channel] = append(p.channels[channel], ch)

	// Cleanup on context cancellation
	go func() {
		<-ctx.Done()
		p.unsubscribe(channel, ch)
	}()

	return ch, nil
}

func (p *InMemoryPubSub) unsubscribe(channel string, ch chan interface{}) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if subscribers, ok := p.channels[channel]; ok {
		for i, subscriber := range subscribers {
			if subscriber == ch {
				p.channels[channel] = append(subscribers[:i], subscribers[i+1:]...)
				close(ch)
				break
			}
		}
	}
}

func (p *InMemoryPubSub) Unsubscribe(ctx context.Context, channel string) error {
	// Not needed with auto-cleanup in Subscribe
	return nil
}

func (p *InMemoryPubSub) Close() error {
	p.mu.Lock()
	defer p.mu.Unlock()

	for _, subscribers := range p.channels {
		for _, ch := range subscribers {
			close(ch)
		}
	}

	p.channels = make(map[string][]chan interface{})
	return nil
}
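One consequence of the non-blocking send in Publish is that a slow subscriber silently loses events once its buffer fills. The following standalone sketch (the helper names `trySend` and `countDropped` are made up for illustration) isolates that behaviour so the trade-off is easy to see:

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the value was queued.
func trySend(ch chan<- interface{}, v interface{}) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // buffer full: the event is dropped for this subscriber
	}
}

// countDropped sends each event to ch without blocking and returns how many
// could not be queued.
func countDropped(ch chan<- interface{}, events ...interface{}) int {
	dropped := 0
	for _, ev := range events {
		if !trySend(ch, ev) {
			dropped++
		}
	}
	return dropped
}

func main() {
	slow := make(chan interface{}, 2) // a subscriber that never reads
	fmt.Println(countDropped(slow, "a", "b", "c", "d"))
}
```

Dropping keeps one slow client from stalling every publisher. If losing events is not acceptable, a larger buffer, a drop counter exposed as a metric, or disconnecting the slow subscriber are common alternatives.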

3. Redis PubSub Implementation

// internal/pubsub/redis.go
package pubsub

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/redis/go-redis/v9"
)

// RedisPubSub implements PubSub using Redis
type RedisPubSub struct {
	client *redis.Client
}

func NewRedisPubSub(addr string) (*RedisPubSub, error) {
	client := redis.NewClient(&redis.Options{
		Addr: addr,
	})

	if err := client.Ping(context.Background()).Err(); err != nil {
		return nil, fmt.Errorf("redis ping: %w", err)
	}

	return &RedisPubSub{client: client}, nil
}

func (r *RedisPubSub) Publish(ctx context.Context, channel string, message interface{}) error {
	data, err := json.Marshal(message)
	if err != nil {
		return fmt.Errorf("marshal: %w", err)
	}

	return r.client.Publish(ctx, channel, data).Err()
}

func (r *RedisPubSub) Subscribe(ctx context.Context, channel string) (<-chan interface{}, error) {
	pubsub := r.client.Subscribe(ctx, channel)
	ch := make(chan interface{}, 10)

	go func() {
		defer close(ch)
		defer pubsub.Close()

		msgChan := pubsub.Channel()

		for {
			select {
			case <-ctx.Done():
				return
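			case msg, ok := <-msgChan:
				if !ok {
					return
				}

				// NOTE: decoding into interface{} yields generic JSON values
				// (map[string]interface{} for objects), not *model.Message, so
				// subscribers must convert the payload to their concrete type.
				var payload interface{}
				if err := json.Unmarshal([]byte(msg.Payload), &payload); err != nil {
					continue // Skip malformed payloads
				}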

				select {
				case ch <- payload:
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	return ch, nil
}

func (r *RedisPubSub) Unsubscribe(ctx context.Context, channel string) error {
	// Auto-handled by context cancellation
	return nil
}

func (r *RedisPubSub) Close() error {
	return r.client.Close()
}
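Because the Redis implementation decodes into interface{}, a resolver that type-asserts to *model.Message will never match a Redis-delivered payload. One possible way to bridge both backends is a small generic helper. This is a sketch only: `decodeAs` and the local `Message` struct are hypothetical stand-ins, and it assumes the model types carry JSON tags that match what was published.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message mirrors a subset of model.Message for illustration only.
type Message struct {
	ID      string `json:"id"`
	Content string `json:"content"`
}

// decodeAs converts a pub/sub payload to *T. An in-memory pub/sub delivers *T
// directly; a JSON-based transport delivers generic values, which are
// re-marshalled and decoded into T.
func decodeAs[T any](v interface{}) (*T, error) {
	if typed, ok := v.(*T); ok {
		return typed, nil
	}
	raw, err := json.Marshal(v)
	if err != nil {
		return nil, fmt.Errorf("re-marshal: %w", err)
	}
	var out T
	if err := json.Unmarshal(raw, &out); err != nil {
		return nil, fmt.Errorf("decode: %w", err)
	}
	return &out, nil
}

func main() {
	// Shape produced by json.Unmarshal into interface{}.
	generic := map[string]interface{}{"id": "msg-1", "content": "hello"}
	m, err := decodeAs[Message](generic)
	if err != nil {
		panic(err)
	}
	fmt.Println(m.ID, m.Content)
}
```

The double JSON round trip costs some CPU. An alternative design is to have the PubSub interface deliver raw bytes and let each resolver unmarshal once into its own type.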

4. Resolver Root

// graph/resolver.go
package graph

import (
	"sync"

	"graphql-subscriptions/graph/generated"
	"graphql-subscriptions/graph/model"
	"graphql-subscriptions/internal/pubsub"
	"graphql-subscriptions/internal/service"
)

// Resolver is the root resolver
type Resolver struct {
	pubsub       pubsub.PubSub
	chatService  *service.ChatService
	userService  *service.UserService
	notifService *service.NotificationService

	// In-memory stores (use real DB in production)
	mu            sync.RWMutex
	messages      []*model.Message
	rooms         []*model.Room
	users         []*model.User
	notifications []*model.Notification
}

// NewResolver creates a new resolver
func NewResolver(ps pubsub.PubSub) *Resolver {
	resolver := &Resolver{
		pubsub:        ps,
		messages:      []*model.Message{},
		rooms:         []*model.Room{},
		users:         []*model.User{},
		notifications: []*model.Notification{},
	}

	// Initialize services
	resolver.chatService = service.NewChatService(ps, resolver)
	resolver.userService = service.NewUserService(ps, resolver)
	resolver.notifService = service.NewNotificationService(ps, resolver)

	return resolver
}

// Mutation returns the mutation resolver
func (r *Resolver) Mutation() generated.MutationResolver {
	return &mutationResolver{r}
}

// Query returns the query resolver
func (r *Resolver) Query() generated.QueryResolver {
	return &queryResolver{r}
}

// Subscription returns the subscription resolver
func (r *Resolver) Subscription() generated.SubscriptionResolver {
	return &subscriptionResolver{r}
}

5. Subscription Resolvers

// graph/schema.resolvers.go
package graph

import (
	"context"
	"fmt"
	"time"

	"graphql-subscriptions/graph/model"
)

type subscriptionResolver struct{ *Resolver }

// MessageAdded subscribes to new messages in a room
func (r *subscriptionResolver) MessageAdded(ctx context.Context, roomID string) (<-chan *model.Message, error) {
	channel := fmt.Sprintf("room:%s:messages", roomID)

	// Subscribe to pub/sub channel
	msgChan, err := r.pubsub.Subscribe(ctx, channel)
	if err != nil {
		return nil, fmt.Errorf("subscribe: %w", err)
	}

	// Convert interface{} channel to *model.Message channel
	out := make(chan *model.Message)

	go func() {
		defer close(out)

		for {
			select {
			case <-ctx.Done():
				return
			case msg, ok := <-msgChan:
				if !ok {
					return
				}

				// Type assert to *model.Message
				if message, ok := msg.(*model.Message); ok {
					select {
					case out <- message:
					case <-ctx.Done():
						return
					}
				}
			}
		}
	}()

	return out, nil
}

// TypingIndicator subscribes to typing indicators in a room
func (r *subscriptionResolver) TypingIndicator(ctx context.Context, roomID string) (<-chan *model.TypingIndicator, error) {
	channel := fmt.Sprintf("room:%s:typing", roomID)
	indicatorChan, err := r.pubsub.Subscribe(ctx, channel)
	if err != nil {
		return nil, err
	}

	out := make(chan *model.TypingIndicator)

	go func() {
		defer close(out)

		for {
			select {
			case <-ctx.Done():
				return
			case msg, ok := <-indicatorChan:
				if !ok {
					return
				}

				if indicator, ok := msg.(*model.TypingIndicator); ok {
					select {
					case out <- indicator:
					case <-ctx.Done():
						return
					}
				}
			}
		}
	}()

	return out, nil
}

// UserStatusChanged subscribes to user status changes
func (r *subscriptionResolver) UserStatusChanged(ctx context.Context, userID *string) (<-chan *model.User, error) {
	channel := "user:status"
	if userID != nil {
		channel = fmt.Sprintf("user:%s:status", *userID)
	}

	userChan, err := r.pubsub.Subscribe(ctx, channel)
	if err != nil {
		return nil, err
	}

	out := make(chan *model.User)

	go func() {
		defer close(out)

		for {
			select {
			case <-ctx.Done():
				return
			case msg, ok := <-userChan:
				if !ok {
					return
				}

				if user, ok := msg.(*model.User); ok {
					// Filter by userID if specified
					if userID != nil && user.ID != *userID {
						continue
					}

					select {
					case out <- user:
					case <-ctx.Done():
						return
					}
				}
			}
		}
	}()

	return out, nil
}

// NotificationReceived subscribes to user notifications
func (r *subscriptionResolver) NotificationReceived(ctx context.Context) (<-chan *model.Notification, error) {
	// Get user ID from context (set by auth middleware)
	userID, ok := ctx.Value("userID").(string)
	if !ok {
		return nil, fmt.Errorf("unauthorized")
	}

	channel := fmt.Sprintf("user:%s:notifications", userID)
	notifChan, err := r.pubsub.Subscribe(ctx, channel)
	if err != nil {
		return nil, err
	}

	out := make(chan *model.Notification)

	go func() {
		defer close(out)

		for {
			select {
			case <-ctx.Done():
				return
			case msg, ok := <-notifChan:
				if !ok {
					return
				}

				if notif, ok := msg.(*model.Notification); ok {
					select {
					case out <- notif:
					case <-ctx.Done():
						return
					}
				}
			}
		}
	}()

	return out, nil
}

// ActivityFeed subscribes to activity feed
func (r *subscriptionResolver) ActivityFeed(ctx context.Context, filter *model.ActivityFilter) (<-chan *model.Activity, error) {
	channel := "activity:feed"

	activityChan, err := r.pubsub.Subscribe(ctx, channel)
	if err != nil {
		return nil, err
	}

	out := make(chan *model.Activity)

	go func() {
		defer close(out)

		for {
			select {
			case <-ctx.Done():
				return
			case msg, ok := <-activityChan:
				if !ok {
					return
				}

				if activity, ok := msg.(*model.Activity); ok {
					// Apply filters
					if filter != nil {
						if filter.UserID != nil && activity.UserID != *filter.UserID {
							continue
						}

						if filter.Types != nil {
							found := false
							for _, t := range filter.Types {
								if activity.Type == t {
									found = true
									break
								}
							}
							if !found {
								continue
							}
						}
					}

					select {
					case out <- activity:
					case <-ctx.Done():
						return
					}
				}
			}
		}
	}()

	return out, nil
}

// RoomUpdated subscribes to room updates
func (r *subscriptionResolver) RoomUpdated(ctx context.Context, roomID string) (<-chan *model.Room, error) {
	channel := fmt.Sprintf("room:%s:updated", roomID)

	roomChan, err := r.pubsub.Subscribe(ctx, channel)
	if err != nil {
		return nil, err
	}

	out := make(chan *model.Room)

	go func() {
		defer close(out)

		for {
			select {
			case <-ctx.Done():
				return
			case msg, ok := <-roomChan:
				if !ok {
					return
				}

				if room, ok := msg.(*model.Room); ok {
					select {
					case out <- room:
					case <-ctx.Done():
						return
					}
				}
			}
		}
	}()

	return out, nil
}
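The six resolvers above repeat the same forward-and-filter loop. As a sketch of how that could be factored out with generics (the `forward` helper and the `user` type below are hypothetical, not gqlgen APIs), each resolver body would shrink to a Subscribe call followed by one call to the helper:

```go
package main

import (
	"context"
	"fmt"
)

// forward copies values of type *T from in to a new channel until ctx is
// cancelled or in is closed. Values of another type, or values rejected by
// keep, are skipped. A nil keep accepts everything.
func forward[T any](ctx context.Context, in <-chan interface{}, keep func(*T) bool) <-chan *T {
	out := make(chan *T)
	go func() {
		defer close(out) // only the sending goroutine closes out
		for {
			select {
			case <-ctx.Done():
				return
			case msg, ok := <-in:
				if !ok {
					return
				}
				v, ok := msg.(*T)
				if !ok || (keep != nil && !keep(v)) {
					continue
				}
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

// user stands in for model.User in this sketch.
type user struct{ ID string }

// collectIDs feeds three events through forward and keeps every user except "u1".
func collectIDs() []string {
	in := make(chan interface{}, 3)
	in <- &user{ID: "u1"}
	in <- "not a user"
	in <- &user{ID: "u2"}
	close(in)

	notU1 := func(u *user) bool { return u.ID != "u1" }
	var ids []string
	for u := range forward[user](context.Background(), in, notU1) {
		ids = append(ids, u.ID)
	}
	return ids
}

func main() {
	fmt.Println(collectIDs()) // prints [u2]
}
```

With such a helper, UserStatusChanged would pass a predicate comparing user IDs and ActivityFeed would pass one that checks the filter, while the cancellation logic lives in a single place.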

6. Mutation Resolvers (Trigger Subscriptions)

type mutationResolver struct{ *Resolver }

// SendMessage sends a message and triggers subscription
func (r *mutationResolver) SendMessage(ctx context.Context, input model.SendMessageInput) (*model.Message, error) {
	// Get user from context
	userID, ok := ctx.Value("userID").(string)
	if !ok {
		return nil, fmt.Errorf("unauthorized")
	}

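	// Create message. The schema declares Message.user as non-null, so populate
	// User here (or resolve it in a Message.user field resolver) before clients
	// select that field.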
	message := &model.Message{
		ID:        fmt.Sprintf("msg-%d", time.Now().UnixNano()),
		RoomID:    input.RoomID,
		UserID:    userID,
		Content:   input.Content,
		CreatedAt: time.Now(),
		UpdatedAt: time.Now(),
	}

	// Store message
	r.mu.Lock()
	r.messages = append(r.messages, message)
	r.mu.Unlock()

	// Publish to subscribers
	channel := fmt.Sprintf("room:%s:messages", input.RoomID)
	if err := r.pubsub.Publish(ctx, channel, message); err != nil {
		return nil, fmt.Errorf("publish: %w", err)
	}

	// Trigger activity feed
	activity := &model.Activity{
		ID:         fmt.Sprintf("act-%d", time.Now().UnixNano()),
		Type:       model.ActivityTypeMessage,
		UserID:     userID,
		Action:     "sent a message",
		ResourceID: &message.ID,
		CreatedAt:  time.Now(),
	}
	r.pubsub.Publish(ctx, "activity:feed", activity)

	return message, nil
}

// UpdateUserStatus updates user status and triggers subscription
func (r *mutationResolver) UpdateUserStatus(ctx context.Context, status model.UserStatus) (*model.User, error) {
	userID, ok := ctx.Value("userID").(string)
	if !ok {
		return nil, fmt.Errorf("unauthorized")
	}

	// Find and update user
	r.mu.Lock()
	var user *model.User
	for _, u := range r.users {
		if u.ID == userID {
			u.Status = status
			u.LastSeen = time.Now()
			user = u
			break
		}
	}
	r.mu.Unlock()

	if user == nil {
		return nil, fmt.Errorf("user not found")
	}

	// Publish status change
	r.pubsub.Publish(ctx, "user:status", user)
	r.pubsub.Publish(ctx, fmt.Sprintf("user:%s:status", userID), user)

	return user, nil
}
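Publishers and subscribers only meet if they format the channel name identically, and the strings above are currently duplicated between the mutation and subscription resolvers. A small sketch of centralising them follows; the helper names `roomChannel` and `userStatusChannel` are illustrative assumptions, while the name patterns are the ones used above.

```go
package main

import "fmt"

// roomChannel returns the pub/sub channel for a per-room topic such as
// "messages", "typing", or "updated".
func roomChannel(roomID, topic string) string {
	return fmt.Sprintf("room:%s:%s", roomID, topic)
}

// userStatusChannel returns the per-user status channel, or the global status
// channel when userID is empty.
func userStatusChannel(userID string) string {
	if userID == "" {
		return "user:status"
	}
	return fmt.Sprintf("user:%s:status", userID)
}

func main() {
	fmt.Println(roomChannel("room-123", "messages"))
	fmt.Println(userStatusChannel("user-1"))
	fmt.Println(userStatusChannel(""))
}
```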

7. Server Setup

// cmd/server/main.go
package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/99designs/gqlgen/graphql"
	"github.com/99designs/gqlgen/graphql/handler"
	"github.com/99designs/gqlgen/graphql/handler/extension"
	"github.com/99designs/gqlgen/graphql/handler/transport"
	"github.com/99designs/gqlgen/graphql/playground"
	"github.com/gorilla/websocket"
	"github.com/vektah/gqlparser/v2/gqlerror"

	"graphql-subscriptions/graph"
	"graphql-subscriptions/graph/generated"
	"graphql-subscriptions/internal/pubsub"
)

func main() {
	// Create pub/sub (use Redis in production)
	ps := pubsub.NewInMemoryPubSub()
	// For multi-server: ps, _ := pubsub.NewRedisPubSub("localhost:6379")
	defer ps.Close()

	// Create resolver
	resolver := graph.NewResolver(ps)

	// Create GraphQL server
	srv := handler.New(generated.NewExecutableSchema(generated.Config{
		Resolvers: resolver,
	}))

	// Add transports
	srv.AddTransport(transport.POST{})
	srv.AddTransport(transport.Websocket{
		KeepAlivePingInterval: 10 * time.Second,
		Upgrader: websocket.Upgrader{
			CheckOrigin: func(r *http.Request) bool {
				return true // Configure appropriately for production
			},
		},
	})

	// Add extensions
	srv.Use(extension.Introspection{})
	srv.SetErrorPresenter(errorPresenter)
	srv.SetRecoverFunc(recoverFunc)

	// Logging: gqlgen has no built-in query logger, so log each operation
	// with operation middleware
	srv.AroundOperations(func(ctx context.Context, next graphql.OperationHandler) graphql.ResponseHandler {
		log.Printf("operation: %s", graphql.GetOperationContext(ctx).OperationName)
		return next(ctx)
	})

	// Setup routes
	http.Handle("/", playground.Handler("GraphQL Playground", "/query"))
	http.Handle("/query", authMiddleware(srv))

	// Health check
	http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("OK"))
	})

	// Start server
	server := &http.Server{
		Addr:         ":8080",
		Handler:      nil,
		ReadTimeout:  15 * time.Second,
		WriteTimeout: 15 * time.Second,
		IdleTimeout:  60 * time.Second,
	}

	go func() {
		log.Println("Server starting on http://localhost:8080")
		log.Println("GraphQL Playground: http://localhost:8080")
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("Server error: %v", err)
		}
	}()

	// Graceful shutdown
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit

	log.Println("Shutting down server...")

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	if err := server.Shutdown(ctx); err != nil {
		log.Fatal("Server forced to shutdown:", err)
	}

	log.Println("Server stopped")
}

func authMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Extract user from JWT/session
		// For demo, use header
		userID := r.Header.Get("X-User-ID")
		if userID == "" {
			userID = "anonymous"
		}

		ctx := context.WithValue(r.Context(), "userID", userID)
		next.ServeHTTP(w, r.WithContext(ctx))
	})
}

func errorPresenter(ctx context.Context, e error) *gqlerror.Error {
	log.Printf("GraphQL error: %v", e)
	return graphql.DefaultErrorPresenter(ctx, e)
}

func recoverFunc(ctx context.Context, err interface{}) error {
	log.Printf("Panic recovered: %v", err)
	return fmt.Errorf("internal server error")
}
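The middleware above stores the user ID under the plain string key "userID", which `go vet` flags because any other package could use the same key. A common alternative is an unexported key type with accessor functions. The sketch below assumes hypothetical helper names (`withUserID`, `userIDFrom`, `lookup`) and is not wired into the server code above:

```go
package main

import (
	"context"
	"fmt"
)

// userIDKey is an unexported key type, so no other package can collide with it.
type userIDKey struct{}

// withUserID returns a copy of ctx carrying the authenticated user's ID.
func withUserID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, userIDKey{}, id)
}

// userIDFrom returns the user ID stored in ctx and whether a non-empty one was found.
func userIDFrom(ctx context.Context) (string, bool) {
	id, ok := ctx.Value(userIDKey{}).(string)
	return id, ok && id != ""
}

// lookup stores id (when non-empty) in a fresh context and reads it back.
func lookup(id string) (string, bool) {
	ctx := context.Background()
	if id != "" {
		ctx = withUserID(ctx, id)
	}
	return userIDFrom(ctx)
}

func main() {
	fmt.Println(lookup("user-1")) // prints: user-1 true
}
```

Resolvers would then call userIDFrom(ctx) and return an "unauthorized" error when the second value is false, instead of type-asserting ctx.Value("userID") directly.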

Client Examples

JavaScript/TypeScript Client

import { createClient } from 'graphql-ws';
import { GraphQLClient, gql } from 'graphql-request';

// WebSocket client for subscriptions
const wsClient = createClient({
  url: 'ws://localhost:8080/query',
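  // Sent in the connection_init payload. The demo server above only reads the
  // X-User-ID HTTP header, so it needs a WebSocket init handler to use this.
  connectionParams: {
    authorization: 'Bearer <token>',
  },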
});

// Subscribe to messages
const subscription = gql`
  subscription MessageAdded($roomID: ID!) {
    messageAdded(roomID: $roomID) {
      id
      content
      user {
        id
        username
      }
      createdAt
    }
  }
`;

const unsubscribe = wsClient.subscribe(
  {
    query: subscription,
    variables: { roomID: 'room-123' },
  },
  {
    next: (data) => {
      console.log('New message:', data.data.messageAdded);
      // Update UI
    },
    error: (error) => {
      console.error('Subscription error:', error);
    },
    complete: () => {
      console.log('Subscription complete');
    },
  }
);

// Unsubscribe when done
// unsubscribe();

// Send message (mutation)
const httpClient = new GraphQLClient('http://localhost:8080/query');

async function sendMessage(roomID: string, content: string) {
  const mutation = gql`
    mutation SendMessage($input: SendMessageInput!) {
      sendMessage(input: $input) {
        id
        content
        createdAt
      }
    }
  `;

  const result = await httpClient.request(mutation, {
    input: { roomID, content },
  });

  return result.sendMessage;
}

React Hook

import { useState } from 'react';
import { gql, useMutation, useSubscription } from '@apollo/client';

const MESSAGE_ADDED = gql`
  subscription MessageAdded($roomID: ID!) {
    messageAdded(roomID: $roomID) {
      id
      content
      user {
        id
        username
        avatarURL
      }
      createdAt
    }
  }
`;

const SEND_MESSAGE = gql`
  mutation SendMessage($input: SendMessageInput!) {
    sendMessage(input: $input) {
      id
      content
      createdAt
    }
  }
`;

function ChatRoom({ roomID }) {
  const [messages, setMessages] = useState([]);

  // Subscribe to new messages
  useSubscription(MESSAGE_ADDED, {
    variables: { roomID },
    onData: ({ data }) => {
      setMessages((prev) => [...prev, data.data.messageAdded]);
    },
  });

  // Mutation to send message
  const [sendMessage] = useMutation(SEND_MESSAGE);

  const handleSend = async (content) => {
    await sendMessage({
      variables: {
        input: { roomID, content },
      },
    });
  };

  return (
    <div>
      {messages.map((msg) => (
        <Message key={msg.id} message={msg} />
      ))}
      <MessageInput onSend={handleSend} />
    </div>
  );
}

Advanced Patterns

1. Subscription Filtering (Server-Side)

// Filter events before sending to client.
// MessageFilter is a hypothetical input type (not defined in the schema above).
func (r *subscriptionResolver) MessageAdded(ctx context.Context, roomID string, filter *model.MessageFilter) (<-chan *model.Message, error) {
	channel := fmt.Sprintf("room:%s:messages", roomID)
	msgChan, err := r.pubsub.Subscribe(ctx, channel)
	if err != nil {
		return nil, fmt.Errorf("subscribe: %w", err)
	}
	out := make(chan *model.Message)

	go func() {
		defer close(out)

		for msg := range msgChan {
			if message, ok := msg.(*model.Message); ok {
				// Apply filters
				if filter != nil {
					if filter.UserID != nil && message.UserID != *filter.UserID {
						continue
					}
					if filter.MinLength != nil && len(message.Content) < *filter.MinLength {
						continue
					}
				}

				select {
				case out <- message:
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	return out, nil
}

2. Rate Limiting Subscriptions

import "golang.org/x/time/rate"

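// NOTE: Subscription() in resolver.go returns a new subscriptionResolver on
// every call, so in practice the limiter map should live on the root Resolver
// (or be shared another way) to persist across subscriptions.
type subscriptionResolver struct {
	*Resolver
	limiters map[string]*rate.Limiter
	mu       sync.Mutex
}

func (r *subscriptionResolver) MessageAdded(ctx context.Context, roomID string) (<-chan *model.Message, error) {
	userID, ok := ctx.Value("userID").(string)
	if !ok {
		return nil, fmt.Errorf("unauthorized")
	}

	// Get or create rate limiter for user
	r.mu.Lock()
	if r.limiters == nil {
		r.limiters = make(map[string]*rate.Limiter)
	}
	limiter, ok := r.limiters[userID]
	if !ok {
		limiter = rate.NewLimiter(rate.Limit(10), 20) // 10 msg/s, burst 20
		r.limiters[userID] = limiter
	}
	r.mu.Unlock()

	channel := fmt.Sprintf("room:%s:messages", roomID)
	msgChan, err := r.pubsub.Subscribe(ctx, channel)
	if err != nil {
		return nil, fmt.Errorf("subscribe: %w", err)
	}
	out := make(chan *model.Message)

	go func() {
		defer close(out)

		for msg := range msgChan {
			if message, ok := msg.(*model.Message); ok {
				// Apply rate limit. Wait blocks until a token is available and
				// returns an error when ctx is cancelled or its deadline would
				// be exceeded, so stop instead of continuing.
				if err := limiter.Wait(ctx); err != nil {
					return
				}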

				select {
				case out <- message:
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	return out, nil
}

3. Batching Events

// Batch multiple events before sending
func (r *subscriptionResolver) ActivityFeedBatched(ctx context.Context) (<-chan []*model.Activity, error) {
	activityChan, err := r.pubsub.Subscribe(ctx, "activity:feed")
	if err != nil {
		return nil, fmt.Errorf("subscribe: %w", err)
	}
	out := make(chan []*model.Activity)

	go func() {
		defer close(out)

		var batch []*model.Activity
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return

			case msg, ok := <-activityChan:
				if !ok {
					return // Source closed; stop instead of spinning on nil values
				}
				if activity, ok := msg.(*model.Activity); ok {
					batch = append(batch, activity)
				}

			case <-ticker.C:
				if len(batch) > 0 {
					select {
					case out <- batch:
						batch = []*model.Activity{}
					case <-ctx.Done():
						return
					}
				}
			}
		}
	}()

	return out, nil
}
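The ticker-only batcher above lets a batch grow without bound between ticks. A possible refinement is to flush when either a size cap or the interval is reached, and to flush what remains when the source closes. The generic `batch` function, its `maxSize` parameter, and the `batchSizes` demo below are illustrative assumptions rather than part of the original design:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// batch groups values from in into slices of at most maxSize elements. A batch
// is flushed when it is full, when interval elapses, or when in is closed.
func batch[T any](ctx context.Context, in <-chan T, maxSize int, interval time.Duration) <-chan []T {
	out := make(chan []T)
	go func() {
		defer close(out)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		var buf []T
		// flush sends the pending batch; it reports false if ctx was cancelled.
		flush := func() bool {
			if len(buf) == 0 {
				return true
			}
			select {
			case out <- buf:
				buf = nil // start a fresh slice so the sent batch is not reused
				return true
			case <-ctx.Done():
				return false
			}
		}

		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-in:
				if !ok {
					flush() // deliver the remainder before closing
					return
				}
				buf = append(buf, v)
				if len(buf) >= maxSize && !flush() {
					return
				}
			case <-ticker.C:
				if !flush() {
					return
				}
			}
		}
	}()
	return out
}

// batchSizes pushes five events through batch with a cap of two and a long
// interval, and returns the size of each emitted batch.
func batchSizes() []int {
	in := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		in <- i
	}
	close(in)

	var sizes []int
	for b := range batch(context.Background(), in, 2, time.Hour) {
		sizes = append(sizes, len(b))
	}
	return sizes
}

func main() {
	fmt.Println(batchSizes()) // prints [2 2 1]
}
```

The size cap bounds memory per subscriber, while the interval bounds latency for quiet feeds.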

Dependencies

// go.mod
module graphql-subscriptions

go 1.21

require (
	github.com/99designs/gqlgen v0.17.42
	github.com/gorilla/websocket v1.5.1
	github.com/redis/go-redis/v9 v9.4.0
	github.com/vektah/gqlparser/v2 v2.5.10
)

gqlgen Configuration

# gqlgen.yml
schema:
  - graph/schema.graphqls

exec:
  filename: graph/generated/generated.go
  package: generated

model:
  filename: graph/model/models_gen.go
  package: model

resolver:
  layout: follow-schema
  dir: graph
  package: graph
  filename_template: "{name}.resolvers.go"

autobind:
  - "graphql-subscriptions/graph/model"

models:
  ID:
    model:
      - github.com/99designs/gqlgen/graphql.ID
      - github.com/99designs/gqlgen/graphql.Int
      - github.com/99designs/gqlgen/graphql.Int64
      - github.com/99designs/gqlgen/graphql.Int32
  Int:
    model:
      - github.com/99designs/gqlgen/graphql.Int
      - github.com/99designs/gqlgen/graphql.Int64
      - github.com/99designs/gqlgen/graphql.Int32

Best Practices

1. Schema Design

Keep subscriptions focused:

# Good: Specific, focused subscription
subscription MessageAdded($roomID: ID!) {
  messageAdded(roomID: $roomID) {
    id
    content
  }
}

# Bad: Too broad, wasteful
subscription AllEvents {
  everything {
    ... # Too much data
  }
}

2. Error Handling

func (r *subscriptionResolver) MessageAdded(ctx context.Context, roomID string) (<-chan *model.Message, error) {
	// Validate inputs
	if roomID == "" {
		return nil, fmt.Errorf("roomID required")
	}

	// Check permissions
	userID, ok := ctx.Value("userID").(string)
	if !ok {
		return nil, fmt.Errorf("unauthorized")
	}
	if !r.chatService.HasAccess(userID, roomID) {
		return nil, fmt.Errorf("access denied")
	}

	// Subscribe
	msgChan, err := r.pubsub.Subscribe(ctx, fmt.Sprintf("room:%s:messages", roomID))
	if err != nil {
		return nil, fmt.Errorf("subscribe failed: %w", err)
	}

	// ... rest of implementation
}

3. Memory Management

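// Limit buffer sizes to prevent memory issues
out := make(chan *model.Message, 100) // Bounded channel

// Close out from the goroutine that sends on it. Closing it from a separate
// goroutine on ctx.Done() can panic with "send on closed channel".
go func() {
	defer close(out) // Runs on context cancellation or when the source closes
	for msg := range msgChan {
		if message, ok := msg.(*model.Message); ok {
			select {
			case out <- message:
			case <-ctx.Done():
				return
			}
		}
	}
}()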

4. Authentication & Authorization

// Check permissions for each subscription
func (r *subscriptionResolver) PrivateMessageAdded(ctx context.Context, conversationID string) (<-chan *model.Message, error) {
	userID, ok := ctx.Value("userID").(string)
	if !ok {
		return nil, fmt.Errorf("unauthorized: missing user")
	}

	// Verify user is participant
	if !r.chatService.IsParticipant(userID, conversationID) {
		return nil, fmt.Errorf("unauthorized: not a participant")
	}

	// ... subscription logic
}

When to Use GraphQL Subscriptions

Perfect for:

  • Chat and messaging applications
  • Real-time dashboards and analytics
  • Collaborative editing tools
  • Social media feeds and notifications
  • Live sports scores and gaming
  • Stock tickers and financial data
  • Type-safe real-time APIs

Not ideal for:

  • High-frequency updates (> 100 msg/sec per connection)
  • Simple server-to-client streaming (use SSE)
  • Binary data streaming (use WebRTC)
  • Resource-constrained IoT devices publishing telemetry (use MQTT)
  • When schema overhead isn’t justified
  • Broadcast to millions of clients

Conclusion

GraphQL Subscriptions provide type-safe, schema-driven real-time communication that integrates seamlessly with your existing GraphQL API. By combining WebSocket transport with GraphQL’s declarative query language, subscriptions offer a developer-friendly way to build real-time features with strong typing and built-in introspection.

Key takeaways:

  • Define subscriptions in your GraphQL schema
  • Use pub/sub for event distribution across servers
  • Implement proper authentication and authorization
  • Apply filtering and rate limiting server-side
  • Clean up resources on client disconnect
  • Use Redis pub/sub for horizontal scaling

GraphQL Subscriptions shine when you need typed real-time APIs that integrate with your existing GraphQL infrastructure, making them ideal for modern web applications requiring live updates.

