File monitoring is a common requirement in many applications: watching for configuration changes, processing uploads, syncing directories, or building development tools. In this post, we’ll build a production-ready file monitoring service that tracks word counts across all files in a directory using Go’s powerful concurrency primitives.

The Challenge

Build a service that:

  • Monitors a directory and all its subdirectories
  • Counts words in all text files (matched by file extension) in real time
  • Updates counts when files are modified, created, or deleted
  • Processes multiple files concurrently
  • Provides current statistics on demand
  • Handles errors gracefully

Why This Matters

Real-time file monitoring powers many production systems:

  • Development tools: Auto-reloading servers, build systems
  • Data pipelines: Processing file uploads as they arrive
  • Configuration management: Detecting and applying config changes
  • Content management: Indexing documents in real-time
  • Security tools: Detecting unauthorized file modifications

Architecture Overview

graph TB
    A[File Watcher] -->|File Events| B[Event Channel]
    B --> C[Event Processor]
    C -->|New File| D[Word Counter Pool]
    C -->|Modified| D
    C -->|Deleted| E[Stats Manager]
    D -->|Results| F[Results Channel]
    F --> E
    G[API Server] -->|Get Stats| E
    E -->|Current Counts| G

    style A fill:#3498db,color:#fff
    style D fill:#9b59b6,color:#fff
    style E fill:#27ae60,color:#fff

    subgraph "Worker Pool"
        D
        W1[Worker 1]
        W2[Worker 2]
        W3[Worker N]
        D -.-> W1
        D -.-> W2
        D -.-> W3
    end

Implementation

Step 1: Project Setup and Dependencies

package main

import (
    "bufio"
    "context"
    "fmt"
    "log"
    "os"
    "path/filepath"
    "strings"
    "sync"
    "time"

    "github.com/fsnotify/fsnotify"
)

// Install dependency: go get github.com/fsnotify/fsnotify

Step 2: Core Data Structures

// FileStats holds the most recently recorded word-count statistics for a
// single file.
type FileStats struct {
    // Path is the file path these statistics belong to.
    Path      string
    // WordCount is the number of words found by the last successful count.
    WordCount int
    // LastSeen is the time at which this count was stored.
    LastSeen  time.Time
}

// MonitorResult is what a worker reports after attempting to count the
// words in one file.
type MonitorResult struct {
    // Path is the file that was processed.
    Path      string
    // WordCount is the count returned for the file.
    WordCount int
    // Error is nil on success; results carrying an error are logged by the
    // results processor and not stored.
    Error     error
}

// FileMonitor watches a directory tree and maintains a per-file word count.
// Create one with NewFileMonitor; the zero value is not usable.
type FileMonitor struct {
    // rootDir is the top of the directory tree being monitored.
    rootDir string
    // watcher delivers file system events for the watched directories.
    watcher *fsnotify.Watcher
    // stats maps a file path to its latest statistics; guarded by statsMux.
    stats map[string]*FileStats
    // statsMux protects stats.
    statsMux sync.RWMutex

    // fileJobs carries paths waiting to be counted by the worker pool.
    fileJobs chan string
    // results carries finished counts from the workers to the results processor.
    results chan MonitorResult

    // numWorkers is the size of the worker pool.
    numWorkers int

    // ctx is cancelled (via cancel) to tell every goroutine to stop.
    ctx context.Context
    // cancel cancels ctx.
    cancel context.CancelFunc
    // wg tracks the monitor's goroutines so Shutdown can wait for them.
    wg sync.WaitGroup
}

// NewFileMonitor creates a file monitoring service for rootDir that counts
// files with a pool of numWorkers goroutines. Nothing is started until Start
// is called.
//
// It returns an error if numWorkers is less than 1 (a monitor without workers
// would accept jobs but never process them) or if the underlying file system
// watcher cannot be created.
func NewFileMonitor(rootDir string, numWorkers int) (*FileMonitor, error) {
    // Validate before allocating the watcher so a bad argument leaks nothing.
    if numWorkers < 1 {
        return nil, fmt.Errorf("invalid worker count %d: must be at least 1", numWorkers)
    }

    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        return nil, fmt.Errorf("failed to create watcher: %w", err)
    }

    ctx, cancel := context.WithCancel(context.Background())

    fm := &FileMonitor{
        rootDir:    rootDir,
        watcher:    watcher,
        stats:      make(map[string]*FileStats),
        fileJobs:   make(chan string, 100),
        results:    make(chan MonitorResult, 100),
        numWorkers: numWorkers,
        ctx:        ctx,
        cancel:     cancel,
    }

    return fm, nil
}

