Go Concurrency Patterns Series: ← Circuit Breaker | Series Overview | Semaphore Pattern →


What is the Rate Limiter Pattern?

Rate limiting controls the rate at which operations are performed, preventing system overload and ensuring fair resource usage. It’s essential for protecting services from abuse, managing resource consumption, and maintaining system stability under load.

Common Algorithms:

  • Token Bucket: Allows bursts up to bucket capacity
  • Fixed Window: Fixed number of requests per time window
  • Sliding Window: Smooth rate limiting over time
  • Leaky Bucket: Constant output rate regardless of input

Real-World Use Cases

  • API Rate Limiting: Prevent API abuse and ensure fair usage
  • Database Throttling: Control database query rates
  • File Processing: Limit file processing rate
  • Network Operations: Control bandwidth usage
  • Background Jobs: Throttle job processing
  • User Actions: Prevent spam and abuse

Token Bucket Rate Limiter

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// TokenBucket implements the token bucket rate limiting algorithm.
//
// The bucket holds at most capacity tokens and earns refillRate tokens per
// second. Every admitted request spends one token, so bursts of up to capacity
// requests are possible. The exported methods lock mu, so a *TokenBucket may
// be shared between goroutines.
type TokenBucket struct {
    mu         sync.Mutex
    capacity   int       // Maximum number of tokens
    tokens     int       // Current number of tokens
    refillRate int       // Tokens added per second
    lastRefill time.Time // Instant up to which earned tokens have been credited
}

// NewTokenBucket creates a token bucket that starts full.
// capacity is the maximum burst size and refillRate is the number of tokens
// earned per second.
func NewTokenBucket(capacity, refillRate int) *TokenBucket {
    return &TokenBucket{
        capacity:   capacity,
        tokens:     capacity, // Start with full bucket
        refillRate: refillRate,
        lastRefill: time.Now(),
    }
}

// Allow reports whether a single request may proceed, spending one token
// when it may.
func (tb *TokenBucket) Allow() bool {
    return tb.AllowN(1)
}

// AllowN reports whether n requests may proceed together. The n tokens are
// spent all at once or not at all.
//
// A negative n is rejected: subtracting it would add tokens and could push the
// bucket past its capacity. n == 0 always succeeds and spends nothing.
func (tb *TokenBucket) AllowN(n int) bool {
    if n < 0 {
        return false
    }

    tb.mu.Lock()
    defer tb.mu.Unlock()

    tb.refill()

    if tb.tokens < n {
        return false
    }
    tb.tokens -= n
    return true
}

// Wait blocks until a token has been obtained or ctx is done. It returns nil
// once a token was spent and ctx.Err() if the context ended first.
func (tb *TokenBucket) Wait(ctx context.Context) error {
    if tb.Allow() {
        return nil
    }

    // One ticker serves the whole wait instead of a fresh timer per poll.
    ticker := time.NewTicker(10 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            if tb.Allow() {
                return nil
            }
        }
    }
}

// refill credits the tokens earned since lastRefill. The caller must hold mu.
//
// Only whole tokens are credited. lastRefill is advanced by exactly the time
// those tokens took to earn, so the fractional remainder keeps accruing;
// resetting it to "now" would discard the remainder on every call and lower
// the effective rate.
func (tb *TokenBucket) refill() {
    now := time.Now()

    // A full bucket earns nothing; restart the clock so idle time is not
    // banked as credit.
    if tb.tokens >= tb.capacity {
        tb.lastRefill = now
        return
    }

    earned := now.Sub(tb.lastRefill).Seconds() * float64(tb.refillRate)
    if earned < 1 {
        return
    }

    // Compare in floating point before converting, so a long idle period
    // cannot overflow the int conversion.
    if earned >= float64(tb.capacity-tb.tokens) {
        tb.tokens = tb.capacity
        tb.lastRefill = now
        return
    }

    whole := int(earned)
    tb.tokens += whole
    spent := time.Duration(float64(whole) / float64(tb.refillRate) * float64(time.Second))
    tb.lastRefill = tb.lastRefill.Add(spent)
}

