libeam v0.3.0

Erlang/OTP-inspired actor system for TypeScript — actors, supervision, distribution, GenStage
An Erlang/OTP-inspired actor system for TypeScript. Build distributed, fault-tolerant applications with location-transparent actors, automatic supervision, and gossip-based cluster membership.
Features
- Functional API: Simple, closure-based actor definitions with `createSystem` and `createActor` (recommended)
- Actor Model: Lightweight actors with message-passing semantics (`call` for request/response, `cast` for fire-and-forget)
- Location Transparency: Actors communicate via `ActorRef` regardless of whether the target is local or remote
- Supervision: Automatic crash handling with configurable restart strategies
- Distributed Clustering: Gossip-based membership detection with automatic peer discovery
- Placement Strategies: Control where actors are spawned (`local`, `round-robin`) with optional role-based filtering
- Transport Abstraction: Pluggable transport layer (in-memory for testing, ZeroMQ for production)
Elixir/OTP Parity
libeam implements the core primitives from Elixir/OTP, adapted for the Node.js runtime.
Implemented
| Feature | Elixir/OTP | libeam |
|---------|------------|--------|
| **Actor Model** | | |
| Spawn processes | spawn/1, GenServer.start/2 | system.spawn() |
| Async messages | send/2, GenServer.cast/2 | ref.cast() |
| Sync calls | GenServer.call/2 | ref.call() |
| **GenServer Callbacks** | | |
| init/1 | Yes | init() |
| handle_call/3 | Yes | handleCall() |
| handle_cast/2 | Yes | handleCast() |
| handle_info/2 | Yes | handleInfo() |
| handle_continue/2 | Yes | handleContinue() |
| terminate/2 | Yes | terminate() |
| Idle timeout | {:noreply, state, timeout} | setIdleTimeout() |
| **Supervision** | | |
| Supervisors | Supervisor | Supervisor, ChildSupervisor |
| one-for-one | Yes | Yes |
| one-for-all | Yes | Yes |
| rest-for-one | Yes | Yes |
| Max restarts | Yes | Yes |
| Dynamic supervisors | DynamicSupervisor | DynamicSupervisor |
| **Process Features** | | |
| Links | Process.link/1 | link() |
| Monitors | Process.monitor/1 | watch() |
| Trap exit | Process.flag(:trap_exit, true) | setTrapExit() |
| Exit signals | Process.exit/2 | exit() |
| Timers | Process.send_after/3 | sendAfter(), sendInterval() |
| **Introspection** | | |
| List children | Supervisor.which_children/1 | getChildren() |
| Count children | Supervisor.count_children/1 | countChildren() |
| **Abstractions** | | |
| Agent | Agent | Agent |
| DynamicSupervisor | DynamicSupervisor | DynamicSupervisor |
| GenStage | GenStage | Producer, Consumer, ProducerConsumer, ConsumerSupervisor |
| **Distribution** | | |
| Cluster membership | :net_kernel | Cluster, GossipProtocol |
| Remote messaging | Transparent | Via Transport |
| Registry | Registry, :global | Registry, DistributedRegistry |
| Actor migration | Manual process migration | system.migrate() |
| Node roles | Node profiles | roles config + role spawn option |
| Process groups | pg | joinGroup(), getGroup(), broadcast() |
Not Implemented (Not Needed in Node.js)
| Feature | Reason |
|---------|--------|
| Task.async/await | Use native Promise / async-await |
| Selective receive | Not practical without BEAM VM |
| Hot code upgrades | Use libeam deploy for rolling deploys |
| Application behaviour | Use standard Node.js entry points |
| ETS/DETS | Use Map or external stores (Redis, etc.) |
Not Yet Implemented
All core OTP features have been implemented. See the feature table above for the full list.
Installation
npm / pnpm / yarn
# npm
npm install libeam
# pnpm
pnpm add libeam
# yarn
yarn add libeam

Deno (JSR)

deno add jsr:@libeam/core

Or import directly:

import { createSystem, createActor } from "jsr:@libeam/core";

Quick Start
Functional API (Recommended)
The functional API provides a simple, closure-based approach to defining actors:
import { createSystem, createActor } from "libeam";
// Define an actor with closure-based state
const Counter = createActor((ctx, self, initialValue: number) => {
let count = initialValue;
return self
.onCall("get", () => count)
.onCall("increment", () => ++count)
.onCast("set", (value: number) => { count = value; });
});
// Create a system (one line!)
const system = createSystem();
// Spawn and interact with typed refs
const counter = system.spawn(Counter, { args: [0] });
const value = await counter.call("get"); // 0 — fully typed!
await counter.call("increment"); // 1
counter.cast("set", 100); // fire-and-forget
await system.shutdown();

Class-Based API
For full control, extend the Actor class directly:
import { Actor, ActorRef } from "libeam";
class CounterActor extends Actor {
private count = 0;
init(initialValue: number = 0) {
this.count = initialValue;
console.log(`Counter initialized with ${this.count}`);
}
// Synchronous request/response
handleCall(message: { type: string }): number {
switch (message.type) {
case "get":
return this.count;
case "increment":
return ++this.count;
default:
throw new Error(`Unknown message: ${message.type}`);
}
}
// Fire-and-forget messages
handleCast(message: { type: string; value?: number }): void {
if (message.type === "set" && message.value !== undefined) {
this.count = message.value;
}
}
}

Single-Node Setup (In-Memory)
For testing or single-process applications:
import {
ActorSystem,
InMemoryTransport,
LocalCluster,
LocalRegistry,
} from "libeam";
async function main() {
const cluster = new LocalCluster("node1");
const transport = new InMemoryTransport("node1");
const registry = new LocalRegistry();
await transport.connect();
const system = new ActorSystem(cluster, transport, registry);
system.registerActorClass(CounterActor);
await system.start();
// Spawn an actor
const counter = system.spawn(CounterActor, {
name: "my-counter",
args: [10], // Initial value
});
// Interact with the actor
const value = await counter.call({ type: "get" });
console.log(`Current value: ${value}`); // 10
await counter.call({ type: "increment" });
console.log(`After increment: ${await counter.call({ type: "get" })}`); // 11
counter.cast({ type: "set", value: 100 });
}
main();

Multi-Node Setup (Functional API)
For distributed applications across multiple processes/machines, createSystem handles all the wiring — ZeroMQ transport, gossip protocol, cluster membership, and registry sync:
import { createSystem, createActor, ActorRegistry } from "libeam";
// Define actors
const Ping = createActor((ctx, self) => {
return self.onCast("ping", async (n: number) => {
console.log(`Ping received: ${n}`);
const pong = await ctx.getActorByName("pong");
if (pong) pong.cast("pong", n + 1);
});
});
const Pong = createActor((ctx, self) => {
return self.onCast("pong", async (n: number) => {
console.log(`Pong received: ${n}`);
const ping = await ctx.getActorByName("ping");
if (ping) ping.cast("ping", n + 1);
});
});
// Register actors for typed getActorByName — same codebase on all nodes
declare module "libeam" {
interface ActorRegistry {
ping: typeof Ping;
pong: typeof Pong;
}
}
// Node 1 — port convention: rpc=5000, pub=5001, gossip=5002
const system1 = await createSystem({
type: "distributed",
port: 5000,
seedNodes: [],
cookie: "my-cluster-secret",
});
const ping = system1.spawn(Ping, { name: "ping" });
// Node 2 — joins via node1's gossip port
const system2 = await createSystem({
type: "distributed",
port: 5010,
seedNodes: ["127.0.0.1:5002"],
cookie: "my-cluster-secret",
});
system2.spawn(Pong, { name: "pong" });
// Start the game — typed ref from spawn() supports clean syntax
ping.cast("ping", 0);

Run the full distributed example:
# Terminal 1
npx tsx examples/high-level/distributed.ts node1
# Terminal 2
npx tsx examples/high-level/distributed.ts node2

Multi-Node Setup (Class-Based API)
For full control over transport, gossip, and cluster configuration:
import {
ActorSystem,
ZeroMQTransport,
GossipProtocol,
GossipUDP,
DistributedCluster,
DistributedRegistry,
RegistrySync,
} from "libeam";
async function startNode(config: {
nodeId: string;
rpcPort: number;
pubPort: number;
gossipPort: number;
seedNodes: string[];
}) {
const { nodeId, rpcPort, pubPort, gossipPort, seedNodes } = config;
// 1. Setup transport (ZeroMQ)
const transport = new ZeroMQTransport({
nodeId,
rpcPort,
pubPort,
bindAddress: "0.0.0.0",
});
await transport.connect();
// 2. Setup gossip protocol for membership
const gossipUDP = new GossipUDP(gossipPort);
const gossipProtocol = new GossipProtocol(
nodeId,
`tcp://127.0.0.1:${rpcPort}`, // RPC address for peers
`127.0.0.1:${gossipPort}`, // Gossip address
gossipUDP,
{
gossipIntervalMs: 1000,
cleanupIntervalMs: 2000,
failureTimeoutMs: 5000,
gossipFanout: 3,
seedNodes,
},
);
// 3. Setup cluster (wraps gossip protocol)
const cluster = new DistributedCluster(gossipProtocol);
await cluster.start();
// 4. Setup registry sync for actor name resolution
const registrySync = new RegistrySync(nodeId, transport, cluster);
const registry = new DistributedRegistry(nodeId, registrySync);
// 5. Wire cluster membership changes to transport
cluster.on("member_join", (peerId: string) => {
const peer = cluster.getPeerState(peerId);
if (peer) {
transport.updatePeers([[peerId, peer.address]]);
}
});
// 6. Create actor system
const system = new ActorSystem(cluster, transport, registry);
system.registerActorClass(CounterActor);
await system.start();
return { system, cluster, transport };
}
// Node 1 (seed node)
const node1 = await startNode({
nodeId: "node1",
rpcPort: 5000,
pubPort: 5001,
gossipPort: 6000,
seedNodes: [], // No seeds for first node
});
// Node 2 (joins via seed)
const node2 = await startNode({
nodeId: "node2",
rpcPort: 5010,
pubPort: 5011,
gossipPort: 6010,
seedNodes: ["127.0.0.1:6000"], // Connect to node1's gossip port
});

Functional API
The functional API is the recommended way to build applications with libeam. It provides better type safety, less boilerplate, and a more modern developer experience.
createSystem
The createSystem factory simplifies system creation and configuration.
import { createSystem } from "libeam";
// Local system (synchronous, zero config)
const system = createSystem();
// Local with options
const system = createSystem({ nodeId: "my-node" });
// Distributed (async, with ZeroMQ + Gossip)
const system = await createSystem({
type: "distributed",
port: 5000,
seedNodes: ["127.0.0.1:6002"],
});

System Interface:
- `spawn(actorClass, options?)`: Spawn an actor and return a `TypedActorRef`
- `register(actorClass)`: Register an actor class for remote spawning
- `getActorByName(name)`: Look up a named actor (local or remote)
- `shutdown()`: Gracefully shut down the system and all actors
- `stop(ref)`: Stop an individual actor (cascading termination of children)
- `nodeId`: The unique ID of this node
- `transport`: Access to the underlying transport layer
- `cluster`: Access to the cluster membership interface
- `registry`: Access to the actor registry
- `system`: Escape hatch to the raw `ActorSystem` instance
createActor
Define actors using a closure-based factory function.
const MyActor = createActor((ctx, self, ...args) => {
// Initialization logic here
return self.onCall("ping", () => "pong");
});

The factory function receives:
- `ctx`: The actor context (spawn, watch, link, stash, etc.)
- `self`: The actor builder (register handlers, timers, etc.)
- `...args`: Arguments passed during `spawn`
ActorContext (ctx) Methods:
- `self`: Reference to this actor
- `parent`: Reference to the parent actor
- `spawn(actor, options?)`: Spawn a child actor
- `watch(ref)` / `unwatch(ref)`: Monitor other actors
- `link(ref)` / `unlink(ref)`: Bidirectional crash propagation
- `exit(reason?)`: Stop this actor (with optional reason)
- `setTrapExit(boolean)`: Enable/disable exit trapping
- `getActorByName(name)`: Look up a named actor (local or remote)
- `stash()` / `unstash()` / `unstashAll()` / `clearStash()`: Message stashing
ActorBuilder (self) Methods:
- `onCall(name, handler)`: Register a request-reply handler
- `onCast(name, handler)`: Register a fire-and-forget handler
- `onInfo(type, handler)`: Register a handler for system messages (`"down"`, `"exit"`, `"timeout"`, `"moved"`)
- `onTerminate(handler)`: Cleanup logic
- `onContinue(handler)`: Deferred initialization
- `sendAfter(msg, delay)` / `sendInterval(msg, interval)`: Timers
- `setIdleTimeout(ms)`: Configure idle timeout
- `migratable({ getState, setState })`: Enable actor migration
- `childSupervision(options)`: Configure supervision strategy for children
TypedActorRef
When you spawn an actor created with createActor, you get a TypedActorRef. This provides full TypeScript autocompletion and type checking for call and cast.
const counter = system.spawn(Counter, { args: [0] });
// TypeScript knows "get" and "increment" are valid calls
const value = await counter.call("get");
await counter.call("increment");
// TypeScript knows "set" is a valid cast and requires a number
counter.cast("set", 42);

Type Inference
When you return the builder chain from a createActor factory, TypeScript automatically infers the handler types:
// With return — full type inference
const Counter = createActor((ctx, self, initial: number) => {
let count = initial;
return self
.onCall("get", () => count)
.onCall("increment", () => ++count)
.onCast("set", (v: number) => { count = v; });
});
const counter = system.spawn(Counter, { args: [0] });
counter.call("get"); // TypeScript knows this returns number
counter.cast("set", 42); // TypeScript knows "set" expects a number
counter.call("typo"); // Type error! "typo" is not a valid method

Without return, the factory still works but handlers are untyped:
// Without return — still works, but no type inference
const Untyped = createActor((ctx, self) => {
self.onCall("get", () => 42);
});

Timer methods (`sendAfter`, `sendInterval`) return `TimerRef`, not the builder, so they must be called on a separate line before the return:
const Heartbeat = createActor((ctx, self) => {
self.sendInterval({ type: "tick" }, 1000); // separate line (returns TimerRef)
return self
.onCast("tick", () => console.log("tick"))
.onTerminate(() => console.log("stopped"));
});

Module Augmentation (Typed getActorByName)
By default, getActorByName returns an untyped ActorRef. To get fully typed refs, augment the ActorRegistry interface:
// types.d.ts (or any .ts file)
import "libeam";
declare module "libeam" {
interface ActorRegistry {
counter: typeof Counter;
"chat-room": typeof ChatRoom;
}
}

Now getActorByName returns typed refs when called with registered names:
const counter = await system.getActorByName("counter");
if (counter) {
const value = await counter.call("get"); // fully typed!
counter.cast("set", 100); // fully typed!
}

Utility types for working with actor definitions:
- `ActorRefFrom<T>` — Extract `TypedActorRef` from an `ActorDefinition`
- `ExtractCalls<T>` — Extract call handler types from an `ActorDefinition`
- `ExtractCasts<T>` — Extract cast handler types from an `ActorDefinition`
import { ActorRefFrom } from "libeam";
type CounterRef = ActorRefFrom<typeof Counter>;
// TypedActorRef<{ get: () => number; increment: () => number }, { set: (v: number) => void }>

Feature Comparison
| Feature | Functional API | Class-Based API |
|---------|---------------|-----------------|
| State management | Closures | Class fields |
| Message handlers | self.onCall() / self.onCast() | handleCall() / handleCast() |
| Type safety | Automatic (TypedActorRef) | Manual typing |
| System setup | createSystem() | Manual wiring |
| Child supervision | self.childSupervision() | childSupervision() override |
| Deferred init | self.onContinue() | handleContinue() override |
| Message stashing | ctx.stash() / ctx.unstashAll() | this.stash() / this.unstashAll() |
Class-Based API Reference
This section documents the low-level, class-based API. While the Functional API is recommended for most use cases, the class-based API provides full control and is used internally by the system.
Actor
Base class for all actors.
class MyActor extends Actor {
// Called when actor starts. Receives spawn arguments.
init(...args: any[]): void | Promise<void>;
// Called when actor is stopped.
terminate(): void | Promise<void>;
// Handle synchronous requests (must return a value).
handleCall(message: any): any | Promise<any>;
// Handle asynchronous messages (fire-and-forget).
handleCast(message: any): void | Promise<void>;
// Reference to self for sending to other actors
self: ActorRef;
}

handleContinue
Called when init() returns { continue: data } for deferred async initialization.
class DatabaseActor extends Actor {
private db!: DatabaseConnection;
init() {
// Return immediately, continue async work
return { continue: "connect" };
}
async handleContinue(data: string) {
if (data === "connect") {
this.db = await connectToDatabase();
}
}
}

See examples/high-level/handle_continue.ts and examples/low-level/handle_continue.ts for more examples.
Idle Timeout
Set a timeout that fires when the actor is idle (no messages received).
class SessionActor extends Actor {
init() {
// Timeout after 30 seconds of inactivity
this.setIdleTimeout(30000);
}
handleInfo(message: InfoMessage) {
if (message.type === "timeout") {
console.log(`Session idle for ${message.idleMs}ms, closing...`);
this.exit(this.self, "normal");
}
}
}

Methods:
- `setIdleTimeout(timeoutMs: number): void` - Set idle timeout in milliseconds
- `getIdleTimeout(): number` - Get current idle timeout
The actor receives a TimeoutMessage via handleInfo() when the timeout fires.
See examples/high-level/idle_timeout.ts and test/idle_timeout.test.ts for more examples.
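The reset-on-activity rule can be modeled with plain bookkeeping; this is an illustrative sketch (`IdleTimer` is not a libeam API):

```typescript
// Illustrative model of idle-timeout bookkeeping (IdleTimer is NOT a
// libeam API): every handled message resets the clock; once the idle
// span reaches the timeout, a TimeoutMessage-style payload would fire.
class IdleTimer {
  private last = 0;
  constructor(private timeoutMs: number) {}
  // Call on every handled message to reset the idle clock.
  touch(now: number): void {
    this.last = now;
  }
  // Poll: returns idleMs (as TimeoutMessage.idleMs would carry) once
  // idle long enough, otherwise null.
  firedAt(now: number): number | null {
    const idleMs = now - this.last;
    return idleMs >= this.timeoutMs ? idleMs : null;
  }
}

const timer = new IdleTimer(30_000);
timer.touch(0);
console.log(timer.firedAt(10_000)); // null (only 10s idle)
console.log(timer.firedAt(40_000)); // 40000 (idle past the 30s timeout)
```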
Timers
Schedule delayed and periodic messages to yourself.
class ReminderActor extends Actor {
private reminderRef!: TimerRef;
init() {
// One-shot timer: remind after 5 seconds
this.reminderRef = this.sendAfter({ type: "remind" }, 5000);
// Periodic timer: tick every second
this.sendInterval({ type: "tick" }, 1000);
}
handleCast(message: { type: string }) {
if (message.type === "remind") {
console.log("Time's up!");
} else if (message.type === "tick") {
console.log("Tick...");
}
}
terminate() {
// Clean up timers
this.cancelTimer(this.reminderRef);
this.cancelAllTimers();
}
}

Methods:
- `sendAfter(message, delayMs): TimerRef` - Schedule one-shot message
- `sendInterval(message, intervalMs): TimerRef` - Schedule repeating message
- `cancelTimer(timerRef): boolean` - Cancel a specific timer
- `cancelAllTimers(): void` - Cancel all active timers
See examples/high-level/timers.ts and examples/low-level/timers.ts for more examples.
Watching
Monitor another actor's lifecycle and receive notification when it terminates.
class WorkerSupervisor extends Actor {
private workerWatch!: WatchRef;
init(workerRef: ActorRef) {
// Start watching the worker
this.workerWatch = this.watch(workerRef);
}
handleInfo(message: InfoMessage) {
if (message.type === "down") {
const down = message as DownMessage;
console.log(`Worker ${down.actorRef.id} terminated: ${down.reason.type}`);
// Unwatch when done
this.unwatch(down.watchRef);
}
}
}

Methods:
- `watch(actorRef): WatchRef` - Start watching an actor
- `unwatch(watchRef): void` - Stop watching
Behavior:
- One-shot notification: You receive exactly one `DownMessage` when the watched actor terminates
- Auto-cleanup: The watch is automatically removed after the DOWN message is delivered
- Works across nodes: Can watch actors on remote nodes
See examples/high-level/watching.ts and examples/low-level/actor_watching.ts for more examples.
Links
Bidirectional crash propagation between actors. If one linked actor crashes, the other crashes too (unless trapExit is enabled).
class ParentActor extends Actor {
private childLink!: LinkRef;
init(childRef: ActorRef) {
// Link to child - bidirectional crash propagation
this.childLink = this.link(childRef);
// Enable trap exit to receive ExitMessage instead of crashing
this.setTrapExit(true);
}
handleInfo(message: InfoMessage) {
if (message.type === "exit") {
const exit = message as ExitMessage;
console.log(`Linked actor exited: ${exit.reason.type}`);
// Unlink when done
this.unlink(exit.linkRef!);
}
}
terminateChild() {
// Send exit signal to linked actor
this.exit(this.childLink.actorRef, "shutdown");
}
}

Methods:
- `link(actorRef): LinkRef` - Create bidirectional link
- `unlink(linkRef): void` - Remove link
- `setTrapExit(trap: boolean): void` - Enable/disable exit trapping
- `isTrapExit(): boolean` - Check if exit trapping is enabled
- `exit(actorRef, reason?): void` - Send exit signal to actor
Exit Reasons:
- `"normal"` - No effect on linked actors
- `"kill"` - Always terminates, ignores trapExit
- Custom string - Delivered to linked actors with trapExit enabled
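These rules condense into a small decision function; the sketch below models the documented semantics and is not libeam code:

```typescript
// Models the exit-signal rules listed above (illustrative, not libeam code).
type Outcome = "ignored" | "terminate" | "deliver-exit-message";

function onExitSignal(reason: string, trapExit: boolean): Outcome {
  if (reason === "kill") return "terminate"; // always terminates, ignores trapExit
  if (reason === "normal") return "ignored"; // no effect on linked actors
  // Custom reasons: delivered as an ExitMessage when trapping, fatal otherwise.
  return trapExit ? "deliver-exit-message" : "terminate";
}

console.log(onExitSignal("normal", false));  // "ignored"
console.log(onExitSignal("kill", true));     // "terminate"
console.log(onExitSignal("shutdown", true)); // "deliver-exit-message"
```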
See examples/high-level/links.ts and examples/low-level/actor_links.ts for more examples.
Message Stashing
Defer message processing until the actor is ready. Useful for state-dependent message handling.
class StatefulActor extends Actor {
private ready = false;
private pendingMessages: any[] = [];
init() {
// Actor starts in "not ready" state
this.ready = false;
}
async handleCall(message: any) {
if (!this.ready) {
// Stash message for later processing
this.stash();
return "stashed";
}
// Process message normally
return this.processMessage(message);
}
setReady() {
this.ready = true;
// Replay all stashed messages
this.unstashAll();
}
private processMessage(message: any) {
return `Processed: ${message}`;
}
}

Methods:
- `stash(): void` - Save current message to stash
- `unstash(): void` - Replay one stashed message (FIFO order)
- `unstashAll(): void` - Replay all stashed messages
- `clearStash(): void` - Discard all stashed messages
See examples/high-level/message_stashing.ts and test/message_stashing.test.ts for more examples.
InfoMessage Types
System messages delivered via handleInfo(). Use type guards to distinguish between message variants.
handleInfo(message: InfoMessage) {
switch (message.type) {
case "down":
const down = message as DownMessage;
console.log(`Actor ${down.actorRef.id} terminated: ${down.reason.type}`);
break;
case "exit":
const exit = message as ExitMessage;
console.log(`Linked actor exited: ${exit.reason.type}`);
break;
case "timeout":
const timeout = message as TimeoutMessage;
console.log(`Idle for ${timeout.idleMs}ms`);
break;
case "moved":
const moved = message as MovedMessage;
console.log(`Actor moved to ${moved.newNodeId}`);
break;
}
}

| Message | Type | Fields | Description |
|---------|------|--------|-------------|
| DownMessage | "down" | watchRef, actorRef, reason | Watched actor terminated |
| ExitMessage | "exit" | linkRef?, actorRef, reason | Linked actor exited (trapExit only) |
| TimeoutMessage | "timeout" | idleMs | Idle timeout fired |
| MovedMessage | "moved" | watchRef?, linkRef?, actorRef, oldNodeId, newNodeId, newActorId | Actor migrated to another node |
TerminationReason:
type TerminationReason =
| { type: "normal" }
| { type: "error"; error: any }
| { type: "killed" };

ActorRef
Location-transparent reference to an actor.
// Request/response with timeout (default 5000ms)
const result = await actorRef.call(message, timeout?);
// Fire-and-forget
actorRef.cast(message);

Agent
A simple state-management abstraction around a single value.
// Create an agent
const counter = Agent.start(system, 0);
// Read state
const value = await counter.get();
// Update state (waits for completion)
await counter.update(n => n + 1);
// Fire-and-forget update
counter.cast(n => n + 1);
// Stop the agent
await counter.stop();

Methods:
- `Agent.start<T>(system, initialState, options?): Agent<T>` - Create an agent
- `get(timeout?): Promise<T>` - Get current state
- `update(fn, timeout?): Promise<T>` - Update state, returns new value
- `getAndUpdate(fn, timeout?): Promise<T>` - Update state, returns old value
- `cast(fn): void` - Fire-and-forget state update
- `stop(): Promise<void>` - Stop the agent
- `getRef(): ActorRef` - Access underlying actor reference
See test/agent.test.ts for more examples.
DynamicSupervisor
On-demand supervised child spawning. Unlike static supervision trees where children are defined at init time, a DynamicSupervisor starts with zero children and allows adding them at runtime. All children are supervised with a one-for-one strategy.
import { DynamicSupervisor } from "libeam";
// Start a dynamic supervisor
const dynSup = DynamicSupervisor.start(system);
// With options
const dynSup = DynamicSupervisor.start(system, {
maxChildren: 100, // Cap child count (default: Infinity)
maxRestarts: 3, // Per-child restart limit (default: 3)
periodMs: 5000, // Restart counting window (default: 5000)
});
// Named supervisor
const dynSup = DynamicSupervisor.start(system, {}, { name: "worker-pool" });
// Start children on demand
const workerRef = await dynSup.startChild(WorkerActor, { args: ["job-1"] });
// Functional actors return TypedActorRef with full type inference
const counter = await dynSup.startChild(Counter, { args: [0] });
await counter.call("get"); // fully typed!
counter.cast("set", 42); // fully typed!
// Inspect children
const children = await dynSup.whichChildren();
// [{ ref: ActorRef, className: "WorkerActor", name?: string }]
const counts = await dynSup.countChildren();
// { specs: 2, active: 2 }
// Terminate a specific child
await dynSup.terminateChild(workerRef); // true if found, false otherwise
// Stop supervisor (cascades to all children)
await dynSup.stop();

Methods:
- `DynamicSupervisor.start(system, options?, spawnOptions?): DynamicSupervisor` - Create a supervisor
- `startChild(actorClass, options?): Promise<ActorRef | TypedActorRef>` - Spawn a supervised child
- `terminateChild(ref): Promise<boolean>` - Stop a child by ref
- `whichChildren(): Promise<ChildInfo[]>` - List active children with metadata
- `countChildren(): Promise<ChildCounts>` - Get child count statistics
- `stop(): Promise<void>` - Stop supervisor and all children
- `getRef(): ActorRef` - Access underlying actor reference
Supervision behavior:
- Children that crash are automatically restarted (one-for-one)
- If a child exceeds `maxRestarts` within `periodMs`, it is stopped permanently
- `startChild` throws `MaxChildrenError` when the `maxChildren` limit is reached
- Stopping the supervisor cascades to all children (depth-first termination)
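The maxRestarts/periodMs rule amounts to counting crashes in a sliding window; a plain sketch of that bookkeeping (illustrative, not libeam's implementation):

```typescript
// Sliding-window restart counting as described above (illustrative,
// not libeam's implementation). A child may restart as long as it has
// crashed at most maxRestarts times within the last periodMs.
class RestartWindow {
  private crashes: number[] = [];
  constructor(private maxRestarts: number, private periodMs: number) {}
  // Record a crash at `now` (ms); returns whether a restart is allowed.
  allowRestart(now: number): boolean {
    this.crashes = this.crashes.filter((t) => now - t < this.periodMs);
    this.crashes.push(now);
    return this.crashes.length <= this.maxRestarts;
  }
}

const w = new RestartWindow(3, 5000);
console.log(w.allowRestart(0));    // true  (1st crash)
console.log(w.allowRestart(100));  // true  (2nd)
console.log(w.allowRestart(200));  // true  (3rd)
console.log(w.allowRestart(300));  // false (4th within 5s: stop permanently)
console.log(w.allowRestart(6000)); // true  (earlier crashes aged out)
```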
See test/dynamic_supervisor.test.ts for more examples.
GenStage
Demand-driven producer-consumer pipelines with back-pressure. Inspired by Elixir's GenStage. Consumers tell producers how many events they can handle; producers never emit more than requested.
import { Producer, Consumer, ProducerConsumer, ConsumerSupervisor } from "libeam";
// Producer: emits sequential numbers on demand
const producer = Producer.start(system, {
init: () => 0,
handleDemand: (demand, counter) => {
const events = Array.from({ length: demand }, (_, i) => counter + i);
return [events, counter + demand];
},
});
// Consumer: prints received events
const consumer = Consumer.start(system, {
handleEvents: (events, _from, state) => {
console.log("Received:", events);
return state;
},
});
// Subscribe with back-pressure options
await consumer.subscribe(producer.getRef(), { maxDemand: 100, minDemand: 50 });
// 3-stage pipeline with transformation
const multiplier = ProducerConsumer.start(system, {
init: () => 10,
handleEvents: (events, _from, factor) => {
return [events.map(e => e * factor), factor];
},
});
await multiplier.subscribe(producer.getRef(), { maxDemand: 50 });
await consumer.subscribe(multiplier.getRef(), { maxDemand: 50 });

Stage Types:
- `Producer` — Emits events in response to downstream demand. Buffers events when demand is zero.
- `Consumer` — Subscribes to producers and processes received events.
- `ProducerConsumer` — Receives events from upstream, transforms them, and dispatches downstream.
- `ConsumerSupervisor` — Subscribes to a producer and spawns a supervised worker per event. Back-pressure is tied to worker lifecycle.
Producer Methods:
- `Producer.start(system, callbacks, options?, spawnOptions?): Producer` — Create a producer
- `stop(): Promise<void>` — Stop the producer
- `demand(mode): void` — Switch demand mode (call with `"forward"` to resume after `demand: "accumulate"`)
- `getRef(): ActorRef` — Get the producer's ref (for consumer subscriptions)
Consumer Methods:
- `Consumer.start(system, callbacks, spawnOptions?): Consumer` — Create a consumer
- `subscribe(producerRef, options?): Promise<SubscriptionRef>` — Subscribe to a producer
- `cancel(ref): boolean` — Cancel a subscription
- `stop(): Promise<void>` — Stop and cancel all subscriptions
- `getRef(): ActorRef` — Get the consumer's ref
ProducerConsumer Methods:
- `ProducerConsumer.start(system, callbacks, producerOptions?, spawnOptions?): ProducerConsumer` — Create a stage
- `subscribe(producerRef, options?): Promise<SubscriptionRef>` — Subscribe to upstream
- `cancelUpstream(ref): boolean` — Cancel an upstream subscription
- `stop(): Promise<void>` — Stop and cancel all subscriptions
- `getRef(): ActorRef` — Get the stage's ref
ConsumerSupervisor Methods:
- `ConsumerSupervisor.start(system, childSpec, options?, spawnOptions?): ConsumerSupervisor` — Create a consumer supervisor
- `subscribe(producerRef, options?): Promise<SubscriptionRef>` — Subscribe to a producer
- `cancel(ref): boolean` — Cancel a subscription
- `whichChildren(): Promise<ChildInfo[]>` — List active worker children
- `countChildren(): Promise<ChildCounts>` — Get worker count statistics
- `stop(): Promise<void>` — Stop and cancel all subscriptions, terminate all workers
- `getRef(): ActorRef` — Get the supervisor's ref
Subscription Options:
- `maxDemand` — Max events in flight per subscription (default: 1000)
- `minDemand` — Threshold to request more events (default: 75% of maxDemand)
- `cancel` — Cancel behavior: `"permanent"` | `"transient"` | `"temporary"` (default)
- `partition` — Partition to subscribe to (required for PartitionDispatcher)
Dispatcher Types:
Producers can be configured with different dispatch strategies via ProducerOptions.dispatcher:
| Dispatcher | Description | Use Case |
|------------|-------------|----------|
| { type: "demand" } | Sends to consumer with highest pending demand (default) | Work pools, load balancing |
| { type: "broadcast" } | Sends all events to all consumers | Event buses, audit logging, fan-out |
| { type: "partition", partitions: N, hash? } | Routes events by hash to fixed partitions | Ordered processing per key, sharding |
// Default: DemandDispatcher (highest-demand-first)
const producer = Producer.start(system, callbacks);
// BroadcastDispatcher: all consumers get all events
const producer = Producer.start(system, callbacks, {
dispatcher: { type: "broadcast" },
});
// PartitionDispatcher: hash-based routing
const producer = Producer.start(system, callbacks, {
dispatcher: {
type: "partition",
partitions: 4,
hash: (event) => event.userId % 4, // route by user ID
},
});
// Consumers subscribe to specific partitions
await consumer0.subscribe(producer.getRef(), { maxDemand: 10, partition: 0 });
await consumer1.subscribe(producer.getRef(), { maxDemand: 10, partition: 1 });

BroadcastDispatcher behavior:
- All consumers receive the same events (fan-out)
- Demand = `min(all consumer demands)` — slowest consumer throttles the pipeline
- Sequential subscribes are safe — initial demand is deferred to ensure all consumers register before events flow
Demand Mode (demand: "accumulate"):
For complex topologies where you need deterministic setup, start the producer in accumulate mode:
const producer = Producer.start(system, callbacks, {
dispatcher: { type: "broadcast" },
demand: "accumulate", // Pauses event production
});
// Subscribe all consumers (order doesn't matter)
await consumer1.subscribe(producer.getRef(), { maxDemand: 10 });
await consumer2.subscribe(producer.getRef(), { maxDemand: 10 });
await consumer3.subscribe(producer.getRef(), { maxDemand: 10 });
// Resume — now all consumers receive the same events
producer.demand("forward");

Inspired by Elixir's {:producer, state, demand: :accumulate}.
PartitionDispatcher behavior:
- One consumer per partition (subscribing to a taken partition throws)
- Hash function maps each event to a partition:
(event) => partitionIndex | null - Returning
nullfrom hash discards the event - Events for partitions with no consumer are buffered per-partition
- Default hash: modulo for numbers, djb2 for strings,
event.keyfor objects
Back-pressure behavior:
- Consumer sends initial demand of
maxDemandon subscribe (deferred to next tick for broadcast safety) - When pending demand drops to
minDemand, consumer automatically re-asks - Producer never emits more events than total demand from all consumers
- Excess events are buffered in the producer (configurable
bufferSize, default: 10000) - Multiple consumers receive events via the configured dispatcher
See test/gen_stage.test.ts for more examples.
ConsumerSupervisor behavior:
ConsumerSupervisor spawns one supervised worker per event. Demand is tied to worker lifecycle:
- On subscribe, sends initial demand of maxDemand to the producer
- Each received event spawns a worker: actorClass.init(...baseArgs, event)
- maxDemand = max concurrent workers (never more active at once)
- When a worker exits (normal or crash), its demand slot is released
- Released slots accumulate; when they reach minDemand, more events are requested
- Workers are supervised with a one-for-one strategy (configurable maxRestarts/periodMs)
import { Producer, ConsumerSupervisor } from "libeam";
// Producer emits jobs
const producer = Producer.start(system, {
init: () => 0,
handleDemand: (demand, counter) => {
const jobs = Array.from({ length: demand }, (_, i) => ({
id: counter + i,
payload: `task-${counter + i}`,
}));
return [jobs, counter + demand];
},
});
// ConsumerSupervisor spawns a JobWorker per event
const supervisor = ConsumerSupervisor.start(system, {
actorClass: JobWorker,
args: ["base-config"], // event appended as last arg
});
// max 10 concurrent workers, re-ask when 7 complete
await supervisor.subscribe(producer.getRef(), {
maxDemand: 10,
minDemand: 7,
});
// Inspect active workers
const workers = await supervisor.whichChildren();
const counts = await supervisor.countChildren();
ActorSystem
Manages actor lifecycle on a node.
const system = new ActorSystem(cluster, transport, registry, supervisionOptions?);
// Register actor classes for remote spawning
system.registerActorClass(MyActor);
system.registerActorClasses([ActorA, ActorB]);
// Spawn actors
const ref = system.spawn(MyActor, {
name?: string, // Optional registered name
args?: any[], // Arguments passed to init()
strategy?: 'local' | 'round-robin' // Placement strategy
});
// Stop an actor
await system.stop(actorRef);
// Start processing messages
await system.start();
// Check system state
system.isRunning(); // true if running and not shutting down
system.isShuttingDown(); // true if shutdown in progress
// Graceful shutdown
await system.shutdown({
timeout: 5000, // Max time to wait for actors (default: 5000ms)
drainMailboxes: true // Wait for pending messages (default: true)
});
Supervision
Configure crash handling behavior:
const system = new ActorSystem(cluster, transport, registry, {
strategy: "Restart", // or 'Stop'
maxRestarts: 3, // Max restarts within period
periodMs: 5000, // Time window for restart counting
});
Supervision Trees
Actors can spawn child actors, creating a supervision tree hierarchy. When a parent actor is stopped, all its children are automatically terminated first (cascading termination).
class WorkerActor extends Actor {
handleCall(message: any) {
if (message.type === "work") {
return `Processed: ${message.data}`;
}
}
handleCast(message: any) {}
}
class SupervisorActor extends Actor {
private workers: ActorRef[] = [];
init(workerCount: number) {
// Spawn child workers under this supervisor
for (let i = 0; i < workerCount; i++) {
const worker = this.spawn(WorkerActor, { name: `worker-${i}` });
this.workers.push(worker);
}
}
handleCall(message: any) {
if (message.type === "get_worker_count") {
return this.getChildren().length;
}
if (message.type === "dispatch") {
// Round-robin to workers
const worker = this.workers[message.index % this.workers.length];
return worker.call({ type: "work", data: message.data });
}
}
handleCast(message: any) {}
}
// Usage
const supervisor = system.spawn(SupervisorActor, { args: [3] });
// Supervisor has 3 child workers
const count = await supervisor.call({ type: "get_worker_count" }); // 3
// When supervisor is stopped, all workers are terminated first
await system.stop(supervisor); // Stops workers, then supervisor
Actor Context
Each actor has access to its context:
class MyActor extends Actor {
someMethod() {
// Reference to parent actor (undefined for root actors)
const parent = this.context.parent;
// Set of child actor references
const children = this.context.children;
// Reference to the actor system
const system = this.context.system;
}
}
Child Management Methods
Actors have protected methods for managing children:
class ParentActor extends Actor {
async handleCall(message: any) {
if (message.type === "spawn_worker") {
// Spawn a child actor
const child = this.spawn(WorkerActor, {
name: message.name,
args: [message.config]
});
return child;
}
if (message.type === "stop_worker") {
// Stop a specific child
await this.stopChild(message.workerRef);
}
if (message.type === "list_workers") {
// Get all children
return this.getChildren();
}
}
}
Cascading Termination
When a parent is stopped:
- All children are stopped recursively (depth-first)
- Each child's terminate() is called
- Children are removed from the system
- Parent's terminate() is called last
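The depth-first order can be sketched with a small recursion (hypothetical tree type, not libeam's internals):

```typescript
// Children are visited recursively before the parent, so the parent's
// terminate() always runs last — matching cascading termination.
type TreeNode = { id: string; children: TreeNode[] };

function terminationOrder(node: TreeNode, out: string[] = []): string[] {
  for (const child of node.children) terminationOrder(child, out);
  out.push(node.id); // parent is stopped after all descendants
  return out;
}

const tree: TreeNode = {
  id: "root",
  children: [
    { id: "child1", children: [{ id: "grandchild", children: [] }] },
    { id: "child2", children: [] },
  ],
};
terminationOrder(tree); // → ["grandchild", "child1", "child2", "root"]
```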
// Tree: root -> child1 -> grandchild
// -> child2
await system.stop(rootRef);
// Termination order: grandchild, child1, child2, root
Child Supervision Strategies
Parent actors can define how their children should be supervised when they crash. Override the childSupervision() method to customize the behavior:
import { Actor, ChildSupervisionOptions } from "libeam";
class MySupervisor extends Actor {
// Override to customize child supervision
childSupervision(): ChildSupervisionOptions {
return {
strategy: "one-for-all", // or "one-for-one", "rest-for-one"
maxRestarts: 3, // Max restarts within period
periodMs: 5000, // Time window for restart counting
};
}
init() {
this.spawn(WorkerActor, { args: ["worker1"] });
this.spawn(WorkerActor, { args: ["worker2"] });
this.spawn(WorkerActor, { args: ["worker3"] });
}
handleCall(message: any) { return "ok"; }
handleCast(message: any) {}
}
Available Strategies:
| Strategy | Behavior |
|----------|----------|
| one-for-one | Only restart the crashed child (default) |
| one-for-all | Restart all children if one crashes |
| rest-for-one | Restart the crashed child and all children spawned after it |
one-for-one (default): Isolates failures - only the crashed actor is restarted.
// If worker2 crashes, only worker2 is restarted
// worker1 and worker3 are unaffected
one-for-all: Use when children have interdependencies and must be restarted together.
// If any worker crashes, all workers are stopped and restarted
// Useful for tightly coupled processes (e.g., producer-consumer pairs)
rest-for-one: Use when children have ordered dependencies.
// Children spawned in order: db -> cache -> api
// If cache crashes, cache and api are restarted (db is unaffected)
// If db crashes, all three are restarted
Max Restarts:
If a child exceeds maxRestarts within periodMs, it will be stopped permanently instead of restarted.
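The maxRestarts/periodMs rule amounts to a sliding-window counter, sketched here as a hypothetical helper (not libeam's actual code):

```typescript
// Keep timestamps of recent restarts; refuse once the window already
// holds maxRestarts entries — at that point the child is stopped permanently.
class RestartWindow {
  private restarts: number[] = [];
  constructor(private maxRestarts: number, private periodMs: number) {}

  // Returns true if the child may be restarted at time `now` (ms).
  allow(now: number): boolean {
    this.restarts = this.restarts.filter((t) => now - t < this.periodMs);
    if (this.restarts.length >= this.maxRestarts) return false;
    this.restarts.push(now);
    return true;
  }
}

const w = new RestartWindow(3, 5000);
w.allow(0);    // → true
w.allow(100);  // → true
w.allow(200);  // → true
w.allow(300);  // → false (4th restart within the 5s window)
w.allow(6000); // → true  (earlier restarts aged out of the window)
```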
Placement Strategies
Control where actors are spawned:
- local: Always spawn on the current node
- round-robin: Distribute across cluster members
Both strategies support an optional role filter. See Node Roles for details.
// Spawn locally
system.spawn(MyActor, { strategy: "local" });
// Distribute across nodes
system.spawn(MyActor, { strategy: "round-robin" });
// Distribute only across nodes with the "worker" role
system.spawn(MyActor, { strategy: "round-robin", role: "worker" });
Cluster Interface
Implement for custom cluster membership:
interface Cluster {
readonly nodeId: string;
getMembers(): string[];
}
Transport Interface
Implement for custom network transport:
interface Transport {
getNodeId(): string;
connect(): Promise<void>;
disconnect(): Promise<void>;
// Point-to-point messaging
request(nodeId: string, message: any, timeout: number): Promise<any>;
send(nodeId: string, message: any): Promise<void>;
// Pub/sub for registry propagation
publish(topic: string, message: any): Promise<void>;
subscribe(topic: string, handler: MessageHandler): Promise<Subscription>;
// Message handlers
onRequest(handler: RequestHandler): void;
onMessage(handler: MessageHandler): void;
// Peer management
updatePeers(peers: Array<[nodeId: string, address: string]>): void;
}
Example: Chat Application
A complete example showing actors communicating across nodes.
Functional API (Recommended)
import { createSystem, createActor, ActorRef } from "libeam";
const ChatRoom = createActor((ctx, self) => {
const participants = new Map<string, ActorRef>();
return self
.onCall("getParticipants", () => Array.from(participants.keys()))
.onCast("join", (name: string, ref: ActorRef) => {
participants.set(name, ref);
broadcast(`${name} joined the chat`);
})
.onCast("message", (from: string, text: string) => {
broadcast(`[${from}] ${text}`);
});
function broadcast(text: string) {
for (const ref of participants.values()) {
ref.cast({ method: "notify", args: [text] });
}
}
});
const User = createActor((ctx, self, name: string, roomRef: ActorRef) => {
// Join room on init (roomRef is untyped, so use raw message format)
roomRef.cast({ method: "join", args: [name, ctx.self] });
return self.onCast("notify", (text: string) => {
console.log(`[${name}] ${text}`);
});
});
// Usage:
const system = createSystem();
const room = system.spawn(ChatRoom);
system.spawn(User, { args: ["Alice", room] });
system.spawn(User, { args: ["Bob", room] });
// TypedActorRef — typed call/cast works directly on room
const members = await room.call("getParticipants"); // ["Alice", "Bob"]
Class-Based API
import {
Actor,
ActorRef,
ActorSystem,
InMemoryTransport,
LocalRegistry,
Cluster,
} from "libeam";
class ChatRoomActor extends Actor {
private participants = new Map<string, ActorRef>();
handleCast(
message:
| { type: "join"; name: string; ref: ActorRef }
| { type: "message"; from: string; text: string },
) {
if (message.type === "join") {
this.participants.set(message.name, message.ref);
this.broadcast(`${message.name} joined the chat`);
} else if (message.type === "message") {
this.broadcast(`[${message.from}] ${message.text}`);
}
}
private broadcast(text: string) {
for (const ref of this.participants.values()) {
ref.cast({ type: "notification", text });
}
}
}
class UserActor extends Actor {
private name = "";
init(name: string, roomRef: ActorRef) {
this.name = name;
roomRef.cast({ type: "join", name, ref: this.self });
}
handleCast(message: { type: "notification"; text: string }) {
console.log(`[${this.name}] ${message.text}`);
}
}
Run the full example:
pnpm example:chat
Graceful Shutdown
Proper shutdown ensures actors terminate cleanly and cluster peers are notified.
Functional API (Recommended)
When using createSystem, a single call handles the entire shutdown sequence:
const system = createSystem();
// ... spawn actors ...
await system.shutdown(); // Terminates actors, leaves cluster, disconnects transport
Class-Based API
When manually wiring components, you must shut them down in order:
async function shutdownNode(system, cluster, transport) {
// 1. Shutdown actor system (terminates actors, unregisters names)
await system.shutdown({
timeout: 5000, // Wait up to 5s for actors to terminate
drainMailboxes: true // Process pending messages first
});
// 2. Leave cluster gracefully (notifies peers)
await cluster.leave(); // Broadcasts "leaving" status to peers
// 3. Disconnect transport
await transport.disconnect();
}
// Handle process signals
process.on("SIGTERM", () => shutdownNode(system, cluster, transport));
process.on("SIGINT", () => shutdownNode(system, cluster, transport));
Shutdown Sequence
- ActorSystem.shutdown(): Stops accepting new spawns, drains mailboxes, calls terminate() on all actors, unregisters named actors
- DistributedCluster.leave(): Broadcasts "leaving" status to peers so they immediately remove this node from membership (instead of waiting for the failure timeout)
- Transport.disconnect(): Closes network connections
Shutdown Options
interface ShutdownOptions {
timeout?: number; // Max ms to wait for actors (default: 5000)
drainMailboxes?: boolean; // Wait for pending messages (default: true)
}
Cluster Readiness
Wait for the cluster to form before spawning actors that depend on peer connectivity:
const system = await createSystem({
type: "distributed",
port: 5000,
seedNodes: ["127.0.0.1:6002"],
cookie: "my-secret",
});
// Wait for at least one peer to join (default: minMembers=2, timeout=30s)
await system.waitForCluster();
// Wait for a specific number of members
await system.waitForCluster({ minMembers: 3, timeout: 15000 });
// Wait for specific nodes
await system.waitForCluster({ nodes: ["gateway", "worker-1"] });
// Both conditions must be met (AND)
await system.waitForCluster({ minMembers: 3, nodes: ["gateway"] });
Or inline with system creation using the ready option:
const system = await createSystem({
type: "distributed",
port: 5000,
seedNodes: ["127.0.0.1:6002"],
cookie: "my-secret",
ready: { minMembers: 3, timeout: 15000 },
});
// System is guaranteed to have 3 members when createSystem resolves
waitForCluster uses gossip-based membership detection. If the timeout is reached before the conditions are met, a TimeoutError is thrown. For local (non-distributed) systems, waitForCluster() resolves immediately.
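The AND semantics of the two conditions can be sketched as a predicate (hypothetical helper; it assumes the documented default of minMembers = 2 when the option is omitted):

```typescript
// Both minMembers and every named node (when given) must be satisfied.
function clusterReady(
  members: string[],
  opts: { minMembers?: number; nodes?: string[] },
): boolean {
  const minOk = members.length >= (opts.minMembers ?? 2); // assumed default
  const nodesOk = (opts.nodes ?? []).every((n) => members.includes(n));
  return minOk && nodesOk;
}

clusterReady(["a", "b"], { minMembers: 2 });      // → true
clusterReady(["a", "b"], { nodes: ["gateway"] }); // → false
```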
Node Roles
Nodes can declare roles to support heterogeneous clusters. Roles are propagated via gossip and used during actor placement to ensure actors land on appropriate nodes.
Declaring Roles
// This node is a gateway
const gateway = await createSystem({
type: "distributed",
port: 5000,
seedNodes: [],
cookie: "my-secret",
roles: ["gateway"],
});
// This node is a worker
const worker = await createSystem({
type: "distributed",
port: 5010,
seedNodes: ["127.0.0.1:5002"],
cookie: "my-secret",
roles: ["worker", "compute"],
});
A node can have multiple roles. Roles are immutable — set once at startup and propagated to all peers via gossip.
Role-Based Placement
The role option on spawn() filters candidate nodes before the placement strategy selects one:
// Spawn on any node with the "worker" role (round-robin across workers)
const ref = system.spawn(MyActor, { strategy: "round-robin", role: "worker" });
// Assert the local node has the "gateway" role before spawning locally
const ref = system.spawn(Router, { role: "gateway" });
If no nodes in the cluster have the required role, a NoRoleMatchError is thrown immediately.
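The filter-then-select behavior can be sketched as follows (hypothetical Member type and pickNode helper, not libeam's internals):

```typescript
// Filter candidate nodes by role first, then apply round-robin selection
// across the survivors. An empty candidate set mirrors NoRoleMatchError.
type Member = { nodeId: string; roles: string[] };

function pickNode(members: Member[], counter: number, role?: string): string {
  const candidates = role
    ? members.filter((m) => m.roles.includes(role))
    : members;
  if (candidates.length === 0) throw new Error("NoRoleMatchError");
  return candidates[counter % candidates.length].nodeId;
}

const cluster: Member[] = [
  { nodeId: "a", roles: ["gateway"] },
  { nodeId: "b", roles: ["worker"] },
  { nodeId: "c", roles: ["worker"] },
];
pickNode(cluster, 0, "worker"); // → "b"
pickNode(cluster, 1, "worker"); // → "c"
```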
Querying Roles
The cluster interface exposes role-based membership queries:
// Get all nodes with a specific role
const workers = system.cluster.getMembersByRole("worker");
// ["node-abc", "node-def"]
How It Works
Roles are stored in each node's gossip state (PeerState.roles) and propagated automatically via the existing gossip protocol. The placement engine filters cluster.getMembersByRole(role) before applying the selected strategy (local or round-robin). No additional network overhead — roles piggyback on the existing heartbeat cycle.
Process Groups
Process groups provide named, dynamic groupings of actors. Any actor can join or leave groups at runtime, and you can broadcast messages to all members of a group. Inspired by Erlang's pg module.
Basic Usage
import { createSystem, createActor } from "libeam";
const Worker = createActor((ctx, self, id: string) => {
return self
.onCall("id", () => id)
.onCast("work", (task: string) => {
console.log(`Worker ${id} processing: ${task}`);
});
});
const system = createSystem();
const w1 = system.spawn(Worker, { args: ["a"] });
const w2 = system.spawn(Worker, { args: ["b"] });
const w3 = system.spawn(Worker, { args: ["c"] });
// Add actors to a group
system.joinGroup("workers", w1);
system.joinGroup("workers", w2);
system.joinGroup("workers", w3);
// An actor can be in multiple groups
system.joinGroup("priority", w1);
// Get all members of a group
const members = system.getGroup("workers"); // [ActorRef, ActorRef, ActorRef]
// Broadcast a cast message to all members
system.broadcast("workers", { method: "work", args: ["job-42"] });
// Remove an actor from a group
system.leaveGroup("workers", w2);
// Groups are cleaned up automatically when actors stop
await system.stop(w1); // removed from "workers" and "priority"
API
- joinGroup(group, ref): void — Add an actor to a named group. Idempotent — joining the same group twice has no effect.
- leaveGroup(group, ref): void — Remove an actor from a group. Safe to call even if the actor is not in the group.
- getGroup(group): ActorRef[] — Get all actor refs in a group. Returns [] for unknown groups.
- broadcast(group, message): void — Send a cast message to every actor in the group.
Behavior
- Auto-cleanup: When an actor is stopped, it is automatically removed from all groups.
- Distributed: Group membership changes are propagated across nodes via the transport pub/sub layer. When a node leaves the cluster, all its members are removed from all groups.
- Idempotent joins: Calling joinGroup with the same actor and group multiple times is safe — only one membership entry is stored.
See test/process_group.test.ts for more examples.
Authentication
Distributed systems can be secured with cookie-based authentication, inspired by Erlang's distribution cookie. When configured, nodes verify each other using HMAC-SHA256 signatures on gossip messages and CurveZMQ encryption on transport connections.
Cookie Configuration
The cookie must be at least 16 characters for secure key derivation:
const system = await createSystem({
type: "distributed",
port: 5000,
seedNodes: ["127.0.0.1:6002"],
cookie: "my-cluster-secret", // ≥16 characters
});
All nodes in a cluster must share the same cookie. Nodes with different cookies cannot communicate — gossip messages are silently dropped and transport connections are rejected.
Distributed Configuration with Salt
For distributed setups, you can optionally customize the key derivation salt:
const system = await createSystem({
type: "distributed",
port: 5000,
seedNodes: ["127.0.0.1:6002"],
cookie: "my-cluster-secret",
salt: "custom-salt", // Optional, default: "libeam-v2"
});
The salt is used in HKDF key derivation to separate gossip HMAC keys from CurveZMQ keypairs.
Environment Variable
Instead of passing the cookie in code, set the LIBEAM_COOKIE environment variable:
LIBEAM_COOKIE=my-cluster-secret npx tsx app.ts
The precedence order is: auth option > cookie option > LIBEAM_COOKIE env var. If none are set, the system starts in open mode with a warning logged.
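The precedence chain can be illustrated with a small resolver (hypothetical; `resolveAuthSource` and its option shape are for illustration only):

```typescript
// Mirrors the documented precedence: auth > cookie > LIBEAM_COOKIE env var,
// falling back to open mode when nothing is configured.
function resolveAuthSource(opts: {
  auth?: unknown;  // stand-in for an Authenticator instance
  cookie?: string;
  env?: string;    // stand-in for process.env.LIBEAM_COOKIE
}): "auth" | "cookie" | "env" | "open" {
  if (opts.auth !== undefined) return "auth";
  if (opts.cookie !== undefined) return "cookie";
  if (opts.env !== undefined) return "env";
  return "open";
}

resolveAuthSource({ cookie: "my-secret", env: "other" }); // → "cookie"
resolveAuthSource({});                                    // → "open"
```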
Custom Authenticator
For advanced use cases, implement the Authenticator interface:
import { Authenticator, CookieAuthenticator } from "libeam";
const system = await createSystem({
type: "distributed",
port: 5000,
seedNodes: [],
auth: new CookieAuthenticator("my-secret"),
});
The Authenticator interface covers gossip message signing and verification (HMAC-SHA256). Transport-level security (encryption + authentication) is handled by CurveZMQ at the socket layer, driven by the cookie. See src/auth.ts for the full interface and the deriveKeys, z85Encode, z85Decode utilities for advanced use cases.
Cookie Rotation
Cookies can be rotated without restarting the cluster using the Consul-style keyring API. The rotation is operator-driven in three steps:
const system = await createSystem({
type: "distributed",
port: 5000,
seedNodes: [],
cookie: "old-cluster-secret",
});
// Step 1: Install the new cookie on ALL nodes (gossip accepts both keys)
system.keyring!.install("new-cluster-secret");
// Step 2: Switch each node to the new cookie (recreates transport sockets)
await system.keyring!.use();
// Step 3: Remove the old cookie from the keyring on ALL nodes
system.keyring!.remove();
Recommended procedure:
- Install the new cookie on every node. Order doesn't matter. After this step, gossip messages signed with either the old or new key are accepted.
- Use the new cookie on each node (sequentially or in parallel). This triggers a brief transport reconnection window (~milliseconds) as ZeroMQ sockets are recreated with the new CurveZMQ keys. In-flight RPC requests during use() will fail with a TransportError — callers should retry.
- Remove the old cookie from the keyring on every node. After this step, only the new key is accepted.
Behavior during rotation:
- Gossip continues uninterrupted during install() and remove(). Both keys are accepted.
- Transport has a brief reconnection window during use(). Pending requests are rejected with TransportError.
- Messages are delivered at-most-once. No duplicates are introduced by the rotation.
keyring is only available on distributed systems. Local systems have system.keyring === undefined.
Inspecting the keyring:
// List key fingerprints (SHA-256 hex of public key, never raw cookies)
console.log(system.keyring!.list());
// Before install: ["a1b2c3..."]
// After install: ["a1b2c3...", "d4e5f6..."]
// After remove: ["d4e5f6..."]
Known Limitations
| Limitation | Details |
|------------|---------|
| Gossip UDP is authenticated but not encrypted | HMAC-SHA256 proves identity but does not encrypt message payloads. Use network-level isolation (VPN/firewall) for defense in depth. |
| CurveZMQ failures are silent | Wrong cookie = messages dropped with no error. Ensure all nodes share the same cookie. |
| v2 auth is not wire-compatible with v1 | Nodes running v1 and v2 cannot communicate. Upgrade all nodes together. |
Logging
Libeam includes a structured logging system with configurable log levels and handlers.
Configuration
import { loggerConfig } from "libeam";
// Set log level (debug, info, warn, error, none)
loggerConfig.level = "debug";
// Custom log handler
loggerConfig.handler = (entry) => {
// entry: { level, message, context, timestamp, error? }
console.log(JSON.stringify(entry));
};
Log Levels
- debug: Detailed debugging information
- info: General operational messages
- warn: Warning conditions
- error: Error conditions
- none: Disable all logging
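The levels order from least to most severe, with none suppressing everything. A sketch of the filtering rule (hypothetical helper, not libeam's implementation):

```typescript
// An entry is logged only when its level is at or above the configured level.
// "none" sits above "error", so it suppresses every entry.
const LEVELS = ["debug", "info", "warn", "error", "none"] as const;
type Level = (typeof LEVELS)[number];

function shouldLog(configured: Level, entry: Exclude<Level, "none">): boolean {
  return LEVELS.indexOf(entry) >= LEVELS.indexOf(configured);
}

shouldLog("warn", "error"); // → true
shouldLog("warn", "info");  // → false
```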
Component Loggers
Each component creates its own logger with context:
import { createLogger } from "libeam";
const log = createLogger("MyComponent", nodeId);
log.info("Operation completed", { duration: 100 });
log.error("Operation failed", error, { operationId: "123" });
Telemetry
Libeam includes a lightweight telemetry/instrumentation system inspired by Elixir's :telemetry library. It provides synchronous event emission with zero overhead when no handlers are attached — safe for hot paths like message processing.
Basic Usage
import { telemetry, TelemetryEvents } from "libeam";
// Attach a handler to actor lifecycle events
const handlerId = telemetry.attach("my-metrics", [
TelemetryEvents.actor.spawn,
[...TelemetryEvents.actor.stop, "stop"],
], (eventName, measurements, metadata) => {
console.log(`Event: ${eventName.join(".")}`, measurements, metadata);
});
// Detach when done
telemetry.detach(handlerId);
Span API
Wrap a function in start/stop/exception telemetry events with automatic duration measurement:
// Emits ["myapp", "db", "query", "start"] before, ["...", "stop"] after
const result = telemetry.span(
["myapp", "db", "query"],
{ table: "users" },
() => db.query("SELECT * FROM users")
);
Event Catalog
All events emitted by libeam:
| Event | Measurements | Metadata |
|-------|-------------|----------|
| libeam.actor.spawn | — | actor_id, actor_class, name, node_id, parent_id? |
| libeam.actor.init.stop | duration_ms | actor_id, actor_class |
| libeam.actor.init.exception | duration_ms | actor_id, actor_class, error |
| libeam.actor.stop.stop | duration_ms | actor_id, name, children_stopped |
| libeam.actor.handle_call.start | system_time | actor_id |
| libeam.actor.handle_call.stop | duration_ms | actor_id |
| libeam.actor.handle_call.exception | duration_ms | actor_id, error |
| libeam.actor.handle_cast.start | system_time | actor_id |
| libeam.actor.handle_cast.stop | duration_ms | actor_id |
| libeam.actor.handle_cast.exception | duration_ms | actor_id, error |
| libeam.supervisor.crash | — | actor_id, error, strategy |
| libeam.supervisor.restart | — | actor_id, new_actor_id, attempt, strategy |
| libeam.supervisor.max_restarts | count | actor_id, max_restarts, period_ms |
| libeam.gen_stage.subscribe | — | producer_id, consumer_tag |
| libeam.gen_stage.cancel | — | producer_id, consumer_tag, reason |
| libeam.gen_stage.dispatch | event_count | producer_id |
| libeam.gen_stage.buffer_overflow | dropped_count, buffer_size | producer_id |
| libeam.mailbox.overflow | — | actor_id, message_type |
| libeam.cluster.join | — | peer_id, node_id |
| libeam.cluster.leave | — | peer_id, node_id |
| libeam.system.shutdown.stop | duration_ms, actor_count | node_id |
Metrics Integration
import { telemetry, TelemetryEvents } from "libeam";
// Simple metrics collector
const metrics = {
actorsSpawned: 0,
actorsStopped: 0,
callDurations: [] as number[],
crashes: 0,
};
telemetry.attach("prometheus", [
TelemetryEvents.actor.spawn,
[...TelemetryEvents.actor.stop, "stop"],
[...TelemetryEvents.actor.handleCall, "stop"],
TelemetryEvents.supervisor.crash,
], (event, measurements) => {
const name = event.join(".");
if (name === "libeam.actor.spawn") metrics.actorsSpawned++;
if (name === "libeam.actor.stop.stop") metrics.actorsStopped++;
if (name === "libeam.actor.handle_call.stop") metrics.callDurations.push(measurements.duration_ms);
if (name === "libeam.supervisor.crash") metrics.crashes++;
});
Zero Overhead
When no handlers are attached, telemetry calls are effectively free — a single Map lookup that returns immediately. Hot paths like message processing (handleCall/handleCast) additionally gate on hasHandlers(), ensuring zero object allocation overhead.
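A minimal sketch of that pattern (hypothetical, not libeam's actual implementation): handlers live in a Map keyed by the joined event name, and emission bails out after one lookup when nothing is attached.

```typescript
type Handler = (event: string[], measurements: Record<string, number>) => void;

class MiniTelemetry {
  private handlers = new Map<string, Handler[]>();

  attach(event: string[], handler: Handler): void {
    const key = event.join(".");
    const list = this.handlers.get(key) ?? [];
    list.push(handler);
    this.handlers.set(key, list);
  }

  // Hot paths can gate on this to avoid allocating measurement objects.
  hasHandlers(event: string[]): boolean {
    return this.handlers.has(event.join("."));
  }

  execute(event: string[], measurements: Record<string, number>): void {
    const list = this.handlers.get(event.join("."));
    if (!list) return; // no handlers: a single Map lookup, then done
    for (const h of list) h(event, measurements);
  }
}
```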
API
| Method | Description |
|--------|-------------|
| telemetry.attach(id, eventNames, handler) | Attach handler to events. Returns handler id. |
| telemetry.detach(id) | Remove handler. Returns true if found. |
| telemetry.execute(eventName, measurements, metadata) | Emit event synchronously. No-op if no handlers. |
| telemetry.span(eventName, metadata, fn) | Wrap function in start/stop/exception events. |
| telemetry.hasHandlers(eventName) | Check if any handlers are attached. |
| telemetry.reset() | Remove all handlers (test cleanup). |
Error Handling
Libeam provides typed error classes for better error handling:
import {
LibeamError,
ActorNotFoundError,
RegistryLookupError,
TimeoutError,
SystemShuttingDownError,
TransportError,
PeerNotFoundError,
} from "libeam";
try {
await actorRef.call({ type: "get" });
} catch (err) {
if (err instanceof TimeoutError) {
console.log(`Timed out after ${err.context?.timeoutMs}ms`);
} else if (err instanceof ActorNotFoundError) {
console.log(`Actor ${err.context?.actorId} not found`);
}
}
Error Types
| Error | Code | Description |
|-------|------|-------------|
| ActorNotFoundError | ACTOR_NOT_FOUND | Actor does not exist |
| RegistryLookupError | REGISTRY_LOOKUP_FAILED | Named actor not in registry |
| TimeoutError | TIMEOUT | Operation timed out |
| SystemShuttingDownError | SYSTEM_SHUTTING_DOWN | System is shutting down |
| TransportError | TRANSPORT_ERROR | Network transport failure |
| PeerNotFoundError | PEER_NOT_FOUND | Peer node not known |
| ActorClassNotRegisteredError | ACTOR_CLASS_NOT_REGISTERED | Actor class not registered for remote spawn |
| AuthenticationError | AUTHENTICATION_FAILED | Node authentication failed |
| NoRoleMatchError | NO_ROLE_MATCH | No nodes have the required role |
| ActorNotMigratableError | ACTOR_NOT_MIGRATABLE | Actor doesn't implement Migratable |
| ActorHasChildrenError | ACTOR_HAS_CHILDREN | Actor has child actors |
| MaxChildrenError | MAX_CHILDREN | DynamicSupervisor child limit reached |
Health Checks
Libeam provides health check support for monitoring system status, useful for Kubernetes probes and monitoring systems.
Component Health
Both ActorSystem and DistributedCluster implement HealthCheckable:
import { ActorSystem, DistributedCluster } from "libeam";
// Get health from individual components
const systemHealth = system.getHealth();
// {
// name: "ActorSystem",
// status: "healthy",
// message: "System is healthy",
// details: { actorCount: 5, totalMailboxSize: 0, registeredClasses: 3 }
// }
const clusterHealth = cluster.getHealth();
// {
// name: "Cluster",
// status