Step 3: Worker Pool for Word Counting

// startWorkers spawns numWorkers worker goroutines, each registered with the
// monitor's WaitGroup before it is launched.
func (fm *FileMonitor) startWorkers() {
    id := 0
    for id < fm.numWorkers {
        fm.wg.Add(1)
        go fm.worker(id)
        id++
    }
}

// worker is the body of one pool goroutine. It repeatedly takes a path from
// fileJobs, counts its words and forwards the outcome on results, until the
// monitor's context is cancelled.
func (fm *FileMonitor) worker(id int) {
    defer fm.wg.Done()

    log.Printf("Worker %d started", id)

    stop := fm.ctx.Done()
    for {
        var filePath string
        select {
        case filePath = <-fm.fileJobs:
        case <-stop:
            log.Printf("Worker %d shutting down", id)
            return
        }

        count, err := fm.countWords(filePath)

        // This send blocks while results is full; selecting on stop as well
        // means a shutdown can still interrupt it instead of deadlocking.
        select {
        case <-stop:
            return
        case fm.results <- MonitorResult{Path: filePath, WordCount: count, Error: err}:
        }
    }
}

// countWords returns the number of whitespace-separated words in the file at
// filePath.
//
// It returns 0 and a wrapped error if the file cannot be opened, if reading
// fails, or if a single word is longer than maxWordBytes.
func (fm *FileMonitor) countWords(filePath string) (int, error) {
    // maxWordBytes is the longest single token the scanner will accept.
    // bufio.Scanner's default limit is 64 KiB, which a minified .js or .json
    // file (one huge "word") easily exceeds, causing bufio.ErrTooLong.
    const maxWordBytes = 16 << 20 // 16 MiB

    file, err := os.Open(filePath)
    if err != nil {
        return 0, fmt.Errorf("failed to open file: %w", err)
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)
    // Start small; the scanner grows the buffer on demand up to maxWordBytes.
    scanner.Buffer(make([]byte, 0, 4096), maxWordBytes)
    scanner.Split(bufio.ScanWords)

    count := 0
    for scanner.Scan() {
        count++
    }

    if err := scanner.Err(); err != nil {
        // A partial count is not meaningful, so report 0 like the open-failure path.
        return 0, fmt.Errorf("error scanning file: %w", err)
    }

    return count, nil
}

Step 4: Results Processor

// processResults drains the results channel until the monitor's context is
// cancelled. Successful results replace the file's entry in stats; failed
// ones are only logged.
//
// The function registers itself with the WaitGroup on entry, i.e. after its
// goroutine is already running when it is launched with `go`.
func (fm *FileMonitor) processResults() {
    fm.wg.Add(1)
    defer fm.wg.Done()

    for {
        select {
        case res := <-fm.results:
            if err := res.Error; err != nil {
                log.Printf("Error processing %s: %v", res.Path, err)
                continue
            }

            // Build the entry first so the lock is held only for the map write.
            entry := &FileStats{
                Path:      res.Path,
                WordCount: res.WordCount,
                LastSeen:  time.Now(),
            }

            fm.statsMux.Lock()
            fm.stats[res.Path] = entry
            fm.statsMux.Unlock()

            log.Printf("Updated: %s (%d words)", res.Path, res.WordCount)

        case <-fm.ctx.Done():
            log.Println("Results processor shutting down")
            return
        }
    }
}

Step 5: File System Watcher

// watchFiles is the event loop for the file system watcher. It dispatches
// each event to handleEvent, logs watcher errors, and returns when the
// context is cancelled or either watcher channel is closed.
//
// The function registers itself with the WaitGroup on entry, i.e. after its
// goroutine is already running when it is launched with `go`.
func (fm *FileMonitor) watchFiles() {
    fm.wg.Add(1)
    defer fm.wg.Done()

    events, errs := fm.watcher.Events, fm.watcher.Errors

    for {
        select {
        case ev, open := <-events:
            if !open {
                return
            }
            fm.handleEvent(ev)

        case watchErr, open := <-errs:
            if !open {
                return
            }
            log.Printf("Watcher error: %v", watchErr)

        case <-fm.ctx.Done():
            log.Println("File watcher shutting down")
            return
        }
    }
}

