@cool-ai/beach-core
v0.6.1
Event router, manifest registry, and open registries — the first half of Beach's architectural centre.
Owns the event router and manifest registry — the first half of Beach's architectural centre. Every cross-component message is a routeEvent() call; every asynchronous or suspended flow is a Manifest waiting for a result. @cool-ai/beach-session owns the second half (the turn lifecycle built on the registry).
Home: cool-ai.org · Documentation: cool-ai.org/docs
Install
npm install @cool-ai/beach-core
EventRouter
Every cross-component message in a Beach application passes through routeEvent(). Nothing bypasses it.
import { EventRouter } from '@cool-ai/beach-core';

const router = new EventRouter();

// Register handlers by name
router.register('handle_search', async (event) => {
  const { query } = event.data;
  // ... perform search
});

// Load declarative routing config
router.loadRoutingConfig({
  rules: [
    { source: 'user', eventType: 'search_requested', handler: 'handle_search' },
    // Payload-conditional dispatch: first match wins
    { source: 'user', eventType: 'message', handler: 'handle_urgent',
      when: { payload: { priority: { equals: 'high' } } } },
    { source: 'user', eventType: 'message', handler: 'handle_normal' },
  ],
});

// Dispatch an event
await router.routeEvent({ source: 'user', eventType: 'search_requested', data: { query: 'flights to Rome' } });
Payload predicate DSL
Routing when guards and cascade when clauses share a common predicate shape:
const when = {
  payload: {
    status: { equals: 'confirmed' }, // exact match
    region: { exists: true },        // field must be present
  },
  anyOf: [
    { payload: { departureDate: { exists: true } } },
    { payload: { travelMonth: { exists: true } } },
  ],
  allOf: [
    { payload: { destination: { exists: true } } },
    { payload: { passengers: { exists: true } } },
  ],
};
- payload: all predicates must pass (AND).
- anyOf: at least one sub-clause must pass (OR).
- allOf: all sub-clauses must pass (AND).
All three must be satisfied when present.
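The combination rules above can be sketched as a small recursive evaluator. This is an illustrative model only, not the router's implementation: the names FieldPredicate, WhenClause, fieldPasses, and matches are hypothetical, and treating "exists" as "the field is not undefined" is an assumption.

```typescript
// Hypothetical types and helpers; none of these are exported by @cool-ai/beach-core.
type FieldPredicate = { equals?: unknown; exists?: boolean };

interface WhenClause {
  payload?: Record<string, FieldPredicate>;
  anyOf?: WhenClause[];
  allOf?: WhenClause[];
}

// A single field passes when every operator present on its predicate passes.
function fieldPasses(value: unknown, predicate: FieldPredicate): boolean {
  if (predicate.exists !== undefined && (value !== undefined) !== predicate.exists) {
    return false;
  }
  if ('equals' in predicate && value !== predicate.equals) {
    return false;
  }
  return true;
}

// payload is AND, anyOf is OR, allOf is AND; every section that is present must pass.
function matches(when: WhenClause, data: Record<string, unknown>): boolean {
  const payloadOk = Object.entries(when.payload ?? {}).every(
    ([field, predicate]) => fieldPasses(data[field], predicate),
  );
  const anyOk = when.anyOf === undefined || when.anyOf.some((clause) => matches(clause, data));
  const allOk = when.allOf === undefined || when.allOf.every((clause) => matches(clause, data));
  return payloadOk && anyOk && allOk;
}

const guard: WhenClause = {
  payload: { status: { equals: 'confirmed' } },
  anyOf: [
    { payload: { departureDate: { exists: true } } },
    { payload: { travelMonth: { exists: true } } },
  ],
};

// One anyOf branch is satisfied by travelMonth.
const withMonth = matches(guard, { status: 'confirmed', travelMonth: 'May' });
// Neither anyOf branch is satisfied, so the whole guard fails.
const withoutDates = matches(guard, { status: 'confirmed' });
```

Because sub-clauses share the top-level shape, nesting anyOf inside allOf (or the reverse) needs no extra code in a model like this one.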
Cascade config
Cascades fire derived events when a triggering event matches. They are suppressed when minimumContext fields are absent.
router.loadCascadeConfig({
  rules: [
    {
      when: { event: 'search_completed', payload: { hasResults: { equals: true } } },
      cascade: { handler: 'enrich_results', minimumContext: ['userId'] },
    },
  ],
});

// Pass session context alongside the event
await router.routeEvent(
  { source: 'search', eventType: 'search_completed', data: { hasResults: true } },
  { userId: 'u-123' }, // context: merged with event data for the minimumContext check
);
Observability callbacks
RouterOptions accepts three optional callbacks for observability or testing:
const router = new EventRouter({
  onRoutingDecision: (decision) => {
    // Fires on every routing attempt, matched or unmatched.
    if (decision.matched) {
      console.log('routed to', decision.handler);
    } else {
      console.log('unmatched:', decision.reason); // 'no-rule' | 'no-when-match'
    }
  },
  onCascadeHandlerError: ({ ruleName, error, event }) => { /* ... */ },
  onCascadeSuppressed: ({ ruleName, missingFields, event }) => { /* ... */ },
});
onRoutingDecision fires in all three dispatchRouting() paths: matched, no-rule-found (before RouteNotFoundError is thrown), and when-guard failed (before silent drop). The payload is a discriminated union: either { matched: true; handler; ... } or { matched: false; reason: 'no-rule' | 'no-when-match'; ... }. sessionId is extracted defensively from event.data when present.
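Because the payload is a discriminated union, checking matched narrows the type to the fields that branch carries. The sketch below uses a hypothetical local type, RoutingDecisionSketch, that mirrors only the fields named above; the real exported type may carry more (the source elides them), and describeDecision is an illustrative helper, not part of the package.

```typescript
// Hypothetical local type mirroring the union described in the text.
type RoutingDecisionSketch =
  | { matched: true; handler: string }
  | { matched: false; reason: 'no-rule' | 'no-when-match' };

function describeDecision(decision: RoutingDecisionSketch): string {
  if (decision.matched) {
    // Narrowed to the matched branch: handler is available here.
    return `routed to ${decision.handler}`;
  }
  // Narrowed to the unmatched branch: reason is available here.
  return decision.reason === 'no-rule'
    ? 'unmatched: no rule found'
    : 'unmatched: when guard failed';
}

// A test double that records decisions instead of logging them.
const seen: string[] = [];
const record = (decision: RoutingDecisionSketch): void => {
  seen.push(describeDecision(decision));
};

record({ matched: true, handler: 'handle_search' });
record({ matched: false, reason: 'no-when-match' });
```

A recorder like this could be passed as onRoutingDecision in tests to assert which rule fired without inspecting handler side effects.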
For context that must be loaded from storage, use cascadeContextProvider in RouterOptions:
const router = new EventRouter({
  cascadeContextProvider: async (event) => {
    return sessionStore.get(event.data.sessionId);
  },
});
ManifestRegistry
Coordinates collecting N expected results before firing a callback. Handles pre-registration arrivals via a scoped orphan queue.
import { Manifest, ManifestRegistry } from '@cool-ai/beach-core';
import type { ManifestObserver } from '@cool-ai/beach-core';

// Optional: pass a ManifestObserver (e.g. ManifestEventStore from @cool-ai/beach-inspect)
// to receive structured events for observability.
const registry = new ManifestRegistry({ observer: myObserver });

const manifest = new Manifest({
  id: 'search:rome',
  turnId: currentTurnId, // optional; links the manifest to a turn in beach-inspect
  expected: ['flights', 'hotels'],
  timeoutMs: 10_000,
  onComplete: async (filled) => {
    // filled: Map<string, unknown> with every slot and the data passed to deliver()
    await assemble(filled.get('flights'), filled.get('hotels'));
  },
  onTimeout: async (filled) => {
    // filled: Map<string, unknown> with only the slots that arrived
    // Return the AssemblyOutcome type string to include it in beach-inspect's timeout event.
    await assemblePartial(filled);
    return 'inject'; // 'inject' | 'events' | 'reset' | void
  },
});

registry.register(manifest);

// Deliver results, targeted by manifest ID
registry.deliver('search:rome', 'flights', flightResults);
registry.deliver('search:rome', 'hotels', hotelResults);
// → both slots filled → onComplete fires
Results delivered before register() are stored as scoped orphans keyed to ${manifestId}:${key} and drained when the manifest registers. Concurrent manifests expecting the same slot key are safe: each manifest only receives deliveries addressed to its own ID.
To enable full observability in beach-inspect, pass a ManifestEventStore (from @cool-ai/beach-inspect) as the observer. It receives lifecycle events for registration, slot fills, completion, timeout, cancellation, and orphan queue activity. See the beach-inspect README for wiring instructions.
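The orphan-queue behaviour described above can be modelled in a few lines. This is a simplified sketch, not the ManifestRegistry source: MiniManifest and MiniRegistry are hypothetical names, timeouts and observers are omitted, and the orphan store is a nested map per manifest ID rather than the composite string key the real registry is described as using.

```typescript
// Hypothetical stand-in for Manifest: completes once every expected slot is filled.
class MiniManifest {
  private readonly filled = new Map<string, unknown>();

  constructor(
    readonly id: string,
    private readonly expected: string[],
    private readonly onComplete: (filled: Map<string, unknown>) => void,
  ) {}

  fill(key: string, data: unknown): 'accepted' | 'rejected' {
    if (!this.expected.includes(key) || this.isComplete()) return 'rejected';
    this.filled.set(key, data);
    if (this.isComplete()) this.onComplete(new Map(this.filled));
    return 'accepted';
  }

  private isComplete(): boolean {
    return this.expected.every((key) => this.filled.has(key));
  }
}

// Hypothetical stand-in for ManifestRegistry with a scoped orphan queue.
class MiniRegistry {
  private readonly manifests = new Map<string, MiniManifest>();
  private readonly orphans = new Map<string, Map<string, unknown>>();

  register(manifest: MiniManifest): void {
    this.manifests.set(manifest.id, manifest);
    // Drain only the deliveries that were addressed to this manifest ID.
    const pending = this.orphans.get(manifest.id);
    if (pending) {
      this.orphans.delete(manifest.id);
      pending.forEach((data, key) => manifest.fill(key, data));
    }
  }

  deliver(manifestId: string, key: string, data: unknown): void {
    const manifest = this.manifests.get(manifestId);
    if (manifest) {
      manifest.fill(key, data);
      return;
    }
    const pending = this.orphans.get(manifestId) ?? new Map<string, unknown>();
    pending.set(key, data);
    this.orphans.set(manifestId, pending);
  }
}

const completions: string[] = [];
const miniRegistry = new MiniRegistry();

miniRegistry.deliver('search:rome', 'flights', ['FR123']); // arrives early, queued
miniRegistry.register(
  new MiniManifest('search:rome', ['flights', 'hotels'], (filled) => {
    completions.push(Array.from(filled.keys()).sort().join(','));
  }),
);
miniRegistry.deliver('search:paris', 'hotels', ['Ritz']); // different ID, never reaches search:rome
miniRegistry.deliver('search:rome', 'hotels', ['Hotel Roma']); // completes the manifest
```

Scoping orphans by manifest ID is what keeps the search:paris delivery from filling the hotels slot of search:rome, even though both use the same slot key.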
Two patterns of use — Assembly and Delivery
Manifest is position-neutral: one primitive, one API, one settlement contract. The same class is used in two distinct roles. The names are conventions, not subtypes — nothing in the code distinguishes them. They describe who opens the manifest and who subscribes.
Assembly Manifest — downstream of the LLM, aggregates multi-step work into one composed output.
- Opened by: a tool handler, turn coordinator, or assembly layer that dispatches parallel or sequential sub-work.
- Slots: sub-results of the work (e.g. per-destination search results, sub-fetches, product variants).
- Filled by: individual tool handlers, supplier adapters, or specialist actors.
- Subscribed by: the orchestrating turn (for re-injection as a composed tool-result via ResultsCollector) or a direct event consumer.
- Settles → injects the composition into the actor's turn, or emits an assembled event.
This is the pattern already used by ResultsCollector for multi-supplier aggregation. One turn can open several Assembly Manifests.
Delivery Manifest — upstream of the LLM, gates when a batched-channel outbound is sent.
- Opened by: the inbound edge of a batched channel (e.g. email, SMS) when a message arrives.
- Slots: typically main_reply only; extensible for composite outbound artifacts (e.g. an email with a required attachment slot).
- Filled by: the orchestrating actor's final respond() fills main_reply; additional slot-fillers fill any composite slots the outbound requires.
- Subscribed by: the outbound edge for that channel.
- Settles → the outbound edge reads the payload, formats for the channel, and dispatches.
Streaming channels (SSE, WebSocket, voice) have no Delivery Manifest — their outbound edge subscribes to the part stream directly. Only batched channels need gating.
Both patterns can coexist within a single turn. An email inbound opens a Delivery Manifest with main_reply; the orchestrating actor fires a multi-step research tool which opens an Assembly Manifest for its sub-results; when the Assembly Manifest settles its composition injects back into the actor; the actor's final respond() fills main_reply; the Delivery Manifest settles and the email is sent. Two manifests, same primitive, neither aware of the other.
The primitive knows nothing about channels, turns, sessions, or LLMs. Channel-specificity lives at the edges that open and subscribe — never in the core.
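The email walkthrough above can be traced with a toy stand-in for the primitive. Everything here is hypothetical: openManifest is an illustrative helper, not the Manifest API, and the actor's turn is collapsed into a single callback so the ordering of the two settlements is visible.

```typescript
// Hypothetical stand-in for the manifest primitive: settles once every slot is filled.
function openManifest(
  slots: string[],
  onSettle: (filled: Map<string, unknown>) => void,
): (slot: string, data: unknown) => void {
  const filled = new Map<string, unknown>();
  let settled = false;
  return (slot, data) => {
    if (settled || !slots.includes(slot)) return;
    filled.set(slot, data);
    if (slots.every((name) => filled.has(name))) {
      settled = true;
      onSettle(filled);
    }
  };
}

const outbox: string[] = [];

// Delivery role: opened by the inbound email edge, subscribed by the outbound edge.
const fillDelivery = openManifest(['main_reply'], (filled) => {
  outbox.push(`email: ${String(filled.get('main_reply'))}`);
});

// Assembly role: opened by a research tool, subscribed by the orchestrating turn.
const fillAssembly = openManifest(['flights', 'hotels'], (filled) => {
  // Subscriber code, not manifest code: the composition is injected back into
  // the actor, whose final respond() fills main_reply. Collapsed to one step here.
  const summary = `${String(filled.get('flights'))} + ${String(filled.get('hotels'))}`;
  fillDelivery('main_reply', summary);
});

fillAssembly('flights', 'FR123');
fillAssembly('hotels', 'Hotel Roma');
```

Both manifests come from the same function; the only link between them lives in the subscriber callback, which is the edge code the text says should own that knowledge.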
Manifest lifecycle
manifest.status() // 'open' | 'complete' | 'cancelled' | 'timed-out'
manifest.fill(key, data) // returns 'accepted' | 'rejected' — 'rejected' if manifest is not open or key is unexpected
manifest.cancel() // silently stop — no callbacks fire
manifest.reset() // re-open after completion or timeout; keeps filled slots; restarts timer
manifest.filledSoFar() // Map<string, unknown> of slots filled so far
registry.get('search:rome') // Manifest | undefined
registry.unregister('search:rome') // removes regardless of status
PartTypeRegistry
Startup validation for respond() part types. Call assertRegistered after all registrations are complete, before the application starts accepting requests.
import { PartTypeRegistry } from '@cool-ai/beach-core';
// Register custom part types
PartTypeRegistry.register({ name: 'itinerary', description: 'A travel itinerary.' });
// Validate at startup — throws with the full list of missing names
PartTypeRegistry.assertRegistered(['domain-data', 'a2ui-surface', 'itinerary']);
Core registers: ack, thinking, response, clarify, error. @cool-ai/beach-protocol registers envelope data parts on import.
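The value of a startup check is that every missing name is reported at once rather than one per failed request. A minimal model of that behaviour follows; MiniPartTypeRegistry is a hypothetical class, the error message format is invented for illustration, and only the five core names listed above are pre-registered.

```typescript
// Illustrative model of the startup check; not the PartTypeRegistry source.
class MiniPartTypeRegistry {
  private readonly names = new Set<string>(['ack', 'thinking', 'response', 'clarify', 'error']);

  register(partType: { name: string; description: string }): void {
    this.names.add(partType.name);
  }

  // Collects every missing name before throwing, so one run reveals all gaps.
  assertRegistered(required: string[]): void {
    const missing = required.filter((name) => !this.names.has(name));
    if (missing.length > 0) {
      throw new Error(`Unregistered part types: ${missing.join(', ')}`);
    }
  }
}

const partTypes = new MiniPartTypeRegistry();
partTypes.register({ name: 'itinerary', description: 'A travel itinerary.' });

let startupError = '';
try {
  // domain-data and a2ui-surface are not registered in this sketch.
  partTypes.assertRegistered(['response', 'itinerary', 'domain-data', 'a2ui-surface']);
} catch (error) {
  startupError = error instanceof Error ? error.message : String(error);
}
```

In this model the check fails because nothing has registered domain-data or a2ui-surface, which is the situation the real registry would presumably hit if the package that registers them had not been imported yet.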
Cooperative cancellation
The EventRouter propagates an AbortSignal through the handler chain so cooperative I/O (fetch, callAgent, callActor, etc.) observes turn cancellation without re-plumbing (CR-153).
The signal originates from SessionTurnManager.runTurn, which returns it as part of TurnResult. The orchestrator handler passes it forward when routing post-turn events:
import type { HandlerContext } from '@cool-ai/beach-core';
router.register('orchestrator', async (event, ctx: HandlerContext) => {
  const result = await manager.runTurn({ /* ... */ });

  // Forward the turn's signal so destination-fan-out, response-collector,
  // and any other downstream handler observes it.
  await ctx.routeEvent(
    { source: 'assistant', eventType: 'reply_ready', data: { /* ... */ } },
    undefined,                 // cascade context (unchanged)
    { signal: result.signal }, // ← CR-153
  );
});
Downstream handlers read ctx.signal and pass it into their I/O calls:
router.register('research-runner', async (event, ctx: HandlerContext) => {
  const response = await fetch('https://search.example.com/...', {
    signal: ctx.signal, // ← cooperative cancellation
  });
  // …
});
The signal inherits automatically across child events. A handler that emits a child via ctx.routeEvent({ ... }) carries the same signal forward unless an override is passed explicitly:
// Inherits parent's signal:
await ctx.routeEvent({ source: 'x', eventType: 'y', data: {} });

// Suppress inheritance for a specific child (e.g. fire-and-forget audit log):
await ctx.routeEvent(
  { source: 'audit', eventType: 'log', data: {} },
  undefined,
  { signal: undefined },
);
Filtering destinations and cascade events also receive the signal: every handler descending from a signal-bearing chain sees it on HandlerContext.signal.
Handlers without I/O remain unaffected. ctx.signal is undefined when the event chain did not originate from a turn (peer-adapter inbound before session open, cron-triggered events, etc.).
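The inheritance rule (inherit unless an explicit signal key is passed, even if its value is undefined) can be modelled with a synchronous toy router. MiniRouter, MiniEvent, and MiniContext are hypothetical names that only mimic the routeEvent(event, context, options) call shape shown above; distinguishing "key absent" from "key present but undefined" is an assumption about how the override is detected.

```typescript
// Illustrative model of signal inheritance; not the EventRouter implementation.
type MiniEvent = { eventType: string };
type MiniOptions = { signal?: AbortSignal | undefined };
type MiniContext = {
  signal: AbortSignal | undefined;
  routeEvent: (event: MiniEvent, cascadeContext?: unknown, options?: MiniOptions) => void;
};
type MiniHandler = (event: MiniEvent, ctx: MiniContext) => void;

class MiniRouter {
  private readonly handlers = new Map<string, MiniHandler>();

  register(eventType: string, handler: MiniHandler): void {
    this.handlers.set(eventType, handler);
  }

  routeEvent(
    event: MiniEvent,
    _cascadeContext?: unknown,
    options?: MiniOptions,
    inherited?: AbortSignal,
  ): void {
    // An explicit signal key, even one set to undefined, overrides inheritance.
    const signal = options !== undefined && 'signal' in options ? options.signal : inherited;
    const ctx: MiniContext = {
      signal,
      // Children routed through ctx carry this handler's signal forward.
      routeEvent: (child, childContext, childOptions) =>
        this.routeEvent(child, childContext, childOptions, signal),
    };
    this.handlers.get(event.eventType)?.(event, ctx);
  }
}

const observed: string[] = [];
const describeSignal = (signal: AbortSignal | undefined): string =>
  signal === undefined ? 'none' : signal.aborted ? 'aborted' : 'live';

const miniRouter = new MiniRouter();
miniRouter.register('turn', (_event, ctx) => {
  ctx.routeEvent({ eventType: 'research' });                                // inherits
  ctx.routeEvent({ eventType: 'audit' }, undefined, { signal: undefined }); // suppressed
});
miniRouter.register('research', (_event, ctx) => {
  observed.push(`research:${describeSignal(ctx.signal)}`);
});
miniRouter.register('audit', (_event, ctx) => {
  observed.push(`audit:${describeSignal(ctx.signal)}`);
});

const controller = new AbortController();
controller.abort(); // simulate a cancelled turn
miniRouter.routeEvent({ eventType: 'turn' }, undefined, { signal: controller.signal });
```

The research handler sees the aborted signal it inherited, while the audit handler sees none, which matches the fire-and-forget case described above.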
Other registries
import { TurnStateRegistry, TransportRegistry, AuthTypeRegistry } from '@cool-ai/beach-core';
These follow the same register / isRegistered / getAll pattern. Consumers register additional values at startup. Unknown values are rejected at first use (parse errors, schema validation).
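The shared pattern amounts to an open set of string values with a membership check. A generic sketch follows; ValueRegistry is a hypothetical class, and the value names used ("sse", "carrier-pigeon") are placeholders rather than values the package is known to define.

```typescript
// Illustrative model of the register / isRegistered / getAll pattern.
class ValueRegistry {
  private readonly values = new Set<string>();

  constructor(builtIns: string[] = []) {
    builtIns.forEach((value) => this.values.add(value));
  }

  register(value: string): void {
    this.values.add(value);
  }

  isRegistered(value: string): boolean {
    return this.values.has(value);
  }

  getAll(): string[] {
    return Array.from(this.values);
  }
}

// Placeholder values; the real registries ship their own built-ins.
const transports = new ValueRegistry(['sse']);
const knownBefore = transports.isRegistered('carrier-pigeon');
transports.register('carrier-pigeon'); // consumer extension at startup
const knownAfter = transports.isRegistered('carrier-pigeon');
const allTransports = transports.getAll();
```

Keeping the set open lets a consumer add values without forking the package, while validation code can still reject anything that was never registered.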
Not in this package
- Session lifecycle (@cool-ai/beach-session).
- LLM invocation (@cool-ai/beach-llm).
- Channel adapters (@cool-ai/beach-transport).
Related
- https://cool-ai.org/docs/design-principles — Beach's architectural invariant (principles 1.1, 2.1, 3.1, 3.2, 3.5, 3.6).
- https://cool-ai.org/docs/diagrams/components.svg — component dependency graph.
- https://cool-ai.org/docs/contribution-policy — the invariant stated precisely; what "protocol-leaking" means.