// GetStats returns the number of tokens currently available (after crediting
// any refill that is due) and the bucket capacity.
func (tb *TokenBucket) GetStats() (tokens, capacity int) {
    tb.mu.Lock()
    defer tb.mu.Unlock()

    tb.refill()
    return tb.tokens, tb.capacity
}

// main demonstrates the token bucket: an initial burst, a pause that lets the
// bucket refill, single requests after the refill, and one bulk request.
func main() {
    // 5-token burst capacity, refilled at 2 tokens per second.
    limiter := NewTokenBucket(5, 2)

    // fire issues count single-token requests and prints each outcome
    // together with the bucket level observed right after it.
    fire := func(count int) {
        for req := 1; req <= count; req++ {
            ok := limiter.Allow()
            left, size := limiter.GetStats()

            fmt.Printf("Request %d: %s (tokens: %d/%d)\n",
                req, allowedStatus(ok), left, size)
        }
    }

    fmt.Println("=== Token Bucket Rate Limiter Demo ===")

    // Seven requests against five tokens: the last two are expected to fail.
    fmt.Println("\n--- Testing Burst Capability ---")
    fire(7)

    fmt.Println("\n--- Waiting 3 seconds for refill ---")
    time.Sleep(3 * time.Second)

    fmt.Println("\n--- Testing After Refill ---")
    fire(4)

    // A bulk request takes all of its tokens or none.
    fmt.Println("\n--- Testing AllowN (requesting 3 tokens) ---")
    bulkOK := limiter.AllowN(3)
    left, size := limiter.GetStats()
    fmt.Printf("Bulk request: %s (tokens: %d/%d)\n",
        allowedStatus(bulkOK), left, size)
}

// allowedStatus renders a limiter decision as the label printed by the demos.
func allowedStatus(allowed bool) string {
    status := "❌ DENIED"
    if allowed {
        status = "✅ ALLOWED"
    }
    return status
}

Sliding Window Rate Limiter

package main

import (
    "fmt"
    "sync"
    "time"
)

// SlidingWindow is a sliding-window-log rate limiter: it remembers the time of
// every admitted request and admits a new one only while fewer than limit
// timestamps lie inside the trailing window.
type SlidingWindow struct {
    mu       sync.Mutex
    requests []time.Time   // admitted request times, oldest first
    limit    int           // Maximum requests per window
    window   time.Duration // Time window duration
}

// NewSlidingWindow returns a limiter that admits at most limit requests in any
// span of length window.
func NewSlidingWindow(limit int, window time.Duration) *SlidingWindow {
    sw := &SlidingWindow{
        limit:  limit,
        window: window,
    }
    sw.requests = []time.Time{}
    return sw
}

// Allow reports whether a request may proceed now and, if so, records it.
func (sw *SlidingWindow) Allow() bool {
    sw.mu.Lock()
    defer sw.mu.Unlock()

    stamp := time.Now()
    sw.cleanOldRequests(stamp)

    if len(sw.requests) >= sw.limit {
        return false
    }
    sw.requests = append(sw.requests, stamp)
    return true
}

// cleanOldRequests drops every recorded request that is not strictly newer
// than now minus the window. The caller must hold mu.
func (sw *SlidingWindow) cleanOldRequests(now time.Time) {
    threshold := now.Add(-sw.window)

    // Timestamps are appended in order, so the expired ones form a prefix.
    expired := 0
    for expired < len(sw.requests) && !sw.requests[expired].After(threshold) {
        expired++
    }
    if expired == 0 {
        return
    }

    // Shift the survivors to the front so the backing array is reused.
    kept := copy(sw.requests, sw.requests[expired:])
    sw.requests = sw.requests[:kept]
}

// GetStats returns the number of requests inside the current window, the
// configured limit, and the window length.
func (sw *SlidingWindow) GetStats() (current, limit int, window time.Duration) {
    sw.mu.Lock()
    defer sw.mu.Unlock()

    sw.cleanOldRequests(time.Now())
    current, limit, window = len(sw.requests), sw.limit, sw.window
    return current, limit, window
}