// handleEvent reacts to a single file system event.
//
// A newly created directory is added to the watcher (fsnotify watches are
// not recursive, so without this its contents would never be monitored).
// For text files, writes and creations queue a recount, while removals and
// renames drop the file from the statistics. Everything else is ignored.
func (fm *FileMonitor) handleEvent(event fsnotify.Event) {
    if isDir(event.Name) {
        if event.Op&fsnotify.Create == fsnotify.Create {
            fm.watchNewDir(event.Name)
        }
        return
    }

    // Only files with a known text extension are counted.
    if !isTextFile(event.Name) {
        return
    }

    switch {
    case event.Op&fsnotify.Write == fsnotify.Write:
        log.Printf("Modified: %s", event.Name)
        fm.queueFile(event.Name)

    case event.Op&fsnotify.Create == fsnotify.Create:
        log.Printf("Created: %s", event.Name)
        fm.queueFile(event.Name)

    case event.Op&fsnotify.Remove == fsnotify.Remove:
        log.Printf("Removed: %s", event.Name)
        fm.removeFile(event.Name)

    case event.Op&fsnotify.Rename == fsnotify.Rename:
        // The old name is gone; the new name arrives as a separate Create.
        log.Printf("Renamed: %s", event.Name)
        fm.removeFile(event.Name)
    }
}

// watchNewDir starts watching dir and every directory below it, and queues
// any text files already present (they may have been written before the
// watch was in place). Failures are logged rather than returned because the
// tree can legitimately change while it is being walked.
func (fm *FileMonitor) watchNewDir(dir string) {
    walkErr := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            log.Printf("Skipping %s: %v", path, err)
            return nil
        }

        if info.IsDir() {
            if err := fm.watcher.Add(path); err != nil {
                log.Printf("Failed to watch %s: %v", path, err)
                return filepath.SkipDir
            }
            log.Printf("Watching directory: %s", path)
            return nil
        }

        if isTextFile(path) {
            fm.queueFile(path)
        }
        return nil
    })
    if walkErr != nil {
        log.Printf("Failed to scan new directory %s: %v", dir, walkErr)
    }
}

// queueFile offers filePath to the worker pool without blocking.
// If the job queue has room the path is enqueued; if the monitor's context
// has been cancelled the call is a no-op; otherwise the queue is full and
// the file is skipped with a logged warning (the job is dropped, not retried).
func (fm *FileMonitor) queueFile(filePath string) {
    select {
    case fm.fileJobs <- filePath:
    case <-fm.ctx.Done():
    default:
        // Neither of the cases above was ready: drop the job instead of blocking the caller.
        log.Printf("Warning: job queue full, skipping %s", filePath)
    }
}

// removeFile forgets the statistics stored for filePath (a no-op for the map
// if the path is unknown) and logs the removal.
func (fm *FileMonitor) removeFile(filePath string) {
    // Scope the write lock to the map update only; logging happens after release.
    func() {
        fm.statsMux.Lock()
        defer fm.statsMux.Unlock()
        delete(fm.stats, filePath)
    }()

    log.Printf("Removed from stats: %s", filePath)
}

Step 6: Directory Walking and Initialization

// Start begins monitoring: it launches the worker pool, the results
// processor and the watcher loop, registers every directory under rootDir
// with the watcher, and queues all existing text files for an initial count.
//
// It returns an error if the directories cannot be watched or the initial
// scan fails. In that case the goroutines started here keep running until
// Shutdown is called.
func (fm *FileMonitor) Start() error {
    // Start worker pool
    fm.startWorkers()

    // processResults and watchFiles only call wg.Add once their goroutine is
    // already running, which can race with wg.Wait in Shutdown (Wait may
    // return before they have registered). Holding a count for each of them
    // from before the `go` statement until they return closes that window.
    fm.wg.Add(2)

    // Start results processor
    go func() {
        defer fm.wg.Done()
        fm.processResults()
    }()

    // Start file watcher
    go func() {
        defer fm.wg.Done()
        fm.watchFiles()
    }()

    // Walk directory and add all subdirectories to watcher
    if err := fm.addWatchers(); err != nil {
        return fmt.Errorf("failed to add watchers: %w", err)
    }

    // Initial scan of all files
    if err := fm.initialScan(); err != nil {
        return fmt.Errorf("failed to perform initial scan: %w", err)
    }

    log.Printf("Monitoring started on %s with %d workers", fm.rootDir, fm.numWorkers)
    return nil
}

