Go Concurrency Patterns Series: ← Pub/Sub Pattern | Series Overview | Worker Pool →


What is the Request/Response Pattern?

The Request/Response pattern enables synchronous communication between goroutines, where a sender waits for a response from a receiver. This pattern is essential for RPC-style communication, database queries, API calls, and any scenario where you need to get a result back from an operation.

Key Components:

  • Request: Contains data and a response channel
  • Response: Contains result data and/or error information
  • Requester: Sends request and waits for response
  • Responder: Processes request and sends response

Real-World Use Cases

  • Database Operations: Query execution with results
  • API Gateways: Forwarding requests to microservices
  • Cache Systems: Get/Set operations with confirmation
  • File Operations: Read/Write with status feedback
  • Validation Services: Input validation with results
  • Authentication: Login requests with tokens

Basic Request/Response Implementation

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// Request is a single unit of work handed to a Server.
type Request struct {
    // ID is a caller-chosen identifier that is echoed back in the Response.
    ID string
    // Data is the payload to process; processRequest only accepts an int.
    Data interface{}
    // Response is the channel on which the server delivers the result.
    Response chan Response
}

// Response is the server's answer to a Request.
type Response struct {
    // ID is copied from the originating Request.
    ID string
    // Result holds the computed value; it is left nil when Error is set.
    Result interface{}
    // Error is non-nil when the request could not be processed.
    Error error
}

// Server processes requests one at a time on a single goroutine.
type Server struct {
    // requests is unbuffered: a successful send means the server goroutine
    // has taken the request and will answer it.
    requests chan Request
    // quit is closed by Stop to end the processing loop.
    quit chan bool
}

// NewServer creates a server. Call Start before sending requests.
func NewServer() *Server {
    return &Server{
        requests: make(chan Request),
        quit:     make(chan bool),
    }
}

// Start launches the processing goroutine. It handles requests sequentially
// until Stop is called.
func (s *Server) Start() {
    go func() {
        for {
            select {
            case req := <-s.requests:
                s.processRequest(req)
            case <-s.quit:
                return
            }
        }
    }()
}

// processRequest handles a single request and sends exactly one Response on
// req.Response. An int payload is doubled; any other type yields an error.
func (s *Server) processRequest(req Request) {
    // Simulate processing time
    time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

    // Process the request (example: double the number)
    var response Response
    response.ID = req.ID

    if num, ok := req.Data.(int); ok {
        response.Result = num * 2
    } else {
        response.Error = fmt.Errorf("invalid data type")
    }

    // Send response back
    req.Response <- response
}

// SendRequest submits data to the server and blocks until the response
// arrives. It returns the result and error reported by the server, or a
// "server stopped" error if Stop was called before the server accepted the
// request.
func (s *Server) SendRequest(id string, data interface{}) (interface{}, error) {
    // Buffered so the server goroutine never blocks while delivering.
    responseChan := make(chan Response, 1)

    request := Request{
        ID:       id,
        Data:     data,
        Response: responseChan,
    }

    // Without the quit case this send would block forever once the
    // processing goroutine has exited, because nothing receives any more.
    select {
    case s.requests <- request:
    case <-s.quit:
        return nil, fmt.Errorf("server stopped")
    }

    // The request was accepted, so processRequest is guaranteed to answer.
    response := <-responseChan
    return response.Result, response.Error
}

// Stop shuts down the server. It must be called at most once: closing the
// quit channel a second time panics.
func (s *Server) Stop() {
    close(s.quit)
}

// main starts a Server, issues five sequential requests, and prints the
// outcome of each one.
func main() {
    srv := NewServer()
    srv.Start()
    defer srv.Stop()

    const total = 5
    for n := 1; n <= total; n++ {
        id := fmt.Sprintf("req-%d", n)
        value, err := srv.SendRequest(id, n*10)
        if err != nil {
            fmt.Printf("Request %d failed: %v\n", n, err)
            continue
        }
        fmt.Printf("Request %d result: %v\n", n, value)
    }
}

Request/Response with Timeout

package main

import (
    "context"
    "fmt"
    "math/rand"
    "time"
)

// TimedRequest is a request that carries the caller's context so the server
// can abandon work once the caller has stopped waiting.
type TimedRequest struct {
    // ID is a caller-chosen identifier that is echoed back in the response.
    ID string
    // Data is the payload to process; only int values are accepted.
    Data interface{}
    // Response is the channel on which the result is delivered.
    Response chan TimedResponse
    // Context bounds the lifetime of this request. It travels with the
    // request message because it is scoped to that single request.
    Context context.Context
}

// TimedResponse is the answer to a TimedRequest, with timing information.
type TimedResponse struct {
    // ID is copied from the originating request.
    ID string
    // Result holds the computed value; it is nil when Error is set.
    Result interface{}
    // Error is non-nil when processing failed or the context ended.
    Error error
    // Duration is the time spent between the start of processing and the
    // construction of the response.
    Duration time.Duration
    // Timestamp is when the response was constructed.
    Timestamp time.Time
}

// TimedServer processes requests concurrently with timeout support.
type TimedServer struct {
    // requests queues up to 10 pending requests.
    requests chan TimedRequest
    // quit is closed by Stop to end the dispatch loop.
    quit chan bool
}

// NewTimedServer creates a server. Call Start before sending requests.
func NewTimedServer() *TimedServer {
    return &TimedServer{
        requests: make(chan TimedRequest, 10),
        quit:     make(chan bool),
    }
}

// Start launches the dispatch goroutine. Every request is handled on its own
// goroutine, so slow requests do not delay the ones queued behind them.
func (ts *TimedServer) Start() {
    go func() {
        for {
            select {
            case req := <-ts.requests:
                go ts.processTimedRequest(req)
            case <-ts.quit:
                return
            }
        }
    }()
}

// processTimedRequest performs the simulated work for req and reports either
// the doubled int payload, an "invalid data type" error, or the context's
// error if the context ends first.
func (ts *TimedServer) processTimedRequest(req TimedRequest) {
    start := time.Now()

    // Check if context is already cancelled
    select {
    case <-req.Context.Done():
        ts.sendResponse(req, nil, req.Context.Err(), start)
        return
    default:
    }

    // Simulate work with random duration
    workDuration := time.Duration(rand.Intn(200)) * time.Millisecond

    // An explicit timer (instead of time.After) lets us release it as soon
    // as the context wins the race below.
    timer := time.NewTimer(workDuration)
    defer timer.Stop()

    select {
    case <-timer.C:
        // Work completed
        if num, ok := req.Data.(int); ok {
            ts.sendResponse(req, num*2, nil, start)
        } else {
            ts.sendResponse(req, nil, fmt.Errorf("invalid data type"), start)
        }
    case <-req.Context.Done():
        // Context cancelled during work
        ts.sendResponse(req, nil, req.Context.Err(), start)
    }
}

// sendResponse builds a TimedResponse and tries to deliver it on
// req.Response. The response is dropped if the request's context is done,
// so the sender never stays blocked on a caller that has gone away.
func (ts *TimedServer) sendResponse(req TimedRequest, result interface{}, err error, start time.Time) {
    response := TimedResponse{
        ID:        req.ID,
        Result:    result,
        Error:     err,
        Duration:  time.Since(start),
        Timestamp: time.Now(),
    }

    select {
    case req.Response <- response:
    case <-req.Context.Done():
        // Client no longer waiting
    }
}

// SendRequestWithTimeout sends a request and waits at most timeout for the
// answer. It returns the server's result and error, or the context error
// (context.DeadlineExceeded) when the timeout elapses first. On a received
// response it also prints the request's processing duration.
func (ts *TimedServer) SendRequestWithTimeout(id string, data interface{}, timeout time.Duration) (interface{}, error) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    // Buffered so a late response never blocks the worker goroutine.
    responseChan := make(chan TimedResponse, 1)

    request := TimedRequest{
        ID:       id,
        Data:     data,
        Response: responseChan,
        Context:  ctx,
    }

    // Enqueueing can block when the queue is full, so it is bounded by the
    // same timeout as the wait for the response.
    select {
    case ts.requests <- request:
    case <-ctx.Done():
        return nil, ctx.Err()
    }

    select {
    case response := <-responseChan:
        fmt.Printf("Request %s completed in %v\n", response.ID, response.Duration)
        return response.Result, response.Error
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}

// Stop ends the dispatch loop. It must be called at most once: closing the
// quit channel a second time panics.
func (ts *TimedServer) Stop() {
    close(ts.quit)
}

// main sends three requests with progressively shorter timeouts and prints
// whether each one produced a result or failed.
func main() {
    srv := NewTimedServer()
    srv.Start()
    defer srv.Stop()

    // call describes one request to issue.
    type call struct {
        id      string
        data    int
        timeout time.Duration
    }

    calls := []call{
        {id: "fast", data: 10, timeout: 300 * time.Millisecond},
        {id: "medium", data: 20, timeout: 150 * time.Millisecond},
        {id: "slow", data: 30, timeout: 50 * time.Millisecond}, // may time out
    }

    for i := range calls {
        c := calls[i]
        value, err := srv.SendRequestWithTimeout(c.id, c.data, c.timeout)
        if err == nil {
            fmt.Printf("Request %s result: %v\n", c.id, value)
            continue
        }
        fmt.Printf("Request %s failed: %v\n", c.id, err)
    }
}

Future/Promise Pattern

package main

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Future represents a value that will be available in the future. It is
// completed at most once via Set and can be read any number of times.
type Future struct {
    // mu guards computed and serializes Set.
    mu sync.Mutex
    // done is closed by the first Set call.
    done chan struct{}
    // result and err are written once, before done is closed.
    result interface{}
    err    error
    // computed records whether Set has already run.
    computed bool
}

// NewFuture creates a new, not yet completed future.
func NewFuture() *Future {
    return &Future{
        done: make(chan struct{}),
    }
}

// Set completes the future with result and err. Only the first call has an
// effect; later calls are ignored.
func (f *Future) Set(result interface{}, err error) {
    f.mu.Lock()
    defer f.mu.Unlock()

    if f.computed {
        return // Already set
    }

    f.result = result
    f.err = err
    f.computed = true
    // Closing after the writes makes them visible to every goroutine that
    // observes the closed channel.
    close(f.done)
}

// Get blocks until the future is completed and returns its value.
func (f *Future) Get() (interface{}, error) {
    <-f.done
    return f.result, f.err
}

// GetWithTimeout waits for the value for at most timeout. It returns a
// "timeout waiting for future" error if the future is not completed in time.
// A future that is already completed always yields its value, even when
// timeout is zero or negative.
func (f *Future) GetWithTimeout(timeout time.Duration) (interface{}, error) {
    // Fast path: when both the value and the timer are ready, select would
    // pick at random and could report a timeout for a finished future.
    select {
    case <-f.done:
        return f.result, f.err
    default:
    }

    // An explicit timer (instead of time.After) is released as soon as the
    // value arrives.
    timer := time.NewTimer(timeout)
    defer timer.Stop()

    select {
    case <-f.done:
        return f.result, f.err
    case <-timer.C:
        return nil, fmt.Errorf("timeout waiting for future")
    }
}

// GetWithContext waits for the value until ctx is done, in which case it
// returns ctx.Err(). A future that is already completed always yields its
// value, even when ctx is already cancelled.
func (f *Future) GetWithContext(ctx context.Context) (interface{}, error) {
    // Fast path: prefer a completed value over a cancelled context.
    select {
    case <-f.done:
        return f.result, f.err
    default:
    }

    select {
    case <-f.done:
        return f.result, f.err
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}

// IsReady returns true if the future has been completed.
func (f *Future) IsReady() bool {
    f.mu.Lock()
    defer f.mu.Unlock()
    return f.computed
}

// AsyncService demonstrates async operations with futures. The buffered
// workers channel acts as a counting semaphore that limits how many
// operations are processed at the same time.
type AsyncService struct {
    workers chan struct{}
}

// NewAsyncService creates a service that processes at most maxWorkers
// operations concurrently. Values below 1 are treated as 1: a zero-capacity
// semaphore would block every operation forever, and a negative capacity
// would panic.
func NewAsyncService(maxWorkers int) *AsyncService {
    if maxWorkers < 1 {
        maxWorkers = 1
    }
    return &AsyncService{
        workers: make(chan struct{}, maxWorkers),
    }
}

// ProcessAsync starts async processing and returns a future immediately.
// The future resolves to the square of data when data is an int, and to an
// "invalid data type" error otherwise. A goroutine is started per call; it
// waits for a free worker slot before doing the work.
func (as *AsyncService) ProcessAsync(data interface{}) *Future {
    future := NewFuture()

    go func() {
        // Acquire worker slot
        as.workers <- struct{}{}
        defer func() { <-as.workers }()

        // Simulate processing
        time.Sleep(time.Duration(100+rand.Intn(200)) * time.Millisecond)

        // Process data
        if num, ok := data.(int); ok {
            future.Set(num*num, nil)
        } else {
            future.Set(nil, fmt.Errorf("invalid data type"))
        }
    }()

    return future
}

// main launches five async operations on a service limited to three
// concurrent workers, prints every result in submission order, and then
// shows a read that is given less time than the operation needs.
func main() {
    svc := NewAsyncService(3)

    // Start multiple async operations
    const jobs = 5
    pending := make([]*Future, 0, jobs)
    for n := 1; n <= jobs; n++ {
        fmt.Printf("Starting async operation %d\n", n)
        pending = append(pending, svc.ProcessAsync(n*10))
    }

    // Wait for all results
    fmt.Println("\nWaiting for results...")
    for idx := range pending {
        value, err := pending[idx].Get()
        if err != nil {
            fmt.Printf("Operation %d failed: %v\n", idx+1, err)
            continue
        }
        fmt.Printf("Operation %d result: %v\n", idx+1, value)
    }

    // Example with timeout
    fmt.Println("\nTesting timeout...")
    value, err := svc.ProcessAsync(100).GetWithTimeout(50 * time.Millisecond)
    if err == nil {
        fmt.Printf("Timeout example result: %v\n", value)
    } else {
        fmt.Printf("Timeout example failed: %v\n", err)
    }
}

Batch Request/Response

package main

import (
    "fmt"
    "sync"
    "time"
)

// BatchRequest represents multiple items submitted together.
type BatchRequest struct {
    // ID is a caller-chosen identifier that is echoed back in the response.
    ID string
    // Items are the payloads to process; only int values are accepted.
    Items []interface{}
    // Response is the channel on which the result is delivered.
    Response chan BatchResponse
}

// BatchResponse contains results for all items in a batch request.
type BatchResponse struct {
    // ID is copied from the originating request.
    ID string
    // Results has one entry per submitted item, in submission order.
    Results []BatchResult
    // Error reports a failure of the request as a whole.
    Error error
}

// BatchResult represents the result of processing one item.
type BatchResult struct {
    // Index is the item's position in BatchRequest.Items.
    Index int
    // Result holds the computed value; it is nil when Error is set.
    Result interface{}
    // Error is non-nil when this item could not be processed.
    Error error
}

// BatchProcessor collects requests and processes them in groups, either
// when batchSize requests have accumulated or when batchWindow has elapsed
// since the first request of the group arrived.
type BatchProcessor struct {
    // requests queues up to 100 pending requests.
    requests    chan BatchRequest
    batchSize   int
    batchWindow time.Duration
    // quit is closed by Stop to end the collection loop.
    quit chan bool
}

// NewBatchProcessor creates a processor. A batchSize below 1 is treated as
// 1, which flushes every request as soon as it arrives; a negative size
// would otherwise panic when the batch buffer is allocated.
func NewBatchProcessor(batchSize int, batchWindow time.Duration) *BatchProcessor {
    if batchSize < 1 {
        batchSize = 1
    }
    return &BatchProcessor{
        requests:    make(chan BatchRequest, 100),
        batchSize:   batchSize,
        batchWindow: batchWindow,
        quit:        make(chan bool),
    }
}

// Start launches the collection goroutine. It runs until Stop is called and
// flushes the batch in progress before exiting.
func (bp *BatchProcessor) Start() {
    go func() {
        batch := make([]BatchRequest, 0, bp.batchSize)

        timer := time.NewTimer(bp.batchWindow)
        defer timer.Stop()

        // stopTimer disarms the timer and discards a tick that may already
        // be buffered. Without the drain, a stale tick could be received
        // after the next Reset and flush that batch too early.
        stopTimer := func() {
            if !timer.Stop() {
                select {
                case <-timer.C:
                default:
                }
            }
        }
        stopTimer() // the timer is only armed once a batch has been opened

        // flush processes the current batch and empties it, keeping the
        // backing array. processBatch returns only after every request has
        // been answered, so reusing the array is safe.
        flush := func() {
            bp.processBatch(batch)
            batch = batch[:0]
        }

        for {
            select {
            case req := <-bp.requests:
                batch = append(batch, req)

                // The first request of a batch opens the window.
                if len(batch) == 1 {
                    timer.Reset(bp.batchWindow)
                }

                if len(batch) >= bp.batchSize {
                    // Disarm before the blocking flush so the window cannot
                    // expire while the batch is being processed.
                    stopTimer()
                    flush()
                }

            case <-timer.C:
                if len(batch) > 0 {
                    flush()
                }

            case <-bp.quit:
                if len(batch) > 0 {
                    bp.processBatch(batch)
                }
                return
            }
        }
    }()
}

// processBatch handles every request of the batch concurrently and returns
// once all of them have been answered.
func (bp *BatchProcessor) processBatch(batch []BatchRequest) {
    fmt.Printf("Processing batch of %d requests\n", len(batch))

    var wg sync.WaitGroup
    for _, req := range batch {
        wg.Add(1)
        go func(r BatchRequest) {
            defer wg.Done()
            bp.processRequest(r)
        }(req)
    }
    wg.Wait()
}

// processRequest processes the items of one request in order and sends a
// single BatchResponse on req.Response. An int item is tripled; any other
// type is reported as a per-item error.
func (bp *BatchProcessor) processRequest(req BatchRequest) {
    results := make([]BatchResult, len(req.Items))

    for i, item := range req.Items {
        // Simulate processing each item
        time.Sleep(10 * time.Millisecond)

        if num, ok := item.(int); ok {
            results[i] = BatchResult{
                Index:  i,
                Result: num * 3,
            }
        } else {
            results[i] = BatchResult{
                Index: i,
                Error: fmt.Errorf("invalid item type at index %d", i),
            }
        }
    }

    response := BatchResponse{
        ID:      req.ID,
        Results: results,
    }

    req.Response <- response
}

// SendBatchRequest submits items and blocks until they have been processed.
// It returns one BatchResult per item, or a "batch processor stopped" error
// if Stop is called before a response has been received. A result that only
// arrives after shutdown has been observed is discarded.
func (bp *BatchProcessor) SendBatchRequest(id string, items []interface{}) ([]BatchResult, error) {
    // Buffered so the worker never blocks while delivering.
    responseChan := make(chan BatchResponse, 1)

    request := BatchRequest{
        ID:       id,
        Items:    items,
        Response: responseChan,
    }

    // Both steps watch quit: after Stop nothing drains the queue, so an
    // unconditional send or receive could block forever.
    select {
    case bp.requests <- request:
    case <-bp.quit:
        return nil, fmt.Errorf("batch processor stopped")
    }

    select {
    case response := <-responseChan:
        return response.Results, response.Error
    case <-bp.quit:
        // Prefer a response that has already been delivered.
        select {
        case response := <-responseChan:
            return response.Results, response.Error
        default:
            return nil, fmt.Errorf("batch processor stopped")
        }
    }
}

// Stop ends the collection loop. It must be called at most once: closing
// the quit channel a second time panics.
func (bp *BatchProcessor) Stop() {
    close(bp.quit)
}

// main submits two batch requests from separate goroutines, prints the
// per-item outcome of each, and gives the processor half a second to finish
// before shutting down.
func main() {
    bp := NewBatchProcessor(3, 100*time.Millisecond)
    bp.Start()
    defer bp.Stop()

    // printItems writes one line per item outcome.
    printItems := func(items []BatchResult) {
        for i := range items {
            item := items[i]
            if item.Error == nil {
                fmt.Printf("  Item %d result: %v\n", item.Index, item.Result)
                continue
            }
            fmt.Printf("  Item %d error: %v\n", item.Index, item.Error)
        }
    }

    // First request: five items.
    go func() {
        items, err := bp.SendBatchRequest("batch1", []interface{}{1, 2, 3, 4, 5})
        if err != nil {
            fmt.Printf("Batch 1 failed: %v\n", err)
            return
        }

        fmt.Println("Batch 1 results:")
        printItems(items)
    }()

    // Second request: three items.
    go func() {
        items, err := bp.SendBatchRequest("batch2", []interface{}{10, 20, 30})
        if err != nil {
            fmt.Printf("Batch 2 failed: %v\n", err)
            return
        }

        fmt.Println("Batch 2 results:")
        printItems(items)
    }()

    // Wait for processing
    time.Sleep(500 * time.Millisecond)
}

Best Practices

  1. Always Use Timeouts: Prevent indefinite blocking
  2. Handle Context Cancellation: Support graceful cancellation
  3. Buffer Response Channels: Avoid blocking responders
  4. Error Handling: Always include error information in responses
  5. Resource Cleanup: Ensure channels and goroutines are cleaned up
  6. Monitoring: Track request/response times and success rates
  7. Backpressure: Handle situations when responders are overwhelmed

Common Pitfalls

  1. Deadlocks: Not buffering response channels
  2. Goroutine Leaks: Not handling context cancellation
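  3. Memory Leaks: Leaving goroutines blocked forever on channels that are never written to or closed (the blocked goroutine leaks; an unclosed channel by itself is garbage-collected)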
  4. Blocking Operations: Long-running operations without timeouts
  5. Lost Responses: Not handling channel closure

Testing Request/Response

package main

import (
    "context"
    "testing"
    "time"
)

// TestRequestResponse checks that a TimedServer answers a request that has
// enough time to complete, and that a request whose deadline has already
// passed fails with context.DeadlineExceeded.
func TestRequestResponse(t *testing.T) {
    server := NewTimedServer()
    server.Start()
    defer server.Stop()

    // Test successful request. The simulated work takes less than 200ms, so
    // a generous timeout keeps this case independent of scheduling jitter.
    result, err := server.SendRequestWithTimeout("test1", 42, 2*time.Second)
    if err != nil {
        t.Fatalf("Request failed: %v", err)
    }

    if result != 84 {
        t.Errorf("Expected 84, got %v", result)
    }

    // Test timeout. A zero timeout produces a context that is expired from
    // the start, so the outcome does not depend on how long the randomly
    // chosen work duration happens to be.
    _, err = server.SendRequestWithTimeout("test2", 42, 0)
    if err != context.DeadlineExceeded {
        t.Errorf("Expected timeout error, got %v", err)
    }
}

// TestFuture walks a Future through its lifecycle: not ready when created,
// blocking in Get until another goroutine completes it, and ready afterwards.
func TestFuture(t *testing.T) {
    fut := NewFuture()

    // A fresh future has no value yet.
    if fut.IsReady() {
        t.Error("Future should not be ready initially")
    }

    // Complete the future from another goroutine after a short delay.
    const want = "test result"
    go func() {
        time.Sleep(50 * time.Millisecond)
        fut.Set(want, nil)
    }()

    // Get blocks until the goroutine above has called Set.
    got, err := fut.Get()
    if err != nil {
        t.Fatalf("Future failed: %v", err)
    }
    if got != want {
        t.Errorf("Expected 'test result', got %v", got)
    }

    // Once a value has been delivered the future must report ready.
    if ready := fut.IsReady(); !ready {
        t.Error("Future should be ready after setting")
    }
}

The Request/Response pattern is essential for building synchronous communication systems in Go. It provides the foundation for RPC systems, database operations, and any scenario where you need to wait for a result from an asynchronous operation.


Next: Learn about Worker Pool Pattern for managing concurrent workers efficiently.