Layered Architecture in Go: Building Maintainable Applications

    Go Architecture Patterns Series: Series Overview | Next: Clean Architecture → What is Layered Architecture? Layered Architecture is one of the most common and fundamental architectural patterns in software development. It organizes code into horizontal layers, where each layer has a specific responsibility and only communicates with adjacent layers. Key Principles: Separation of Concerns: Each layer handles a specific aspect of the application Layer Independence: Layers are loosely coupled and can be changed independently Unidirectional Dependencies: Dependencies flow in one direction (top-down) Clear Boundaries: Well-defined interfaces between layers Testability: Each layer can be tested in isolation Architecture Overview %%{init: {'theme':'dark', 'themeVariables': {'primaryTextColor':'#fff','secondaryTextColor':'#fff','tertiaryTextColor':'#fff','textColor':'#fff','nodeTextColor':'#fff','edgeLabelText':'#fff','clusterTextColor':'#fff','actorTextColor':'#fff'}}}%% graph TD A[Presentation Layer] --> B[Business Logic Layer] B --> C[Data Access Layer] C --> D[(Database)] style A fill:#1e3a5f,color:#fff style B fill:#4a4420,color:#fff style C fill:#1e4620,color:#fff style D fill:#3a1e4a,color:#fff Traditional 3-Tier Layered Architecture %%{init: {'theme':'dark', 'themeVariables': {'primaryTextColor':'#fff','secondaryTextColor':'#fff','tertiaryTextColor':'#fff','textColor':'#fff','nodeTextColor':'#fff','edgeLabelText':'#fff','clusterTextColor':'#fff','actorTextColor':'#fff'}}}%% graph LR subgraph "Presentation Layer" A1[HTTP Handlers] A2[Templates/Views] A3[Request/Response DTOs] end subgraph "Business Logic Layer" B1[Services] B2[Business Rules] B3[Domain Models] end subgraph "Data Access Layer" C1[Repositories] C2[Database Access] C3[Data Models] end A1 --> B1 A2 --> B1 B1 --> C1 C1 --> C2 style A1 fill:#1e3a5f,color:#fff style B1 fill:#4a4420,color:#fff style C1 fill:#1e4620,color:#fff Real-World Use Cases Web Applications: RESTful APIs and web services Enterprise Applications: Business management systems CRUD Applications: Standard create-read-update-delete operations Monolithic Applications: Traditional single-deployment applications Internal Tools: Admin panels and dashboards Legacy System Modernization: Refactoring existing codebases Basic Layered Architecture Implementation Project Structure ├── cmd/ │ └── api/ │ └── main.go ├── internal/ │ ├── handlers/ # Presentation Layer │ │ ├── user_handler.go │ │ └── product_handler.go │ ├── services/ # Business Logic Layer │ │ ├── user_service.go │ │ └── product_service.go │ ├── repositories/ # Data Access Layer │ │ ├── user_repository.go │ │ └── product_repository.go │ └── models/ # Domain Models │ ├── user.go │ └── product.go └── go.mod Layer 1: Domain Models package models import "time" // User represents a user in the system type User struct { ID int64 `json:"id"` Email string `json:"email"` Name string `json:"name"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` } // Product represents a product in the system type Product struct { ID int64 `json:"id"` Name string `json:"name"` Description string `json:"description"` Price float64 `json:"price"` Stock int `json:"stock"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` } // Order represents an order in the system type Order struct { ID int64 `json:"id"` UserID int64 `json:"user_id"` ProductID int64 `json:"product_id"` Quantity int `json:"quantity"` Total float64 `json:"total"` Status string `json:"status"` CreatedAt time.Time 
`json:"created_at"` } Layer 2: Data Access Layer (Repository) package repositories import ( "context" "database/sql" "fmt" "time" "myapp/internal/models" ) // UserRepository handles user data access type UserRepository struct { db *sql.DB } // NewUserRepository creates a new user repository func NewUserRepository(db *sql.DB) *UserRepository { return &UserRepository{db: db} } // Create creates a new user func (r *UserRepository) Create(ctx context.Context, user *models.User) error { query := ` INSERT INTO users (email, name, created_at, updated_at) VALUES ($1, $2, $3, $4) RETURNING id ` now := time.Now() user.CreatedAt = now user.UpdatedAt = now err := r.db.QueryRowContext( ctx, query, user.Email, user.Name, user.CreatedAt, user.UpdatedAt, ).Scan(&user.ID) if err != nil { return fmt.Errorf("failed to create user: %w", err) } return nil } // GetByID retrieves a user by ID func (r *UserRepository) GetByID(ctx context.Context, id int64) (*models.User, error) { query := ` SELECT id, email, name, created_at, updated_at FROM users WHERE id = $1 ` user := &models.User{} err := r.db.QueryRowContext(ctx, query, id).Scan( &user.ID, &user.Email, &user.Name, &user.CreatedAt, &user.UpdatedAt, ) if err == sql.ErrNoRows { return nil, fmt.Errorf("user not found") } if err != nil { return nil, fmt.Errorf("failed to get user: %w", err) } return user, nil } // GetByEmail retrieves a user by email func (r *UserRepository) GetByEmail(ctx context.Context, email string) (*models.User, error) { query := ` SELECT id, email, name, created_at, updated_at FROM users WHERE email = $1 ` user := &models.User{} err := r.db.QueryRowContext(ctx, query, email).Scan( &user.ID, &user.Email, &user.Name, &user.CreatedAt, &user.UpdatedAt, ) if err == sql.ErrNoRows { return nil, fmt.Errorf("user not found") } if err != nil { return nil, fmt.Errorf("failed to get user: %w", err) } return user, nil } // Update updates a user func (r *UserRepository) Update(ctx context.Context, user *models.User) error { query := ` UPDATE users SET email = $1, name = $2, updated_at = $3 WHERE id = $4 ` user.UpdatedAt = time.Now() result, err := r.db.ExecContext( ctx, query, user.Email, user.Name, user.UpdatedAt, user.ID, ) if err != nil { return fmt.Errorf("failed to update user: %w", err) } rows, err := result.RowsAffected() if err != nil { return fmt.Errorf("failed to get affected rows: %w", err) } if rows == 0 { return fmt.Errorf("user not found") } return nil } // Delete deletes a user func (r *UserRepository) Delete(ctx context.Context, id int64) error { query := `DELETE FROM users WHERE id = $1` result, err := r.db.ExecContext(ctx, query, id) if err != nil { return fmt.Errorf("failed to delete user: %w", err) } rows, err := result.RowsAffected() if err != nil { return fmt.Errorf("failed to get affected rows: %w", err) } if rows == 0 { return fmt.Errorf("user not found") } return nil } // List retrieves all users func (r *UserRepository) List(ctx context.Context, limit, offset int) ([]*models.User, error) { query := ` SELECT id, email, name, created_at, updated_at FROM users ORDER BY id DESC LIMIT $1 OFFSET $2 ` rows, err := r.db.QueryContext(ctx, query, limit, offset) if err != nil { return nil, fmt.Errorf("failed to list users: %w", err) } defer rows.Close() var users []*models.User for rows.Next() { user := &models.User{} err := rows.Scan( &user.ID, &user.Email, &user.Name, &user.CreatedAt, &user.UpdatedAt, ) if err != nil { return nil, fmt.Errorf("failed to scan user: %w", err) } users = append(users, user) } return users, nil } // 
ProductRepository handles product data access type ProductRepository struct { db *sql.DB } // NewProductRepository creates a new product repository func NewProductRepository(db *sql.DB) *ProductRepository { return &ProductRepository{db: db} } // UpdateStock updates product stock func (r *ProductRepository) UpdateStock(ctx context.Context, productID int64, quantity int) error { query := ` UPDATE products SET stock = stock + $1, updated_at = $2 WHERE id = $3 ` result, err := r.db.ExecContext(ctx, query, quantity, time.Now(), productID) if err != nil { return fmt.Errorf("failed to update stock: %w", err) } rows, err := result.RowsAffected() if err != nil { return fmt.Errorf("failed to get affected rows: %w", err) } if rows == 0 { return fmt.Errorf("product not found") } return nil } Layer 3: Business Logic Layer (Service) package services import ( "context" "fmt" "regexp" "myapp/internal/models" "myapp/internal/repositories" ) // UserService handles user business logic type UserService struct { userRepo *repositories.UserRepository } // NewUserService creates a new user service func NewUserService(userRepo *repositories.UserRepository) *UserService { return &UserService{ userRepo: userRepo, } } // CreateUser creates a new user with validation func (s *UserService) CreateUser(ctx context.Context, email, name string) (*models.User, error) { // Business rule: Validate email format if !isValidEmail(email) { return nil, fmt.Errorf("invalid email format") } // Business rule: Name must not be empty if name == "" { return nil, fmt.Errorf("name cannot be empty") } // Business rule: Email must be unique existingUser, err := s.userRepo.GetByEmail(ctx, email) if err == nil && existingUser != nil { return nil, fmt.Errorf("email already exists") } user := &models.User{ Email: email, Name: name, } if err := s.userRepo.Create(ctx, user); err != nil { return nil, fmt.Errorf("failed to create user: %w", err) } return user, nil } // GetUser retrieves a user by ID func (s *UserService) GetUser(ctx context.Context, id int64) (*models.User, error) { return s.userRepo.GetByID(ctx, id) } // UpdateUser updates a user with validation func (s *UserService) UpdateUser(ctx context.Context, id int64, email, name string) (*models.User, error) { // Business rule: Validate email format if !isValidEmail(email) { return nil, fmt.Errorf("invalid email format") } // Business rule: Name must not be empty if name == "" { return nil, fmt.Errorf("name cannot be empty") } user, err := s.userRepo.GetByID(ctx, id) if err != nil { return nil, fmt.Errorf("user not found: %w", err) } // Business rule: If email is changing, check uniqueness if user.Email != email { existingUser, err := s.userRepo.GetByEmail(ctx, email) if err == nil && existingUser != nil { return nil, fmt.Errorf("email already exists") } } user.Email = email user.Name = name if err := s.userRepo.Update(ctx, user); err != nil { return nil, fmt.Errorf("failed to update user: %w", err) } return user, nil } // DeleteUser deletes a user func (s *UserService) DeleteUser(ctx context.Context, id int64) error { return s.userRepo.Delete(ctx, id) } // ListUsers retrieves users with pagination func (s *UserService) ListUsers(ctx context.Context, page, pageSize int) ([]*models.User, error) { // Business rule: Validate pagination parameters if page < 1 { page = 1 } if pageSize < 1 || pageSize > 100 { pageSize = 20 } offset := (page - 1) * pageSize return s.userRepo.List(ctx, pageSize, offset) } // isValidEmail validates email format func isValidEmail(email string) bool { emailRegex := 
regexp.MustCompile(`^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$`) return emailRegex.MatchString(email) } // OrderService handles order business logic type OrderService struct { userRepo *repositories.UserRepository productRepo *repositories.ProductRepository } // NewOrderService creates a new order service func NewOrderService( userRepo *repositories.UserRepository, productRepo *repositories.ProductRepository, ) *OrderService { return &OrderService{ userRepo: userRepo, productRepo: productRepo, } } // PlaceOrder places a new order with business validation func (s *OrderService) PlaceOrder(ctx context.Context, userID, productID int64, quantity int) (*models.Order, error) { // Business rule: Validate user exists user, err := s.userRepo.GetByID(ctx, userID) if err != nil { return nil, fmt.Errorf("user not found: %w", err) } // Business rule: Validate quantity if quantity <= 0 { return nil, fmt.Errorf("quantity must be positive") } // Business rule: Check stock availability // (In a real implementation, this would be in ProductRepository) // Business rule: Calculate total // (In a real implementation, this would fetch product price) order := &models.Order{ UserID: user.ID, ProductID: productID, Quantity: quantity, Status: "pending", } return order, nil } Layer 4: Presentation Layer (HTTP Handlers) package handlers import ( "encoding/json" "net/http" "strconv" "myapp/internal/services" ) // UserHandler handles HTTP requests for users type UserHandler struct { userService *services.UserService } // NewUserHandler creates a new user handler func NewUserHandler(userService *services.UserService) *UserHandler { return &UserHandler{ userService: userService, } } // CreateUserRequest represents the request to create a user type CreateUserRequest struct { Email string `json:"email"` Name string `json:"name"` } // CreateUser handles POST /users func (h *UserHandler) CreateUser(w http.ResponseWriter, r *http.Request) { var req CreateUserRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { respondError(w, http.StatusBadRequest, "invalid request body") return } user, err := h.userService.CreateUser(r.Context(), req.Email, req.Name) if err != nil { respondError(w, http.StatusBadRequest, err.Error()) return } respondJSON(w, http.StatusCreated, user) } // GetUser handles GET /users/:id func (h *UserHandler) GetUser(w http.ResponseWriter, r *http.Request) { // Extract ID from URL (implementation depends on router) id, err := extractIDFromPath(r) if err != nil { respondError(w, http.StatusBadRequest, "invalid user ID") return } user, err := h.userService.GetUser(r.Context(), id) if err != nil { respondError(w, http.StatusNotFound, "user not found") return } respondJSON(w, http.StatusOK, user) } // UpdateUser handles PUT /users/:id func (h *UserHandler) UpdateUser(w http.ResponseWriter, r *http.Request) { id, err := extractIDFromPath(r) if err != nil { respondError(w, http.StatusBadRequest, "invalid user ID") return } var req CreateUserRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { respondError(w, http.StatusBadRequest, "invalid request body") return } user, err := h.userService.UpdateUser(r.Context(), id, req.Email, req.Name) if err != nil { respondError(w, http.StatusBadRequest, err.Error()) return } respondJSON(w, http.StatusOK, user) } // DeleteUser handles DELETE /users/:id func (h *UserHandler) DeleteUser(w http.ResponseWriter, r *http.Request) { id, err := extractIDFromPath(r) if err != nil { respondError(w, http.StatusBadRequest, "invalid user ID") return } if 
err := h.userService.DeleteUser(r.Context(), id); err != nil { respondError(w, http.StatusNotFound, err.Error()) return } w.WriteHeader(http.StatusNoContent) } // ListUsers handles GET /users func (h *UserHandler) ListUsers(w http.ResponseWriter, r *http.Request) { page, _ := strconv.Atoi(r.URL.Query().Get("page")) pageSize, _ := strconv.Atoi(r.URL.Query().Get("page_size")) users, err := h.userService.ListUsers(r.Context(), page, pageSize) if err != nil { respondError(w, http.StatusInternalServerError, "failed to list users") return } respondJSON(w, http.StatusOK, users) } // Helper functions func respondJSON(w http.ResponseWriter, status int, data interface{}) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) json.NewEncoder(w).Encode(data) } func respondError(w http.ResponseWriter, status int, message string) { respondJSON(w, status, map[string]string{"error": message}) } func extractIDFromPath(r *http.Request) (int64, error) { // This is a simplified version - use your router's method // For example with chi: chi.URLParam(r, "id") return 1, nil } Main Application package main import ( "database/sql" "log" "net/http" _ "github.com/lib/pq" "myapp/internal/handlers" "myapp/internal/repositories" "myapp/internal/services" ) func main() { // Initialize database connection db, err := sql.Open("postgres", "postgres://user:pass@localhost/dbname?sslmode=disable") if err != nil { log.Fatal(err) } defer db.Close() // Initialize layers from bottom to top // Data Access Layer userRepo := repositories.NewUserRepository(db) productRepo := repositories.NewProductRepository(db) // Business Logic Layer userService := services.NewUserService(userRepo) orderService := services.NewOrderService(userRepo, productRepo) // Presentation Layer userHandler := handlers.NewUserHandler(userService) // Setup routes mux := http.NewServeMux() mux.HandleFunc("POST /users", userHandler.CreateUser) mux.HandleFunc("GET /users/{id}", userHandler.GetUser) mux.HandleFunc("PUT /users/{id}", userHandler.UpdateUser) mux.HandleFunc("DELETE /users/{id}", userHandler.DeleteUser) mux.HandleFunc("GET /users", userHandler.ListUsers) // Start server log.Println("Server starting on :8080") if err := http.ListenAndServe(":8080", mux); err != nil { log.Fatal(err) } } Testing Layered Architecture package services_test import ( "context" "testing" "myapp/internal/models" "myapp/internal/services" ) // MockUserRepository for testing type MockUserRepository struct { users map[int64]*models.User nextID int64 } func NewMockUserRepository() *MockUserRepository { return &MockUserRepository{ users: make(map[int64]*models.User), nextID: 1, } } func (m *MockUserRepository) Create(ctx context.Context, user *models.User) error { user.ID = m.nextID m.nextID++ m.users[user.ID] = user return nil } func (m *MockUserRepository) GetByID(ctx context.Context, id int64) (*models.User, error) { user, exists := m.users[id] if !exists { return nil, fmt.Errorf("user not found") } return user, nil } func (m *MockUserRepository) GetByEmail(ctx context.Context, email string) (*models.User, error) { for _, user := range m.users { if user.Email == email { return user, nil } } return nil, fmt.Errorf("user not found") } func TestUserService_CreateUser(t *testing.T) { mockRepo := NewMockUserRepository() service := services.NewUserService(mockRepo) tests := []struct { name string email string userName string wantErr bool }{ { name: "valid user", email: "[email protected]", userName: "Test User", wantErr: false, }, { name: "invalid email", email: 
"invalid-email", userName: "Test User", wantErr: true, }, { name: "empty name", email: "[email protected]", userName: "", wantErr: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { user, err := service.CreateUser(context.Background(), tt.email, tt.userName) if tt.wantErr { if err == nil { t.Errorf("expected error but got none") } return } if err != nil { t.Errorf("unexpected error: %v", err) return } if user.Email != tt.email { t.Errorf("expected email %s, got %s", tt.email, user.Email) } }) } } Best Practices Clear Layer Boundaries: Define clear interfaces between layers Dependency Direction: Always depend downward, never upward Keep Layers Thin: Avoid bloated service layers with too much logic Use Dependency Injection: Inject dependencies rather than creating them Interface Segregation: Define minimal interfaces for layer communication Error Handling: Handle errors appropriately at each layer Testing: Test each layer independently with mocks Common Pitfalls Layer Violation: Skipping layers (e.g., handler directly accessing repository) Anemic Domain Models: Models with no behavior, only data Fat Service Layer: Putting all logic in the service layer Tight Coupling: Layers knowing too much about each other God Objects: Services or repositories handling too many responsibilities Inconsistent Abstractions: Different patterns in different layers When to Use Layered Architecture Use When: ...

    January 14, 2025 · 13 min · Rafiul Alam
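A note on the test setup in this post: the test injects `MockUserRepository` into `services.NewUserService`, but the service as written depends on the concrete `*repositories.UserRepository` type, so the mock cannot actually be passed in. One common fix is to have the service depend on a small interface that both the real repository and the mock satisfy. The sketch below is illustrative rather than part of the original article; the interface name `UserStore` is an assumption.

```go
package services

import (
	"context"

	"myapp/internal/models"
)

// UserStore is a hypothetical, minimal interface describing what the
// service needs from its data layer. *repositories.UserRepository
// already satisfies it, and so can a test double.
type UserStore interface {
	Create(ctx context.Context, user *models.User) error
	GetByID(ctx context.Context, id int64) (*models.User, error)
	GetByEmail(ctx context.Context, email string) (*models.User, error)
	Update(ctx context.Context, user *models.User) error
	Delete(ctx context.Context, id int64) error
	List(ctx context.Context, limit, offset int) ([]*models.User, error)
}

// UserService then holds the interface instead of the concrete type.
type UserService struct {
	userRepo UserStore
}

// NewUserService accepts anything that satisfies UserStore, including
// the MockUserRepository used in the tests.
func NewUserService(userRepo UserStore) *UserService {
	return &UserService{userRepo: userRepo}
}
```

With that change the mock also needs `Update`, `Delete`, and `List` stubs to satisfy the full interface, and the test file needs `fmt` in its import list for the `fmt.Errorf` calls inside the mock.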
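The `extractIDFromPath` helper is left as a stub that always returns 1. Since `main` registers Go 1.22-style patterns such as `GET /users/{id}` on `http.ServeMux`, the helper can be implemented with `r.PathValue` rather than a third-party router. A possible version (it requires adding `fmt` to the handler package's imports):

```go
package handlers

import (
	"fmt"
	"net/http"
	"strconv"
)

// extractIDFromPath reads the {id} path parameter registered via the
// Go 1.22+ http.ServeMux patterns used in main (e.g. "GET /users/{id}").
func extractIDFromPath(r *http.Request) (int64, error) {
	idStr := r.PathValue("id")
	id, err := strconv.ParseInt(idStr, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid user ID %q: %w", idStr, err)
	}
	return id, nil
}
```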
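`PlaceOrder` leaves the stock check and total calculation as comments. The sketch below shows how the service layer could coordinate both repositories to complete them; it assumes a `GetByID(ctx, id) (*models.Product, error)` method on `ProductRepository` that the article does not show, and it reuses `UpdateStock` with a negative delta to reserve stock.

```go
// PlaceOrder, revised (illustrative): lives in package services alongside the
// version in the article, with the same imports plus the assumed
// ProductRepository.GetByID.
func (s *OrderService) PlaceOrder(ctx context.Context, userID, productID int64, quantity int) (*models.Order, error) {
	user, err := s.userRepo.GetByID(ctx, userID)
	if err != nil {
		return nil, fmt.Errorf("user not found: %w", err)
	}
	if quantity <= 0 {
		return nil, fmt.Errorf("quantity must be positive")
	}

	// Assumed helper: ProductRepository.GetByID(ctx, id) (*models.Product, error).
	product, err := s.productRepo.GetByID(ctx, productID)
	if err != nil {
		return nil, fmt.Errorf("product not found: %w", err)
	}
	if product.Stock < quantity {
		return nil, fmt.Errorf("insufficient stock")
	}

	// Reserve stock; UpdateStock adds the given delta, so pass a negative value.
	// A production version would wrap the check and the update in a transaction.
	if err := s.productRepo.UpdateStock(ctx, productID, -quantity); err != nil {
		return nil, fmt.Errorf("failed to reserve stock: %w", err)
	}

	return &models.Order{
		UserID:    user.ID,
		ProductID: productID,
		Quantity:  quantity,
		Total:     product.Price * float64(quantity),
		Status:    "pending",
	}, nil
}
```

Separately, `main` declares `orderService` without ever using it, which Go rejects with "declared and not used"; either register order routes through a handler or drop the variable until they exist.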

    Clean Architecture in Go: Building Independent and Testable Systems

    Go Architecture Patterns Series: ← Layered Architecture | Series Overview | Next: Hexagonal Architecture → What is Clean Architecture? Clean Architecture, popularized by Robert C. Martin (Uncle Bob), is an architectural pattern that emphasizes separation of concerns and independence from frameworks, databases, and external agencies. It organizes code in concentric circles with dependencies pointing inward. Key Principles: Independence of Frameworks: Architecture doesn’t depend on frameworks Testability: Business rules can be tested without UI, database, or external elements Independence of UI: UI can change without changing business rules Independence of Database: Business rules don’t know about the database Independence of External Agency: Business rules don’t know about the outside world Dependency Rule: Dependencies only point inward toward higher-level policies Architecture Overview %%{init: {'theme':'dark', 'themeVariables': {'primaryTextColor':'#fff','secondaryTextColor':'#fff','tertiaryTextColor':'#fff','textColor':'#fff','nodeTextColor':'#fff','edgeLabelText':'#fff','clusterTextColor':'#fff','actorTextColor':'#fff'}}}%% graph TD subgraph "External Layer (Frameworks & Drivers)" A1[Web Framework] A2[Database] A3[External APIs] end subgraph "Interface Adapters" B1[Controllers] B2[Presenters] B3[Gateways] end subgraph "Use Cases (Application Business Rules)" C1[Interactors] C2[Use Case Logic] end subgraph "Entities (Enterprise Business Rules)" D1[Domain Models] D2[Business Logic] end A1 --> B1 A2 --> B3 B1 --> C1 B3 --> C1 C1 --> D1 style D1 fill:#4a4420,color:#fff style C1 fill:#1e3a5f,color:#fff style B1 fill:#1e4620,color:#fff style A1 fill:#3a1e4a,color:#fff Clean Architecture Circles %%{init: {'theme':'dark', 'themeVariables': {'primaryTextColor':'#fff','secondaryTextColor':'#fff','tertiaryTextColor':'#fff','textColor':'#fff','nodeTextColor':'#fff','edgeLabelText':'#fff','clusterTextColor':'#fff','actorTextColor':'#fff'}}}%% graph LR subgraph "Layer 1: Entities" E[Core BusinessRules & Models] end subgraph "Layer 2: Use Cases" U[ApplicationBusiness Rules] end subgraph "Layer 3: Interface Adapters" I[ControllersPresentersGateways] end subgraph "Layer 4: Frameworks & Drivers" F[WebDBUI] end F -.->|depends on| I I -.->|depends on| U U -.->|depends on| E style E fill:#4a4420,color:#fff style U fill:#1e3a5f,color:#fff style I fill:#1e4620,color:#fff style F fill:#3a1e4a,color:#fff Dependency Flow %%{init: {'theme':'dark', 'themeVariables': {'primaryTextColor':'#fff','secondaryTextColor':'#fff','tertiaryTextColor':'#fff','textColor':'#fff','nodeTextColor':'#fff','edgeLabelText':'#fff','clusterTextColor':'#fff','actorTextColor':'#fff'}}}%% graph TB subgraph "Outer Layers (Low-level Details)" direction LR DB[Database] Web[Web Server] API[External APIs] end subgraph "Interface Adapters" direction LR Repo[RepositoryImplementation] Controller[HTTP Controller] Gateway[API Gateway] end subgraph "Use Cases" direction LR UC1[Create UserUse Case] UC2[Get UserUse Case] end subgraph "Entities (Core)" direction LR Entity[User Entity] Rules[Business Rules] end DB --> Repo Web --> Controller API --> Gateway Repo -.->|implements| UC1 Controller -.->|calls| UC1 Gateway -.->|implements| UC2 UC1 -.->|uses| Entity UC2 -.->|uses| Entity style Entity fill:#4a4420,stroke:#fb923c,stroke-width:3px,color:#fff style UC1 fill:#1e3a5f,color:#fff style Repo fill:#1e4620,color:#fff style DB fill:#3a1e4a,color:#fff Real-World Use Cases Enterprise Applications: Complex business logic that needs isolation Long-lived 
Systems: Applications that need to evolve over time Multi-platform Applications: Same core logic, different interfaces Testing-Critical Systems: Financial, healthcare, or mission-critical apps API-first Applications: Where business logic is reused across interfaces Microservices: Each service following clean architecture principles Project Structure ├── cmd/ │ └── api/ │ └── main.go ├── internal/ │ ├── entity/ # Layer 1: Entities │ │ ├── user.go │ │ └── errors.go │ ├── usecase/ # Layer 2: Use Cases │ │ ├── user_usecase.go │ │ ├── interfaces.go # Repository & Presenter interfaces │ │ └── user_interactor.go │ ├── adapter/ # Layer 3: Interface Adapters │ │ ├── repository/ │ │ │ └── user_repository.go │ │ ├── presenter/ │ │ │ └── user_presenter.go │ │ └── controller/ │ │ └── user_controller.go │ └── infrastructure/ # Layer 4: Frameworks & Drivers │ ├── database/ │ │ └── postgres.go │ ├── router/ │ │ └── router.go │ └── config/ │ └── config.go └── go.mod Layer 1: Entities (Core Business Rules) package entity import ( "errors" "regexp" "time" ) // User represents the core user entity with business rules type User struct { ID string Email string Name string Age int Status UserStatus CreatedAt time.Time UpdatedAt time.Time } // UserStatus represents user account status type UserStatus string const ( UserStatusActive UserStatus = "active" UserStatusInactive UserStatus = "inactive" UserStatusSuspended UserStatus = "suspended" ) // Business rule validation errors var ( ErrInvalidEmail = errors.New("invalid email format") ErrInvalidName = errors.New("name must not be empty") ErrInvalidAge = errors.New("age must be between 0 and 150") ErrUserNotFound = errors.New("user not found") ErrUserAlreadyExists = errors.New("user already exists") ErrUnauthorized = errors.New("unauthorized action") ) // NewUser creates a new user with business rule validation func NewUser(email, name string, age int) (*User, error) { user := &User{ Email: email, Name: name, Age: age, Status: UserStatusActive, CreatedAt: time.Now(), UpdatedAt: time.Now(), } if err := user.Validate(); err != nil { return nil, err } return user, nil } // Validate validates the user entity according to business rules func (u *User) Validate() error { // Business Rule: Email must be valid format if !isValidEmail(u.Email) { return ErrInvalidEmail } // Business Rule: Name must not be empty if u.Name == "" { return ErrInvalidName } // Business Rule: Age must be realistic if u.Age < 0 || u.Age > 150 { return ErrInvalidAge } return nil } // UpdateEmail updates user email with validation func (u *User) UpdateEmail(email string) error { if !isValidEmail(email) { return ErrInvalidEmail } u.Email = email u.UpdatedAt = time.Now() return nil } // UpdateName updates user name with validation func (u *User) UpdateName(name string) error { if name == "" { return ErrInvalidName } u.Name = name u.UpdatedAt = time.Now() return nil } // Activate activates the user account func (u *User) Activate() { u.Status = UserStatusActive u.UpdatedAt = time.Now() } // Deactivate deactivates the user account func (u *User) Deactivate() { u.Status = UserStatusInactive u.UpdatedAt = time.Now() } // Suspend suspends the user account func (u *User) Suspend() { u.Status = UserStatusSuspended u.UpdatedAt = time.Now() } // IsActive checks if user is active func (u *User) IsActive() bool { return u.Status == UserStatusActive } // CanPerformAction checks if user can perform actions (business rule) func (u *User) CanPerformAction() bool { return u.Status == UserStatusActive } // isValidEmail 
validates email format func isValidEmail(email string) bool { emailRegex := regexp.MustCompile(`^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$`) return emailRegex.MatchString(email) } // Product entity with business rules type Product struct { ID string Name string Description string Price Money Stock int CreatedAt time.Time UpdatedAt time.Time } // Money represents monetary value (value object) type Money struct { Amount int64 // in cents Currency string } // NewMoney creates a new money value object func NewMoney(amount int64, currency string) Money { return Money{ Amount: amount, Currency: currency, } } // Add adds two money values func (m Money) Add(other Money) (Money, error) { if m.Currency != other.Currency { return Money{}, errors.New("currency mismatch") } return Money{ Amount: m.Amount + other.Amount, Currency: m.Currency, }, nil } // Multiply multiplies money by a factor func (m Money) Multiply(factor int) Money { return Money{ Amount: m.Amount * int64(factor), Currency: m.Currency, } } Layer 2: Use Cases (Application Business Rules) Use Case Interfaces package usecase import ( "context" "myapp/internal/entity" ) // UserRepository defines the interface for user data access // This interface is defined in the use case layer but implemented in outer layers type UserRepository interface { Create(ctx context.Context, user *entity.User) error GetByID(ctx context.Context, id string) (*entity.User, error) GetByEmail(ctx context.Context, email string) (*entity.User, error) Update(ctx context.Context, user *entity.User) error Delete(ctx context.Context, id string) error List(ctx context.Context, offset, limit int) ([]*entity.User, error) } // UserPresenter defines the interface for presenting user data type UserPresenter interface { PresentUser(user *entity.User) interface{} PresentUsers(users []*entity.User) interface{} PresentError(err error) interface{} } // EmailService defines the interface for email operations type EmailService interface { SendWelcomeEmail(ctx context.Context, user *entity.User) error SendPasswordResetEmail(ctx context.Context, user *entity.User, token string) error } // IDGenerator defines the interface for generating IDs type IDGenerator interface { Generate() string } Use Case Implementation package usecase import ( "context" "fmt" "myapp/internal/entity" ) // CreateUserInput represents input for creating a user type CreateUserInput struct { Email string Name string Age int } // UpdateUserInput represents input for updating a user type UpdateUserInput struct { ID string Email string Name string Age int } // UserInteractor implements user use cases type UserInteractor struct { repo UserRepository emailService EmailService idGen IDGenerator } // NewUserInteractor creates a new user interactor func NewUserInteractor( repo UserRepository, emailService EmailService, idGen IDGenerator, ) *UserInteractor { return &UserInteractor{ repo: repo, emailService: emailService, idGen: idGen, } } // CreateUser creates a new user (use case) func (i *UserInteractor) CreateUser(ctx context.Context, input CreateUserInput) (*entity.User, error) { // Use case logic: Check if user already exists existingUser, err := i.repo.GetByEmail(ctx, input.Email) if err == nil && existingUser != nil { return nil, entity.ErrUserAlreadyExists } // Use case logic: Create new user entity user, err := entity.NewUser(input.Email, input.Name, input.Age) if err != nil { return nil, fmt.Errorf("failed to create user entity: %w", err) } // Use case logic: Generate ID user.ID = i.idGen.Generate() // Use 
case logic: Save user if err := i.repo.Create(ctx, user); err != nil { return nil, fmt.Errorf("failed to save user: %w", err) } // Use case logic: Send welcome email (async in real system) if err := i.emailService.SendWelcomeEmail(ctx, user); err != nil { // Log error but don't fail the use case fmt.Printf("failed to send welcome email: %v\n", err) } return user, nil } // GetUser retrieves a user by ID (use case) func (i *UserInteractor) GetUser(ctx context.Context, id string) (*entity.User, error) { user, err := i.repo.GetByID(ctx, id) if err != nil { return nil, fmt.Errorf("failed to get user: %w", err) } if user == nil { return nil, entity.ErrUserNotFound } return user, nil } // UpdateUser updates a user (use case) func (i *UserInteractor) UpdateUser(ctx context.Context, input UpdateUserInput) (*entity.User, error) { // Use case logic: Get existing user user, err := i.repo.GetByID(ctx, input.ID) if err != nil { return nil, entity.ErrUserNotFound } // Use case logic: Check if email is changing and if it already exists if user.Email != input.Email { existingUser, err := i.repo.GetByEmail(ctx, input.Email) if err == nil && existingUser != nil { return nil, entity.ErrUserAlreadyExists } } // Use case logic: Update user fields if err := user.UpdateEmail(input.Email); err != nil { return nil, err } if err := user.UpdateName(input.Name); err != nil { return nil, err } user.Age = input.Age if err := user.Validate(); err != nil { return nil, err } // Use case logic: Save updated user if err := i.repo.Update(ctx, user); err != nil { return nil, fmt.Errorf("failed to update user: %w", err) } return user, nil } // DeleteUser deletes a user (use case) func (i *UserInteractor) DeleteUser(ctx context.Context, id string) error { // Use case logic: Verify user exists user, err := i.repo.GetByID(ctx, id) if err != nil { return entity.ErrUserNotFound } // Use case logic: Check if user can be deleted (business rule) if !user.CanPerformAction() { return entity.ErrUnauthorized } // Use case logic: Delete user if err := i.repo.Delete(ctx, id); err != nil { return fmt.Errorf("failed to delete user: %w", err) } return nil } // ListUsers lists users with pagination (use case) func (i *UserInteractor) ListUsers(ctx context.Context, page, pageSize int) ([]*entity.User, error) { // Use case logic: Validate pagination if page < 1 { page = 1 } if pageSize < 1 || pageSize > 100 { pageSize = 20 } offset := (page - 1) * pageSize // Use case logic: Get users users, err := i.repo.List(ctx, offset, pageSize) if err != nil { return nil, fmt.Errorf("failed to list users: %w", err) } return users, nil } // ActivateUser activates a user account (use case) func (i *UserInteractor) ActivateUser(ctx context.Context, id string) error { user, err := i.repo.GetByID(ctx, id) if err != nil { return entity.ErrUserNotFound } user.Activate() if err := i.repo.Update(ctx, user); err != nil { return fmt.Errorf("failed to activate user: %w", err) } return nil } Layer 3: Interface Adapters Repository Implementation package repository import ( "context" "database/sql" "fmt" "time" "myapp/internal/entity" ) // PostgresUserRepository implements UserRepository interface type PostgresUserRepository struct { db *sql.DB } // NewPostgresUserRepository creates a new Postgres user repository func NewPostgresUserRepository(db *sql.DB) *PostgresUserRepository { return &PostgresUserRepository{db: db} } // Create creates a new user in the database func (r *PostgresUserRepository) Create(ctx context.Context, user *entity.User) error { query := ` INSERT INTO 
users (id, email, name, age, status, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7) ` _, err := r.db.ExecContext( ctx, query, user.ID, user.Email, user.Name, user.Age, user.Status, user.CreatedAt, user.UpdatedAt, ) if err != nil { return fmt.Errorf("failed to create user: %w", err) } return nil } // GetByID retrieves a user by ID func (r *PostgresUserRepository) GetByID(ctx context.Context, id string) (*entity.User, error) { query := ` SELECT id, email, name, age, status, created_at, updated_at FROM users WHERE id = $1 ` var user entity.User var status string err := r.db.QueryRowContext(ctx, query, id).Scan( &user.ID, &user.Email, &user.Name, &user.Age, &status, &user.CreatedAt, &user.UpdatedAt, ) if err == sql.ErrNoRows { return nil, entity.ErrUserNotFound } if err != nil { return nil, fmt.Errorf("failed to get user: %w", err) } user.Status = entity.UserStatus(status) return &user, nil } // GetByEmail retrieves a user by email func (r *PostgresUserRepository) GetByEmail(ctx context.Context, email string) (*entity.User, error) { query := ` SELECT id, email, name, age, status, created_at, updated_at FROM users WHERE email = $1 ` var user entity.User var status string err := r.db.QueryRowContext(ctx, query, email).Scan( &user.ID, &user.Email, &user.Name, &user.Age, &status, &user.CreatedAt, &user.UpdatedAt, ) if err == sql.ErrNoRows { return nil, nil } if err != nil { return nil, fmt.Errorf("failed to get user: %w", err) } user.Status = entity.UserStatus(status) return &user, nil } // Update updates a user func (r *PostgresUserRepository) Update(ctx context.Context, user *entity.User) error { query := ` UPDATE users SET email = $1, name = $2, age = $3, status = $4, updated_at = $5 WHERE id = $6 ` result, err := r.db.ExecContext( ctx, query, user.Email, user.Name, user.Age, user.Status, time.Now(), user.ID, ) if err != nil { return fmt.Errorf("failed to update user: %w", err) } rows, err := result.RowsAffected() if err != nil { return fmt.Errorf("failed to get affected rows: %w", err) } if rows == 0 { return entity.ErrUserNotFound } return nil } // Delete deletes a user func (r *PostgresUserRepository) Delete(ctx context.Context, id string) error { query := `DELETE FROM users WHERE id = $1` result, err := r.db.ExecContext(ctx, query, id) if err != nil { return fmt.Errorf("failed to delete user: %w", err) } rows, err := result.RowsAffected() if err != nil { return fmt.Errorf("failed to get affected rows: %w", err) } if rows == 0 { return entity.ErrUserNotFound } return nil } // List retrieves users with pagination func (r *PostgresUserRepository) List(ctx context.Context, offset, limit int) ([]*entity.User, error) { query := ` SELECT id, email, name, age, status, created_at, updated_at FROM users ORDER BY created_at DESC LIMIT $1 OFFSET $2 ` rows, err := r.db.QueryContext(ctx, query, limit, offset) if err != nil { return nil, fmt.Errorf("failed to list users: %w", err) } defer rows.Close() var users []*entity.User for rows.Next() { var user entity.User var status string err := rows.Scan( &user.ID, &user.Email, &user.Name, &user.Age, &status, &user.CreatedAt, &user.UpdatedAt, ) if err != nil { return nil, fmt.Errorf("failed to scan user: %w", err) } user.Status = entity.UserStatus(status) users = append(users, &user) } return users, nil } Controller Implementation package controller import ( "encoding/json" "net/http" "strconv" "myapp/internal/entity" "myapp/internal/usecase" ) // UserController handles HTTP requests for users type UserController struct { interactor 
*usecase.UserInteractor presenter usecase.UserPresenter } // NewUserController creates a new user controller func NewUserController( interactor *usecase.UserInteractor, presenter usecase.UserPresenter, ) *UserController { return &UserController{ interactor: interactor, presenter: presenter, } } // CreateUserRequest represents the HTTP request for creating a user type CreateUserRequest struct { Email string `json:"email"` Name string `json:"name"` Age int `json:"age"` } // CreateUser handles POST /users func (c *UserController) CreateUser(w http.ResponseWriter, r *http.Request) { var req CreateUserRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { c.respondError(w, http.StatusBadRequest, err) return } input := usecase.CreateUserInput{ Email: req.Email, Name: req.Name, Age: req.Age, } user, err := c.interactor.CreateUser(r.Context(), input) if err != nil { switch err { case entity.ErrInvalidEmail, entity.ErrInvalidName, entity.ErrInvalidAge: c.respondError(w, http.StatusBadRequest, err) case entity.ErrUserAlreadyExists: c.respondError(w, http.StatusConflict, err) default: c.respondError(w, http.StatusInternalServerError, err) } return } c.respond(w, http.StatusCreated, c.presenter.PresentUser(user)) } // GetUser handles GET /users/:id func (c *UserController) GetUser(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") user, err := c.interactor.GetUser(r.Context(), id) if err != nil { if err == entity.ErrUserNotFound { c.respondError(w, http.StatusNotFound, err) } else { c.respondError(w, http.StatusInternalServerError, err) } return } c.respond(w, http.StatusOK, c.presenter.PresentUser(user)) } // UpdateUser handles PUT /users/:id func (c *UserController) UpdateUser(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") var req CreateUserRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { c.respondError(w, http.StatusBadRequest, err) return } input := usecase.UpdateUserInput{ ID: id, Email: req.Email, Name: req.Name, Age: req.Age, } user, err := c.interactor.UpdateUser(r.Context(), input) if err != nil { switch err { case entity.ErrUserNotFound: c.respondError(w, http.StatusNotFound, err) case entity.ErrInvalidEmail, entity.ErrInvalidName, entity.ErrInvalidAge: c.respondError(w, http.StatusBadRequest, err) case entity.ErrUserAlreadyExists: c.respondError(w, http.StatusConflict, err) default: c.respondError(w, http.StatusInternalServerError, err) } return } c.respond(w, http.StatusOK, c.presenter.PresentUser(user)) } // DeleteUser handles DELETE /users/:id func (c *UserController) DeleteUser(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") if err := c.interactor.DeleteUser(r.Context(), id); err != nil { if err == entity.ErrUserNotFound { c.respondError(w, http.StatusNotFound, err) } else { c.respondError(w, http.StatusInternalServerError, err) } return } w.WriteHeader(http.StatusNoContent) } // ListUsers handles GET /users func (c *UserController) ListUsers(w http.ResponseWriter, r *http.Request) { page, _ := strconv.Atoi(r.URL.Query().Get("page")) pageSize, _ := strconv.Atoi(r.URL.Query().Get("page_size")) users, err := c.interactor.ListUsers(r.Context(), page, pageSize) if err != nil { c.respondError(w, http.StatusInternalServerError, err) return } c.respond(w, http.StatusOK, c.presenter.PresentUsers(users)) } // Helper methods func (c *UserController) respond(w http.ResponseWriter, status int, data interface{}) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) 
json.NewEncoder(w).Encode(data) } func (c *UserController) respondError(w http.ResponseWriter, status int, err error) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) json.NewEncoder(w).Encode(c.presenter.PresentError(err)) } Presenter Implementation package presenter import ( "myapp/internal/entity" ) // UserJSONPresenter presents users as JSON type UserJSONPresenter struct{} // NewUserJSONPresenter creates a new user JSON presenter func NewUserJSONPresenter() *UserJSONPresenter { return &UserJSONPresenter{} } // UserResponse represents the JSON response for a user type UserResponse struct { ID string `json:"id"` Email string `json:"email"` Name string `json:"name"` Age int `json:"age"` Status string `json:"status"` CreatedAt string `json:"created_at"` UpdatedAt string `json:"updated_at"` } // PresentUser presents a single user func (p *UserJSONPresenter) PresentUser(user *entity.User) interface{} { return UserResponse{ ID: user.ID, Email: user.Email, Name: user.Name, Age: user.Age, Status: string(user.Status), CreatedAt: user.CreatedAt.Format("2006-01-02T15:04:05Z"), UpdatedAt: user.UpdatedAt.Format("2006-01-02T15:04:05Z"), } } // PresentUsers presents multiple users func (p *UserJSONPresenter) PresentUsers(users []*entity.User) interface{} { responses := make([]UserResponse, len(users)) for i, user := range users { responses[i] = UserResponse{ ID: user.ID, Email: user.Email, Name: user.Name, Age: user.Age, Status: string(user.Status), CreatedAt: user.CreatedAt.Format("2006-01-02T15:04:05Z"), UpdatedAt: user.UpdatedAt.Format("2006-01-02T15:04:05Z"), } } return map[string]interface{}{ "users": responses, "count": len(responses), } } // ErrorResponse represents an error response type ErrorResponse struct { Error string `json:"error"` Code string `json:"code,omitempty"` } // PresentError presents an error func (p *UserJSONPresenter) PresentError(err error) interface{} { return ErrorResponse{ Error: err.Error(), } } Layer 4: Main Application (Dependency Injection) package main import ( "database/sql" "log" "net/http" _ "github.com/lib/pq" "github.com/google/uuid" "myapp/internal/adapter/controller" "myapp/internal/adapter/presenter" "myapp/internal/adapter/repository" "myapp/internal/usecase" ) // UUIDGenerator implements IDGenerator type UUIDGenerator struct{} func (g *UUIDGenerator) Generate() string { return uuid.New().String() } // MockEmailService implements EmailService type MockEmailService struct{} func (s *MockEmailService) SendWelcomeEmail(ctx context.Context, user *entity.User) error { log.Printf("Sending welcome email to %s", user.Email) return nil } func (s *MockEmailService) SendPasswordResetEmail(ctx context.Context, user *entity.User, token string) error { log.Printf("Sending password reset email to %s with token %s", user.Email, token) return nil } func main() { // Initialize database db, err := sql.Open("postgres", "postgres://user:pass@localhost/cleanarch?sslmode=disable") if err != nil { log.Fatal(err) } defer db.Close() // Layer 4: Initialize adapters (implementations) userRepo := repository.NewPostgresUserRepository(db) emailService := &MockEmailService{} idGen := &UUIDGenerator{} userPresenter := presenter.NewUserJSONPresenter() // Layer 2: Initialize use cases (with dependency injection) userInteractor := usecase.NewUserInteractor(userRepo, emailService, idGen) // Layer 3: Initialize controllers userController := controller.NewUserController(userInteractor, userPresenter) // Setup routes mux := http.NewServeMux() mux.HandleFunc("POST /users", 
userController.CreateUser) mux.HandleFunc("GET /users/{id}", userController.GetUser) mux.HandleFunc("PUT /users/{id}", userController.UpdateUser) mux.HandleFunc("DELETE /users/{id}", userController.DeleteUser) mux.HandleFunc("GET /users", userController.ListUsers) // Start server log.Println("Server starting on :8080") if err := http.ListenAndServe(":8080", mux); err != nil { log.Fatal(err) } } Testing in Clean Architecture package usecase_test import ( "context" "testing" "myapp/internal/entity" "myapp/internal/usecase" ) // Mock implementations type MockUserRepository struct { users map[string]*entity.User } func NewMockUserRepository() *MockUserRepository { return &MockUserRepository{ users: make(map[string]*entity.User), } } func (m *MockUserRepository) Create(ctx context.Context, user *entity.User) error { m.users[user.ID] = user return nil } func (m *MockUserRepository) GetByID(ctx context.Context, id string) (*entity.User, error) { user, exists := m.users[id] if !exists { return nil, entity.ErrUserNotFound } return user, nil } func (m *MockUserRepository) GetByEmail(ctx context.Context, email string) (*entity.User, error) { for _, user := range m.users { if user.Email == email { return user, nil } } return nil, nil } type MockEmailService struct { sentEmails []string } func (m *MockEmailService) SendWelcomeEmail(ctx context.Context, user *entity.User) error { m.sentEmails = append(m.sentEmails, user.Email) return nil } type MockIDGenerator struct { nextID int } func (m *MockIDGenerator) Generate() string { m.nextID++ return fmt.Sprintf("user-%d", m.nextID) } func TestCreateUser(t *testing.T) { // Arrange repo := NewMockUserRepository() emailService := &MockEmailService{} idGen := &MockIDGenerator{} interactor := usecase.NewUserInteractor(repo, emailService, idGen) input := usecase.CreateUserInput{ Email: "[email protected]", Name: "Test User", Age: 25, } // Act user, err := interactor.CreateUser(context.Background(), input) // Assert if err != nil { t.Fatalf("expected no error, got %v", err) } if user.Email != input.Email { t.Errorf("expected email %s, got %s", input.Email, user.Email) } if len(emailService.sentEmails) != 1 { t.Errorf("expected 1 welcome email, got %d", len(emailService.sentEmails)) } } Best Practices Dependency Rule: Always point dependencies inward Interface Segregation: Define minimal interfaces in use case layer Dependency Injection: Inject all dependencies explicitly Entity Purity: Keep entities free from framework dependencies Use Case Focus: Each use case should have a single responsibility Test Independence: Test each layer independently Avoid Anemic Models: Put business logic in entities Common Pitfalls Breaking Dependency Rule: Outer layers should not be imported by inner layers Leaking Infrastructure: Database or framework details leaking into entities Fat Use Cases: Use cases doing too much or too little Ignoring Presenters: Directly returning entities from controllers Over-engineering: Applying Clean Architecture to simple CRUD apps Missing Boundaries: Not clearly defining layer boundaries When to Use Clean Architecture Use When: ...
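A note on the Clean Architecture tests: the doubles only implement part of the ports defined in the use case layer (`MockUserRepository` has no `Update`, `Delete`, or `List`, and `MockEmailService` has no `SendPasswordResetEmail`), and the file calls `fmt.Sprintf` without importing `fmt`, so it will not compile against `usecase.NewUserInteractor` as shown. A sketch of the missing pieces, kept in the same `usecase_test` package and matching the interfaces from the article:

```go
package usecase_test

import (
	"context"
	"fmt"

	"myapp/internal/entity"
)

// Remaining UserRepository methods so MockUserRepository satisfies
// usecase.UserRepository.
func (m *MockUserRepository) Update(ctx context.Context, user *entity.User) error {
	if _, ok := m.users[user.ID]; !ok {
		return entity.ErrUserNotFound
	}
	m.users[user.ID] = user
	return nil
}

func (m *MockUserRepository) Delete(ctx context.Context, id string) error {
	if _, ok := m.users[id]; !ok {
		return entity.ErrUserNotFound
	}
	delete(m.users, id)
	return nil
}

// List ignores pagination for simplicity; the use case only needs a slice back.
func (m *MockUserRepository) List(ctx context.Context, offset, limit int) ([]*entity.User, error) {
	users := make([]*entity.User, 0, len(m.users))
	for _, u := range m.users {
		users = append(users, u)
	}
	return users, nil
}

// SendPasswordResetEmail completes the EmailService port.
func (m *MockEmailService) SendPasswordResetEmail(ctx context.Context, user *entity.User, token string) error {
	m.sentEmails = append(m.sentEmails, fmt.Sprintf("reset:%s", user.Email))
	return nil
}
```

The original test file still needs `fmt` added to its own imports for `MockIDGenerator.Generate`.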
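A cheap way to enforce the Dependency Rule mechanically is a compile-time assertion in each adapter package, so an adapter that drifts away from its inner-layer port fails the build rather than the wiring in `main`. A minimal sketch:

```go
package repository

import "myapp/internal/usecase"

// Compile-time check: the Postgres adapter must keep satisfying the
// repository port defined in the use case layer.
var _ usecase.UserRepository = (*PostgresUserRepository)(nil)
```

The same trick works for the JSON presenter against `usecase.UserPresenter`. Note also that the composition root shown above defines `MockEmailService` and `UUIDGenerator` using `context.Context` and `*entity.User`, but its import block omits `"context"` and the entity package, so those imports need to be added (or the helpers moved into their own adapter packages).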
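The `Money` value object stores amounts in integer cents precisely so that order totals avoid floating-point rounding. A quick usage sketch (a standalone snippet, not from the article):

```go
package main

import (
	"fmt"

	"myapp/internal/entity"
)

func main() {
	price := entity.NewMoney(1999, "USD")   // $19.99, stored as 1999 cents
	lineTotal := price.Multiply(3)          // $59.97
	shipping := entity.NewMoney(499, "USD") // $4.99

	total, err := lineTotal.Add(shipping) // Add fails if the currencies differ
	if err != nil {
		panic(err)
	}
	fmt.Printf("order total: %d %s\n", total.Amount, total.Currency) // 6496 USD
}
```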

    January 13, 2025 · 18 min · Rafiul Alam

    Hexagonal Architecture in Go: Ports and Adapters Pattern

    Go Architecture Patterns Series: ← Clean Architecture | Series Overview | Next: Domain-Driven Design → What is Hexagonal Architecture? Hexagonal Architecture, also known as Ports and Adapters pattern, was introduced by Alistair Cockburn. It emphasizes separating the core business logic from external concerns by defining clear boundaries through ports (interfaces) and adapters (implementations). Key Principles: Core Domain Isolation: Business logic is completely isolated from external dependencies Ports: Interfaces that define how the application communicates with the outside world Adapters: Concrete implementations of ports for specific technologies Symmetry: No distinction between “front-end” and “back-end” - all external systems are treated equally Testability: Core can be tested in isolation without any external dependencies Pluggability: Adapters can be swapped without changing the core Architecture Overview %%{init: {'theme':'dark', 'themeVariables': {'primaryTextColor':'#fff','secondaryTextColor':'#fff','tertiaryTextColor':'#fff','textColor':'#fff','nodeTextColor':'#fff','edgeLabelText':'#fff','clusterTextColor':'#fff','actorTextColor':'#fff'}}}%% graph TB subgraph "Primary Adapters (Driving)" HTTP[HTTP REST API] CLI[CLI Interface] GRPC[gRPC Server] end subgraph "Hexagon (Core Domain)" PORT1[Primary PortsDriving Interfaces] CORE[Business LogicDomain ModelsUse Cases] PORT2[Secondary PortsDriven Interfaces] end subgraph "Secondary Adapters (Driven)" DB[PostgreSQL Adapter] CACHE[Redis Adapter] MSG[Message Queue Adapter] EXT[External API Adapter] end HTTP --> PORT1 CLI --> PORT1 GRPC --> PORT1 PORT1 --> CORE CORE --> PORT2 PORT2 --> DB PORT2 --> CACHE PORT2 --> MSG PORT2 --> EXT style CORE fill:#4a4420,color:#fff style PORT1 fill:#1e3a5f,color:#fff style PORT2 fill:#1e3a5f,color:#fff style HTTP fill:#1e4620,color:#fff style DB fill:#3a1e4a,color:#fff Ports and Adapters Visualization %%{init: {'theme':'dark', 'themeVariables': {'primaryTextColor':'#fff','secondaryTextColor':'#fff','tertiaryTextColor':'#fff','textColor':'#fff','nodeTextColor':'#fff','edgeLabelText':'#fff','clusterTextColor':'#fff','actorTextColor':'#fff'}}}%% graph LR subgraph "Primary Side (Driving)" REST[REST Adapter] GraphQL[GraphQL Adapter] end subgraph "Core Application" IP[Input PortInterface] BL[Business Logic] OP[Output PortInterface] end subgraph "Secondary Side (Driven)" DBAdapter[Database Adapter] EmailAdapter[Email Adapter] end REST -.->|implements| IP GraphQL -.->|implements| IP IP --> BL BL --> OP OP -.->|implemented by| DBAdapter OP -.->|implemented by| EmailAdapter style BL fill:#4a4420,color:#fff style IP fill:#1e3a5f,color:#fff style OP fill:#1e3a5f,color:#fff style REST fill:#1e4620,color:#fff style DBAdapter fill:#3a1e4a,color:#fff Complete Hexagonal Flow %%{init: {'theme':'dark', 'themeVariables': {'primaryTextColor':'#fff','secondaryTextColor':'#fff','tertiaryTextColor':'#fff','textColor':'#fff','nodeTextColor':'#fff','edgeLabelText':'#fff','clusterTextColor':'#fff','actorTextColor':'#fff'}}}%% sequenceDiagram participant Client participant HTTPAdapter participant InputPort participant Core participant OutputPort participant DBAdapter participant Database Client->>HTTPAdapter: HTTP Request HTTPAdapter->>InputPort: Call Method InputPort->>Core: Execute Business Logic Core->>OutputPort: Request Data OutputPort->>DBAdapter: Interface Call DBAdapter->>Database: SQL Query Database-->>DBAdapter: Data DBAdapter-->>OutputPort: Domain Object OutputPort-->>Core: Result Core-->>InputPort: Response 
InputPort-->>HTTPAdapter: DTO HTTPAdapter-->>Client: HTTP Response Real-World Use Cases API Gateways: Multiple protocols (REST, gRPC, GraphQL) with same core logic Multi-tenant Applications: Different adapters for different tenants Legacy System Integration: Adapter for each legacy system Testing-Critical Systems: Easy mocking of all external dependencies Cloud-Native Applications: Easy switching between cloud providers Evolutionary Architecture: System that needs to adapt over time Project Structure ├── cmd/ │ ├── http/ │ │ └── main.go # HTTP server entry point │ └── cli/ │ └── main.go # CLI entry point ├── internal/ │ ├── core/ │ │ ├── domain/ # Domain entities and value objects │ │ │ ├── user.go │ │ │ └── order.go │ │ ├── port/ # Ports (interfaces) │ │ │ ├── input.go # Primary/Driving ports │ │ │ └── output.go # Secondary/Driven ports │ │ └── service/ # Business logic │ │ └── user_service.go │ └── adapter/ │ ├── input/ # Primary/Driving adapters │ │ ├── http/ │ │ │ └── user_handler.go │ │ └── grpc/ │ │ └── user_server.go │ └── output/ # Secondary/Driven adapters │ ├── persistence/ │ │ └── postgres_user_repository.go │ └── notification/ │ └── email_service.go └── go.mod Core Domain Layer Domain Entities package domain import ( "errors" "time" ) // User represents a user entity in the domain type User struct { ID string Email Email Name string Status UserStatus CreatedAt time.Time UpdatedAt time.Time } // Email is a value object representing an email address type Email struct { value string } // NewEmail creates a new email value object with validation func NewEmail(email string) (Email, error) { if !isValidEmail(email) { return Email{}, errors.New("invalid email format") } return Email{value: email}, nil } // String returns the email string value func (e Email) String() string { return e.value } // UserStatus represents the status of a user type UserStatus string const ( UserStatusActive UserStatus = "active" UserStatusInactive UserStatus = "inactive" UserStatusSuspended UserStatus = "suspended" ) // NewUser creates a new user with business rule validation func NewUser(email, name string) (*User, error) { emailVO, err := NewEmail(email) if err != nil { return nil, err } if name == "" { return nil, errors.New("name cannot be empty") } return &User{ Email: emailVO, Name: name, Status: UserStatusActive, CreatedAt: time.Now(), UpdatedAt: time.Now(), }, nil } // Activate activates the user func (u *User) Activate() { u.Status = UserStatusActive u.UpdatedAt = time.Now() } // Suspend suspends the user func (u *User) Suspend() { u.Status = UserStatusSuspended u.UpdatedAt = time.Now() } // IsActive returns true if user is active func (u *User) IsActive() bool { return u.Status == UserStatusActive } func isValidEmail(email string) bool { // Simplified validation return len(email) > 3 && contains(email, "@") } func contains(s, substr string) bool { for i := 0; i <= len(s)-len(substr); i++ { if s[i:i+len(substr)] == substr { return true } } return false } // Order represents an order in the domain type Order struct { ID string UserID string Items []OrderItem Total Money Status OrderStatus CreatedAt time.Time } // OrderItem represents a line item in an order type OrderItem struct { ProductID string Quantity int Price Money } // OrderStatus represents the status of an order type OrderStatus string const ( OrderStatusPending OrderStatus = "pending" OrderStatusConfirmed OrderStatus = "confirmed" OrderStatusShipped OrderStatus = "shipped" OrderStatusDelivered OrderStatus = "delivered" 
OrderStatusCancelled OrderStatus = "cancelled" ) // Money represents monetary value type Money struct { Amount int64 Currency string } // NewMoney creates a new money value object func NewMoney(amount int64, currency string) Money { return Money{ Amount: amount, Currency: currency, } } // Add adds two money values func (m Money) Add(other Money) (Money, error) { if m.Currency != other.Currency { return Money{}, errors.New("cannot add different currencies") } return Money{ Amount: m.Amount + other.Amount, Currency: m.Currency, }, nil } Ports (Interfaces) Primary Ports (Driving/Input) package port import ( "context" "myapp/internal/core/domain" ) // UserService defines the input port for user operations // This is what drives the application (called by adapters) type UserService interface { CreateUser(ctx context.Context, email, name string) (*domain.User, error) GetUser(ctx context.Context, id string) (*domain.User, error) UpdateUser(ctx context.Context, id, email, name string) (*domain.User, error) DeleteUser(ctx context.Context, id string) error ListUsers(ctx context.Context, offset, limit int) ([]*domain.User, error) ActivateUser(ctx context.Context, id string) error SuspendUser(ctx context.Context, id string) error } // OrderService defines the input port for order operations type OrderService interface { CreateOrder(ctx context.Context, userID string, items []OrderItemInput) (*domain.Order, error) GetOrder(ctx context.Context, id string) (*domain.Order, error) CancelOrder(ctx context.Context, id string) error GetUserOrders(ctx context.Context, userID string) ([]*domain.Order, error) } // OrderItemInput represents input for creating an order item type OrderItemInput struct { ProductID string Quantity int } Secondary Ports (Driven/Output) package port import ( "context" "myapp/internal/core/domain" ) // UserRepository defines the output port for user persistence // This is driven by the application (implemented by adapters) type UserRepository interface { Save(ctx context.Context, user *domain.User) error FindByID(ctx context.Context, id string) (*domain.User, error) FindByEmail(ctx context.Context, email string) (*domain.User, error) Update(ctx context.Context, user *domain.User) error Delete(ctx context.Context, id string) error FindAll(ctx context.Context, offset, limit int) ([]*domain.User, error) } // OrderRepository defines the output port for order persistence type OrderRepository interface { Save(ctx context.Context, order *domain.Order) error FindByID(ctx context.Context, id string) (*domain.Order, error) FindByUserID(ctx context.Context, userID string) ([]*domain.Order, error) Update(ctx context.Context, order *domain.Order) error } // NotificationService defines the output port for notifications type NotificationService interface { SendWelcomeEmail(ctx context.Context, user *domain.User) error SendOrderConfirmation(ctx context.Context, order *domain.Order) error SendOrderCancellation(ctx context.Context, order *domain.Order) error } // IDGenerator defines the output port for ID generation type IDGenerator interface { GenerateID() string } // EventPublisher defines the output port for publishing domain events type EventPublisher interface { PublishUserCreated(ctx context.Context, user *domain.User) error PublishUserActivated(ctx context.Context, user *domain.User) error PublishOrderPlaced(ctx context.Context, order *domain.Order) error } Core Service (Business Logic) package service import ( "context" "errors" "fmt" "myapp/internal/core/domain" "myapp/internal/core/port" ) // 
userService implements the UserService port type userService struct { repo port.UserRepository notification port.NotificationService idGen port.IDGenerator eventPub port.EventPublisher } // NewUserService creates a new user service func NewUserService( repo port.UserRepository, notification port.NotificationService, idGen port.IDGenerator, eventPub port.EventPublisher, ) port.UserService { return &userService{ repo: repo, notification: notification, idGen: idGen, eventPub: eventPub, } } // CreateUser creates a new user func (s *userService) CreateUser(ctx context.Context, email, name string) (*domain.User, error) { // Check if user already exists existingUser, err := s.repo.FindByEmail(ctx, email) if err == nil && existingUser != nil { return nil, errors.New("user already exists") } // Create user entity with business rules user, err := domain.NewUser(email, name) if err != nil { return nil, fmt.Errorf("failed to create user: %w", err) } // Generate ID user.ID = s.idGen.GenerateID() // Save user if err := s.repo.Save(ctx, user); err != nil { return nil, fmt.Errorf("failed to save user: %w", err) } // Send notification (best effort, don't fail on error) if err := s.notification.SendWelcomeEmail(ctx, user); err != nil { // Log error but don't fail fmt.Printf("failed to send welcome email: %v\n", err) } // Publish event if err := s.eventPub.PublishUserCreated(ctx, user); err != nil { // Log error but don't fail fmt.Printf("failed to publish user created event: %v\n", err) } return user, nil } // GetUser retrieves a user by ID func (s *userService) GetUser(ctx context.Context, id string) (*domain.User, error) { user, err := s.repo.FindByID(ctx, id) if err != nil { return nil, fmt.Errorf("failed to get user: %w", err) } if user == nil { return nil, errors.New("user not found") } return user, nil } // UpdateUser updates a user func (s *userService) UpdateUser(ctx context.Context, id, email, name string) (*domain.User, error) { user, err := s.repo.FindByID(ctx, id) if err != nil { return nil, fmt.Errorf("user not found: %w", err) } // Update email if changed if email != "" && email != user.Email.String() { newEmail, err := domain.NewEmail(email) if err != nil { return nil, err } // Check if new email is already taken existingUser, _ := s.repo.FindByEmail(ctx, email) if existingUser != nil && existingUser.ID != id { return nil, errors.New("email already exists") } user.Email = newEmail } // Update name if changed if name != "" && name != user.Name { user.Name = name } // Save updated user if err := s.repo.Update(ctx, user); err != nil { return nil, fmt.Errorf("failed to update user: %w", err) } return user, nil } // DeleteUser deletes a user func (s *userService) DeleteUser(ctx context.Context, id string) error { user, err := s.repo.FindByID(ctx, id) if err != nil { return errors.New("user not found") } if !user.IsActive() { return errors.New("cannot delete inactive user") } return s.repo.Delete(ctx, id) } // ListUsers lists users with pagination func (s *userService) ListUsers(ctx context.Context, offset, limit int) ([]*domain.User, error) { if limit <= 0 || limit > 100 { limit = 20 } if offset < 0 { offset = 0 } return s.repo.FindAll(ctx, offset, limit) } // ActivateUser activates a user func (s *userService) ActivateUser(ctx context.Context, id string) error { user, err := s.repo.FindByID(ctx, id) if err != nil { return errors.New("user not found") } user.Activate() if err := s.repo.Update(ctx, user); err != nil { return fmt.Errorf("failed to activate user: %w", err) } // Publish event if err 
:= s.eventPub.PublishUserActivated(ctx, user); err != nil { fmt.Printf("failed to publish user activated event: %v\n", err) } return nil } // SuspendUser suspends a user func (s *userService) SuspendUser(ctx context.Context, id string) error { user, err := s.repo.FindByID(ctx, id) if err != nil { return errors.New("user not found") } user.Suspend() return s.repo.Update(ctx, user) } Primary Adapters (Driving/Input) HTTP Adapter package http import ( "encoding/json" "net/http" "myapp/internal/core/port" ) // UserHandler is the HTTP adapter for user operations type UserHandler struct { service port.UserService } // NewUserHandler creates a new HTTP user handler func NewUserHandler(service port.UserService) *UserHandler { return &UserHandler{service: service} } // CreateUserRequest represents the HTTP request type CreateUserRequest struct { Email string `json:"email"` Name string `json:"name"` } // UserResponse represents the HTTP response type UserResponse struct { ID string `json:"id"` Email string `json:"email"` Name string `json:"name"` Status string `json:"status"` CreatedAt string `json:"created_at"` } // CreateUser handles POST /users func (h *UserHandler) CreateUser(w http.ResponseWriter, r *http.Request) { var req CreateUserRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { respondError(w, http.StatusBadRequest, "invalid request") return } user, err := h.service.CreateUser(r.Context(), req.Email, req.Name) if err != nil { respondError(w, http.StatusBadRequest, err.Error()) return } respondJSON(w, http.StatusCreated, toUserResponse(user)) } // GetUser handles GET /users/{id} func (h *UserHandler) GetUser(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") user, err := h.service.GetUser(r.Context(), id) if err != nil { respondError(w, http.StatusNotFound, "user not found") return } respondJSON(w, http.StatusOK, toUserResponse(user)) } // UpdateUser handles PUT /users/{id} func (h *UserHandler) UpdateUser(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") var req CreateUserRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { respondError(w, http.StatusBadRequest, "invalid request") return } user, err := h.service.UpdateUser(r.Context(), id, req.Email, req.Name) if err != nil { respondError(w, http.StatusBadRequest, err.Error()) return } respondJSON(w, http.StatusOK, toUserResponse(user)) } // DeleteUser handles DELETE /users/{id} func (h *UserHandler) DeleteUser(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") if err := h.service.DeleteUser(r.Context(), id); err != nil { respondError(w, http.StatusBadRequest, err.Error()) return } w.WriteHeader(http.StatusNoContent) } // Helper functions func toUserResponse(user *domain.User) UserResponse { return UserResponse{ ID: user.ID, Email: user.Email.String(), Name: user.Name, Status: string(user.Status), CreatedAt: user.CreatedAt.Format("2006-01-02T15:04:05Z"), } } func respondJSON(w http.ResponseWriter, status int, data interface{}) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) json.NewEncoder(w).Encode(data) } func respondError(w http.ResponseWriter, status int, message string) { respondJSON(w, status, map[string]string{"error": message}) } Secondary Adapters (Driven/Output) Database Adapter package persistence import ( "context" "database/sql" "errors" "time" "myapp/internal/core/domain" "myapp/internal/core/port" ) // postgresUserRepository implements the UserRepository port type postgresUserRepository struct { db *sql.DB } // 
NewPostgresUserRepository creates a new Postgres user repository func NewPostgresUserRepository(db *sql.DB) port.UserRepository { return &postgresUserRepository{db: db} } // Save saves a user func (r *postgresUserRepository) Save(ctx context.Context, user *domain.User) error { query := ` INSERT INTO users (id, email, name, status, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6) ` _, err := r.db.ExecContext( ctx, query, user.ID, user.Email.String(), user.Name, user.Status, user.CreatedAt, user.UpdatedAt, ) return err } // FindByID finds a user by ID func (r *postgresUserRepository) FindByID(ctx context.Context, id string) (*domain.User, error) { query := ` SELECT id, email, name, status, created_at, updated_at FROM users WHERE id = $1 ` var ( userID string email string name string status string createdAt time.Time updatedAt time.Time ) err := r.db.QueryRowContext(ctx, query, id).Scan( &userID, &email, &name, &status, &createdAt, &updatedAt, ) if err == sql.ErrNoRows { return nil, nil } if err != nil { return nil, err } emailVO, _ := domain.NewEmail(email) return &domain.User{ ID: userID, Email: emailVO, Name: name, Status: domain.UserStatus(status), CreatedAt: createdAt, UpdatedAt: updatedAt, }, nil } // FindByEmail finds a user by email func (r *postgresUserRepository) FindByEmail(ctx context.Context, email string) (*domain.User, error) { query := ` SELECT id, email, name, status, created_at, updated_at FROM users WHERE email = $1 ` var ( userID string emailStr string name string status string createdAt time.Time updatedAt time.Time ) err := r.db.QueryRowContext(ctx, query, email).Scan( &userID, &emailStr, &name, &status, &createdAt, &updatedAt, ) if err == sql.ErrNoRows { return nil, nil } if err != nil { return nil, err } emailVO, _ := domain.NewEmail(emailStr) return &domain.User{ ID: userID, Email: emailVO, Name: name, Status: domain.UserStatus(status), CreatedAt: createdAt, UpdatedAt: updatedAt, }, nil } // Update updates a user func (r *postgresUserRepository) Update(ctx context.Context, user *domain.User) error { query := ` UPDATE users SET email = $1, name = $2, status = $3, updated_at = $4 WHERE id = $5 ` result, err := r.db.ExecContext( ctx, query, user.Email.String(), user.Name, user.Status, time.Now(), user.ID, ) if err != nil { return err } rows, _ := result.RowsAffected() if rows == 0 { return errors.New("user not found") } return nil } // Delete deletes a user func (r *postgresUserRepository) Delete(ctx context.Context, id string) error { query := `DELETE FROM users WHERE id = $1` result, err := r.db.ExecContext(ctx, query, id) if err != nil { return err } rows, _ := result.RowsAffected() if rows == 0 { return errors.New("user not found") } return nil } // FindAll finds all users with pagination func (r *postgresUserRepository) FindAll(ctx context.Context, offset, limit int) ([]*domain.User, error) { query := ` SELECT id, email, name, status, created_at, updated_at FROM users ORDER BY created_at DESC LIMIT $1 OFFSET $2 ` rows, err := r.db.QueryContext(ctx, query, limit, offset) if err != nil { return nil, err } defer rows.Close() var users []*domain.User for rows.Next() { var ( userID string email string name string status string createdAt time.Time updatedAt time.Time ) if err := rows.Scan(&userID, &email, &name, &status, &createdAt, &updatedAt); err != nil { return nil, err } emailVO, _ := domain.NewEmail(email) users = append(users, &domain.User{ ID: userID, Email: emailVO, Name: name, Status: domain.UserStatus(status), CreatedAt: createdAt, UpdatedAt: updatedAt, 
}) } return users, nil } Email Notification Adapter package notification import ( "context" "fmt" "log" "myapp/internal/core/domain" "myapp/internal/core/port" ) // emailService implements the NotificationService port type emailService struct { smtpHost string smtpPort int fromAddr string } // NewEmailService creates a new email notification service func NewEmailService(smtpHost string, smtpPort int, fromAddr string) port.NotificationService { return &emailService{ smtpHost: smtpHost, smtpPort: smtpPort, fromAddr: fromAddr, } } // SendWelcomeEmail sends a welcome email to a new user func (s *emailService) SendWelcomeEmail(ctx context.Context, user *domain.User) error { // In production, use actual email service log.Printf("Sending welcome email to %s", user.Email.String()) return s.sendEmail(ctx, user.Email.String(), "Welcome!", "Welcome to our service!") } // SendOrderConfirmation sends an order confirmation email func (s *emailService) SendOrderConfirmation(ctx context.Context, order *domain.Order) error { log.Printf("Sending order confirmation for order %s", order.ID) return nil } // SendOrderCancellation sends an order cancellation email func (s *emailService) SendOrderCancellation(ctx context.Context, order *domain.Order) error { log.Printf("Sending order cancellation for order %s", order.ID) return nil } func (s *emailService) sendEmail(ctx context.Context, to, subject, body string) error { // Simulate email sending fmt.Printf("Email sent to %s: %s\n", to, subject) return nil } Main Application with Dependency Injection package main import ( "database/sql" "log" "net/http" _ "github.com/lib/pq" "github.com/google/uuid" "myapp/internal/adapter/input/http" "myapp/internal/adapter/output/notification" "myapp/internal/adapter/output/persistence" "myapp/internal/core/port" "myapp/internal/core/service" ) // UUIDGenerator implements IDGenerator port type UUIDGenerator struct{} func (g *UUIDGenerator) GenerateID() string { return uuid.New().String() } // MockEventPublisher implements EventPublisher port type MockEventPublisher struct{} func (p *MockEventPublisher) PublishUserCreated(ctx context.Context, user *domain.User) error { log.Printf("Event: User created - %s", user.ID) return nil } func (p *MockEventPublisher) PublishUserActivated(ctx context.Context, user *domain.User) error { log.Printf("Event: User activated - %s", user.ID) return nil } func (p *MockEventPublisher) PublishOrderPlaced(ctx context.Context, order *domain.Order) error { log.Printf("Event: Order placed - %s", order.ID) return nil } func main() { // Initialize database db, err := sql.Open("postgres", "postgres://user:pass@localhost/hexarch?sslmode=disable") if err != nil { log.Fatal(err) } defer db.Close() // Initialize secondary adapters (driven/output) userRepo := persistence.NewPostgresUserRepository(db) emailService := notification.NewEmailService("smtp.example.com", 587, "[email protected]") idGen := &UUIDGenerator{} eventPub := &MockEventPublisher{} // Initialize core service userService := service.NewUserService(userRepo, emailService, idGen, eventPub) // Initialize primary adapters (driving/input) httpHandler := httpAdapter.NewUserHandler(userService) // Setup routes mux := http.NewServeMux() mux.HandleFunc("POST /users", httpHandler.CreateUser) mux.HandleFunc("GET /users/{id}", httpHandler.GetUser) mux.HandleFunc("PUT /users/{id}", httpHandler.UpdateUser) mux.HandleFunc("DELETE /users/{id}", httpHandler.DeleteUser) // Start server log.Println("Server starting on :8080") if err := http.ListenAndServe(":8080", 
mux); err != nil { log.Fatal(err) } } Testing with Mock Adapters package service_test import ( "context" "testing" "myapp/internal/core/domain" "myapp/internal/core/service" ) // Mock repository type mockUserRepository struct { users map[string]*domain.User } func newMockUserRepository() *mockUserRepository { return &mockUserRepository{ users: make(map[string]*domain.User), } } func (m *mockUserRepository) Save(ctx context.Context, user *domain.User) error { m.users[user.ID] = user return nil } func (m *mockUserRepository) FindByID(ctx context.Context, id string) (*domain.User, error) { return m.users[id], nil } func (m *mockUserRepository) FindByEmail(ctx context.Context, email string) (*domain.User, error) { for _, user := range m.users { if user.Email.String() == email { return user, nil } } return nil, nil } // Mock notification service type mockNotificationService struct { sentEmails []string } func (m *mockNotificationService) SendWelcomeEmail(ctx context.Context, user *domain.User) error { m.sentEmails = append(m.sentEmails, user.Email.String()) return nil } // Mock ID generator type mockIDGenerator struct { id int } func (m *mockIDGenerator) GenerateID() string { m.id++ return fmt.Sprintf("user-%d", m.id) } // Mock event publisher type mockEventPublisher struct{} func (m *mockEventPublisher) PublishUserCreated(ctx context.Context, user *domain.User) error { return nil } func TestCreateUser(t *testing.T) { // Arrange repo := newMockUserRepository() notif := &mockNotificationService{} idGen := &mockIDGenerator{} eventPub := &mockEventPublisher{} service := service.NewUserService(repo, notif, idGen, eventPub) // Act user, err := service.CreateUser(context.Background(), "[email protected]", "Test User") // Assert if err != nil { t.Fatalf("expected no error, got %v", err) } if user.Email.String() != "[email protected]" { t.Errorf("expected email [email protected], got %s", user.Email.String()) } if len(notif.sentEmails) != 1 { t.Errorf("expected 1 welcome email, got %d", len(notif.sentEmails)) } } Best Practices Port Definition: Define ports (interfaces) in the core domain Adapter Independence: Adapters should not know about each other Domain First: Design domain model before thinking about adapters Single Responsibility: Each adapter handles one external concern Configuration Injection: Inject configuration into adapters, not core Error Handling: Let domain define error types Testing: Use mock adapters for testing core logic Common Pitfalls Adapter Coupling: Adapters depending on each other directly Leaky Abstractions: Infrastructure details leaking into core Anemic Ports: Ports that are too thin or just data transfer Adapter in Core: Importing adapter packages in core Forgetting Symmetry: Treating primary and secondary adapters differently Over-abstraction: Creating too many small ports When to Use Hexagonal Architecture Use When: ...
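A note on the mock adapters in the testing example above: as written they only implement the methods the test touches, so they do not yet satisfy the full port interfaces. A common Go idiom is to add compile-time assertions so any drift between a mock and its port fails the build instead of surfacing as a confusing runtime error. The sketch below assumes the mock types and the port package from the examples above; the remaining interface methods (Update, Delete, FindAll, the extra notification and event methods) would still need no-op stubs for it to compile.

```go
package service_test

import "myapp/internal/core/port"

// Compile-time checks: each mock must satisfy its port interface.
// If a port gains a method and a mock falls behind, the build breaks here
// rather than in the middle of a test run.
var (
	_ port.UserRepository      = (*mockUserRepository)(nil)
	_ port.NotificationService = (*mockNotificationService)(nil)
	_ port.IDGenerator         = (*mockIDGenerator)(nil)
	_ port.EventPublisher      = (*mockEventPublisher)(nil)
)
```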

    January 11, 2025 · 17 min · Rafiul Alam

    Worker Pool Pattern in Go

    Go Concurrency Patterns Series: ← Request/Response | Series Overview | Mutex Patterns → What is the Worker Pool Pattern? The Worker Pool pattern manages a fixed number of worker goroutines that process jobs from a shared queue. This pattern is essential for controlling resource usage, preventing system overload, and ensuring predictable performance under varying loads. Key Components: Job Queue: Channel containing work to be processed Worker Pool: Fixed number of worker goroutines Result Channel: Optional channel for collecting results Dispatcher: Coordinates job distribution to workers Real-World Use Cases Image Processing: Resize/compress images with limited CPU cores Database Operations: Limit concurrent database connections API Rate Limiting: Control outbound API call rates File Processing: Process files with bounded I/O operations Web Scraping: Limit concurrent HTTP requests Background Jobs: Process queued tasks with resource limits Basic Worker Pool Implementation package main import ( "fmt" "math/rand" "sync" "time" ) // Job represents work to be processed type Job struct { ID int Data interface{} } // Result represents the outcome of processing a job type Result struct { JobID int Output interface{} Error error } // WorkerPool manages a pool of workers type WorkerPool struct { workerCount int jobQueue chan Job resultQueue chan Result quit chan bool wg sync.WaitGroup } // NewWorkerPool creates a new worker pool func NewWorkerPool(workerCount, jobQueueSize int) *WorkerPool { return &WorkerPool{ workerCount: workerCount, jobQueue: make(chan Job, jobQueueSize), resultQueue: make(chan Result, jobQueueSize), quit: make(chan bool), } } // Start initializes and starts all workers func (wp *WorkerPool) Start() { for i := 0; i < wp.workerCount; i++ { wp.wg.Add(1) go wp.worker(i) } } // worker processes jobs from the job queue func (wp *WorkerPool) worker(id int) { defer wp.wg.Done() for { select { case job := <-wp.jobQueue: fmt.Printf("Worker %d processing job %d\n", id, job.ID) result := wp.processJob(job) wp.resultQueue <- result case <-wp.quit: fmt.Printf("Worker %d stopping\n", id) return } } } // processJob simulates job processing func (wp *WorkerPool) processJob(job Job) Result { // Simulate work time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) // Process the job (example: square the number) if num, ok := job.Data.(int); ok { return Result{ JobID: job.ID, Output: num * num, } } return Result{ JobID: job.ID, Error: fmt.Errorf("invalid job data"), } } // Submit adds a job to the queue func (wp *WorkerPool) Submit(job Job) { wp.jobQueue <- job } // Results returns the result channel func (wp *WorkerPool) Results() <-chan Result { return wp.resultQueue } // Stop gracefully shuts down the worker pool func (wp *WorkerPool) Stop() { close(wp.quit) wp.wg.Wait() close(wp.jobQueue) close(wp.resultQueue) } func main() { // Create worker pool with 3 workers pool := NewWorkerPool(3, 10) pool.Start() defer pool.Stop() // Submit jobs go func() { for i := 1; i <= 10; i++ { job := Job{ ID: i, Data: i * 10, } pool.Submit(job) } }() // Collect results for i := 0; i < 10; i++ { result := <-pool.Results() if result.Error != nil { fmt.Printf("Job %d failed: %v\n", result.JobID, result.Error) } else { fmt.Printf("Job %d result: %v\n", result.JobID, result.Output) } } } Advanced Worker Pool with Context package main import ( "context" "fmt" "sync" "time" ) // ContextJob includes context for cancellation type ContextJob struct { ID string Data interface{} Context context.Context } // 
ContextResult includes timing and context information type ContextResult struct { JobID string Output interface{} Error error Duration time.Duration WorkerID int } // AdvancedWorkerPool supports context cancellation and monitoring type AdvancedWorkerPool struct { workerCount int jobQueue chan ContextJob resultQueue chan ContextResult ctx context.Context cancel context.CancelFunc wg sync.WaitGroup metrics *PoolMetrics } // PoolMetrics tracks worker pool performance type PoolMetrics struct { mu sync.RWMutex jobsProcessed int64 jobsFailed int64 totalDuration time.Duration activeWorkers int } func (pm *PoolMetrics) RecordJob(duration time.Duration, success bool) { pm.mu.Lock() defer pm.mu.Unlock() if success { pm.jobsProcessed++ } else { pm.jobsFailed++ } pm.totalDuration += duration } func (pm *PoolMetrics) SetActiveWorkers(count int) { pm.mu.Lock() defer pm.mu.Unlock() pm.activeWorkers = count } func (pm *PoolMetrics) GetStats() (processed, failed int64, avgDuration time.Duration, active int) { pm.mu.RLock() defer pm.mu.RUnlock() processed = pm.jobsProcessed failed = pm.jobsFailed active = pm.activeWorkers if pm.jobsProcessed > 0 { avgDuration = pm.totalDuration / time.Duration(pm.jobsProcessed) } return } // NewAdvancedWorkerPool creates a new advanced worker pool func NewAdvancedWorkerPool(ctx context.Context, workerCount, queueSize int) *AdvancedWorkerPool { poolCtx, cancel := context.WithCancel(ctx) return &AdvancedWorkerPool{ workerCount: workerCount, jobQueue: make(chan ContextJob, queueSize), resultQueue: make(chan ContextResult, queueSize), ctx: poolCtx, cancel: cancel, metrics: &PoolMetrics{}, } } // Start begins processing with all workers func (awp *AdvancedWorkerPool) Start() { awp.metrics.SetActiveWorkers(awp.workerCount) for i := 0; i < awp.workerCount; i++ { awp.wg.Add(1) go awp.worker(i) } // Start metrics reporter go awp.reportMetrics() } // worker processes jobs with context support func (awp *AdvancedWorkerPool) worker(id int) { defer awp.wg.Done() for { select { case job := <-awp.jobQueue: start := time.Now() result := awp.processContextJob(job, id) duration := time.Since(start) awp.metrics.RecordJob(duration, result.Error == nil) select { case awp.resultQueue <- result: case <-awp.ctx.Done(): return } case <-awp.ctx.Done(): fmt.Printf("Worker %d shutting down\n", id) return } } } // processContextJob handles job processing with context func (awp *AdvancedWorkerPool) processContextJob(job ContextJob, workerID int) ContextResult { start := time.Now() // Check if job context is already cancelled select { case <-job.Context.Done(): return ContextResult{ JobID: job.ID, Error: job.Context.Err(), Duration: time.Since(start), WorkerID: workerID, } default: } // Simulate work that respects context cancellation workDone := make(chan interface{}, 1) workErr := make(chan error, 1) go func() { // Simulate processing time time.Sleep(time.Duration(50+rand.Intn(100)) * time.Millisecond) if num, ok := job.Data.(int); ok { workDone <- num * num } else { workErr <- fmt.Errorf("invalid data type") } }() select { case result := <-workDone: return ContextResult{ JobID: job.ID, Output: result, Duration: time.Since(start), WorkerID: workerID, } case err := <-workErr: return ContextResult{ JobID: job.ID, Error: err, Duration: time.Since(start), WorkerID: workerID, } case <-job.Context.Done(): return ContextResult{ JobID: job.ID, Error: job.Context.Err(), Duration: time.Since(start), WorkerID: workerID, } case <-awp.ctx.Done(): return ContextResult{ JobID: job.ID, Error: awp.ctx.Err(), Duration: 
time.Since(start), WorkerID: workerID, } } } // Submit adds a job to the queue func (awp *AdvancedWorkerPool) Submit(job ContextJob) error { select { case awp.jobQueue <- job: return nil case <-awp.ctx.Done(): return awp.ctx.Err() } } // Results returns the result channel func (awp *AdvancedWorkerPool) Results() <-chan ContextResult { return awp.resultQueue } // reportMetrics periodically reports pool statistics func (awp *AdvancedWorkerPool) reportMetrics() { ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: processed, failed, avgDuration, active := awp.metrics.GetStats() fmt.Printf("Pool Stats - Processed: %d, Failed: %d, Avg Duration: %v, Active Workers: %d\n", processed, failed, avgDuration, active) case <-awp.ctx.Done(): return } } } // Stop gracefully shuts down the worker pool func (awp *AdvancedWorkerPool) Stop() { awp.cancel() awp.wg.Wait() close(awp.jobQueue) close(awp.resultQueue) } func main() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() pool := NewAdvancedWorkerPool(ctx, 4, 20) pool.Start() defer pool.Stop() // Submit jobs with individual timeouts go func() { for i := 1; i <= 15; i++ { jobCtx, jobCancel := context.WithTimeout(ctx, 200*time.Millisecond) job := ContextJob{ ID: fmt.Sprintf("job-%d", i), Data: i * 5, Context: jobCtx, } if err := pool.Submit(job); err != nil { fmt.Printf("Failed to submit job %d: %v\n", i, err) jobCancel() break } // Cancel some jobs early to demonstrate cancellation if i%5 == 0 { go func() { time.Sleep(50 * time.Millisecond) jobCancel() }() } else { defer jobCancel() } } }() // Collect results resultCount := 0 for result := range pool.Results() { resultCount++ if result.Error != nil { fmt.Printf("Job %s failed (worker %d): %v (took %v)\n", result.JobID, result.WorkerID, result.Error, result.Duration) } else { fmt.Printf("Job %s completed (worker %d): %v (took %v)\n", result.JobID, result.WorkerID, result.Output, result.Duration) } if resultCount >= 15 { break } } } Dynamic Worker Pool package main import ( "context" "fmt" "sync" "sync/atomic" "time" ) // DynamicWorkerPool can scale workers up and down based on load type DynamicWorkerPool struct { minWorkers int maxWorkers int currentWorkers int64 jobQueue chan Job resultQueue chan Result ctx context.Context cancel context.CancelFunc wg sync.WaitGroup workerControl chan int // +1 to add worker, -1 to remove worker metrics *DynamicMetrics } // DynamicMetrics tracks load and performance for scaling decisions type DynamicMetrics struct { mu sync.RWMutex queueLength int64 avgProcessingTime time.Duration lastScaleTime time.Time scaleUpThreshold int scaleDownThreshold int } func (dm *DynamicMetrics) UpdateQueueLength(length int) { atomic.StoreInt64(&dm.queueLength, int64(length)) } func (dm *DynamicMetrics) GetQueueLength() int { return int(atomic.LoadInt64(&dm.queueLength)) } func (dm *DynamicMetrics) ShouldScaleUp(currentWorkers int, maxWorkers int) bool { dm.mu.RLock() defer dm.mu.RUnlock() return currentWorkers < maxWorkers && dm.GetQueueLength() > dm.scaleUpThreshold && time.Since(dm.lastScaleTime) > 5*time.Second } func (dm *DynamicMetrics) ShouldScaleDown(currentWorkers int, minWorkers int) bool { dm.mu.RLock() defer dm.mu.RUnlock() return currentWorkers > minWorkers && dm.GetQueueLength() < dm.scaleDownThreshold && time.Since(dm.lastScaleTime) > 10*time.Second } func (dm *DynamicMetrics) RecordScale() { dm.mu.Lock() defer dm.mu.Unlock() dm.lastScaleTime = time.Now() } // NewDynamicWorkerPool creates a new 
dynamic worker pool func NewDynamicWorkerPool(ctx context.Context, minWorkers, maxWorkers, queueSize int) *DynamicWorkerPool { poolCtx, cancel := context.WithCancel(ctx) return &DynamicWorkerPool{ minWorkers: minWorkers, maxWorkers: maxWorkers, currentWorkers: 0, jobQueue: make(chan Job, queueSize), resultQueue: make(chan Result, queueSize), ctx: poolCtx, cancel: cancel, workerControl: make(chan int, maxWorkers), metrics: &DynamicMetrics{ scaleUpThreshold: queueSize / 2, scaleDownThreshold: queueSize / 4, }, } } // Start initializes the pool with minimum workers func (dwp *DynamicWorkerPool) Start() { // Start with minimum workers for i := 0; i < dwp.minWorkers; i++ { dwp.addWorker() } // Start the scaler go dwp.scaler() // Start queue monitor go dwp.queueMonitor() } // addWorker creates and starts a new worker func (dwp *DynamicWorkerPool) addWorker() { workerID := atomic.AddInt64(&dwp.currentWorkers, 1) dwp.wg.Add(1) go func(id int64) { defer dwp.wg.Done() defer atomic.AddInt64(&dwp.currentWorkers, -1) fmt.Printf("Worker %d started\n", id) for { select { case job := <-dwp.jobQueue: start := time.Now() result := dwp.processJob(job) duration := time.Since(start) fmt.Printf("Worker %d processed job %d in %v\n", id, job.ID, duration) select { case dwp.resultQueue <- result: case <-dwp.ctx.Done(): return } case <-dwp.ctx.Done(): fmt.Printf("Worker %d stopping\n", id) return } } }(workerID) } // processJob simulates job processing func (dwp *DynamicWorkerPool) processJob(job Job) Result { // Simulate variable processing time time.Sleep(time.Duration(50+rand.Intn(200)) * time.Millisecond) if num, ok := job.Data.(int); ok { return Result{ JobID: job.ID, Output: num * 2, } } return Result{ JobID: job.ID, Error: fmt.Errorf("invalid job data"), } } // scaler monitors load and adjusts worker count func (dwp *DynamicWorkerPool) scaler() { ticker := time.NewTicker(3 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: currentWorkers := int(atomic.LoadInt64(&dwp.currentWorkers)) queueLength := dwp.metrics.GetQueueLength() fmt.Printf("Scaler check - Workers: %d, Queue: %d\n", currentWorkers, queueLength) if dwp.metrics.ShouldScaleUp(currentWorkers, dwp.maxWorkers) { fmt.Printf("Scaling up: adding worker (current: %d)\n", currentWorkers) dwp.addWorker() dwp.metrics.RecordScale() } else if dwp.metrics.ShouldScaleDown(currentWorkers, dwp.minWorkers) { fmt.Printf("Scaling down: removing worker (current: %d)\n", currentWorkers) // Signal one worker to stop by closing context // In a real implementation, you might use a more sophisticated approach dwp.metrics.RecordScale() } case <-dwp.ctx.Done(): return } } } // queueMonitor tracks queue length for scaling decisions func (dwp *DynamicWorkerPool) queueMonitor() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: queueLength := len(dwp.jobQueue) dwp.metrics.UpdateQueueLength(queueLength) case <-dwp.ctx.Done(): return } } } // Submit adds a job to the queue func (dwp *DynamicWorkerPool) Submit(job Job) error { select { case dwp.jobQueue <- job: return nil case <-dwp.ctx.Done(): return dwp.ctx.Err() } } // Results returns the result channel func (dwp *DynamicWorkerPool) Results() <-chan Result { return dwp.resultQueue } // Stop gracefully shuts down the pool func (dwp *DynamicWorkerPool) Stop() { dwp.cancel() dwp.wg.Wait() close(dwp.jobQueue) close(dwp.resultQueue) } func main() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() pool := NewDynamicWorkerPool(ctx, 2, 
6, 20) pool.Start() defer pool.Stop() // Submit jobs in bursts to trigger scaling go func() { // Initial burst for i := 1; i <= 10; i++ { job := Job{ID: i, Data: i * 10} if err := pool.Submit(job); err != nil { fmt.Printf("Failed to submit job %d: %v\n", i, err) break } } time.Sleep(8 * time.Second) // Second burst for i := 11; i <= 25; i++ { job := Job{ID: i, Data: i * 10} if err := pool.Submit(job); err != nil { fmt.Printf("Failed to submit job %d: %v\n", i, err) break } } time.Sleep(5 * time.Second) // Final smaller batch for i := 26; i <= 30; i++ { job := Job{ID: i, Data: i * 10} if err := pool.Submit(job); err != nil { fmt.Printf("Failed to submit job %d: %v\n", i, err) break } } }() // Collect results resultCount := 0 for result := range pool.Results() { resultCount++ if result.Error != nil { fmt.Printf("Job %d failed: %v\n", result.JobID, result.Error) } else { fmt.Printf("Job %d completed: %v\n", result.JobID, result.Output) } if resultCount >= 30 { break } } } Best Practices Right-Size the Pool: Match worker count to available resources Monitor Performance: Track queue length, processing times, and throughput Handle Backpressure: Implement proper queue management Graceful Shutdown: Ensure all workers complete current jobs Error Handling: Isolate worker failures from the pool Resource Cleanup: Properly close channels and cancel contexts Load Balancing: Distribute work evenly across workers Common Pitfalls Too Many Workers: Creating more workers than CPU cores for CPU-bound tasks Unbounded Queues: Memory issues with unlimited job queues Worker Leaks: Not properly shutting down workers Blocking Operations: Long-running jobs blocking other work No Backpressure: Not handling queue overflow situations Testing Worker Pools package main import ( "context" "testing" "time" ) func TestWorkerPool(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() pool := NewAdvancedWorkerPool(ctx, 2, 5) pool.Start() defer pool.Stop() // Submit test jobs jobCount := 5 for i := 1; i <= jobCount; i++ { job := ContextJob{ ID: fmt.Sprintf("test-%d", i), Data: i, Context: ctx, } if err := pool.Submit(job); err != nil { t.Fatalf("Failed to submit job: %v", err) } } // Collect results results := make(map[string]ContextResult) for i := 0; i < jobCount; i++ { select { case result := <-pool.Results(): results[result.JobID] = result case <-time.After(2 * time.Second): t.Fatal("Timeout waiting for results") } } // Verify all jobs completed if len(results) != jobCount { t.Errorf("Expected %d results, got %d", jobCount, len(results)) } // Verify results are correct for i := 1; i <= jobCount; i++ { jobID := fmt.Sprintf("test-%d", i) result, exists := results[jobID] if !exists { t.Errorf("Missing result for job %s", jobID) continue } if result.Error != nil { t.Errorf("Job %s failed: %v", jobID, result.Error) continue } expected := i * i if result.Output != expected { t.Errorf("Job %s: expected %d, got %v", jobID, expected, result.Output) } } } The Worker Pool pattern is essential for building scalable, resource-efficient concurrent applications in Go. It provides controlled concurrency, predictable resource usage, and excellent performance characteristics for both CPU-bound and I/O-bound workloads. ...
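One way to make the graceful-shutdown and worker-leak points above concrete: instead of a separate quit channel, many pools simply close the job channel and let each worker range over it, so every queued job is drained before the workers exit. This is a minimal sketch of that style (the drainPool name and the squaring work are illustrative, not code from the pools above):

```go
package main

import (
	"fmt"
	"sync"
)

// drainPool starts a fixed number of workers that drain the jobs channel.
// Closing jobs is the shutdown signal: workers finish all queued work,
// then the results channel is closed once every worker has returned.
func drainPool(workers int, jobs <-chan int) <-chan int {
	results := make(chan int, workers)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs { // exits when jobs is closed and empty
				results <- j * j
			}
		}()
	}
	go func() {
		wg.Wait()      // all workers done, no more sends
		close(results) // safe to close now
	}()
	return results
}

func main() {
	jobs := make(chan int, 10)
	for i := 1; i <= 10; i++ {
		jobs <- i
	}
	close(jobs) // no more work
	for r := range drainPool(3, jobs) {
		fmt.Println(r)
	}
}
```

With this shape there is no risk of sending on a closed job queue and no worker left blocked on a quit signal while jobs are still buffered.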

    August 21, 2024 · 12 min · Rafiul Alam

    WaitGroup Pattern in Go

    Go Concurrency Patterns Series: ← Mutex Patterns | Series Overview | Once Pattern → What is the WaitGroup Pattern? The WaitGroup pattern uses sync.WaitGroup to coordinate the completion of multiple goroutines. It acts as a counter that blocks until all registered goroutines have finished executing, making it perfect for implementing barriers and waiting for parallel tasks to complete. Key Operations: Add(n): Increment the counter by n Done(): Decrement the counter by 1 (usually called with defer) Wait(): Block until counter reaches zero Real-World Use Cases Parallel Processing: Wait for all workers to complete Batch Operations: Process multiple items concurrently Service Initialization: Wait for all services to start Data Collection: Gather results from multiple sources Cleanup Operations: Ensure all cleanup tasks finish Testing: Coordinate test goroutines Basic WaitGroup Usage package main import ( "fmt" "math/rand" "sync" "time" ) // Task represents work to be done type Task struct { ID int Name string } // processTask simulates processing a task func processTask(task Task, wg *sync.WaitGroup) { defer wg.Done() // Always call Done when goroutine finishes fmt.Printf("Starting task %d: %s\n", task.ID, task.Name) // Simulate work duration := time.Duration(rand.Intn(1000)) * time.Millisecond time.Sleep(duration) fmt.Printf("Completed task %d: %s (took %v)\n", task.ID, task.Name, duration) } func main() { tasks := []Task{ {1, "Process images"}, {2, "Send emails"}, {3, "Update database"}, {4, "Generate reports"}, {5, "Backup files"}, } var wg sync.WaitGroup fmt.Println("Starting parallel task processing...") // Start all tasks for _, task := range tasks { wg.Add(1) // Increment counter for each goroutine go processTask(task, &wg) } // Wait for all tasks to complete wg.Wait() fmt.Println("All tasks completed!") } WaitGroup with Error Handling package main import ( "fmt" "math/rand" "sync" "time" ) // Result represents the outcome of a task type Result struct { TaskID int Data interface{} Error error } // TaskProcessor handles tasks with error collection type TaskProcessor struct { wg sync.WaitGroup results chan Result errors []error mu sync.Mutex } // NewTaskProcessor creates a new task processor func NewTaskProcessor(bufferSize int) *TaskProcessor { return &TaskProcessor{ results: make(chan Result, bufferSize), } } // processTaskWithError simulates task processing that might fail func (tp *TaskProcessor) processTaskWithError(taskID int, data interface{}) { defer tp.wg.Done() fmt.Printf("Processing task %d\n", taskID) // Simulate work time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond) // Simulate random failures if rand.Float32() < 0.3 { err := fmt.Errorf("task %d failed", taskID) tp.results <- Result{TaskID: taskID, Error: err} // Collect error tp.mu.Lock() tp.errors = append(tp.errors, err) tp.mu.Unlock() fmt.Printf("Task %d failed\n", taskID) return } // Success result := fmt.Sprintf("Result from task %d", taskID) tp.results <- Result{TaskID: taskID, Data: result} fmt.Printf("Task %d completed successfully\n", taskID) } // ProcessTasks processes multiple tasks and collects results func (tp *TaskProcessor) ProcessTasks(taskCount int) ([]Result, []error) { // Start all tasks for i := 1; i <= taskCount; i++ { tp.wg.Add(1) go tp.processTaskWithError(i, fmt.Sprintf("data-%d", i)) } // Close results channel when all tasks complete go func() { tp.wg.Wait() close(tp.results) }() // Collect results var results []Result for result := range tp.results { results = append(results, result) } 
tp.mu.Lock() errors := make([]error, len(tp.errors)) copy(errors, tp.errors) tp.mu.Unlock() return results, errors } func main() { processor := NewTaskProcessor(10) fmt.Println("Starting task processing with error handling...") results, errors := processor.ProcessTasks(8) fmt.Printf("\nProcessing complete!\n") fmt.Printf("Successful tasks: %d\n", len(results)-len(errors)) fmt.Printf("Failed tasks: %d\n", len(errors)) if len(errors) > 0 { fmt.Println("\nErrors:") for _, err := range errors { fmt.Printf(" - %v\n", err) } } fmt.Println("\nResults:") for _, result := range results { if result.Error == nil { fmt.Printf(" Task %d: %v\n", result.TaskID, result.Data) } } } Nested WaitGroups for Hierarchical Tasks package main import ( "fmt" "sync" "time" ) // Department represents a department with multiple teams type Department struct { Name string Teams []Team } // Team represents a team with multiple workers type Team struct { Name string Workers []string } // processDepartment processes all teams in a department func processDepartment(dept Department, wg *sync.WaitGroup) { defer wg.Done() fmt.Printf("Department %s starting work\n", dept.Name) var teamWG sync.WaitGroup // Process all teams in parallel for _, team := range dept.Teams { teamWG.Add(1) go processTeam(team, &teamWG) } // Wait for all teams to complete teamWG.Wait() fmt.Printf("Department %s completed all work\n", dept.Name) } // processTeam processes all workers in a team func processTeam(team Team, wg *sync.WaitGroup) { defer wg.Done() fmt.Printf(" Team %s starting work\n", team.Name) var workerWG sync.WaitGroup // Process all workers in parallel for _, worker := range team.Workers { workerWG.Add(1) go processWorker(worker, &workerWG) } // Wait for all workers to complete workerWG.Wait() fmt.Printf(" Team %s completed all work\n", team.Name) } // processWorker simulates worker processing func processWorker(worker string, wg *sync.WaitGroup) { defer wg.Done() fmt.Printf(" Worker %s working...\n", worker) time.Sleep(time.Duration(100+rand.Intn(200)) * time.Millisecond) fmt.Printf(" Worker %s finished\n", worker) } func main() { departments := []Department{ { Name: "Engineering", Teams: []Team{ { Name: "Backend", Workers: []string{"Alice", "Bob", "Charlie"}, }, { Name: "Frontend", Workers: []string{"Diana", "Eve"}, }, }, }, { Name: "Marketing", Teams: []Team{ { Name: "Digital", Workers: []string{"Frank", "Grace"}, }, { Name: "Content", Workers: []string{"Henry", "Ivy", "Jack"}, }, }, }, } var deptWG sync.WaitGroup fmt.Println("Starting company-wide project...") // Process all departments in parallel for _, dept := range departments { deptWG.Add(1) go processDepartment(dept, &deptWG) } // Wait for all departments to complete deptWG.Wait() fmt.Println("Company-wide project completed!") } WaitGroup with Timeout package main import ( "context" "fmt" "sync" "time" ) // TimedTaskRunner runs tasks with timeout support type TimedTaskRunner struct { timeout time.Duration } // NewTimedTaskRunner creates a new timed task runner func NewTimedTaskRunner(timeout time.Duration) *TimedTaskRunner { return &TimedTaskRunner{timeout: timeout} } // RunWithTimeout runs tasks with a timeout func (ttr *TimedTaskRunner) RunWithTimeout(tasks []func()) error { ctx, cancel := context.WithTimeout(context.Background(), ttr.timeout) defer cancel() var wg sync.WaitGroup done := make(chan struct{}) // Start all tasks for i, task := range tasks { wg.Add(1) go func(taskID int, taskFunc func()) { defer wg.Done() fmt.Printf("Starting task %d\n", taskID) taskFunc() 
fmt.Printf("Completed task %d\n", taskID) }(i+1, task) } // Wait for completion in separate goroutine go func() { wg.Wait() close(done) }() // Wait for either completion or timeout select { case <-done: fmt.Println("All tasks completed successfully") return nil case <-ctx.Done(): fmt.Println("Tasks timed out") return ctx.Err() } } // simulateTask creates a task that takes a specific duration func simulateTask(duration time.Duration, name string) func() { return func() { fmt.Printf(" %s working for %v\n", name, duration) time.Sleep(duration) fmt.Printf(" %s finished\n", name) } } func main() { runner := NewTimedTaskRunner(2 * time.Second) // Test with tasks that complete within timeout fmt.Println("Test 1: Tasks that complete within timeout") tasks1 := []func(){ simulateTask(300*time.Millisecond, "Quick task 1"), simulateTask(500*time.Millisecond, "Quick task 2"), simulateTask(400*time.Millisecond, "Quick task 3"), } if err := runner.RunWithTimeout(tasks1); err != nil { fmt.Printf("Error: %v\n", err) } fmt.Println("\nTest 2: Tasks that exceed timeout") tasks2 := []func(){ simulateTask(800*time.Millisecond, "Slow task 1"), simulateTask(1500*time.Millisecond, "Slow task 2"), simulateTask(2000*time.Millisecond, "Very slow task"), } if err := runner.RunWithTimeout(tasks2); err != nil { fmt.Printf("Error: %v\n", err) } } Dynamic WaitGroup Management package main import ( "fmt" "sync" "time" ) // DynamicTaskManager manages tasks that can spawn other tasks type DynamicTaskManager struct { wg sync.WaitGroup taskChan chan func() quit chan struct{} active sync.WaitGroup } // NewDynamicTaskManager creates a new dynamic task manager func NewDynamicTaskManager() *DynamicTaskManager { return &DynamicTaskManager{ taskChan: make(chan func(), 100), quit: make(chan struct{}), } } // Start begins processing tasks func (dtm *DynamicTaskManager) Start() { go dtm.taskProcessor() } // taskProcessor processes tasks from the channel func (dtm *DynamicTaskManager) taskProcessor() { for { select { case task := <-dtm.taskChan: dtm.active.Add(1) go func() { defer dtm.active.Done() task() }() case <-dtm.quit: return } } } // AddTask adds a task to be processed func (dtm *DynamicTaskManager) AddTask(task func()) { select { case dtm.taskChan <- task: case <-dtm.quit: } } // Wait waits for all active tasks to complete func (dtm *DynamicTaskManager) Wait() { dtm.active.Wait() } // Stop stops the task manager func (dtm *DynamicTaskManager) Stop() { close(dtm.quit) dtm.Wait() } // recursiveTask demonstrates a task that spawns other tasks func recursiveTask(manager *DynamicTaskManager, depth int, maxDepth int, id string) func() { return func() { fmt.Printf("Task %s (depth %d) starting\n", id, depth) time.Sleep(100 * time.Millisecond) if depth < maxDepth { // Spawn child tasks for i := 0; i < 2; i++ { childID := fmt.Sprintf("%s.%d", id, i+1) manager.AddTask(recursiveTask(manager, depth+1, maxDepth, childID)) } } fmt.Printf("Task %s (depth %d) completed\n", id, depth) } } func main() { manager := NewDynamicTaskManager() manager.Start() defer manager.Stop() fmt.Println("Starting dynamic task processing...") // Add initial tasks that will spawn more tasks for i := 0; i < 3; i++ { taskID := fmt.Sprintf("root-%d", i+1) manager.AddTask(recursiveTask(manager, 0, 2, taskID)) } // Wait for all tasks (including dynamically created ones) to complete manager.Wait() fmt.Println("All tasks completed!") } Best Practices Always Use defer: Call Done() with defer to ensure it’s called even if panic occurs Add Before Starting: Call Add() before 
starting goroutines to avoid race conditions Don’t Reuse WaitGroups: Create new WaitGroup for each batch of operations Handle Panics: Use recover in goroutines to prevent panic from affecting WaitGroup Avoid Negative Counters: Don’t call Done() more times than Add() Use Timeouts: Combine with context for timeout handling Consider Alternatives: Use channels for complex coordination scenarios Common Pitfalls 1. Race Condition with Add/Done // Bad: Race condition func badExample() { var wg sync.WaitGroup for i := 0; i < 5; i++ { go func() { wg.Add(1) // Race: might be called after Wait() defer wg.Done() // do work }() } wg.Wait() // Might not wait for all goroutines } // Good: Add before starting goroutines func goodExample() { var wg sync.WaitGroup for i := 0; i < 5; i++ { wg.Add(1) // Add before starting goroutine go func() { defer wg.Done() // do work }() } wg.Wait() } 2. Forgetting to Call Done // Bad: Missing Done() call func badTask(wg *sync.WaitGroup) { // do work if someCondition { return // Forgot to call Done()! } wg.Done() } // Good: Always use defer func goodTask(wg *sync.WaitGroup) { defer wg.Done() // Always called // do work if someCondition { return // Done() still called } } Testing WaitGroup Patterns package main import ( "sync" "testing" "time" ) func TestWaitGroupCompletion(t *testing.T) { var wg sync.WaitGroup completed := make([]bool, 5) for i := 0; i < 5; i++ { wg.Add(1) go func(index int) { defer wg.Done() time.Sleep(10 * time.Millisecond) completed[index] = true }(i) } wg.Wait() // Verify all tasks completed for i, done := range completed { if !done { t.Errorf("Task %d did not complete", i) } } } func TestWaitGroupWithTimeout(t *testing.T) { var wg sync.WaitGroup done := make(chan struct{}) wg.Add(1) go func() { defer wg.Done() time.Sleep(50 * time.Millisecond) }() go func() { wg.Wait() close(done) }() select { case <-done: // Success case <-time.After(100 * time.Millisecond): t.Error("WaitGroup did not complete within timeout") } } The WaitGroup pattern is essential for coordinating goroutines in Go. It provides a simple yet powerful way to wait for multiple concurrent operations to complete, making it perfect for parallel processing, batch operations, and synchronization barriers. ...
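To tie together the "Always Use defer" and "Handle Panics" advice above, a small wrapper can guarantee the counter is decremented and contain a panic to the goroutine that caused it. This is a sketch using a hypothetical helper name (safeGo), not a standard library function:

```go
package main

import (
	"fmt"
	"sync"
)

// safeGo runs task in a goroutine, always calling Done and recovering
// from panics so one failing task cannot crash the program while the
// WaitGroup is still counting the others.
func safeGo(wg *sync.WaitGroup, name string, task func()) {
	wg.Add(1) // add before starting the goroutine
	go func() {
		defer wg.Done() // runs even if task panics
		defer func() {
			if r := recover(); r != nil {
				fmt.Printf("task %s panicked: %v\n", name, r)
			}
		}()
		task()
	}()
}

func main() {
	var wg sync.WaitGroup
	safeGo(&wg, "ok", func() { fmt.Println("doing work") })
	safeGo(&wg, "boom", func() { panic("something broke") })
	wg.Wait()
	fmt.Println("all goroutines accounted for")
}
```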

    August 14, 2024 · 9 min · Rafiul Alam

    Semaphore Pattern in Go

    Go Concurrency Patterns Series: ← Rate Limiter | Series Overview | Actor Model → What is the Semaphore Pattern? A semaphore is a synchronization primitive that maintains a count of available resources and controls access to them. It allows a specified number of goroutines to access a resource concurrently while blocking others until resources become available. Types: Binary Semaphore: Acts like a mutex (0 or 1) Counting Semaphore: Allows N concurrent accesses Weighted Semaphore: Resources have different weights/costs Real-World Use Cases Connection Pools: Limit database/HTTP connections Resource Management: Control access to limited resources Download Managers: Limit concurrent downloads API Rate Limiting: Control concurrent API calls Worker Pools: Limit concurrent workers Memory Management: Control memory-intensive operations Basic Semaphore Implementation package main import ( "context" "fmt" "sync" "time" ) // Semaphore implements a counting semaphore type Semaphore struct { ch chan struct{} } // NewSemaphore creates a new semaphore with given capacity func NewSemaphore(capacity int) *Semaphore { return &Semaphore{ ch: make(chan struct{}, capacity), } } // Acquire acquires a resource from the semaphore func (s *Semaphore) Acquire() { s.ch <- struct{}{} } // TryAcquire tries to acquire a resource without blocking func (s *Semaphore) TryAcquire() bool { select { case s.ch <- struct{}{}: return true default: return false } } // AcquireWithContext acquires a resource with context cancellation func (s *Semaphore) AcquireWithContext(ctx context.Context) error { select { case s.ch <- struct{}{}: return nil case <-ctx.Done(): return ctx.Err() } } // Release releases a resource back to the semaphore func (s *Semaphore) Release() { <-s.ch } // Available returns the number of available resources func (s *Semaphore) Available() int { return cap(s.ch) - len(s.ch) } // Used returns the number of used resources func (s *Semaphore) Used() int { return len(s.ch) } // Capacity returns the total capacity func (s *Semaphore) Capacity() int { return cap(s.ch) } // simulateWork simulates work that requires a resource func simulateWork(id int, duration time.Duration, sem *Semaphore) { fmt.Printf("Worker %d: Requesting resource...\n", id) sem.Acquire() fmt.Printf("Worker %d: Acquired resource (available: %d/%d)\n", id, sem.Available(), sem.Capacity()) // Simulate work time.Sleep(duration) sem.Release() fmt.Printf("Worker %d: Released resource (available: %d/%d)\n", id, sem.Available(), sem.Capacity()) } func main() { // Create semaphore with capacity of 3 sem := NewSemaphore(3) fmt.Println("=== Basic Semaphore Demo ===") fmt.Printf("Semaphore capacity: %d\n\n", sem.Capacity()) var wg sync.WaitGroup // Start 6 workers, but only 3 can work concurrently for i := 1; i <= 6; i++ { wg.Add(1) go func(id int) { defer wg.Done() simulateWork(id, time.Duration(1+id%3)*time.Second, sem) }(i) time.Sleep(200 * time.Millisecond) // Stagger starts } wg.Wait() fmt.Printf("\nFinal state - Available: %d/%d\n", sem.Available(), sem.Capacity()) } Advanced Semaphore with Timeout and Context package main import ( "context" "fmt" "sync" "sync/atomic" "time" ) // AdvancedSemaphore provides additional features like metrics and timeouts type AdvancedSemaphore struct { ch chan struct{} capacity int // Metrics totalAcquires int64 totalReleases int64 timeouts int64 cancellations int64 // Monitoring mu sync.RWMutex waitingGoroutines int } // NewAdvancedSemaphore creates a new advanced semaphore func NewAdvancedSemaphore(capacity int) 
*AdvancedSemaphore { return &AdvancedSemaphore{ ch: make(chan struct{}, capacity), capacity: capacity, } } // Acquire acquires a resource (blocking) func (as *AdvancedSemaphore) Acquire() { as.incrementWaiting() defer as.decrementWaiting() as.ch <- struct{}{} atomic.AddInt64(&as.totalAcquires, 1) } // TryAcquire tries to acquire without blocking func (as *AdvancedSemaphore) TryAcquire() bool { select { case as.ch <- struct{}{}: atomic.AddInt64(&as.totalAcquires, 1) return true default: return false } } // AcquireWithTimeout acquires with a timeout func (as *AdvancedSemaphore) AcquireWithTimeout(timeout time.Duration) error { as.incrementWaiting() defer as.decrementWaiting() select { case as.ch <- struct{}{}: atomic.AddInt64(&as.totalAcquires, 1) return nil case <-time.After(timeout): atomic.AddInt64(&as.timeouts, 1) return fmt.Errorf("timeout after %v", timeout) } } // AcquireWithContext acquires with context cancellation func (as *AdvancedSemaphore) AcquireWithContext(ctx context.Context) error { as.incrementWaiting() defer as.decrementWaiting() select { case as.ch <- struct{}{}: atomic.AddInt64(&as.totalAcquires, 1) return nil case <-ctx.Done(): atomic.AddInt64(&as.cancellations, 1) return ctx.Err() } } // Release releases a resource func (as *AdvancedSemaphore) Release() { <-as.ch atomic.AddInt64(&as.totalReleases, 1) } // incrementWaiting increments waiting goroutines counter func (as *AdvancedSemaphore) incrementWaiting() { as.mu.Lock() as.waitingGoroutines++ as.mu.Unlock() } // decrementWaiting decrements waiting goroutines counter func (as *AdvancedSemaphore) decrementWaiting() { as.mu.Lock() as.waitingGoroutines-- as.mu.Unlock() } // GetStats returns semaphore statistics func (as *AdvancedSemaphore) GetStats() map[string]interface{} { as.mu.RLock() waiting := as.waitingGoroutines as.mu.RUnlock() return map[string]interface{}{ "capacity": as.capacity, "available": as.Available(), "used": as.Used(), "waiting": waiting, "total_acquires": atomic.LoadInt64(&as.totalAcquires), "total_releases": atomic.LoadInt64(&as.totalReleases), "timeouts": atomic.LoadInt64(&as.timeouts), "cancellations": atomic.LoadInt64(&as.cancellations), } } // Available returns available resources func (as *AdvancedSemaphore) Available() int { return cap(as.ch) - len(as.ch) } // Used returns used resources func (as *AdvancedSemaphore) Used() int { return len(as.ch) } // Capacity returns total capacity func (as *AdvancedSemaphore) Capacity() int { return as.capacity } // ResourceManager demonstrates semaphore usage for resource management type ResourceManager struct { semaphore *AdvancedSemaphore resources []string } // NewResourceManager creates a new resource manager func NewResourceManager(resources []string) *ResourceManager { return &ResourceManager{ semaphore: NewAdvancedSemaphore(len(resources)), resources: resources, } } // UseResource uses a resource with timeout func (rm *ResourceManager) UseResource(ctx context.Context, userID string, timeout time.Duration) error { fmt.Printf("User %s: Requesting resource...\n", userID) // Try to acquire with timeout if err := rm.semaphore.AcquireWithTimeout(timeout); err != nil { fmt.Printf("User %s: Failed to acquire resource: %v\n", userID, err) return err } defer rm.semaphore.Release() resourceIndex := rm.semaphore.Used() - 1 resourceName := rm.resources[resourceIndex] fmt.Printf("User %s: Using resource '%s'\n", userID, resourceName) // Simulate resource usage select { case <-time.After(time.Duration(1+len(userID)%3) * time.Second): fmt.Printf("User %s: Finished 
using resource '%s'\n", userID, resourceName) return nil case <-ctx.Done(): fmt.Printf("User %s: Resource usage cancelled\n", userID) return ctx.Err() } } // GetStats returns resource manager statistics func (rm *ResourceManager) GetStats() map[string]interface{} { return rm.semaphore.GetStats() } func main() { resources := []string{"Database-1", "Database-2", "API-Gateway"} manager := NewResourceManager(resources) fmt.Println("=== Advanced Semaphore Demo ===") fmt.Printf("Available resources: %v\n\n", resources) // Start monitoring go func() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for range ticker.C { stats := manager.GetStats() fmt.Printf(" Stats: Used=%d/%d, Waiting=%d, Timeouts=%d\n", stats["used"], stats["capacity"], stats["waiting"], stats["timeouts"]) } }() var wg sync.WaitGroup // Simulate users requesting resources users := []string{"Alice", "Bob", "Charlie", "Diana", "Eve", "Frank"} for i, user := range users { wg.Add(1) go func(userID string, delay time.Duration) { defer wg.Done() time.Sleep(delay) // Stagger requests ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() // Some users have shorter timeouts timeout := 3 * time.Second if len(userID)%2 == 0 { timeout = 1 * time.Second } err := manager.UseResource(ctx, userID, timeout) if err != nil { fmt.Printf(" User %s failed: %v\n", userID, err) } }(user, time.Duration(i*300)*time.Millisecond) } wg.Wait() // Final statistics fmt.Println("\n=== Final Statistics ===") stats := manager.GetStats() for key, value := range stats { fmt.Printf("%s: %v\n", key, value) } } Weighted Semaphore Implementation package main import ( "context" "fmt" "sync" "time" ) // WeightedSemaphore allows acquiring resources with different weights type WeightedSemaphore struct { mu sync.Mutex capacity int64 current int64 waiters []waiter } // waiter represents a goroutine waiting for resources type waiter struct { weight int64 ready chan struct{} } // NewWeightedSemaphore creates a new weighted semaphore func NewWeightedSemaphore(capacity int64) *WeightedSemaphore { return &WeightedSemaphore{ capacity: capacity, waiters: make([]waiter, 0), } } // Acquire acquires resources with given weight func (ws *WeightedSemaphore) Acquire(weight int64) { ws.mu.Lock() if ws.current+weight <= ws.capacity && len(ws.waiters) == 0 { // Can acquire immediately ws.current += weight ws.mu.Unlock() return } // Need to wait ready := make(chan struct{}) ws.waiters = append(ws.waiters, waiter{weight: weight, ready: ready}) ws.mu.Unlock() <-ready } // TryAcquire tries to acquire resources without blocking func (ws *WeightedSemaphore) TryAcquire(weight int64) bool { ws.mu.Lock() defer ws.mu.Unlock() if ws.current+weight <= ws.capacity && len(ws.waiters) == 0 { ws.current += weight return true } return false } // AcquireWithContext acquires resources with context cancellation func (ws *WeightedSemaphore) AcquireWithContext(ctx context.Context, weight int64) error { ws.mu.Lock() if ws.current+weight <= ws.capacity && len(ws.waiters) == 0 { // Can acquire immediately ws.current += weight ws.mu.Unlock() return nil } // Need to wait ready := make(chan struct{}) ws.waiters = append(ws.waiters, waiter{weight: weight, ready: ready}) ws.mu.Unlock() select { case <-ready: return nil case <-ctx.Done(): // Remove from waiters list ws.mu.Lock() for i, w := range ws.waiters { if w.ready == ready { ws.waiters = append(ws.waiters[:i], ws.waiters[i+1:]...) 
break } } ws.mu.Unlock() return ctx.Err() } } // Release releases resources with given weight func (ws *WeightedSemaphore) Release(weight int64) { ws.mu.Lock() defer ws.mu.Unlock() ws.current -= weight ws.notifyWaiters() } // notifyWaiters notifies waiting goroutines that can now proceed func (ws *WeightedSemaphore) notifyWaiters() { for i := 0; i < len(ws.waiters); { w := ws.waiters[i] if ws.current+w.weight <= ws.capacity { // This waiter can proceed ws.current += w.weight close(w.ready) // Remove from waiters ws.waiters = append(ws.waiters[:i], ws.waiters[i+1:]...) } else { i++ } } } // GetStats returns current statistics func (ws *WeightedSemaphore) GetStats() map[string]interface{} { ws.mu.Lock() defer ws.mu.Unlock() return map[string]interface{}{ "capacity": ws.capacity, "current": ws.current, "available": ws.capacity - ws.current, "waiters": len(ws.waiters), } } // Task represents a task with resource requirements type Task struct { ID string Weight int64 Duration time.Duration } // TaskProcessor processes tasks using weighted semaphore type TaskProcessor struct { semaphore *WeightedSemaphore } // NewTaskProcessor creates a new task processor func NewTaskProcessor(capacity int64) *TaskProcessor { return &TaskProcessor{ semaphore: NewWeightedSemaphore(capacity), } } // ProcessTask processes a task func (tp *TaskProcessor) ProcessTask(ctx context.Context, task Task) error { fmt.Printf("Task %s: Requesting %d units of resource...\n", task.ID, task.Weight) if err := tp.semaphore.AcquireWithContext(ctx, task.Weight); err != nil { fmt.Printf("Task %s: Failed to acquire resources: %v\n", task.ID, err) return err } defer tp.semaphore.Release(task.Weight) stats := tp.semaphore.GetStats() fmt.Printf("Task %s: Acquired %d units (available: %d/%d)\n", task.ID, task.Weight, stats["available"], stats["capacity"]) // Simulate task processing select { case <-time.After(task.Duration): fmt.Printf("Task %s: Completed\n", task.ID) return nil case <-ctx.Done(): fmt.Printf("Task %s: Cancelled\n", task.ID) return ctx.Err() } } // GetStats returns processor statistics func (tp *TaskProcessor) GetStats() map[string]interface{} { return tp.semaphore.GetStats() } func main() { // Create weighted semaphore with capacity of 10 units processor := NewTaskProcessor(10) fmt.Println("=== Weighted Semaphore Demo ===") fmt.Println("Total capacity: 10 units") // Define tasks with different resource requirements tasks := []Task{ {"Small-1", 2, 2 * time.Second}, {"Medium-1", 4, 3 * time.Second}, {"Large-1", 6, 4 * time.Second}, {"Small-2", 1, 1 * time.Second}, {"Small-3", 2, 2 * time.Second}, {"Medium-2", 5, 3 * time.Second}, {"Large-2", 8, 5 * time.Second}, } // Start monitoring go func() { ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() for range ticker.C { stats := processor.GetStats() fmt.Printf(" Resources: %d/%d used, %d waiters\n", stats["current"], stats["capacity"], stats["waiters"]) } }() var wg sync.WaitGroup // Process tasks concurrently for i, task := range tasks { wg.Add(1) go func(t Task, delay time.Duration) { defer wg.Done() time.Sleep(delay) // Stagger task starts ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() err := processor.ProcessTask(ctx, t) if err != nil { fmt.Printf(" Task %s failed: %v\n", t.ID, err) } }(task, time.Duration(i*200)*time.Millisecond) } wg.Wait() // Final statistics fmt.Println("\n=== Final Statistics ===") stats := processor.GetStats() for key, value := range stats { fmt.Printf("%s: %v\n", key, value) } } 
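    If you'd rather not maintain a hand-rolled weighted semaphore, the golang.org/x/sync/semaphore package provides one with the same Acquire/Release semantics. Below is a minimal sketch that mirrors the demo above — the capacity of 10 and the task weights are illustrative, and it assumes the x/sync module is available in your project:

    package main

    import (
        "context"
        "fmt"
        "sync"
        "time"

        "golang.org/x/sync/semaphore"
    )

    func main() {
        // Weighted semaphore with 10 units of capacity, like the demo above.
        sem := semaphore.NewWeighted(10)
        ctx := context.Background()

        var wg sync.WaitGroup
        for i, weight := range []int64{2, 4, 6, 1} { // illustrative task weights
            wg.Add(1)
            go func(id int, w int64) {
                defer wg.Done()
                // Acquire blocks until w units are free or the context is cancelled.
                if err := sem.Acquire(ctx, w); err != nil {
                    fmt.Printf("task %d: acquire failed: %v\n", id, err)
                    return
                }
                defer sem.Release(w)
                fmt.Printf("task %d: holding %d units\n", id, w)
                time.Sleep(200 * time.Millisecond) // simulate work
            }(i, weight)
        }
        wg.Wait()
    }

    One design difference worth noting: the library queues waiters in FIFO order, so a large request at the front of the queue is not starved by a steady stream of smaller ones.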
Semaphore-based Connection Pool package main import ( "context" "fmt" "sync" "time" ) // Connection represents a database connection type Connection struct { ID int InUse bool LastUsed time.Time } // ConnectionPool manages database connections using semaphore type ConnectionPool struct { connections []*Connection semaphore *AdvancedSemaphore mu sync.Mutex } // NewConnectionPool creates a new connection pool func NewConnectionPool(size int) *ConnectionPool { connections := make([]*Connection, size) for i := 0; i < size; i++ { connections[i] = &Connection{ ID: i + 1, InUse: false, LastUsed: time.Now(), } } return &ConnectionPool{ connections: connections, semaphore: NewAdvancedSemaphore(size), } } // GetConnection acquires a connection from the pool func (cp *ConnectionPool) GetConnection(ctx context.Context) (*Connection, error) { if err := cp.semaphore.AcquireWithContext(ctx); err != nil { return nil, err } cp.mu.Lock() defer cp.mu.Unlock() // Find an available connection for _, conn := range cp.connections { if !conn.InUse { conn.InUse = true conn.LastUsed = time.Now() return conn, nil } } // This shouldn't happen if semaphore is working correctly cp.semaphore.Release() return nil, fmt.Errorf("no available connections") } // ReturnConnection returns a connection to the pool func (cp *ConnectionPool) ReturnConnection(conn *Connection) { cp.mu.Lock() conn.InUse = false conn.LastUsed = time.Now() cp.mu.Unlock() cp.semaphore.Release() } // GetStats returns pool statistics func (cp *ConnectionPool) GetStats() map[string]interface{} { cp.mu.Lock() defer cp.mu.Unlock() inUse := 0 for _, conn := range cp.connections { if conn.InUse { inUse++ } } semStats := cp.semaphore.GetStats() return map[string]interface{}{ "total_connections": len(cp.connections), "in_use": inUse, "available": len(cp.connections) - inUse, "semaphore_stats": semStats, } } // DatabaseService simulates a service using the connection pool type DatabaseService struct { pool *ConnectionPool } // NewDatabaseService creates a new database service func NewDatabaseService(poolSize int) *DatabaseService { return &DatabaseService{ pool: NewConnectionPool(poolSize), } } // ExecuteQuery simulates executing a database query func (ds *DatabaseService) ExecuteQuery(ctx context.Context, userID string, query string) error { fmt.Printf("User %s: Requesting database connection for query: %s\n", userID, query) conn, err := ds.pool.GetConnection(ctx) if err != nil { fmt.Printf("User %s: Failed to get connection: %v\n", userID, err) return err } defer ds.pool.ReturnConnection(conn) fmt.Printf("User %s: Using connection %d\n", userID, conn.ID) // Simulate query execution queryDuration := time.Duration(500+len(query)*10) * time.Millisecond select { case <-time.After(queryDuration): fmt.Printf("User %s: Query completed on connection %d\n", userID, conn.ID) return nil case <-ctx.Done(): fmt.Printf("User %s: Query cancelled on connection %d\n", userID, conn.ID) return ctx.Err() } } // GetStats returns service statistics func (ds *DatabaseService) GetStats() map[string]interface{} { return ds.pool.GetStats() } func main() { // Create database service with 3 connections service := NewDatabaseService(3) fmt.Println("=== Connection Pool Demo ===") fmt.Println("Pool size: 3 connections") // Start monitoring go func() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for range ticker.C { stats := service.GetStats() fmt.Printf(" Pool: %d/%d in use, %d available\n", stats["in_use"], stats["total_connections"], stats["available"]) } }() var wg 
sync.WaitGroup // Simulate multiple users making database queries users := []struct { id string query string }{ {"Alice", "SELECT * FROM users"}, {"Bob", "SELECT * FROM orders WHERE user_id = 123"}, {"Charlie", "UPDATE users SET last_login = NOW()"}, {"Diana", "SELECT COUNT(*) FROM products"}, {"Eve", "INSERT INTO logs (message) VALUES ('test')"}, {"Frank", "SELECT * FROM analytics WHERE date > '2024-01-01'"}, } for i, user := range users { wg.Add(1) go func(userID, query string, delay time.Duration) { defer wg.Done() time.Sleep(delay) // Stagger requests ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() err := service.ExecuteQuery(ctx, userID, query) if err != nil { fmt.Printf(" User %s query failed: %v\n", userID, err) } }(user.id, user.query, time.Duration(i*300)*time.Millisecond) } wg.Wait() // Final statistics fmt.Println("\n=== Final Statistics ===") stats := service.GetStats() for key, value := range stats { if key == "semaphore_stats" { fmt.Printf("%s:\n", key) semStats := value.(map[string]interface{}) for k, v := range semStats { fmt.Printf(" %s: %v\n", k, v) } } else { fmt.Printf("%s: %v\n", key, value) } } } Best Practices Choose Right Capacity: Set semaphore capacity based on available resources Always Release: Use defer to ensure resources are released Handle Context: Support cancellation in long-running operations Monitor Usage: Track semaphore statistics and resource utilization Avoid Deadlocks: Don’t acquire multiple semaphores in different orders Use Timeouts: Prevent indefinite blocking with timeouts Consider Weighted: Use weighted semaphores for resources with different costs Common Pitfalls Resource Leaks: Forgetting to release acquired resources Deadlocks: Circular dependencies between semaphores Starvation: Large requests blocking smaller ones indefinitely Over-allocation: Setting capacity higher than actual resources Under-utilization: Setting capacity too low for available resources The Semaphore pattern is essential for managing limited resources in concurrent applications. It provides controlled access to resources, prevents overload, and ensures fair resource distribution among competing goroutines. ...
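    A related note on choosing capacity: when the only thing you need from a semaphore is to cap how many goroutines run at once, golang.org/x/sync/errgroup with SetLimit covers that without any custom code. A minimal sketch, assuming the x/sync module is available; the limit of 3 and the task count are arbitrary:

    package main

    import (
        "fmt"
        "time"

        "golang.org/x/sync/errgroup"
    )

    func main() {
        var g errgroup.Group
        // At most 3 tasks run concurrently; extra g.Go calls block until a slot frees up.
        g.SetLimit(3)

        for i := 1; i <= 8; i++ {
            id := i
            g.Go(func() error {
                fmt.Printf("task %d: running\n", id)
                time.Sleep(300 * time.Millisecond) // simulate work
                return nil
            })
        }
        if err := g.Wait(); err != nil {
            fmt.Println("error:", err)
        }
    }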

    August 7, 2024 · 12 min · Rafiul Alam

    Request/Response Pattern in Go

    Go Concurrency Patterns Series: ← Pub/Sub Pattern | Series Overview | Worker Pool → What is the Request/Response Pattern? The Request/Response pattern enables synchronous communication between goroutines, where a sender waits for a response from a receiver. This pattern is essential for RPC-style communication, database queries, API calls, and any scenario where you need to get a result back from an operation. Key Components: Request: Contains data and a response channel Response: Contains result data and/or error information Requester: Sends request and waits for response Responder: Processes request and sends response Real-World Use Cases Database Operations: Query execution with results API Gateways: Forwarding requests to microservices Cache Systems: Get/Set operations with confirmation File Operations: Read/Write with status feedback Validation Services: Input validation with results Authentication: Login requests with tokens Basic Request/Response Implementation package main import ( "fmt" "math/rand" "time" ) // Request represents a request with a response channel type Request struct { ID string Data interface{} Response chan Response } // Response represents the response to a request type Response struct { ID string Result interface{} Error error } // Server processes requests type Server struct { requests chan Request quit chan bool } // NewServer creates a new server func NewServer() *Server { return &Server{ requests: make(chan Request), quit: make(chan bool), } } // Start begins processing requests func (s *Server) Start() { go func() { for { select { case req := <-s.requests: s.processRequest(req) case <-s.quit: return } } }() } // processRequest handles a single request func (s *Server) processRequest(req Request) { // Simulate processing time time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) // Process the request (example: double the number) var response Response response.ID = req.ID if num, ok := req.Data.(int); ok { response.Result = num * 2 } else { response.Error = fmt.Errorf("invalid data type") } // Send response back req.Response <- response } // SendRequest sends a request and waits for response func (s *Server) SendRequest(id string, data interface{}) (interface{}, error) { responseChan := make(chan Response, 1) request := Request{ ID: id, Data: data, Response: responseChan, } s.requests <- request // Wait for response response := <-responseChan return response.Result, response.Error } // Stop shuts down the server func (s *Server) Stop() { close(s.quit) } func main() { server := NewServer() server.Start() defer server.Stop() // Send multiple requests for i := 1; i <= 5; i++ { result, err := server.SendRequest(fmt.Sprintf("req-%d", i), i*10) if err != nil { fmt.Printf("Request %d failed: %v\n", i, err) } else { fmt.Printf("Request %d result: %v\n", i, result) } } } Request/Response with Timeout package main import ( "context" "fmt" "math/rand" "time" ) // TimedRequest includes context for timeout handling type TimedRequest struct { ID string Data interface{} Response chan TimedResponse Context context.Context } // TimedResponse includes timing information type TimedResponse struct { ID string Result interface{} Error error Duration time.Duration Timestamp time.Time } // TimedServer processes requests with timeout support type TimedServer struct { requests chan TimedRequest quit chan bool } func NewTimedServer() *TimedServer { return &TimedServer{ requests: make(chan TimedRequest, 10), quit: make(chan bool), } } func (ts *TimedServer) Start() { go 
func() { for { select { case req := <-ts.requests: go ts.processTimedRequest(req) case <-ts.quit: return } } }() } func (ts *TimedServer) processTimedRequest(req TimedRequest) { start := time.Now() // Check if context is already cancelled select { case <-req.Context.Done(): ts.sendResponse(req, nil, req.Context.Err(), start) return default: } // Simulate work with random duration workDuration := time.Duration(rand.Intn(200)) * time.Millisecond select { case <-time.After(workDuration): // Work completed if num, ok := req.Data.(int); ok { ts.sendResponse(req, num*2, nil, start) } else { ts.sendResponse(req, nil, fmt.Errorf("invalid data type"), start) } case <-req.Context.Done(): // Context cancelled during work ts.sendResponse(req, nil, req.Context.Err(), start) } } func (ts *TimedServer) sendResponse(req TimedRequest, result interface{}, err error, start time.Time) { response := TimedResponse{ ID: req.ID, Result: result, Error: err, Duration: time.Since(start), Timestamp: time.Now(), } select { case req.Response <- response: case <-req.Context.Done(): // Client no longer waiting } } // SendRequestWithTimeout sends a request with a timeout func (ts *TimedServer) SendRequestWithTimeout(id string, data interface{}, timeout time.Duration) (interface{}, error) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() responseChan := make(chan TimedResponse, 1) request := TimedRequest{ ID: id, Data: data, Response: responseChan, Context: ctx, } select { case ts.requests <- request: case <-ctx.Done(): return nil, ctx.Err() } select { case response := <-responseChan: fmt.Printf("Request %s completed in %v\n", response.ID, response.Duration) return response.Result, response.Error case <-ctx.Done(): return nil, ctx.Err() } } func (ts *TimedServer) Stop() { close(ts.quit) } func main() { server := NewTimedServer() server.Start() defer server.Stop() // Send requests with different timeouts requests := []struct { id string data int timeout time.Duration }{ {"fast", 10, 300 * time.Millisecond}, {"medium", 20, 150 * time.Millisecond}, {"slow", 30, 50 * time.Millisecond}, // This might timeout } for _, req := range requests { result, err := server.SendRequestWithTimeout(req.id, req.data, req.timeout) if err != nil { fmt.Printf("Request %s failed: %v\n", req.id, err) } else { fmt.Printf("Request %s result: %v\n", req.id, result) } } } Future/Promise Pattern package main import ( "context" "fmt" "math/rand" "sync" "time" ) // Future represents a value that will be available in the future type Future struct { mu sync.Mutex done chan struct{} result interface{} err error computed bool } // NewFuture creates a new future func NewFuture() *Future { return &Future{ done: make(chan struct{}), } } // Set sets the future's value func (f *Future) Set(result interface{}, err error) { f.mu.Lock() defer f.mu.Unlock() if f.computed { return // Already set } f.result = result f.err = err f.computed = true close(f.done) } // Get waits for and returns the future's value func (f *Future) Get() (interface{}, error) { <-f.done return f.result, f.err } // GetWithTimeout waits for the value with a timeout func (f *Future) GetWithTimeout(timeout time.Duration) (interface{}, error) { select { case <-f.done: return f.result, f.err case <-time.After(timeout): return nil, fmt.Errorf("timeout waiting for future") } } // GetWithContext waits for the value with context cancellation func (f *Future) GetWithContext(ctx context.Context) (interface{}, error) { select { case <-f.done: return f.result, f.err case <-ctx.Done():
return nil, ctx.Err() } } // IsReady returns true if the future has been computed func (f *Future) IsReady() bool { f.mu.Lock() defer f.mu.Unlock() return f.computed } // AsyncService demonstrates async operations with futures type AsyncService struct { workers chan struct{} } func NewAsyncService(maxWorkers int) *AsyncService { return &AsyncService{ workers: make(chan struct{}, maxWorkers), } } // ProcessAsync starts async processing and returns a future func (as *AsyncService) ProcessAsync(data interface{}) *Future { future := NewFuture() go func() { // Acquire worker slot as.workers <- struct{}{} defer func() { <-as.workers }() // Simulate processing time.Sleep(time.Duration(100+rand.Intn(200)) * time.Millisecond) // Process data if num, ok := data.(int); ok { future.Set(num*num, nil) } else { future.Set(nil, fmt.Errorf("invalid data type")) } }() return future } func main() { service := NewAsyncService(3) // Start multiple async operations futures := make([]*Future, 5) for i := 0; i < 5; i++ { fmt.Printf("Starting async operation %d\n", i+1) futures[i] = service.ProcessAsync((i + 1) * 10) } // Wait for all results fmt.Println("\nWaiting for results...") for i, future := range futures { result, err := future.Get() if err != nil { fmt.Printf("Operation %d failed: %v\n", i+1, err) } else { fmt.Printf("Operation %d result: %v\n", i+1, result) } } // Example with timeout fmt.Println("\nTesting timeout...") timeoutFuture := service.ProcessAsync(100) result, err := timeoutFuture.GetWithTimeout(50 * time.Millisecond) if err != nil { fmt.Printf("Timeout example failed: %v\n", err) } else { fmt.Printf("Timeout example result: %v\n", result) } } Batch Request/Response package main import ( "fmt" "sync" "time" ) // BatchRequest represents multiple requests processed together type BatchRequest struct { ID string Items []interface{} Response chan BatchResponse } // BatchResponse contains results for all items in a batch type BatchResponse struct { ID string Results []BatchResult Error error } // BatchResult represents the result of processing one item type BatchResult struct { Index int Result interface{} Error error } // BatchProcessor processes requests in batches for efficiency type BatchProcessor struct { requests chan BatchRequest batchSize int batchWindow time.Duration quit chan bool } func NewBatchProcessor(batchSize int, batchWindow time.Duration) *BatchProcessor { return &BatchProcessor{ requests: make(chan BatchRequest, 100), batchSize: batchSize, batchWindow: batchWindow, quit: make(chan bool), } } func (bp *BatchProcessor) Start() { go func() { batch := make([]BatchRequest, 0, bp.batchSize) timer := time.NewTimer(bp.batchWindow) timer.Stop() for { select { case req := <-bp.requests: batch = append(batch, req) if len(batch) == 1 { timer.Reset(bp.batchWindow) } if len(batch) >= bp.batchSize { bp.processBatch(batch) batch = batch[:0] timer.Stop() } case <-timer.C: if len(batch) > 0 { bp.processBatch(batch) batch = batch[:0] } case <-bp.quit: if len(batch) > 0 { bp.processBatch(batch) } return } } }() } func (bp *BatchProcessor) processBatch(batch []BatchRequest) { fmt.Printf("Processing batch of %d requests\n", len(batch)) var wg sync.WaitGroup for _, req := range batch { wg.Add(1) go func(r BatchRequest) { defer wg.Done() bp.processRequest(r) }(req) } wg.Wait() } func (bp *BatchProcessor) processRequest(req BatchRequest) { results := make([]BatchResult, len(req.Items)) for i, item := range req.Items { // Simulate processing each item time.Sleep(10 * time.Millisecond) if num, ok := 
item.(int); ok { results[i] = BatchResult{ Index: i, Result: num * 3, } } else { results[i] = BatchResult{ Index: i, Error: fmt.Errorf("invalid item type at index %d", i), } } } response := BatchResponse{ ID: req.ID, Results: results, } req.Response <- response } // SendBatchRequest sends a batch request and waits for response func (bp *BatchProcessor) SendBatchRequest(id string, items []interface{}) ([]BatchResult, error) { responseChan := make(chan BatchResponse, 1) request := BatchRequest{ ID: id, Items: items, Response: responseChan, } bp.requests <- request response := <-responseChan return response.Results, response.Error } func (bp *BatchProcessor) Stop() { close(bp.quit) } func main() { processor := NewBatchProcessor(3, 100*time.Millisecond) processor.Start() defer processor.Stop() // Send individual batch requests go func() { results, err := processor.SendBatchRequest("batch1", []interface{}{1, 2, 3, 4, 5}) if err != nil { fmt.Printf("Batch 1 failed: %v\n", err) return } fmt.Println("Batch 1 results:") for _, result := range results { if result.Error != nil { fmt.Printf(" Item %d error: %v\n", result.Index, result.Error) } else { fmt.Printf(" Item %d result: %v\n", result.Index, result.Result) } } }() go func() { results, err := processor.SendBatchRequest("batch2", []interface{}{10, 20, 30}) if err != nil { fmt.Printf("Batch 2 failed: %v\n", err) return } fmt.Println("Batch 2 results:") for _, result := range results { if result.Error != nil { fmt.Printf(" Item %d error: %v\n", result.Index, result.Error) } else { fmt.Printf(" Item %d result: %v\n", result.Index, result.Result) } } }() // Wait for processing time.Sleep(500 * time.Millisecond) } Best Practices Always Use Timeouts: Prevent indefinite blocking Handle Context Cancellation: Support graceful cancellation Buffer Response Channels: Avoid blocking responders Error Handling: Always include error information in responses Resource Cleanup: Ensure channels and goroutines are cleaned up Monitoring: Track request/response times and success rates Backpressure: Handle situations when responders are overwhelmed Common Pitfalls Deadlocks: Not buffering response channels Goroutine Leaks: Not handling context cancellation Memory Leaks: Not closing channels properly Blocking Operations: Long-running operations without timeouts Lost Responses: Not handling channel closure Testing Request/Response package main import ( "context" "testing" "time" ) func TestRequestResponse(t *testing.T) { server := NewTimedServer() server.Start() defer server.Stop() // Test successful request result, err := server.SendRequestWithTimeout("test1", 42, 200*time.Millisecond) if err != nil { t.Fatalf("Request failed: %v", err) } if result != 84 { t.Errorf("Expected 84, got %v", result) } // Test timeout _, err = server.SendRequestWithTimeout("test2", 42, 10*time.Millisecond) if err == nil { t.Error("Expected timeout error") } } func TestFuture(t *testing.T) { future := NewFuture() // Test that future is not ready initially if future.IsReady() { t.Error("Future should not be ready initially") } // Set value in goroutine go func() { time.Sleep(50 * time.Millisecond) future.Set("test result", nil) }() // Get value result, err := future.Get() if err != nil { t.Fatalf("Future failed: %v", err) } if result != "test result" { t.Errorf("Expected 'test result', got %v", result) } // Test that future is ready after setting if !future.IsReady() { t.Error("Future should be ready after setting") } } The Request/Response pattern is essential for building synchronous 
communication systems in Go. It provides the foundation for RPC systems, database operations, and any scenario where you need to wait for a result from an asynchronous operation. ...
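    The Future type above returns interface{}, so callers have to type-assert the result. On Go 1.18+ the same idea can be expressed with generics for a typed result; the sketch below is my own variant rather than code from the post, and it trims the API down to Set and a context-aware Get:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // Future holds a value of type T that becomes available once Set is called.
    type Future[T any] struct {
        done   chan struct{}
        result T
        err    error
    }

    func NewFuture[T any]() *Future[T] {
        return &Future[T]{done: make(chan struct{})}
    }

    // Set publishes the result exactly once; a second call would panic on the
    // closed channel, so real code should guard it (for example with sync.Once).
    func (f *Future[T]) Set(result T, err error) {
        f.result, f.err = result, err
        close(f.done)
    }

    // Get blocks until the value is set or the context is cancelled.
    func (f *Future[T]) Get(ctx context.Context) (T, error) {
        select {
        case <-f.done:
            return f.result, f.err
        case <-ctx.Done():
            var zero T
            return zero, ctx.Err()
        }
    }

    func main() {
        f := NewFuture[int]()
        go func() {
            time.Sleep(100 * time.Millisecond) // simulate work
            f.Set(42, nil)
        }()

        v, err := f.Get(context.Background())
        fmt.Println(v, err) // 42 <nil>
    }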

    July 31, 2024 · 10 min · Rafiul Alam

    Rate Limiter Pattern in Go

    Go Concurrency Patterns Series: ← Circuit Breaker | Series Overview | Semaphore Pattern → What is the Rate Limiter Pattern? Rate limiting controls the rate at which operations are performed, preventing system overload and ensuring fair resource usage. It’s essential for protecting services from abuse, managing resource consumption, and maintaining system stability under load. Common Algorithms: Token Bucket: Allows bursts up to bucket capacity Fixed Window: Fixed number of requests per time window Sliding Window: Smooth rate limiting over time Leaky Bucket: Constant output rate regardless of input Real-World Use Cases API Rate Limiting: Prevent API abuse and ensure fair usage Database Throttling: Control database query rates File Processing: Limit file processing rate Network Operations: Control bandwidth usage Background Jobs: Throttle job processing User Actions: Prevent spam and abuse Token Bucket Rate Limiter package main import ( "context" "fmt" "sync" "time" ) // TokenBucket implements the token bucket rate limiting algorithm type TokenBucket struct { mu sync.Mutex capacity int // Maximum number of tokens tokens int // Current number of tokens refillRate int // Tokens added per second lastRefill time.Time // Last refill time } // NewTokenBucket creates a new token bucket rate limiter func NewTokenBucket(capacity, refillRate int) *TokenBucket { return &TokenBucket{ capacity: capacity, tokens: capacity, // Start with full bucket refillRate: refillRate, lastRefill: time.Now(), } } // Allow checks if a request should be allowed func (tb *TokenBucket) Allow() bool { tb.mu.Lock() defer tb.mu.Unlock() tb.refill() if tb.tokens > 0 { tb.tokens-- return true } return false } // AllowN checks if n requests should be allowed func (tb *TokenBucket) AllowN(n int) bool { tb.mu.Lock() defer tb.mu.Unlock() tb.refill() if tb.tokens >= n { tb.tokens -= n return true } return false } // Wait waits until a token is available func (tb *TokenBucket) Wait(ctx context.Context) error { for { if tb.Allow() { return nil } select { case <-time.After(time.Millisecond * 10): continue case <-ctx.Done(): return ctx.Err() } } } // refill adds tokens based on elapsed time func (tb *TokenBucket) refill() { now := time.Now() elapsed := now.Sub(tb.lastRefill) tokensToAdd := int(elapsed.Seconds() * float64(tb.refillRate)) if tokensToAdd > 0 { tb.tokens += tokensToAdd if tb.tokens > tb.capacity { tb.tokens = tb.capacity } tb.lastRefill = now } } // GetStats returns current bucket statistics func (tb *TokenBucket) GetStats() (tokens, capacity int) { tb.mu.Lock() defer tb.mu.Unlock() tb.refill() return tb.tokens, tb.capacity } func main() { // Create a token bucket: 5 tokens capacity, 2 tokens per second refill rate limiter := NewTokenBucket(5, 2) fmt.Println("=== Token Bucket Rate Limiter Demo ===") // Test burst capability fmt.Println("\n--- Testing Burst Capability ---") for i := 1; i <= 7; i++ { allowed := limiter.Allow() tokens, capacity := limiter.GetStats() fmt.Printf("Request %d: %s (tokens: %d/%d)\n", i, allowedStatus(allowed), tokens, capacity) } // Wait for refill fmt.Println("\n--- Waiting 3 seconds for refill ---") time.Sleep(3 * time.Second) // Test after refill fmt.Println("\n--- Testing After Refill ---") for i := 1; i <= 4; i++ { allowed := limiter.Allow() tokens, capacity := limiter.GetStats() fmt.Printf("Request %d: %s (tokens: %d/%d)\n", i, allowedStatus(allowed), tokens, capacity) } // Test AllowN fmt.Println("\n--- Testing AllowN (requesting 3 tokens) ---") allowed := limiter.AllowN(3) tokens, 
capacity := limiter.GetStats() fmt.Printf("Bulk request: %s (tokens: %d/%d)\n", allowedStatus(allowed), tokens, capacity) } func allowedStatus(allowed bool) string { if allowed { return " ALLOWED" } return " DENIED" } Sliding Window Rate Limiter package main import ( "fmt" "sync" "time" ) // SlidingWindow implements sliding window rate limiting type SlidingWindow struct { mu sync.Mutex requests []time.Time limit int // Maximum requests per window window time.Duration // Time window duration } // NewSlidingWindow creates a new sliding window rate limiter func NewSlidingWindow(limit int, window time.Duration) *SlidingWindow { return &SlidingWindow{ requests: make([]time.Time, 0), limit: limit, window: window, } } // Allow checks if a request should be allowed func (sw *SlidingWindow) Allow() bool { sw.mu.Lock() defer sw.mu.Unlock() now := time.Now() sw.cleanOldRequests(now) if len(sw.requests) < sw.limit { sw.requests = append(sw.requests, now) return true } return false } // cleanOldRequests removes requests outside the current window func (sw *SlidingWindow) cleanOldRequests(now time.Time) { cutoff := now.Add(-sw.window) // Find first request within window start := 0 for i, req := range sw.requests { if req.After(cutoff) { start = i break } start = len(sw.requests) // All requests are old } // Keep only recent requests if start > 0 { copy(sw.requests, sw.requests[start:]) sw.requests = sw.requests[:len(sw.requests)-start] } } // GetStats returns current window statistics func (sw *SlidingWindow) GetStats() (current, limit int, window time.Duration) { sw.mu.Lock() defer sw.mu.Unlock() sw.cleanOldRequests(time.Now()) return len(sw.requests), sw.limit, sw.window } // GetRequestTimes returns timestamps of requests in current window func (sw *SlidingWindow) GetRequestTimes() []time.Time { sw.mu.Lock() defer sw.mu.Unlock() sw.cleanOldRequests(time.Now()) result := make([]time.Time, len(sw.requests)) copy(result, sw.requests) return result } func main() { // Create sliding window: 3 requests per 2 seconds limiter := NewSlidingWindow(3, 2*time.Second) fmt.Println("=== Sliding Window Rate Limiter Demo ===") fmt.Println("Limit: 3 requests per 2 seconds") // Test requests over time for i := 1; i <= 8; i++ { allowed := limiter.Allow() current, limit, window := limiter.GetStats() fmt.Printf("Request %d: %s (current: %d/%d in %v window)\n", i, allowedStatus(allowed), current, limit, window) if i == 4 { fmt.Println("--- Waiting 1 second ---") time.Sleep(1 * time.Second) } else if i == 6 { fmt.Println("--- Waiting 1.5 seconds ---") time.Sleep(1500 * time.Millisecond) } else { time.Sleep(200 * time.Millisecond) } } // Show request timeline fmt.Println("\n--- Request Timeline ---") requests := limiter.GetRequestTimes() now := time.Now() for i, req := range requests { age := now.Sub(req) fmt.Printf("Request %d: %v ago\n", i+1, age.Round(time.Millisecond)) } } Fixed Window Rate Limiter package main import ( "fmt" "sync" "time" ) // FixedWindow implements fixed window rate limiting type FixedWindow struct { mu sync.Mutex limit int // Maximum requests per window window time.Duration // Window duration currentCount int // Current window request count windowStart time.Time // Current window start time } // NewFixedWindow creates a new fixed window rate limiter func NewFixedWindow(limit int, window time.Duration) *FixedWindow { return &FixedWindow{ limit: limit, window: window, windowStart: time.Now(), } } // Allow checks if a request should be allowed func (fw *FixedWindow) Allow() bool { fw.mu.Lock() defer 
fw.mu.Unlock() now := time.Now() // Check if we need to start a new window if now.Sub(fw.windowStart) >= fw.window { fw.currentCount = 0 fw.windowStart = now } if fw.currentCount < fw.limit { fw.currentCount++ return true } return false } // GetStats returns current window statistics func (fw *FixedWindow) GetStats() (current, limit int, windowRemaining time.Duration) { fw.mu.Lock() defer fw.mu.Unlock() now := time.Now() elapsed := now.Sub(fw.windowStart) if elapsed >= fw.window { return 0, fw.limit, fw.window } return fw.currentCount, fw.limit, fw.window - elapsed } func main() { // Create fixed window: 3 requests per 2 seconds limiter := NewFixedWindow(3, 2*time.Second) fmt.Println("=== Fixed Window Rate Limiter Demo ===") fmt.Println("Limit: 3 requests per 2 seconds") // Test requests over time for i := 1; i <= 10; i++ { allowed := limiter.Allow() current, limit, remaining := limiter.GetStats() fmt.Printf("Request %d: %s (current: %d/%d, window resets in: %v)\n", i, allowedStatus(allowed), current, limit, remaining.Round(time.Millisecond)) time.Sleep(400 * time.Millisecond) } } Advanced Rate Limiter with Multiple Algorithms package main import ( "context" "fmt" "sync" "time" ) // RateLimiterType represents different rate limiting algorithms type RateLimiterType int const ( TokenBucketType RateLimiterType = iota SlidingWindowType FixedWindowType ) // RateLimiter interface for different rate limiting algorithms type RateLimiter interface { Allow() bool Wait(ctx context.Context) error GetStats() map[string]interface{} } // MultiRateLimiter combines multiple rate limiters type MultiRateLimiter struct { limiters []RateLimiter names []string } // NewMultiRateLimiter creates a new multi-algorithm rate limiter func NewMultiRateLimiter() *MultiRateLimiter { return &MultiRateLimiter{ limiters: make([]RateLimiter, 0), names: make([]string, 0), } } // AddLimiter adds a rate limiter with a name func (mrl *MultiRateLimiter) AddLimiter(name string, limiter RateLimiter) { mrl.limiters = append(mrl.limiters, limiter) mrl.names = append(mrl.names, name) } // Allow checks if request is allowed by all limiters func (mrl *MultiRateLimiter) Allow() bool { for _, limiter := range mrl.limiters { if !limiter.Allow() { return false } } return true } // Wait waits until all limiters allow the request func (mrl *MultiRateLimiter) Wait(ctx context.Context) error { for _, limiter := range mrl.limiters { if err := limiter.Wait(ctx); err != nil { return err } } return nil } // GetStats returns stats from all limiters func (mrl *MultiRateLimiter) GetStats() map[string]interface{} { stats := make(map[string]interface{}) for i, limiter := range mrl.limiters { stats[mrl.names[i]] = limiter.GetStats() } return stats } // Enhanced TokenBucket with RateLimiter interface type EnhancedTokenBucket struct { *TokenBucket } func (etb *EnhancedTokenBucket) GetStats() map[string]interface{} { tokens, capacity := etb.TokenBucket.GetStats() return map[string]interface{}{ "type": "token_bucket", "tokens": tokens, "capacity": capacity, "rate": etb.refillRate, } } // Enhanced SlidingWindow with RateLimiter interface type EnhancedSlidingWindow struct { *SlidingWindow } func (esw *EnhancedSlidingWindow) Wait(ctx context.Context) error { for { if esw.Allow() { return nil } select { case <-time.After(time.Millisecond * 10): continue case <-ctx.Done(): return ctx.Err() } } } func (esw *EnhancedSlidingWindow) GetStats() map[string]interface{} { current, limit, window := esw.SlidingWindow.GetStats() return map[string]interface{}{ "type": 
"sliding_window", "current": current, "limit": limit, "window": window.String(), } } // Enhanced FixedWindow with RateLimiter interface type EnhancedFixedWindow struct { *FixedWindow } func (efw *EnhancedFixedWindow) Wait(ctx context.Context) error { for { if efw.Allow() { return nil } select { case <-time.After(time.Millisecond * 10): continue case <-ctx.Done(): return ctx.Err() } } } func (efw *EnhancedFixedWindow) GetStats() map[string]interface{} { current, limit, remaining := efw.FixedWindow.GetStats() return map[string]interface{}{ "type": "fixed_window", "current": current, "limit": limit, "remaining": remaining.String(), } } // RateLimitedService demonstrates rate limiting in a service type RateLimitedService struct { limiter RateLimiter mu sync.Mutex stats struct { totalRequests int allowedRequests int deniedRequests int } } // NewRateLimitedService creates a new rate limited service func NewRateLimitedService(limiter RateLimiter) *RateLimitedService { return &RateLimitedService{ limiter: limiter, } } // ProcessRequest processes a request with rate limiting func (rls *RateLimitedService) ProcessRequest(ctx context.Context, requestID string) error { rls.mu.Lock() rls.stats.totalRequests++ rls.mu.Unlock() if !rls.limiter.Allow() { rls.mu.Lock() rls.stats.deniedRequests++ rls.mu.Unlock() return fmt.Errorf("request %s denied by rate limiter", requestID) } rls.mu.Lock() rls.stats.allowedRequests++ rls.mu.Unlock() // Simulate processing time.Sleep(50 * time.Millisecond) fmt.Printf(" Processed request %s\n", requestID) return nil } // GetServiceStats returns service statistics func (rls *RateLimitedService) GetServiceStats() map[string]interface{} { rls.mu.Lock() defer rls.mu.Unlock() return map[string]interface{}{ "total_requests": rls.stats.totalRequests, "allowed_requests": rls.stats.allowedRequests, "denied_requests": rls.stats.deniedRequests, "rate_limiter": rls.limiter.GetStats(), } } func main() { // Create multi-algorithm rate limiter multiLimiter := NewMultiRateLimiter() // Add different rate limiters multiLimiter.AddLimiter("token_bucket", &EnhancedTokenBucket{ TokenBucket: NewTokenBucket(5, 2), // 5 tokens, 2 per second }) multiLimiter.AddLimiter("sliding_window", &EnhancedSlidingWindow{ SlidingWindow: NewSlidingWindow(3, 2*time.Second), // 3 requests per 2 seconds }) multiLimiter.AddLimiter("fixed_window", &EnhancedFixedWindow{ FixedWindow: NewFixedWindow(4, 3*time.Second), // 4 requests per 3 seconds }) service := NewRateLimitedService(multiLimiter) fmt.Println("=== Multi-Algorithm Rate Limiter Demo ===") fmt.Println("Using Token Bucket (5 tokens, 2/sec) + Sliding Window (3/2sec) + Fixed Window (4/3sec)") // Simulate concurrent requests var wg sync.WaitGroup for i := 1; i <= 15; i++ { wg.Add(1) go func(id int) { defer wg.Done() ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() requestID := fmt.Sprintf("req-%d", id) err := service.ProcessRequest(ctx, requestID) if err != nil { fmt.Printf(" %v\n", err) } }(i) time.Sleep(200 * time.Millisecond) } wg.Wait() // Print final statistics fmt.Println("\n=== Final Statistics ===") stats := service.GetServiceStats() fmt.Printf("Total Requests: %d\n", stats["total_requests"]) fmt.Printf("Allowed Requests: %d\n", stats["allowed_requests"]) fmt.Printf("Denied Requests: %d\n", stats["denied_requests"]) fmt.Println("\nRate Limiter Details:") rateLimiterStats := stats["rate_limiter"].(map[string]interface{}) for name, limiterStats := range rateLimiterStats { fmt.Printf(" %s: %+v\n", name, limiterStats) } } 
Best Practices Choose Right Algorithm: Select based on your specific requirements Token Bucket: Allow bursts, good for APIs Sliding Window: Smooth rate limiting Fixed Window: Simple, memory efficient Configure Appropriately: Set limits based on system capacity Handle Rejections Gracefully: Provide meaningful error messages Monitor Metrics: Track allowed/denied requests and adjust limits Use Context: Support cancellation in Wait operations Consider Distributed Systems: Use Redis or similar for distributed rate limiting Implement Backoff: Add exponential backoff for denied requests Common Pitfalls Too Restrictive: Setting limits too low affects user experience Too Permissive: High limits don’t protect against abuse Memory Leaks: Not cleaning old requests in sliding window Race Conditions: Not properly synchronizing access to counters Ignoring Bursts: Fixed windows can allow double the limit at boundaries Rate limiting is essential for protecting services from overload and ensuring fair resource usage. Choose the right algorithm based on your requirements and always monitor the effectiveness of your rate limiting strategy. ...
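    For production use, the golang.org/x/time/rate package implements a token-bucket limiter with Allow/Wait semantics very close to the examples above, including the context-aware blocking that pairs well with the backoff advice in the best practices. A minimal sketch, assuming the x/time module is available; the rate of 2 per second, the burst of 5, and the request counts are illustrative:

    package main

    import (
        "context"
        "fmt"
        "time"

        "golang.org/x/time/rate"
    )

    func main() {
        // 2 tokens per second with a burst (bucket capacity) of 5.
        limiter := rate.NewLimiter(rate.Limit(2), 5)

        // Non-blocking check, like TokenBucket.Allow above: the burst is consumed first.
        for i := 1; i <= 7; i++ {
            fmt.Printf("request %d allowed: %v\n", i, limiter.Allow())
        }

        // Blocking variant: Wait sleeps until a token is available or the context ends.
        ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
        defer cancel()
        for i := 1; i <= 3; i++ {
            if err := limiter.Wait(ctx); err != nil {
                fmt.Println("wait failed:", err)
                return
            }
            fmt.Printf("request %d admitted after waiting\n", i)
        }
    }

    Wait returns an error up front if obtaining the token would exceed the context deadline, which avoids goroutines sleeping on requests that can never be admitted in time.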

    July 24, 2024 · 10 min · Rafiul Alam

    Pub/Sub Pattern in Go

    Go Concurrency Patterns Series: ← Fan-Out/Fan-In | Series Overview | Request/Response → What is the Pub/Sub Pattern? The Publisher/Subscriber (Pub/Sub) pattern is a messaging pattern where publishers send messages without knowing who will receive them, and subscribers receive messages without knowing who sent them. This creates a loosely coupled system where components can communicate through events without direct dependencies. Key Components: Publisher: Sends messages/events Subscriber: Receives and processes messages/events Message Broker: Routes messages from publishers to subscribers Topics/Channels: Categories for organizing messages Real-World Use Cases Event-Driven Architecture: Microservices communication Real-Time Notifications: User activity feeds, alerts Data Streaming: Log aggregation, metrics collection UI Updates: React to state changes across components Workflow Orchestration: Trigger actions based on events Cache Invalidation: Notify when data changes Basic Pub/Sub Implementation package main import ( "fmt" "sync" "time" ) // Message represents a pub/sub message type Message struct { Topic string Payload interface{} } // Subscriber represents a message handler type Subscriber func(Message) // PubSub is a simple in-memory pub/sub system type PubSub struct { mu sync.RWMutex subscribers map[string][]Subscriber closed bool } // NewPubSub creates a new pub/sub instance func NewPubSub() *PubSub { return &PubSub{ subscribers: make(map[string][]Subscriber), } } // Subscribe adds a subscriber to a topic func (ps *PubSub) Subscribe(topic string, subscriber Subscriber) { ps.mu.Lock() defer ps.mu.Unlock() if ps.closed { return } ps.subscribers[topic] = append(ps.subscribers[topic], subscriber) } // Publish sends a message to all subscribers of a topic func (ps *PubSub) Publish(topic string, payload interface{}) { ps.mu.RLock() defer ps.mu.RUnlock() if ps.closed { return } message := Message{ Topic: topic, Payload: payload, } // Send to all subscribers asynchronously for _, subscriber := range ps.subscribers[topic] { go subscriber(message) } } // Close shuts down the pub/sub system func (ps *PubSub) Close() { ps.mu.Lock() defer ps.mu.Unlock() ps.closed = true } func main() { pubsub := NewPubSub() defer pubsub.Close() // Subscribe to user events pubsub.Subscribe("user.created", func(msg Message) { fmt.Printf("Email service: Welcome %v!\n", msg.Payload) }) pubsub.Subscribe("user.created", func(msg Message) { fmt.Printf("Analytics: New user registered: %v\n", msg.Payload) }) pubsub.Subscribe("user.deleted", func(msg Message) { fmt.Printf("Cleanup service: Remove user data for %v\n", msg.Payload) }) // Publish events pubsub.Publish("user.created", "[email protected]") pubsub.Publish("user.created", "[email protected]") pubsub.Publish("user.deleted", "[email protected]") // Wait for async processing time.Sleep(100 * time.Millisecond) } Advanced Pub/Sub with Channels package main import ( "context" "fmt" "sync" "time" ) // Event represents a structured event type Event struct { ID string Type string Timestamp time.Time Data interface{} } // Subscription represents an active subscription type Subscription struct { ID string Topic string Channel chan Event Filter func(Event) bool cancel context.CancelFunc } // Close cancels the subscription func (s *Subscription) Close() { if s.cancel != nil { s.cancel() } } // EventBus is a channel-based pub/sub system type EventBus struct { mu sync.RWMutex subscriptions map[string][]*Subscription buffer int closed bool } // NewEventBus creates a new event 
bus func NewEventBus(bufferSize int) *EventBus { return &EventBus{ subscriptions: make(map[string][]*Subscription), buffer: bufferSize, } } // Subscribe creates a new subscription with optional filtering func (eb *EventBus) Subscribe(ctx context.Context, topic string, filter func(Event) bool) *Subscription { eb.mu.Lock() defer eb.mu.Unlock() if eb.closed { return nil } subCtx, cancel := context.WithCancel(ctx) subscription := &Subscription{ ID: fmt.Sprintf("sub-%d", time.Now().UnixNano()), Topic: topic, Channel: make(chan Event, eb.buffer), Filter: filter, cancel: cancel, } eb.subscriptions[topic] = append(eb.subscriptions[topic], subscription) // Clean up subscription when context is cancelled go func() { <-subCtx.Done() eb.unsubscribe(subscription) close(subscription.Channel) }() return subscription } // unsubscribe removes a subscription func (eb *EventBus) unsubscribe(sub *Subscription) { eb.mu.Lock() defer eb.mu.Unlock() subs := eb.subscriptions[sub.Topic] for i, s := range subs { if s.ID == sub.ID { eb.subscriptions[sub.Topic] = append(subs[:i], subs[i+1:]...) break } } } // Publish sends an event to all matching subscribers func (eb *EventBus) Publish(event Event) { eb.mu.RLock() defer eb.mu.RUnlock() if eb.closed { return } event.Timestamp = time.Now() for _, subscription := range eb.subscriptions[event.Type] { // Apply filter if present if subscription.Filter != nil && !subscription.Filter(event) { continue } // Non-blocking send select { case subscription.Channel <- event: default: // Channel is full, could log this fmt.Printf("Warning: Subscription %s channel is full\n", subscription.ID) } } } // Close shuts down the event bus func (eb *EventBus) Close() { eb.mu.Lock() defer eb.mu.Unlock() eb.closed = true // Close all subscriptions for _, subs := range eb.subscriptions { for _, sub := range subs { sub.Close() } } } func main() { ctx := context.Background() eventBus := NewEventBus(10) defer eventBus.Close() // Subscribe to all user events userSub := eventBus.Subscribe(ctx, "user", nil) // Subscribe to only high-priority events prioritySub := eventBus.Subscribe(ctx, "user", func(e Event) bool { if data, ok := e.Data.(map[string]interface{}); ok { return data["priority"] == "high" } return false }) // Start event processors go func() { for event := range userSub.Channel { fmt.Printf("User processor: %s - %v\n", event.Type, event.Data) } }() go func() { for event := range prioritySub.Channel { fmt.Printf("Priority processor: %s - %v\n", event.Type, event.Data) } }() // Publish events eventBus.Publish(Event{ ID: "1", Type: "user", Data: map[string]interface{}{ "action": "login", "user": "john", "priority": "low", }, }) eventBus.Publish(Event{ ID: "2", Type: "user", Data: map[string]interface{}{ "action": "payment", "user": "jane", "priority": "high", }, }) time.Sleep(100 * time.Millisecond) } Persistent Pub/Sub with Replay package main import ( "context" "fmt" "sync" "time" ) // StoredEvent represents an event with storage metadata type StoredEvent struct { Event Sequence int64 Stored time.Time } // PersistentEventBus stores events and supports replay type PersistentEventBus struct { mu sync.RWMutex events []StoredEvent sequence int64 subs map[string][]*PersistentSubscription closed bool } // PersistentSubscription supports replay from a specific point type PersistentSubscription struct { ID string Topic string Channel chan StoredEvent FromSeq int64 cancel context.CancelFunc } func (s *PersistentSubscription) Close() { if s.cancel != nil { s.cancel() } } // NewPersistentEventBus 
creates a new persistent event bus func NewPersistentEventBus() *PersistentEventBus { return &PersistentEventBus{ events: make([]StoredEvent, 0), subs: make(map[string][]*PersistentSubscription), } } // Subscribe creates a subscription with optional replay func (peb *PersistentEventBus) Subscribe(ctx context.Context, topic string, fromSequence int64) *PersistentSubscription { peb.mu.Lock() defer peb.mu.Unlock() if peb.closed { return nil } subCtx, cancel := context.WithCancel(ctx) sub := &PersistentSubscription{ ID: fmt.Sprintf("psub-%d", time.Now().UnixNano()), Topic: topic, Channel: make(chan StoredEvent, 100), FromSeq: fromSequence, cancel: cancel, } peb.subs[topic] = append(peb.subs[topic], sub) // Replay historical events if requested if fromSequence >= 0 { go peb.replayEvents(sub) } // Clean up on context cancellation go func() { <-subCtx.Done() peb.unsubscribe(sub) close(sub.Channel) }() return sub } // replayEvents sends historical events to a subscription func (peb *PersistentEventBus) replayEvents(sub *PersistentSubscription) { peb.mu.RLock() defer peb.mu.RUnlock() for _, storedEvent := range peb.events { if storedEvent.Sequence >= sub.FromSeq && storedEvent.Type == sub.Topic { select { case sub.Channel <- storedEvent: default: // Channel full, skip } } } } // unsubscribe removes a subscription func (peb *PersistentEventBus) unsubscribe(sub *PersistentSubscription) { peb.mu.Lock() defer peb.mu.Unlock() subs := peb.subs[sub.Topic] for i, s := range subs { if s.ID == sub.ID { peb.subs[sub.Topic] = append(subs[:i], subs[i+1:]...) break } } } // Publish stores and distributes an event func (peb *PersistentEventBus) Publish(event Event) int64 { peb.mu.Lock() defer peb.mu.Unlock() if peb.closed { return -1 } peb.sequence++ storedEvent := StoredEvent{ Event: event, Sequence: peb.sequence, Stored: time.Now(), } // Store event peb.events = append(peb.events, storedEvent) // Distribute to current subscribers for _, sub := range peb.subs[event.Type] { select { case sub.Channel <- storedEvent: default: // Channel full } } return peb.sequence } // GetLastSequence returns the last event sequence number func (peb *PersistentEventBus) GetLastSequence() int64 { peb.mu.RLock() defer peb.mu.RUnlock() return peb.sequence } func main() { ctx := context.Background() eventBus := NewPersistentEventBus() // Publish some initial events eventBus.Publish(Event{ID: "1", Type: "order", Data: "Order created"}) eventBus.Publish(Event{ID: "2", Type: "order", Data: "Order paid"}) eventBus.Publish(Event{ID: "3", Type: "order", Data: "Order shipped"}) fmt.Printf("Published 3 events, last sequence: %d\n", eventBus.GetLastSequence()) // Subscribe from the beginning (replay all events) replaySub := eventBus.Subscribe(ctx, "order", 0) // Subscribe from current point (no replay) liveSub := eventBus.Subscribe(ctx, "order", -1) // Process replayed events go func() { fmt.Println("Replay subscription:") for event := range replaySub.Channel { fmt.Printf(" Replayed: seq=%d, %v\n", event.Sequence, event.Data) } }() // Process live events go func() { fmt.Println("Live subscription:") for event := range liveSub.Channel { fmt.Printf(" Live: seq=%d, %v\n", event.Sequence, event.Data) } }() time.Sleep(100 * time.Millisecond) // Publish new events eventBus.Publish(Event{ID: "4", Type: "order", Data: "Order delivered"}) eventBus.Publish(Event{ID: "5", Type: "order", Data: "Order completed"}) time.Sleep(100 * time.Millisecond) replaySub.Close() liveSub.Close() } Typed Pub/Sub System package main import ( "context" "fmt" "reflect" 
"sync" ) // TypedEventBus provides type-safe pub/sub type TypedEventBus struct { mu sync.RWMutex handlers map[reflect.Type][]reflect.Value closed bool } // NewTypedEventBus creates a new typed event bus func NewTypedEventBus() *TypedEventBus { return &TypedEventBus{ handlers: make(map[reflect.Type][]reflect.Value), } } // Subscribe registers a handler for a specific event type func (teb *TypedEventBus) Subscribe(handler interface{}) { teb.mu.Lock() defer teb.mu.Unlock() if teb.closed { return } handlerValue := reflect.ValueOf(handler) handlerType := handlerValue.Type() // Validate handler signature: func(EventType) if handlerType.Kind() != reflect.Func || handlerType.NumIn() != 1 || handlerType.NumOut() != 0 { panic("Handler must be func(EventType)") } eventType := handlerType.In(0) teb.handlers[eventType] = append(teb.handlers[eventType], handlerValue) } // Publish sends an event to all registered handlers func (teb *TypedEventBus) Publish(event interface{}) { teb.mu.RLock() defer teb.mu.RUnlock() if teb.closed { return } eventType := reflect.TypeOf(event) eventValue := reflect.ValueOf(event) for _, handler := range teb.handlers[eventType] { go handler.Call([]reflect.Value{eventValue}) } } // Event types type UserCreated struct { UserID string Email string } type OrderPlaced struct { OrderID string UserID string Amount float64 } type PaymentProcessed struct { PaymentID string OrderID string Success bool } func main() { eventBus := NewTypedEventBus() // Subscribe to different event types eventBus.Subscribe(func(event UserCreated) { fmt.Printf("Email service: Send welcome email to %s\n", event.Email) }) eventBus.Subscribe(func(event UserCreated) { fmt.Printf("Analytics: Track user registration %s\n", event.UserID) }) eventBus.Subscribe(func(event OrderPlaced) { fmt.Printf("Inventory: Reserve items for order %s\n", event.OrderID) }) eventBus.Subscribe(func(event OrderPlaced) { fmt.Printf("Payment: Process payment for order %s, amount $%.2f\n", event.OrderID, event.Amount) }) eventBus.Subscribe(func(event PaymentProcessed) { if event.Success { fmt.Printf("Fulfillment: Ship order %s\n", event.OrderID) } else { fmt.Printf("Orders: Cancel order %s due to payment failure\n", event.OrderID) } }) // Publish events eventBus.Publish(UserCreated{ UserID: "user123", Email: "[email protected]", }) eventBus.Publish(OrderPlaced{ OrderID: "order456", UserID: "user123", Amount: 99.99, }) eventBus.Publish(PaymentProcessed{ PaymentID: "pay789", OrderID: "order456", Success: true, }) // Wait for async processing time.Sleep(100 * time.Millisecond) } Best Practices Async Processing: Handle events asynchronously to avoid blocking publishers Error Handling: Implement proper error handling in subscribers Buffering: Use buffered channels to handle bursts of events Graceful Shutdown: Ensure clean shutdown of all subscribers Dead Letter Queues: Handle failed message processing Monitoring: Track message rates, processing times, and failures Type Safety: Use typed events when possible Idempotency: Design subscribers to handle duplicate messages Common Pitfalls Memory Leaks: Not closing subscriptions properly Blocking Publishers: Slow subscribers blocking the entire system Lost Messages: Not handling channel buffer overflows Circular Dependencies: Events triggering other events in loops No Error Handling: Panics in subscribers affecting the system Testing Pub/Sub Systems package main import ( "context" "testing" "time" ) func TestEventBus(t *testing.T) { eventBus := NewEventBus(10) defer eventBus.Close() ctx, cancel := 
context.WithTimeout(context.Background(), time.Second) defer cancel() // Subscribe to events sub := eventBus.Subscribe(ctx, "test", nil) // Publish event testEvent := Event{ ID: "test1", Type: "test", Data: "test data", } eventBus.Publish(testEvent) // Verify event received select { case received := <-sub.Channel: if received.ID != testEvent.ID { t.Errorf("Expected event ID %s, got %s", testEvent.ID, received.ID) } case <-time.After(100 * time.Millisecond): t.Error("Event not received within timeout") } } The Pub/Sub pattern is fundamental for building scalable, event-driven systems in Go. It enables loose coupling between components and supports complex workflows through simple event-based communication. ...
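    The reflection-based TypedEventBus can also be expressed with generics on Go 1.18+, keeping compile-time type safety without reflect. The Topic type below is my own sketch, not part of the post; it keeps only Subscribe and Publish to show the shape of the idea:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // Topic is a type-safe publish/subscribe hub for a single event type.
    type Topic[T any] struct {
        mu       sync.RWMutex
        handlers []func(T)
    }

    // Subscribe registers a handler for events of type T.
    func (t *Topic[T]) Subscribe(h func(T)) {
        t.mu.Lock()
        defer t.mu.Unlock()
        t.handlers = append(t.handlers, h)
    }

    // Publish delivers the event to every handler asynchronously.
    func (t *Topic[T]) Publish(event T) {
        t.mu.RLock()
        defer t.mu.RUnlock()
        for _, h := range t.handlers {
            go h(event)
        }
    }

    type UserCreated struct {
        UserID string
        Email  string
    }

    func main() {
        var users Topic[UserCreated]

        users.Subscribe(func(e UserCreated) {
            fmt.Printf("email service: welcome %s\n", e.Email)
        })
        users.Subscribe(func(e UserCreated) {
            fmt.Printf("analytics: new user %s\n", e.UserID)
        })

        users.Publish(UserCreated{UserID: "user123", Email: "user@example.com"})

        time.Sleep(100 * time.Millisecond) // wait for the async handlers
    }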

    July 17, 2024 · 9 min · Rafiul Alam

    Once Pattern in Go

    Go Concurrency Patterns Series: ← WaitGroup Pattern | Series Overview | Context Pattern → What is the Once Pattern? The Once pattern uses sync.Once to ensure that a piece of code executes exactly once, regardless of how many goroutines call it. This is essential for thread-safe initialization, singleton patterns, and one-time setup operations in concurrent programs. Key Characteristics: Thread-safe: Multiple goroutines can call it safely Exactly once: Code executes only on the first call Blocking: Subsequent calls wait for the first execution to complete No return values: The function passed to Do() cannot return values Real-World Use Cases Singleton Initialization: Create single instances of objects Configuration Loading: Load config files once at startup Database Connections: Initialize connection pools Logger Setup: Configure logging systems Resource Initialization: Set up expensive resources Feature Flags: Initialize feature flag systems Basic Once Usage package main import ( "fmt" "sync" "time" ) var ( instance *Database once sync.Once ) // Database represents a database connection type Database struct { ConnectionString string IsConnected bool } // Connect simulates database connection func (db *Database) Connect() { fmt.Println("Connecting to database...") time.Sleep(100 * time.Millisecond) // Simulate connection time db.IsConnected = true fmt.Println("Database connected!") } // GetDatabase returns the singleton database instance func GetDatabase() *Database { once.Do(func() { fmt.Println("Initializing database instance...") instance = &Database{ ConnectionString: "localhost:5432", } instance.Connect() }) return instance } func main() { var wg sync.WaitGroup // Multiple goroutines trying to get database instance for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() fmt.Printf("Goroutine %d requesting database\n", id) db := GetDatabase() fmt.Printf("Goroutine %d got database: %+v\n", id, db) }(i) } wg.Wait() // Verify all goroutines got the same instance fmt.Printf("Final instance: %p\n", GetDatabase()) } Configuration Manager with Once package main import ( "encoding/json" "fmt" "os" "sync" ) // Config represents application configuration type Config struct { DatabaseURL string `json:"database_url"` APIKey string `json:"api_key"` Debug bool `json:"debug"` Port int `json:"port"` } // ConfigManager manages application configuration type ConfigManager struct { config *Config once sync.Once err error } // NewConfigManager creates a new config manager func NewConfigManager() *ConfigManager { return &ConfigManager{} } // loadConfig loads configuration from file func (cm *ConfigManager) loadConfig() { fmt.Println("Loading configuration...") // Simulate config file reading configData := `{ "database_url": "postgres://localhost:5432/myapp", "api_key": "secret-api-key-123", "debug": true, "port": 8080 }` var config Config if err := json.Unmarshal([]byte(configData), &config); err != nil { cm.err = fmt.Errorf("failed to parse config: %w", err) return } cm.config = &config fmt.Println("Configuration loaded successfully!") } // GetConfig returns the configuration, loading it once if needed func (cm *ConfigManager) GetConfig() (*Config, error) { cm.once.Do(cm.loadConfig) return cm.config, cm.err } func main() { configManager := NewConfigManager() var wg sync.WaitGroup // Multiple goroutines accessing configuration for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() config, err := configManager.GetConfig() if err != nil { fmt.Printf("Goroutine %d: Error loading 
## Configuration Manager with Once

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// Config represents application configuration
type Config struct {
	DatabaseURL string `json:"database_url"`
	APIKey      string `json:"api_key"`
	Debug       bool   `json:"debug"`
	Port        int    `json:"port"`
}

// ConfigManager manages application configuration
type ConfigManager struct {
	config *Config
	once   sync.Once
	err    error
}

// NewConfigManager creates a new config manager
func NewConfigManager() *ConfigManager {
	return &ConfigManager{}
}

// loadConfig loads configuration from file
func (cm *ConfigManager) loadConfig() {
	fmt.Println("Loading configuration...")

	// Simulate config file reading
	configData := `{
		"database_url": "postgres://localhost:5432/myapp",
		"api_key": "secret-api-key-123",
		"debug": true,
		"port": 8080
	}`

	var config Config
	if err := json.Unmarshal([]byte(configData), &config); err != nil {
		cm.err = fmt.Errorf("failed to parse config: %w", err)
		return
	}

	cm.config = &config
	fmt.Println("Configuration loaded successfully!")
}

// GetConfig returns the configuration, loading it once if needed
func (cm *ConfigManager) GetConfig() (*Config, error) {
	cm.once.Do(cm.loadConfig)
	return cm.config, cm.err
}

func main() {
	configManager := NewConfigManager()
	var wg sync.WaitGroup

	// Multiple goroutines accessing configuration
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			config, err := configManager.GetConfig()
			if err != nil {
				fmt.Printf("Goroutine %d: Error loading config: %v\n", id, err)
				return
			}
			fmt.Printf("Goroutine %d: Port=%d, Debug=%v\n", id, config.Port, config.Debug)
		}(i)
	}

	wg.Wait()
}
```
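On Go 1.21+, `sync.OnceValues` can stand in for this kind of wrapper when the initializer returns a value and an error: it runs the function once and caches both results for every subsequent caller. A brief sketch, assuming a trimmed-down version of the `Config` type above (the inline JSON is a placeholder, not a real config source):

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// Config mirrors a subset of the configuration struct above.
type Config struct {
	Port  int  `json:"port"`
	Debug bool `json:"debug"`
}

// loadConfig runs at most once; both return values are cached.
var loadConfig = sync.OnceValues(func() (*Config, error) {
	var cfg Config
	if err := json.Unmarshal([]byte(`{"port": 8080, "debug": true}`), &cfg); err != nil {
		return nil, fmt.Errorf("failed to parse config: %w", err)
	}
	return &cfg, nil
})

func main() {
	cfg, err := loadConfig()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("Port=%d, Debug=%v\n", cfg.Port, cfg.Debug)
}
```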
## Logger Initialization with Once

```go
package main

import (
	"fmt"
	"log"
	"os"
	"sync"
)

// Logger wraps the standard logger with additional functionality
type Logger struct {
	*log.Logger
	level string
}

var (
	logger     *Logger
	loggerOnce sync.Once
)

// initLogger initializes the global logger
func initLogger() {
	fmt.Println("Initializing logger...")

	// Create log file
	file, err := os.OpenFile("app.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
	if err != nil {
		log.Fatalln("Failed to open log file:", err)
	}

	logger = &Logger{
		Logger: log.New(file, "APP: ", log.Ldate|log.Ltime|log.Lshortfile),
		level:  "INFO",
	}

	logger.Println("Logger initialized")
	fmt.Println("Logger setup complete!")
}

// GetLogger returns the singleton logger instance
func GetLogger() *Logger {
	loggerOnce.Do(initLogger)
	return logger
}

// Info logs an info message
func (l *Logger) Info(msg string) {
	l.Printf("[INFO] %s", msg)
}

// Error logs an error message
func (l *Logger) Error(msg string) {
	l.Printf("[ERROR] %s", msg)
}

func main() {
	var wg sync.WaitGroup

	// Multiple goroutines using the logger
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			logger := GetLogger()
			logger.Info(fmt.Sprintf("Message from goroutine %d", id))
			if id%2 == 0 {
				logger.Error(fmt.Sprintf("Error from goroutine %d", id))
			}
		}(i)
	}

	wg.Wait()

	// Clean up
	if logger != nil {
		logger.Info("Application shutting down")
	}
}
```

## Resource Pool Initialization

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Connection represents a database connection
type Connection struct {
	ID        int
	Connected bool
}

// Connect simulates connecting to database
func (c *Connection) Connect() error {
	time.Sleep(50 * time.Millisecond) // Simulate connection time
	c.Connected = true
	return nil
}

// Close simulates closing the connection
func (c *Connection) Close() error {
	c.Connected = false
	return nil
}

// ConnectionPool manages a pool of database connections
type ConnectionPool struct {
	connections []*Connection
	available   chan *Connection
	once        sync.Once
	initErr     error
}

// NewConnectionPool creates a new connection pool
func NewConnectionPool(size int) *ConnectionPool {
	return &ConnectionPool{
		available: make(chan *Connection, size),
	}
}

// initialize sets up the connection pool
func (cp *ConnectionPool) initialize() {
	fmt.Println("Initializing connection pool...")

	poolSize := cap(cp.available)
	cp.connections = make([]*Connection, poolSize)

	// Create and connect all connections
	for i := 0; i < poolSize; i++ {
		conn := &Connection{ID: i + 1}
		if err := conn.Connect(); err != nil {
			cp.initErr = fmt.Errorf("failed to connect connection %d: %w", i+1, err)
			return
		}
		cp.connections[i] = conn
		cp.available <- conn
	}

	fmt.Printf("Connection pool initialized with %d connections\n", poolSize)
}

// GetConnection gets a connection from the pool
func (cp *ConnectionPool) GetConnection() (*Connection, error) {
	cp.once.Do(cp.initialize)

	if cp.initErr != nil {
		return nil, cp.initErr
	}

	select {
	case conn := <-cp.available:
		return conn, nil
	case <-time.After(5 * time.Second):
		return nil, fmt.Errorf("timeout waiting for connection")
	}
}

// ReturnConnection returns a connection to the pool
func (cp *ConnectionPool) ReturnConnection(conn *Connection) {
	select {
	case cp.available <- conn:
	default:
		// Pool is full, close the connection
		conn.Close()
	}
}

// Close closes all connections in the pool
func (cp *ConnectionPool) Close() error {
	close(cp.available)
	for _, conn := range cp.connections {
		if conn != nil {
			conn.Close()
		}
	}
	return nil
}

func main() {
	pool := NewConnectionPool(3)
	defer pool.Close()

	var wg sync.WaitGroup

	// Multiple goroutines using the connection pool
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			conn, err := pool.GetConnection()
			if err != nil {
				fmt.Printf("Worker %d: Failed to get connection: %v\n", id, err)
				return
			}

			fmt.Printf("Worker %d: Got connection %d\n", id, conn.ID)

			// Simulate work
			time.Sleep(200 * time.Millisecond)

			pool.ReturnConnection(conn)
			fmt.Printf("Worker %d: Returned connection %d\n", id, conn.ID)
		}(i)
	}

	wg.Wait()
}
```
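Note that in the pool example the workers that call `GetConnection` while the first caller is still inside `initialize` simply block at `once.Do` until that first execution returns; they neither skip the setup nor run it a second time. A minimal, self-contained sketch of that blocking behavior (the sleep duration is an illustrative assumption):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var once sync.Once
	var wg sync.WaitGroup
	start := time.Now()

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			once.Do(func() {
				// Only the first caller runs this; the others wait here.
				time.Sleep(200 * time.Millisecond)
			})
			// Every goroutine reaches this point only after the one-time
			// setup has completed, so all prints show roughly ~200ms.
			fmt.Printf("goroutine %d done after %v\n", id, time.Since(start).Round(10*time.Millisecond))
		}(i)
	}

	wg.Wait()
}
```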
## Advanced Once Patterns

### 1. Once with Error Handling

```go
package main

import (
	"fmt"
	"sync"
)

// OnceWithError provides Once functionality with error handling
type OnceWithError struct {
	once sync.Once
	err  error
}

// Do executes the function once and stores any error
func (o *OnceWithError) Do(f func() error) error {
	o.once.Do(func() {
		o.err = f()
	})
	return o.err
}

// ExpensiveResource represents a resource that's expensive to initialize
type ExpensiveResource struct {
	Data string
}

var (
	resource     *ExpensiveResource
	resourceOnce OnceWithError
)

// initResource initializes the expensive resource
func initResource() error {
	fmt.Println("Initializing expensive resource...")

	// Simulate potential failure
	if false { // Change to true to simulate error
		return fmt.Errorf("failed to initialize resource")
	}

	resource = &ExpensiveResource{
		Data: "Important data",
	}

	fmt.Println("Resource initialized successfully!")
	return nil
}

// GetResource returns the resource, initializing it once if needed
func GetResource() (*ExpensiveResource, error) {
	err := resourceOnce.Do(initResource)
	if err != nil {
		return nil, err
	}
	return resource, nil
}

func main() {
	var wg sync.WaitGroup

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			resource, err := GetResource()
			if err != nil {
				fmt.Printf("Goroutine %d: Error: %v\n", id, err)
				return
			}
			fmt.Printf("Goroutine %d: Got resource: %s\n", id, resource.Data)
		}(i)
	}

	wg.Wait()
}
```

### 2. Resettable Once

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// ResettableOnce allows resetting the once behavior
type ResettableOnce struct {
	mu   sync.Mutex
	done uint32
}

// Do executes the function once
func (ro *ResettableOnce) Do(f func()) {
	if atomic.LoadUint32(&ro.done) == 0 {
		ro.doSlow(f)
	}
}

func (ro *ResettableOnce) doSlow(f func()) {
	ro.mu.Lock()
	defer ro.mu.Unlock()
	if ro.done == 0 {
		defer atomic.StoreUint32(&ro.done, 1)
		f()
	}
}

// Reset allows the once to be used again
func (ro *ResettableOnce) Reset() {
	ro.mu.Lock()
	defer ro.mu.Unlock()
	atomic.StoreUint32(&ro.done, 0)
}

// IsDone returns true if the function has been executed
func (ro *ResettableOnce) IsDone() bool {
	return atomic.LoadUint32(&ro.done) == 1
}

func main() {
	var once ResettableOnce
	counter := 0

	task := func() {
		counter++
		fmt.Printf("Task executed, counter: %d\n", counter)
	}

	// First round
	fmt.Println("First round:")
	for i := 0; i < 3; i++ {
		once.Do(task)
	}
	fmt.Printf("Done: %v\n", once.IsDone())

	// Reset and second round
	fmt.Println("\nAfter reset:")
	once.Reset()
	fmt.Printf("Done: %v\n", once.IsDone())
	for i := 0; i < 3; i++ {
		once.Do(task)
	}
}
```

## Best Practices

- **Use for Initialization**: Perfect for one-time setup operations
- **Keep Functions Simple**: The function passed to `Do()` should be straightforward
- **Handle Errors Separately**: Use wrapper types for error handling
- **Avoid Side Effects**: Be careful with functions that have external side effects
- **Don't Nest Once Calls**: Avoid calling `Do()` from within another `Do()`
- **Consider Alternatives**: Use `init()` for package-level initialization when appropriate (see the sketch after this list)
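To illustrate that last point: `init()` runs eagerly when the package is first initialized, whether or not the value is ever used, while `sync.Once` defers the work until the first caller actually needs it. A minimal sketch of the trade-off (the package-level names here are hypothetical examples):

```go
package main

import (
	"fmt"
	"sync"
)

var defaults map[string]string

// Eager: runs once at program start, before main, whether or not
// defaults are ever read.
func init() {
	defaults = map[string]string{"region": "us-east-1"}
}

// Lazy: the setup runs only when Templates() is first called.
var (
	templatesOnce sync.Once
	templates     map[string]string
)

func Templates() map[string]string {
	templatesOnce.Do(func() {
		// Hypothetical expensive setup, deferred until first use.
		templates = map[string]string{"welcome": "Hello, {{.Name}}!"}
	})
	return templates
}

func main() {
	fmt.Println(defaults["region"])     // already initialized by init()
	fmt.Println(Templates()["welcome"]) // initialized on first call
}
```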
## Common Pitfalls

### 1. Expecting Return Values

```go
// Bad: Once.Do doesn't support return values
var once sync.Once
var result string

func badExample() string {
	once.Do(func() {
		// Can't return from here
		result = "computed value"
	})
	return result // This works but is not ideal
}

// Good: Use a wrapper or store results in accessible variables
type OnceResult struct {
	once   sync.Once
	result string
	err    error
}

func (or *OnceResult) Get() (string, error) {
	or.once.Do(func() {
		or.result, or.err = computeValue()
	})
	return or.result, or.err
}
```

### 2. Panic in Once Function

```go
// Bad: Panic prevents future calls
var once sync.Once

func badOnceFunc() {
	once.Do(func() {
		panic("something went wrong") // Once will never execute again
	})
}

// Good: Handle panics appropriately
func goodOnceFunc() {
	once.Do(func() {
		defer func() {
			if r := recover(); r != nil {
				// Handle panic appropriately
				fmt.Printf("Recovered from panic: %v\n", r)
			}
		}()
		// risky operation
	})
}
```

## Testing Once Patterns

```go
package main

import (
	"fmt"
	"sync"
	"testing"
)

func TestOnceExecution(t *testing.T) {
	var once sync.Once
	counter := 0
	var wg sync.WaitGroup

	// Start multiple goroutines
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			once.Do(func() {
				counter++
			})
		}()
	}

	wg.Wait()

	if counter != 1 {
		t.Errorf("Expected counter to be 1, got %d", counter)
	}
}

func TestOnceWithError(t *testing.T) {
	var onceErr OnceWithError
	callCount := 0

	// First call with error
	err1 := onceErr.Do(func() error {
		callCount++
		return fmt.Errorf("test error")
	})

	// Second call should return same error without executing function
	err2 := onceErr.Do(func() error {
		callCount++
		return nil
	})

	if callCount != 1 {
		t.Errorf("Expected function to be called once, got %d", callCount)
	}

	if err1 == nil || err2 == nil {
		t.Error("Expected both calls to return error")
	}

	if err1.Error() != err2.Error() {
		t.Error("Expected same error from both calls")
	}
}
```

The Once pattern is essential for thread-safe initialization in Go. It ensures that expensive or critical setup operations happen exactly once, making it perfect for singletons, configuration loading, and resource initialization in concurrent applications.

    July 10, 2024 · 9 min · Rafiul Alam