// addWatchers walks rootDir and registers every directory it finds with the
// watcher. The walk stops at the first error, which is returned.
func (fm *FileMonitor) addWatchers() error {
    visit := func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }

        // Only directories are registered; plain files need no watch of their own.
        if !info.IsDir() {
            return nil
        }

        if addErr := fm.watcher.Add(path); addErr != nil {
            return fmt.Errorf("failed to watch %s: %w", path, addErr)
        }
        log.Printf("Watching directory: %s", path)
        return nil
    }

    return filepath.Walk(fm.rootDir, visit)
}

// initialScan walks rootDir and queues every existing text file for counting.
//
// Unlike event-driven queuing, the send here blocks while the job queue is
// full: a non-blocking send would silently drop every file beyond the queue's
// capacity, leaving large trees only partially counted. The worker pool must
// therefore already be running when this is called. The walk aborts with the
// context's error if the monitor is shut down, or with the first walk error.
func (fm *FileMonitor) initialScan() error {
    return filepath.Walk(fm.rootDir, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }

        if info.IsDir() || !isTextFile(path) {
            return nil
        }

        select {
        case fm.fileJobs <- path:
            return nil
        case <-fm.ctx.Done():
            return fm.ctx.Err()
        }
    })
}

Step 7: Statistics API and Helpers

// GetStats returns a snapshot of the statistics for every tracked file,
// keyed by path. The returned map and its values are copies, so the caller
// may use them without holding any lock.
func (fm *FileMonitor) GetStats() map[string]FileStats {
    fm.statsMux.RLock()
    defer fm.statsMux.RUnlock()

    snapshot := make(map[string]FileStats, len(fm.stats))
    for p := range fm.stats {
        // Dereference so the snapshot holds values, not pointers into fm.stats.
        snapshot[p] = *fm.stats[p]
    }

    return snapshot
}

// GetTotalWords returns the sum of the word counts of all tracked files.
func (fm *FileMonitor) GetTotalWords() int {
    fm.statsMux.RLock()
    defer fm.statsMux.RUnlock()

    var sum int
    for path := range fm.stats {
        sum += fm.stats[path].WordCount
    }

    return sum
}

// Shutdown gracefully stops the monitor: it cancels the context, closes the
// file system watcher and waits for all tracked goroutines to exit.
//
// The job and result channels are deliberately left open. Every goroutine
// exits via the cancelled context rather than a channel close, so closing
// them gains nothing, while it would make a second Shutdown call or a late
// send on fileJobs panic.
func (fm *FileMonitor) Shutdown() {
    log.Println("Initiating shutdown...")

    fm.cancel() // Signal all goroutines to stop

    // Close file watcher; report, rather than swallow, a failure to do so.
    if err := fm.watcher.Close(); err != nil {
        log.Printf("Error closing watcher: %v", err)
    }

    // Wait for all goroutines to finish
    fm.wg.Wait()

    log.Println("Shutdown complete")
}

// Helper functions
// isDir reports whether path exists and is a directory. Any error from
// os.Stat (including "not found") yields false.
func isDir(path string) bool {
    info, err := os.Stat(path)
    if err != nil {
        return false
    }
    return info.IsDir()
}

// isTextFile reports whether path has one of the file extensions the monitor
// treats as text. The comparison is case-insensitive.
//
// A switch is used instead of a map literal so that no map is allocated on
// every call; this function runs for each file system event.
func isTextFile(path string) bool {
    switch strings.ToLower(filepath.Ext(path)) {
    case ".txt", ".md", ".go", ".js",
        ".py", ".java", ".c", ".cpp",
        ".html", ".css", ".json", ".xml":
        return true
    default:
        return false
    }
}

Complete Example

// main runs the monitor against ./test-files for a fixed demo period,
// printing statistics periodically and once more after shutdown.
func main() {
    // Monitor the ./test-files directory with 4 workers
    monitor, err := NewFileMonitor("./test-files", 4)
    if err != nil {
        log.Fatalf("Failed to create monitor: %v", err)
    }

    if err = monitor.Start(); err != nil {
        log.Fatalf("Failed to start monitor: %v", err)
    }

    // Give the initial scan a moment to populate the statistics
    time.Sleep(2 * time.Second)

    // Report every 5 seconds until told to stop
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    done := make(chan struct{})
    go func() {
        for {
            select {
            case <-done:
                return
            case <-ticker.C:
                printStats(monitor)
            }
        }
    }()

    // Keep the demo alive for 30 seconds, then stop reporting and shut down
    <-time.After(30 * time.Second)
    close(done)

    monitor.Shutdown()

    // Final statistics
    fmt.Println("\n=== Final Statistics ===")
    printStats(monitor)
}

