Backend Communication
Current:
WebSockets
What are WebSockets?
WebSockets provide full-duplex, bidirectional communication channels over a single TCP connection. Unlike HTTP’s request-response model, WebSockets enable both client and server to send messages independently at any time, making them ideal for real-time applications.
Key characteristics:
- Full-duplex communication - Both parties can send/receive simultaneously
- Persistent connection - Single long-lived connection reduces overhead
- Low latency - No HTTP request/response overhead per message
- Binary and text support - Efficient data transfer
- Protocol upgrade - Starts as HTTP, upgrades to WebSocket
Protocol Overview
sequenceDiagram
participant C as Client
participant S as Server
Note over C,S: WebSocket Handshake
C->>S: GET /ws HTTP/1.1<br/>Upgrade: websocket<br/>Connection: Upgrade<br/>Sec-WebSocket-Key: xxx
S-->>C: HTTP/1.1 101 Switching Protocols<br/>Upgrade: websocket<br/>Connection: Upgrade
rect rgb(200, 220, 240)
Note over C,S: Bidirectional Messages
C->>S: Text/Binary Frame
S->>C: Text/Binary Frame
S->>C: Message
C->>S: Message
end
rect rgb(220, 240, 200)
Note over C,S: Ping/Pong Heartbeat
S->>C: Ping Frame
C->>S: Pong Frame
end
rect rgb(240, 220, 200)
Note over C,S: Connection Close
C->>S: Close Frame (1000)
S->>C: Close Frame (1000)
end
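In a real handshake the 101 response also carries a Sec-WebSocket-Accept header, which the server derives from the client's Sec-WebSocket-Key. The following is a minimal sketch of that derivation as RFC 6455 describes it, using only the standard library. The function name `acceptKey` is illustrative, and the sample key is the one from the RFC's own example. In the Go implementation later in this document, gorilla/websocket's Upgrader performs this step for you.

```go
package main

import (
	"crypto/sha1"
	"encoding/base64"
	"fmt"
)

// websocketGUID is the fixed string RFC 6455 appends to the client key.
const websocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

// acceptKey derives the Sec-WebSocket-Accept value for a given
// Sec-WebSocket-Key: base64(SHA-1(key + GUID)).
func acceptKey(clientKey string) string {
	sum := sha1.Sum([]byte(clientKey + websocketGUID))
	return base64.StdEncoding.EncodeToString(sum[:])
}

func main() {
	// Sample key taken from the RFC 6455 handshake example.
	fmt.Println(acceptKey("dGhlIHNhbXBsZSBub25jZQ=="))
}
```

A client checks this value to confirm that the server actually understood the WebSocket upgrade rather than echoing a cached HTTP response.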
Real-World Use Cases
- Chat Applications - Instant messaging, group chats, DMs
- Collaborative Editing - Google Docs-style real-time collaboration
- Multiplayer Games - Real-time game state synchronization
- Trading Platforms - Live price updates, order book changes
- Live Feeds - Social media streams, news updates, notifications
- IoT Dashboards - Real-time sensor data, device control
- Video/Audio Calls - Signaling for WebRTC connections
- Live Sports Scores - Real-time score updates, play-by-play
Implementation in Go
Project Structure
websocket-server/
├── main.go
├── websocket/
│ ├── hub.go
│ ├── client.go
│ └── message.go
├── handlers/
│ └── ws_handler.go
├── middleware/
│ ├── auth.go
│ └── ratelimit.go
├── redis/
│ └── pubsub.go
└── go.mod
Dependencies
go.mod
module websocket-server
go 1.21
require (
github.com/gorilla/websocket v1.5.1
github.com/golang-jwt/jwt/v5 v5.2.0
github.com/redis/go-redis/v9 v9.4.0
golang.org/x/time v0.5.0
)
1. Message Types
websocket/message.go
package websocket
import (
"encoding/json"
"time"
)
// MessageType represents different message types
type MessageType string
const (
MessageTypeChat MessageType = "chat"
MessageTypeJoin MessageType = "join"
MessageTypeLeave MessageType = "leave"
MessageTypeTyping MessageType = "typing"
MessageTypeUserList MessageType = "user_list"
MessageTypeError MessageType = "error"
MessageTypeHeartbeat MessageType = "heartbeat"
)
// Message represents a WebSocket message
type Message struct {
Type MessageType `json:"type"`
From string `json:"from,omitempty"`
To string `json:"to,omitempty"` // For direct messages
Room string `json:"room,omitempty"`
Content string `json:"content,omitempty"`
Timestamp time.Time `json:"timestamp"`
Metadata json.RawMessage `json:"metadata,omitempty"`
}
// Envelope wraps messages for internal routing
type Envelope struct {
Message *Message
ClientID string
Room string
Broadcast bool
}
// NewMessage creates a new message
func NewMessage(msgType MessageType, from, content string) *Message {
return &Message{
Type: msgType,
From: from,
Content: content,
Timestamp: time.Now(),
}
}
// ToJSON converts message to JSON bytes
func (m *Message) ToJSON() ([]byte, error) {
return json.Marshal(m)
}
// FromJSON parses JSON bytes into a message; it returns nil on a parse error
func FromJSON(data []byte) (*Message, error) {
var msg Message
if err := json.Unmarshal(data, &msg); err != nil {
return nil, err
}
return &msg, nil
}
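To see what these types look like on the wire, here is a small standalone sketch that encodes and decodes a message with the same JSON tags. It redeclares a local `Message` (with `Type` simplified to a plain string) so it runs on its own. The helpers `sampleMessage`, `encode`, and `decode` are illustrative names, not part of the project.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Message mirrors the JSON tags of the message type above.
type Message struct {
	Type      string          `json:"type"`
	From      string          `json:"from,omitempty"`
	To        string          `json:"to,omitempty"`
	Room      string          `json:"room,omitempty"`
	Content   string          `json:"content,omitempty"`
	Timestamp time.Time       `json:"timestamp"`
	Metadata  json.RawMessage `json:"metadata,omitempty"`
}

// sampleMessage returns a room chat message with a fixed timestamp.
func sampleMessage() Message {
	return Message{
		Type:      "chat",
		From:      "alice",
		Room:      "general",
		Content:   "hi",
		Timestamp: time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC),
	}
}

// encode marshals a message; empty optional fields are omitted.
func encode(m Message) (string, error) {
	data, err := json.Marshal(m)
	return string(data), err
}

// decode parses a JSON payload into a Message.
func decode(payload string) (Message, error) {
	var m Message
	err := json.Unmarshal([]byte(payload), &m)
	return m, err
}

func main() {
	out, _ := encode(sampleMessage())
	fmt.Println(out)

	m, err := decode(`{"type":"join","room":"general"}`)
	fmt.Println(m.Type, m.Room, err)
}
```

Because `to` and `metadata` use `omitempty`, a room message carries only the fields it needs, and a client can send a join request with just `type` and `room`.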
2. Client Connection
websocket/client.go
package websocket
import (
"context"
"log"
"time"
"github.com/gorilla/websocket"
)
const (
// Time allowed to write a message to the peer
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer
pongWait = 60 * time.Second
// Send pings to peer with this period (must be less than pongWait)
pingPeriod = (pongWait * 9) / 10
// Maximum message size allowed from peer
maxMessageSize = 512 * 1024 // 512 KB
)
// Client represents a WebSocket client connection
type Client struct {
ID string
Hub *Hub
Conn *websocket.Conn
Send chan *Message
Rooms map[string]bool
UserID string
Username string
ctx context.Context
cancel context.CancelFunc
}
// NewClient creates a new client
func NewClient(id string, hub *Hub, conn *websocket.Conn, userID, username string) *Client {
ctx, cancel := context.WithCancel(context.Background())
return &Client{
ID: id,
Hub: hub,
Conn: conn,
Send: make(chan *Message, 256),
Rooms: make(map[string]bool),
UserID: userID,
Username: username,
ctx: ctx,
cancel: cancel,
}
}
// ReadPump pumps messages from the WebSocket connection to the hub
func (c *Client) ReadPump() {
defer func() {
c.Hub.Unregister <- c
c.Conn.Close()
}()
c.Conn.SetReadLimit(maxMessageSize)
c.Conn.SetReadDeadline(time.Now().Add(pongWait))
c.Conn.SetPongHandler(func(string) error {
c.Conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for {
_, data, err := c.Conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("WebSocket error: %v", err)
}
break
}
// Parse message
message, err := FromJSON(data)
if err != nil {
log.Printf("Error parsing message: %v", err)
continue
}
// Set sender info
message.From = c.Username
message.Timestamp = time.Now()
// Create envelope for routing
envelope := &Envelope{
Message: message,
ClientID: c.ID,
Broadcast: true,
}
// Handle different message types
switch message.Type {
case MessageTypeChat:
if message.To != "" {
// Direct message
envelope.Broadcast = false
c.Hub.DirectMessage <- envelope
} else {
// Broadcast to room or all
c.Hub.Broadcast <- envelope
}
case MessageTypeJoin:
c.Hub.JoinRoom <- &RoomAction{
Client: c,
Room: message.Room,
}
case MessageTypeLeave:
c.Hub.LeaveRoom <- &RoomAction{
Client: c,
Room: message.Room,
}
default:
c.Hub.Broadcast <- envelope
}
log.Printf("Client %s sent %s message", c.ID, message.Type)
}
}
// WritePump pumps messages from the hub to the WebSocket connection
func (c *Client) WritePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.Conn.Close()
}()
for {
select {
case message, ok := <-c.Send:
c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// Hub closed the channel
c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
// Write message
data, err := message.ToJSON()
if err != nil {
log.Printf("Error encoding message: %v", err)
continue
}
if err := c.Conn.WriteMessage(websocket.TextMessage, data); err != nil {
log.Printf("Error writing message: %v", err)
return
}
case <-ticker.C:
c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
case <-c.ctx.Done():
return
}
}
}
// Close closes the client connection
func (c *Client) Close() {
c.cancel()
close(c.Send)
}
3. Hub for Connection Pooling
websocket/hub.go
package websocket
import (
"log"
"sync"
)
// RoomAction represents a room join/leave action
type RoomAction struct {
Client *Client
Room string
}
// Hub maintains active clients and broadcasts messages
type Hub struct {
// Registered clients
Clients map[string]*Client
mu sync.RWMutex
// Room memberships: room -> client IDs
Rooms map[string]map[string]bool
roomsMu sync.RWMutex
// Inbound messages from clients
Broadcast chan *Envelope
// Direct messages
DirectMessage chan *Envelope
// Register requests from clients
Register chan *Client
// Unregister requests from clients
Unregister chan *Client
// Room management
JoinRoom chan *RoomAction
LeaveRoom chan *RoomAction
}
// NewHub creates a new Hub
func NewHub() *Hub {
return &Hub{
Clients: make(map[string]*Client),
Rooms: make(map[string]map[string]bool),
Broadcast: make(chan *Envelope, 256),
DirectMessage: make(chan *Envelope, 256),
Register: make(chan *Client),
Unregister: make(chan *Client),
JoinRoom: make(chan *RoomAction),
LeaveRoom: make(chan *RoomAction),
}
}
// Run starts the hub's main loop
func (h *Hub) Run() {
for {
select {
case client := <-h.Register:
h.registerClient(client)
case client := <-h.Unregister:
h.unregisterClient(client)
case envelope := <-h.Broadcast:
h.broadcastMessage(envelope)
case envelope := <-h.DirectMessage:
h.sendDirectMessage(envelope)
case action := <-h.JoinRoom:
h.joinRoom(action)
case action := <-h.LeaveRoom:
h.leaveRoom(action)
}
}
}
func (h *Hub) registerClient(client *Client) {
h.mu.Lock()
h.Clients[client.ID] = client
count := len(h.Clients)
h.mu.Unlock()
log.Printf("Client %s registered (total: %d)", client.ID, count)
// Send welcome message
welcome := NewMessage(MessageTypeJoin, "system", "Welcome to the chat!")
client.Send <- welcome
// Notify others
notification := NewMessage(MessageTypeJoin, "system", client.Username+" joined")
h.broadcastMessage(&Envelope{
Message: notification,
ClientID: client.ID,
Broadcast: true,
})
}
func (h *Hub) unregisterClient(client *Client) {
h.mu.Lock()
if _, ok := h.Clients[client.ID]; ok {
delete(h.Clients, client.ID)
client.Close()
}
count := len(h.Clients)
h.mu.Unlock()
// Remove from all rooms
h.roomsMu.Lock()
for room := range client.Rooms {
if clients, ok := h.Rooms[room]; ok {
delete(clients, client.ID)
if len(clients) == 0 {
delete(h.Rooms, room)
}
}
}
h.roomsMu.Unlock()
log.Printf("Client %s unregistered (total: %d)", client.ID, count)
// Notify others
notification := NewMessage(MessageTypeLeave, "system", client.Username+" left")
h.broadcastMessage(&Envelope{
Message: notification,
ClientID: client.ID,
Broadcast: true,
})
}
func (h *Hub) broadcastMessage(envelope *Envelope) {
h.mu.RLock()
defer h.mu.RUnlock()
if envelope.Message.Room != "" {
// Broadcast to specific room
h.roomsMu.RLock()
roomClients, exists := h.Rooms[envelope.Message.Room]
h.roomsMu.RUnlock()
if exists {
for clientID := range roomClients {
if client, ok := h.Clients[clientID]; ok && client.ID != envelope.ClientID {
select {
case client.Send <- envelope.Message:
default:
log.Printf("Client %s send buffer full", client.ID)
}
}
}
log.Printf("Broadcast to room %s: %d clients", envelope.Message.Room, len(roomClients))
}
} else {
// Broadcast to all clients
for _, client := range h.Clients {
if client.ID != envelope.ClientID { // Don't echo to sender
select {
case client.Send <- envelope.Message:
default:
log.Printf("Client %s send buffer full", client.ID)
}
}
}
log.Printf("Broadcast to all: %d clients", len(h.Clients))
}
}
func (h *Hub) sendDirectMessage(envelope *Envelope) {
h.mu.RLock()
defer h.mu.RUnlock()
// Find recipient by username
for _, client := range h.Clients {
if client.Username == envelope.Message.To {
select {
case client.Send <- envelope.Message:
log.Printf("Direct message from %s to %s", envelope.Message.From, envelope.Message.To)
default:
log.Printf("Client %s send buffer full", client.ID)
}
return
}
}
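// Recipient not found - send error back to sender (non-blocking, so a full buffer cannot stall the hub)
if sender, ok := h.Clients[envelope.ClientID]; ok {
errorMsg := NewMessage(MessageTypeError, "system", "User not found: "+envelope.Message.To)
select {
case sender.Send <- errorMsg:
default:
log.Printf("Client %s send buffer full", sender.ID)
}
}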
}
func (h *Hub) joinRoom(action *RoomAction) {
h.roomsMu.Lock()
if h.Rooms[action.Room] == nil {
h.Rooms[action.Room] = make(map[string]bool)
}
h.Rooms[action.Room][action.Client.ID] = true
h.roomsMu.Unlock()
action.Client.Rooms[action.Room] = true
log.Printf("Client %s joined room %s", action.Client.ID, action.Room)
// Notify client
msg := NewMessage(MessageTypeJoin, "system", "Joined room: "+action.Room)
msg.Room = action.Room
action.Client.Send <- msg
// Notify room members
notification := NewMessage(MessageTypeJoin, "system", action.Client.Username+" joined the room")
notification.Room = action.Room
h.broadcastMessage(&Envelope{
Message: notification,
ClientID: action.Client.ID,
Broadcast: true,
})
}
func (h *Hub) leaveRoom(action *RoomAction) {
h.roomsMu.Lock()
if clients, ok := h.Rooms[action.Room]; ok {
delete(clients, action.Client.ID)
if len(clients) == 0 {
delete(h.Rooms, action.Room)
}
}
h.roomsMu.Unlock()
delete(action.Client.Rooms, action.Room)
log.Printf("Client %s left room %s", action.Client.ID, action.Room)
// Notify room members
notification := NewMessage(MessageTypeLeave, "system", action.Client.Username+" left the room")
notification.Room = action.Room
h.broadcastMessage(&Envelope{
Message: notification,
ClientID: action.Client.ID,
Broadcast: true,
})
}
// GetStats returns hub statistics
func (h *Hub) GetStats() map[string]interface{} {
h.mu.RLock()
clientCount := len(h.Clients)
h.mu.RUnlock()
h.roomsMu.RLock()
roomCount := len(h.Rooms)
h.roomsMu.RUnlock()
return map[string]interface{}{
"clients": clientCount,
"rooms": roomCount,
}
}
4. HTTP Handler with JWT Authentication
middleware/auth.go
package middleware
import (
"context"
"fmt"
"net/http"
"strings"
"time"
"github.com/golang-jwt/jwt/v5"
)
var jwtSecret = []byte("your-secret-key-change-in-production")
type Claims struct {
UserID string `json:"user_id"`
Username string `json:"username"`
jwt.RegisteredClaims
}
// GenerateToken generates a JWT token
func GenerateToken(userID, username string) (string, error) {
claims := &Claims{
UserID: userID,
Username: username,
RegisteredClaims: jwt.RegisteredClaims{
ExpiresAt: jwt.NewNumericDate(time.Now().Add(24 * time.Hour)),
IssuedAt: jwt.NewNumericDate(time.Now()),
},
}
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
return token.SignedString(jwtSecret)
}
// ValidateToken validates a JWT token
func ValidateToken(tokenString string) (*Claims, error) {
token, err := jwt.ParseWithClaims(tokenString, &Claims{}, func(token *jwt.Token) (interface{}, error) {
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, fmt.Errorf("unexpected signing method")
}
return jwtSecret, nil
})
if err != nil {
return nil, err
}
if claims, ok := token.Claims.(*Claims); ok && token.Valid {
return claims, nil
}
return nil, fmt.Errorf("invalid token")
}
// AuthMiddleware validates JWT from query params or header
func AuthMiddleware(next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// Try to get token from query params (for WebSocket upgrade)
tokenString := r.URL.Query().Get("token")
// If not in query, try Authorization header
if tokenString == "" {
authHeader := r.Header.Get("Authorization")
if strings.HasPrefix(authHeader, "Bearer ") {
tokenString = strings.TrimPrefix(authHeader, "Bearer ")
}
}
if tokenString == "" {
http.Error(w, "Missing token", http.StatusUnauthorized)
return
}
claims, err := ValidateToken(tokenString)
if err != nil {
http.Error(w, "Invalid token", http.StatusUnauthorized)
return
}
// Add claims to request context
ctx := context.WithValue(r.Context(), "claims", claims)
next(w, r.WithContext(ctx))
}
}
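The middleware above stores the claims under the plain string key "claims". A common Go convention is to use an unexported key type instead, so that no other package can collide with or overwrite the value. The sketch below shows that pattern in isolation. `claimsKey`, `withClaims`, `claimsFrom`, and the demo helpers are hypothetical names, and the local `Claims` struct is a stand-in for the JWT claims type.

```go
package main

import (
	"context"
	"fmt"
)

// Claims is a stand-in for the JWT claims struct used by the middleware.
type Claims struct {
	UserID   string
	Username string
}

// claimsKey is an unexported type, so only this package can build the key.
type claimsKey struct{}

// withClaims returns a copy of ctx that carries the claims.
func withClaims(ctx context.Context, c *Claims) context.Context {
	return context.WithValue(ctx, claimsKey{}, c)
}

// claimsFrom retrieves the claims stored by withClaims, if any.
func claimsFrom(ctx context.Context) (*Claims, bool) {
	c, ok := ctx.Value(claimsKey{}).(*Claims)
	return c, ok
}

// roundTrip stores claims in a context and reads the username back.
func roundTrip(userID, username string) (string, bool) {
	ctx := withClaims(context.Background(), &Claims{UserID: userID, Username: username})
	c, ok := claimsFrom(ctx)
	if !ok {
		return "", false
	}
	return c.Username, true
}

// stringKeyFinds reports whether a plain string key can see the value.
func stringKeyFinds() bool {
	ctx := withClaims(context.Background(), &Claims{UserID: "u1", Username: "alice"})
	_, ok := ctx.Value("claims").(*Claims)
	return ok
}

func main() {
	fmt.Println(roundTrip("user_alice", "alice"))
	fmt.Println(stringKeyFinds())
}
```

If you adopt this pattern, the handler would call an exported accessor such as `claimsFrom` rather than looking up the key itself, since the key type is private to the middleware package.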
handlers/ws_handler.go
package handlers
import (
"fmt"
"log"
"net/http"
"time"
"github.com/gorilla/websocket"
"websocket-server/middleware"
ws "websocket-server/websocket"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
// In production, validate origin properly
return true
},
}
// WSHandler handles WebSocket connections
func WSHandler(hub *ws.Hub) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// Get claims from context (set by auth middleware)
claims, ok := r.Context().Value("claims").(*middleware.Claims)
if !ok {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
// Upgrade connection
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("Upgrade error: %v", err)
return
}
// Create client
clientID := fmt.Sprintf("%s_%d", claims.UserID, time.Now().UnixNano())
client := ws.NewClient(clientID, hub, conn, claims.UserID, claims.Username)
// Register client
hub.Register <- client
// Start pumps
go client.WritePump()
go client.ReadPump()
log.Printf("WebSocket connection established for user %s", claims.Username)
}
}
5. Rate Limiting
middleware/ratelimit.go
package middleware
import (
"net"
"net/http"
"sync"
"time"
"golang.org/x/time/rate"
)
// visitor pairs a limiter with the time it was last requested
type visitor struct {
limiter *rate.Limiter
lastSeen time.Time
}
// IPRateLimiter manages rate limiters per IP
type IPRateLimiter struct {
visitors map[string]*visitor
mu sync.Mutex
r rate.Limit
b int
}
// NewIPRateLimiter creates a new IP-based rate limiter
func NewIPRateLimiter(r rate.Limit, b int) *IPRateLimiter {
return &IPRateLimiter{
visitors: make(map[string]*visitor),
r: r,
b: b,
}
}
// GetLimiter returns the rate limiter for an IP and records the access time
func (i *IPRateLimiter) GetLimiter(ip string) *rate.Limiter {
i.mu.Lock()
defer i.mu.Unlock()
v, exists := i.visitors[ip]
if !exists {
v = &visitor{limiter: rate.NewLimiter(i.r, i.b)}
i.visitors[ip] = v
}
v.lastSeen = time.Now()
return v.limiter
}
// Cleanup removes limiters that have not been used for 3 minutes
func (i *IPRateLimiter) Cleanup() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for range ticker.C {
i.mu.Lock()
for ip, v := range i.visitors {
if time.Since(v.lastSeen) > 3*time.Minute {
delete(i.visitors, ip)
}
}
i.mu.Unlock()
}
}
// RateLimitMiddleware creates rate limiting middleware
func RateLimitMiddleware(limiter *IPRateLimiter) func(http.HandlerFunc) http.HandlerFunc {
return func(next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// RemoteAddr is "host:port"; key the limiter on the host so that
// all connections from one client share a bucket
ip, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
ip = r.RemoteAddr
}
l := limiter.GetLimiter(ip)
if !l.Allow() {
http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
return
}
next(w, r)
}
}
}
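The limiter is configured with a rate and a burst, for example 10 requests per second with a burst of 20 in the main application. To make those two parameters concrete, here is a simplified token-bucket sketch with an explicit clock. It is an illustration of the idea only, not how golang.org/x/time/rate is implemented, and the names `bucket`, `newBucket`, and `allow` are hypothetical.

```go
package main

import (
	"fmt"
	"math"
)

// bucket is a simplified token bucket. Time is passed in as seconds so
// that the behavior is deterministic and easy to follow.
type bucket struct {
	rate   float64 // tokens added per second
	burst  float64 // maximum number of tokens held
	tokens float64 // tokens currently available
	last   float64 // time of the last refill, in seconds
}

// newBucket returns a bucket that starts full at time now.
func newBucket(rate float64, burst int, now float64) *bucket {
	return &bucket{rate: rate, burst: float64(burst), tokens: float64(burst), last: now}
}

// allow refills the bucket for the elapsed time, then spends one token
// if one is available.
func (b *bucket) allow(now float64) bool {
	if elapsed := now - b.last; elapsed > 0 {
		b.tokens = math.Min(b.burst, b.tokens+elapsed*b.rate)
		b.last = now
	}
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

func main() {
	b := newBucket(10, 20, 0)
	allowed := 0
	for i := 0; i < 25; i++ { // 25 requests arriving at the same instant
		if b.allow(0) {
			allowed++
		}
	}
	fmt.Println("allowed in initial burst:", allowed)
	fmt.Println("allowed one second later:", b.allow(1))
}
```

The burst sets how many requests can arrive at once, while the rate sets how quickly that allowance recovers afterwards.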
6. Main Application
main.go
package main
import (
"encoding/json"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"golang.org/x/time/rate"
"websocket-server/handlers"
"websocket-server/middleware"
"websocket-server/websocket"
)
func main() {
// Create hub
hub := websocket.NewHub()
go hub.Run()
// Create rate limiter (10 req/sec, burst 20)
rateLimiter := middleware.NewIPRateLimiter(rate.Limit(10), 20)
go rateLimiter.Cleanup()
// Setup routes
mux := http.NewServeMux()
// WebSocket endpoint (with auth and rate limiting)
wsHandler := middleware.AuthMiddleware(handlers.WSHandler(hub))
wsHandler = middleware.RateLimitMiddleware(rateLimiter)(wsHandler)
mux.HandleFunc("/ws", wsHandler)
// Auth endpoints
mux.HandleFunc("/auth/login", loginHandler)
mux.HandleFunc("/auth/register", registerHandler)
// Status endpoint
mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(hub.GetStats())
})
// Health check
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
})
// Static file serving for demo client
mux.Handle("/", http.FileServer(http.Dir("./static")))
// Start server
server := &http.Server{
Addr: ":8080",
Handler: mux,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 120 * time.Second,
}
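// Shut down on SIGINT/SIGTERM. Note: server.Close() drops active connections
// immediately. server.Shutdown(ctx) drains in-flight HTTP requests, but it does not
// close hijacked connections such as WebSockets, so those must be closed separately.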
go func() {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
<-sigCh
log.Println("Shutting down server...")
server.Close()
}()
log.Println("WebSocket server starting on :8080")
log.Println("Endpoints:")
log.Println(" - WS /ws?token=<jwt> (WebSocket)")
log.Println(" - POST /auth/login (get JWT)")
log.Println(" - GET /status (hub stats)")
if err := server.ListenAndServe(); err != http.ErrServerClosed {
log.Fatal(err)
}
}
// Simple login handler (demo only)
func loginHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
username := r.FormValue("username")
if username == "" {
http.Error(w, "Username required", http.StatusBadRequest)
return
}
// In production: validate credentials against database
userID := "user_" + username
token, err := middleware.GenerateToken(userID, username)
if err != nil {
http.Error(w, "Token generation failed", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{
"token": token,
"user_id": userID,
"username": username,
})
}
func registerHandler(w http.ResponseWriter, r *http.Request) {
// Same as login for demo
loginHandler(w, r)
}
7. Client Examples
JavaScript Client
class WebSocketClient {
constructor(url, token) {
this.url = url;
this.token = token;
this.ws = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
this.reconnectDelay = 1000;
}
connect() {
this.ws = new WebSocket(`${this.url}?token=${encodeURIComponent(this.token)}`);
this.ws.onopen = () => {
console.log('WebSocket connected');
this.reconnectAttempts = 0;
};
this.ws.onmessage = (event) => {
try {
const message = JSON.parse(event.data);
this.handleMessage(message);
} catch (e) {
console.error('Parse error:', e);
}
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
this.ws.onclose = (event) => {
console.log('WebSocket closed:', event.code, event.reason);
this.reconnect();
};
}
reconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('Max reconnect attempts reached');
return;
}
const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts);
console.log(`Reconnecting in ${delay}ms...`);
setTimeout(() => {
this.reconnectAttempts++;
this.connect();
}, delay);
}
send(type, content, options = {}) {
if (this.ws.readyState !== WebSocket.OPEN) {
console.error('WebSocket not connected');
return;
}
const message = {
type,
content,
...options
};
this.ws.send(JSON.stringify(message));
}
sendChat(content, room = null) {
this.send('chat', content, { room });
}
sendDM(to, content) {
this.send('chat', content, { to });
}
joinRoom(room) {
this.send('join', '', { room });
}
leaveRoom(room) {
this.send('leave', '', { room });
}
handleMessage(message) {
console.log('Received:', message);
switch (message.type) {
case 'chat':
this.onChat(message);
break;
case 'join':
this.onJoin(message);
break;
case 'leave':
this.onLeave(message);
break;
case 'error':
this.onError(message);
break;
}
}
onChat(message) {
// Override in your app
console.log(`${message.from}: ${message.content}`);
}
onJoin(message) {
console.log(`${message.from} joined`);
}
onLeave(message) {
console.log(`${message.from} left`);
}
onError(message) {
console.error('Server error:', message.content);
}
close() {
if (this.ws) {
this.ws.close();
}
}
}
// Usage
async function main() {
// Get token
const response = await fetch('/auth/login', {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: 'username=alice'
});
const { token } = await response.json();
// Connect
const client = new WebSocketClient('ws://localhost:8080/ws', token);
client.connect();
// Send messages only after the socket opens; send() drops messages while still connecting
client.ws.addEventListener('open', () => {
client.sendChat('Hello everyone!');
client.joinRoom('general');
client.sendChat('Hello room!', 'general');
client.sendDM('bob', 'Private message');
});
}
main();
Go Client
package main
import (
"encoding/json"
"log"
"net/url"
"os"
"os/signal"
"time"
"github.com/gorilla/websocket"
)
type Message struct {
Type string `json:"type"`
From string `json:"from,omitempty"`
To string `json:"to,omitempty"`
Room string `json:"room,omitempty"`
Content string `json:"content,omitempty"`
Time time.Time `json:"timestamp"`
}
func main() {
token := "your-jwt-token"
u := url.URL{Scheme: "ws", Host: "localhost:8080", Path: "/ws"}
q := u.Query()
q.Set("token", token)
u.RawQuery = q.Encode()
log.Printf("Connecting to %s", u.String())
conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
log.Fatal("Dial error:", err)
}
defer conn.Close()
// Interrupt handler
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
done := make(chan struct{})
// Read messages
go func() {
defer close(done)
for {
var msg Message
err := conn.ReadJSON(&msg)
if err != nil {
log.Println("Read error:", err)
return
}
log.Printf("Received: %+v", msg)
}
}()
// Send a test message
msg := Message{
Type: "chat",
Content: "Hello from Go client!",
}
if err := conn.WriteJSON(msg); err != nil {
log.Println("Write error:", err)
return
}
// Wait for interrupt
select {
case <-done:
return
case <-interrupt:
log.Println("Interrupt received, closing connection")
// Send close message
err := conn.WriteMessage(
websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
)
if err != nil {
log.Println("Write close error:", err)
return
}
select {
case <-done:
case <-time.After(time.Second):
}
}
}
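The Go client above exits when the connection drops, whereas the JavaScript client retries with exponential backoff. A Go client could compute its retry delay in the same way. The sketch below doubles a 1 second base delay per attempt, as the JavaScript client does. The 30 second cap and the names `reconnectDelay`, `baseDelay`, and `maxDelay` are assumptions added for illustration.

```go
package main

import (
	"fmt"
	"time"
)

const (
	baseDelay = 1 * time.Second  // delay before the first retry
	maxDelay  = 30 * time.Second // assumed upper bound, not in the original client
)

// reconnectDelay returns the wait before retry number attempt (0-based),
// doubling each time and never exceeding maxDelay.
func reconnectDelay(attempt int) time.Duration {
	d := baseDelay
	for i := 0; i < attempt; i++ {
		d *= 2
		if d >= maxDelay {
			return maxDelay
		}
	}
	return d
}

func main() {
	for attempt := 0; attempt < 7; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, reconnectDelay(attempt))
	}
}
```

A reconnect loop would sleep for `reconnectDelay(attempt)` between dial attempts and reset the attempt counter after a successful connection.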
Scaling with Redis Pub/Sub
redis/pubsub.go
package redis
import (
"context"
"encoding/json"
"log"
"github.com/redis/go-redis/v9"
"websocket-server/websocket"
)
type RedisHub struct {
client *redis.Client
pubsub *redis.PubSub
localHub *websocket.Hub
channel string
}
func NewRedisHub(redisAddr, channel string, localHub *websocket.Hub) *RedisHub {
client := redis.NewClient(&redis.Options{
Addr: redisAddr,
})
return &RedisHub{
client: client,
localHub: localHub,
channel: channel,
}
}
func (rh *RedisHub) Start(ctx context.Context) error {
// Subscribe to channel
rh.pubsub = rh.client.Subscribe(ctx, rh.channel)
// Wait for confirmation
_, err := rh.pubsub.Receive(ctx)
if err != nil {
return err
}
log.Printf("Subscribed to Redis channel: %s", rh.channel)
// Listen for messages
go rh.listen(ctx)
// Forward local broadcasts to Redis
go rh.forwardToRedis()
return nil
}
func (rh *RedisHub) listen(ctx context.Context) {
ch := rh.pubsub.Channel()
for {
select {
case <-ctx.Done():
rh.pubsub.Close()
return
case msg, ok := <-ch:
if !ok {
// Subscription channel was closed
return
}
var envelope websocket.Envelope
if err := json.Unmarshal([]byte(msg.Payload), &envelope); err != nil {
log.Printf("Error unmarshaling: %v", err)
continue
}
// Hand the envelope to the local hub for delivery to local clients
rh.localHub.Broadcast <- &envelope
}
}
}
func (rh *RedisHub) forwardToRedis() {
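// Listen to local hub's broadcast and forward to Redis.
// Caveat: Hub.Run receives from this same channel, so each envelope reaches only one
// of the two consumers, and envelopes injected by listen can be re-published.
// A production version needs a separate outbound channel on the hub.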
for envelope := range rh.localHub.Broadcast {
data, err := json.Marshal(envelope)
if err != nil {
log.Printf("Marshal error: %v", err)
continue
}
if err := rh.client.Publish(context.Background(), rh.channel, data).Err(); err != nil {
log.Printf("Redis publish error: %v", err)
}
}
}
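With Redis Pub/Sub, a server that publishes to a channel it also subscribes to receives its own messages back. One way to avoid delivering them twice is to tag each published payload with the ID of the originating server and skip payloads that carry the local ID. The sketch below shows only that tagging and filtering step, using the standard library. `wireEnvelope`, `wrap`, `unwrap`, and the server IDs are hypothetical and not part of the code above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// wireEnvelope is a hypothetical format for messages published to Redis.
type wireEnvelope struct {
	Origin  string          `json:"origin"`  // ID of the publishing server
	Payload json.RawMessage `json:"payload"` // the already-encoded chat message
}

// wrap tags a JSON payload with the ID of the server that publishes it.
func wrap(origin string, payload []byte) ([]byte, error) {
	return json.Marshal(wireEnvelope{Origin: origin, Payload: payload})
}

// unwrap decodes a published message. deliver is false when the message
// originated on this server and was therefore already delivered locally.
func unwrap(localID string, data []byte) (payload []byte, deliver bool, err error) {
	var w wireEnvelope
	if err := json.Unmarshal(data, &w); err != nil {
		return nil, false, err
	}
	if w.Origin == localID {
		return nil, false, nil
	}
	return w.Payload, true, nil
}

func main() {
	data, err := wrap("server-1", []byte(`{"type":"chat","content":"hi"}`))
	if err != nil {
		panic(err)
	}
	_, deliver, _ := unwrap("server-1", data)
	fmt.Println("server-1 delivers:", deliver)
	payload, deliver, _ := unwrap("server-2", data)
	fmt.Println("server-2 delivers:", deliver, string(payload))
}
```

Each server would generate its ID once at startup, for example from its hostname plus a random suffix.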
Architecture Patterns
graph TB
subgraph "Single Server"
C1[Client 1] -->|WS| S1[Server]
C2[Client 2] -->|WS| S1
C3[Client 3] -->|WS| S1
S1 --> H1[Hub]
end
subgraph "Horizontal Scaling"
C4[Client 1] -->|WS| LB[Load Balancer]
C5[Client 2] -->|WS| LB
C6[Client 3] -->|WS| LB
LB -->|Sticky Session| S2[Server 1]
LB -->|Sticky Session| S3[Server 2]
LB -->|Sticky Session| S4[Server 3]
S2 --> H2[Hub 1]
S3 --> H3[Hub 2]
S4 --> H4[Hub 3]
H2 <-->|Pub/Sub| R[Redis]
H3 <-->|Pub/Sub| R
H4 <-->|Pub/Sub| R
end
style H1 fill:#e1f5ff
style R fill:#ffe1e1
style LB fill:#fff4e1
Best Practices
1. Always Use Ping/Pong
// Send pings regularly to detect dead connections
const pingPeriod = 54 * time.Second // < pongWait
ticker := time.NewTicker(pingPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
2. Set Read/Write Deadlines
// Prevent goroutine leaks from abandoned connections
conn.SetReadDeadline(time.Now().Add(pongWait))
conn.SetWriteDeadline(time.Now().Add(writeWait))
3. Use Buffered Channels
// Prevent slow clients from blocking hub
Send: make(chan *Message, 256)
4. Handle Backpressure
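// Drop messages or disconnect slow clients
select {
case client.Send <- message:
default:
// Client too slow - disconnect. Send from a new goroutine: the hub loop is the
// only receiver on Unregister, so a direct send from inside it would block forever.
go func(c *Client) { hub.Unregister <- c }(client)
}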
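The select/default pattern above can be tried in isolation. The following standalone sketch fans a message out to several buffered channels and reports which receivers were too slow, without ever blocking the sender. `trySend` and `fanOut` are illustrative names, and plain strings stand in for messages.

```go
package main

import (
	"fmt"
	"sort"
)

// trySend attempts a non-blocking send and reports whether it succeeded.
func trySend(ch chan<- string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false // buffer full: the receiver is not keeping up
	}
}

// fanOut delivers msg to every client and returns the IDs whose buffers
// were full, sorted for stable output.
func fanOut(clients map[string]chan string, msg string) []string {
	var slow []string
	for id, ch := range clients {
		if !trySend(ch, msg) {
			slow = append(slow, id)
		}
	}
	sort.Strings(slow)
	return slow
}

func main() {
	fast := make(chan string, 2)
	slow := make(chan string, 1)
	slow <- "earlier message" // fill the slow client's buffer

	dropped := fanOut(map[string]chan string{"fast": fast, "slow": slow}, "hello")
	fmt.Println("too slow:", dropped)
}
```

Whether to drop the message or disconnect the client is a policy choice. Dropping suits lossy data such as typing indicators, while disconnecting is safer when clients must not miss messages.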
5. Implement Connection Limits
const maxConnectionsPerIP = 10
if connections[ip] >= maxConnectionsPerIP {
http.Error(w, "Too many connections", http.StatusTooManyRequests)
return
}
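The snippet above assumes a `connections` counter exists. One possible shape for it is a mutex-protected map that is incremented when a connection is accepted and decremented when it closes. This is a sketch under that assumption, and `connLimiter`, `acquire`, and `release` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// connLimiter tracks the number of open connections per IP address.
type connLimiter struct {
	mu     sync.Mutex
	counts map[string]int
	limit  int
}

// newConnLimiter creates a limiter that allows up to limit connections per IP.
func newConnLimiter(limit int) *connLimiter {
	return &connLimiter{counts: make(map[string]int), limit: limit}
}

// acquire reserves a connection slot for ip, or returns false if the IP
// is already at its limit.
func (l *connLimiter) acquire(ip string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.counts[ip] >= l.limit {
		return false
	}
	l.counts[ip]++
	return true
}

// release frees a slot previously reserved with acquire.
func (l *connLimiter) release(ip string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.counts[ip] <= 1 {
		delete(l.counts, ip) // drop idle entries so the map does not grow forever
		return
	}
	l.counts[ip]--
}

func main() {
	l := newConnLimiter(2)
	fmt.Println(l.acquire("10.0.0.1"), l.acquire("10.0.0.1"), l.acquire("10.0.0.1"))
	l.release("10.0.0.1")
	fmt.Println(l.acquire("10.0.0.1"))
}
```

A handler would call `acquire` before upgrading the connection and `release` in the same deferred cleanup that unregisters the client.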
Common Pitfalls
1. Not Handling Goroutine Cleanup
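// Bad: pumps that return without cleanup leak the client and its other goroutine
go client.ReadPump()
go client.WritePump()
// Good: unregister and close when the read loop exits
// (skip this wrapper if ReadPump already does both in its own defer, as the version above does)
go func() {
defer func() { hub.Unregister <- client }() // Unregister is a channel, not a method
defer conn.Close()
client.ReadPump()
}()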
2. Concurrent Writes
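// Bad: calling WriteMessage from multiple goroutines (gorilla/websocket allows only one concurrent writer)
conn.WriteMessage(websocket.TextMessage, data)
// Good: Serialize writes through channel
func (c *Client) WritePump() {
for message := range c.Send {
data, err := message.ToJSON() // Send carries *Message, so encode it first
if err != nil {
continue
}
if err := c.Conn.WriteMessage(websocket.TextMessage, data); err != nil {
return
}
}
}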
3. Not Validating Origin
// Bad: Accept all origins (security risk)
CheckOrigin: func(r *http.Request) bool { return true }
// Good: Validate origin
CheckOrigin: func(r *http.Request) bool {
origin := r.Header.Get("Origin")
return origin == "https://yourdomain.com"
}
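When more than one origin is legitimate, an allow-list is easier to maintain than a single string comparison. The sketch below parses the Origin header value and compares its scheme and host against a set of allowed origins. `originAllowed` is an illustrative helper and the origins are placeholders. Inside `CheckOrigin` you would pass it `r.Header.Get("Origin")`.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// originAllowed reports whether origin matches one of the allowed
// origins. Keys in allowed are lowercase "scheme://host[:port]".
func originAllowed(origin string, allowed map[string]bool) bool {
	u, err := url.Parse(origin)
	if err != nil || u.Scheme == "" || u.Host == "" {
		return false // missing or malformed Origin header
	}
	return allowed[strings.ToLower(u.Scheme+"://"+u.Host)]
}

func main() {
	allowed := map[string]bool{
		"https://yourdomain.com":     true,
		"https://app.yourdomain.com": true,
	}
	for _, origin := range []string{
		"https://yourdomain.com",
		"https://YourDomain.com",
		"https://yourdomain.com.evil.example",
		"",
	} {
		fmt.Printf("%q allowed: %v\n", origin, originAllowed(origin, allowed))
	}
}
```

Comparing the parsed host exactly, rather than using a prefix or substring match, avoids accepting lookalike domains such as `yourdomain.com.evil.example`.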
4. Missing Error Handling
// Always check for close errors
if websocket.IsUnexpectedCloseError(err,
websocket.CloseGoingAway,
websocket.CloseAbnormalClosure) {
log.Printf("Error: %v", err)
}
When to Use WebSockets
✅ Use WebSockets When:
- Bidirectional communication required
- Low latency is critical (< 100ms)
- High message frequency (multiple per second)
- Real-time interactions (chat, games, collaboration)
- Client needs to push data to server
- Long-lived connections are acceptable
❌ Avoid WebSockets When:
- Simple server-to-client updates (use SSE)
- Infrequent updates (use HTTP polling)
- RESTful architecture preferred
- Operating through difficult proxies
- Need request/response semantics
- Target clients or networks lack reliable WebSocket support
Advantages
- Low Latency - No HTTP overhead per message
- Bidirectional - Full-duplex communication
- Efficient - Single persistent connection
- Binary Support - Efficient data transfer
- Real-Time - Instant message delivery
- Flexible - Custom protocols and message formats
Disadvantages
- Complex - More difficult to implement than HTTP
- Stateful - Requires connection management
- Scaling Challenges - Sticky sessions or pub/sub needed
- Proxy Issues - Some proxies don’t support WebSockets
- No Automatic Reconnection - Must implement client-side
- Debugging - Harder to debug than HTTP
- Resource Intensive - Each connection uses memory
Load Testing
package main
import (
"fmt"
"log"
"sync"
"time"
"github.com/gorilla/websocket"
)
func loadTest(url string, numClients int) {
var wg sync.WaitGroup
start := time.Now()
for i := 0; i < numClients; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
log.Printf("Client %d: dial error: %v", id, err)
return
}
defer conn.Close()
// Send 100 messages
for j := 0; j < 100; j++ {
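// The server expects JSON messages (see Message); plain text is logged as a parse error
msg := fmt.Sprintf(`{"type":"chat","content":"Client %d message %d"}`, id, j)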
if err := conn.WriteMessage(websocket.TextMessage, []byte(msg)); err != nil {
log.Printf("Client %d: write error: %v", id, err)
return
}
time.Sleep(100 * time.Millisecond)
}
}(i)
}
wg.Wait()
duration := time.Since(start)
log.Printf("Load test completed: %d clients in %v", numClients, duration)
}
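func main() {
// Use a real token from POST /auth/login; AuthMiddleware rejects anything else with 401
token := "your-jwt-token"
loadTest("ws://localhost:8080/ws?token="+token, 100)
}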