Actor Model Pattern in Go
Go Concurrency Patterns Series: ← Semaphore Pattern | Series Overview What is the Actor Model? The Actor Model is a conceptual model for concurrent computation where “actors” are the fundamental units of computation. Each actor has its own isolated state, processes messages sequentially, and can create other actors, send messages, or change its behavior in response to messages. Key Principles: Isolated State: Each actor maintains its own private state Message Passing: Actors communicate only through asynchronous messages Sequential Processing: Each actor processes one message at a time Location Transparency: Actors can be local or remote Fault Tolerance: Actor failures are isolated and recoverable Real-World Use Cases Distributed Systems: Microservices communication Game Servers: Player state management IoT Systems: Device state and communication Financial Systems: Transaction processing Chat Applications: User session management Workflow Engines: Task orchestration Basic Actor Implementation package main import ( "context" "fmt" "sync" "time" ) // Message represents a message sent to an actor type Message interface{} // Actor represents the basic actor interface type Actor interface { Receive(message Message) Start(ctx context.Context) Stop() Send(message Message) GetID() string } // BaseActor provides basic actor functionality type BaseActor struct { id string mailbox chan Message quit chan struct{} wg sync.WaitGroup behavior func(Message) } // NewBaseActor creates a new base actor func NewBaseActor(id string, behavior func(Message)) *BaseActor { return &BaseActor{ id: id, mailbox: make(chan Message, 100), quit: make(chan struct{}), behavior: behavior, } } // GetID returns the actor ID func (ba *BaseActor) GetID() string { return ba.id } // Send sends a message to the actor func (ba *BaseActor) Send(message Message) { select { case ba.mailbox <- message: case <-ba.quit: fmt.Printf("Actor %s: Cannot send message, actor is stopped\n", ba.id) } } // Start starts the actor's message processing loop func (ba *BaseActor) Start(ctx context.Context) { ba.wg.Add(1) go func() { defer ba.wg.Done() fmt.Printf("Actor %s: Started\n", ba.id) for { select { case message := <-ba.mailbox: ba.Receive(message) case <-ba.quit: fmt.Printf("Actor %s: Stopped\n", ba.id) return case <-ctx.Done(): fmt.Printf("Actor %s: Context cancelled\n", ba.id) return } } }() } // Receive processes a received message func (ba *BaseActor) Receive(message Message) { if ba.behavior != nil { ba.behavior(message) } } // Stop stops the actor func (ba *BaseActor) Stop() { close(ba.quit) ba.wg.Wait() } // Common message types type StartMessage struct{} type StopMessage struct{} type PingMessage struct { Sender Actor } type PongMessage struct { Sender Actor } // CounterActor demonstrates a stateful actor type CounterActor struct { *BaseActor count int } // CounterMessage types type IncrementMessage struct{} type DecrementMessage struct{} type GetCountMessage struct { ResponseChan chan int } // NewCounterActor creates a new counter actor func NewCounterActor(id string) *CounterActor { ca := &CounterActor{ count: 0, } ca.BaseActor = NewBaseActor(id, ca.handleMessage) return ca } // handleMessage handles counter-specific messages func (ca *CounterActor) handleMessage(message Message) { switch msg := message.(type) { case IncrementMessage: ca.count++ fmt.Printf("Counter %s: Incremented to %d\n", ca.id, ca.count) case DecrementMessage: ca.count-- fmt.Printf("Counter %s: Decremented to %d\n", ca.id, ca.count) case GetCountMessage: fmt.Printf("Counter %s: Current count is %d\n", ca.id, ca.count) msg.ResponseChan <- ca.count case PingMessage: fmt.Printf("Counter %s: Received ping from %s\n", ca.id, msg.Sender.GetID()) msg.Sender.Send(PongMessage{Sender: ca}) default: fmt.Printf("Counter %s: Unknown message type: %T\n", ca.id, message) } } func main() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() fmt.Println("=== Basic Actor Model Demo ===") // Create counter actors counter1 := NewCounterActor("counter-1") counter2 := NewCounterActor("counter-2") // Start actors counter1.Start(ctx) counter2.Start(ctx) defer counter1.Stop() defer counter2.Stop() // Send messages to actors counter1.Send(IncrementMessage{}) counter1.Send(IncrementMessage{}) counter1.Send(IncrementMessage{}) counter2.Send(IncrementMessage{}) counter2.Send(DecrementMessage{}) // Ping-pong between actors counter1.Send(PingMessage{Sender: counter2}) // Get current counts responseChan1 := make(chan int) responseChan2 := make(chan int) counter1.Send(GetCountMessage{ResponseChan: responseChan1}) counter2.Send(GetCountMessage{ResponseChan: responseChan2}) count1 := <-responseChan1 count2 := <-responseChan2 fmt.Printf("Final counts - Counter1: %d, Counter2: %d\n", count1, count2) time.Sleep(1 * time.Second) } Advanced Actor System with Supervision package main import ( "context" "fmt" "sync" "time" ) // ActorRef represents a reference to an actor type ActorRef struct { id string mailbox chan Message } // Send sends a message to the actor func (ref *ActorRef) Send(message Message) { select { case ref.mailbox <- message: default: fmt.Printf("ActorRef %s: Mailbox full, dropping message\n", ref.id) } } // GetID returns the actor ID func (ref *ActorRef) GetID() string { return ref.id } // ActorSystem manages actors and provides supervision type ActorSystem struct { mu sync.RWMutex actors map[string]*ManagedActor ctx context.Context cancel context.CancelFunc } // ManagedActor wraps an actor with management capabilities type ManagedActor struct { ref *ActorRef behavior ActorBehavior supervisor *ActorRef children map[string]*ActorRef mu sync.RWMutex restarts int maxRestarts int } // ActorBehavior defines how an actor processes messages type ActorBehavior func(ctx ActorContext, message Message) // ActorContext provides context for actor operations type ActorContext struct { Self *ActorRef System *ActorSystem Sender *ActorRef } // NewActorSystem creates a new actor system func NewActorSystem() *ActorSystem { ctx, cancel := context.WithCancel(context.Background()) return &ActorSystem{ actors: make(map[string]*ManagedActor), ctx: ctx, cancel: cancel, } } // ActorOf creates a new actor func (as *ActorSystem) ActorOf(id string, behavior ActorBehavior, supervisor *ActorRef) *ActorRef { as.mu.Lock() defer as.mu.Unlock() ref := &ActorRef{ id: id, mailbox: make(chan Message, 100), } actor := &ManagedActor{ ref: ref, behavior: behavior, supervisor: supervisor, children: make(map[string]*ActorRef), maxRestarts: 3, } as.actors[id] = actor // Start actor goroutine go as.runActor(actor) fmt.Printf("ActorSystem: Created actor %s\n", id) return ref } // runActor runs the actor's message processing loop func (as *ActorSystem) runActor(actor *ManagedActor) { defer func() { if r := recover(); r != nil { fmt.Printf("Actor %s: Panic recovered: %v\n", actor.ref.id, r) as.handleActorFailure(actor, fmt.Errorf("panic: %v", r)) } }() for { select { case message := <-actor.ref.mailbox: ctx := ActorContext{ Self: actor.ref, System: as, } actor.behavior(ctx, message) case <-as.ctx.Done(): fmt.Printf("Actor %s: System shutdown\n", actor.ref.id) return } } } // handleActorFailure handles actor failures and restarts func (as *ActorSystem) handleActorFailure(actor *ManagedActor, err error) { actor.mu.Lock() defer actor.mu.Unlock() actor.restarts++ fmt.Printf("Actor %s: Failed with error: %v (restart %d/%d)\n", actor.ref.id, err, actor.restarts, actor.maxRestarts) if actor.restarts <= actor.maxRestarts { // Restart the actor go as.runActor(actor) fmt.Printf("Actor %s: Restarted\n", actor.ref.id) } else { // Stop the actor and notify supervisor fmt.Printf("Actor %s: Max restarts exceeded, stopping\n", actor.ref.id) if actor.supervisor != nil { actor.supervisor.Send(ActorFailedMessage{ FailedActor: actor.ref, Error: err, }) } } } // Stop stops the actor system func (as *ActorSystem) Stop() { as.cancel() // Wait a bit for actors to stop gracefully time.Sleep(100 * time.Millisecond) fmt.Println("ActorSystem: Stopped") } // Message types for actor system type ActorFailedMessage struct { FailedActor *ActorRef Error error } type CreateChildMessage struct { ChildID string Behavior ActorBehavior ResponseChan chan *ActorRef } type WorkMessage struct { Data string } type ResultMessage struct { Result string From *ActorRef } // WorkerActor demonstrates a worker that can fail func WorkerBehavior(ctx ActorContext, message Message) { switch msg := message.(type) { case WorkMessage: fmt.Printf("Worker %s: Processing work: %s\n", ctx.Self.id, msg.Data) // Simulate work time.Sleep(100 * time.Millisecond) // Simulate random failures if len(msg.Data)%7 == 0 { panic("simulated worker failure") } result := fmt.Sprintf("processed-%s", msg.Data) fmt.Printf("Worker %s: Work completed: %s\n", ctx.Self.id, result) default: fmt.Printf("Worker %s: Unknown message: %T\n", ctx.Self.id, message) } } // SupervisorActor manages worker actors func SupervisorBehavior(ctx ActorContext, message Message) { switch msg := message.(type) { case CreateChildMessage: childRef := ctx.System.ActorOf(msg.ChildID, msg.Behavior, ctx.Self) msg.ResponseChan <- childRef case ActorFailedMessage: fmt.Printf("Supervisor %s: Child actor %s failed: %v\n", ctx.Self.id, msg.FailedActor.id, msg.Error) // Create replacement worker newWorkerID := fmt.Sprintf("%s-replacement", msg.FailedActor.id) ctx.System.ActorOf(newWorkerID, WorkerBehavior, ctx.Self) fmt.Printf("Supervisor %s: Created replacement worker %s\n", ctx.Self.id, newWorkerID) case WorkMessage: // Delegate work to children (simplified) fmt.Printf("Supervisor %s: Delegating work: %s\n", ctx.Self.id, msg.Data) default: fmt.Printf("Supervisor %s: Unknown message: %T\n", ctx.Self.id, message) } } func main() { fmt.Println("=== Advanced Actor System Demo ===") system := NewActorSystem() defer system.Stop() // Create supervisor supervisor := system.ActorOf("supervisor", SupervisorBehavior, nil) // Create workers through supervisor responseChan := make(chan *ActorRef) supervisor.Send(CreateChildMessage{ ChildID: "worker-1", Behavior: WorkerBehavior, ResponseChan: responseChan, }) worker1 := <-responseChan supervisor.Send(CreateChildMessage{ ChildID: "worker-2", Behavior: WorkerBehavior, ResponseChan: responseChan, }) worker2 := <-responseChan // Send work to workers workItems := []string{ "task-1", "task-2", "task-3", "failure", // "failure" will cause panic "task-4", "task-5", "another", "task-6", } for i, work := range workItems { var worker *ActorRef if i%2 == 0 { worker = worker1 } else { worker = worker2 } worker.Send(WorkMessage{Data: work}) time.Sleep(200 * time.Millisecond) } // Wait for processing time.Sleep(3 * time.Second) } Distributed Actor System package main import ( "context" "encoding/json" "fmt" "net" "sync" "time" ) // RemoteMessage represents a message that can be sent over network type RemoteMessage struct { Type string `json:"type"` Payload interface{} `json:"payload"` From string `json:"from"` To string `json:"to"` } // DistributedActorSystem extends ActorSystem with network capabilities type DistributedActorSystem struct { *ActorSystem nodeID string address string listener net.Listener peers map[string]net.Conn peersMu sync.RWMutex } // NewDistributedActorSystem creates a new distributed actor system func NewDistributedActorSystem(nodeID, address string) *DistributedActorSystem { return &DistributedActorSystem{ ActorSystem: NewActorSystem(), nodeID: nodeID, address: address, peers: make(map[string]net.Conn), } } // Start starts the distributed actor system func (das *DistributedActorSystem) Start() error { listener, err := net.Listen("tcp", das.address) if err != nil { return err } das.listener = listener // Accept incoming connections go func() { for { conn, err := listener.Accept() if err != nil { return } go das.handleConnection(conn) } }() fmt.Printf("DistributedActorSystem %s: Started on %s\n", das.nodeID, das.address) return nil } // ConnectToPeer connects to a peer node func (das *DistributedActorSystem) ConnectToPeer(peerID, peerAddress string) error { conn, err := net.Dial("tcp", peerAddress) if err != nil { return err } das.peersMu.Lock() das.peers[peerID] = conn das.peersMu.Unlock() go das.handleConnection(conn) fmt.Printf("DistributedActorSystem %s: Connected to peer %s\n", das.nodeID, peerID) return nil } // handleConnection handles incoming network connections func (das *DistributedActorSystem) handleConnection(conn net.Conn) { defer conn.Close() decoder := json.NewDecoder(conn) for { var msg RemoteMessage if err := decoder.Decode(&msg); err != nil { return } das.handleRemoteMessage(msg) } } // handleRemoteMessage processes remote messages func (das *DistributedActorSystem) handleRemoteMessage(msg RemoteMessage) { fmt.Printf("DistributedActorSystem %s: Received remote message from %s to %s\n", das.nodeID, msg.From, msg.To) // Find local actor and deliver message das.ActorSystem.mu.RLock() actor, exists := das.ActorSystem.actors[msg.To] das.ActorSystem.mu.RUnlock() if exists { // Convert payload back to proper message type switch msg.Type { case "work": if data, ok := msg.Payload.(string); ok { actor.ref.Send(WorkMessage{Data: data}) } case "result": if result, ok := msg.Payload.(string); ok { actor.ref.Send(ResultMessage{Result: result}) } } } } // SendRemoteMessage sends a message to a remote actor func (das *DistributedActorSystem) SendRemoteMessage(peerID, actorID string, message Message) error { das.peersMu.RLock() conn, exists := das.peers[peerID] das.peersMu.RUnlock() if !exists { return fmt.Errorf("peer %s not connected", peerID) } var msgType string var payload interface{} switch msg := message.(type) { case WorkMessage: msgType = "work" payload = msg.Data case ResultMessage: msgType = "result" payload = msg.Result default: return fmt.Errorf("unsupported message type: %T", message) } remoteMsg := RemoteMessage{ Type: msgType, Payload: payload, From: das.nodeID, To: actorID, } encoder := json.NewEncoder(conn) return encoder.Encode(remoteMsg) } // Stop stops the distributed actor system func (das *DistributedActorSystem) Stop() { if das.listener != nil { das.listener.Close() } das.peersMu.Lock() for _, conn := range das.peers { conn.Close() } das.peersMu.Unlock() das.ActorSystem.Stop() } // DistributedWorkerBehavior for distributed workers func DistributedWorkerBehavior(system *DistributedActorSystem) ActorBehavior { return func(ctx ActorContext, message Message) { switch msg := message.(type) { case WorkMessage: fmt.Printf("DistributedWorker %s: Processing work: %s\n", ctx.Self.id, msg.Data) // Simulate work time.Sleep(500 * time.Millisecond) result := fmt.Sprintf("processed-%s-by-%s", msg.Data, system.nodeID) fmt.Printf("DistributedWorker %s: Work completed: %s\n", ctx.Self.id, result) // Send result back (in real system, would send to requester) default: fmt.Printf("DistributedWorker %s: Unknown message: %T\n", ctx.Self.id, message) } } } func main() { fmt.Println("=== Distributed Actor System Demo ===") // Create two distributed actor systems system1 := NewDistributedActorSystem("node1", "localhost:8001") system2 := NewDistributedActorSystem("node2", "localhost:8002") // Start systems if err := system1.Start(); err != nil { panic(err) } defer system1.Stop() if err := system2.Start(); err != nil { panic(err) } defer system2.Stop() // Wait for systems to start time.Sleep(100 * time.Millisecond) // Connect systems if err := system1.ConnectToPeer("node2", "localhost:8002"); err != nil { panic(err) } if err := system2.ConnectToPeer("node1", "localhost:8001"); err != nil { panic(err) } // Create distributed workers worker1 := system1.ActorOf("worker1", DistributedWorkerBehavior(system1), nil) worker2 := system2.ActorOf("worker2", DistributedWorkerBehavior(system2), nil) // Send local work worker1.Send(WorkMessage{Data: "local-task-1"}) worker2.Send(WorkMessage{Data: "local-task-2"}) // Send remote work system1.SendRemoteMessage("node2", "worker2", WorkMessage{Data: "remote-task-from-node1"}) system2.SendRemoteMessage("node1", "worker1", WorkMessage{Data: "remote-task-from-node2"}) // Wait for processing time.Sleep(2 * time.Second) } Best Practices Keep Actors Small: Each actor should have a single responsibility Immutable Messages: Use immutable data structures for messages Avoid Blocking: Don’t block in actor message handlers Handle Failures: Implement proper supervision and error handling Message Ordering: Design for message reordering in distributed systems Backpressure: Handle mailbox overflow gracefully Testing: Test actors in isolation with mock messages Common Pitfalls Shared State: Accidentally sharing mutable state between actors Blocking Operations: Performing blocking I/O in message handlers Large Messages: Sending large objects instead of references Circular Dependencies: Creating circular message dependencies Resource Leaks: Not properly cleaning up actor resources Synchronous Communication: Trying to make actor communication synchronous When to Use Actor Model Use When: ...