// GetRequestTimes returns a copy of the timestamps of the requests inside the
// current window; the caller may modify the returned slice freely.
func (sw *SlidingWindow) GetRequestTimes() []time.Time {
    sw.mu.Lock()
    defer sw.mu.Unlock()

    sw.cleanOldRequests(time.Now())
    snapshot := make([]time.Time, len(sw.requests))
    copy(snapshot, sw.requests)
    return snapshot
}

// main demonstrates the sliding window limiter with eight requests separated
// by pauses of different lengths, then prints the age of the requests that
// are still inside the window.
func main() {
    // At most 3 requests in any 2-second span.
    limiter := NewSlidingWindow(3, 2*time.Second)

    fmt.Println("=== Sliding Window Rate Limiter Demo ===")
    fmt.Println("Limit: 3 requests per 2 seconds")

    for attempt := 1; attempt <= 8; attempt++ {
        ok := limiter.Allow()
        used, quota, span := limiter.GetStats()

        fmt.Printf("Request %d: %s (current: %d/%d in %v window)\n",
            attempt, allowedStatus(ok), used, quota, span)

        // Longer pauses after requests 4 and 6 let older entries age out.
        switch attempt {
        case 4:
            fmt.Println("--- Waiting 1 second ---")
            time.Sleep(1 * time.Second)
        case 6:
            fmt.Println("--- Waiting 1.5 seconds ---")
            time.Sleep(1500 * time.Millisecond)
        default:
            time.Sleep(200 * time.Millisecond)
        }
    }

    fmt.Println("\n--- Request Timeline ---")
    live := limiter.GetRequestTimes()
    ref := time.Now()

    for idx := range live {
        fmt.Printf("Request %d: %v ago\n", idx+1, ref.Sub(live[idx]).Round(time.Millisecond))
    }
}

Fixed Window Rate Limiter

package main

import (
    "fmt"
    "sync"
    "time"
)

// FixedWindow is a fixed-window rate limiter: it counts requests in
// back-to-back windows and admits at most limit of them per window.
type FixedWindow struct {
    mu           sync.Mutex
    limit        int           // Maximum requests per window
    window       time.Duration // Window duration
    currentCount int           // Requests admitted in the current window
    windowStart  time.Time     // Start of the current window
}

// NewFixedWindow returns a limiter admitting limit requests per window; the
// first window starts at the moment of construction.
func NewFixedWindow(limit int, window time.Duration) *FixedWindow {
    fw := &FixedWindow{limit: limit, window: window}
    fw.windowStart = time.Now()
    return fw
}

// Allow reports whether a request may proceed and counts it if so.
func (fw *FixedWindow) Allow() bool {
    fw.mu.Lock()
    defer fw.mu.Unlock()

    // Once the current window has run its full length, a new one begins at
    // the time of this call with an empty counter.
    if now := time.Now(); now.Sub(fw.windowStart) >= fw.window {
        fw.windowStart = now
        fw.currentCount = 0
    }

    if fw.currentCount >= fw.limit {
        return false
    }
    fw.currentCount++
    return true
}

// GetStats returns the request count of the current window, the limit, and
// the time left until the window ends. If the window has already run out, it
// reports a count of zero and a full window remaining, without changing the
// limiter's state.
func (fw *FixedWindow) GetStats() (current, limit int, windowRemaining time.Duration) {
    fw.mu.Lock()
    defer fw.mu.Unlock()

    age := time.Since(fw.windowStart)
    if age < fw.window {
        return fw.currentCount, fw.limit, fw.window - age
    }
    return 0, fw.limit, fw.window
}

// main demonstrates the fixed window limiter by sending ten requests 400ms
// apart and printing each decision with the time left in the window.
func main() {
    // At most 3 requests per 2-second window.
    limiter := NewFixedWindow(3, 2*time.Second)

    fmt.Println("=== Fixed Window Rate Limiter Demo ===")
    fmt.Println("Limit: 3 requests per 2 seconds")

    for attempt := 1; attempt <= 10; attempt++ {
        ok := limiter.Allow()
        used, quota, left := limiter.GetStats()
        left = left.Round(time.Millisecond)

        fmt.Printf("Request %d: %s (current: %d/%d, window resets in: %v)\n",
            attempt, allowedStatus(ok), used, quota, left)

        time.Sleep(400 * time.Millisecond)
    }
}