// printStats writes a summary line followed by one line per tracked file to
// standard output.
//
// The total is computed from the same snapshot that is listed. Fetching it
// with a separate GetTotalWords call takes the lock a second time, so a
// concurrent update could make the header disagree with the lines below it.
func printStats(monitor *FileMonitor) {
    stats := monitor.GetStats()

    total := 0
    for _, stat := range stats {
        total += stat.WordCount
    }

    fmt.Printf("\n--- Statistics (Total: %d words across %d files) ---\n", total, len(stats))

    for path, stat := range stats {
        fmt.Printf("  %s: %d words (updated: %s)\n",
            path,
            stat.WordCount,
            stat.LastSeen.Format("15:04:05"))
    }
}

Data Flow Diagram

sequenceDiagram
    participant FS as File System
    participant W as Watcher
    participant J as Job Queue
    participant WP as Worker Pool
    participant R as Results Channel
    participant S as Stats Manager
    participant U as User/API

    FS->>W: File Modified Event
    W->>W: Filter & Validate
    W->>J: Queue File Path
    J->>WP: Worker picks job
    WP->>WP: Count words
    WP->>R: Send result
    R->>S: Update stats
    S->>S: Lock & Update map
    U->>S: GetStats()
    S->>U: Return copy of stats

    Note over WP: Multiple workers<br/>process concurrently

    FS->>W: File Deleted
    W->>S: Remove from stats

Concurrency Patterns Explained

1. Worker Pool Pattern

graph LR
    A[Job Queue] --> W1[Worker 1]
    A --> W2[Worker 2]
    A --> W3[Worker 3]
    A --> W4[Worker 4]
    W1 --> B[Results]
    W2 --> B
    W3 --> B
    W4 --> B

    style A fill:#3498db,color:#fff
    style B fill:#27ae60,color:#fff

Benefits:

  • Limits concurrent file I/O operations
  • Prevents resource exhaustion
  • Bounds pending work with a buffered channel (queueFile drops a job with a warning when the buffer is full, rather than blocking the watcher)

2. Fan-Out, Fan-In Pattern

// Illustrative fragment, not a complete program: numWorkers, jobs, results
// and process are assumed to be defined by the surrounding code.

// Fan-out: Multiple workers consume from one channel
for i := 0; i < numWorkers; i++ {
    go worker(jobs) // All workers read from same channel
}

// Fan-in: Results converge into single channel
func worker(jobs <-chan string) {
    // The range loop ends once jobs has been closed and drained.
    for job := range jobs {
        result := process(job)
        results <- result // All write to same channel
    }
}

3. Mutex for Shared State

// Illustrative fragment: key and newValue stand in for a real path and entry.

// Reader lock (multiple readers allowed)
fm.statsMux.RLock()
data := fm.stats[key]
fm.statsMux.RUnlock()

// Writer lock (exclusive access)
fm.statsMux.Lock()
fm.stats[key] = newValue
fm.statsMux.Unlock()

Performance Testing

// BenchmarkWordCounting measures countWords on a single 10k-word file.
// The monitor is never started; only its countWords method is exercised.
func BenchmarkWordCounting(b *testing.B) {
    // Create test file
    testFile := createTestFile(b, 10000) // 10k words
    defer os.Remove(testFile)

    monitor, err := NewFileMonitor(".", 4)
    if err != nil {
        b.Fatalf("Failed to create monitor: %v", err)
    }
    // NewFileMonitor opens a file system watcher; release it when done.
    defer monitor.watcher.Close()

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        // Fail loudly: timing a call that errors out would be meaningless.
        if _, err := monitor.countWords(testFile); err != nil {
            b.Fatalf("countWords failed: %v", err)
        }
    }
}

