Go Concurrency Patterns Series: ← Semaphore Pattern | Series Overview


What is the Actor Model?

The Actor Model is a conceptual model for concurrent computation where “actors” are the fundamental units of computation. Each actor has its own isolated state, processes messages sequentially, and can create other actors, send messages, or change its behavior in response to messages.

Key Principles:

  • Isolated State: Each actor maintains its own private state
  • Message Passing: Actors communicate only through asynchronous messages
  • Sequential Processing: Each actor processes one message at a time
  • Location Transparency: Actors are addressed the same way whether they run locally or on a remote node
  • Fault Tolerance: Actor failures are isolated and recoverable

Real-World Use Cases

  • Distributed Systems: Microservices communication
  • Game Servers: Player state management
  • IoT Systems: Device state and communication
  • Financial Systems: Transaction processing
  • Chat Applications: User session management
  • Workflow Engines: Task orchestration

Basic Actor Implementation

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Message represents a message sent to an actor. Any value can be a message;
// handlers tell messages apart with a type switch.
type Message interface{}

// Actor represents the basic actor interface.
type Actor interface {
    Receive(message Message)
    Start(ctx context.Context)
    Stop()
    Send(message Message)
    GetID() string
}

// BaseActor provides basic actor functionality: a buffered mailbox drained by
// a single goroutine (started by Start), so behavior sees one message at a time.
type BaseActor struct {
    id       string
    mailbox  chan Message
    quit     chan struct{}
    stopOnce sync.Once // makes Stop safe to call more than once
    wg       sync.WaitGroup
    behavior func(Message)
}

// NewBaseActor creates a new base actor with a 100-message mailbox.
// behavior may be nil, in which case received messages are ignored.
func NewBaseActor(id string, behavior func(Message)) *BaseActor {
    return &BaseActor{
        id:       id,
        mailbox:  make(chan Message, 100),
        quit:     make(chan struct{}),
        behavior: behavior,
    }
}

// GetID returns the actor ID.
func (ba *BaseActor) GetID() string {
    return ba.id
}

// Send puts message in the actor's mailbox, blocking while the mailbox is
// full. Once Stop has been called the message is rejected (a notice is
// printed) rather than queued where no loop will ever read it.
func (ba *BaseActor) Send(message Message) {
    // Check quit on its own first: when the mailbox has room and quit is
    // closed, the select below would pick one of the two at random.
    select {
    case <-ba.quit:
        fmt.Printf("Actor %s: Cannot send message, actor is stopped\n", ba.id)
        return
    default:
    }

    select {
    case ba.mailbox <- message:
    case <-ba.quit:
        fmt.Printf("Actor %s: Cannot send message, actor is stopped\n", ba.id)
    }
}

// Start launches the actor's message loop in a new goroutine. The loop ends
// when Stop is called or ctx is cancelled. Call Start once: each call starts
// another loop, which would break one-message-at-a-time processing.
// After ctx cancellation the mailbox is no longer drained, so Send blocks
// once it is full until Stop is called.
func (ba *BaseActor) Start(ctx context.Context) {
    ba.wg.Add(1)
    go func() {
        defer ba.wg.Done()
        fmt.Printf("Actor %s: Started\n", ba.id)

        for {
            select {
            case message := <-ba.mailbox:
                ba.Receive(message)
            case <-ba.quit:
                fmt.Printf("Actor %s: Stopped\n", ba.id)
                return
            case <-ctx.Done():
                fmt.Printf("Actor %s: Context cancelled\n", ba.id)
                return
            }
        }
    }()
}

// Receive processes a received message by passing it to the behavior, if any.
func (ba *BaseActor) Receive(message Message) {
    if ba.behavior != nil {
        ba.behavior(message)
    }
}

// Stop signals the message loop to exit and waits for it to finish. It is
// idempotent: later calls only wait. Messages still in the mailbox are not
// processed.
func (ba *BaseActor) Stop() {
    ba.stopOnce.Do(func() { close(ba.quit) })
    ba.wg.Wait()
}

// Message types shared by the demo actors.
type (
    // StartMessage and StopMessage are declared for completeness; the
    // demo below does not send them.
    StartMessage struct{}
    StopMessage  struct{}

    // PingMessage asks the receiver to reply to Sender with a PongMessage.
    PingMessage struct {
        Sender Actor
    }

    // PongMessage is the reply to a PingMessage.
    PongMessage struct {
        Sender Actor
    }
)

// CounterActor demonstrates a stateful actor. count is read and written only
// by handleMessage, which runs on the embedded BaseActor's message loop.
type CounterActor struct {
    *BaseActor
    count int
}

// CounterMessage types
type IncrementMessage struct{}
type DecrementMessage struct{}
type GetCountMessage struct {
    ResponseChan chan int
}

// NewCounterActor creates a new counter actor whose count starts at 0.
func NewCounterActor(id string) *CounterActor {
    ca := &CounterActor{}

    // The method value binds the pointer ca, so handleMessage sees the
    // BaseActor assigned on the next line.
    ca.BaseActor = NewBaseActor(id, ca.handleMessage)
    return ca
}

// handleMessage handles counter-specific messages. It is the behavior
// passed to NewBaseActor.
func (ca *CounterActor) handleMessage(message Message) {
    switch msg := message.(type) {
    case IncrementMessage:
        ca.count++
        fmt.Printf("Counter %s: Incremented to %d\n", ca.id, ca.count)

    case DecrementMessage:
        ca.count--
        fmt.Printf("Counter %s: Decremented to %d\n", ca.id, ca.count)

    case GetCountMessage:
        fmt.Printf("Counter %s: Current count is %d\n", ca.id, ca.count)
        if msg.ResponseChan == nil {
            break // a send on a nil channel would block this loop forever
        }
        // The reply blocks until the requester reads it. Give up if the actor
        // is stopped meanwhile; otherwise Stop (which waits for this loop)
        // would hang forever on an abandoned request.
        select {
        case msg.ResponseChan <- ca.count:
        case <-ca.quit:
        }

    case PingMessage:
        if msg.Sender == nil {
            fmt.Printf("Counter %s: Received ping with no sender\n", ca.id)
            break
        }
        fmt.Printf("Counter %s: Received ping from %s\n", ca.id, msg.Sender.GetID())
        // NOTE(review): Send blocks while the sender's mailbox is full, so
        // two actors pinging each other with full mailboxes could deadlock.
        msg.Sender.Send(PongMessage{Sender: ca})

    case PongMessage:
        from := "unknown"
        if msg.Sender != nil {
            from = msg.Sender.GetID()
        }
        fmt.Printf("Counter %s: Received pong from %s\n", ca.id, from)

    default:
        fmt.Printf("Counter %s: Unknown message type: %T\n", ca.id, message)
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    fmt.Println("=== Basic Actor Model Demo ===")

    // Two independent counters; each one's count changes only via its mailbox.
    counter1, counter2 := NewCounterActor("counter-1"), NewCounterActor("counter-2")
    for _, c := range []*CounterActor{counter1, counter2} {
        c.Start(ctx)
    }

    // Deferred calls run last-in first-out: counter2 stops, then counter1,
    // and both stop before cancel runs.
    defer counter1.Stop()
    defer counter2.Stop()

    for n := 0; n < 3; n++ {
        counter1.Send(IncrementMessage{})
    }
    counter2.Send(IncrementMessage{})
    counter2.Send(DecrementMessage{})

    // counter1 answers the ping by sending a PongMessage to counter2.
    counter1.Send(PingMessage{Sender: counter2})

    // Request/response: each counter writes its current count to the channel it is given.
    reply1, reply2 := make(chan int), make(chan int)
    counter1.Send(GetCountMessage{ResponseChan: reply1})
    counter2.Send(GetCountMessage{ResponseChan: reply2})
    count1, count2 := <-reply1, <-reply2

    fmt.Printf("Final counts - Counter1: %d, Counter2: %d\n", count1, count2)

    // Leave time for messages still in flight (such as the pong) to be handled.
    time.Sleep(time.Second)
}

Advanced Actor System with Supervision

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Message is any value that can be delivered to an actor. This program is
// compiled on its own, so it declares Message itself.
type Message interface{}

// ActorRef is a handle used to send messages to an actor without access to
// its internal state.
type ActorRef struct {
    id      string
    mailbox chan Message
}

// Send enqueues message without blocking. If the mailbox is full the message
// is dropped and a notice is printed (a simple backpressure policy).
func (ref *ActorRef) Send(message Message) {
    select {
    case ref.mailbox <- message:
    default:
        fmt.Printf("ActorRef %s: Mailbox full, dropping message\n", ref.id)
    }
}

// GetID returns the actor ID.
func (ref *ActorRef) GetID() string {
    return ref.id
}

// ActorSystem manages actors and provides supervision. Every actor it
// creates watches the system's context; cancelling it tells all message
// loops to exit.
type ActorSystem struct {
    mu     sync.RWMutex // guards actors
    actors map[string]*ManagedActor
    ctx    context.Context
    cancel context.CancelFunc
}

// ManagedActor is the system's bookkeeping for one actor: its reference,
// its behavior, its supervisor and its restart count.
type ManagedActor struct {
    ref         *ActorRef
    behavior    ActorBehavior
    supervisor  *ActorRef            // notified when restarts exceed maxRestarts; may be nil
    children    map[string]*ActorRef // NOTE(review): created but never populated in this example
    mu          sync.RWMutex         // guards restarts
    restarts    int
    maxRestarts int
}

type (
    // ActorBehavior is an actor's message handler. It is called from the
    // actor's message loop, one message at a time.
    ActorBehavior func(ctx ActorContext, message Message)

    // ActorContext is handed to a behavior along with each message.
    ActorContext struct {
        Self   *ActorRef    // the actor handling the message
        System *ActorSystem // the owning system, e.g. for creating children
        Sender *ActorRef    // NOTE(review): never set by the message loop in this example
    }
)

// NewActorSystem returns an empty actor system. Its root context is what
// Stop cancels to shut every actor down.
func NewActorSystem() *ActorSystem {
    as := &ActorSystem{actors: make(map[string]*ManagedActor)}
    as.ctx, as.cancel = context.WithCancel(context.Background())
    return as
}

// ActorOf creates an actor with the given ID and behavior, registers it with
// the system and starts its message loop in a new goroutine. supervisor may
// be nil. When it is set, it receives an ActorFailedMessage once the actor
// exhausts its restart budget (maxRestarts, 3 here).
//
// Actor IDs are unique within a system. If id is already registered, the
// existing actor's reference is returned and behavior and supervisor are
// ignored. Overwriting the registry entry would orphan the running actor:
// its goroutine would keep running but could no longer be found.
func (as *ActorSystem) ActorOf(id string, behavior ActorBehavior, supervisor *ActorRef) *ActorRef {
    as.mu.Lock()
    defer as.mu.Unlock()

    if existing, ok := as.actors[id]; ok {
        fmt.Printf("ActorSystem: Actor %s already exists, returning existing reference\n", id)
        return existing.ref
    }

    ref := &ActorRef{
        id:      id,
        mailbox: make(chan Message, 100),
    }

    actor := &ManagedActor{
        ref:         ref,
        behavior:    behavior,
        supervisor:  supervisor,
        children:    make(map[string]*ActorRef),
        maxRestarts: 3,
    }

    as.actors[id] = actor

    // Start actor goroutine
    go as.runActor(actor)

    fmt.Printf("ActorSystem: Created actor %s\n", id)
    return ref
}

// runActor is one actor's message loop. It handles messages one at a time
// until the system context is cancelled. A panic raised by the behavior is
// recovered here and passed to handleActorFailure, which may start a new
// loop for the same actor. The message that caused the panic is not retried.
func (as *ActorSystem) runActor(actor *ManagedActor) {
    defer func() {
        r := recover()
        if r == nil {
            return
        }
        fmt.Printf("Actor %s: Panic recovered: %v\n", actor.ref.id, r)
        as.handleActorFailure(actor, fmt.Errorf("panic: %v", r))
    }()

    self := actor.ref
    // The context is the same for every message. Behaviors receive it by
    // value, so one instance can be reused.
    actx := ActorContext{Self: self, System: as}

    for {
        select {
        case <-as.ctx.Done():
            fmt.Printf("Actor %s: System shutdown\n", self.id)
            return
        case msg := <-self.mailbox:
            actor.behavior(actx, msg)
        }
    }
}

// handleActorFailure is called from runActor's deferred recover after the
// actor's behavior panicked.
//
// While the actor is within its restart budget, a new message loop is started
// for it. Once the budget is exhausted, the actor is removed from the system
// registry and its supervisor (if any) receives an ActorFailedMessage.
// Failures that happen after the system has been stopped are only logged.
func (as *ActorSystem) handleActorFailure(actor *ManagedActor, err error) {
    // During shutdown a restarted loop would just see the cancelled context,
    // so don't spawn it or charge the restart budget.
    if as.ctx.Err() != nil {
        fmt.Printf("Actor %s: Failed during shutdown: %v\n", actor.ref.id, err)
        return
    }

    actor.mu.Lock()
    defer actor.mu.Unlock()

    actor.restarts++
    fmt.Printf("Actor %s: Failed with error: %v (restart %d/%d)\n",
        actor.ref.id, err, actor.restarts, actor.maxRestarts)

    if actor.restarts <= actor.maxRestarts {
        // Restart the actor
        go as.runActor(actor)
        fmt.Printf("Actor %s: Restarted\n", actor.ref.id)
        return
    }

    // Stop the actor and notify supervisor
    fmt.Printf("Actor %s: Max restarts exceeded, stopping\n", actor.ref.id)

    // Nothing drains this actor's mailbox any more, so unregister it: lookups
    // by ID must not find a dead actor. The pointer comparison ensures a
    // different actor registered under the same ID is never removed.
    as.mu.Lock()
    if as.actors[actor.ref.id] == actor {
        delete(as.actors, actor.ref.id)
    }
    as.mu.Unlock()

    if actor.supervisor != nil {
        actor.supervisor.Send(ActorFailedMessage{
            FailedActor: actor.ref,
            Error:       err,
        })
    }
}

// Stop shuts the system down. It cancels the system context, which tells
// every actor's message loop to exit, then sleeps for a fixed 100ms grace
// period so in-progress handlers can finish. It does not actually wait for
// them; a handler still running after the grace period is not tracked.
func (as *ActorSystem) Stop() {
    as.cancel()

    const gracePeriod = 100 * time.Millisecond
    time.Sleep(gracePeriod)

    fmt.Println("ActorSystem: Stopped")
}

// Messages exchanged in the supervision demo.
type (
    // ActorFailedMessage is sent to a supervisor when one of its actors has
    // used up its restart budget.
    ActorFailedMessage struct {
        FailedActor *ActorRef
        Error       error
    }

    // CreateChildMessage asks a supervisor to create a child actor. The new
    // reference is sent back on ResponseChan.
    CreateChildMessage struct {
        ChildID      string
        Behavior     ActorBehavior
        ResponseChan chan *ActorRef
    }

    // WorkMessage carries a unit of work for a worker.
    WorkMessage struct {
        Data string
    }

    // ResultMessage carries a work result and the actor that produced it.
    ResultMessage struct {
        Result string
        From   *ActorRef
    }
)

// WorkerBehavior is the behavior of a worker actor that processes WorkMessages
// and can fail. The failure is simulated deterministically, not randomly: a
// non-empty payload whose length is a multiple of 7 (e.g. "failure") makes
// the handler panic, and the actor system treats that panic as a crash.
// Empty payloads are processed normally.
func WorkerBehavior(ctx ActorContext, message Message) {
    switch msg := message.(type) {
    case WorkMessage:
        fmt.Printf("Worker %s: Processing work: %s\n", ctx.Self.id, msg.Data)

        // Simulate work
        time.Sleep(100 * time.Millisecond)

        // Simulated failure. The n > 0 guard stops an empty payload
        // (0 % 7 == 0) from crashing the worker.
        if n := len(msg.Data); n > 0 && n%7 == 0 {
            panic("simulated worker failure")
        }

        result := fmt.Sprintf("processed-%s", msg.Data)
        fmt.Printf("Worker %s: Work completed: %s\n", ctx.Self.id, result)
        // NOTE(review): the result is only printed; it is not sent to anyone.

    default:
        fmt.Printf("Worker %s: Unknown message: %T\n", ctx.Self.id, message)
    }
}

// SupervisorBehavior is the behavior of a supervisor actor. It creates child
// actors on request, with itself as their supervisor. When a child reports
// that it exhausted its restarts, it starts a replacement worker under a new ID.
func SupervisorBehavior(ctx ActorContext, message Message) {
    switch msg := message.(type) {
    case CreateChildMessage:
        childRef := ctx.System.ActorOf(msg.ChildID, msg.Behavior, ctx.Self)
        // A send on a nil channel blocks forever and would wedge this
        // supervisor, so only reply when a channel was provided. A non-nil
        // channel still blocks until the requester reads from it.
        if msg.ResponseChan != nil {
            msg.ResponseChan <- childRef
        }

    case ActorFailedMessage:
        if msg.FailedActor == nil {
            fmt.Printf("Supervisor %s: Received failure report without an actor: %v\n",
                ctx.Self.id, msg.Error)
            return
        }
        fmt.Printf("Supervisor %s: Child actor %s failed: %v\n",
            ctx.Self.id, msg.FailedActor.id, msg.Error)

        // Create replacement worker. NOTE(review): it always runs
        // WorkerBehavior, and its reference is not handed to anyone, so
        // senders that still hold the failed actor's reference are not
        // redirected.
        newWorkerID := fmt.Sprintf("%s-replacement", msg.FailedActor.id)
        ctx.System.ActorOf(newWorkerID, WorkerBehavior, ctx.Self)
        fmt.Printf("Supervisor %s: Created replacement worker %s\n",
            ctx.Self.id, newWorkerID)

    case WorkMessage:
        // Delegation is not implemented in this example; the work is only logged.
        fmt.Printf("Supervisor %s: Delegating work: %s\n", ctx.Self.id, msg.Data)

    default:
        fmt.Printf("Supervisor %s: Unknown message: %T\n", ctx.Self.id, message)
    }
}

func main() {
    fmt.Println("=== Advanced Actor System Demo ===")

    system := NewActorSystem()
    defer system.Stop()

    supervisor := system.ActorOf("supervisor", SupervisorBehavior, nil)

    // spawn asks the supervisor to create a worker and waits for its reference.
    created := make(chan *ActorRef)
    spawn := func(id string) *ActorRef {
        supervisor.Send(CreateChildMessage{
            ChildID:      id,
            Behavior:     WorkerBehavior,
            ResponseChan: created,
        })
        return <-created
    }
    workers := [2]*ActorRef{spawn("worker-1"), spawn("worker-2")}

    // Work is dealt alternately: even indices to worker-1, odd to worker-2.
    // WorkerBehavior panics on payloads whose length is a multiple of 7, so
    // both "failure" (worker-2) and "another" (worker-1) trigger a restart.
    workItems := []string{
        "task-1", "task-2", "task-3", "failure",
        "task-4", "task-5", "another", "task-6",
    }
    for i, work := range workItems {
        workers[i%2].Send(WorkMessage{Data: work})
        time.Sleep(200 * time.Millisecond)
    }

    // Give the workers time to drain their mailboxes.
    time.Sleep(3 * time.Second)
}

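Distributed Actor System

This example builds on the previous one: it reuses ActorSystem, ManagedActor, ActorContext, ActorBehavior, Message, WorkMessage and ResultMessage. Compile it in the same package as those definitions, with the previous example's main function removed.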

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "net"
    "sync"
    "time"
)

// RemoteMessage is the JSON envelope exchanged between nodes. Type selects how
// Payload is turned back into a local message. From is the sending node's ID
// and To is the ID of the target actor on the receiving node.
type RemoteMessage struct {
    Type    string      `json:"type"`
    Payload interface{} `json:"payload"`
    From    string      `json:"from"`
    To      string      `json:"to"`
}

// DistributedActorSystem is an ActorSystem that can also exchange messages
// with actors on other nodes over TCP.
type DistributedActorSystem struct {
    *ActorSystem

    nodeID   string              // this node's name; sent as RemoteMessage.From
    address  string              // address Start listens on
    listener net.Listener        // set by Start
    peers    map[string]net.Conn // connections dialed by ConnectToPeer, by peer ID
    peersMu  sync.RWMutex        // guards peers
}

// NewDistributedActorSystem returns a node named nodeID that will listen on
// address once Start is called.
func NewDistributedActorSystem(nodeID, address string) *DistributedActorSystem {
    das := &DistributedActorSystem{
        nodeID:  nodeID,
        address: address,
        peers:   make(map[string]net.Conn),
    }
    das.ActorSystem = NewActorSystem()
    return das
}

// Start listens on the node's address and serves each accepted connection in
// its own goroutine. It returns as soon as the listener is bound. The accept
// loop ends at the first Accept error, e.g. once Stop closes the listener.
func (das *DistributedActorSystem) Start() error {
    ln, err := net.Listen("tcp", das.address)
    if err != nil {
        return err
    }
    das.listener = ln

    go func() {
        for conn, acceptErr := ln.Accept(); acceptErr == nil; conn, acceptErr = ln.Accept() {
            go das.handleConnection(conn)
        }
    }()

    fmt.Printf("DistributedActorSystem %s: Started on %s\n", das.nodeID, das.address)
    return nil
}

// ConnectToPeer dials peerAddress and registers the connection under peerID,
// so SendRemoteMessage can reach actors on that node. Anything the peer
// writes back on the same connection is handled like an incoming connection.
//
// The dial gives up after peerDialTimeout, or earlier if the system is
// stopped. If peerID was already connected, the old connection is closed and
// replaced instead of being leaked.
func (das *DistributedActorSystem) ConnectToPeer(peerID, peerAddress string) error {
    const peerDialTimeout = 5 * time.Second

    dialCtx, cancel := context.WithTimeout(das.ctx, peerDialTimeout)
    defer cancel()

    var dialer net.Dialer
    conn, err := dialer.DialContext(dialCtx, "tcp", peerAddress)
    if err != nil {
        return fmt.Errorf("connecting to peer %s at %s: %w", peerID, peerAddress, err)
    }

    das.peersMu.Lock()
    old := das.peers[peerID]
    das.peers[peerID] = conn
    das.peersMu.Unlock()

    if old != nil {
        // Best effort: the old connection is being discarded anyway.
        _ = old.Close()
    }

    go das.handleConnection(conn)

    fmt.Printf("DistributedActorSystem %s: Connected to peer %s\n", das.nodeID, peerID)
    return nil
}

// handleConnection reads a stream of JSON-encoded RemoteMessages from conn
// and dispatches each one locally. It returns, closing conn, at the first
// decode error, which includes the peer closing the connection.
func (das *DistributedActorSystem) handleConnection(conn net.Conn) {
    defer conn.Close()

    dec := json.NewDecoder(conn)
    var msg RemoteMessage
    for dec.Decode(&msg) == nil {
        das.handleRemoteMessage(msg)
        // Reset so fields missing from the next message don't carry over.
        msg = RemoteMessage{}
    }
}

// handleRemoteMessage turns a RemoteMessage back into a local message and
// delivers it to the addressed local actor via ActorRef.Send. Three kinds of
// message are dropped, each with a log line rather than silently:
//   - messages for an actor ID that is not registered here;
//   - messages with an unknown Type;
//   - messages whose payload is not the expected JSON string.
func (das *DistributedActorSystem) handleRemoteMessage(msg RemoteMessage) {
    fmt.Printf("DistributedActorSystem %s: Received remote message from %s to %s\n",
        das.nodeID, msg.From, msg.To)

    // Find local actor and deliver message
    das.ActorSystem.mu.RLock()
    actor, exists := das.ActorSystem.actors[msg.To]
    das.ActorSystem.mu.RUnlock()

    if !exists {
        fmt.Printf("DistributedActorSystem %s: No local actor %q, dropping message from %s\n",
            das.nodeID, msg.To, msg.From)
        return
    }

    // Convert payload back to proper message type. SendRemoteMessage encodes
    // both supported payloads as strings.
    switch msg.Type {
    case "work":
        if data, ok := msg.Payload.(string); ok {
            actor.ref.Send(WorkMessage{Data: data})
            return
        }
    case "result":
        if result, ok := msg.Payload.(string); ok {
            actor.ref.Send(ResultMessage{Result: result})
            return
        }
    default:
        fmt.Printf("DistributedActorSystem %s: Unknown message type %q from %s, dropping\n",
            das.nodeID, msg.Type, msg.From)
        return
    }

    fmt.Printf("DistributedActorSystem %s: Malformed %q payload (%T) from %s, dropping\n",
        das.nodeID, msg.Type, msg.Payload, msg.From)
}

// SendRemoteMessage encodes message as a RemoteMessage and writes it to the
// connection previously opened to peerID with ConnectToPeer. Only WorkMessage
// and ResultMessage are supported. For ResultMessage only Result is sent; the
// From reference is not transmitted.
func (das *DistributedActorSystem) SendRemoteMessage(peerID, actorID string, message Message) error {
    das.peersMu.RLock()
    conn, ok := das.peers[peerID]
    das.peersMu.RUnlock()
    if !ok {
        return fmt.Errorf("peer %s not connected", peerID)
    }

    out := RemoteMessage{From: das.nodeID, To: actorID}
    switch m := message.(type) {
    case WorkMessage:
        out.Type, out.Payload = "work", m.Data
    case ResultMessage:
        out.Type, out.Payload = "result", m.Result
    default:
        return fmt.Errorf("unsupported message type: %T", message)
    }

    return json.NewEncoder(conn).Encode(out)
}

// Stop shuts the node down. It stops accepting connections, closes every
// connection dialed via ConnectToPeer, and then stops the embedded
// ActorSystem. NOTE(review): connections accepted in Start are not tracked,
// so they are not closed here.
func (das *DistributedActorSystem) Stop() {
    if ln := das.listener; ln != nil {
        ln.Close()
    }

    func() {
        das.peersMu.Lock()
        defer das.peersMu.Unlock()
        for _, c := range das.peers {
            c.Close()
        }
    }()

    das.ActorSystem.Stop()
}

// DistributedWorkerBehavior returns a worker behavior that processes
// WorkMessages and tags each result with the ID of the node it ran on.
func DistributedWorkerBehavior(system *DistributedActorSystem) ActorBehavior {
    return func(ctx ActorContext, message Message) {
        work, ok := message.(WorkMessage)
        if !ok {
            fmt.Printf("DistributedWorker %s: Unknown message: %T\n", ctx.Self.id, message)
            return
        }

        fmt.Printf("DistributedWorker %s: Processing work: %s\n", ctx.Self.id, work.Data)
        time.Sleep(500 * time.Millisecond) // simulated work

        result := fmt.Sprintf("processed-%s-by-%s", work.Data, system.nodeID)
        fmt.Printf("DistributedWorker %s: Work completed: %s\n", ctx.Self.id, result)
        // The result is not sent back; a real system would reply to the requester.
    }
}

func main() {
    fmt.Println("=== Distributed Actor System Demo ===")

    // Create two distributed actor systems
    system1 := NewDistributedActorSystem("node1", "localhost:8001")
    system2 := NewDistributedActorSystem("node2", "localhost:8002")

    // Start systems
    if err := system1.Start(); err != nil {
        panic(err)
    }
    defer system1.Stop()

    if err := system2.Start(); err != nil {
        panic(err)
    }
    defer system2.Stop()

    // Wait for systems to start
    time.Sleep(100 * time.Millisecond)

    // Connect in both directions: each node sends over the connection it dialed.
    if err := system1.ConnectToPeer("node2", "localhost:8002"); err != nil {
        panic(err)
    }

    if err := system2.ConnectToPeer("node1", "localhost:8001"); err != nil {
        panic(err)
    }

    // Create distributed workers
    worker1 := system1.ActorOf("worker1", DistributedWorkerBehavior(system1), nil)
    worker2 := system2.ActorOf("worker2", DistributedWorkerBehavior(system2), nil)

    // Send local work
    worker1.Send(WorkMessage{Data: "local-task-1"})
    worker2.Send(WorkMessage{Data: "local-task-2"})

    // Send remote work. Report failures (e.g. unknown peer, broken connection)
    // instead of discarding the returned error.
    if err := system1.SendRemoteMessage("node2", "worker2", WorkMessage{Data: "remote-task-from-node1"}); err != nil {
        fmt.Printf("node1 -> node2: remote send failed: %v\n", err)
    }
    if err := system2.SendRemoteMessage("node1", "worker1", WorkMessage{Data: "remote-task-from-node2"}); err != nil {
        fmt.Printf("node2 -> node1: remote send failed: %v\n", err)
    }

    // Wait for processing
    time.Sleep(2 * time.Second)
}

Best Practices

  1. Keep Actors Small: Each actor should have a single responsibility
  2. Immutable Messages: Use immutable data structures for messages
  3. Avoid Blocking: Don’t block in actor message handlers
  4. Handle Failures: Implement proper supervision and error handling
  5. Message Ordering: Design for message reordering in distributed systems
  6. Backpressure: Handle mailbox overflow gracefully
  7. Testing: Test actors in isolation with mock messages

Common Pitfalls

  1. Shared State: Accidentally sharing mutable state between actors
  2. Blocking Operations: Performing blocking I/O in message handlers
  3. Large Messages: Sending large objects instead of references
  4. Circular Dependencies: Creating circular message dependencies
  5. Resource Leaks: Not properly cleaning up actor resources
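  6. Synchronous Communication: Overusing blocking request/response (e.g. waiting on reply channels) between actors, which serializes work and can deadlock when actors wait on each other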

When to Use Actor Model

Use When:

  • Building distributed systems
  • Need fault isolation
  • Managing complex state machines
  • Implementing event-driven architectures
  • Building scalable concurrent systems

Avoid When:

  • Simple sequential processing
  • Components are tightly coupled and need direct access to shared state
  • Performance-critical tight loops
  • Simple request-response patterns

The Actor Model provides a powerful abstraction for building concurrent and distributed systems. It promotes isolation, fault tolerance, and scalability through message-passing communication and supervised actor hierarchies.


Series Complete! You’ve now learned all the essential Go concurrency patterns. Check out the Series Overview to review all patterns and their use cases.