Advanced Rate Limiter with Multiple Algorithms

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// RateLimiterType identifies one of the rate limiting algorithms shown in
// this article. The values are consecutive integers starting at zero.
// NOTE(review): nothing in the visible code refers to these constants.
type RateLimiterType int

const (
    // TokenBucketType is the token bucket algorithm (value 0).
    TokenBucketType RateLimiterType = iota
    // SlidingWindowType is the sliding window algorithm (value 1).
    SlidingWindowType
    // FixedWindowType is the fixed window algorithm (value 2).
    FixedWindowType
)

// RateLimiter is the behaviour shared by every rate limiting algorithm here.
type RateLimiter interface {
    // Allow reports, without blocking, whether a request may proceed now.
    Allow() bool
    // Wait blocks until a request may proceed or ctx is done.
    Wait(ctx context.Context) error
    // GetStats returns a snapshot of the limiter's state keyed by field name.
    GetStats() map[string]interface{}
}

// MultiRateLimiter chains several named limiters; a request has to pass every
// one of them. It has no locking of its own, so AddLimiter must not run
// concurrently with the other methods.
type MultiRateLimiter struct {
    limiters []RateLimiter
    names    []string // names[i] labels limiters[i]
}

// NewMultiRateLimiter returns a MultiRateLimiter with no limiters registered.
func NewMultiRateLimiter() *MultiRateLimiter {
    mrl := new(MultiRateLimiter)
    mrl.limiters = []RateLimiter{}
    mrl.names = []string{}
    return mrl
}

// AddLimiter registers limiter under name. Limiters are consulted in the
// order in which they were added.
func (mrl *MultiRateLimiter) AddLimiter(name string, limiter RateLimiter) {
    mrl.names = append(mrl.names, name)
    mrl.limiters = append(mrl.limiters, limiter)
}

// Allow reports whether every registered limiter admits the request.
//
// Limiters are asked in registration order and evaluation stops at the first
// refusal. Limiters that already said yes have had their Allow called, so
// whatever quota that call consumed is not handed back.
func (mrl *MultiRateLimiter) Allow() bool {
    for idx := 0; idx < len(mrl.limiters); idx++ {
        if admitted := mrl.limiters[idx].Allow(); !admitted {
            return false
        }
    }
    return true
}

// Wait waits on each limiter in registration order and returns the first
// error encountered; limiters after the failing one are not waited on.
func (mrl *MultiRateLimiter) Wait(ctx context.Context) error {
    for idx := range mrl.limiters {
        err := mrl.limiters[idx].Wait(ctx)
        if err != nil {
            return err
        }
    }
    return nil
}

// GetStats returns the stats of every registered limiter, keyed by the name
// it was registered under.
func (mrl *MultiRateLimiter) GetStats() map[string]interface{} {
    combined := make(map[string]interface{}, len(mrl.limiters))
    for idx := range mrl.limiters {
        combined[mrl.names[idx]] = mrl.limiters[idx].GetStats()
    }
    return combined
}

// EnhancedTokenBucket wraps a TokenBucket and replaces its two-value GetStats
// with the map-based form that RateLimiter expects. Allow and Wait come from
// the embedded TokenBucket.
type EnhancedTokenBucket struct {
    *TokenBucket
}

// GetStats reports the bucket's type tag, available tokens, capacity and
// refill rate.
func (etb *EnhancedTokenBucket) GetStats() map[string]interface{} {
    available, size := etb.TokenBucket.GetStats()

    report := make(map[string]interface{}, 4)
    report["type"] = "token_bucket"
    report["tokens"] = available
    report["capacity"] = size
    report["rate"] = etb.refillRate
    return report
}

