Go Concurrency Patterns Series: ← Pub/Sub Pattern | Series Overview | Worker Pool →
What is the Request/Response Pattern?
The Request/Response pattern enables synchronous communication between goroutines, where a sender waits for a response from a receiver. This pattern is essential for RPC-style communication, database queries, API calls, and any scenario where you need to get a result back from an operation.
Key Components:
- Request: Contains data and a response channel
- Response: Contains result data and/or error information
- Requester: Sends request and waits for response
- Responder: Processes request and sends response
Real-World Use Cases
- Database Operations: Query execution with results
- API Gateways: Forwarding requests to microservices
- Cache Systems: Get/Set operations with confirmation
- File Operations: Read/Write with status feedback
- Validation Services: Input validation with results
- Authentication: Login requests with tokens
Basic Request/Response Implementation
package main
import (
"fmt"
"math/rand"
"time"
)
// Request represents a request with a response channel
type Request struct {
ID string
Data interface{}
Response chan Response
}
// Response represents the response to a request
type Response struct {
ID string
Result interface{}
Error error
}
// Server processes requests
type Server struct {
requests chan Request
quit chan bool
}
// NewServer creates a new server
func NewServer() *Server {
return &Server{
requests: make(chan Request),
quit: make(chan bool),
}
}
// Start begins processing requests
func (s *Server) Start() {
go func() {
for {
select {
case req := <-s.requests:
s.processRequest(req)
case <-s.quit:
return
}
}
}()
}
// processRequest handles a single request
func (s *Server) processRequest(req Request) {
// Simulate processing time
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
// Process the request (example: double the number)
var response Response
response.ID = req.ID
if num, ok := req.Data.(int); ok {
response.Result = num * 2
} else {
response.Error = fmt.Errorf("invalid data type")
}
// Send response back
req.Response <- response
}
// SendRequest sends a request and waits for response
func (s *Server) SendRequest(id string, data interface{}) (interface{}, error) {
responseChan := make(chan Response, 1)
request := Request{
ID: id,
Data: data,
Response: responseChan,
}
s.requests <- request
// Wait for response
response := <-responseChan
return response.Result, response.Error
}
// Stop shuts down the server
func (s *Server) Stop() {
close(s.quit)
}
func main() {
server := NewServer()
server.Start()
defer server.Stop()
// Send multiple requests
for i := 1; i <= 5; i++ {
result, err := server.SendRequest(fmt.Sprintf("req-%d", i), i*10)
if err != nil {
fmt.Printf("Request %d failed: %v\n", i, err)
} else {
fmt.Printf("Request %d result: %v\n", i, result)
}
}
}
Request/Response with Timeout
package main
import (
"context"
"fmt"
"math/rand"
"time"
)
// TimedRequest includes context for timeout handling
type TimedRequest struct {
ID string
Data interface{}
Response chan TimedResponse
Context context.Context
}
// TimedResponse includes timing information
type TimedResponse struct {
ID string
Result interface{}
Error error
Duration time.Duration
Timestamp time.Time
}
// TimedServer processes requests with timeout support
type TimedServer struct {
requests chan TimedRequest
quit chan bool
}
func NewTimedServer() *TimedServer {
return &TimedServer{
requests: make(chan TimedRequest, 10),
quit: make(chan bool),
}
}
func (ts *TimedServer) Start() {
go func() {
for {
select {
case req := <-ts.requests:
go ts.processTimedRequest(req)
case <-ts.quit:
return
}
}
}()
}
func (ts *TimedServer) processTimedRequest(req TimedRequest) {
start := time.Now()
// Check if context is already cancelled
select {
case <-req.Context.Done():
ts.sendResponse(req, nil, req.Context.Err(), start)
return
default:
}
// Simulate work with random duration
workDuration := time.Duration(rand.Intn(200)) * time.Millisecond
select {
case <-time.After(workDuration):
// Work completed
if num, ok := req.Data.(int); ok {
ts.sendResponse(req, num*2, nil, start)
} else {
ts.sendResponse(req, nil, fmt.Errorf("invalid data type"), start)
}
case <-req.Context.Done():
// Context cancelled during work
ts.sendResponse(req, nil, req.Context.Err(), start)
}
}
func (ts *TimedServer) sendResponse(req TimedRequest, result interface{}, err error, start time.Time) {
response := TimedResponse{
ID: req.ID,
Result: result,
Error: err,
Duration: time.Since(start),
Timestamp: time.Now(),
}
select {
case req.Response <- response:
case <-req.Context.Done():
// Client no longer waiting
}
}
// SendRequestWithTimeout sends a request with a timeout
func (ts *TimedServer) SendRequestWithTimeout(id string, data interface{}, timeout time.Duration) (interface{}, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
responseChan := make(chan TimedResponse, 1)
request := TimedRequest{
ID: id,
Data: data,
Response: responseChan,
Context: ctx,
}
select {
case ts.requests <- request:
case <-ctx.Done():
return nil, ctx.Err()
}
select {
case response := <-responseChan:
fmt.Printf("Request %s completed in %v\n", response.ID, response.Duration)
return response.Result, response.Error
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (ts *TimedServer) Stop() {
close(ts.quit)
}
func main() {
server := NewTimedServer()
server.Start()
defer server.Stop()
// Send requests with different timeouts
requests := []struct {
id string
data int
timeout time.Duration
}{
{"fast", 10, 300 * time.Millisecond},
{"medium", 20, 150 * time.Millisecond},
{"slow", 30, 50 * time.Millisecond}, // This might timeout
}
for _, req := range requests {
result, err := server.SendRequestWithTimeout(req.id, req.data, req.timeout)
if err != nil {
fmt.Printf("Request %s failed: %v\n", req.id, err)
} else {
fmt.Printf("Request %s result: %v\n", req.id, result)
}
}
}
Future/Promise Pattern
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Future represents a value that will be available in the future
type Future struct {
mu sync.Mutex
done chan struct{}
result interface{}
err error
computed bool
}
// NewFuture creates a new future
func NewFuture() *Future {
return &Future{
done: make(chan struct{}),
}
}
// Set sets the future's value
func (f *Future) Set(result interface{}, err error) {
f.mu.Lock()
defer f.mu.Unlock()
if f.computed {
return // Already set
}
f.result = result
f.err = err
f.computed = true
close(f.done)
}
// Get waits for and returns the future's value
func (f *Future) Get() (interface{}, error) {
<-f.done
return f.result, f.err
}
// GetWithTimeout waits for the value with a timeout
func (f *Future) GetWithTimeout(timeout time.Duration) (interface{}, error) {
select {
case <-f.done:
return f.result, f.err
case <-time.After(timeout):
return nil, fmt.Errorf("timeout waiting for future")
}
}
// GetWithContext waits for the value with context cancellation
func (f *Future) GetWithContext(ctx context.Context) (interface{}, error) {
select {
case <-f.done:
return f.result, f.err
case <-ctx.Done():
return nil, ctx.Err()
}
}
// IsReady returns true if the future has been computed
func (f *Future) IsReady() bool {
f.mu.Lock()
defer f.mu.Unlock()
return f.computed
}
// AsyncService demonstrates async operations with futures
type AsyncService struct {
workers chan struct{}
}
func NewAsyncService(maxWorkers int) *AsyncService {
return &AsyncService{
workers: make(chan struct{}, maxWorkers),
}
}
// ProcessAsync starts async processing and returns a future
func (as *AsyncService) ProcessAsync(data interface{}) *Future {
future := NewFuture()
go func() {
// Acquire worker slot
as.workers <- struct{}{}
defer func() { <-as.workers }()
// Simulate processing
time.Sleep(time.Duration(100+rand.Intn(200)) * time.Millisecond)
// Process data
if num, ok := data.(int); ok {
future.Set(num*num, nil)
} else {
future.Set(nil, fmt.Errorf("invalid data type"))
}
}()
return future
}
func main() {
service := NewAsyncService(3)
// Start multiple async operations
futures := make([]*Future, 5)
for i := 0; i < 5; i++ {
fmt.Printf("Starting async operation %d\n", i+1)
futures[i] = service.ProcessAsync((i + 1) * 10)
}
// Wait for all results
fmt.Println("\nWaiting for results...")
for i, future := range futures {
result, err := future.Get()
if err != nil {
fmt.Printf("Operation %d failed: %v\n", i+1, err)
} else {
fmt.Printf("Operation %d result: %v\n", i+1, result)
}
}
// Example with timeout
fmt.Println("\nTesting timeout...")
timeoutFuture := service.ProcessAsync(100)
result, err := timeoutFuture.GetWithTimeout(50 * time.Millisecond)
if err != nil {
fmt.Printf("Timeout example failed: %v\n", err)
} else {
fmt.Printf("Timeout example result: %v\n", result)
}
}
Batch Request/Response
package main
import (
"fmt"
"sync"
"time"
)
// BatchRequest represents multiple requests processed together
type BatchRequest struct {
ID string
Items []interface{}
Response chan BatchResponse
}
// BatchResponse contains results for all items in a batch
type BatchResponse struct {
ID string
Results []BatchResult
Error error
}
// BatchResult represents the result of processing one item
type BatchResult struct {
Index int
Result interface{}
Error error
}
// BatchProcessor processes requests in batches for efficiency
type BatchProcessor struct {
requests chan BatchRequest
batchSize int
batchWindow time.Duration
quit chan bool
}
func NewBatchProcessor(batchSize int, batchWindow time.Duration) *BatchProcessor {
return &BatchProcessor{
requests: make(chan BatchRequest, 100),
batchSize: batchSize,
batchWindow: batchWindow,
quit: make(chan bool),
}
}
func (bp *BatchProcessor) Start() {
go func() {
batch := make([]BatchRequest, 0, bp.batchSize)
timer := time.NewTimer(bp.batchWindow)
timer.Stop()
for {
select {
case req := <-bp.requests:
batch = append(batch, req)
if len(batch) == 1 {
timer.Reset(bp.batchWindow)
}
if len(batch) >= bp.batchSize {
bp.processBatch(batch)
batch = batch[:0]
timer.Stop()
}
case <-timer.C:
if len(batch) > 0 {
bp.processBatch(batch)
batch = batch[:0]
}
case <-bp.quit:
if len(batch) > 0 {
bp.processBatch(batch)
}
return
}
}
}()
}
func (bp *BatchProcessor) processBatch(batch []BatchRequest) {
fmt.Printf("Processing batch of %d requests\n", len(batch))
var wg sync.WaitGroup
for _, req := range batch {
wg.Add(1)
go func(r BatchRequest) {
defer wg.Done()
bp.processRequest(r)
}(req)
}
wg.Wait()
}
func (bp *BatchProcessor) processRequest(req BatchRequest) {
results := make([]BatchResult, len(req.Items))
for i, item := range req.Items {
// Simulate processing each item
time.Sleep(10 * time.Millisecond)
if num, ok := item.(int); ok {
results[i] = BatchResult{
Index: i,
Result: num * 3,
}
} else {
results[i] = BatchResult{
Index: i,
Error: fmt.Errorf("invalid item type at index %d", i),
}
}
}
response := BatchResponse{
ID: req.ID,
Results: results,
}
req.Response <- response
}
// SendBatchRequest sends a batch request and waits for response
func (bp *BatchProcessor) SendBatchRequest(id string, items []interface{}) ([]BatchResult, error) {
responseChan := make(chan BatchResponse, 1)
request := BatchRequest{
ID: id,
Items: items,
Response: responseChan,
}
bp.requests <- request
response := <-responseChan
return response.Results, response.Error
}
func (bp *BatchProcessor) Stop() {
close(bp.quit)
}
func main() {
processor := NewBatchProcessor(3, 100*time.Millisecond)
processor.Start()
defer processor.Stop()
// Send individual batch requests
go func() {
results, err := processor.SendBatchRequest("batch1", []interface{}{1, 2, 3, 4, 5})
if err != nil {
fmt.Printf("Batch 1 failed: %v\n", err)
return
}
fmt.Println("Batch 1 results:")
for _, result := range results {
if result.Error != nil {
fmt.Printf(" Item %d error: %v\n", result.Index, result.Error)
} else {
fmt.Printf(" Item %d result: %v\n", result.Index, result.Result)
}
}
}()
go func() {
results, err := processor.SendBatchRequest("batch2", []interface{}{10, 20, 30})
if err != nil {
fmt.Printf("Batch 2 failed: %v\n", err)
return
}
fmt.Println("Batch 2 results:")
for _, result := range results {
if result.Error != nil {
fmt.Printf(" Item %d error: %v\n", result.Index, result.Error)
} else {
fmt.Printf(" Item %d result: %v\n", result.Index, result.Result)
}
}
}()
// Wait for processing
time.Sleep(500 * time.Millisecond)
}
Best Practices
- Always Use Timeouts: Prevent indefinite blocking
- Handle Context Cancellation: Support graceful cancellation
- Buffer Response Channels: Avoid blocking responders
- Error Handling: Always include error information in responses
- Resource Cleanup: Ensure channels and goroutines are cleaned up
- Monitoring: Track request/response times and success rates
- Backpressure: Handle situations when responders are overwhelmed
Common Pitfalls
- Deadlocks: Not buffering response channels
- Goroutine Leaks: Not handling context cancellation
- Memory Leaks: Not closing channels properly
- Blocking Operations: Long-running operations without timeouts
- Lost Responses: Not handling channel closure
Testing Request/Response
package main
import (
"context"
"testing"
"time"
)
func TestRequestResponse(t *testing.T) {
server := NewTimedServer()
server.Start()
defer server.Stop()
// Test successful request
result, err := server.SendRequestWithTimeout("test1", 42, 200*time.Millisecond)
if err != nil {
t.Fatalf("Request failed: %v", err)
}
if result != 84 {
t.Errorf("Expected 84, got %v", result)
}
// Test timeout
_, err = server.SendRequestWithTimeout("test2", 42, 10*time.Millisecond)
if err == nil {
t.Error("Expected timeout error")
}
}
func TestFuture(t *testing.T) {
future := NewFuture()
// Test that future is not ready initially
if future.IsReady() {
t.Error("Future should not be ready initially")
}
// Set value in goroutine
go func() {
time.Sleep(50 * time.Millisecond)
future.Set("test result", nil)
}()
// Get value
result, err := future.Get()
if err != nil {
t.Fatalf("Future failed: %v", err)
}
if result != "test result" {
t.Errorf("Expected 'test result', got %v", result)
}
// Test that future is ready after setting
if !future.IsReady() {
t.Error("Future should be ready after setting")
}
}
The Request/Response pattern is essential for building synchronous communication systems in Go. It provides the foundation for RPC systems, database operations, and any scenario where you need to wait for a result from an asynchronous operation.
Next: Learn about Worker Pool Pattern for managing concurrent workers efficiently.