Backend Communication
Current: gRPC Streaming

What is gRPC Streaming?

gRPC (gRPC Remote Procedure Calls) is a high-performance, open-source RPC framework that uses HTTP/2 for transport and Protocol Buffers for serialization, with built-in support for streaming. Unlike a traditional request-response RPC, a streaming RPC keeps a single call open so that the client, the server, or both can send multiple messages over time, depending on the streaming mode.

Key characteristics:

  • Three streaming modes - Server, client, and bidirectional streaming
  • HTTP/2 based - Multiplexing, flow control, header compression
  • Protocol Buffers - Efficient binary serialization with strong typing
  • Code generation - Auto-generated client/server code from .proto files
  • Built-in features - Deadlines, cancellation, authentication, load balancing

Streaming Modes

graph TB
  subgraph "Unary RPC (Traditional)"
    C1[Client] -->|Single Request| S1[Server]
    S1 -->|Single Response| C1
  end
  subgraph "Server Streaming"
    C2[Client] -->|Single Request| S2[Server]
    S2 -->|Stream Response 1| C2
    S2 -->|Stream Response 2| C2
    S2 -->|Stream Response N| C2
  end
  subgraph "Client Streaming"
    C3[Client] -->|Stream Request 1| S3[Server]
    C3 -->|Stream Request 2| S3
    C3 -->|Stream Request N| S3
    S3 -->|Single Response| C3
  end
  subgraph "Bidirectional Streaming"
    C4[Client] <-->|Stream Messages| S4[Server]
    C4 <-->|Both Ways| S4
  end
  style C1 fill:#e1f5ff
  style C2 fill:#ffe1f5
  style C3 fill:#f5ffe1
  style C4 fill:#fff4e1

Real-World Use Cases

Server Streaming:

  • Live metrics/telemetry - Continuous sensor data, log streaming
  • News feeds - Real-time updates, stock prices
  • Progress tracking - Long-running job status updates
  • Database change feeds - CDC (Change Data Capture) streams

Client Streaming:

  • File uploads - Chunked file transfer with single response
  • Telemetry ingestion - Batching metrics from clients
  • Bulk data import - Streaming large datasets
  • Audio/video upload - Media streaming to server

Bidirectional Streaming:

  • Chat applications - Real-time messaging
  • Multiplayer games - Game state synchronization
  • Collaborative editing - Operational transforms
  • Live translation - Speech-to-text with real-time response

Implementation in Go

Project Structure

grpc-streaming/
├── proto/
│   └── chat.proto
├── server/
│   └── main.go
├── client/
│   └── main.go
├── go.mod
├── go.sum
└── Makefile

1. Protocol Buffer Definition

proto/chat.proto

syntax = "proto3";

package chat;

option go_package = "grpc-streaming/proto/chat";

import "google/protobuf/timestamp.proto";

// ChatService provides various streaming patterns
service ChatService {
  // Unary RPC - traditional request/response
  rpc SendMessage(SendMessageRequest) returns (SendMessageResponse);

  // Server streaming - client subscribes to messages
  rpc SubscribeMessages(SubscribeRequest) returns (stream Message);

  // Client streaming - client sends multiple messages, server responds once
  rpc UploadMessages(stream Message) returns (UploadResponse);

  // Bidirectional streaming - full chat
  rpc Chat(stream Message) returns (stream Message);

  // Server streaming - metrics/telemetry
  rpc StreamMetrics(MetricsRequest) returns (stream Metric);
}

message SendMessageRequest {
  string user = 1;
  string content = 2;
  string room = 3;
}

message SendMessageResponse {
  string message_id = 1;
  google.protobuf.Timestamp timestamp = 2;
}

message SubscribeRequest {
  string room = 1;
  string user = 2;
}

message Message {
  string id = 1;
  string user = 2;
  string content = 3;
  string room = 4;
  google.protobuf.Timestamp timestamp = 5;
  MessageType type = 6;
}

enum MessageType {
  CHAT = 0;
  JOIN = 1;
  LEAVE = 2;
  SYSTEM = 3;
}

message UploadResponse {
  int32 message_count = 1;
  bool success = 2;
}

message MetricsRequest {
  string instance_id = 1;
  int32 interval_seconds = 2;
}

message Metric {
  string name = 1;
  double value = 2;
  google.protobuf.Timestamp timestamp = 3;
  map<string, string> labels = 4;
}

2. Generate Go Code

Makefile

.PHONY: proto clean

proto:
	protoc --go_out=. --go_opt=paths=source_relative \
	       --go-grpc_out=. --go-grpc_opt=paths=source_relative \
	       proto/chat.proto

clean:
	rm -f proto/*.pb.go

deps:
	go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
	go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

go.mod

module grpc-streaming

go 1.21

require (
    google.golang.org/grpc v1.61.0
    google.golang.org/protobuf v1.32.0
)

Run code generation:

make deps
make proto
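
After generation, proto/chat.pb.go holds the message types and proto/chat_grpc.pb.go holds the client and server stubs. The server interface you implement looks roughly like this (exact stream parameter types depend on your protoc-gen-go-grpc version, so treat this as a sketch):

type ChatServiceServer interface {
    // Unary
    SendMessage(context.Context, *SendMessageRequest) (*SendMessageResponse, error)
    // Server streaming: send many responses via the stream parameter
    SubscribeMessages(*SubscribeRequest, ChatService_SubscribeMessagesServer) error
    // Client streaming: receive many requests, then SendAndClose one response
    UploadMessages(ChatService_UploadMessagesServer) error
    // Bidirectional streaming
    Chat(ChatService_ChatServer) error
    StreamMetrics(*MetricsRequest, ChatService_StreamMetricsServer) error
    mustEmbedUnimplementedChatServiceServer()
}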

3. Server Implementation

server/main.go

package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "math/rand"
    "net"
    "sync"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    "google.golang.org/protobuf/types/known/timestamppb"

    pb "grpc-streaming/proto/chat"
)

type chatServer struct {
    pb.UnimplementedChatServiceServer
    mu          sync.RWMutex
    subscribers map[string][]chan *pb.Message
    messageID   int64
}

func newChatServer() *chatServer {
    return &chatServer{
        subscribers: make(map[string][]chan *pb.Message),
    }
}

// Unary RPC - traditional request/response
func (s *chatServer) SendMessage(ctx context.Context, req *pb.SendMessageRequest) (*pb.SendMessageResponse, error) {
    log.Printf("Unary RPC: SendMessage from %s: %s", req.User, req.Content)

    // Generate message ID
    s.mu.Lock()
    s.messageID++
    msgID := fmt.Sprintf("msg_%d", s.messageID)
    s.mu.Unlock()

    // Create message
    msg := &pb.Message{
        Id:        msgID,
        User:      req.User,
        Content:   req.Content,
        Room:      req.Room,
        Timestamp: timestamppb.Now(),
        Type:      pb.MessageType_CHAT,
    }

    // Broadcast to subscribers
    s.broadcast(req.Room, msg)

    return &pb.SendMessageResponse{
        MessageId: msgID,
        Timestamp: msg.Timestamp,
    }, nil
}

// Server Streaming - client subscribes to room messages
func (s *chatServer) SubscribeMessages(req *pb.SubscribeRequest, stream pb.ChatService_SubscribeMessagesServer) error {
    log.Printf("Server Streaming: %s subscribed to room %s", req.User, req.Room)

    // Create channel for this subscriber
    ch := make(chan *pb.Message, 100)

    // Register subscriber
    s.mu.Lock()
    s.subscribers[req.Room] = append(s.subscribers[req.Room], ch)
    s.mu.Unlock()

    // Cleanup on disconnect
    defer func() {
        s.mu.Lock()
        subs := s.subscribers[req.Room]
        for i, sub := range subs {
            if sub == ch {
                s.subscribers[req.Room] = append(subs[:i], subs[i+1:]...)
                break
            }
        }
        close(ch)
        s.mu.Unlock()
        log.Printf("Server Streaming: %s unsubscribed from room %s", req.User, req.Room)
    }()

    // Send welcome message
    welcome := &pb.Message{
        Id:        "system",
        User:      "System",
        Content:   fmt.Sprintf("Welcome to room %s, %s!", req.Room, req.User),
        Room:      req.Room,
        Timestamp: timestamppb.Now(),
        Type:      pb.MessageType_SYSTEM,
    }

    if err := stream.Send(welcome); err != nil {
        return err
    }

    // Stream messages to client
    for {
        select {
        case <-stream.Context().Done():
            return stream.Context().Err()
        case msg, ok := <-ch:
            if !ok {
                return nil
            }
            if err := stream.Send(msg); err != nil {
                return err
            }
        }
    }
}

// Client Streaming - client sends multiple messages, server responds once
func (s *chatServer) UploadMessages(stream pb.ChatService_UploadMessagesServer) error {
    log.Printf("Client Streaming: UploadMessages started")

    count := 0
    startTime := time.Now()

    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            // Client finished sending
            duration := time.Since(startTime)
            log.Printf("Client Streaming: Received %d messages in %v", count, duration)

            return stream.SendAndClose(&pb.UploadResponse{
                MessageCount: int32(count),
                Success:      true,
            })
        }
        if err != nil {
            return status.Errorf(codes.Internal, "receive error: %v", err)
        }

        log.Printf("Received upload: %s from %s", msg.Content, msg.User)
        count++

        // Broadcast message to room
        s.broadcast(msg.Room, msg)
    }
}

// Bidirectional Streaming - full chat
func (s *chatServer) Chat(stream pb.ChatService_ChatServer) error {
    log.Printf("Bidirectional Streaming: Chat session started")

    // Channel to report the reader goroutine's result
    errChan := make(chan error, 1)

    // stream.Send is not safe for concurrent use: both the reader goroutine
    // and the ticker loop below send on this stream, so serialize with a mutex.
    var sendMu sync.Mutex

    // Goroutine to read from client
    go func() {
        for {
            msg, err := stream.Recv()
            if err == io.EOF {
                log.Printf("Bidirectional Streaming: Client closed stream")
                errChan <- nil
                return
            }
            if err != nil {
                log.Printf("Bidirectional Streaming: Receive error: %v", err)
                errChan <- err
                return
            }

            log.Printf("Bidirectional: Received from %s: %s", msg.User, msg.Content)

            // Echo back with server timestamp
            response := &pb.Message{
                Id:        fmt.Sprintf("echo_%s", msg.Id),
                User:      "Server",
                Content:   fmt.Sprintf("Echo: %s", msg.Content),
                Room:      msg.Room,
                Timestamp: timestamppb.Now(),
                Type:      pb.MessageType_SYSTEM,
            }

            sendMu.Lock()
            err = stream.Send(response)
            sendMu.Unlock()
            if err != nil {
                log.Printf("Bidirectional Streaming: Send error: %v", err)
                errChan <- err
                return
            }

            // Also broadcast to subscribers
            s.broadcast(msg.Room, msg)
        }
    }()

    // Send periodic server messages
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-stream.Context().Done():
            log.Printf("Bidirectional Streaming: Context cancelled")
            return stream.Context().Err()

        case err := <-errChan:
            return err

        case <-ticker.C:
            serverMsg := &pb.Message{
                Id:        fmt.Sprintf("server_%d", time.Now().Unix()),
                User:      "Server",
                Content:   "Server is alive!",
                Timestamp: timestamppb.Now(),
                Type:      pb.MessageType_SYSTEM,
            }

            sendMu.Lock()
            err := stream.Send(serverMsg)
            sendMu.Unlock()
            if err != nil {
                log.Printf("Bidirectional Streaming: Ticker send error: %v", err)
                return err
            }
        }
    }
}

// Server Streaming - metrics/telemetry
func (s *chatServer) StreamMetrics(req *pb.MetricsRequest, stream pb.ChatService_StreamMetricsServer) error {
    log.Printf("Server Streaming: StreamMetrics for instance %s", req.InstanceId)

    interval := time.Duration(req.IntervalSeconds) * time.Second
    if interval <= 0 {
        interval = 1 * time.Second
    }

    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for {
        select {
        case <-stream.Context().Done():
            log.Printf("Server Streaming: Metrics stream closed for %s", req.InstanceId)
            return stream.Context().Err()

        case <-ticker.C:
            // Generate sample metrics
            metrics := []*pb.Metric{
                {
                    Name:      "cpu_usage",
                    Value:     rand.Float64() * 100,
                    Timestamp: timestamppb.Now(),
                    Labels: map[string]string{
                        "instance": req.InstanceId,
                        "core":     "0",
                    },
                },
                {
                    Name:      "memory_usage",
                    Value:     rand.Float64() * 8192,
                    Timestamp: timestamppb.Now(),
                    Labels: map[string]string{
                        "instance": req.InstanceId,
                        "type":     "resident",
                    },
                },
                {
                    Name:      "requests_per_second",
                    Value:     rand.Float64() * 1000,
                    Timestamp: timestamppb.Now(),
                    Labels: map[string]string{
                        "instance": req.InstanceId,
                        "endpoint": "/api",
                    },
                },
            }

            // Send all metrics
            for _, metric := range metrics {
                if err := stream.Send(metric); err != nil {
                    return err
                }
            }
        }
    }
}

// Helper: broadcast message to room subscribers
func (s *chatServer) broadcast(room string, msg *pb.Message) {
    s.mu.RLock()
    subs := s.subscribers[room]
    s.mu.RUnlock()

    for _, ch := range subs {
        select {
        case ch <- msg:
        default:
            log.Printf("Subscriber channel full, dropping message")
        }
    }
}

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("Failed to listen: %v", err)
    }

    // Create gRPC server
    grpcServer := grpc.NewServer(
        grpc.MaxConcurrentStreams(1000),
        grpc.MaxRecvMsgSize(10 * 1024 * 1024), // 10MB
        grpc.MaxSendMsgSize(10 * 1024 * 1024), // 10MB
    )

    // Register service
    pb.RegisterChatServiceServer(grpcServer, newChatServer())

    log.Printf("gRPC server listening on :50051")
    log.Printf("Streaming modes available:")
    log.Printf("  - Unary: SendMessage")
    log.Printf("  - Server Streaming: SubscribeMessages, StreamMetrics")
    log.Printf("  - Client Streaming: UploadMessages")
    log.Printf("  - Bidirectional: Chat")

    if err := grpcServer.Serve(lis); err != nil {
        log.Fatalf("Failed to serve: %v", err)
    }
}
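
The server above blocks in Serve until the process is killed. In practice you usually also want a graceful shutdown path so in-flight streams can finish; a minimal sketch (the signal handling is an addition to the example above; it requires os, os/signal, and syscall in the imports):

// Run Serve in a goroutine so main can wait for a shutdown signal.
go func() {
    if err := grpcServer.Serve(lis); err != nil {
        log.Fatalf("Failed to serve: %v", err)
    }
}()

stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
<-stop

log.Println("Shutting down...")
// GracefulStop waits for in-flight RPCs and streams to finish; for truly
// long-lived streams, consider a timeout that falls back to Stop().
grpcServer.GracefulStop()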

4. Client Implementation

client/main.go

package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/protobuf/types/known/timestamppb"

    pb "grpc-streaming/proto/chat"
)

func main() {
    // Connect to server
    conn, err := grpc.Dial("localhost:50051",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithBlock(),
    )
    if err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    defer conn.Close()

    client := pb.NewChatServiceClient(conn)

    // Example 1: Unary RPC
    fmt.Println("\n=== Unary RPC Example ===")
    unaryExample(client)

    // Example 2: Server Streaming
    fmt.Println("\n=== Server Streaming Example ===")
    go serverStreamingExample(client)

    // Example 3: Client Streaming
    fmt.Println("\n=== Client Streaming Example ===")
    clientStreamingExample(client)

    // Example 4: Bidirectional Streaming
    fmt.Println("\n=== Bidirectional Streaming Example ===")
    go bidirectionalExample(client)

    // Example 5: Metrics Streaming
    fmt.Println("\n=== Metrics Streaming Example ===")
    go metricsExample(client)

    // Keep running
    select {}
}

func unaryExample(client pb.ChatServiceClient) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    resp, err := client.SendMessage(ctx, &pb.SendMessageRequest{
        User:    "Alice",
        Content: "Hello from unary RPC!",
        Room:    "general",
    })

    if err != nil {
        log.Printf("Unary error: %v", err)
        return
    }

    log.Printf("Unary response: ID=%s, Time=%v", resp.MessageId, resp.Timestamp.AsTime())
}

func serverStreamingExample(client pb.ChatServiceClient) {
    ctx := context.Background()

    stream, err := client.SubscribeMessages(ctx, &pb.SubscribeRequest{
        Room: "general",
        User: "Alice",
    })

    if err != nil {
        log.Printf("Server streaming error: %v", err)
        return
    }

    log.Printf("Subscribed to room 'general'")

    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            log.Printf("Server closed stream")
            break
        }
        if err != nil {
            log.Printf("Receive error: %v", err)
            break
        }

        log.Printf("[%s] %s: %s", msg.Room, msg.User, msg.Content)
    }
}

func clientStreamingExample(client pb.ChatServiceClient) {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    stream, err := client.UploadMessages(ctx)
    if err != nil {
        log.Printf("Client streaming error: %v", err)
        return
    }

    // Send multiple messages
    messages := []string{
        "First batch message",
        "Second batch message",
        "Third batch message",
        "Fourth batch message",
        "Fifth batch message",
    }

    for i, content := range messages {
        msg := &pb.Message{
            Id:        fmt.Sprintf("batch_%d", i),
            User:      "Bob",
            Content:   content,
            Room:      "general",
            Timestamp: timestamppb.Now(),
            Type:      pb.MessageType_CHAT,
        }

        if err := stream.Send(msg); err != nil {
            log.Printf("Send error: %v", err)
            return
        }

        log.Printf("Sent batch message %d", i+1)
        time.Sleep(200 * time.Millisecond)
    }

    // Close and receive response
    resp, err := stream.CloseAndRecv()
    if err != nil {
        log.Printf("Close error: %v", err)
        return
    }

    log.Printf("Upload complete: %d messages processed, success=%v", resp.MessageCount, resp.Success)
}

func bidirectionalExample(client pb.ChatServiceClient) {
    ctx := context.Background()

    stream, err := client.Chat(ctx)
    if err != nil {
        log.Printf("Bidirectional error: %v", err)
        return
    }

    // Goroutine to receive messages
    go func() {
        for {
            msg, err := stream.Recv()
            if err == io.EOF {
                log.Printf("Server closed bidirectional stream")
                return
            }
            if err != nil {
                log.Printf("Bidirectional receive error: %v", err)
                return
            }

            log.Printf("[Bidirectional] %s: %s", msg.User, msg.Content)
        }
    }()

    // Send messages periodically
    ticker := time.NewTicker(3 * time.Second)
    defer ticker.Stop()

    counter := 0
    for range ticker.C {
        counter++
        msg := &pb.Message{
            Id:        fmt.Sprintf("client_%d", counter),
            User:      "Charlie",
            Content:   fmt.Sprintf("Bidirectional message #%d", counter),
            Room:      "general",
            Timestamp: timestamppb.Now(),
            Type:      pb.MessageType_CHAT,
        }

        if err := stream.Send(msg); err != nil {
            log.Printf("Bidirectional send error: %v", err)
            return
        }

        log.Printf("Sent bidirectional message #%d", counter)

        if counter >= 10 {
            stream.CloseSend()
            return
        }
    }
}

func metricsExample(client pb.ChatServiceClient) {
    ctx := context.Background()

    stream, err := client.StreamMetrics(ctx, &pb.MetricsRequest{
        InstanceId:      "server-01",
        IntervalSeconds: 2,
    })

    if err != nil {
        log.Printf("Metrics streaming error: %v", err)
        return
    }

    log.Printf("Receiving metrics stream...")

    count := 0
    for {
        metric, err := stream.Recv()
        if err == io.EOF {
            log.Printf("Metrics stream closed")
            break
        }
        if err != nil {
            log.Printf("Metrics receive error: %v", err)
            break
        }

        log.Printf("Metric: %s = %.2f (labels: %v)", metric.Name, metric.Value, metric.Labels)
        count++

        if count >= 10 {
            break
        }
    }
}
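
With code generation done, both sides can be run from the module root (paths assume the project structure above):

go run ./server
go run ./client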

Bridging to Browsers with gRPC-Web

Browsers cannot speak native gRPC directly because they do not expose the low-level HTTP/2 framing and trailers that gRPC relies on, so browser clients use gRPC-Web, with a proxy (such as Envoy) translating between gRPC-Web and gRPC.
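
The browser stubs used below are generated with protoc-gen-grpc-web; a typical invocation looks like this (flag values follow the grpc-web tooling, and the output layout is an assumption for this project):

protoc -I=. proto/chat.proto \
  --js_out=import_style=commonjs:. \
  --grpc-web_out=import_style=commonjs,mode=grpcwebtext:.

This emits proto/chat_pb.js and proto/chat_grpc_web_pb.js, which the JavaScript client imports. Note that gRPC-Web supports unary and server-streaming calls; client and bidirectional streaming are not available from the browser.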

Envoy Proxy Configuration

envoy.yaml

admin:
  access_log_path: /tmp/admin_access.log
  address:
    socket_address: { address: 0.0.0.0, port_value: 9901 }

static_resources:
  listeners:
  - name: listener_0
    address:
      socket_address: { address: 0.0.0.0, port_value: 8080 }
    filter_chains:
    - filters:
      - name: envoy.filters.network.http_connection_manager
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          codec_type: auto
          stat_prefix: ingress_http
          route_config:
            name: local_route
            virtual_hosts:
            - name: local_service
              domains: ["*"]
              routes:
              - match: { prefix: "/" }
                route:
                  cluster: grpc_service
                  timeout: 0s
                  max_stream_duration:
                    grpc_timeout_header_max: 0s
              cors:
                allow_origin_string_match:
                - prefix: "*"
                allow_methods: GET, PUT, DELETE, POST, OPTIONS
                allow_headers: keep-alive,user-agent,cache-control,content-type,content-transfer-encoding,custom-header-1,x-accept-content-transfer-encoding,x-accept-response-streaming,x-user-agent,x-grpc-web,grpc-timeout
                max_age: "1728000"
                expose_headers: custom-header-1,grpc-status,grpc-message
          http_filters:
          - name: envoy.filters.http.grpc_web
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.grpc_web.v3.GrpcWeb
          - name: envoy.filters.http.cors
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.cors.v3.Cors
          - name: envoy.filters.http.router
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
  clusters:
  - name: grpc_service
    connect_timeout: 0.25s
    type: logical_dns
    http2_protocol_options: {}
    lb_policy: round_robin
    load_assignment:
      cluster_name: grpc_service
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: localhost
                port_value: 50051
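
One way to run the proxy is the official Envoy image (the image tag here is an assumption; adjust to your Envoy version). Note that when Envoy runs in a container, the localhost:50051 upstream must still reach the host running the Go server, e.g. via host networking on Linux:

docker run --rm -it \
  -v $(pwd)/envoy.yaml:/etc/envoy/envoy.yaml \
  --network host \
  envoyproxy/envoy:v1.28.0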

JavaScript Client

// npm install grpc-web
// npm install google-protobuf

const {ChatServiceClient} = require('./proto/chat_grpc_web_pb');
const {SubscribeRequest, SendMessageRequest} = require('./proto/chat_pb');

const client = new ChatServiceClient('http://localhost:8080');

// Server Streaming Example
const subscribeReq = new SubscribeRequest();
subscribeReq.setRoom('general');
subscribeReq.setUser('WebUser');

const stream = client.subscribeMessages(subscribeReq, {});

stream.on('data', (message) => {
  console.log(`[${message.getRoom()}] ${message.getUser()}: ${message.getContent()}`);
});

stream.on('status', (status) => {
  console.log('Stream status:', status);
});

stream.on('end', () => {
  console.log('Stream ended');
});

// Unary RPC Example
const sendReq = new SendMessageRequest();
sendReq.setUser('WebUser');
sendReq.setContent('Hello from browser!');
sendReq.setRoom('general');

client.sendMessage(sendReq, {}, (err, response) => {
  if (err) {
    console.error('Error:', err);
    return;
  }
  console.log('Message sent:', response.getMessageId());
});

Best Practices

1. Context and Timeouts

// Always set deadlines
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

resp, err := client.SendMessage(ctx, req)
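
For long-lived streaming RPCs, a fixed deadline is usually the wrong tool; a cancellable context lets you end the stream deliberately instead (a minimal sketch):

// For streams, prefer cancellation over a hard deadline
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // cancelling tears the stream down on both client and server

stream, err := client.SubscribeMessages(ctx, &pb.SubscribeRequest{
    Room: "general",
    User: "Alice",
})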

2. Graceful Stream Shutdown

func (s *chatServer) SubscribeMessages(req *pb.SubscribeRequest, stream pb.ChatService_SubscribeMessagesServer) error {
    ch := make(chan *pb.Message, 100)

    defer func() {
        // Cleanup on exit
        s.unregisterSubscriber(req.Room, ch)
        close(ch)
    }()

    for {
        select {
        case <-stream.Context().Done():
            return stream.Context().Err()
        case msg := <-ch:
            if err := stream.Send(msg); err != nil {
                return err
            }
        }
    }
}

3. Error Handling

import "google.golang.org/grpc/status"

// Return proper gRPC error codes
if user == "" {
    return nil, status.Error(codes.InvalidArgument, "user is required")
}

if !authorized {
    return nil, status.Error(codes.PermissionDenied, "unauthorized")
}

// Client: extract error details
if err != nil {
    if st, ok := status.FromError(err); ok {
        log.Printf("Code: %s, Message: %s", st.Code(), st.Message())
    }
}

4. Backpressure Handling

// Use buffered channels and handle full buffers
select {
case ch <- msg:
    // Sent successfully
default:
    // Channel full - either drop message or disconnect client
    return status.Error(codes.ResourceExhausted, "client too slow")
}

5. Connection Pooling

// Reuse connections
conn, err := grpc.Dial(
    "localhost:50051",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithDefaultCallOptions(
        grpc.MaxCallRecvMsgSize(10*1024*1024),
        grpc.MaxCallSendMsgSize(10*1024*1024),
    ),
)
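
For long-lived streams it also helps to enable client-side keepalives so dead connections are detected; a sketch using google.golang.org/grpc/keepalive (the values are illustrative, and the server's keepalive enforcement policy must permit them):

import "google.golang.org/grpc/keepalive"

conn, err := grpc.Dial(
    "localhost:50051",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithKeepaliveParams(keepalive.ClientParameters{
        Time:                30 * time.Second, // ping if the connection is idle this long
        Timeout:             10 * time.Second, // wait this long for the ping ack
        PermitWithoutStream: true,             // ping even when no streams are active
    }),
)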

Common Pitfalls

1. Not Handling Context Cancellation

// Bad: ignores context cancellation
for {
    msg := <-ch
    stream.Send(msg)
}

// Good: monitors context
for {
    select {
    case <-stream.Context().Done():
        return stream.Context().Err()
    case msg := <-ch:
        stream.Send(msg)
    }
}

2. Forgetting to Close Client Streams

// Bad: stream never closes
stream, _ := client.UploadMessages(ctx)
for _, msg := range messages {
    stream.Send(msg)
}
// Stream left open!

// Good: close and receive response
stream, _ := client.UploadMessages(ctx)
for _, msg := range messages {
    stream.Send(msg)
}
resp, err := stream.CloseAndRecv()

3. Not Setting Message Size Limits

// Set limits to prevent memory exhaustion
grpc.NewServer(
    grpc.MaxRecvMsgSize(10 * 1024 * 1024), // 10MB
    grpc.MaxSendMsgSize(10 * 1024 * 1024),
    grpc.MaxConcurrentStreams(1000),
)

4. Blocking in Stream Handlers

// Bad: blocks stream
func (s *server) Chat(stream pb.Service_ChatServer) error {
    msg, _ := stream.Recv()
    time.Sleep(10 * time.Second) // Blocks entire stream!
    stream.Send(response)
}

// Good: use goroutines for processing
func (s *server) Chat(stream pb.Service_ChatServer) error {
    msg, _ := stream.Recv()
    go processAsync(msg) // Don't block stream
    stream.Send(response)
}

When to Use gRPC Streaming

✅ Use gRPC Streaming When:

  • Microservices communication - Service-to-service RPC
  • High throughput required - Binary protocol is efficient
  • Strong typing needed - Protocol Buffers provide schemas
  • Multiple languages - gRPC has broad language support
  • HTTP/2 infrastructure - Already using HTTP/2
  • Long-lived streams - Telemetry, logs, metrics
  • Bidirectional flows - Both parties need to stream

❌ Avoid gRPC Streaming When:

  • Browser-first applications - Requires gRPC-Web proxy
  • Simple REST APIs - HTTP/REST is simpler
  • Wide client support needed - Not all clients support gRPC
  • Debugging priority - Binary protocol harder to debug
  • Human-readable protocols - Protobuf is binary

Advantages

  1. High Performance - Binary protocol, HTTP/2 multiplexing
  2. Type Safety - Strong typing with Protocol Buffers
  3. Code Generation - Auto-generated client/server code
  4. Streaming Support - First-class streaming in all modes
  5. Language Agnostic - Broad language support
  6. Built-in Features - Auth, load balancing, retries
  7. Efficient - Compact binary serialization

Disadvantages

  1. Complexity - More complex than REST
  2. Binary Protocol - Harder to debug than JSON
  3. Browser Support - Requires gRPC-Web proxy
  4. Learning Curve - Protocol Buffers and gRPC concepts
  5. Tooling - Requires protoc compiler setup
  6. Limited Proxy Support - Some proxies don’t handle HTTP/2 well

Performance Comparison

Protocol     Latency     Throughput   Message Size           CPU Usage
gRPC         Very Low    Very High    Small (binary)         Low
WebSockets   Low         High         Medium (text/binary)   Medium
SSE          Medium      Medium       Medium (text)          Medium
REST/JSON    Medium      Low          Large (JSON)           High

Testing

package main

import (
    "context"
    "log"
    "net"
    "testing"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/test/bufconn"

    pb "grpc-streaming/proto/chat"
)

func setupTestServer(t *testing.T) (pb.ChatServiceClient, func()) {
    buffer := 1024 * 1024
    listener := bufconn.Listen(buffer)

    server := grpc.NewServer()
    pb.RegisterChatServiceServer(server, newChatServer())

    go func() {
        if err := server.Serve(listener); err != nil {
            // t.Fatalf must not be called from a non-test goroutine; log instead.
            log.Printf("Server error: %v", err)
        }
    }()

    conn, err := grpc.DialContext(context.Background(), "",
        grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
            return listener.Dial()
        }),
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    if err != nil {
        t.Fatalf("Failed to dial: %v", err)
    }

    client := pb.NewChatServiceClient(conn)

    cleanup := func() {
        conn.Close()
        server.Stop()
        listener.Close()
    }

    return client, cleanup
}

func TestUnaryRPC(t *testing.T) {
    client, cleanup := setupTestServer(t)
    defer cleanup()

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    resp, err := client.SendMessage(ctx, &pb.SendMessageRequest{
        User:    "TestUser",
        Content: "Test message",
        Room:    "test",
    })

    if err != nil {
        t.Fatalf("SendMessage failed: %v", err)
    }

    if resp.MessageId == "" {
        t.Error("Expected message ID")
    }
}

func TestServerStreaming(t *testing.T) {
    client, cleanup := setupTestServer(t)
    defer cleanup()

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    stream, err := client.SubscribeMessages(ctx, &pb.SubscribeRequest{
        Room: "test",
        User: "TestUser",
    })

    if err != nil {
        t.Fatalf("SubscribeMessages failed: %v", err)
    }

    // Should receive welcome message
    msg, err := stream.Recv()
    if err != nil {
        t.Fatalf("Receive failed: %v", err)
    }

    if msg.Type != pb.MessageType_SYSTEM {
        t.Errorf("Expected SYSTEM message, got %v", msg.Type)
    }
}
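
A client-streaming test follows the same pattern, using CloseAndRecv to get the summary response (a sketch against the server above):

func TestClientStreaming(t *testing.T) {
    client, cleanup := setupTestServer(t)
    defer cleanup()

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    stream, err := client.UploadMessages(ctx)
    if err != nil {
        t.Fatalf("UploadMessages failed: %v", err)
    }

    // Send a small batch, then close the send side and read the summary.
    for i := 0; i < 3; i++ {
        if err := stream.Send(&pb.Message{
            Id:      "test",
            User:    "TestUser",
            Content: "Test upload",
            Room:    "test",
            Type:    pb.MessageType_CHAT,
        }); err != nil {
            t.Fatalf("Send failed: %v", err)
        }
    }

    resp, err := stream.CloseAndRecv()
    if err != nil {
        t.Fatalf("CloseAndRecv failed: %v", err)
    }

    if resp.MessageCount != 3 {
        t.Errorf("Expected 3 messages, got %d", resp.MessageCount)
    }
}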
