Graceful Shutdown Patterns in Go

    Go Concurrency Patterns Series: ← Go Memory Model | Series Overview | Generics Patterns → What is Graceful Shutdown? Graceful shutdown is the process of cleanly stopping a running application by: Receiving shutdown signals (SIGTERM, SIGINT) Stopping acceptance of new requests Finishing in-flight requests Closing database connections and other resources Flushing logs and metrics Exiting with appropriate status code Why It Matters: Zero-downtime deployments: No dropped requests during rollouts Data integrity: Complete ongoing transactions Resource cleanup: Prevent leaks and corruption Observability: Flush pending logs and metrics Container orchestration: Proper Kubernetes pod termination Real-World Use Cases HTTP/gRPC servers: Drain active connections before shutdown Background workers: Complete current jobs, reject new ones Message consumers: Finish processing messages, commit offsets Database connections: Close pools cleanly Caching layers: Persist in-memory state Kubernetes deployments: Respect termination grace period Basic Signal Handling Simple Shutdown Handler package main import ( "fmt" "os" "os/signal" "syscall" "time" ) func main() { // Create signal channel sigChan := make(chan os.Signal, 1) // Register for SIGINT and SIGTERM signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) // Simulate application work done := make(chan bool) go func() { fmt.Println("Application running...") time.Sleep(30 * time.Second) done <- true }() // Wait for signal or completion select { case sig := <-sigChan: fmt.Printf("\nReceived signal: %v\n", sig) fmt.Println("Initiating graceful shutdown...") // Perform cleanup cleanup() fmt.Println("Shutdown complete") os.Exit(0) case <-done: fmt.Println("Application completed normally") } } func cleanup() { fmt.Println("Cleaning up resources...") time.Sleep(2 * time.Second) fmt.Println("Cleanup complete") } HTTP Server Graceful Shutdown Basic HTTP Server Shutdown package main import ( "context" "fmt" "log" "net/http" 
"os" "os/signal" "syscall" "time" ) func main() { // Create HTTP server server := &http.Server{ Addr: ":8080", Handler: setupRoutes(), } // Channel to listen for errors from the server serverErrors := make(chan error, 1) // Start HTTP server in goroutine go func() { log.Printf("Server starting on %s", server.Addr) serverErrors <- server.ListenAndServe() }() // Channel to listen for interrupt signals shutdown := make(chan os.Signal, 1) signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM) // Block until we receive a signal or server error select { case err := <-serverErrors: log.Fatalf("Server error: %v", err) case sig := <-shutdown: log.Printf("Received signal: %v. Starting graceful shutdown...", sig) // Create context with timeout for shutdown ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() // Attempt graceful shutdown if err := server.Shutdown(ctx); err != nil { log.Printf("Graceful shutdown failed: %v", err) // Force close if graceful shutdown fails if err := server.Close(); err != nil { log.Fatalf("Force close failed: %v", err) } } log.Println("Server shutdown complete") } } func setupRoutes() http.Handler { mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "Hello, World!") }) mux.HandleFunc("/slow", func(w http.ResponseWriter, r *http.Request) { // Simulate slow endpoint time.Sleep(5 * time.Second) fmt.Fprintf(w, "Slow response complete") }) return mux } Advanced HTTP Server with Multiple Resources package main import ( "context" "database/sql" "fmt" "log" "net/http" "os" "os/signal" "sync" "syscall" "time" _ "github.com/lib/pq" ) type Application struct { server *http.Server db *sql.DB logger *log.Logger shutdown chan struct{} wg sync.WaitGroup } func NewApplication() (*Application, error) { // Initialize database (sql.Open only validates its arguments; it does not connect) db, err := sql.Open("postgres", "postgres://localhost/mydb") if err != nil { return nil, fmt.Errorf("database open failed: %w", err) } // Fail fast at startup if the database is unreachable, instead of on the first request if err := db.Ping(); err != nil { db.Close() return nil, fmt.Errorf("database connection failed: %w", err) } app := 
&Application{ db: db, logger: log.New(os.Stdout, "APP: ", log.LstdFlags), shutdown: make(chan struct{}), } // Setup HTTP server app.server = &http.Server{ Addr: ":8080", Handler: app.routes(), ReadTimeout: 10 * time.Second, WriteTimeout: 30 * time.Second, IdleTimeout: 60 * time.Second, } return app, nil } func (app *Application) routes() http.Handler { mux := http.NewServeMux() mux.HandleFunc("/health", app.healthHandler) mux.HandleFunc("/api/data", app.dataHandler) return mux } func (app *Application) healthHandler(w http.ResponseWriter, r *http.Request) { select { case <-app.shutdown: // Signal shutdown in progress w.WriteHeader(http.StatusServiceUnavailable) fmt.Fprintf(w, "Shutting down") default: w.WriteHeader(http.StatusOK) fmt.Fprintf(w, "OK") } } func (app *Application) dataHandler(w http.ResponseWriter, r *http.Request) { // Check if shutdown initiated select { case <-app.shutdown: http.Error(w, "Service shutting down", http.StatusServiceUnavailable) return default: } // Simulate database query ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() var count int err := app.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM items").Scan(&count) if err != nil { http.Error(w, "Database error", http.StatusInternalServerError) return } fmt.Fprintf(w, "Count: %d", count) } func (app *Application) Run() error { // Start HTTP server app.wg.Add(1) go func() { defer app.wg.Done() app.logger.Printf("Starting server on %s", app.server.Addr) if err := app.server.ListenAndServe(); err != http.ErrServerClosed { app.logger.Printf("Server error: %v", err) } }() // Start background worker app.wg.Add(1) go func() { defer app.wg.Done() app.backgroundWorker() }() return nil } func (app *Application) backgroundWorker() { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: app.logger.Println("Background job executing...") // Do work case <-app.shutdown: app.logger.Println("Background worker shutting down...") return } 
} } func (app *Application) Shutdown(ctx context.Context) error { app.logger.Println("Starting graceful shutdown...") // Signal all components to stop close(app.shutdown) // Shutdown HTTP server app.logger.Println("Shutting down HTTP server...") if err := app.server.Shutdown(ctx); err != nil { return fmt.Errorf("HTTP server shutdown failed: %w", err) } // Wait for background workers to finish app.logger.Println("Waiting for background workers...") done := make(chan struct{}) go func() { app.wg.Wait() close(done) }() select { case <-done: app.logger.Println("All workers stopped") case <-ctx.Done(): return fmt.Errorf("shutdown timeout exceeded: %w", ctx.Err()) } // Close database connections app.logger.Println("Closing database connections...") if err := app.db.Close(); err != nil { return fmt.Errorf("database close failed: %w", err) } app.logger.Println("Graceful shutdown complete") return nil } func main() { app, err := NewApplication() if err != nil { log.Fatalf("Application initialization failed: %v", err) } if err := app.Run(); err != nil { log.Fatalf("Application run failed: %v", err) } // Wait for shutdown signal sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) sig := <-sigChan log.Printf("Received signal: %v", sig) // Graceful shutdown with 30 second timeout ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := app.Shutdown(ctx); err != nil { log.Fatalf("Shutdown failed: %v", err) } log.Println("Application stopped") } Worker Pool Graceful Shutdown package main import ( "context" "fmt" "log" "sync" "time" ) type Job struct { ID int Data string } type WorkerPool struct { jobs chan Job results chan int numWorkers int wg sync.WaitGroup // mu serializes Submit's send on jobs with Shutdown's close(jobs); closed records that Shutdown has started mu sync.RWMutex closed bool shutdown chan struct{} } func NewWorkerPool(numWorkers, queueSize int) *WorkerPool { return &WorkerPool{ jobs: make(chan Job, queueSize), results: make(chan int, queueSize), numWorkers: numWorkers, shutdown: make(chan struct{}), } } func (wp *WorkerPool)
Start() { for i := 0; i < wp.numWorkers; i++ { wp.wg.Add(1) go wp.worker(i) } log.Printf("Started %d workers", wp.numWorkers) } func (wp *WorkerPool) worker(id int) { defer wp.wg.Done() log.Printf("Worker %d started", id) for { select { case job, ok := <-wp.jobs: if !ok { log.Printf("Worker %d: job channel closed, exiting", id) return } // Process job log.Printf("Worker %d processing job %d", id, job.ID) result := wp.processJob(job) wp.results <- result case <-wp.shutdown: // Drain remaining jobs before shutdown log.Printf("Worker %d: shutdown signal received, draining jobs...", id) for job := range wp.jobs { log.Printf("Worker %d processing remaining job %d", id, job.ID) result := wp.processJob(job) wp.results <- result } log.Printf("Worker %d: shutdown complete", id) return } } } func (wp *WorkerPool) processJob(job Job) int { // Simulate work time.Sleep(1 * time.Second) return job.ID * 2 } func (wp *WorkerPool) Submit(job Job) bool { // Hold the read lock for the whole send so Shutdown cannot close wp.jobs underneath us (a send on a closed channel panics) wp.mu.RLock() defer wp.mu.RUnlock() if wp.closed { return false // Pool is shutting down } wp.jobs <- job return true } func (wp *WorkerPool) Shutdown(ctx context.Context) error { log.Println("WorkerPool: initiating shutdown...") // The write lock waits for in-flight Submit calls and blocks new ones wp.mu.Lock() if wp.closed { wp.mu.Unlock() return fmt.Errorf("worker pool already shut down") } wp.closed = true // Signal workers to start draining close(wp.shutdown) // Close job channel to signal no more jobs; safe now that no Submit can be sending close(wp.jobs) wp.mu.Unlock() // Wait for workers with timeout done := make(chan struct{}) go func() { wp.wg.Wait() close(done) close(wp.results) }() select { case <-done: log.Println("WorkerPool: all workers completed") return nil case <-ctx.Done(): return fmt.Errorf("shutdown timeout: %w", ctx.Err()) } } func main() { pool := NewWorkerPool(3, 10) pool.Start() // Submit jobs go func() { for i := 1; i <= 20; i++ { job := Job{ID: i, Data: fmt.Sprintf("Job %d", i)} if !pool.Submit(job) { log.Printf("Failed to submit job %d (pool shutting down)", i) return } time.Sleep(200 * time.Millisecond) } }() // Collect results go func() { for result := range pool.results { log.Printf("Result: %d", result) } log.Println("All results collected") }() // 
Wait a bit then shutdown time.Sleep(5 * time.Second) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := pool.Shutdown(ctx); err != nil { log.Fatalf("Shutdown failed: %v", err) } log.Println("Main: shutdown complete") } Kubernetes-Aware Graceful Shutdown package main import ( "context" "fmt" "log" "net/http" "os" "os/signal" "syscall" "time" ) type KubernetesServer struct { server *http.Server shutdownDelay time.Duration terminationPeriod time.Duration } func NewKubernetesServer() *KubernetesServer { ks := &KubernetesServer{ // Delay before starting shutdown to allow load balancer de-registration shutdownDelay: 5 * time.Second, // Total Kubernetes termination grace period terminationPeriod: 30 * time.Second, } mux := http.NewServeMux() mux.HandleFunc("/health", ks.healthHandler) mux.HandleFunc("/readiness", ks.readinessHandler) mux.HandleFunc("/", ks.requestHandler) ks.server = &http.Server{ Addr: ":8080", Handler: mux, } return ks } var ( isHealthy = true isReady = true ) func (ks *KubernetesServer) healthHandler(w http.ResponseWriter, r *http.Request) { if isHealthy { w.WriteHeader(http.StatusOK) fmt.Fprintf(w, "healthy") } else { w.WriteHeader(http.StatusServiceUnavailable) fmt.Fprintf(w, "unhealthy") } } func (ks *KubernetesServer) readinessHandler(w http.ResponseWriter, r *http.Request) { if isReady { w.WriteHeader(http.StatusOK) fmt.Fprintf(w, "ready") } else { w.WriteHeader(http.StatusServiceUnavailable) fmt.Fprintf(w, "not ready") } } func (ks *KubernetesServer) requestHandler(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "Request processed at %s", time.Now().Format(time.RFC3339)) } func (ks *KubernetesServer) Run() error { go func() { log.Printf("Server starting on %s", ks.server.Addr) if err := ks.server.ListenAndServe(); err != http.ErrServerClosed { log.Fatalf("Server error: %v", err) } }() return nil } func (ks *KubernetesServer) GracefulShutdown() error { // Step 1: Mark as not ready (stop 
receiving new traffic from load balancer) log.Println("Step 1: Marking pod as not ready...") isReady = false // Step 2: Wait for load balancer to de-register log.Printf("Step 2: Waiting %v for load balancer de-registration...", ks.shutdownDelay) time.Sleep(ks.shutdownDelay) // Step 3: Stop accepting new connections and drain existing ones log.Println("Step 3: Shutting down HTTP server...") // Calculate remaining time for shutdown shutdownTimeout := ks.terminationPeriod - ks.shutdownDelay - (2 * time.Second) ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout) defer cancel() if err := ks.server.Shutdown(ctx); err != nil { log.Printf("Server shutdown error: %v", err) return err } log.Println("Step 4: Graceful shutdown complete") return nil } func main() { server := NewKubernetesServer() if err := server.Run(); err != nil { log.Fatalf("Server run failed: %v", err) } // Wait for termination signal sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) sig := <-sigChan log.Printf("Received signal: %v", sig) if err := server.GracefulShutdown(); err != nil { log.Fatalf("Graceful shutdown failed: %v", err) os.Exit(1) } log.Println("Server stopped successfully") } Database Connection Cleanup package main import ( "context" "database/sql" "log" "time" _ "github.com/lib/pq" ) type DatabaseManager struct { db *sql.DB } func NewDatabaseManager(connString string) (*DatabaseManager, error) { db, err := sql.Open("postgres", connString) if err != nil { return nil, err } // Configure connection pool db.SetMaxOpenConns(25) db.SetMaxIdleConns(5) db.SetConnMaxLifetime(5 * time.Minute) db.SetConnMaxIdleTime(10 * time.Minute) return &DatabaseManager{db: db}, nil } func (dm *DatabaseManager) Shutdown(ctx context.Context) error { log.Println("Closing database connections...") // Wait for active queries to complete or timeout done := make(chan error, 1) go func() { // Close will wait for all connections to be returned to pool done <- 
dm.db.Close() }() select { case err := <-done: if err != nil { return err } log.Println("Database connections closed successfully") return nil case <-ctx.Done(): log.Println("Database close timeout exceeded") return ctx.Err() } } // QueryWithShutdown performs query with shutdown awareness func (dm *DatabaseManager) QueryWithShutdown(ctx context.Context, query string) error { // Check if context is already cancelled (shutdown initiated) if ctx.Err() != nil { return ctx.Err() } rows, err := dm.db.QueryContext(ctx, query) if err != nil { return err } defer rows.Close() // Process rows for rows.Next() { // Check for shutdown during processing if ctx.Err() != nil { return ctx.Err() } // Process row... } return rows.Err() } Message Queue Consumer Shutdown package main import ( "context" "fmt" "log" "sync" "time" ) type Message struct { ID string Payload string } type Consumer struct { messages chan Message shutdown chan struct{} wg sync.WaitGroup } func NewConsumer() *Consumer { return &Consumer{ messages: make(chan Message, 100), shutdown: make(chan struct{}), } } func (c *Consumer) Start() { c.wg.Add(1) go c.consume() log.Println("Consumer started") } func (c *Consumer) consume() { defer c.wg.Done() for { select { case msg, ok := <-c.messages: // A closed channel yields zero-value messages forever; !ok means it is closed and fully drained if !ok { log.Println("Consumer: all messages processed") return } // Process message if err := c.processMessage(msg); err != nil { log.Printf("Error processing message %s: %v", msg.ID, err) // In production: send to dead letter queue } case <-c.shutdown: log.Println("Consumer: shutdown initiated, processing remaining messages...") // Drain remaining messages for msg := range c.messages { log.Printf("Consumer: processing remaining message %s", msg.ID) if err := c.processMessage(msg); err != nil { log.Printf("Error processing remaining message %s: %v", msg.ID, err) } } log.Println("Consumer: all messages processed") return } } } func (c *Consumer) processMessage(msg Message) error { log.Printf("Processing message: %s", msg.ID) // Simulate processing time.Sleep(500 * time.Millisecond) // Acknowledge
message log.Printf("Message %s processed successfully", msg.ID) return nil } func (c *Consumer) Shutdown(ctx context.Context) error { log.Println("Consumer: initiating shutdown...") // Stop accepting new messages. Closing c.messages is only safe once every producer has stopped sending: a send on a closed channel panics close(c.shutdown) close(c.messages) // Wait for processing to complete done := make(chan struct{}) go func() { c.wg.Wait() close(done) }() select { case <-done: log.Println("Consumer: shutdown complete") return nil case <-ctx.Done(): return fmt.Errorf("consumer shutdown timeout: %w", ctx.Err()) } } func main() { consumer := NewConsumer() consumer.Start() // Simulate receiving messages producerDone := make(chan struct{}) go func() { defer close(producerDone) for i := 1; i <= 10; i++ { msg := Message{ ID: fmt.Sprintf("msg-%d", i), Payload: fmt.Sprintf("Payload %d", i), } consumer.messages <- msg time.Sleep(300 * time.Millisecond) } }() // Wait for the producer to stop sending before shutdown: Shutdown closes consumer.messages <-producerDone ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if err := consumer.Shutdown(ctx); err != nil { log.Fatalf("Shutdown failed: %v", err) } log.Println("Main: shutdown complete") } Best Practices 1. Use Context for Timeouts // Set realistic shutdown timeout ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := server.Shutdown(ctx); err != nil { // Handle timeout } 2. Implement Shutdown Sequence func (app *Application) Shutdown(ctx context.Context) error { // 1. Fail the readiness probe (remove from load balancer without triggering a liveness restart) app.setNotReady() // 2. Wait for de-registration, but give up early if the shutdown deadline expires select { case <-time.After(5 * time.Second): case <-ctx.Done(): return ctx.Err() } // 3. Stop accepting new requests and wait for in-flight requests to finish if err := app.server.Shutdown(ctx); err != nil { return fmt.Errorf("http server shutdown: %w", err) } // 4. Wait for background workers app.wg.Wait() // 5. Close resources if err := app.db.Close(); err != nil { return fmt.Errorf("database close: %w", err) } return nil } 3.
Test Graceful Shutdown func TestGracefulShutdown(t *testing.T) { app, err := NewApplication() if err != nil { t.Fatalf("NewApplication failed: %v", err) } if err := app.Run(); err != nil { t.Fatalf("Run failed: %v", err) } // Start long-running request (assumes the app registers a /slow handler like the one in the basic HTTP example) reqErr := make(chan error, 1) go func() { resp, err := http.Get("http://localhost:8080/slow") if err != nil { reqErr <- err return } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { reqErr <- fmt.Errorf("unexpected status: %s", resp.Status) return } reqErr <- nil }() // Wait for request to start time.Sleep(100 * time.Millisecond) // Initiate shutdown ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := app.Shutdown(ctx); err != nil { t.Fatalf("Shutdown failed: %v", err) } // The in-flight request must complete rather than be dropped if err := <-reqErr; err != nil { t.Fatalf("In-flight request failed: %v", err) } } Kubernetes Configuration Pod Termination Lifecycle apiVersion: apps/v1 kind: Deployment metadata: name: myapp spec: template: spec: containers: - name: myapp image: myapp:latest ports: - containerPort: 8080 # Liveness probe (restart if unhealthy) livenessProbe: httpGet: path: /health port: 8080 initialDelaySeconds: 30 periodSeconds: 10 # Readiness probe (remove from service if not ready) readinessProbe: httpGet: path: /readiness port: 8080 initialDelaySeconds: 5 periodSeconds: 5 # Graceful shutdown configuration lifecycle: preStop: exec: command: ["/bin/sh", "-c", "sleep 5"] # Termination grace period (must be > preStop sleep + shutdown delay + max request time; the countdown starts before the preStop hook runs, and SIGTERM is sent only after it finishes) terminationGracePeriodSeconds: 30 Common Pitfalls Not Handling SIGTERM: Container orchestrators send SIGTERM for graceful shutdown Insufficient Timeout: Set timeout longer than longest request duration Ignoring In-Flight Requests: Always wait for active requests to complete Not Closing Resources: Explicitly close databases, files, connections Immediate Exit: Don’t call os.Exit(0) without cleanup Performance Considerations Shutdown Delay: Balance between deployment speed and zero downtime Timeout Values: Size timeouts from tail latency (p99 or the longest expected request), not typical request duration Resource Cleanup: Close in reverse dependency order (stop servers and workers before closing the databases they use) Logging: Flush logs before exit Metrics: Report shutdown metrics for monitoring Conclusion Graceful shutdown is essential for production Go services to ensure zero-downtime deployments and data integrity. ...

    June 27, 2024 · 13 min · Rafiul Alam