// EnhancedSlidingWindow wraps a SlidingWindow, adding the Wait method and the
// map-based GetStats that RateLimiter expects. Allow comes from the embedded
// SlidingWindow.
type EnhancedSlidingWindow struct {
    *SlidingWindow
}

// Wait retries Allow every 10ms until it succeeds or ctx is done. It returns
// nil on success and ctx.Err() otherwise.
func (esw *EnhancedSlidingWindow) Wait(ctx context.Context) error {
    for !esw.Allow() {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(time.Millisecond * 10):
            // Poll interval elapsed; try again.
        }
    }
    return nil
}

// GetStats reports the limiter's type tag, the number of requests in the
// current window, the limit, and the window length as a string.
func (esw *EnhancedSlidingWindow) GetStats() map[string]interface{} {
    used, quota, span := esw.SlidingWindow.GetStats()

    report := make(map[string]interface{}, 4)
    report["type"] = "sliding_window"
    report["current"] = used
    report["limit"] = quota
    report["window"] = span.String()
    return report
}

// EnhancedFixedWindow wraps a FixedWindow, adding the Wait method and the
// map-based GetStats that RateLimiter expects. Allow comes from the embedded
// FixedWindow.
type EnhancedFixedWindow struct {
    *FixedWindow
}

// Wait retries Allow every 10ms until it succeeds or ctx is done. It returns
// nil on success and ctx.Err() otherwise.
func (efw *EnhancedFixedWindow) Wait(ctx context.Context) error {
    for !efw.Allow() {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(time.Millisecond * 10):
            // Poll interval elapsed; try again.
        }
    }
    return nil
}

// GetStats reports the limiter's type tag, the request count of the current
// window, the limit, and the time remaining in the window as a string.
func (efw *EnhancedFixedWindow) GetStats() map[string]interface{} {
    used, quota, left := efw.FixedWindow.GetStats()

    report := make(map[string]interface{}, 4)
    report["type"] = "fixed_window"
    report["current"] = used
    report["limit"] = quota
    report["remaining"] = left.String()
    return report
}

// RateLimitedService demonstrates rate limiting in a service: every request
// is checked against a RateLimiter and the outcomes are counted.
type RateLimitedService struct {
    limiter RateLimiter
    mu      sync.Mutex // guards stats
    stats   struct {
        totalRequests   int // every call to ProcessRequest
        allowedRequests int // requests the limiter admitted
        deniedRequests  int // requests the limiter refused
    }
}

// NewRateLimitedService creates a service guarded by limiter.
func NewRateLimitedService(limiter RateLimiter) *RateLimitedService {
    return &RateLimitedService{
        limiter: limiter,
    }
}

// ProcessRequest runs one request through the rate limiter and, if admitted,
// performs the simulated 50ms of work.
//
// It returns nil when the request was processed. It returns an error when the
// limiter refuses the request, or when ctx ends; in the latter case the error
// wraps ctx.Err(). A context that is already done is rejected before the
// limiter is consulted, so a dead request does not use up quota.
func (rls *RateLimitedService) ProcessRequest(ctx context.Context, requestID string) error {
    rls.mu.Lock()
    rls.stats.totalRequests++
    rls.mu.Unlock()

    if err := ctx.Err(); err != nil {
        return fmt.Errorf("request %s canceled before processing: %w", requestID, err)
    }

    if !rls.limiter.Allow() {
        rls.mu.Lock()
        rls.stats.deniedRequests++
        rls.mu.Unlock()

        return fmt.Errorf("request %s denied by rate limiter", requestID)
    }

    rls.mu.Lock()
    rls.stats.allowedRequests++
    rls.mu.Unlock()

    // Simulate processing, but give up as soon as the caller does.
    work := time.NewTimer(50 * time.Millisecond)
    defer work.Stop()

    select {
    case <-work.C:
    case <-ctx.Done():
        return fmt.Errorf("request %s canceled during processing: %w", requestID, ctx.Err())
    }

    fmt.Printf("✅ Processed request %s\n", requestID)
    return nil
}

