{{series_nav(current_post=11)}}
GraphQL Subscriptions bring real-time capabilities to GraphQL APIs, allowing clients to subscribe to specific events and receive updates when data changes. Unlike queries and mutations, subscriptions maintain a long-lived connection and push data from server to client, providing type-safe, schema-driven real-time communication.
What are GraphQL Subscriptions?
GraphQL Subscriptions are a GraphQL operation type (alongside Query and Mutation) that enables real-time data flow from server to client. They combine the declarative nature of GraphQL with event-driven architecture, allowing clients to specify exactly what data they want to receive when specific events occur.
Core Concepts
Subscription: A GraphQL operation that establishes a long-lived connection for receiving updates.
Schema: Type-safe definition of available subscriptions and their payloads.
Resolver: Server-side function that manages subscription lifecycle and emits events.
Transport: WebSocket protocol for bidirectional communication (typically graphql-ws).
Filtering: Server-side or client-side filtering of subscription events.
Event Source: Backend system generating events (database, message queue, service).
Architecture
gqlgen] subgraph "Schema" S1[Subscription Types] S2[Query Types] S3[Mutation Types] end subgraph "Resolvers" R1[Subscription Resolvers] R2[Query Resolvers] R3[Mutation Resolvers] end subgraph "WebSocket Manager" WS[WebSocket Connections] SUB[Active Subscriptions] end end subgraph "Event Sources" DB[(Database)] MQ[Message Queue
NATS/Kafka] CACHE[Redis Pub/Sub] API[External APIs] end C1 <-->|WebSocket
graphql-ws| WS C2 <-->|WebSocket| WS C3 <-->|WebSocket| WS WS --> SUB SUB --> R1 R1 --> S1 R1 -.->|Listen| DB R1 -.->|Subscribe| MQ R1 -.->|Subscribe| CACHE R1 -.->|Poll| API R3 -->|Trigger Event| R1
Subscription Flow
Transport Protocol (graphql-ws)
Real-World Use Cases
1. Real-Time Chat
Subscribe to new messages in chat rooms with typing indicators.
Perfect for: Team collaboration, customer support, social messaging
2. Live Dashboards
Monitor metrics, KPIs, and system health in real-time.
Perfect for: Analytics dashboards, monitoring tools, admin panels
3. Collaborative Editing
Sync document changes across multiple users.
Perfect for: Google Docs-style apps, code editors, design tools
4. Social Media Feeds
Receive new posts, comments, likes, and notifications instantly.
Perfect for: Social networks, activity feeds, news aggregators
5. E-commerce Updates
Track order status, inventory changes, and price updates.
Perfect for: Shopping carts, auction sites, stock trading platforms
6. Gaming
Sync game state, player positions, and scores in real-time.
Perfect for: Multiplayer games, leaderboards, tournaments
7. IoT Monitoring
Stream sensor data and device status updates.
Perfect for: Smart home dashboards, industrial monitoring, fleet tracking
8. Notifications
Deliver targeted notifications based on user preferences.
Perfect for: Alert systems, push notifications, activity streams
Project Structure
graphql-subscriptions/
├── cmd/
│ └── server/
│ └── main.go # Server entry point
├── graph/
│ ├── schema.graphqls # GraphQL schema
│ ├── schema.resolvers.go # Generated resolvers
│ ├── resolver.go # Resolver root
│ ├── model/
│ │ └── models_gen.go # Generated models
│ └── generated/
│ └── generated.go # gqlgen generated code
├── internal/
│ ├── pubsub/
│ │ ├── pubsub.go # Pub/Sub abstraction
│ │ └── redis.go # Redis implementation
│ ├── service/
│ │ ├── chat.go # Chat service
│ │ ├── notification.go # Notification service
│ │ └── activity.go # Activity service
│ └── middleware/
│ ├── auth.go # Authentication
│ └── logging.go # Logging
├── pkg/
│ └── ws/
│ └── transport.go # WebSocket transport
├── gqlgen.yml # gqlgen configuration
├── go.mod
└── go.sum
Implementation
1. GraphQL Schema
# graph/schema.graphqls
# Scalar types
scalar Time
# === Query Types ===
type Query {
# Chat queries
messages(roomID: ID!, limit: Int = 50): [Message!]!
rooms: [Room!]!
# Notification queries
notifications(limit: Int = 20): [Notification!]!
# User queries
user(id: ID!): User
onlineUsers: [User!]!
}
# === Mutation Types ===
type Mutation {
# Chat mutations
sendMessage(input: SendMessageInput!): Message!
createRoom(input: CreateRoomInput!): Room!
joinRoom(roomID: ID!): Boolean!
leaveRoom(roomID: ID!): Boolean!
# User mutations
updateUserStatus(status: UserStatus!): User!
# Notification mutations
markNotificationRead(id: ID!): Notification!
}
# === Subscription Types ===
type Subscription {
# Chat subscriptions
messageAdded(roomID: ID!): Message!
typingIndicator(roomID: ID!): TypingIndicator!
roomUpdated(roomID: ID!): Room!
# User subscriptions
userStatusChanged(userID: ID): User!
# Notification subscriptions
notificationReceived: Notification!
# Activity subscriptions
activityFeed(filter: ActivityFilter): Activity!
}
# === Models ===
type Message {
id: ID!
roomID: ID!
userID: ID!
user: User!
content: String!
createdAt: Time!
updatedAt: Time!
}
type Room {
id: ID!
name: String!
description: String
members: [User!]!
createdAt: Time!
messageCount: Int!
}
type User {
id: ID!
username: String!
email: String!
status: UserStatus!
avatarURL: String
lastSeen: Time!
}
type Notification {
id: ID!
userID: ID!
type: NotificationType!
title: String!
message: String!
read: Boolean!
metadata: String
createdAt: Time!
}
type Activity {
id: ID!
type: ActivityType!
userID: ID!
user: User!
action: String!
resourceID: String
metadata: String
createdAt: Time!
}
type TypingIndicator {
roomID: ID!
userID: ID!
username: String!
isTyping: Boolean!
}
# === Enums ===
enum UserStatus {
ONLINE
AWAY
OFFLINE
}
enum NotificationType {
MESSAGE
MENTION
SYSTEM
ALERT
}
enum ActivityType {
MESSAGE
JOIN
LEAVE
STATUS_CHANGE
REACTION
}
# === Input Types ===
input SendMessageInput {
roomID: ID!
content: String!
}
input CreateRoomInput {
name: String!
description: String
}
input ActivityFilter {
types: [ActivityType!]
userID: ID
}
2. PubSub Abstraction
// internal/pubsub/pubsub.go
package pubsub
import (
"context"
"sync"
)
// PubSub defines pub/sub interface
type PubSub interface {
Publish(ctx context.Context, channel string, message interface{}) error
Subscribe(ctx context.Context, channel string) (<-chan interface{}, error)
Unsubscribe(ctx context.Context, channel string) error
Close() error
}
// InMemoryPubSub is a simple in-memory pub/sub for single-server setups
type InMemoryPubSub struct {
mu sync.RWMutex
channels map[string][]chan interface{}
}
func NewInMemoryPubSub() *InMemoryPubSub {
return &InMemoryPubSub{
channels: make(map[string][]chan interface{}),
}
}
func (p *InMemoryPubSub) Publish(ctx context.Context, channel string, message interface{}) error {
p.mu.RLock()
defer p.mu.RUnlock()
if subscribers, ok := p.channels[channel]; ok {
for _, ch := range subscribers {
select {
case ch <- message:
default:
// Channel full, skip
}
}
}
return nil
}
func (p *InMemoryPubSub) Subscribe(ctx context.Context, channel string) (<-chan interface{}, error) {
p.mu.Lock()
defer p.mu.Unlock()
ch := make(chan interface{}, 10)
p.channels[channel] = append(p.channels[channel], ch)
// Cleanup on context cancellation
go func() {
<-ctx.Done()
p.unsubscribe(channel, ch)
}()
return ch, nil
}
func (p *InMemoryPubSub) unsubscribe(channel string, ch chan interface{}) {
p.mu.Lock()
defer p.mu.Unlock()
if subscribers, ok := p.channels[channel]; ok {
for i, subscriber := range subscribers {
if subscriber == ch {
p.channels[channel] = append(subscribers[:i], subscribers[i+1:]...)
close(ch)
break
}
}
}
}
func (p *InMemoryPubSub) Unsubscribe(ctx context.Context, channel string) error {
// Not needed with auto-cleanup in Subscribe
return nil
}
func (p *InMemoryPubSub) Close() error {
p.mu.Lock()
defer p.mu.Unlock()
for _, subscribers := range p.channels {
for _, ch := range subscribers {
close(ch)
}
}
p.channels = make(map[string][]chan interface{})
return nil
}
3. Redis PubSub Implementation
// internal/pubsub/redis.go
package pubsub
import (
"context"
"encoding/json"
"fmt"
"github.com/redis/go-redis/v9"
)
// RedisPubSub implements PubSub using Redis
type RedisPubSub struct {
client *redis.Client
}
func NewRedisPubSub(addr string) (*RedisPubSub, error) {
client := redis.NewClient(&redis.Options{
Addr: addr,
})
if err := client.Ping(context.Background()).Err(); err != nil {
return nil, fmt.Errorf("redis ping: %w", err)
}
return &RedisPubSub{client: client}, nil
}
func (r *RedisPubSub) Publish(ctx context.Context, channel string, message interface{}) error {
data, err := json.Marshal(message)
if err != nil {
return fmt.Errorf("marshal: %w", err)
}
return r.client.Publish(ctx, channel, data).Err()
}
func (r *RedisPubSub) Subscribe(ctx context.Context, channel string) (<-chan interface{}, error) {
pubsub := r.client.Subscribe(ctx, channel)
ch := make(chan interface{}, 10)
go func() {
defer close(ch)
defer pubsub.Close()
msgChan := pubsub.Channel()
for {
select {
case <-ctx.Done():
return
case msg := <-msgChan:
if msg == nil {
return
}
var payload interface{}
if err := json.Unmarshal([]byte(msg.Payload), &payload); err != nil {
continue
}
select {
case ch <- payload:
case <-ctx.Done():
return
}
}
}
}()
return ch, nil
}
func (r *RedisPubSub) Unsubscribe(ctx context.Context, channel string) error {
// Auto-handled by context cancellation
return nil
}
func (r *RedisPubSub) Close() error {
return r.client.Close()
}
4. Resolver Root
// graph/resolver.go
package graph
import (
"graphql-subscriptions/internal/pubsub"
"graphql-subscriptions/internal/service"
"sync"
)
// Resolver is the root resolver
type Resolver struct {
pubsub pubsub.PubSub
chatService *service.ChatService
userService *service.UserService
notifService *service.NotificationService
// In-memory stores (use real DB in production)
mu sync.RWMutex
messages []*model.Message
rooms []*model.Room
users []*model.User
notifications []*model.Notification
}
// NewResolver creates a new resolver
func NewResolver(ps pubsub.PubSub) *Resolver {
resolver := &Resolver{
pubsub: ps,
messages: []*model.Message{},
rooms: []*model.Room{},
users: []*model.User{},
notifications: []*model.Notification{},
}
// Initialize services
resolver.chatService = service.NewChatService(ps, resolver)
resolver.userService = service.NewUserService(ps, resolver)
resolver.notifService = service.NewNotificationService(ps, resolver)
return resolver
}
// Mutation returns the mutation resolver
func (r *Resolver) Mutation() MutationResolver {
return &mutationResolver{r}
}
// Query returns the query resolver
func (r *Resolver) Query() QueryResolver {
return &queryResolver{r}
}
// Subscription returns the subscription resolver
func (r *Resolver) Subscription() SubscriptionResolver {
return &subscriptionResolver{r}
}
5. Subscription Resolvers
// graph/schema.resolvers.go
package graph
import (
"context"
"fmt"
"time"
"graphql-subscriptions/graph/model"
)
type subscriptionResolver struct{ *Resolver }
// MessageAdded subscribes to new messages in a room
func (r *subscriptionResolver) MessageAdded(ctx context.Context, roomID string) (<-chan *model.Message, error) {
channel := fmt.Sprintf("room:%s:messages", roomID)
// Subscribe to pub/sub channel
msgChan, err := r.pubsub.Subscribe(ctx, channel)
if err != nil {
return nil, fmt.Errorf("subscribe: %w", err)
}
// Convert interface{} channel to *model.Message channel
out := make(chan *model.Message)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case msg, ok := <-msgChan:
if !ok {
return
}
// Type assert to *model.Message
if message, ok := msg.(*model.Message); ok {
select {
case out <- message:
case <-ctx.Done():
return
}
}
}
}
}()
return out, nil
}
// TypingIndicator subscribes to typing indicators in a room
func (r *subscriptionResolver) TypingIndicator(ctx context.Context, roomID string) (<-chan *model.TypingIndicator, error) {
channel := fmt.Sprintf("room:%s:typing", roomID)
indicatorChan, err := r.pubsub.Subscribe(ctx, channel)
if err != nil {
return nil, err
}
out := make(chan *model.TypingIndicator)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case msg, ok := <-indicatorChan:
if !ok {
return
}
if indicator, ok := msg.(*model.TypingIndicator); ok {
select {
case out <- indicator:
case <-ctx.Done():
return
}
}
}
}
}()
return out, nil
}
// UserStatusChanged subscribes to user status changes
func (r *subscriptionResolver) UserStatusChanged(ctx context.Context, userID *string) (<-chan *model.User, error) {
channel := "user:status"
if userID != nil {
channel = fmt.Sprintf("user:%s:status", *userID)
}
userChan, err := r.pubsub.Subscribe(ctx, channel)
if err != nil {
return nil, err
}
out := make(chan *model.User)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case msg, ok := <-userChan:
if !ok {
return
}
if user, ok := msg.(*model.User); ok {
// Filter by userID if specified
if userID != nil && user.ID != *userID {
continue
}
select {
case out <- user:
case <-ctx.Done():
return
}
}
}
}
}()
return out, nil
}
// NotificationReceived subscribes to user notifications
func (r *subscriptionResolver) NotificationReceived(ctx context.Context) (<-chan *model.Notification, error) {
// Get user ID from context (set by auth middleware)
userID, ok := ctx.Value("userID").(string)
if !ok {
return nil, fmt.Errorf("unauthorized")
}
channel := fmt.Sprintf("user:%s:notifications", userID)
notifChan, err := r.pubsub.Subscribe(ctx, channel)
if err != nil {
return nil, err
}
out := make(chan *model.Notification)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case msg, ok := <-notifChan:
if !ok {
return
}
if notif, ok := msg.(*model.Notification); ok {
select {
case out <- notif:
case <-ctx.Done():
return
}
}
}
}
}()
return out, nil
}
// ActivityFeed subscribes to activity feed
func (r *subscriptionResolver) ActivityFeed(ctx context.Context, filter *model.ActivityFilter) (<-chan *model.Activity, error) {
channel := "activity:feed"
activityChan, err := r.pubsub.Subscribe(ctx, channel)
if err != nil {
return nil, err
}
out := make(chan *model.Activity)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case msg, ok := <-activityChan:
if !ok {
return
}
if activity, ok := msg.(*model.Activity); ok {
// Apply filters
if filter != nil {
if filter.UserID != nil && activity.UserID != *filter.UserID {
continue
}
if filter.Types != nil {
found := false
for _, t := range filter.Types {
if activity.Type == t {
found = true
break
}
}
if !found {
continue
}
}
}
select {
case out <- activity:
case <-ctx.Done():
return
}
}
}
}
}()
return out, nil
}
// RoomUpdated subscribes to room updates
func (r *subscriptionResolver) RoomUpdated(ctx context.Context, roomID string) (<-chan *model.Room, error) {
channel := fmt.Sprintf("room:%s:updated", roomID)
roomChan, err := r.pubsub.Subscribe(ctx, channel)
if err != nil {
return nil, err
}
out := make(chan *model.Room)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case msg, ok := <-roomChan:
if !ok {
return
}
if room, ok := msg.(*model.Room); ok {
select {
case out <- room:
case <-ctx.Done():
return
}
}
}
}
}()
return out, nil
}
6. Mutation Resolvers (Trigger Subscriptions)
type mutationResolver struct{ *Resolver }
// SendMessage sends a message and triggers subscription
func (r *mutationResolver) SendMessage(ctx context.Context, input model.SendMessageInput) (*model.Message, error) {
// Get user from context
userID, ok := ctx.Value("userID").(string)
if !ok {
return nil, fmt.Errorf("unauthorized")
}
// Create message
message := &model.Message{
ID: fmt.Sprintf("msg-%d", time.Now().UnixNano()),
RoomID: input.RoomID,
UserID: userID,
Content: input.Content,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
// Store message
r.mu.Lock()
r.messages = append(r.messages, message)
r.mu.Unlock()
// Publish to subscribers
channel := fmt.Sprintf("room:%s:messages", input.RoomID)
if err := r.pubsub.Publish(ctx, channel, message); err != nil {
return nil, fmt.Errorf("publish: %w", err)
}
// Trigger activity feed
activity := &model.Activity{
ID: fmt.Sprintf("act-%d", time.Now().UnixNano()),
Type: model.ActivityTypeMessage,
UserID: userID,
Action: "sent a message",
ResourceID: &message.ID,
CreatedAt: time.Now(),
}
r.pubsub.Publish(ctx, "activity:feed", activity)
return message, nil
}
// UpdateUserStatus updates user status and triggers subscription
func (r *mutationResolver) UpdateUserStatus(ctx context.Context, status model.UserStatus) (*model.User, error) {
userID, ok := ctx.Value("userID").(string)
if !ok {
return nil, fmt.Errorf("unauthorized")
}
// Find and update user
r.mu.Lock()
var user *model.User
for _, u := range r.users {
if u.ID == userID {
u.Status = status
u.LastSeen = time.Now()
user = u
break
}
}
r.mu.Unlock()
if user == nil {
return nil, fmt.Errorf("user not found")
}
// Publish status change
r.pubsub.Publish(ctx, "user:status", user)
r.pubsub.Publish(ctx, fmt.Sprintf("user:%s:status", userID), user)
return user, nil
}
7. Server Setup
// cmd/server/main.go
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/99designs/gqlgen/graphql/handler"
"github.com/99designs/gqlgen/graphql/handler/extension"
"github.com/99designs/gqlgen/graphql/handler/loglevel"
"github.com/99designs/gqlgen/graphql/handler/transport"
"github.com/99designs/gqlgen/graphql/playground"
"github.com/gorilla/websocket"
"graphql-subscriptions/graph"
"graphql-subscriptions/graph/generated"
"graphql-subscriptions/internal/pubsub"
)
func main() {
// Create pub/sub (use Redis in production)
ps := pubsub.NewInMemoryPubSub()
// For multi-server: ps, _ := pubsub.NewRedisPubSub("localhost:6379")
defer ps.Close()
// Create resolver
resolver := graph.NewResolver(ps)
// Create GraphQL server
srv := handler.New(generated.NewExecutableSchema(generated.Config{
Resolvers: resolver,
}))
// Add transports
srv.AddTransport(transport.POST{})
srv.AddTransport(transport.Websocket{
KeepAlivePingInterval: 10 * time.Second,
Upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true // Configure appropriately for production
},
},
})
// Add extensions
srv.Use(extension.Introspection{})
srv.SetErrorPresenter(errorPresenter)
srv.SetRecoverFunc(recoverFunc)
// Logging
srv.SetQueryLogger(loglevel.DebugLevel)
// Setup routes
http.Handle("/", playground.Handler("GraphQL Playground", "/query"))
http.Handle("/query", authMiddleware(srv))
// Health check
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
})
// Start server
server := &http.Server{
Addr: ":8080",
Handler: nil,
ReadTimeout: 15 * time.Second,
WriteTimeout: 15 * time.Second,
IdleTimeout: 60 * time.Second,
}
go func() {
log.Println("Server starting on http://localhost:8080")
log.Println("GraphQL Playground: http://localhost:8080")
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Server error: %v", err)
}
}()
// Graceful shutdown
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("Shutting down server...")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
log.Fatal("Server forced to shutdown:", err)
}
log.Println("Server stopped")
}
func authMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Extract user from JWT/session
// For demo, use header
userID := r.Header.Get("X-User-ID")
if userID == "" {
userID = "anonymous"
}
ctx := context.WithValue(r.Context(), "userID", userID)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
func errorPresenter(ctx context.Context, e error) *gqlerror.Error {
log.Printf("GraphQL error: %v", e)
return graphql.DefaultErrorPresenter(ctx, e)
}
func recoverFunc(ctx context.Context, err interface{}) error {
log.Printf("Panic recovered: %v", err)
return fmt.Errorf("internal server error")
}
Client Examples
JavaScript/TypeScript Client
import { createClient } from 'graphql-ws';
import { GraphQLClient, gql } from 'graphql-request';
// WebSocket client for subscriptions
const wsClient = createClient({
url: 'ws://localhost:8080/query',
connectionParams: {
authorization: 'Bearer <token>',
},
});
// Subscribe to messages
const subscription = gql`
subscription MessageAdded($roomID: ID!) {
messageAdded(roomID: $roomID) {
id
content
user {
id
username
}
createdAt
}
}
`;
const unsubscribe = wsClient.subscribe(
{
query: subscription,
variables: { roomID: 'room-123' },
},
{
next: (data) => {
console.log('New message:', data.data.messageAdded);
// Update UI
},
error: (error) => {
console.error('Subscription error:', error);
},
complete: () => {
console.log('Subscription complete');
},
}
);
// Unsubscribe when done
// unsubscribe();
// Send message (mutation)
const httpClient = new GraphQLClient('http://localhost:8080/query');
async function sendMessage(roomID: string, content: string) {
const mutation = gql`
mutation SendMessage($input: SendMessageInput!) {
sendMessage(input: $input) {
id
content
createdAt
}
}
`;
const result = await httpClient.request(mutation, {
input: { roomID, content },
});
return result.sendMessage;
}
React Hook
import { useSubscription, useMutation } from '@apollo/client';
import { gql } from '@apollo/client';
const MESSAGE_ADDED = gql`
subscription MessageAdded($roomID: ID!) {
messageAdded(roomID: $roomID) {
id
content
user {
id
username
avatarURL
}
createdAt
}
}
`;
const SEND_MESSAGE = gql`
mutation SendMessage($input: SendMessageInput!) {
sendMessage(input: $input) {
id
content
createdAt
}
}
`;
function ChatRoom({ roomID }) {
const [messages, setMessages] = useState([]);
// Subscribe to new messages
useSubscription(MESSAGE_ADDED, {
variables: { roomID },
onData: ({ data }) => {
setMessages((prev) => [...prev, data.data.messageAdded]);
},
});
// Mutation to send message
const [sendMessage] = useMutation(SEND_MESSAGE);
const handleSend = async (content) => {
await sendMessage({
variables: {
input: { roomID, content },
},
});
};
return (
<div>
{messages.map((msg) => (
<Message key={msg.id} message={msg} />
))}
<MessageInput onSend={handleSend} />
</div>
);
}
Advanced Patterns
1. Subscription Filtering (Server-Side)
// Filter events before sending to client
func (r *subscriptionResolver) MessageAdded(ctx context.Context, roomID string, filter *MessageFilter) (<-chan *model.Message, error) {
channel := fmt.Sprintf("room:%s:messages", roomID)
msgChan, _ := r.pubsub.Subscribe(ctx, channel)
out := make(chan *model.Message)
go func() {
defer close(out)
for msg := range msgChan {
if message, ok := msg.(*model.Message); ok {
// Apply filters
if filter != nil {
if filter.UserID != nil && message.UserID != *filter.UserID {
continue
}
if filter.MinLength != nil && len(message.Content) < *filter.MinLength {
continue
}
}
select {
case out <- message:
case <-ctx.Done():
return
}
}
}
}()
return out, nil
}
2. Rate Limiting Subscriptions
import "golang.org/x/time/rate"
type subscriptionResolver struct {
*Resolver
limiters map[string]*rate.Limiter
mu sync.Mutex
}
func (r *subscriptionResolver) MessageAdded(ctx context.Context, roomID string) (<-chan *model.Message, error) {
userID := ctx.Value("userID").(string)
// Get or create rate limiter for user
r.mu.Lock()
limiter, ok := r.limiters[userID]
if !ok {
limiter = rate.NewLimiter(rate.Limit(10), 20) // 10 msg/s, burst 20
r.limiters[userID] = limiter
}
r.mu.Unlock()
channel := fmt.Sprintf("room:%s:messages", roomID)
msgChan, _ := r.pubsub.Subscribe(ctx, channel)
out := make(chan *model.Message)
go func() {
defer close(out)
for msg := range msgChan {
if message, ok := msg.(*model.Message); ok {
// Apply rate limit
if err := limiter.Wait(ctx); err != nil {
continue
}
select {
case out <- message:
case <-ctx.Done():
return
}
}
}
}()
return out, nil
}
3. Batching Events
// Batch multiple events before sending
func (r *subscriptionResolver) ActivityFeedBatched(ctx context.Context) (<-chan []*model.Activity, error) {
activityChan, _ := r.pubsub.Subscribe(ctx, "activity:feed")
out := make(chan []*model.Activity)
go func() {
defer close(out)
var batch []*model.Activity
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case msg := <-activityChan:
if activity, ok := msg.(*model.Activity); ok {
batch = append(batch, activity)
}
case <-ticker.C:
if len(batch) > 0 {
select {
case out <- batch:
batch = []*model.Activity{}
case <-ctx.Done():
return
}
}
}
}
}()
return out, nil
}
Dependencies
// go.mod
module graphql-subscriptions
go 1.21
require (
github.com/99designs/gqlgen v0.17.42
github.com/gorilla/websocket v1.5.1
github.com/redis/go-redis/v9 v9.4.0
github.com/vektah/gqlparser/v2 v2.5.10
)
gqlgen Configuration
# gqlgen.yml
schema:
- graph/schema.graphqls
exec:
filename: graph/generated/generated.go
package: generated
model:
filename: graph/model/models_gen.go
package: model
resolver:
layout: follow-schema
dir: graph
package: graph
filename_template: "{name}.resolvers.go"
autobind:
- "graphql-subscriptions/graph/model"
models:
ID:
model:
- github.com/99designs/gqlgen/graphql.ID
- github.com/99designs/gqlgen/graphql.Int
- github.com/99designs/gqlgen/graphql.Int64
- github.com/99designs/gqlgen/graphql.Int32
Int:
model:
- github.com/99designs/gqlgen/graphql.Int
- github.com/99designs/gqlgen/graphql.Int64
- github.com/99designs/gqlgen/graphql.Int32
Best Practices
1. Schema Design
Keep subscriptions focused:
# Good: Specific, focused subscription
subscription MessageAdded($roomID: ID!) {
messageAdded(roomID: $roomID) {
id
content
}
}
# Bad: Too broad, wasteful
subscription AllEvents {
everything {
... # Too much data
}
}
2. Error Handling
func (r *subscriptionResolver) MessageAdded(ctx context.Context, roomID string) (<-chan *model.Message, error) {
// Validate inputs
if roomID == "" {
return nil, fmt.Errorf("roomID required")
}
// Check permissions
userID := ctx.Value("userID").(string)
if !r.chatService.HasAccess(userID, roomID) {
return nil, fmt.Errorf("access denied")
}
// Subscribe
msgChan, err := r.pubsub.Subscribe(ctx, fmt.Sprintf("room:%s:messages", roomID))
if err != nil {
return nil, fmt.Errorf("subscribe failed: %w", err)
}
// ... rest of implementation
}
3. Memory Management
// Limit buffer sizes to prevent memory issues
out := make(chan *model.Message, 100) // Bounded channel
// Clean up on context cancellation
go func() {
<-ctx.Done()
close(out)
// Cleanup resources
}()
4. Authentication & Authorization
// Check permissions for each subscription
func (r *subscriptionResolver) PrivateMessageAdded(ctx context.Context, conversationID string) (<-chan *model.Message, error) {
userID := ctx.Value("userID").(string)
// Verify user is participant
if !r.chatService.IsParticipant(userID, conversationID) {
return nil, fmt.Errorf("unauthorized: not a participant")
}
// ... subscription logic
}
When to Use GraphQL Subscriptions
✅ Perfect for:
- Chat and messaging applications
- Real-time dashboards and analytics
- Collaborative editing tools
- Social media feeds and notifications
- Live sports scores and gaming
- Stock tickers and financial data
- Type-safe real-time APIs
❌ Not ideal for:
- High-frequency updates (> 100 msg/sec per connection)
- Simple server-to-client streaming (use SSE)
- Binary data streaming (use WebRTC)
- IoT devices (use MQTT)
- When schema overhead isn’t justified
- Broadcast to millions of clients
Conclusion
GraphQL Subscriptions provide type-safe, schema-driven real-time communication that integrates seamlessly with your existing GraphQL API. By combining WebSocket transport with GraphQL’s declarative query language, subscriptions offer a developer-friendly way to build real-time features with strong typing and built-in introspection.
Key takeaways:
- Define subscriptions in your GraphQL schema
- Use pub/sub for event distribution across servers
- Implement proper authentication and authorization
- Apply filtering and rate limiting server-side
- Clean up resources on client disconnect
- Use Redis pub/sub for horizontal scaling
GraphQL Subscriptions shine when you need typed real-time APIs that integrate with your existing GraphQL infrastructure, making them ideal for modern web applications requiring live updates.
{{series_nav(current_post=11)}}