@cool-ai/beach-session
v0.8.3
Published
Turn lifecycle built on the manifest registry — the second half of Beach's architectural centre.
Readme
@cool-ai/beach-session
Turn lifecycle and manifest registry — the second half of the Beach architectural centre. @cool-ai/beach-core owns the event router (the first half); @cool-ai/beach-session owns what happens inside a routed turn.
Home: cool-ai.org · Documentation: cool-ai.org/docs
Install
npm install @cool-ai/beach-sessionThe event-routed pattern
For production use, runTurn() is called from inside a router-dispatched handler, not directly from a route or controller. The canonical pipeline is:
channel inbound → routeEvent('channel', 'message_received') (channel-blind first-layer event)
→ message-matcher handler
→ routeEvent('channel', 'message_matched')
→ channel-inbound handler
→ routeEvent('session', 'turn_requested')
→ orchestrator handler ← calls runTurn() here (channel-blind)
→ routeEvent('assistant', 'reply_ready')
→ destination-fan-out handler ← reads session.destinations
→ routeEvent('delivery', '<destinationKind>') (one event per destination)
→ chat-collector / response-collector / consumer-supplied subscribers@cool-ai/beach-starter provides reference implementations of all pipeline handlers except the orchestrator, which is always consumer-supplied. See its README for the complete wiring example and the routing.json template.
A minimal orchestrator handler looks like this:
import { EventRouter } from '@cool-ai/beach-core';
import { SessionTurnManager } from '@cool-ai/beach-session';
import { registerCanonicalHandlers } from '@cool-ai/beach-starter';
import type { TurnRequestedData } from '@cool-ai/beach-starter';
const router = new EventRouter({ ...wireInspectCallbacks.routerOptions });
const manager = new SessionTurnManager({ router, ...wireInspectCallbacks.managerOptions });
// 1. Register the consumer-supplied orchestrator first. The orchestrator
// never sees channelId — the dispatch layer reads session.destinations.
router.register('my-orchestrator', async (event, context) => {
const { sessionId, turnId, inboundMessage } = event.data as TurnRequestedData;
const result = await manager.runTurn({
sessionId,
turnId,
slotKey: 'main',
actorId: 'concierge',
actorConfig: conciergeConfig,
provider: myProvider,
registry: myToolRegistry,
inboundMessage,
...wireInspectCallbacks.turnOptions,
});
// Forward the turn's AbortSignal so destination-fan-out, response-collector,
// and any downstream handler doing I/O observes cancelTurn / timeout
// cooperatively.
await context.routeEvent(
{
source: 'assistant',
eventType: 'reply_ready',
data: { sessionId, turnId, parts: result.parts, turnState: result.turnState },
},
undefined,
{ signal: result.signal },
);
});
// 2. Register the canonical pipeline handlers. Destination-agnostic —
// each session declares its destinations at openSession() time.
registerCanonicalHandlers(router, {
orchestratorHandler: 'my-orchestrator',
resolveSession: (d) => d.threadId,
sessionManager: manager,
chatPublish: async (sessionId, parts) => {
await redis.publish(`reply:${sessionId}`, JSON.stringify(parts));
},
store: missiveStore,
});
// 3. Load routing config (from @cool-ai/beach-starter/templates/routing.json
// with YOUR_ORCHESTRATOR replaced by 'my-orchestrator').
router.loadRoutingConfig(routingConfig);Sequence diagrams: canonical-pipeline.svg, research-fanout.svg.
SessionTurnManager
One instance manages all sessions in a process.
import { EventRouter } from '@cool-ai/beach-core';
import { SessionTurnManager } from '@cool-ai/beach-session';
const manager = new SessionTurnManager({
router,
manifestRegistry, // optional — enables orphan queueing for inject()
onTurnStarted: (event) => {
// { turnId, sessionId, actorId, inboundMessage } — fires before the actor is invoked
},
onTurnSettled: (event) => {
// { turnId, sessionId, actorId, slotKey, fromState, toState, respond }
},
onTurnTimeout: ({ turnId, sessionId }) => { ... },
onTurnCancelled: ({ turnId, sessionId }) => { ... },
});Wire all observation callbacks via wireInspect() from @cool-ai/beach-inspect rather than building them by hand. See the orchestrator example above.
openSession()
const session = manager.openSession({
destinations: [{ kind: 'ui-streaming' }, { kind: 'audit' }],
actors: ['baxter'],
// Optional. Defaults to 'streaming'. Must be registered in ChannelModeRegistry.
channelMode: 'batched',
// Optional. Seed the session's staticSlots for the first turn's contributors.
staticSlots: { priorContext: { query: 'LHR-JFK' } },
// Optional. Contributors fired at turn-open time to inject context messages.
contextContributors: [myContextContributor],
// Optional. Contributors fired by channel adapters at outbound firing points.
outboundContributors: [myOutboundContributor],
// actorDirectory: { baxter: baxterConfig } — required if using passTo handoffs
// id: 'my-session-id' — generated if omitted
});session.channelMode and session.staticSlots are readable on the returned session. staticSlots grows over the session's lifetime as appendOutboundRecord() accumulates entries.
Participants — the canonical turn-advancer abstraction
A Participant is the abstraction Beach's session manager invokes once per turn-advancement step. LLM actors, deterministic classifiers, rules engines, and any other turn-advancer all implement the same interface; the session manager treats them identically through the same lifecycle (heartbeat, timeout, signal cascade, manifest cleanup).
import { LLMActor, type Participant } from '@cool-ai/beach-session';
// canonical LLM Participant — wraps callActor from @cool-ai/beach-llm
const sally: Participant = new LLMActor({ actorConfig, provider, registry });
await manager.runParticipantTurn({
sessionId, turnId, slotKey: 'sally',
participant: sally,
inboundMessage: { role: 'user', content: 'plan a weekend in Rome' },
});Beach's centre is event-routed; turn-advancement is now formally a Participant call, not an LLM call. LLMs are one common Participant kind, not the only one.
HandlerParticipant — the canonical wrapper for deterministic non-LLM turn-advancers. Use it for triage classifiers, rules engines, database-backed responders, queue runners — anything that advances a turn without an LLM call:
import { HandlerParticipant } from '@cool-ai/beach-session';
const triage: Participant = new HandlerParticipant({
id: 'task-triage',
async handle(opts) {
const last = opts.messages[opts.messages.length - 1];
const text = typeof last?.content === 'string' ? last.content : '';
const decision = text.length < 20 ? 'too-short' : 'route-to-specialist';
return {
parts: [{ partType: 'response', text: `Triage: ${decision}` }],
turnState: 'complete',
};
},
});
await manager.runParticipantTurn({ participant: triage, sessionId, slotKey: 'triage', inboundMessage });Same lifecycle as LLMActor: heartbeat, timeout, cancellation signal, mailbox, manifest cleanup. Same RespondCall output shape. Multi-step deterministic work via inject()-driven re-invocation works identically (return turnState: 'awaiting' from the first invocation; the next inject() triggers a second handle() call).
Full guide: documentation/guides/non-llm-participants.md — covers when to use HandlerParticipant vs. an EventRouter handler, the canonical migration pattern from manual turn-advancement, and the cooperative-cancellation pattern.
Current limitations (planned for a future minor release):
passTofrom a non-LLM Participant fails loudly. OnlyLLMActorcarries the registry + provider needed to construct the target actor today.
runTurn(...) remains available and unchanged in shape (see below); it constructs an LLMActor from RunTurnOptions and delegates to runParticipantTurn. Existing callers see no behaviour change.
Context contributors and outbound contributors
Contributors carry context across turns without coupling the inbound adapter to the actor's prompt structure. Two kinds ship:
ContextContributor<E> — fired at turn-open firing points, injects messages into the actor mailbox before the inbound message. Reads staticSlots (outbound records from prior turns) and produces Contribution objects (each wrapping a Message).
OutboundContributor<E> — fired by channel adapters and recipes at outbound firing points. Records data about what was sent. The returned OutboundRecord is stored on the session via appendOutboundRecord() and made available to the next turn's ContextContributor via ContextContext.staticSlots.
import type { ContextContributor, OutboundContributor } from '@cool-ai/beach-session';
// Inject a system-context message at turn-open time
const contextContributor: ContextContributor<unknown> = {
firingPoints: new Set(['turn-open-static']),
contribute: (ctx) => {
const priorSearch = ctx.staticSlots.get('lastSearch');
if (priorSearch === undefined) return [];
return {
message: {
role: 'user',
content: `Prior search context: ${JSON.stringify(priorSearch)}`,
},
};
},
};
// Record outbound data so the next turn can see what was sent
const outboundContributor: OutboundContributor<MyEnvelope> = {
firingPoints: new Set(['channel-send']),
contribute: (ctx) => ({
lastSearch: { query: ctx.artefact.searchQuery, sentAt: new Date().toISOString() },
}),
};
manager.openSession({
destinations: [...],
actors: ['concierge'],
contextContributors: [contextContributor],
outboundContributors: [outboundContributor],
});Firing points
Context contributor firing points — called in the order declared per firing point:
| Name | When | Called by |
|---|---|---|
| turn-open-static | After message history, before inbound message | SessionTurnManager at every turn |
| turn-open-derived | After turn-open-static contributions land | SessionTurnManager at every turn |
| mid-turn-event | Mid-turn when a named event is received | Reserved — not yet invoked by the manager |
Outbound contributor firing points — called by the channel adapter or recipe, not by the manager:
| Name | When | Called by |
|---|---|---|
| channel-send | Before the channel adapter sends an outbound envelope | Channel adapter |
| composer-emit | When a Composer emits a composed envelope | Composer / recipe |
| external-confirm | When an external system confirms receipt | Channel adapter |
| human-approved-send | When a human approval gate clears the message | wireApprovalAirgap (CAIB-192) |
Storing outbound records
// Called by the channel adapter after an OutboundContributor fires:
manager.appendOutboundRecord(sessionId, { lastSearch: { query: 'LHR-JFK' } });
// Retrieves the contributors registered for a session:
const contributors = manager.getOutboundContributors(sessionId);Open registries
ContextContributorFiringPointRegistry and OutboundContributorFiringPointRegistry follow the same open-registry pattern as CheckpointPhaseRegistry. Register custom firing point names at startup; the manager rejects unknown names at first use via assertRegistered.
Two additional registries ship in this release:
ChannelModeRegistry— validchannelModevalues. Defaults:'streaming','batched'.OpenerSourceRegistry— provenance of session opens. Defaults:'inbound','scheduled','peer'.
import { ChannelModeRegistry } from '@cool-ai/beach-session';
ChannelModeRegistry.register({
name: 'hybrid',
description: 'Streaming with a deferred batch confirmation step.',
});Low-level API: runTurn()
Opens a turn, invokes the actor through its full tool loop, and returns a TurnResult when respond() is called. Internally delegates to runParticipantTurn with an LLMActor constructed from the supplied actorConfig, provider, and registry. TurnResult is a structural superset of RespondCall with one extra field — signal: AbortSignal — for cooperative cancellation propagation. Existing destructuring of { parts, turnState } continues to work unchanged.
import { AnthropicProvider, ToolRegistry } from '@cool-ai/beach-llm';
const result = await manager.runTurn({
sessionId: session.id,
actorId: 'baxter',
actorConfig: {
id: 'baxter',
model: 'claude-haiku-4-5',
systemPrompt: '...',
tools: ['task_list'],
},
registry: tools,
provider: new AnthropicProvider(new Anthropic()),
inboundMessage: { role: 'user', content: 'What tasks do I have?' },
slotKey: 'baxter.tasks',
// optional:
turnId: 'my-turn-id',
messageHistory: previousMessages,
onTextBlock: (text) => { ... },
timeoutMs: 60_000,
systemInstruction: 'Focus on...',
onToolExecution: async (record) => { await auditLog.write(record); },
});
result.turnState // 'complete' | 'awaiting' | 'clarifying' | 'error' | 'passed' | ...
result.parts // the actor's response parts
result.signal // AbortSignal — aborts on cancelTurn/timeout. Forward via routeEvent's
// 3rd-arg `{ signal }` so downstream handlers cancel cooperatively.For production use, wrap runTurn() in a router-dispatched handler. See the event-routed pattern above.
inject()
Re-invokes an awaiting turn with a new message. Used when background specialist results arrive.
const outcome = await manager.inject({
turnId: 'my-turn-id',
message: { role: 'user', content: JSON.stringify(results) },
slotKey: 'research',
});
// outcome: 'delivered' | 'dropped-cancelled' | 'dropped-deleted'The FilterAndDistribute primitive (in @cool-ai/beach-core) handles per-tool-call dispatch — when a generalist tool returns an AnnotatedRecord<T>, the framework dispatches to the LLM session, UI streaming, audit, formatter, etc. in parallel without consumer wiring. Direct use of inject() is for custom specialist patterns: pushing background-research results back into an awaiting turn from outside the actor's tool loop.
cancelTurn()
Aborts an active turn. The AbortSignal is propagated to tool handlers; in-flight work that doesn't check it will complete but its results will be silently discarded by inject().
await manager.cancelTurn(turnId);Async since @cool-ai/[email protected]. The manager awaits the durable settled checkpoint before tearing down the turn slot, so the durability guarantee survives cancellation. The abort signal still fires synchronously before the await — tool handlers wired to ctx.signal observe cancellation immediately; only the settled checkpoint persistence and the onTurnCancelled callback fire after the await resolves. Migration from earlier 0.6.x: insert await at every call site.
Durable execution
SessionTurnManager does not implement durability itself. It calls a consumer-supplied DurableExecutor at four lifecycle moments per turn so any substrate satisfying the interface — Trigger.dev, Temporal, Inngest, Cloudflare Durable Objects, or a consumer-built backend — can persist turn state without re-engineering the manager.
import { SessionTurnManager, InMemoryDurableExecutor, type DurableExecutor } from '@cool-ai/beach-session';
const durableExecutor: DurableExecutor = new InMemoryDurableExecutor(); // tests / dev
// or: const durableExecutor = new TriggerDevDurableExecutor({ ... }); // production (separate package)
const manager = new SessionTurnManager({ router, durableExecutor });Without a durableExecutor, no checkpoint code path executes — back-compat preserved exactly. Sessions that haven't opted in see the original non-durable behaviour.
Lifecycle phases
The manager fires four canonical checkpoints across each turn:
started → (llm-complete | tool-received → llm-complete)* → settled| Phase | When it fires | State captured |
|---|---|---|
| started | Slot registered, mailbox seeded, heartbeat live; participant about to be invoked | { actorId, slotKey, messages, expiresAt } |
| llm-complete | One participant iteration complete; mailbox updated | { messages, respond } |
| tool-received | Background result injected via inject(); participant about to re-run | { message, slotKey, manifestId? } |
| settled | Turn closed (terminal, cancel, timeout, passTo) | { turnState, respond, reason } where reason ∈ 'normal' \| 'cancelled' \| 'timed-out' \| 'passed' |
A fifth canonical phase, tool-dispatched, is registered in CheckpointPhaseRegistry but not fired by Beach — the dispatch is intra-iteration inside the participant. Consumers wanting per-tool-dispatch checkpoints fire it themselves from the existing onToolExecution callback.
Phase taxonomy is open — register custom phases
CheckpointPhaseRegistry follows the same shape as PartTypeRegistry. A consumer building a multi-actor research orchestrator can register peer-call-dispatched (or any other domain-specific phase) at startup and fire it from a custom handler — without library modification:
import { CheckpointPhaseRegistry } from '@cool-ai/beach-session';
CheckpointPhaseRegistry.register({
name: 'peer-call-dispatched',
description: 'A2A peer agent invoked; awaiting response.',
});Unknown phases are rejected by fireCheckpoint at first use.
Restoration is consumer-driven
Beach exposes the mechanism (durableExecutor.restore(turnId)) but never calls it itself. Consumers decide when to restore — startup, post-deploy migration, on-demand turn re-entry, or never. The mechanism returns checkpoints in chronological order; an empty array means nothing to restore (turn never checkpointed, or pruned by adapter retention policy).
const checkpoints = await durableExecutor.restore('turn-id');
if (checkpoints.length === 0) {
// nothing to restore — fresh turn or pruned
} else {
const last = checkpoints[checkpoints.length - 1];
// last.state is the most recent checkpoint payload
}Idempotency contract
SessionTurnManager issues monotonically-increasing timestamps — even when two lifecycle events fire within the same wall-clock millisecond, their timestamps differ by at least 1 ms. The (turnId, phase, timestamp) tuple is therefore unique per distinct logical checkpoint at the call site.
The adapter MUST dedupe checkpoint() calls landing more than once with identical (turnId, phase, timestamp) — these represent network-layer retries delivering the same call twice. The adapter MUST NOT dedupe by (turnId, phase) alone — distinct timestamps mean distinct phase transitions and both must be preserved.
Choosing a reference implementation explicitly
Beach ships InMemoryDurableExecutor for tests and dev. Crashes lose every checkpoint because the underlying Map lives in process memory; the warning is loud by design. Production deployments wire a real substrate. Choose one explicitly at startup — the explicit choice is the structural defence against the "I forgot to wire durability" failure mode.
Errors propagate
Errors thrown by checkpoint() propagate to the caller of the lifecycle method that fired the checkpoint. Durability is treated as a hard dependency when configured: a failed started checkpoint aborts the turn before the participant runs; a failed settled checkpoint propagates from cancelTurn / runTurn etc.
ResultsCollector
Collects N parallel specialist results before re-invoking a single awaiting turn. Used when the orchestrator dispatches multiple research tasks in parallel and needs to reassemble before replying.
import { ResultsCollector } from '@cool-ai/beach-session';
import { ManifestRegistry } from '@cool-ai/beach-core';
const registry = new ManifestRegistry();
const manager = new SessionTurnManager({ router, manifestRegistry: registry });
new ResultsCollector({
turnId,
sessionManager: manager,
registry,
expected: ['flights', 'hotels'],
assemble: (filled) => ({
type: 'inject',
message: { role: 'user', content: JSON.stringify(Object.fromEntries(filled)) },
}),
timeoutMs: 10_000,
onTimeout: (filled) => ({
type: 'inject',
message: { role: 'user', content: `Partial results: ${JSON.stringify(Object.fromEntries(filled))}` },
}),
});
registry.deliver(`results-collector:${turnId}`, 'flights', flightData);
registry.deliver(`results-collector:${turnId}`, 'hotels', hotelData);
// → both slots filled → assemble() called → inject() fires → actor re-invokedassemble() and onTimeout() return an AssemblyOutcome:
| type | Effect |
|--------|--------|
| inject | Re-invokes the actor with message appended to the turn thread |
| events | Routes the listed events; turn is not re-invoked |
| reset | Re-opens the manifest for corrective re-search; optionally routes events |
Outbound-edge subscription pattern
Outbound channel adapters consume the turn in one of two shapes. This is the only place "streaming vs batched" lives in Beach — the interior is uniform.
Streaming edge — SSE, WebSocket, voice. The edge subscribes to the actor's part stream via Redis pub/sub. The chatCollectorHandler from @cool-ai/beach-starter handles this automatically.
Batched edge — SMTP, SMS, push. At inbound time, open a Delivery Manifest with expected: ['main_reply']. Fill it from onTurnSettled. The responseCollectorHandler from @cool-ai/beach-starter handles the missive write automatically; the edge formatter reads the store on manifest settlement.
import { ManifestRegistry, Manifest } from '@cool-ai/beach-core';
const manifestRegistry = new ManifestRegistry();
const manifest = new Manifest({
id: `email-delivery:${inbound.id}`,
expected: ['main_reply'],
onComplete: async (filled) => {
const artifact = await formatter.format({ inbound, filledSlots: filled });
await channel.send(artifact);
},
});
manifestRegistry.register(manifest);
const settled = await manager.runTurn({ /* ... */ });
manifestRegistry.deliver(manifest.id, 'main_reply', { parts: settled.parts });The interior has no awareness of which edge shape is in use. Principle 2.7 (channel-agnostic actors) is preserved because channel knowledge lives only at the inbound and outbound edges.
Turn states
| State | Meaning |
|-------|---------|
| awaiting | Waiting for background results; inject() re-invokes |
| complete | Turn finished normally |
| clarifying | Actor asked a clarifying question; next user message opens a new turn |
| error | Turn failed; actor cannot recover |
| passed | Actor handed off to another actor via passTo |
| cancelled | Cancelled via cancelTurn() |
Multi-actor handoffs
An actor responds with turnState: 'passed' and passTo: 'other-actor'. The session manager closes the current turn, opens a new one for the target actor, and inherits the full conversation thread.
const session = manager.openSession({
channelId: 'chat',
actors: ['concierge', 'specialist'],
actorDirectory: {
concierge: conciergeConfig,
specialist: specialistConfig,
},
});Not in this package
- LLM invocation and
respond()parsing (@cool-ai/beach-llm). - Event routing (
@cool-ai/beach-core). - Envelope assembly (
@cool-ai/beach-protocol). - Canonical pipeline handlers (
@cool-ai/beach-starter).
Related
@cool-ai/beach-starter— canonical pipeline handlers and routing template.- https://cool-ai.org/docs/design-principles — principles 1.1 (architectural centre), 2.8 (explicit turn completion), 3.1 (background dispatch), 3.7 (replay).
- https://cool-ai.org/docs/respond-tool — the tool the actor calls.
