Backend Communication
Current:
gRPC Streaming
What is gRPC Streaming?
gRPC (gRPC Remote Procedure Call) is a high-performance, open-source RPC framework that uses HTTP/2 for transport, Protocol Buffers for serialization, and provides built-in support for streaming. Unlike traditional request-response RPCs, gRPC streaming enables long-lived connections where either party can send multiple messages over time.
Key characteristics:
- Three streaming modes - Server, client, and bidirectional streaming
- HTTP/2 based - Multiplexing, flow control, header compression
- Protocol Buffers - Efficient binary serialization with strong typing
- Code generation - Auto-generated client/server code from .proto files
- Built-in features - Deadlines, cancellation, authentication, load balancing
Streaming Modes
graph TB
subgraph "Unary RPC (Traditional)"
C1[Client] -->|Single Request| S1[Server]
S1 -->|Single Response| C1
end
subgraph "Server Streaming"
C2[Client] -->|Single Request| S2[Server]
S2 -->|Stream Response 1| C2
S2 -->|Stream Response 2| C2
S2 -->|Stream Response N| C2
end
subgraph "Client Streaming"
C3[Client] -->|Stream Request 1| S3[Server]
C3 -->|Stream Request 2| S3
C3 -->|Stream Request N| S3
S3 -->|Single Response| C3
end
subgraph "Bidirectional Streaming"
C4[Client] <-->|Stream Messages| S4[Server]
C4 <-->|Both Ways| S4
end
style C1 fill:#e1f5ff
style C2 fill:#ffe1f5
style C3 fill:#f5ffe1
style C4 fill:#fff4e1
Real-World Use Cases
Server Streaming:
- Live metrics/telemetry - Continuous sensor data, log streaming
- News feeds - Real-time updates, stock prices
- Progress tracking - Long-running job status updates
- Database change feeds - CDC (Change Data Capture) streams
Client Streaming:
- File uploads - Chunked file transfer with single response
- Telemetry ingestion - Batching metrics from clients
- Bulk data import - Streaming large datasets
- Audio/video upload - Media streaming to server
Bidirectional Streaming:
- Chat applications - Real-time messaging
- Multiplayer games - Game state synchronization
- Collaborative editing - Operational transforms
- Live translation - Speech-to-text with real-time response
Implementation in Go
Project Structure
grpc-streaming/
├── proto/
│ └── chat.proto
├── server/
│ └── main.go
├── client/
│ └── main.go
├── go.mod
├── go.sum
└── Makefile
1. Protocol Buffer Definition
proto/chat.proto
syntax = "proto3";
package chat;
option go_package = "grpc-streaming/proto/chat";
import "google/protobuf/timestamp.proto";
// ChatService provides various streaming patterns
service ChatService {
// Unary RPC - traditional request/response
rpc SendMessage(SendMessageRequest) returns (SendMessageResponse);
// Server streaming - client subscribes to messages
rpc SubscribeMessages(SubscribeRequest) returns (stream Message);
// Client streaming - client sends multiple messages, server responds once
rpc UploadMessages(stream Message) returns (UploadResponse);
// Bidirectional streaming - full chat
rpc Chat(stream Message) returns (stream Message);
// Server streaming - metrics/telemetry
rpc StreamMetrics(MetricsRequest) returns (stream Metric);
}
message SendMessageRequest {
string user = 1;
string content = 2;
string room = 3;
}
message SendMessageResponse {
string message_id = 1;
google.protobuf.Timestamp timestamp = 2;
}
message SubscribeRequest {
string room = 1;
string user = 2;
}
message Message {
string id = 1;
string user = 2;
string content = 3;
string room = 4;
google.protobuf.Timestamp timestamp = 5;
MessageType type = 6;
}
enum MessageType {
CHAT = 0;
JOIN = 1;
LEAVE = 2;
SYSTEM = 3;
}
message UploadResponse {
int32 message_count = 1;
bool success = 2;
}
message MetricsRequest {
string instance_id = 1;
int32 interval_seconds = 2;
}
message Metric {
string name = 1;
double value = 2;
google.protobuf.Timestamp timestamp = 3;
map<string, string> labels = 4;
}
2. Generate Go Code
Makefile
.PHONY: proto clean
proto:
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
proto/chat.proto
clean:
rm -f proto/*.pb.go
deps:
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
go.mod
module grpc-streaming
go 1.21
require (
google.golang.org/grpc v1.61.0
google.golang.org/protobuf v1.32.0
)
Run code generation:
make deps
make proto
3. Server Implementation
server/main.go
package main
import (
"context"
"fmt"
"io"
"log"
"math/rand"
"net"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
pb "grpc-streaming/proto/chat"
)
type chatServer struct {
pb.UnimplementedChatServiceServer
mu sync.RWMutex
subscribers map[string][]chan *pb.Message
messageID int64
}
func newChatServer() *chatServer {
return &chatServer{
subscribers: make(map[string][]chan *pb.Message),
}
}
// Unary RPC - traditional request/response
func (s *chatServer) SendMessage(ctx context.Context, req *pb.SendMessageRequest) (*pb.SendMessageResponse, error) {
log.Printf("Unary RPC: SendMessage from %s: %s", req.User, req.Content)
// Generate message ID
s.mu.Lock()
s.messageID++
msgID := fmt.Sprintf("msg_%d", s.messageID)
s.mu.Unlock()
// Create message
msg := &pb.Message{
Id: msgID,
User: req.User,
Content: req.Content,
Room: req.Room,
Timestamp: timestamppb.Now(),
Type: pb.MessageType_CHAT,
}
// Broadcast to subscribers
s.broadcast(req.Room, msg)
return &pb.SendMessageResponse{
MessageId: msgID,
Timestamp: msg.Timestamp,
}, nil
}
// Server Streaming - client subscribes to room messages
func (s *chatServer) SubscribeMessages(req *pb.SubscribeRequest, stream pb.ChatService_SubscribeMessagesServer) error {
log.Printf("Server Streaming: %s subscribed to room %s", req.User, req.Room)
// Create channel for this subscriber
ch := make(chan *pb.Message, 100)
// Register subscriber
s.mu.Lock()
s.subscribers[req.Room] = append(s.subscribers[req.Room], ch)
s.mu.Unlock()
// Cleanup on disconnect
defer func() {
s.mu.Lock()
subs := s.subscribers[req.Room]
for i, sub := range subs {
if sub == ch {
s.subscribers[req.Room] = append(subs[:i], subs[i+1:]...)
break
}
}
close(ch)
s.mu.Unlock()
log.Printf("Server Streaming: %s unsubscribed from room %s", req.User, req.Room)
}()
// Send welcome message
welcome := &pb.Message{
Id: "system",
User: "System",
Content: fmt.Sprintf("Welcome to room %s, %s!", req.Room, req.User),
Room: req.Room,
Timestamp: timestamppb.Now(),
Type: pb.MessageType_SYSTEM,
}
if err := stream.Send(welcome); err != nil {
return err
}
// Stream messages to client
for {
select {
case <-stream.Context().Done():
return stream.Context().Err()
case msg, ok := <-ch:
if !ok {
return nil
}
if err := stream.Send(msg); err != nil {
return err
}
}
}
}
// Client Streaming - client sends multiple messages, server responds once
func (s *chatServer) UploadMessages(stream pb.ChatService_UploadMessagesServer) error {
log.Printf("Client Streaming: UploadMessages started")
count := 0
startTime := time.Now()
for {
msg, err := stream.Recv()
if err == io.EOF {
// Client finished sending
duration := time.Since(startTime)
log.Printf("Client Streaming: Received %d messages in %v", count, duration)
return stream.SendAndClose(&pb.UploadResponse{
MessageCount: int32(count),
Success: true,
})
}
if err != nil {
return status.Errorf(codes.Internal, "receive error: %v", err)
}
log.Printf("Received upload: %s from %s", msg.Content, msg.User)
count++
// Broadcast message to room
s.broadcast(msg.Room, msg)
}
}
// Bidirectional Streaming - full chat
func (s *chatServer) Chat(stream pb.ChatService_ChatServer) error {
log.Printf("Bidirectional Streaming: Chat session started")
// Channel to receive client messages
errChan := make(chan error, 1)
// Goroutine to read from client
go func() {
for {
msg, err := stream.Recv()
if err == io.EOF {
log.Printf("Bidirectional Streaming: Client closed stream")
errChan <- nil
return
}
if err != nil {
log.Printf("Bidirectional Streaming: Receive error: %v", err)
errChan <- err
return
}
log.Printf("Bidirectional: Received from %s: %s", msg.User, msg.Content)
// Echo back with server timestamp
response := &pb.Message{
Id: fmt.Sprintf("echo_%s", msg.Id),
User: "Server",
Content: fmt.Sprintf("Echo: %s", msg.Content),
Room: msg.Room,
Timestamp: timestamppb.Now(),
Type: pb.MessageType_SYSTEM,
}
if err := stream.Send(response); err != nil {
log.Printf("Bidirectional Streaming: Send error: %v", err)
errChan <- err
return
}
// Also broadcast to subscribers
s.broadcast(msg.Room, msg)
}
}()
// Send periodic server messages
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-stream.Context().Done():
log.Printf("Bidirectional Streaming: Context cancelled")
return stream.Context().Err()
case err := <-errChan:
return err
case <-ticker.C:
serverMsg := &pb.Message{
Id: fmt.Sprintf("server_%d", time.Now().Unix()),
User: "Server",
Content: "Server is alive!",
Timestamp: timestamppb.Now(),
Type: pb.MessageType_SYSTEM,
}
if err := stream.Send(serverMsg); err != nil {
log.Printf("Bidirectional Streaming: Ticker send error: %v", err)
return err
}
}
}
}
// Server Streaming - metrics/telemetry
func (s *chatServer) StreamMetrics(req *pb.MetricsRequest, stream pb.ChatService_StreamMetricsServer) error {
log.Printf("Server Streaming: StreamMetrics for instance %s", req.InstanceId)
interval := time.Duration(req.IntervalSeconds) * time.Second
if interval == 0 {
interval = 1 * time.Second
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-stream.Context().Done():
log.Printf("Server Streaming: Metrics stream closed for %s", req.InstanceId)
return stream.Context().Err()
case <-ticker.C:
// Generate sample metrics
metrics := []*pb.Metric{
{
Name: "cpu_usage",
Value: rand.Float64() * 100,
Timestamp: timestamppb.Now(),
Labels: map[string]string{
"instance": req.InstanceId,
"core": "0",
},
},
{
Name: "memory_usage",
Value: rand.Float64() * 8192,
Timestamp: timestamppb.Now(),
Labels: map[string]string{
"instance": req.InstanceId,
"type": "resident",
},
},
{
Name: "requests_per_second",
Value: rand.Float64() * 1000,
Timestamp: timestamppb.Now(),
Labels: map[string]string{
"instance": req.InstanceId,
"endpoint": "/api",
},
},
}
// Send all metrics
for _, metric := range metrics {
if err := stream.Send(metric); err != nil {
return err
}
}
}
}
}
// Helper: broadcast message to room subscribers
func (s *chatServer) broadcast(room string, msg *pb.Message) {
s.mu.RLock()
subs := s.subscribers[room]
s.mu.RUnlock()
for _, ch := range subs {
select {
case ch <- msg:
default:
log.Printf("Subscriber channel full, dropping message")
}
}
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
// Create gRPC server
grpcServer := grpc.NewServer(
grpc.MaxConcurrentStreams(1000),
grpc.MaxRecvMsgSize(10 * 1024 * 1024), // 10MB
grpc.MaxSendMsgSize(10 * 1024 * 1024), // 10MB
)
// Register service
pb.RegisterChatServiceServer(grpcServer, newChatServer())
log.Printf("gRPC server listening on :50051")
log.Printf("Streaming modes available:")
log.Printf(" - Unary: SendMessage")
log.Printf(" - Server Streaming: SubscribeMessages, StreamMetrics")
log.Printf(" - Client Streaming: UploadMessages")
log.Printf(" - Bidirectional: Chat")
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("Failed to serve: %v", err)
}
}
4. Client Implementation
client/main.go
package main
import (
"context"
"fmt"
"io"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/timestamppb"
pb "grpc-streaming/proto/chat"
)
func main() {
// Connect to server
conn, err := grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()
client := pb.NewChatServiceClient(conn)
// Example 1: Unary RPC
fmt.Println("\n=== Unary RPC Example ===")
unaryExample(client)
// Example 2: Server Streaming
fmt.Println("\n=== Server Streaming Example ===")
go serverStreamingExample(client)
// Example 3: Client Streaming
fmt.Println("\n=== Client Streaming Example ===")
clientStreamingExample(client)
// Example 4: Bidirectional Streaming
fmt.Println("\n=== Bidirectional Streaming Example ===")
go bidirectionalExample(client)
// Example 5: Metrics Streaming
fmt.Println("\n=== Metrics Streaming Example ===")
go metricsExample(client)
// Keep running
select {}
}
func unaryExample(client pb.ChatServiceClient) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := client.SendMessage(ctx, &pb.SendMessageRequest{
User: "Alice",
Content: "Hello from unary RPC!",
Room: "general",
})
if err != nil {
log.Printf("Unary error: %v", err)
return
}
log.Printf("Unary response: ID=%s, Time=%v", resp.MessageId, resp.Timestamp.AsTime())
}
func serverStreamingExample(client pb.ChatServiceClient) {
ctx := context.Background()
stream, err := client.SubscribeMessages(ctx, &pb.SubscribeRequest{
Room: "general",
User: "Alice",
})
if err != nil {
log.Printf("Server streaming error: %v", err)
return
}
log.Printf("Subscribed to room 'general'")
for {
msg, err := stream.Recv()
if err == io.EOF {
log.Printf("Server closed stream")
break
}
if err != nil {
log.Printf("Receive error: %v", err)
break
}
log.Printf("[%s] %s: %s", msg.Room, msg.User, msg.Content)
}
}
func clientStreamingExample(client pb.ChatServiceClient) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.UploadMessages(ctx)
if err != nil {
log.Printf("Client streaming error: %v", err)
return
}
// Send multiple messages
messages := []string{
"First batch message",
"Second batch message",
"Third batch message",
"Fourth batch message",
"Fifth batch message",
}
for i, content := range messages {
msg := &pb.Message{
Id: fmt.Sprintf("batch_%d", i),
User: "Bob",
Content: content,
Room: "general",
Timestamp: timestamppb.Now(),
Type: pb.MessageType_CHAT,
}
if err := stream.Send(msg); err != nil {
log.Printf("Send error: %v", err)
return
}
log.Printf("Sent batch message %d", i+1)
time.Sleep(200 * time.Millisecond)
}
// Close and receive response
resp, err := stream.CloseAndRecv()
if err != nil {
log.Printf("Close error: %v", err)
return
}
log.Printf("Upload complete: %d messages processed, success=%v", resp.MessageCount, resp.Success)
}
func bidirectionalExample(client pb.ChatServiceClient) {
ctx := context.Background()
stream, err := client.Chat(ctx)
if err != nil {
log.Printf("Bidirectional error: %v", err)
return
}
// Goroutine to receive messages
go func() {
for {
msg, err := stream.Recv()
if err == io.EOF {
log.Printf("Server closed bidirectional stream")
return
}
if err != nil {
log.Printf("Bidirectional receive error: %v", err)
return
}
log.Printf("[Bidirectional] %s: %s", msg.User, msg.Content)
}
}()
// Send messages periodically
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
counter := 0
for range ticker.C {
counter++
msg := &pb.Message{
Id: fmt.Sprintf("client_%d", counter),
User: "Charlie",
Content: fmt.Sprintf("Bidirectional message #%d", counter),
Room: "general",
Timestamp: timestamppb.Now(),
Type: pb.MessageType_CHAT,
}
if err := stream.Send(msg); err != nil {
log.Printf("Bidirectional send error: %v", err)
return
}
log.Printf("Sent bidirectional message #%d", counter)
if counter >= 10 {
stream.CloseSend()
return
}
}
}
func metricsExample(client pb.ChatServiceClient) {
ctx := context.Background()
stream, err := client.StreamMetrics(ctx, &pb.MetricsRequest{
InstanceId: "server-01",
IntervalSeconds: 2,
})
if err != nil {
log.Printf("Metrics streaming error: %v", err)
return
}
log.Printf("Receiving metrics stream...")
count := 0
for {
metric, err := stream.Recv()
if err == io.EOF {
log.Printf("Metrics stream closed")
break
}
if err != nil {
log.Printf("Metrics receive error: %v", err)
break
}
log.Printf("Metric: %s = %.2f (labels: %v)", metric.Name, metric.Value, metric.Labels)
count++
if count >= 10 {
break
}
}
}
Bridging to Browsers with gRPC-Web
Since browsers can’t make direct gRPC calls, we need gRPC-Web with a proxy.
Envoy Proxy Configuration
envoy.yaml
admin:
access_log_path: /tmp/admin_access.log
address:
socket_address: { address: 0.0.0.0, port_value: 9901 }
static_resources:
listeners:
- name: listener_0
address:
socket_address: { address: 0.0.0.0, port_value: 8080 }
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
codec_type: auto
stat_prefix: ingress_http
route_config:
name: local_route
virtual_hosts:
- name: local_service
domains: ["*"]
routes:
- match: { prefix: "/" }
route:
cluster: grpc_service
timeout: 0s
max_stream_duration:
grpc_timeout_header_max: 0s
cors:
allow_origin_string_match:
- prefix: "*"
allow_methods: GET, PUT, DELETE, POST, OPTIONS
allow_headers: keep-alive,user-agent,cache-control,content-type,content-transfer-encoding,custom-header-1,x-accept-content-transfer-encoding,x-accept-response-streaming,x-user-agent,x-grpc-web,grpc-timeout
max_age: "1728000"
expose_headers: custom-header-1,grpc-status,grpc-message
http_filters:
- name: envoy.filters.http.grpc_web
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.grpc_web.v3.GrpcWeb
- name: envoy.filters.http.cors
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.cors.v3.Cors
- name: envoy.filters.http.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
clusters:
- name: grpc_service
connect_timeout: 0.25s
type: logical_dns
http2_protocol_options: {}
lb_policy: round_robin
load_assignment:
cluster_name: grpc_service
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: localhost
port_value: 50051
JavaScript Client
// npm install grpc-web
// npm install google-protobuf
const {ChatServiceClient} = require('./proto/chat_grpc_web_pb');
const {SubscribeRequest, SendMessageRequest} = require('./proto/chat_pb');
const client = new ChatServiceClient('http://localhost:8080');
// Server Streaming Example
const subscribeReq = new SubscribeRequest();
subscribeReq.setRoom('general');
subscribeReq.setUser('WebUser');
const stream = client.subscribeMessages(subscribeReq, {});
stream.on('data', (message) => {
console.log(`[${message.getRoom()}] ${message.getUser()}: ${message.getContent()}`);
});
stream.on('status', (status) => {
console.log('Stream status:', status);
});
stream.on('end', () => {
console.log('Stream ended');
});
// Unary RPC Example
const sendReq = new SendMessageRequest();
sendReq.setUser('WebUser');
sendReq.setContent('Hello from browser!');
sendReq.setRoom('general');
client.sendMessage(sendReq, {}, (err, response) => {
if (err) {
console.error('Error:', err);
return;
}
console.log('Message sent:', response.getMessageId());
});
Best Practices
1. Context and Timeouts
// Always set deadlines
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := client.SendMessage(ctx, req)
2. Graceful Stream Shutdown
func (s *chatServer) SubscribeMessages(req *pb.SubscribeRequest, stream pb.ChatService_SubscribeMessagesServer) error {
ch := make(chan *pb.Message, 100)
defer func() {
// Cleanup on exit
s.unregisterSubscriber(req.Room, ch)
close(ch)
}()
for {
select {
case <-stream.Context().Done():
return stream.Context().Err()
case msg := <-ch:
if err := stream.Send(msg); err != nil {
return err
}
}
}
}
3. Error Handling
import "google.golang.org/grpc/status"
// Return proper gRPC error codes
if user == "" {
return nil, status.Error(codes.InvalidArgument, "user is required")
}
if !authorized {
return nil, status.Error(codes.PermissionDenied, "unauthorized")
}
// Client: extract error details
if err != nil {
if st, ok := status.FromError(err); ok {
log.Printf("Code: %s, Message: %s", st.Code(), st.Message())
}
}
4. Backpressure Handling
// Use buffered channels and handle full buffers
select {
case ch <- msg:
// Sent successfully
default:
// Channel full - either drop message or disconnect client
return status.Error(codes.ResourceExhausted, "client too slow")
}
5. Connection Pooling
// Reuse connections
conn, err := grpc.Dial(
"localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(10*1024*1024),
grpc.MaxCallSendMsgSize(10*1024*1024),
),
)
Common Pitfalls
1. Not Handling Context Cancellation
// Bad: ignores context cancellation
for {
msg := <-ch
stream.Send(msg)
}
// Good: monitors context
for {
select {
case <-stream.Context().Done():
return stream.Context().Err()
case msg := <-ch:
stream.Send(msg)
}
}
2. Forgetting to Close Client Streams
// Bad: stream never closes
stream, _ := client.UploadMessages(ctx)
for _, msg := range messages {
stream.Send(msg)
}
// Stream left open!
// Good: close and receive response
stream, _ := client.UploadMessages(ctx)
for _, msg := range messages {
stream.Send(msg)
}
resp, err := stream.CloseAndRecv()
3. Not Setting Message Size Limits
// Set limits to prevent memory exhaustion
grpc.NewServer(
grpc.MaxRecvMsgSize(10 * 1024 * 1024), // 10MB
grpc.MaxSendMsgSize(10 * 1024 * 1024),
grpc.MaxConcurrentStreams(1000),
)
4. Blocking in Stream Handlers
// Bad: blocks stream
func (s *server) Chat(stream pb.Service_ChatServer) error {
msg, _ := stream.Recv()
time.Sleep(10 * time.Second) // Blocks entire stream!
stream.Send(response)
}
// Good: use goroutines for processing
func (s *server) Chat(stream pb.Service_ChatServer) error {
msg, _ := stream.Recv()
go processAsync(msg) // Don't block stream
stream.Send(response)
}
When to Use gRPC Streaming
✅ Use gRPC Streaming When:
- Microservices communication - Service-to-service RPC
- High throughput required - Binary protocol is efficient
- Strong typing needed - Protocol Buffers provide schemas
- Multiple languages - gRPC has broad language support
- HTTP/2 infrastructure - Already using HTTP/2
- Long-lived streams - Telemetry, logs, metrics
- Bidirectional flows - Both parties need to stream
❌ Avoid gRPC Streaming When:
- Browser-first applications - Requires gRPC-Web proxy
- Simple REST APIs - HTTP/REST is simpler
- Wide client support needed - Not all clients support gRPC
- Debugging priority - Binary protocol harder to debug
- Human-readable protocols - Protobuf is binary
Advantages
- High Performance - Binary protocol, HTTP/2 multiplexing
- Type Safety - Strong typing with Protocol Buffers
- Code Generation - Auto-generated client/server code
- Streaming Support - First-class streaming in all modes
- Language Agnostic - Broad language support
- Built-in Features - Auth, load balancing, retries
- Efficient - Compact binary serialization
Disadvantages
- Complexity - More complex than REST
- Binary Protocol - Harder to debug than JSON
- Browser Support - Requires gRPC-Web proxy
- Learning Curve - Protocol Buffers and gRPC concepts
- Tooling - Requires protoc compiler setup
- Limited Proxy Support - Some proxies don’t handle HTTP/2 well
Performance Comparison
| Protocol | Latency | Throughput | Message Size | CPU Usage |
|---|---|---|---|---|
| gRPC | Very Low | Very High | Small (binary) | Low |
| WebSockets | Low | High | Medium (text/binary) | Medium |
| SSE | Medium | Medium | Medium (text) | Medium |
| REST/JSON | Medium | Low | Large (JSON) | High |
Testing
package main
import (
"context"
"testing"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"
pb "grpc-streaming/proto/chat"
)
func setupTestServer(t *testing.T) (pb.ChatServiceClient, func()) {
buffer := 1024 * 1024
listener := bufconn.Listen(buffer)
server := grpc.NewServer()
pb.RegisterChatServiceServer(server, newChatServer())
go func() {
if err := server.Serve(listener); err != nil {
t.Fatalf("Server error: %v", err)
}
}()
conn, err := grpc.DialContext(context.Background(), "",
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return listener.Dial()
}),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
t.Fatalf("Failed to dial: %v", err)
}
client := pb.NewChatServiceClient(conn)
cleanup := func() {
conn.Close()
server.Stop()
listener.Close()
}
return client, cleanup
}
func TestUnaryRPC(t *testing.T) {
client, cleanup := setupTestServer(t)
defer cleanup()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := client.SendMessage(ctx, &pb.SendMessageRequest{
User: "TestUser",
Content: "Test message",
Room: "test",
})
if err != nil {
t.Fatalf("SendMessage failed: %v", err)
}
if resp.MessageId == "" {
t.Error("Expected message ID")
}
}
func TestServerStreaming(t *testing.T) {
client, cleanup := setupTestServer(t)
defer cleanup()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.SubscribeMessages(ctx, &pb.SubscribeRequest{
Room: "test",
User: "TestUser",
})
if err != nil {
t.Fatalf("SubscribeMessages failed: %v", err)
}
// Should receive welcome message
msg, err := stream.Recv()
if err != nil {
t.Fatalf("Receive failed: %v", err)
}
if msg.Type != pb.MessageType_SYSTEM {
t.Errorf("Expected SYSTEM message, got %v", msg.Type)
}
}
Backend Communication
Current:
gRPC Streaming