// GetServiceStats returns the request counters together with the limiter's
// own stats under the "rate_limiter" key.
func (rls *RateLimitedService) GetServiceStats() map[string]interface{} {
    // Copy the counters under the lock, then release it before calling into
    // the limiter so the service mutex is never held across foreign code.
    rls.mu.Lock()
    counters := rls.stats
    rls.mu.Unlock()

    return map[string]interface{}{
        "total_requests":   counters.totalRequests,
        "allowed_requests": counters.allowedRequests,
        "denied_requests":  counters.deniedRequests,
        "rate_limiter":     rls.limiter.GetStats(),
    }
}

// main demonstrates a service guarded by three limiters at once. It launches
// 15 requests 200ms apart, each in its own goroutine, waits for all of them,
// and prints the service counters and each limiter's stats.
func main() {
    // A request has to pass all three policies, checked in this order.
    multiLimiter := NewMultiRateLimiter()
    multiLimiter.AddLimiter("token_bucket", &EnhancedTokenBucket{
        TokenBucket: NewTokenBucket(5, 2), // 5 tokens, 2 per second
    })
    multiLimiter.AddLimiter("sliding_window", &EnhancedSlidingWindow{
        SlidingWindow: NewSlidingWindow(3, 2*time.Second), // 3 requests per 2 seconds
    })
    multiLimiter.AddLimiter("fixed_window", &EnhancedFixedWindow{
        FixedWindow: NewFixedWindow(4, 3*time.Second), // 4 requests per 3 seconds
    })

    service := NewRateLimitedService(multiLimiter)

    fmt.Println("=== Multi-Algorithm Rate Limiter Demo ===")
    fmt.Println("Using Token Bucket (5 tokens, 2/sec) + Sliding Window (3/2sec) + Fixed Window (4/3sec)")

    var wg sync.WaitGroup

    // submit handles one request with a one-second deadline and reports a
    // failure, if any.
    submit := func(id int) {
        defer wg.Done()

        ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
        defer cancel()

        if err := service.ProcessRequest(ctx, fmt.Sprintf("req-%d", id)); err != nil {
            fmt.Printf("❌ %v\n", err)
        }
    }

    for id := 1; id <= 15; id++ {
        wg.Add(1)
        go submit(id)

        time.Sleep(200 * time.Millisecond)
    }

    wg.Wait()

    fmt.Println("\n=== Final Statistics ===")
    summary := service.GetServiceStats()

    fmt.Printf("Total Requests: %d\n", summary["total_requests"])
    fmt.Printf("Allowed Requests: %d\n", summary["allowed_requests"])
    fmt.Printf("Denied Requests: %d\n", summary["denied_requests"])

    fmt.Println("\nRate Limiter Details:")
    perLimiter := summary["rate_limiter"].(map[string]interface{})
    for label, detail := range perLimiter {
        fmt.Printf("  %s: %+v\n", label, detail)
    }
}

Best Practices

  1. Choose the Right Algorithm: Select based on your specific requirements

    • Token Bucket: Allow bursts, good for APIs
    • Sliding Window: Smooth rate limiting
    • Fixed Window: Simple, memory efficient
  2. Configure Appropriately: Set limits based on system capacity

  3. Handle Rejections Gracefully: Provide meaningful error messages

  4. Monitor Metrics: Track allowed/denied requests and adjust limits

  5. Use Context: Support cancellation in Wait operations

  6. Consider Distributed Systems: Use Redis or similar for distributed rate limiting

  7. Implement Backoff: Add exponential backoff for denied requests

Common Pitfalls

  1. Too Restrictive: Setting limits too low affects user experience
  2. Too Permissive: High limits don’t protect against abuse
  3. Memory Leaks: Not pruning expired request timestamps in a sliding window, so the stored list grows without bound
  4. Race Conditions: Not properly synchronizing access to counters
  5. Ignoring Boundary Bursts: A fixed window can admit up to double the limit when requests cluster at the end of one window and the start of the next

Rate limiting is essential for protecting services from overload and ensuring fair resource usage. Choose the right algorithm based on your requirements and always monitor the effectiveness of your rate limiting strategy.


Next: Learn about Semaphore Pattern for resource counting and limiting concurrent access.