// TestConcurrentUpdates creates and removes many files concurrently while the
// monitor is running. It is a smoke test: it checks that setup and file
// operations succeed and that the monitor survives the churn, and logs how
// many files remain tracked.
func TestConcurrentUpdates(t *testing.T) {
    // A fresh temporary directory guarantees the watched root exists and is
    // cleaned up afterwards, instead of relying on a pre-made ./test-dir.
    dir := t.TempDir()

    monitor, err := NewFileMonitor(dir, 10)
    if err != nil {
        t.Fatalf("Failed to create monitor: %v", err)
    }
    // Deferred before Start so goroutines are stopped even if Start fails midway.
    defer monitor.Shutdown()

    if err := monitor.Start(); err != nil {
        t.Fatalf("Failed to start monitor: %v", err)
    }

    // Simulate concurrent file updates
    var wg sync.WaitGroup
    for i := 0; i < 50; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()

            filename := fmt.Sprintf("%s%cfile%d.txt", dir, os.PathSeparator, id)
            content := strings.Repeat("word ", 100*id)

            // t.Errorf (not Fatalf) because this runs outside the test goroutine.
            if err := os.WriteFile(filename, []byte(content), 0644); err != nil {
                t.Errorf("Failed to write %s: %v", filename, err)
                return
            }
            time.Sleep(10 * time.Millisecond)
            if err := os.Remove(filename); err != nil {
                t.Errorf("Failed to remove %s: %v", filename, err)
            }
        }(i)
    }

    wg.Wait()
    time.Sleep(1 * time.Second) // Allow processing to complete

    // Verify no panics and stats are consistent
    stats := monitor.GetStats()
    t.Logf("Processed %d files", len(stats))
}

Real-World Extensions

1. Add File Type Filtering

// FileFilter is a predicate over a file path.
// NOTE(review): presumably true means "monitor this file" — the listing does
// not show where the filter is consulted; confirm when wiring it in.
type FileFilter func(string) bool

// WithFilter stores filter on the monitor and returns the same monitor so
// calls can be chained.
// NOTE: this requires a `filter FileFilter` field on FileMonitor, which the
// struct definition shown earlier does not yet have.
func (fm *FileMonitor) WithFilter(filter FileFilter) *FileMonitor {
    fm.filter = filter
    return fm
}

// Usage
monitor.WithFilter(func(path string) bool {
    return strings.HasSuffix(path, ".go")
})

2. Add Metrics and Monitoring

// Metrics is a point-in-time summary of the monitor's activity.
type Metrics struct {
    // FilesProcessed is the number of files counted so far.
    FilesProcessed int64
    // TotalWords is the number of words counted across those files.
    TotalWords int64
    // ErrorsEncountered is the number of files that failed to process.
    ErrorsEncountered int64
    // ProcessingTime is the time spent processing files.
    ProcessingTime time.Duration
}

// GetMetrics returns the monitor's metrics.
//
// Placeholder: no counters are collected yet, so this returns the zero value.
// Populate the fields (or export them to Prometheus, etc.) where results are
// processed.
func (fm *FileMonitor) GetMetrics() Metrics {
    // An explicit return is required; without it the function does not compile.
    return Metrics{}
}

3. Add Rate Limiting

import "golang.org/x/time/rate"

// worker is a rate-limited variant of the pool worker: before handling each
// queued path it waits for limiter to grant a token.
//
// limiter.Wait returns an error when fm.ctx is cancelled (or the wait cannot
// be satisfied); the worker then stops instead of processing the file anyway.
func (fm *FileMonitor) worker(id int, limiter *rate.Limiter) {
    for filePath := range fm.fileJobs {
        // Rate limit file operations
        if err := limiter.Wait(fm.ctx); err != nil {
            log.Printf("Worker %d stopping: %v", id, err)
            return
        }
        // ... process file
    }
}

Production Considerations

  1. Memory Management: For large directories, consider streaming file processing
  2. Error Recovery: Implement retry logic for transient failures
  3. Graceful Degradation: Continue operating even if some files fail
  4. Resource Limits: Tune worker pool size based on available CPU/IO
  5. Monitoring: Export metrics to Prometheus/Grafana
  6. Testing: Test with large files, many files, rapid changes

Performance Characteristics

  • Throughput: Processes 1000+ files/second (depends on file size)
  • Latency: Sub-second detection of file changes
  • Memory: O(n) where n = number of files monitored
  • CPU: Scales with worker pool size
  • Concurrency: Safe for concurrent access to statistics

Conclusion

This file monitoring service demonstrates essential Go concurrency patterns:

  • ✅ Worker pool for bounded concurrency
  • ✅ Fan-out/fan-in for parallel processing
  • ✅ Channels for communication between goroutines
  • ✅ Context for graceful shutdown
  • ✅ Mutex for safe shared state access

These patterns are applicable to many real-world scenarios: image processing pipelines, log aggregation, distributed task processing, and more.


Previous in series: Building a Crash-Resistant Log Service

Next in series: Building an MCP File Operations Tool in Go

Source code: Available on GitHub with full test suite and benchmarks.