@cool-ai/beach-starter
v1.2.1
Published
Canonical pipeline handlers and routing template — the working shape for a Beach application.
Downloads
573
Readme
@cool-ai/beach-starter
Canonical pipeline handlers and routing template for Beach applications. Owns the reference wiring that turns Beach's router + session primitives into a working application — channel-blind interior, destination fan-out at the dispatch layer.
Home: cool-ai.org · Documentation: cool-ai.org/docs
Three consumer projects rediscovered the same canonical-handlers pattern independently. This package extracts it so new consumers inherit the working shape rather than rediscovering it.
Install
npm install @cool-ai/beach-starterPeer dependencies: @cool-ai/beach-session, @cool-ai/beach-llm.
A configured starting point — including a sample concierge actor, an example-lookup tool, and an sse streaming channel pre-wired for this pipeline — comes from @cool-ai/beach-config:
npm install @cool-ai/beach-config
npx beach-config init --with-starterThe flag lays down config/actors/concierge.yaml, config/tools/example-lookup.yaml, and config/channels/sse.yaml alongside the regular config scaffold. Replace the placeholders with the application's real concierge and tools.
The canonical pipeline
channel inbound
→ channel:message_received (channel-blind first-layer event;
inbound adapter has resolved threadId,
personId, destinations on the session)
→ message-matcher (resolves sessionId from threadId)
→ channel:message_matched
→ channel-inbound (creates turnId; normalises parts → Message)
→ session:turn_requested
→ YOUR_ORCHESTRATOR (runTurn; emits assistant:reply_ready
without channelId — channel-blind)
→ assistant:reply_ready
→ destination-fan-out (reads session.destinations; emits one
delivery event per destination on its
dedicated event type)
→ delivery:ui_streaming OR delivery:formatter
→ chat-collector OR response-collector
→ SSE pub/sub OR outbound missive draftMulti-channel replies (a client emailed and texted; reply via both) work without channel-branching code anywhere — the inbound adapter populates two destinations on the session and the fan-out emits two delivery events.
Quick start — channel-blind application
import { EventRouter } from '@cool-ai/beach-core';
import { SessionTurnManager } from '@cool-ai/beach-session';
import { InMemoryMissiveStore } from '@cool-ai/beach-missives/stores';
import { registerCanonicalHandlers } from '@cool-ai/beach-starter';
import type { TurnRequestedData } from '@cool-ai/beach-starter';
import routingConfig from '@cool-ai/beach-starter/templates/routing.json' with { type: 'json' };
const router = new EventRouter();
const manager = new SessionTurnManager({ router });
const store = new InMemoryMissiveStore();
// 1. Register your orchestrator. The orchestrator never sees channelId —
// the dispatch layer reads session.destinations to route replies.
router.register('my-orchestrator', async (event, context) => {
const { sessionId, turnId, inboundMessage } = event.data as TurnRequestedData;
const respond = await manager.runTurn({
sessionId, turnId, slotKey: 'main',
actorId: 'concierge', actorConfig, provider, registry: tools,
inboundMessage,
});
await context.routeEvent({
source: 'assistant', eventType: 'reply_ready',
data: { sessionId, turnId, parts: respond.parts, turnState: respond.turnState },
});
});
// 2. Register the canonical handlers. The destination set on each session
// decides where replies go; the registration is destination-agnostic.
registerCanonicalHandlers(router, {
orchestratorHandler: 'my-orchestrator',
resolveSession: (d) => d.threadId,
sessionManager: manager,
chatPublish: async (sessionId, parts) => {
await redis.publish(`reply:${sessionId}`, JSON.stringify(parts));
},
store,
});
// 3. Load the routing config (replace YOUR_ORCHESTRATOR with 'my-orchestrator' first).
router.loadRoutingConfig(routingConfig);
// 4. The inbound adapter opens each session with its destination set.
// A chat session opens with a ui-streaming destination:
manager.openSession({
id: sessionId,
actors: ['concierge'],
destinations: [{ kind: 'ui-streaming' }, { kind: 'audit' }],
});
// An email session opens with a formatter destination:
// destinations: [
// { kind: 'formatter:email-html', formatterChannel: 'email-html' },
// { kind: 'audit' },
// ]
//
// A multi-channel session populates both — fan-out emits both delivery
// events without any channel-branching code.Options
interface CanonicalHandlerOptions {
/** Consumer's orchestrator handler name (must be registered before this call). */
orchestratorHandler: string;
/** Resolves sessionId from a channel-blind first-layer event (the inbound
* adapter has already populated threadId on it). Typically returns d.threadId. */
resolveSession: (data: InboundEventData) => string | Promise<string>;
/** Session manager — required so the destination fan-out can read each
* session's destination set. */
sessionManager: SessionTurnManager;
/** Publishes reply parts for streaming. Wired to the chat-collector
* whenever the inbound adapter populates a `ui-streaming` destination. */
chatPublish?: (sessionId: string, parts: MissivePart[]) => Promise<void>;
/** Store for outbound missive drafts. Wired to the response-collector
* whenever the inbound adapter populates a `formatter:*` destination. */
store?: MissiveStore;
}Individual handler factories
Each handler is exported individually if you need partial adoption:
| Export | Consumes | Emits |
|--------|----------|-------|
| messageMatcherHandler | channel:message_received | channel:message_matched |
| channelInboundHandler | channel:message_matched | session:turn_requested |
| destinationFanOutHandler | assistant:reply_ready | one delivery:<kind> event per destination on the session |
| chatCollectorHandler | delivery:ui_streaming | publishes via chatPublish |
| responseCollectorHandler | delivery:formatter | writes to MissiveStore |
The canonical handler names are exported as HANDLER_NAMES, and the canonical event tuples as EVENTS.
Per-tool-call research fan-out — what used to ship here as filterAndDistributeHandler — moved to the FilterAndDistribute primitive in @cool-ai/beach-core (CR-156). Wire it via callActor's filterAndDistribute option.
Session resolution strategies
The inbound adapter populates threadId on the first-layer event before routing it; resolveSession typically returns it directly. Per-channel computation moves into the channel adapters (CR-169):
| Channel adapter | Computes threadId from |
|---|---|
| @cool-ai/beach-channel-email | RFC 5322 In-Reply-To / References chain |
| @cool-ai/beach-channel-whatsapp | quotedMessageId |
| SSE / chat | session-cookie / session id from the HTTP layer |
Routing template
@cool-ai/beach-starter/templates/routing.json ships as the reference routing config. Import it with a JSON module assertion and pass it to router.loadRoutingConfig(). Replace YOUR_ORCHESTRATOR with your registered handler name before loading.
Architectural discipline
This package is the concrete answer to design-principle 2.1 (the router is the boundary) and the post-CR-169 channel-blind discipline. Consumers that bypass the canonical pipeline — calling runTurn() directly from an HTTP route, for instance — lose every architectural guarantee Beach exists to provide: audit, observability, approval interception, replay, distributed tracing. Consumers that read channelId inside the orchestrator's tool loop have the type system reject the read; channel-aware logic belongs at the inbound adapter or the outbound Composer<E> (see @cool-ai/beach-format).
The canonical handlers are the working shape. Import them, fork them, or replace them with a compatible implementation — but do not skip them.
Related
@cool-ai/beach-core— the event router, manifest registry, andFilterAndDistributeprimitive.@cool-ai/beach-session— turn lifecycle and session destination set.@cool-ai/beach-format—Composer<E>and the seven canonical channel formats.- https://cool-ai.org/docs/design-principles — Part 1 (the architectural invariant), principle 2.1 (router is the boundary), principle 2.7 (channel-blind interior).
- https://cool-ai.org/docs/contribution-policy — what "protocol-leaking" means; why the canonical pipeline is not optional.
