libeam

v0.3.0

Erlang/OTP-inspired actor system for TypeScript — actors, supervision, distribution, GenStage

An Erlang/OTP-inspired actor system for TypeScript. Build distributed, fault-tolerant applications with location-transparent actors, automatic supervision, and gossip-based cluster membership.

Features

  • Functional API: Simple, closure-based actor definitions with createSystem and createActor (Recommended)
  • Actor Model: Lightweight actors with message-passing semantics (call for request/response, cast for fire-and-forget)
  • Location Transparency: Actors communicate via ActorRef regardless of whether the target is local or remote
  • Supervision: Automatic crash handling with configurable restart strategies
  • Distributed Clustering: Gossip-based membership detection with automatic peer discovery
  • Placement Strategies: Control where actors are spawned (local, round-robin) with optional role-based filtering
  • Transport Abstraction: Pluggable transport layer (in-memory for testing, ZeroMQ for production)
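
The round-robin placement with role-based filtering mentioned above can be pictured with a small standalone sketch. It does not use libeam: `NodeInfo`, `RoundRobinPlacer`, and `pick` are hypothetical names chosen for illustration, and the library's actual placement logic may differ.

```typescript
// Hypothetical node descriptor: an id plus the roles the node advertises.
interface NodeInfo {
  id: string;
  roles: string[];
}

// Cycles through candidate nodes, optionally restricted to nodes that have a role.
class RoundRobinPlacer {
  private cursor = 0;
  private nodes: NodeInfo[];

  constructor(nodes: NodeInfo[]) {
    this.nodes = nodes;
  }

  pick(role?: string): NodeInfo | undefined {
    let candidates = this.nodes;
    if (role !== undefined) {
      const wanted: string = role;
      candidates = this.nodes.filter((n) => n.roles.indexOf(wanted) !== -1);
    }
    if (candidates.length === 0) return undefined;
    const chosen = candidates[this.cursor % candidates.length];
    this.cursor += 1;
    return chosen;
  }
}

const placer = new RoundRobinPlacer([
  { id: "node1", roles: ["web"] },
  { id: "node2", roles: ["worker"] },
  { id: "node3", roles: ["worker"] },
]);

// Three consecutive "worker" placements alternate between node2 and node3.
const picked = [placer.pick("worker"), placer.pick("worker"), placer.pick("worker")].map(
  (n) => (n ? n.id : "none"),
);
```

Filtering before cycling keeps the rotation fair among eligible nodes only; a role with no matching node yields `undefined` rather than falling back to an arbitrary node.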

Elixir/OTP Parity

libeam implements the core primitives from Elixir/OTP, adapted for the Node.js runtime.

Implemented

| Feature | Elixir/OTP | libeam |
|---------|------------|--------|
| Actor Model | | |
| Spawn processes | spawn/1, GenServer.start/2 | system.spawn() |
| Async messages | send/2, GenServer.cast/2 | ref.cast() |
| Sync calls | GenServer.call/2 | ref.call() |
| GenServer Callbacks | | |
| init/1 | Yes | init() |
| handle_call/3 | Yes | handleCall() |
| handle_cast/2 | Yes | handleCast() |
| handle_info/2 | Yes | handleInfo() |
| handle_continue/2 | Yes | handleContinue() |
| terminate/2 | Yes | terminate() |
| Idle timeout | {:noreply, state, timeout} | setIdleTimeout() |
| Supervision | | |
| Supervisors | Supervisor | Supervisor, ChildSupervisor |
| one-for-one | Yes | Yes |
| one-for-all | Yes | Yes |
| rest-for-one | Yes | Yes |
| Max restarts | Yes | Yes |
| Dynamic supervisors | DynamicSupervisor | DynamicSupervisor |
| Process Features | | |
| Links | Process.link/1 | link() |
| Monitors | Process.monitor/1 | watch() |
| Trap exit | Process.flag(:trap_exit, true) | setTrapExit() |
| Exit signals | Process.exit/2 | exit() |
| Timers | Process.send_after/3 | sendAfter(), sendInterval() |
| Introspection | | |
| List children | Supervisor.which_children/1 | getChildren() |
| Count children | Supervisor.count_children/1 | countChildren() |
| Abstractions | | |
| Agent | Agent | Agent |
| DynamicSupervisor | DynamicSupervisor | DynamicSupervisor |
| GenStage | GenStage | Producer, Consumer, ProducerConsumer, ConsumerSupervisor |
| Distribution | | |
| Cluster membership | :net_kernel | Cluster, GossipProtocol |
| Remote messaging | Transparent | Via Transport |
| Registry | Registry, :global | Registry, DistributedRegistry |
| Actor migration | Manual process migration | system.migrate() |
| Node roles | Node profiles | roles config + role spawn option |
| Process groups | pg | joinGroup(), getGroup(), broadcast() |

Not Implemented (Not Needed in Node.js)

| Feature | Reason |
|---------|--------|
| Task.async/await | Use native Promise / async-await |
| Selective receive | Not practical without BEAM VM |
| Hot code upgrades | Use libeam deploy for rolling deploys |
| Application behaviour | Use standard Node.js entry points |
| ETS/DETS | Use Map or external stores (Redis, etc.) |

Not Yet Implemented

All core OTP features have been implemented. See the feature table above for the full list.

Installation

npm / pnpm / yarn

# npm
npm install libeam

# pnpm
pnpm add libeam

# yarn
yarn add libeam

Deno (JSR)

deno add jsr:@libeam/core

Or import directly:

import { createSystem, createActor } from "jsr:@libeam/core";

Quick Start

Functional API (Recommended)

The functional API provides a simple, closure-based approach to defining actors:

import { createSystem, createActor } from "libeam";

// Define an actor with closure-based state
const Counter = createActor((ctx, self, initialValue: number) => {
  let count = initialValue;

  return self
    .onCall("get", () => count)
    .onCall("increment", () => ++count)
    .onCast("set", (value: number) => { count = value; });
});

// Create a system (one line!)
const system = createSystem();

// Spawn and interact with typed refs
const counter = system.spawn(Counter, { args: [0] });

const value = await counter.call("get");     // 0 — fully typed!
await counter.call("increment");              // 1
counter.cast("set", 100);                     // fire-and-forget

await system.shutdown();

Class-Based API

For full control, extend the Actor class directly:

import { Actor, ActorRef } from "libeam";

class CounterActor extends Actor {
  private count = 0;

  init(initialValue: number = 0) {
    this.count = initialValue;
    console.log(`Counter initialized with ${this.count}`);
  }

  // Synchronous request/response
  handleCall(message: { type: string }): number {
    switch (message.type) {
      case "get":
        return this.count;
      case "increment":
        return ++this.count;
      default:
        throw new Error(`Unknown message: ${message.type}`);
    }
  }

  // Fire-and-forget messages
  handleCast(message: { type: string; value?: number }): void {
    if (message.type === "set" && message.value !== undefined) {
      this.count = message.value;
    }
  }
}

Single-Node Setup (In-Memory)

For testing or single-process applications:

import {
  ActorSystem,
  InMemoryTransport,
  LocalCluster,
  LocalRegistry,
} from "libeam";

async function main() {
  const cluster = new LocalCluster("node1");
  const transport = new InMemoryTransport("node1");
  const registry = new LocalRegistry();

  await transport.connect();

  const system = new ActorSystem(cluster, transport, registry);
  system.registerActorClass(CounterActor);
  await system.start();

  // Spawn an actor
  const counter = system.spawn(CounterActor, {
    name: "my-counter",
    args: [10], // Initial value
  });

  // Interact with the actor
  const value = await counter.call({ type: "get" });
  console.log(`Current value: ${value}`); // 10

  await counter.call({ type: "increment" });
  console.log(`After increment: ${await counter.call({ type: "get" })}`); // 11

  counter.cast({ type: "set", value: 100 });
}

main();

Multi-Node Setup (Functional API)

For distributed applications across multiple processes/machines, createSystem handles all the wiring — ZeroMQ transport, gossip protocol, cluster membership, and registry sync:

import { createSystem, createActor, ActorRegistry } from "libeam";

// Define actors
const Ping = createActor((ctx, self) => {
  return self.onCast("ping", async (n: number) => {
    console.log(`Ping received: ${n}`);
    const pong = await ctx.getActorByName("pong");
    if (pong) pong.cast("pong", n + 1);
  });
});

const Pong = createActor((ctx, self) => {
  return self.onCast("pong", async (n: number) => {
    console.log(`Pong received: ${n}`);
    const ping = await ctx.getActorByName("ping");
    if (ping) ping.cast("ping", n + 1);
  });
});

// Register actors for typed getActorByName — same codebase on all nodes
declare module "libeam" {
  interface ActorRegistry {
    ping: typeof Ping;
    pong: typeof Pong;
  }
}

// Node 1 — port convention: rpc=5000, pub=5001, gossip=5002
const system1 = await createSystem({
  type: "distributed",
  port: 5000,
  seedNodes: [],
  cookie: "my-cluster-secret",
});
const ping = system1.spawn(Ping, { name: "ping" });

// Node 2 — joins via node1's gossip port
const system2 = await createSystem({
  type: "distributed",
  port: 5010,
  seedNodes: ["127.0.0.1:5002"],
  cookie: "my-cluster-secret",
});
system2.spawn(Pong, { name: "pong" });

// Start the game — typed ref from spawn() supports clean syntax
ping.cast("ping", 0);

Run the full distributed example:

# Terminal 1
npx tsx examples/high-level/distributed.ts node1

# Terminal 2
npx tsx examples/high-level/distributed.ts node2
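
The port convention noted in the example comment (rpc=5000, pub=5001, gossip=5002 for `port: 5000`) suggests that a joining node's seed address is the seed's base port plus two. A minimal sketch of that arithmetic, assuming the convention is always base, base + 1, base + 2; `derivePorts` and `seedAddress` are hypothetical helpers and not part of libeam:

```typescript
interface NodePorts {
  rpc: number;
  pub: number;
  gossip: number;
}

// Assumed convention taken from the example comment: rpc = base, pub = base + 1, gossip = base + 2.
function derivePorts(basePort: number): NodePorts {
  return { rpc: basePort, pub: basePort + 1, gossip: basePort + 2 };
}

// Builds the "host:port" string a joining node would list in seedNodes.
function seedAddress(host: string, basePort: number): string {
  return `${host}:${derivePorts(basePort).gossip}`;
}

const node1Ports = derivePorts(5000);
const node2Seed = seedAddress("127.0.0.1", 5000);
```

Here `node2Seed` evaluates to `"127.0.0.1:5002"`, which matches the seed entry node 2 uses above.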

Multi-Node Setup (Class-Based API)

For full control over transport, gossip, and cluster configuration:

import {
  ActorSystem,
  ZeroMQTransport,
  GossipProtocol,
  GossipUDP,
  DistributedCluster,
  DistributedRegistry,
  RegistrySync,
} from "libeam";

async function startNode(config: {
  nodeId: string;
  rpcPort: number;
  pubPort: number;
  gossipPort: number;
  seedNodes: string[];
}) {
  const { nodeId, rpcPort, pubPort, gossipPort, seedNodes } = config;

  // 1. Setup transport (ZeroMQ)
  const transport = new ZeroMQTransport({
    nodeId,
    rpcPort,
    pubPort,
    bindAddress: "0.0.0.0",
  });
  await transport.connect();

  // 2. Setup gossip protocol for membership
  const gossipUDP = new GossipUDP(gossipPort);
  const gossipProtocol = new GossipProtocol(
    nodeId,
    `tcp://127.0.0.1:${rpcPort}`, // RPC address for peers
    `127.0.0.1:${gossipPort}`, // Gossip address
    gossipUDP,
    {
      gossipIntervalMs: 1000,
      cleanupIntervalMs: 2000,
      failureTimeoutMs: 5000,
      gossipFanout: 3,
      seedNodes,
    },
  );

  // 3. Setup cluster (wraps gossip protocol)
  const cluster = new DistributedCluster(gossipProtocol);
  await cluster.start();

  // 4. Setup registry sync for actor name resolution
  const registrySync = new RegistrySync(nodeId, transport, cluster);
  const registry = new DistributedRegistry(nodeId, registrySync);

  // 5. Wire cluster membership changes to transport
  cluster.on("member_join", (peerId: string) => {
    const peer = cluster.getPeerState(peerId);
    if (peer) {
      transport.updatePeers([[peerId, peer.address]]);
    }
  });

  // 6. Create actor system
  const system = new ActorSystem(cluster, transport, registry);
  system.registerActorClass(CounterActor);
  await system.start();

  return { system, cluster, transport };
}

// Node 1 (seed node)
const node1 = await startNode({
  nodeId: "node1",
  rpcPort: 5000,
  pubPort: 5001,
  gossipPort: 6000,
  seedNodes: [], // No seeds for first node
});

// Node 2 (joins via seed)
const node2 = await startNode({
  nodeId: "node2",
  rpcPort: 5010,
  pubPort: 5011,
  gossipPort: 6010,
  seedNodes: ["127.0.0.1:6000"], // Connect to node1's gossip port
});
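
To give a feel for what options such as `failureTimeoutMs` control, here is a generic sketch of heartbeat-based gossip membership. It is not libeam's implementation: `PeerEntry`, `mergeView`, and `failedPeers` are hypothetical names, and the real protocol may track peers differently.

```typescript
// Per-peer record: the highest heartbeat counter seen and when it was last refreshed.
interface PeerEntry {
  heartbeat: number;
  lastSeenMs: number;
}

type Membership = Record<string, PeerEntry>;

// Merge a remote view (peer id -> heartbeat) into the local table,
// refreshing a peer only when the incoming heartbeat is newer.
function mergeView(local: Membership, remote: Record<string, number>, nowMs: number): void {
  for (const id of Object.keys(remote)) {
    const incoming = remote[id];
    if (incoming === undefined) continue;
    const known = local[id];
    if (known === undefined || incoming > known.heartbeat) {
      local[id] = { heartbeat: incoming, lastSeenMs: nowMs };
    }
  }
}

// Peers whose entry has not been refreshed within the timeout are treated as failed.
function failedPeers(local: Membership, nowMs: number, failureTimeoutMs: number): string[] {
  return Object.keys(local).filter((id) => {
    const entry = local[id];
    return entry !== undefined && nowMs - entry.lastSeenMs > failureTimeoutMs;
  });
}

const view: Membership = {};
mergeView(view, { node2: 1, node3: 1 }, 0);   // both peers seen at t = 0
mergeView(view, { node2: 2 }, 4000);          // only node2 advances at t = 4000
const suspected = failedPeers(view, 6000, 5000);
```

At t = 6000 with a 5000 ms timeout, only `node3` is reported, because its entry was last refreshed at t = 0.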

Functional API

The functional API is the recommended way to build applications with libeam. Compared with the class-based API, it infers call and cast types automatically through TypedActorRef and replaces manual system wiring with createSystem.

createSystem

The createSystem factory simplifies system creation and configuration.

import { createSystem } from "libeam";

// Local system (synchronous, zero config)
const system = createSystem();

// Local with options
const namedSystem = createSystem({ nodeId: "my-node" });

// Distributed (async, with ZeroMQ + Gossip)
const distributedSystem = await createSystem({
  type: "distributed",
  port: 5000,
  seedNodes: ["127.0.0.1:6002"],
});

System Interface:

  • spawn(actorClass, options?): Spawn an actor and return a TypedActorRef
  • register(actorClass): Register an actor class for remote spawning
  • getActorByName(name): Look up a named actor (local or remote)
  • shutdown(): Gracefully shut down the system and all actors
  • stop(ref): Stop an individual actor (cascading termination of children)
  • nodeId: The unique ID of this node
  • transport: Access to the underlying transport layer
  • cluster: Access to the cluster membership interface
  • registry: Access to the actor registry
  • system: Escape hatch to the raw ActorSystem instance

createActor

Define actors using a closure-based factory function.

const MyActor = createActor((ctx, self, ...args) => {
  // Initialization logic here
  
  return self.onCall("ping", () => "pong");
});

The factory function receives:

  • ctx: The actor context (spawn, watch, link, stash, etc.)
  • self: The actor builder (register handlers, timers, etc.)
  • ...args: Arguments passed during spawn

ActorContext (ctx) Methods:

  • self: Reference to this actor
  • parent: Reference to the parent actor
  • spawn(actor, options?): Spawn a child actor
  • watch(ref) / unwatch(ref): Monitor other actors
  • link(ref) / unlink(ref): Bidirectional crash propagation
  • exit(reason?): Stop this actor (with optional reason)
  • setTrapExit(boolean): Enable/disable exit trapping
  • getActorByName(name): Look up a named actor (local or remote)
  • stash() / unstash() / unstashAll() / clearStash(): Message stashing

ActorBuilder (self) Methods:

  • onCall(name, handler): Register a request-reply handler
  • onCast(name, handler): Register a fire-and-forget handler
  • onInfo(type, handler): Register a handler for system messages ("down", "exit", "timeout", "moved")
  • onTerminate(handler): Cleanup logic
  • onContinue(handler): Deferred initialization
  • sendAfter(msg, delay) / sendInterval(msg, interval): Timers
  • setIdleTimeout(ms): Configure idle timeout
  • migratable({ getState, setState }): Enable actor migration
  • childSupervision(options): Configure supervision strategy for children

TypedActorRef

When you spawn an actor created with createActor, you get a TypedActorRef. This provides full TypeScript autocompletion and type checking for call and cast.

const counter = system.spawn(Counter, { args: [0] });

// TypeScript knows "get" and "increment" are valid calls
const value = await counter.call("get");
await counter.call("increment");

// TypeScript knows "set" is a valid cast and requires a number
counter.cast("set", 42);

Type Inference

When you return the builder chain from a createActor factory, TypeScript automatically infers the handler types:

// With return — full type inference
const Counter = createActor((ctx, self, initial: number) => {
  let count = initial;
  return self
    .onCall("get", () => count)
    .onCall("increment", () => ++count)
    .onCast("set", (v: number) => { count = v; });
});

const counter = system.spawn(Counter, { args: [0] });
counter.call("get");        // TypeScript knows this returns number
counter.cast("set", 42);    // TypeScript knows "set" expects a number
counter.call("typo");       // Type error! "typo" is not a valid method

Without return, the factory still works but handlers are untyped:

// Without return — still works, but no type inference
const Untyped = createActor((ctx, self) => {
  self.onCall("get", () => 42);
});

Timer methods (sendAfter, sendInterval) return TimerRef, not the builder, so they must be called on a separate line before the return:

const Heartbeat = createActor((ctx, self) => {
  self.sendInterval({ type: "tick" }, 1000);  // separate line (returns TimerRef)
  return self
    .onCast("tick", () => console.log("tick"))
    .onTerminate(() => console.log("stopped"));
});
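
Why returning the chain matters can be seen in a stripped-down builder. The sketch below is not libeam's `ActorBuilder`; `Builder`, `Handler`, and the `calls` field are hypothetical, and it only shows the general TypeScript technique of growing a type parameter with each chained call.

```typescript
type Handler = (...args: any[]) => unknown;

// Each onCall returns a new builder whose type parameter has grown by one handler,
// so the final value in the chain carries the names and signatures of all handlers.
class Builder<Calls extends Record<string, Handler>> {
  readonly calls: Calls;

  constructor(calls: Calls) {
    this.calls = calls;
  }

  onCall<K extends string, H extends Handler>(name: K, handler: H): Builder<Calls & Record<K, H>> {
    const next = { ...this.calls, [name]: handler } as Calls & Record<K, H>;
    return new Builder(next);
  }
}

let count = 0;
const counterBuilder = new Builder({})
  .onCall("get", () => count)
  .onCall("increment", () => ++count);

const before: number = counterBuilder.calls.get();       // inferred as () => number
const after: number = counterBuilder.calls.increment();  // inferred as () => number
// counterBuilder.calls.typo();  // would not compile: "typo" was never registered
```

If the chain's final value is discarded instead of returned, the accumulated type is lost as well, which is consistent with the untyped behaviour described above.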

Module Augmentation (Typed getActorByName)

By default, getActorByName returns an untyped ActorRef. To get fully typed refs, augment the ActorRegistry interface:

// types.d.ts (or any .ts file)
import "libeam";

declare module "libeam" {
  interface ActorRegistry {
    counter: typeof Counter;
    "chat-room": typeof ChatRoom;
  }
}

Now getActorByName returns typed refs when called with registered names:

const counter = await system.getActorByName("counter");
if (counter) {
  const value = await counter.call("get");  // fully typed!
  counter.cast("set", 100);                 // fully typed!
}
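
The typing trick behind a name-keyed lookup can be sketched without libeam. In this illustration `NameMap` plays the role of the augmented registry interface, and `CounterRef`, `ChatRoomRef`, `refs`, and `lookup` are hypothetical stand-ins; the library's own types are richer.

```typescript
// Hypothetical, simplified ref shapes.
interface CounterRef {
  call(name: "get"): number;
}
interface ChatRoomRef {
  cast(name: "post", text: string): void;
}

// Name-to-type map: the key passed to lookup selects the result type.
interface NameMap {
  counter: CounterRef;
  "chat-room": ChatRoomRef;
}

const refs: Partial<NameMap> = {};

function lookup<K extends keyof NameMap>(name: K): NameMap[K] | undefined {
  return refs[name];
}

refs.counter = { call: () => 7 };

const found = lookup("counter");                              // CounterRef | undefined
const current = found ? found.call("get") : undefined;        // number | undefined
const missing = lookup("chat-room");                          // ChatRoomRef | undefined
```

Because the result type is indexed by the literal name, a misspelled name fails at compile time instead of returning an untyped reference.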

Utility types for working with actor definitions:

  • ActorRefFrom<T> — Extract TypedActorRef from an ActorDefinition
  • ExtractCalls<T> — Extract call handler types from an ActorDefinition
  • ExtractCasts<T> — Extract cast handler types from an ActorDefinition

import { ActorRefFrom } from "libeam";

type CounterRef = ActorRefFrom<typeof Counter>;
// TypedActorRef<{ get: () => number; increment: () => number }, { set: (v: number) => void }>

Feature Comparison

| Feature | Functional API | Class-Based API |
|---------|---------------|-----------------|
| State management | Closures | Class fields |
| Message handlers | self.onCall() / self.onCast() | handleCall() / handleCast() |
| Type safety | Automatic (TypedActorRef) | Manual typing |
| System setup | createSystem() | Manual wiring |
| Child supervision | self.childSupervision() | childSupervision() override |
| Deferred init | self.onContinue() | handleContinue() override |
| Message stashing | ctx.stash() / ctx.unstashAll() | this.stash() / this.unstashAll() |

Class-Based API Reference

This section documents the low-level, class-based API. While the Functional API is recommended for most use cases, the class-based API provides full control and is used internally by the system.

Actor

Base class for all actors.

class MyActor extends Actor {
  // Called when actor starts. Receives spawn arguments.
  init(...args: any[]): void | Promise<void>;

  // Called when actor is stopped.
  terminate(): void | Promise<void>;

  // Handle synchronous requests (must return a value).
  handleCall(message: any): any | Promise<any>;

  // Handle asynchronous messages (fire-and-forget).
  handleCast(message: any): void | Promise<void>;

  // Reference to self for sending to other actors
  self: ActorRef;
}

handleContinue

Called when init() returns { continue: data } for deferred async initialization.

class DatabaseActor extends Actor {
  private db!: DatabaseConnection;

  init() {
    // Return immediately, continue async work
    return { continue: "connect" };
  }

  async handleContinue(data: string) {
    if (data === "connect") {
      this.db = await connectToDatabase();
    }
  }
}

See examples/high-level/handle_continue.ts and examples/low-level/handle_continue.ts for more examples.

Idle Timeout

Set a timeout that fires when the actor is idle (no messages received).

class SessionActor extends Actor {
  init() {
    // Timeout after 30 seconds of inactivity
    this.setIdleTimeout(30000);
  }

  handleInfo(message: InfoMessage) {
    if (message.type === "timeout") {
      console.log(`Session idle for ${message.idleMs}ms, closing...`);
      this.exit(this.self, "normal");
    }
  }
}

Methods:

  • setIdleTimeout(timeoutMs: number): void - Set idle timeout in milliseconds
  • getIdleTimeout(): number - Get current idle timeout

The actor receives a TimeoutMessage via handleInfo() when the timeout fires.

See examples/high-level/idle_timeout.ts and test/idle_timeout.test.ts for more examples.

Timers

Schedule delayed and periodic messages to yourself.

class ReminderActor extends Actor {
  private reminderRef!: TimerRef;

  init() {
    // One-shot timer: remind after 5 seconds
    this.reminderRef = this.sendAfter({ type: "remind" }, 5000);
    
    // Periodic timer: tick every second
    this.sendInterval({ type: "tick" }, 1000);
  }

  handleCast(message: { type: string }) {
    if (message.type === "remind") {
      console.log("Time's up!");
    } else if (message.type === "tick") {
      console.log("Tick...");
    }
  }

  terminate() {
    // Clean up timers
    this.cancelTimer(this.reminderRef);
    this.cancelAllTimers();
  }
}

Methods:

  • sendAfter(message, delayMs): TimerRef - Schedule one-shot message
  • sendInterval(message, intervalMs): TimerRef - Schedule repeating message
  • cancelTimer(timerRef): boolean - Cancel a specific timer
  • cancelAllTimers(): void - Cancel all active timers

See examples/high-level/timers.ts and examples/low-level/timers.ts for more examples.

Watching

Monitor another actor's lifecycle and receive notification when it terminates.

class WorkerSupervisor extends Actor {
  private workerWatch!: WatchRef;

  init(workerRef: ActorRef) {
    // Start watching the worker
    this.workerWatch = this.watch(workerRef);
  }

  handleInfo(message: InfoMessage) {
    if (message.type === "down") {
      const down = message as DownMessage;
      console.log(`Worker ${down.actorRef.id} terminated: ${down.reason.type}`);
      
      // Unwatch when done
      this.unwatch(down.watchRef);
    }
  }
}

Methods:

  • watch(actorRef): WatchRef - Start watching an actor
  • unwatch(watchRef): void - Stop watching

Behavior:

  • One-shot notification: You receive exactly one DownMessage when the watched actor terminates
  • Auto-cleanup: The watch is automatically removed after the DOWN message is delivered
  • Works across nodes: Can watch actors on remote nodes

See examples/high-level/watching.ts and examples/low-level/actor_watching.ts for more examples.

Links

Bidirectional crash propagation between actors. If one linked actor crashes, the other crashes too (unless trapExit is enabled).

class ParentActor extends Actor {
  private childLink!: LinkRef;

  init(childRef: ActorRef) {
    // Link to child - bidirectional crash propagation
    this.childLink = this.link(childRef);
    
    // Enable trap exit to receive ExitMessage instead of crashing
    this.setTrapExit(true);
  }

  handleInfo(message: InfoMessage) {
    if (message.type === "exit") {
      const exit = message as ExitMessage;
      console.log(`Linked actor exited: ${exit.reason.type}`);
      
      // Unlink when done
      this.unlink(exit.linkRef!);
    }
  }

  terminateChild() {
    // Send exit signal to linked actor
    this.exit(this.childLink.actorRef, "shutdown");
  }
}

Methods:

  • link(actorRef): LinkRef - Create bidirectional link
  • unlink(linkRef): void - Remove link
  • setTrapExit(trap: boolean): void - Enable/disable exit trapping
  • isTrapExit(): boolean - Check if exit trapping is enabled
  • exit(actorRef, reason?): void - Send exit signal to actor

Exit Reasons:

  • "normal" - No effect on linked actors
  • "kill" - Always terminates, ignores trapExit
  • Custom string - Delivered to linked actors with trapExit enabled

See examples/high-level/links.ts and examples/low-level/actor_links.ts for more examples.
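
The exit-reason rules listed above can be summarised as a small decision function. This is a sketch of the documented behaviour only; `LinkOutcome` and `onLinkedExit` are hypothetical names, not libeam exports.

```typescript
type LinkOutcome = "ignore" | "terminate" | "deliver-exit-message";

// Decides what happens to an actor when an actor it is linked to exits with `reason`.
function onLinkedExit(reason: string, trapExit: boolean): LinkOutcome {
  if (reason === "normal") return "ignore";     // normal exits do not affect linked actors
  if (reason === "kill") return "terminate";    // "kill" ignores trapExit
  // Any other reason crashes the linked actor unless it traps exits.
  return trapExit ? "deliver-exit-message" : "terminate";
}

const outcomes = [
  onLinkedExit("normal", false),
  onLinkedExit("kill", true),
  onLinkedExit("shutdown", true),
  onLinkedExit("shutdown", false),
];
```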

Message Stashing

Defer message processing until the actor is ready. Useful for state-dependent message handling.

class StatefulActor extends Actor {
  private ready = false;
  private pendingMessages: any[] = [];

  init() {
    // Actor starts in "not ready" state
    this.ready = false;
  }

  async handleCall(message: any) {
    if (!this.ready) {
      // Stash message for later processing
      this.stash();
      return "stashed";
    }
    // Process message normally
    return this.processMessage(message);
  }

  setReady() {
    this.ready = true;
    // Replay all stashed messages
    this.unstashAll();
  }

  private processMessage(message: any) {
    return `Processed: ${message}`;
  }
}

Methods:

  • stash(): void - Save current message to stash
  • unstash(): void - Replay one stashed message (FIFO order)
  • unstashAll(): void - Replay all stashed messages
  • clearStash(): void - Discard all stashed messages

See examples/high-level/message_stashing.ts and test/message_stashing.test.ts for more examples.
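
The FIFO replay order of the stash can be illustrated with a plain queue. The `Stash` class below is a hypothetical standalone sketch; unlike the actor methods above, it takes the message as an explicit argument and returns replayed messages instead of redelivering them.

```typescript
class Stash<T> {
  private items: T[] = [];

  // Append the message to the end of the stash.
  stash(message: T): void {
    this.items.push(message);
  }

  // Remove and return the oldest stashed message, if any.
  unstash(): T | undefined {
    return this.items.shift();
  }

  // Remove and return every stashed message, oldest first.
  unstashAll(): T[] {
    const all = this.items;
    this.items = [];
    return all;
  }

  // Discard everything without replaying it.
  clearStash(): void {
    this.items = [];
  }

  size(): number {
    return this.items.length;
  }
}

const pending = new Stash<string>();
pending.stash("a");
pending.stash("b");
pending.stash("c");
const first = pending.unstash();     // oldest message
const rest = pending.unstashAll();   // remaining messages in arrival order
```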

InfoMessage Types

System messages delivered via handleInfo(). Switch on the message's type field to distinguish between variants.

handleInfo(message: InfoMessage) {
  switch (message.type) {
    case "down": {
      const down = message as DownMessage;
      console.log(`Actor ${down.actorRef.id} terminated: ${down.reason.type}`);
      break;
    }
    case "exit": {
      const exit = message as ExitMessage;
      console.log(`Linked actor exited: ${exit.reason.type}`);
      break;
    }
    case "timeout": {
      const timeout = message as TimeoutMessage;
      console.log(`Idle for ${timeout.idleMs}ms`);
      break;
    }
    case "moved": {
      const moved = message as MovedMessage;
      console.log(`Actor moved to ${moved.newNodeId}`);
      break;
    }
  }
}

| Message | Type | Fields | Description |
|---------|------|--------|-------------|
| DownMessage | "down" | watchRef, actorRef, reason | Watched actor terminated |
| ExitMessage | "exit" | linkRef?, actorRef, reason | Linked actor exited (trapExit only) |
| TimeoutMessage | "timeout" | idleMs | Idle timeout fired |
| MovedMessage | "moved" | watchRef?, linkRef?, actorRef, oldNodeId, newNodeId, newActorId | Actor migrated to another node |

TerminationReason:

type TerminationReason = 
  | { type: "normal" }
  | { type: "error"; error: any }
  | { type: "killed" };
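
Because the union is discriminated by `type`, a `switch` can narrow each variant and the compiler can check that none is missed. A minimal sketch that redeclares the type locally so it stands alone; `describeReason` is a hypothetical helper:

```typescript
type TerminationReason =
  | { type: "normal" }
  | { type: "error"; error: any }
  | { type: "killed" };

function describeReason(reason: TerminationReason): string {
  switch (reason.type) {
    case "normal":
      return "stopped normally";
    case "error": {
      // Only the "error" variant carries an error payload.
      const detail = reason.error instanceof Error ? reason.error.message : String(reason.error);
      return `crashed: ${detail}`;
    }
    case "killed":
      return "killed";
    default: {
      // If a new variant is added to the union, this assignment stops compiling.
      const unreachable: never = reason;
      return unreachable;
    }
  }
}

const descriptions = [
  describeReason({ type: "normal" }),
  describeReason({ type: "error", error: new Error("boom") }),
  describeReason({ type: "killed" }),
];
```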

ActorRef

Location-transparent reference to an actor.

// Request/response with timeout (default 5000ms)
const result = await actorRef.call(message, timeout?);

// Fire-and-forget
actorRef.cast(message);

Agent

State management abstraction that wraps a single value behind get and update operations.

// Create an agent
const counter = Agent.start(system, 0);

// Read state
const value = await counter.get();

// Update state (waits for completion)
await counter.update(n => n + 1);

// Fire-and-forget update
counter.cast(n => n + 1);

// Stop the agent
await counter.stop();

Methods:

  • Agent.start<T>(system, initialState, options?): Agent<T> - Create an agent
  • get(timeout?): Promise<T> - Get current state
  • update(fn, timeout?): Promise<T> - Update state, returns new value
  • getAndUpdate(fn, timeout?): Promise<T> - Update state, returns old value
  • cast(fn): void - Fire-and-forget state update
  • stop(): Promise<void> - Stop the agent
  • getRef(): ActorRef - Access underlying actor reference

See test/agent.test.ts for more examples.
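
The difference between `update` (returns the new value) and `getAndUpdate` (returns the old value) is easy to mix up. The following synchronous sketch mirrors only those return-value semantics; `StateCell` is a hypothetical class with no actor, mailbox, or timeout behind it.

```typescript
class StateCell<T> {
  private state: T;

  constructor(initial: T) {
    this.state = initial;
  }

  get(): T {
    return this.state;
  }

  // Applies fn and returns the value after the update.
  update(fn: (current: T) => T): T {
    this.state = fn(this.state);
    return this.state;
  }

  // Applies fn and returns the value from before the update.
  getAndUpdate(fn: (current: T) => T): T {
    const previous = this.state;
    this.state = fn(previous);
    return previous;
  }
}

const cell = new StateCell(0);
const afterUpdate = cell.update((n) => n + 1);            // new value
const beforeSecond = cell.getAndUpdate((n) => n + 10);    // old value
const finalValue = cell.get();
```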

DynamicSupervisor

On-demand supervised child spawning. Unlike static supervision trees where children are defined at init time, a DynamicSupervisor starts with zero children and allows adding them at runtime. All children are supervised with a one-for-one strategy.

import { DynamicSupervisor } from "libeam";

// Start a dynamic supervisor
const dynSup = DynamicSupervisor.start(system);

// With options
const limitedSup = DynamicSupervisor.start(system, {
  maxChildren: 100,  // Cap child count (default: Infinity)
  maxRestarts: 3,    // Per-child restart limit (default: 3)
  periodMs: 5000,    // Restart counting window (default: 5000)
});

// Named supervisor
const namedSup = DynamicSupervisor.start(system, {}, { name: "worker-pool" });

// Start children on demand
const workerRef = await dynSup.startChild(WorkerActor, { args: ["job-1"] });

// Functional actors return TypedActorRef with full type inference
const counter = await dynSup.startChild(Counter, { args: [0] });
await counter.call("get");       // fully typed!
counter.cast("set", 42);         // fully typed!

// Inspect children
const children = await dynSup.whichChildren();
// [{ ref: ActorRef, className: "WorkerActor", name?: string }]

const counts = await dynSup.countChildren();
// { specs: 2, active: 2 }

// Terminate a specific child
await dynSup.terminateChild(workerRef); // true if found, false otherwise

// Stop supervisor (cascades to all children)
await dynSup.stop();

Methods:

  • DynamicSupervisor.start(system, options?, spawnOptions?): DynamicSupervisor - Create a supervisor
  • startChild(actorClass, options?): Promise<ActorRef | TypedActorRef> - Spawn a supervised child
  • terminateChild(ref): Promise<boolean> - Stop a child by ref
  • whichChildren(): Promise<ChildInfo[]> - List active children with metadata
  • countChildren(): Promise<ChildCounts> - Get child count statistics
  • stop(): Promise<void> - Stop supervisor and all children
  • getRef(): ActorRef - Access underlying actor reference

Supervision behavior:

  • Children that crash are automatically restarted (one-for-one)
  • If a child exceeds maxRestarts within periodMs, it is stopped permanently
  • startChild throws MaxChildrenError when the maxChildren limit is reached
  • Stopping the supervisor cascades to all children (depth-first termination)

See test/dynamic_supervisor.test.ts for more examples.
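
The `maxRestarts` / `periodMs` rule can be read as a sliding window over recent restarts. The sketch below shows one plausible way to count them; `RestartWindow` and `allowRestart` are hypothetical names, and libeam may measure the window differently.

```typescript
class RestartWindow {
  private timestamps: number[] = [];
  private maxRestarts: number;
  private periodMs: number;

  constructor(maxRestarts: number, periodMs: number) {
    this.maxRestarts = maxRestarts;
    this.periodMs = periodMs;
  }

  // Called when a child crashes at nowMs. Returns true if it may be restarted,
  // false if it already used up maxRestarts within the last periodMs.
  allowRestart(nowMs: number): boolean {
    const cutoff = nowMs - this.periodMs;
    this.timestamps = this.timestamps.filter((t) => t > cutoff);
    if (this.timestamps.length >= this.maxRestarts) return false;
    this.timestamps.push(nowMs);
    return true;
  }
}

const restartWindow = new RestartWindow(3, 5000);
const decisions = [
  restartWindow.allowRestart(0),
  restartWindow.allowRestart(1000),
  restartWindow.allowRestart(2000),
  restartWindow.allowRestart(3000),  // fourth crash inside the same 5000 ms window
  restartWindow.allowRestart(7001),  // earlier restarts have aged out of the window
];
```

Here the first three crashes are restarted, the fourth is refused, and a crash after the window has passed is allowed again.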

GenStage

Demand-driven producer-consumer pipelines with back-pressure. Inspired by Elixir's GenStage. Consumers tell producers how many events they can handle; producers never emit more than requested.

import { Producer, Consumer, ProducerConsumer, ConsumerSupervisor } from "libeam";

// Producer: emits sequential numbers on demand
const producer = Producer.start(system, {
  init: () => 0,
  handleDemand: (demand, counter) => {
    const events = Array.from({ length: demand }, (_, i) => counter + i);
    return [events, counter + demand];
  },
});

// Consumer: prints received events
const consumer = Consumer.start(system, {
  handleEvents: (events, _from, state) => {
    console.log("Received:", events);
    return state;
  },
});

// Subscribe with back-pressure options
await consumer.subscribe(producer.getRef(), { maxDemand: 100, minDemand: 50 });

// 3-stage pipeline with transformation
const multiplier = ProducerConsumer.start(system, {
  init: () => 10,
  handleEvents: (events, _from, factor) => {
    return [events.map(e => e * factor), factor];
  },
});

await multiplier.subscribe(producer.getRef(), { maxDemand: 50 });
await consumer.subscribe(multiplier.getRef(), { maxDemand: 50 });

Stage Types:

  • Producer — Emits events in response to downstream demand. Buffers events when demand is zero.
  • Consumer — Subscribes to producers and processes received events.
  • ProducerConsumer — Receives events from upstream, transforms them, and dispatches downstream.
  • ConsumerSupervisor — Subscribes to a producer and spawns a supervised worker per event. Back-pressure is tied to worker lifecycle.

Producer Methods:

  • Producer.start(system, callbacks, options?, spawnOptions?): Producer — Create a producer
  • stop(): Promise<void> — Stop the producer
  • demand(mode): void — Switch demand mode (call with "forward" to resume after demand: "accumulate")
  • getRef(): ActorRef — Get the producer's ref (for consumer subscriptions)

Consumer Methods:

  • Consumer.start(system, callbacks, spawnOptions?): Consumer — Create a consumer
  • subscribe(producerRef, options?): Promise<SubscriptionRef> — Subscribe to a producer
  • cancel(ref): boolean — Cancel a subscription
  • stop(): Promise<void> — Stop and cancel all subscriptions
  • getRef(): ActorRef — Get the consumer's ref

ProducerConsumer Methods:

  • ProducerConsumer.start(system, callbacks, producerOptions?, spawnOptions?): ProducerConsumer — Create a stage
  • subscribe(producerRef, options?): Promise<SubscriptionRef> — Subscribe to upstream
  • cancelUpstream(ref): boolean — Cancel an upstream subscription
  • stop(): Promise<void> — Stop and cancel all subscriptions
  • getRef(): ActorRef — Get the stage's ref

ConsumerSupervisor Methods:

  • ConsumerSupervisor.start(system, childSpec, options?, spawnOptions?): ConsumerSupervisor — Create a consumer supervisor
  • subscribe(producerRef, options?): Promise<SubscriptionRef> — Subscribe to a producer
  • cancel(ref): boolean — Cancel a subscription
  • whichChildren(): Promise<ChildInfo[]> — List active worker children
  • countChildren(): Promise<ChildCounts> — Get worker count statistics
  • stop(): Promise<void> — Stop and cancel all subscriptions, terminate all workers
  • getRef(): ActorRef — Get the supervisor's ref

Subscription Options:

  • maxDemand — Max events in flight per subscription (default: 1000)
  • minDemand — Threshold to request more events (default: 75% of maxDemand)
  • cancel — Cancel behavior: "permanent" | "transient" | "temporary" (default)
  • partition — Partition to subscribe to (required for PartitionDispatcher)

Dispatcher Types:

Producers can be configured with different dispatch strategies via ProducerOptions.dispatcher:

| Dispatcher | Description | Use Case |
|------------|-------------|----------|
| { type: "demand" } | Sends to consumer with highest pending demand (default) | Work pools, load balancing |
| { type: "broadcast" } | Sends all events to all consumers | Event buses, audit logging, fan-out |
| { type: "partition", partitions: N, hash? } | Routes events by hash to fixed partitions | Ordered processing per key, sharding |

// Default: DemandDispatcher (highest-demand-first)
const demandProducer = Producer.start(system, callbacks);

// BroadcastDispatcher: all consumers get all events
const broadcastProducer = Producer.start(system, callbacks, {
  dispatcher: { type: "broadcast" },
});

// PartitionDispatcher: hash-based routing
const partitionProducer = Producer.start(system, callbacks, {
  dispatcher: {
    type: "partition",
    partitions: 4,
    hash: (event) => event.userId % 4,  // route by user ID
  },
});

// Consumers subscribe to specific partitions
await consumer0.subscribe(partitionProducer.getRef(), { maxDemand: 10, partition: 0 });
await consumer1.subscribe(partitionProducer.getRef(), { maxDemand: 10, partition: 1 });

BroadcastDispatcher behavior:

  • All consumers receive the same events (fan-out)
  • Demand = min(all consumer demands) — slowest consumer throttles the pipeline
  • Sequential subscribes are safe — initial demand is deferred to ensure all consumers register before events flow
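
The min-demand rule can be illustrated with a small standalone sketch. It does not use libeam and is not its implementation: `broadcastDemand` and the consumer names are hypothetical, chosen only to show how the slowest consumer caps what the producer may emit.

```typescript
// Hypothetical helper (not part of libeam): the number of events a broadcast
// producer may emit is the smallest outstanding demand among its consumers.
function broadcastDemand(pending: Map<string, number>): number {
  if (pending.size === 0) return 0; // no consumers: nothing may be emitted
  return Math.min(...Array.from(pending.values()));
}

const pendingDemand = new Map<string, number>([
  ["fast", 50],
  ["medium", 20],
  ["slow", 3],
]);

// The slowest consumer sets the pace for everyone.
const emittable = broadcastDemand(pendingDemand);

// Every consumer receives the same events, so each demand shrinks equally.
for (const id of Array.from(pendingDemand.keys())) {
  pendingDemand.set(id, (pendingDemand.get(id) ?? 0) - emittable);
}
```

After this step the slow consumer has no demand left, so the sketch would emit nothing further until that consumer asks again.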

Demand Mode (demand: "accumulate"):

For complex topologies where you need deterministic setup, start the producer in accumulate mode:

const producer = Producer.start(system, callbacks, {
  dispatcher: { type: "broadcast" },
  demand: "accumulate",  // Pauses event production
});

// Subscribe all consumers (order doesn't matter)
await consumer1.subscribe(producer.getRef(), { maxDemand: 10 });
await consumer2.subscribe(producer.getRef(), { maxDemand: 10 });
await consumer3.subscribe(producer.getRef(), { maxDemand: 10 });

// Resume — now all consumers receive the same events
producer.demand("forward");

Inspired by Elixir's {:producer, state, demand: :accumulate}.

PartitionDispatcher behavior:

  • One consumer per partition (subscribing to a taken partition throws)
  • Hash function maps each event to a partition: (event) => partitionIndex | null
  • Returning null from hash discards the event
  • Events for partitions with no consumer are buffered per-partition
  • Default hash: modulo for numbers, djb2 for strings, event.key for objects
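
As a rough illustration of the default hash described above, here is a standalone sketch. The `djb2` function is the well-known Bernstein string hash; `defaultPartition` is a hypothetical stand-in, and details such as how libeam treats negative numbers, non-integers, or the type of `event.key` are assumptions.

```typescript
// djb2 string hash: start at 5381, then hash = hash * 33 + charCode.
function djb2(text: string): number {
  let hash = 5381;
  for (let i = 0; i < text.length; i++) {
    // ">>> 0" keeps the running value in the unsigned 32-bit range.
    hash = (hash * 33 + text.charCodeAt(i)) >>> 0;
  }
  return hash;
}

type Keyed = { key: string | number };

// Hypothetical stand-in for the default hash (assumption, not libeam code):
// numbers use modulo, strings use djb2, objects are routed by their key.
function defaultPartition(event: number | string | Keyed, partitions: number): number {
  if (typeof event === "number") return Math.abs(Math.trunc(event)) % partitions;
  if (typeof event === "string") return djb2(event) % partitions;
  return defaultPartition(event.key, partitions);
}

const byNumber = defaultPartition(10, 4);            // 10 % 4 = 2
const byString = defaultPartition("a", 4);           // djb2("a") = 177670, and 177670 % 4 = 2
const byObject = defaultPartition({ key: "a" }, 4);  // same partition as the string "a"
```

Because the mapping is deterministic, events with the same key always land on the same partition, which is what gives per-key ordering.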

Back-pressure behavior:

  • Consumer sends initial demand of maxDemand on subscribe (deferred to next tick for broadcast safety)
  • When pending demand drops to minDemand, consumer automatically re-asks
  • Producer never emits more events than total demand from all consumers
  • Excess events are buffered in the producer (configurable bufferSize, default: 10000)
  • Multiple consumers receive events via the configured dispatcher
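
The re-ask rule can be sketched as consumer-side bookkeeping. This is a simplified model under stated assumptions, not libeam's code: `DemandTracker` is a hypothetical class, and it assumes the consumer tops its window back up to maxDemand as soon as pending demand is at or below minDemand.

```typescript
// Hypothetical consumer-side bookkeeping; libeam's internals may differ.
class DemandTracker {
  private pending = 0;          // events requested but not yet received
  readonly asks: number[] = []; // every demand message sent upstream
  private readonly maxDemand: number;
  private readonly minDemand: number;

  constructor(maxDemand: number, minDemand: number) {
    this.maxDemand = maxDemand;
    this.minDemand = minDemand;
  }

  // On subscribe the consumer asks for a full window of events.
  subscribe(): void {
    this.ask(this.maxDemand);
  }

  // Called when a batch arrives; tops the window back up once it runs low.
  receive(count: number): void {
    if (count > this.pending) throw new Error("producer exceeded demand");
    this.pending -= count;
    if (this.pending <= this.minDemand) this.ask(this.maxDemand - this.pending);
  }

  get outstanding(): number {
    return this.pending;
  }

  private ask(count: number): void {
    if (count <= 0) return;
    this.pending += count;
    this.asks.push(count);
  }
}

const tracker = new DemandTracker(10, 7);
tracker.subscribe(); // asks for 10
tracker.receive(2);  // 8 outstanding, still above minDemand
tracker.receive(1);  // 7 outstanding, so 3 more are requested
```

The guard in `receive` encodes the invariant from the list above: a producer never delivers more than the consumer has asked for.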

See test/gen_stage.test.ts for more examples.

ConsumerSupervisor behavior:

ConsumerSupervisor spawns one supervised worker per event. Demand is tied to worker lifecycle:

  • On subscribe, sends initial demand of maxDemand to producer
  • Each received event spawns a worker: actorClass.init(...baseArgs, event)
  • maxDemand = max concurrent workers (never more active at once)
  • When a worker exits (normal or crash), its demand slot is released
  • Released slots accumulate; when they reach minDemand, more events are requested
  • Workers are supervised with one-for-one strategy (configurable maxRestarts/periodMs)

import { Producer, ConsumerSupervisor } from "libeam";

// Producer emits jobs
const producer = Producer.start(system, {
  init: () => 0,
  handleDemand: (demand, counter) => {
    const jobs = Array.from({ length: demand }, (_, i) => ({
      id: counter + i,
      payload: `task-${counter + i}`,
    }));
    return [jobs, counter + demand];
  },
});

// ConsumerSupervisor spawns a JobWorker per event
const supervisor = ConsumerSupervisor.start(system, {
  actorClass: JobWorker,
  args: ["base-config"],  // event appended as last arg
});

// max 10 concurrent workers, re-ask when 7 complete
await supervisor.subscribe(producer.getRef(), {
  maxDemand: 10,
  minDemand: 7,
});

// Inspect active workers
const workers = await supervisor.whichChildren();
const counts = await supervisor.countChildren();
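
To make the slot accounting concrete, here is a standalone model of the rules listed under "ConsumerSupervisor behavior". `WorkerSlots` is a hypothetical class, not part of libeam, and it assumes released slots are requested in a single batch once they reach minDemand.

```typescript
// Hypothetical bookkeeping for a ConsumerSupervisor-like stage.
class WorkerSlots {
  active = 0;                   // workers currently running
  readonly asks: number[] = []; // demand messages sent to the producer
  private released = 0;         // slots freed since the last ask
  private readonly maxDemand: number;
  private readonly minDemand: number;

  constructor(maxDemand: number, minDemand: number) {
    this.maxDemand = maxDemand;
    this.minDemand = minDemand;
  }

  // Initial demand equals the maximum number of concurrent workers.
  subscribe(): void {
    this.asks.push(this.maxDemand);
  }

  // Each received event occupies one worker slot.
  onEvent(): void {
    if (this.active >= this.maxDemand) throw new Error("more events than demand");
    this.active++;
  }

  // A worker exit (normal or crash) frees its slot; slots are re-requested
  // in one batch once enough of them have accumulated.
  onWorkerExit(): void {
    this.active--;
    this.released++;
    if (this.released >= this.minDemand) {
      this.asks.push(this.released);
      this.released = 0;
    }
  }
}

const slots = new WorkerSlots(10, 7);
slots.subscribe();
for (let i = 0; i < 10; i++) slots.onEvent();      // 10 workers running
for (let i = 0; i < 7; i++) slots.onWorkerExit();  // the 7th exit triggers a re-ask of 7
```

At the end of the run three workers are still active and seven events have been requested, so active workers plus outstanding demand never exceeds maxDemand.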

ActorSystem

Manages actor lifecycle on a node.

const system = new ActorSystem(cluster, transport, registry, supervisionOptions?);

// Register actor classes for remote spawning
system.registerActorClass(MyActor);
system.registerActorClasses([ActorA, ActorB]);

// Spawn actors
const ref = system.spawn(MyActor, {
  name?: string,           // Optional registered name
  args?: any[],            // Arguments passed to init()
  strategy?: 'local' | 'round-robin'  // Placement strategy
});

// Stop an actor
await system.stop(actorRef);

// Start processing messages
await system.start();

// Check system state
system.isRunning();      // true if running and not shutting down
system.isShuttingDown(); // true if shutdown in progress

// Graceful shutdown
await system.shutdown({
  timeout: 5000,        // Max time to wait for actors (default: 5000ms)
  drainMailboxes: true  // Wait for pending messages (default: true)
});

Supervision

Configure crash handling behavior:

const system = new ActorSystem(cluster, transport, registry, {
  strategy: "Restart", // or 'Stop'
  maxRestarts: 3, // Max restarts within period
  periodMs: 5000, // Time window for restart counting
});

Supervision Trees

Actors can spawn child actors, creating a supervision tree hierarchy. When a parent actor is stopped, all its children are automatically terminated first (cascading termination).

class WorkerActor extends Actor {
  handleCall(message: any) {
    if (message.type === "work") {
      return `Processed: ${message.data}`;
    }
  }
  handleCast(message: any) {}
}

class SupervisorActor extends Actor {
  private workers: ActorRef[] = [];

  init(workerCount: number) {
    // Spawn child workers under this supervisor
    for (let i = 0; i < workerCount; i++) {
      const worker = this.spawn(WorkerActor, { name: `worker-${i}` });
      this.workers.push(worker);
    }
  }

  handleCall(message: any) {
    if (message.type === "get_worker_count") {
      return this.getChildren().length;
    }
    if (message.type === "dispatch") {
      // Round-robin to workers
      const worker = this.workers[message.index % this.workers.length];
      return worker.call({ type: "work", data: message.data });
    }
  }

  handleCast(message: any) {}
}

// Usage
const supervisor = system.spawn(SupervisorActor, { args: [3] });

// Supervisor has 3 child workers
const count = await supervisor.call({ type: "get_worker_count" }); // 3

// When supervisor is stopped, all workers are terminated first
await system.stop(supervisor); // Stops workers, then supervisor

Actor Context

Each actor has access to its context:

class MyActor extends Actor {
  someMethod() {
    // Reference to parent actor (undefined for root actors)
    const parent = this.context.parent;

    // Set of child actor references
    const children = this.context.children;

    // Reference to the actor system
    const system = this.context.system;
  }
}

Child Management Methods

Actors have protected methods for managing children:

class ParentActor extends Actor {
  async handleCall(message: any) {
    if (message.type === "spawn_worker") {
      // Spawn a child actor
      const child = this.spawn(WorkerActor, {
        name: message.name,
        args: [message.config]
      });
      return child;
    }

    if (message.type === "stop_worker") {
      // Stop a specific child
      await this.stopChild(message.workerRef);
    }

    if (message.type === "list_workers") {
      // Get all children
      return this.getChildren();
    }
  }
}

Cascading Termination

When a parent is stopped:

  1. All children are stopped recursively (depth-first)
  2. Each child's terminate() is called
  3. Children are removed from the system
  4. Parent's terminate() is called last

// Tree: root -> child1 -> grandchild
//            -> child2

await system.stop(rootRef);
// Termination order: grandchild, child1, child2, root
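
The ordering above is a post-order, depth-first traversal. A minimal standalone sketch (the `TreeNode` type and `terminationOrder` function are hypothetical, not libeam APIs):

```typescript
interface TreeNode {
  name: string;
  children: TreeNode[];
}

// Post-order traversal: every child subtree is handled before its parent.
function terminationOrder(node: TreeNode, order: string[] = []): string[] {
  for (const child of node.children) terminationOrder(child, order);
  order.push(node.name);
  return order;
}

const tree: TreeNode = {
  name: "root",
  children: [
    { name: "child1", children: [{ name: "grandchild", children: [] }] },
    { name: "child2", children: [] },
  ],
};

const stopOrder = terminationOrder(tree);
// ["grandchild", "child1", "child2", "root"]
```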

Child Supervision Strategies

Parent actors can define how their children should be supervised when they crash. Override the childSupervision() method to customize the behavior:

import { Actor, ChildSupervisionOptions } from "libeam";

class MySupervisor extends Actor {
  // Override to customize child supervision
  childSupervision(): ChildSupervisionOptions {
    return {
      strategy: "one-for-all",  // or "one-for-one", "rest-for-one"
      maxRestarts: 3,           // Max restarts within period
      periodMs: 5000,           // Time window for restart counting
    };
  }

  init() {
    this.spawn(WorkerActor, { args: ["worker1"] });
    this.spawn(WorkerActor, { args: ["worker2"] });
    this.spawn(WorkerActor, { args: ["worker3"] });
  }

  handleCall(message: any) { return "ok"; }
  handleCast(message: any) {}
}

Available Strategies:

| Strategy | Behavior |
|----------|----------|
| one-for-one | Only restart the crashed child (default) |
| one-for-all | Restart all children if one crashes |
| rest-for-one | Restart the crashed child and all children spawned after it |

one-for-one (default): Isolates failures - only the crashed actor is restarted.

// If worker2 crashes, only worker2 is restarted
// worker1 and worker3 are unaffected

one-for-all: Use when children have interdependencies and must be restarted together.

// If any worker crashes, all workers are stopped and restarted
// Useful for tightly coupled processes (e.g., producer-consumer pairs)

rest-for-one: Use when children have ordered dependencies.

// Children spawned in order: db -> cache -> api
// If cache crashes, cache and api are restarted (db is unaffected)
// If db crashes, all three are restarted
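
The three strategies differ only in which children are selected for restart. The following standalone sketch expresses that selection; `childrenToRestart` is a hypothetical helper written for illustration and assumes children are tracked in spawn order.

```typescript
type Strategy = "one-for-one" | "one-for-all" | "rest-for-one";

// Given children in spawn order, return the ones to restart after a crash.
function childrenToRestart(strategy: Strategy, children: string[], crashed: string): string[] {
  const index = children.indexOf(crashed);
  if (index === -1) return []; // unknown child: nothing to restart
  switch (strategy) {
    case "one-for-one":
      return [crashed];
    case "one-for-all":
      return [...children];
    case "rest-for-one":
      return children.slice(index); // the crashed child and everything spawned after it
  }
}

const spawnOrder = ["db", "cache", "api"];
const afterCacheCrash = childrenToRestart("rest-for-one", spawnOrder, "cache"); // ["cache", "api"]
const afterDbCrash = childrenToRestart("rest-for-one", spawnOrder, "db");       // ["db", "cache", "api"]
```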

Max Restarts:

If a child exceeds maxRestarts within periodMs, it will be stopped permanently instead of restarted.
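
One way to picture the limit is a sliding window of restart timestamps. This sketch is an assumption about the accounting, not libeam's implementation: `RestartWindow` is hypothetical, and it assumes that up to maxRestarts restarts are allowed within any periodMs window and the next one is refused.

```typescript
// Hypothetical sliding-window restart counter.
class RestartWindow {
  private timestamps: number[] = [];
  private readonly maxRestarts: number;
  private readonly periodMs: number;

  constructor(maxRestarts: number, periodMs: number) {
    this.maxRestarts = maxRestarts;
    this.periodMs = periodMs;
  }

  // Returns true if a restart at time `now` (in ms) stays within the limit.
  allowRestart(now: number): boolean {
    // Forget restarts that have fallen out of the window.
    this.timestamps = this.timestamps.filter((t) => now - t < this.periodMs);
    if (this.timestamps.length >= this.maxRestarts) return false;
    this.timestamps.push(now);
    return true;
  }
}

const restarts = new RestartWindow(3, 5000);
const decisions = [0, 1000, 2000, 3000, 6500].map((t) => restarts.allowRestart(t));
// [true, true, true, false, true]: the 4th crash within 5s is refused,
// while the crash at 6500ms is allowed because older restarts have expired.
```

In a real supervisor a refused restart would be terminal (the child is stopped permanently); the sketch keeps counting only to show how the window slides.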

Placement Strategies

Control where actors are spawned:

  • local: Always spawn on the current node
  • round-robin: Distribute across cluster members

Both strategies support an optional role filter. See Node Roles for details.

// Spawn locally
system.spawn(MyActor, { strategy: "local" });

// Distribute across nodes
system.spawn(MyActor, { strategy: "round-robin" });

// Distribute only across nodes with the "worker" role
system.spawn(MyActor, { strategy: "round-robin", role: "worker" });

Cluster Interface

Implement for custom cluster membership:

interface Cluster {
  readonly nodeId: string;
  getMembers(): string[];
}

Transport Interface

Implement for custom network transport:

interface Transport {
  getNodeId(): string;
  connect(): Promise<void>;
  disconnect(): Promise<void>;

  // Point-to-point messaging
  request(nodeId: string, message: any, timeout: number): Promise<any>;
  send(nodeId: string, message: any): Promise<void>;

  // Pub/sub for registry propagation
  publish(topic: string, message: any): Promise<void>;
  subscribe(topic: string, handler: MessageHandler): Promise<Subscription>;

  // Message handlers
  onRequest(handler: RequestHandler): void;
  onMessage(handler: MessageHandler): void;

  // Peer management
  updatePeers(peers: Array<[nodeId: string, address: string]>): void;
}

Example: Chat Application

A complete example showing actors communicating across nodes.

Functional API (Recommended)

import { createSystem, createActor, ActorRef } from "libeam";

const ChatRoom = createActor((ctx, self) => {
  const participants = new Map<string, ActorRef>();

  return self
    .onCall("getParticipants", () => Array.from(participants.keys()))
    .onCast("join", (name: string, ref: ActorRef) => {
      participants.set(name, ref);
      broadcast(`${name} joined the chat`);
    })
    .onCast("message", (from: string, text: string) => {
      broadcast(`[${from}] ${text}`);
    });

  function broadcast(text: string) {
    for (const ref of participants.values()) {
      ref.cast({ method: "notify", args: [text] });
    }
  }
});

const User = createActor((ctx, self, name: string, roomRef: ActorRef) => {
  // Join room on init (roomRef is untyped, so use raw message format)
  roomRef.cast({ method: "join", args: [name, ctx.self] });

  return self.onCast("notify", (text: string) => {
    console.log(`[${name}] ${text}`);
  });
});

// Usage:
const system = createSystem();
const room = system.spawn(ChatRoom);
system.spawn(User, { args: ["Alice", room] });
system.spawn(User, { args: ["Bob", room] });

// TypedActorRef — typed call/cast works directly on room
const members = await room.call("getParticipants"); // ["Alice", "Bob"]

Class-Based API

import {
  Actor,
  ActorRef,
  ActorSystem,
  InMemoryTransport,
  LocalRegistry,
  Cluster,
} from "libeam";

class ChatRoomActor extends Actor {
  private participants = new Map<string, ActorRef>();

  handleCast(
    message:
      | { type: "join"; name: string; ref: ActorRef }
      | { type: "message"; from: string; text: string },
  ) {
    if (message.type === "join") {
      this.participants.set(message.name, message.ref);
      this.broadcast(`${message.name} joined the chat`);
    } else if (message.type === "message") {
      this.broadcast(`[${message.from}] ${message.text}`);
    }
  }

  private broadcast(text: string) {
    for (const ref of this.participants.values()) {
      ref.cast({ type: "notification", text });
    }
  }
}

class UserActor extends Actor {
  private name = "";

  init(name: string, roomRef: ActorRef) {
    this.name = name;
    roomRef.cast({ type: "join", name, ref: this.self });
  }

  handleCast(message: { type: "notification"; text: string }) {
    console.log(`[${this.name}] ${message.text}`);
  }
}

Run the full example:

pnpm example:chat

Graceful Shutdown

Proper shutdown ensures actors terminate cleanly and cluster peers are notified.

Functional API (Recommended)

When using createSystem, a single call handles the entire shutdown sequence:

const system = createSystem();
// ... spawn actors ...
await system.shutdown(); // Terminates actors, leaves cluster, disconnects transport

Class-Based API

When manually wiring components, you must shut them down in order:

async function shutdownNode(system, cluster, transport) {
  // 1. Shutdown actor system (terminates actors, unregisters names)
  await system.shutdown({
    timeout: 5000,        // Wait up to 5s for actors to terminate
    drainMailboxes: true  // Process pending messages first
  });

  // 2. Leave cluster gracefully (notifies peers)
  await cluster.leave();  // Broadcasts "leaving" status to peers

  // 3. Disconnect transport
  await transport.disconnect();
}

// Handle process signals
process.on("SIGTERM", () => shutdownNode(system, cluster, transport));
process.on("SIGINT", () => shutdownNode(system, cluster, transport));

Shutdown Sequence

  1. ActorSystem.shutdown(): Stops accepting new spawns, drains mailboxes, calls terminate() on all actors, unregisters named actors
  2. DistributedCluster.leave(): Broadcasts "leaving" status to peers so they immediately remove this node from membership (instead of waiting for failure timeout)
  3. Transport.disconnect(): Closes network connections

Shutdown Options

interface ShutdownOptions {
  timeout?: number;       // Max ms to wait for actors (default: 5000)
  drainMailboxes?: boolean; // Wait for pending messages (default: true)
}

Cluster Readiness

Wait for the cluster to form before spawning actors that depend on peer connectivity:

const system = await createSystem({
  type: "distributed",
  port: 5000,
  seedNodes: ["127.0.0.1:6002"],
  cookie: "my-cluster-secret",
});

// Wait for at least one peer to join (default: minMembers=2, timeout=30s)
await system.waitForCluster();

// Wait for a specific number of members
await system.waitForCluster({ minMembers: 3, timeout: 15000 });

// Wait for specific nodes
await system.waitForCluster({ nodes: ["gateway", "worker-1"] });

// Both conditions must be met (AND)
await system.waitForCluster({ minMembers: 3, nodes: ["gateway"] });

Or inline with system creation using the ready option:

const system = await createSystem({
  type: "distributed",
  port: 5000,
  seedNodes: ["127.0.0.1:6002"],
  cookie: "my-cluster-secret",
  ready: { minMembers: 3, timeout: 15000 },
});
// System is guaranteed to have 3 members when createSystem resolves

waitForCluster uses gossip-based membership detection. If the timeout is reached before conditions are met, a TimeoutError is thrown. For local (non-distributed) systems, waitForCluster() resolves immediately.
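
The readiness condition can be read as a simple predicate over the current member list. The sketch below is standalone and hypothetical (`isClusterReady` is not a libeam function); it assumes the default of minMembers = 2 still applies when only `nodes` is given.

```typescript
interface ReadyOptions {
  minMembers?: number;
  nodes?: string[];
}

// True once enough members are present AND every required node has joined.
function isClusterReady(members: string[], options: ReadyOptions = {}): boolean {
  const minMembers = options.minMembers ?? 2;
  const required = options.nodes ?? [];
  const hasEnough = members.length >= minMembers;
  const hasRequired = required.every((node) => members.indexOf(node) !== -1);
  return hasEnough && hasRequired;
}

const aloneReady = isClusterReady(["gateway"]);                       // false: only one member
const pairReady = isClusterReady(["gateway", "worker-1"]);            // true: default minMembers is 2
const strictReady = isClusterReady(["gateway", "worker-1"], {
  minMembers: 3,
  nodes: ["gateway"],
});                                                                   // false: the node is present but only 2 members
```

A waiting loop would re-evaluate such a predicate whenever membership changes and give up once the timeout elapses.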

Node Roles

Nodes can declare roles to support heterogeneous clusters. Roles are propagated via gossip and used during actor placement to ensure actors land on appropriate nodes.

Declaring Roles

// This node is a gateway
const gateway = await createSystem({
  type: "distributed",
  port: 5000,
  seedNodes: [],
  cookie: "my-cluster-secret",
  roles: ["gateway"],
});

// This node is a worker
const worker = await createSystem({
  type: "distributed",
  port: 5010,
  seedNodes: ["127.0.0.1:5002"],
  cookie: "my-cluster-secret",
  roles: ["worker", "compute"],
});

A node can have multiple roles. Roles are immutable — set once at startup and propagated to all peers via gossip.

Role-Based Placement

The role option on spawn() filters candidate nodes before the placement strategy selects one:

// Spawn on any node with the "worker" role (round-robin across workers)
const ref = system.spawn(MyActor, { strategy: "round-robin", role: "worker" });

// Assert the local node has the "gateway" role before spawning locally
const routerRef = system.spawn(Router, { role: "gateway" });

If no nodes in the cluster have the required role, a NoRoleMatchError is thrown immediately.

Querying Roles

The cluster interface exposes role-based membership queries:

// Get all nodes with a specific role
const workers = system.cluster.getMembersByRole("worker");
// ["node-abc", "node-def"]

How It Works

Roles are stored in each node's gossip state (PeerState.roles) and propagated automatically via the existing gossip protocol. The placement engine filters cluster.getMembersByRole(role) before applying the selected strategy (local or round-robin). No additional network overhead — roles piggyback on the existing heartbeat cycle.
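
The filter-then-select flow can be sketched without any networking. Everything here is hypothetical (`PlacementSketch`, `Member`, and the plain `Error` standing in for NoRoleMatchError); it only illustrates that the role narrows the candidate list before the strategy picks a node.

```typescript
interface Member {
  id: string;
  roles: string[];
}

class PlacementSketch {
  private cursor = 0; // round-robin position
  private readonly localId: string;
  private readonly members: Member[];

  constructor(localId: string, members: Member[]) {
    this.localId = localId;
    this.members = members;
  }

  pick(strategy: "local" | "round-robin", role?: string): string {
    // Step 1: narrow the candidates to nodes that carry the role.
    let candidates = this.members;
    if (role !== undefined) {
      const wanted: string = role;
      candidates = this.members.filter((m) => m.roles.indexOf(wanted) !== -1);
    }
    if (candidates.length === 0) throw new Error(`no node has role "${role}"`);

    // Step 2: apply the placement strategy to the remaining candidates.
    if (strategy === "local") {
      const localMatches = candidates.some((m) => m.id === this.localId);
      if (!localMatches) throw new Error(`local node lacks role "${role}"`);
      return this.localId;
    }
    const chosen = candidates[this.cursor % candidates.length];
    this.cursor++;
    return chosen.id;
  }
}

const placement = new PlacementSketch("g1", [
  { id: "g1", roles: ["gateway"] },
  { id: "w1", roles: ["worker", "compute"] },
  { id: "w2", roles: ["worker"] },
]);

const picks = [
  placement.pick("round-robin", "worker"), // "w1"
  placement.pick("round-robin", "worker"), // "w2"
  placement.pick("round-robin", "worker"), // "w1"
];
const localPick = placement.pick("local", "gateway"); // "g1"
```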

Process Groups

Process groups provide named, dynamic groupings of actors. Any actor can join or leave groups at runtime, and you can broadcast messages to all members of a group. Inspired by Erlang's pg module.

Basic Usage

import { createSystem, createActor } from "libeam";

const Worker = createActor((ctx, self, id: string) => {
  return self
    .onCall("id", () => id)
    .onCast("work", (task: string) => {
      console.log(`Worker ${id} processing: ${task}`);
    });
});

const system = createSystem();

const w1 = system.spawn(Worker, { args: ["a"] });
const w2 = system.spawn(Worker, { args: ["b"] });
const w3 = system.spawn(Worker, { args: ["c"] });

// Add actors to a group
system.joinGroup("workers", w1);
system.joinGroup("workers", w2);
system.joinGroup("workers", w3);

// An actor can be in multiple groups
system.joinGroup("priority", w1);

// Get all members of a group
const members = system.getGroup("workers"); // [ActorRef, ActorRef, ActorRef]

// Broadcast a cast message to all members
system.broadcast("workers", { method: "work", args: ["job-42"] });

// Remove an actor from a group
system.leaveGroup("workers", w2);

// Groups are cleaned up automatically when actors stop
await system.stop(w1); // removed from "workers" and "priority"

API

  • joinGroup(group, ref): void — Add an actor to a named group. Idempotent — joining the same group twice has no effect.
  • leaveGroup(group, ref): void — Remove an actor from a group. Safe to call even if the actor is not in the group.
  • getGroup(group): ActorRef[] — Get all actor refs in a group. Returns [] for unknown groups.
  • broadcast(group, message): void — Send a cast message to every actor in the group.

Behavior

  • Auto-cleanup: When an actor is stopped, it is automatically removed from all groups.
  • Distributed: Group membership changes are propagated across nodes via the transport pub/sub layer. When a node leaves the cluster, all its members are removed from all groups.
  • Idempotent joins: Calling joinGroup with the same actor and group multiple times is safe — only one membership entry is stored.
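
The idempotent-join and auto-cleanup behavior falls out naturally from storing each group as a set. The following single-node sketch is hypothetical (`GroupRegistry` is not libeam's class) and ignores cross-node propagation entirely.

```typescript
// Hypothetical single-node group registry keyed by group name.
class GroupRegistry<Ref> {
  private groups = new Map<string, Set<Ref>>();

  // A Set stores each ref once, so joining twice has no effect.
  join(group: string, ref: Ref): void {
    let members = this.groups.get(group);
    if (!members) {
      members = new Set<Ref>();
      this.groups.set(group, members);
    }
    members.add(ref);
  }

  // Safe to call even if the ref was never a member.
  leave(group: string, ref: Ref): void {
    const members = this.groups.get(group);
    if (!members) return;
    members.delete(ref);
    if (members.size === 0) this.groups.delete(group);
  }

  // Unknown groups yield an empty list.
  members(group: string): Ref[] {
    const found = this.groups.get(group);
    return found ? Array.from(found) : [];
  }

  // What auto-cleanup would do when an actor stops.
  removeEverywhere(ref: Ref): void {
    for (const group of Array.from(this.groups.keys())) this.leave(group, ref);
  }
}

const registry = new GroupRegistry<string>();
registry.join("workers", "w1");
registry.join("workers", "w1"); // idempotent: still one entry
registry.join("workers", "w2");
registry.join("priority", "w1");

const beforeStop = registry.members("workers"); // ["w1", "w2"]
registry.removeEverywhere("w1");                // simulate w1 stopping
const afterStop = registry.members("workers");  // ["w2"]
```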

See test/process_group.test.ts for more examples.

Authentication

Distributed systems can be secured with cookie-based authentication, inspired by Erlang's distribution cookie. When configured, nodes verify each other using HMAC-SHA256 signatures on gossip messages and CurveZMQ encryption on transport connections.

Cookie Configuration

The cookie must be at least 16 characters for secure key derivation:

const system = await createSystem({
  type: "distributed",
  port: 5000,
  seedNodes: ["127.0.0.1:6002"],
  cookie: "my-cluster-secret", // ≥16 characters
});

All nodes in a cluster must share the same cookie. Nodes with different cookies cannot communicate — gossip messages are silently dropped and transport connections are rejected.

Distributed Configuration with Salt

For distributed setups, you can optionally customize the key derivation salt:

const system = await createSystem({
  type: "distributed",
  port: 5000,
  seedNodes: ["127.0.0.1:6002"],
  cookie: "my-cluster-secret",
  salt: "custom-salt", // Optional, default: "libeam-v2"
});

The salt is used in HKDF key derivation to separate gossip HMAC keys from CurveZMQ keypairs.

Environment Variable

Instead of passing the cookie in code, set the LIBEAM_COOKIE environment variable:

LIBEAM_COOKIE=my-cluster-secret npx tsx app.ts

The precedence order is: auth option > cookie option > LIBEAM_COOKIE env var. If none are set, the system starts in open mode with a warning logged.
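
The precedence rule can be written as a small resolver. This is a hypothetical sketch (`resolveAuthSource` and `AuthInputs` are illustrative names); the environment is passed in as a plain object so the example stays self-contained, and treating an empty LIBEAM_COOKIE as unset is an assumption.

```typescript
interface AuthInputs {
  auth?: object;                                // custom authenticator, if any
  cookie?: string;                              // cookie option passed in code
  env?: { [name: string]: string | undefined }; // stand-in for process.env
}

type AuthSource = "auth option" | "cookie option" | "LIBEAM_COOKIE" | "open mode";

// Highest-priority source wins: auth > cookie > LIBEAM_COOKIE > open mode.
function resolveAuthSource(inputs: AuthInputs): AuthSource {
  if (inputs.auth !== undefined) return "auth option";
  if (inputs.cookie !== undefined) return "cookie option";
  if (inputs.env && inputs.env["LIBEAM_COOKIE"]) return "LIBEAM_COOKIE";
  return "open mode"; // the real system logs a warning in this case
}

const fromOption = resolveAuthSource({
  cookie: "my-cluster-secret",
  env: { LIBEAM_COOKIE: "other-cluster-secret" },
}); // "cookie option": the option beats the environment variable
const fromEnv = resolveAuthSource({ env: { LIBEAM_COOKIE: "my-cluster-secret" } }); // "LIBEAM_COOKIE"
const openMode = resolveAuthSource({}); // "open mode"
```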

Custom Authenticator

For advanced use cases, implement the Authenticator interface:

import { Authenticator, CookieAuthenticator } from "libeam";

const system = await createSystem({
  type: "distributed",
  port: 5000,
  seedNodes: [],
  auth: new CookieAuthenticator("my-cluster-secret"),
});

The Authenticator interface covers gossip message signing and verification (HMAC-SHA256). Transport-level security (encryption + authentication) is handled by CurveZMQ at the socket layer, driven by the cookie. See src/auth.ts for the full interface and deriveKeys, z85Encode, z85Decode utilities for advanced use cases.

Cookie Rotation

Cookies can be rotated without restarting the cluster using the Consul-style keyring API. The rotation is operator-driven in three steps:

const system = await createSystem({
  type: "distributed",
  port: 5000,
  seedNodes: [],
  cookie: "old-cluster-secret",
});

// Step 1: Install the new cookie on ALL nodes (gossip accepts both keys)
system.keyring!.install("new-cluster-secret");

// Step 2: Switch each node to the new cookie (recreates transport sockets)
await system.keyring!.use();

// Step 3: Remove the old cookie from the keyring on ALL nodes
system.keyring!.remove();

Recommended procedure:

  1. Install the new cookie on every node. Order doesn't matter. After this step, gossip messages signed with either the old or new key are accepted.
  2. Use the new cookie on each node (sequentially or in parallel). This triggers a brief transport reconnection window (~milliseconds) as ZeroMQ sockets are recreated with the new CurveZMQ keys. In-flight RPC requests during use() will fail with a TransportError — callers should retry.
  3. Remove the old cookie from the keyring on every node. After this step, only the new key is accepted.

Behavior during rotation:

  • Gossip continues uninterrupted during install() and remove(). Both keys are accepted.
  • Transport has a brief reconnection window during use(). Pending requests are rejected with TransportError.
  • Messages are delivered at-most-once. No duplicates are introduced by the rotation.
  • keyring is only available on distributed systems. Local systems have system.keyring === undefined.
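
The three-step rotation is easier to follow as a state machine: which keys are accepted for verification and which key signs. This standalone model is hypothetical (`KeyringSketch` is not libeam's keyring); it stores raw strings for readability, whereas a real keyring would hold derived keys, and its `use()` is synchronous because no sockets are involved.

```typescript
// Hypothetical in-memory model of install / use / remove.
class KeyringSketch {
  private accepted: string[];         // keys whose signatures are accepted
  private active: string;             // key used to sign outgoing messages
  private staged: string | undefined; // installed but not yet in use

  constructor(cookie: string) {
    this.accepted = [cookie];
    this.active = cookie;
    this.staged = undefined;
  }

  // Step 1: accept the new key in addition to the old one.
  install(cookie: string): void {
    if (this.accepted.indexOf(cookie) === -1) this.accepted.push(cookie);
    this.staged = cookie;
  }

  // Step 2: start signing with the new key; the old key is still accepted.
  use(): void {
    if (this.staged === undefined) throw new Error("install a key first");
    this.active = this.staged;
  }

  // Step 3: drop every key except the one currently in use.
  remove(): void {
    this.accepted = [this.active];
    this.staged = undefined;
  }

  accepts(cookie: string): boolean {
    return this.accepted.indexOf(cookie) !== -1;
  }

  signingKey(): string {
    return this.active;
  }
}

const keyring = new KeyringSketch("old-cluster-secret");
keyring.install("new-cluster-secret"); // both keys verify, old key still signs
keyring.use();                         // new key signs, both keys still verify
keyring.remove();                      // only the new key verifies
```

The overlap between steps 1 and 3 is what lets nodes be switched one at a time without any of them rejecting a peer's messages.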

Inspecting the keyring:

// List key fingerprints (SHA-256 hex of public key, never raw cookies)
console.log(system.keyring!.list());
// Before install: ["a1b2c3..."]
// After install:  ["a1b2c3...", "d4e5f6..."]
// After remove:   ["d4e5f6..."]

Known Limitations

| Limitation | Details |
|------------|---------|
| Gossip UDP is authenticated but not encrypted | HMAC-SHA256 proves identity but does not encrypt message payloads. Use network-level isolation (VPN/firewall) for defense in depth. |
| CurveZMQ failures are silent | Wrong cookie = messages dropped with no error. Ensure all nodes share the same cookie. |
| v2 auth is not wire-compatible with v1 | Nodes running v1 and v2 cannot communicate. Upgrade all nodes together. |

Logging

Libeam includes a structured logging system with configurable log levels and handlers.

Configuration

import { loggerConfig } from "libeam";

// Set log level (debug, info, warn, error, none)
loggerConfig.level = "debug";

// Custom log handler
loggerConfig.handler = (entry) => {
  // entry: { level, message, context, timestamp, error? }
  console.log(JSON.stringify(entry));
};

Log Levels

  • debug: Detailed debugging information
  • info: General operational messages
  • warn: Warning conditions
  • error: Error conditions
  • none: Disable all logging

Component Loggers

Each component creates its own logger with context:

import { createLogger } from "libeam";

const log = createLogger("MyComponent", nodeId);
log.info("Operation completed", { duration: 100 });
log.error("Operation failed", error, { operationId: "123" });

Telemetry

Libeam includes a lightweight telemetry/instrumentation system inspired by Elixir's :telemetry library. It provides synchronous event emission with zero overhead when no handlers are attached — safe for hot paths like message processing.

Basic Usage

import { telemetry, TelemetryEvents } from "libeam";

// Attach a handler to actor lifecycle events
const handlerId = telemetry.attach("my-metrics", [
  TelemetryEvents.actor.spawn,
  [...TelemetryEvents.actor.stop, "stop"],
], (eventName, measurements, metadata) => {
  console.log(`Event: ${eventName.join(".")}`, measurements, metadata);
});

// Detach when done
telemetry.detach(handlerId);

Span API

Wrap a function in start/stop/exception telemetry events with automatic duration measurement:

// Emits ["myapp", "db", "query", "start"] before, ["...", "stop"] after
const result = telemetry.span(
  ["myapp", "db", "query"],
  { table: "users" },
  () => db.query("SELECT * FROM users")
);
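
For intuition, a span wrapper of this kind can be built in a few lines. The sketch below is a standalone approximation, not libeam's source: `attach`, `execute`, and `span` are simplified local functions, the measurement names mirror the event catalog, and it handles synchronous functions only (how the library treats promises is not covered here).

```typescript
type EventName = string[];
type Measurements = { [key: string]: number };
type SpanMetadata = { [key: string]: unknown };
type TelemetryHandler = (event: EventName, measurements: Measurements, metadata: SpanMetadata) => void;

const handlersByEvent = new Map<string, TelemetryHandler[]>();

function attach(event: EventName, handler: TelemetryHandler): void {
  const key = event.join(".");
  const list = handlersByEvent.get(key) ?? [];
  list.push(handler);
  handlersByEvent.set(key, list);
}

function execute(event: EventName, measurements: Measurements, metadata: SpanMetadata): void {
  const list = handlersByEvent.get(event.join("."));
  if (!list) return; // no handlers attached: one Map lookup and nothing else
  for (const handler of list) handler(event, measurements, metadata);
}

// Emits <event>.start, then <event>.stop or <event>.exception with a duration.
function span<T>(event: EventName, metadata: SpanMetadata, fn: () => T): T {
  const startedAt = Date.now();
  execute([...event, "start"], { system_time: startedAt }, metadata);
  try {
    const result = fn();
    execute([...event, "stop"], { duration_ms: Date.now() - startedAt }, metadata);
    return result;
  } catch (error) {
    execute([...event, "exception"], { duration_ms: Date.now() - startedAt }, { ...metadata, error });
    throw error; // the caller still sees the original failure
  }
}

const seen: string[] = [];
const record: TelemetryHandler = (event) => {
  seen.push(event.join("."));
};
for (const suffix of ["start", "stop", "exception"]) {
  attach(["myapp", "db", "query", suffix], record);
}

const rows = span(["myapp", "db", "query"], { table: "users" }, () => 42);
```

The early return in `execute` shows why emitting an event with no handlers attached costs little more than a Map lookup.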

Event Catalog

All events emitted by libeam:

| Event | Measurements | Metadata |
|-------|-------------|----------|
| libeam.actor.spawn | — | actor_id, actor_class, name, node_id, parent_id? |
| libeam.actor.init.stop | duration_ms | actor_id, actor_class |
| libeam.actor.init.exception | duration_ms | actor_id, actor_class, error |
| libeam.actor.stop.stop | duration_ms | actor_id, name, children_stopped |
| libeam.actor.handle_call.start | system_time | actor_id |
| libeam.actor.handle_call.stop | duration_ms | actor_id |
| libeam.actor.handle_call.exception | duration_ms | actor_id, error |
| libeam.actor.handle_cast.start | system_time | actor_id |
| libeam.actor.handle_cast.stop | duration_ms | actor_id |
| libeam.actor.handle_cast.exception | duration_ms | actor_id, error |
| libeam.supervisor.crash | — | actor_id, error, strategy |
| libeam.supervisor.restart | — | actor_id, new_actor_id, attempt, strategy |
| libeam.supervisor.max_restarts | count | actor_id, max_restarts, period_ms |
| libeam.gen_stage.subscribe | — | producer_id, consumer_tag |
| libeam.gen_stage.cancel | — | producer_id, consumer_tag, reason |
| libeam.gen_stage.dispatch | event_count | producer_id |
| libeam.gen_stage.buffer_overflow | dropped_count, buffer_size | producer_id |
| libeam.mailbox.overflow | — | actor_id, message_type |
| libeam.cluster.join | — | peer_id, node_id |
| libeam.cluster.leave | — | peer_id, node_id |
| libeam.system.shutdown.stop | duration_ms, actor_count | node_id |

Metrics Integration

import { telemetry, TelemetryEvents } from "libeam";

// Simple metrics collector
const metrics = {
  actorsSpawned: 0,
  actorsStopped: 0,
  callDurations: [] as number[],
  crashes: 0,
};

telemetry.attach("prometheus", [
  TelemetryEvents.actor.spawn,
  [...TelemetryEvents.actor.stop, "stop"],
  [...TelemetryEvents.actor.handleCall, "stop"],
  TelemetryEvents.supervisor.crash,
], (event, measurements) => {
  const name = event.join(".");
  if (name === "libeam.actor.spawn") metrics.actorsSpawned++;
  if (name === "libeam.actor.stop.stop") metrics.actorsStopped++;
  if (name === "libeam.actor.handle_call.stop") metrics.callDurations.push(measurements.duration_ms);
  if (name === "libeam.supervisor.crash") metrics.crashes++;
});

Zero Overhead

When no handlers are attached, telemetry calls are effectively free — a single Map lookup that returns immediately. Hot paths like message processing (handleCall/handleCast) additionally gate on hasHandlers(), ensuring zero object allocation overhead.

API

| Method | Description |
|--------|-------------|
| telemetry.attach(id, eventNames, handler) | Attach handler to events. Returns handler id. |
| telemetry.detach(id) | Remove handler. Returns true if found. |
| telemetry.execute(eventName, measurements, metadata) | Emit event synchronously. No-op if no handlers. |
| telemetry.span(eventName, metadata, fn) | Wrap function in start/stop/exception events. |
| telemetry.hasHandlers(eventName) | Check if any handlers are attached. |
| telemetry.reset() | Remove all handlers (test cleanup). |

Error Handling

Libeam provides typed error classes for better error handling:

import {
  LibeamError,
  ActorNotFoundError,
  RegistryLookupError,
  TimeoutError,
  SystemShuttingDownError,
  TransportError,
  PeerNotFoundError,
} from "libeam";

try {
  await actorRef.call({ type: "get" });
} catch (err) {
  if (err instanceof TimeoutError) {
    console.log(`Timed out after ${err.context?.timeoutMs}ms`);
  } else if (err instanceof ActorNotFoundError) {
    console.log(`Actor ${err.context?.actorId} not found`);
  }
}

Error Types

| Error | Code | Description |
|-------|------|-------------|
| ActorNotFoundError | ACTOR_NOT_FOUND | Actor does not exist |
| RegistryLookupError | REGISTRY_LOOKUP_FAILED | Named actor not in registry |
| TimeoutError | TIMEOUT | Operation timed out |
| SystemShuttingDownError | SYSTEM_SHUTTING_DOWN | System is shutting down |
| TransportError | TRANSPORT_ERROR | Network transport failure |
| PeerNotFoundError | PEER_NOT_FOUND | Peer node not known |
| ActorClassNotRegisteredError | ACTOR_CLASS_NOT_REGISTERED | Actor class not registered for remote spawn |
| AuthenticationError | AUTHENTICATION_FAILED | Node authentication failed |
| NoRoleMatchError | NO_ROLE_MATCH | No nodes have the required role |
| ActorNotMigratableError | ACTOR_NOT_MIGRATABLE | Actor doesn't implement Migratable |
| ActorHasChildrenError | ACTOR_HAS_CHILDREN | Actor has child actors |
| MaxChildrenError | MAX_CHILDREN | DynamicSupervisor child limit reached |

Health Checks

Libeam provides health check support for monitoring system status, useful for Kubernetes probes and monitoring systems.

Component Health

Both ActorSystem and DistributedCluster implement HealthCheckable:

import { ActorSystem, DistributedCluster } from "libeam";

// Get health from individual components
const systemHealth = system.getHealth();
// {
//   name: "ActorSystem",
//   status: "healthy",
//   message: "System is healthy",
//   details: { actorCount: 5, totalMailboxSize: 0, registeredClasses: 3 }
// }

const clusterHealth = cluster.getHealth();
// {
//   name: "Cluster",
//   status