@crabbykit/agent-runtime
v0.0.1
Platform-agnostic runtime for building conversational AI agents on
Cloudflare Workers. Provides AgentDO, the defineAgent factory, session
storage, capability lifecycle, scheduling, A2A, MCP, transport, and the
channels primitives described below. See the top-level repo README.md
and CLAUDE.md for the full architecture overview.
Channels
Channels let agents receive inbound prompts from external messaging surfaces (Telegram, Discord, Slack, email) and route outbound replies back to the originating user. Three runtime primitives make this work:
Capability.afterTurn
An optional lifecycle hook on Capability fired once per
handleAgentPrompt / handlePrompt invocation at the agent_end
dispatch site, after entry persistence and WebSocket broadcast:
afterTurn?(ctx: AgentContext, sessionId: string, finalText: string): Promise<void>;
- finalText is the concatenated text content of the final assistant message in the turn. It is an empty string if the turn terminated without any assistant text (abort, error-before-generation, etc.).
- Errors thrown from afterTurn are caught per-capability, logged with capability id and session id, and never prevent turn completion or subsequent capabilities' hooks from firing.
- afterTurn is a generic hook, not a channel-specific one. Any capability (debug logging, cost reporting, cache invalidation, analytics) can subscribe.
The hook runs inside runtimeContext.waitUntil(...) so async outbound
I/O extends past the current handler without blocking other event work.
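A minimal sketch of a non-channel afterTurn subscriber and of the per-capability error isolation described above. The AgentContext and Capability shapes are trimmed-down local stand-ins (the real types live in the runtime), and dispatchAfterTurn is a hypothetical name used for illustration, not the runtime's actual dispatcher.

```typescript
// Local stand-ins for illustration only; the real types come from the runtime.
interface AgentContext {
  agentId: string; // hypothetical field
}

interface Capability {
  id: string;
  afterTurn?(ctx: AgentContext, sessionId: string, finalText: string): Promise<void>;
}

// A non-channel subscriber: records how long each final reply was.
const replyLengths: number[] = [];
const replyMetrics: Capability = {
  id: "reply-metrics",
  async afterTurn(_ctx, _sessionId, finalText) {
    // finalText is "" when the turn ended without assistant text.
    replyLengths.push(finalText.length);
  },
};

// A deliberately broken subscriber, to show that failures stay contained.
const broken: Capability = {
  id: "broken",
  async afterTurn() {
    throw new Error("boom");
  },
};

// Hypothetical dispatcher: each hook runs in its own try/catch, so one
// failing capability cannot stop the others or fail the turn.
async function dispatchAfterTurn(
  capabilities: Capability[],
  ctx: AgentContext,
  sessionId: string,
  finalText: string,
): Promise<string[]> {
  const failed: string[] = [];
  for (const cap of capabilities) {
    if (!cap.afterTurn) continue;
    try {
      await cap.afterTurn(ctx, sessionId, finalText);
    } catch (err) {
      failed.push(cap.id);
      console.error(`afterTurn failed: capability=${cap.id} session=${sessionId}`, err);
    }
  }
  return failed;
}
```

Calling dispatchAfterTurn([broken, replyMetrics], ctx, "s1", "hello") in this sketch logs the failure from broken and still runs replyMetrics.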
ctx.rateLimit.consume
A runtime-owned, atomic sliding-window rate limiter shared by every capability:
interface RateLimiter {
  consume(opts: {
    key: string;
    perMinute: number;
    perHour?: number;
  }): Promise<{ allowed: boolean; reason?: string }>;
}
- Exposed on every AgentContext and every CapabilityHttpContext.
- One shared instance per AgentRuntime, backed by the DO's SQL store.
- Atomic under DO single-threaded execution: the read-modify-write sequence inside consume contains no await points, so two concurrent callers can never both pass a bucket at its limit.
- Capabilities MUST call this rather than implementing their own counters. Multiple implementations mean multiple bugs.
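To illustrate the sliding-window semantics and the "no await between read and write" argument, here is a small in-memory sketch. It is not the runtime's limiter (which is SQL-backed); the class name, the reason strings, and the choice to count only allowed calls against the window are assumptions made for this example.

```typescript
interface ConsumeOptions {
  key: string;
  perMinute: number;
  perHour?: number;
}

interface ConsumeResult {
  allowed: boolean;
  reason?: string;
}

const MINUTE_MS = 60_000;
const HOUR_MS = 60 * MINUTE_MS;

// Hypothetical in-memory stand-in; the runtime's limiter is SQL-backed.
class InMemoryRateLimiter {
  private hits = new Map<string, number[]>();
  private now: () => number;

  constructor(now: () => number = () => Date.now()) {
    this.now = now;
  }

  // The check and the write happen in one synchronous step (no await in
  // between), which is the property the atomicity argument relies on.
  private consumeSync(opts: ConsumeOptions): ConsumeResult {
    const t = this.now();
    // Keep only timestamps inside the largest window we care about.
    const recent = (this.hits.get(opts.key) ?? []).filter((ts) => t - ts < HOUR_MS);
    const lastMinute = recent.filter((ts) => t - ts < MINUTE_MS).length;

    let result: ConsumeResult = { allowed: true };
    if (lastMinute >= opts.perMinute) {
      result = { allowed: false, reason: "perMinute limit reached" };
    } else if (opts.perHour !== undefined && recent.length >= opts.perHour) {
      result = { allowed: false, reason: "perHour limit reached" };
    } else {
      recent.push(t); // assumption: only allowed calls count against the window
    }
    this.hits.set(opts.key, recent);
    return result;
  }

  async consume(opts: ConsumeOptions): Promise<ConsumeResult> {
    return this.consumeSync(opts);
  }
}
```

From a capability, the equivalent call against the real limiter would be along the lines of await ctx.rateLimit.consume({ key, perMinute, perHour }), with the key format left to the capability.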
Session.sender + findBySourceAndSender
A single nullable column on Session for channel routing by remote
identity:
interface Session {
  id: string;
  name: string;
  source: string;
  sender: string | null; // NEW: channel-routed sessions only
  leafId: string | null;
  createdAt: string;
  updatedAt: string;
}
sessionStore.findBySourceAndSender(source: string, sender: string): Session | null;
- Backed by the partial index idx_sessions_source_sender ON sessions(source, sender) WHERE sender IS NOT NULL.
- sender is NULL for WebSocket-originated sessions, so they are never returned by findBySourceAndSender.
- Migration is idempotent and SQLite-valid: PRAGMA table_info checks whether the column exists before issuing ALTER TABLE ADD COLUMN (the IF NOT EXISTS form is invalid SQLite syntax and was one of the v1 review findings).
- create(opts) accepts an optional sender?: string that is persisted on the new row.
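The idempotent migration can be sketched as follows. SqlExec is a hypothetical minimal SQL surface (a statement in, rows out) rather than the runtime's actual storage API, the TEXT column type is an assumption, and makeFakeDb exists only so the sketch can run without a database.

```typescript
// Hypothetical minimal SQL surface: run a statement, get rows back.
type SqlExec = (query: string) => Array<Record<string, unknown>>;

// Adds sessions.sender only when it is missing, so the migration can run on
// every startup. SQLite has no "ADD COLUMN IF NOT EXISTS", hence the PRAGMA.
function migrateSenderColumn(exec: SqlExec): boolean {
  const columns = exec("PRAGMA table_info(sessions)").map((row) => String(row["name"]));
  let added = false;
  if (!columns.includes("sender")) {
    exec("ALTER TABLE sessions ADD COLUMN sender TEXT"); // column type is an assumption
    added = true;
  }
  // Partial index: rows with a NULL sender (WebSocket sessions) are left out.
  exec(
    "CREATE INDEX IF NOT EXISTS idx_sessions_source_sender " +
      "ON sessions(source, sender) WHERE sender IS NOT NULL",
  );
  return added;
}

// Tiny fake for demonstration; it only understands the statements above.
function makeFakeDb(initialColumns: string[]): { exec: SqlExec; columns: string[] } {
  const columns = [...initialColumns];
  const exec: SqlExec = (query) => {
    if (query.startsWith("PRAGMA table_info")) {
      return columns.map((name) => ({ name }));
    }
    if (query.startsWith("ALTER TABLE")) {
      // Mirrors SQLite, which rejects adding a column that already exists.
      if (columns.includes("sender")) throw new Error("duplicate column name: sender");
      columns.push("sender");
    }
    return [];
  };
  return { exec, columns };
}
```

Running migrateSenderColumn twice against the same fake adds the column on the first call and is a no-op on the second, which is the property the bullet above describes.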
defineChannel
A policy-enforcing factory that wraps a ChannelDefinition into a
Capability whose HTTP handlers and afterTurn hook implement the full
inbound → inference → outbound pipeline. The contract is designed to
make unsafe channels structurally impossible — webhook verification,
per-sender rate limiting, per-account (Sybil) rate limiting, and
sendReply are all required properties at the type level.
interface ChannelDefinition<TAccount extends { id: string }, TInbound> {
  id: string;
  accounts(env: unknown): TAccount[] | Promise<TAccount[]>;
  webhookPath(account: TAccount): string;
  // MANDATORY: the type system rejects omission of any of these.
  verifyWebhook(req: Request, account: TAccount): boolean | Promise<boolean>;
  parseWebhook(req: Request, account: TAccount): Promise<ParsedInbound<TInbound> | null>;
  rateLimit: {
    perSender: RateLimitConfig; // required
    perAccount: RateLimitConfig; // required (Sybil guard)
  };
  sendReply(account: TAccount, inbound: TInbound, text: string): Promise<void>;
  // Optional lifecycle hooks:
  onAccountAdded?(account, ctx): Promise<void>;
  onAccountRemoved?(account, ctx): Promise<void>;
}
function defineChannel<TAccount extends { id: string }, TInbound>(
  def: ChannelDefinition<TAccount, TInbound>,
): Capability;
The helper's inbound pipeline (strict order):
- verifyWebhook: return HTTP 403 on failure.
- parseWebhook: HTTP 200 without processing on null.
- Per-sender rate limit: HTTP 200 on denial (NOT 429; webhook providers would retry-storm).
- Per-account global rate limit: HTTP 200 on denial (Sybil guard).
- Session routing via findBySourceAndSender + create-if-missing.
- Stash { accountId, inbound } under channel-inbound:${sessionId}.
- sendPrompt under waitUntil, with .catch that calls sendReply(account, parsed.inbound, "Sorry — something went wrong.") as a best-effort error reply.
- Respond HTTP 200 immediately.
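The ordering of the steps above can be sketched as a single function that returns the HTTP status. InboundDeps is a hypothetical bundle in which each member stands in for one pipeline step, and the rate-limit key formats are assumptions; the real helper wires these to the channel definition and the runtime.

```typescript
// Hypothetical dependency bundle; each member stands in for one pipeline step.
interface InboundDeps {
  verify(): Promise<boolean>;
  parse(): Promise<{ sender: string; text: string } | null>;
  consume(key: string): Promise<{ allowed: boolean }>;
  findSession(sender: string): string | null;
  createSession(sender: string): string;
  stash(sessionId: string, sender: string): void;
  sendPrompt(sessionId: string, text: string): Promise<void>;
  sendErrorReply(sender: string): Promise<void>;
  waitUntil(p: Promise<unknown>): void;
}

async function handleInbound(accountId: string, deps: InboundDeps): Promise<number> {
  // 1. Reject unverified webhooks outright.
  if (!(await deps.verify())) return 403;

  // 2. Nothing to process: acknowledge and stop.
  const parsed = await deps.parse();
  if (parsed === null) return 200;

  // 3 and 4. Denials still answer 200 so the provider does not retry.
  if (!(await deps.consume(`sender:${accountId}:${parsed.sender}`)).allowed) return 200;
  if (!(await deps.consume(`account:${accountId}`)).allowed) return 200;

  // 5. Route to the sender's session, creating one on first contact.
  const sessionId = deps.findSession(parsed.sender) ?? deps.createSession(parsed.sender);

  // 6. Remember where the reply should go.
  deps.stash(sessionId, parsed.sender);

  // 7. Run inference in the background; on failure send a best-effort apology.
  deps.waitUntil(
    deps.sendPrompt(sessionId, parsed.text).catch(() => deps.sendErrorReply(parsed.sender)),
  );

  // 8. Respond immediately, without waiting for inference.
  return 200;
}
```

Note that in this sketch a rate-limit denial returns before any session is created or any prompt is sent, so a flood of messages costs no inference.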
The helper's afterTurn reads the stash, looks up the matching account
by id, calls sendReply, and catches/logs any failure. The stash is
not deleted after dispatch — chat-like flows benefit from
last-known-target semantics (e.g., cron-triggered reminders reach the
user on their most recent channel).
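The outbound half can be sketched in the same spirit. The Map-based stash and the replyAfterTurn name are assumptions for illustration; the real helper reads from the capability's storage.

```typescript
interface Account {
  id: string;
}

interface Stashed<TInbound> {
  accountId: string;
  inbound: TInbound;
}

// Hypothetical key-value view of the capability's storage.
type Stash<TInbound> = Map<string, Stashed<TInbound>>;

async function replyAfterTurn<TAccount extends Account, TInbound>(
  stash: Stash<TInbound>,
  accounts: TAccount[],
  sessionId: string,
  finalText: string,
  sendReply: (account: TAccount, inbound: TInbound, text: string) => Promise<void>,
): Promise<"sent" | "skipped" | "failed"> {
  const entry = stash.get(`channel-inbound:${sessionId}`);
  if (!entry) return "skipped"; // not a channel-routed session

  const account = accounts.find((a) => a.id === entry.accountId);
  if (!account) return "skipped"; // no matching account for the stashed id

  // The stash entry is intentionally left in place after dispatch, so a later
  // turn on the same session can still reach the last-known target.
  try {
    await sendReply(account, entry.inbound, finalText);
    return "sent";
  } catch (err) {
    console.error(`sendReply failed: session=${sessionId}`, err);
    return "failed";
  }
}
```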
Authoring a channel safely
- Start with @crabbykit/channel-telegram as a reference. It builds a complete channel in ~80 lines of declarative code on top of defineChannel. Read its implementation before writing your own: the structure itself teaches the security invariants, so you do not have to rely on documentation discipline.
- Never implement your own rate-limit counters. Use ctx.rateLimit.consume. The runtime's limiter is tested once and atomic; yours will have bugs.
- Constant-time secret comparison is mandatory in verifyWebhook. Naive string equality exposes a timing oracle; channel-telegram's constantTimeEqual is a reference implementation.
- Strip credentials from error messages before rethrowing. A token that reaches the logs is enough for a full account takeover.
- Rate-limit defaults should be conservative. LLM inference costs are the highest-risk failure mode; tune buckets upward only after observing real traffic.
- Treat user content as untrusted. It flows into session entries and compaction summaries without sanitization. Design your system prompt to handle prompt-injection defensively until the CLAW-wide sanitization primitive lands.
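Two of the points above, sketched under stated assumptions. timingSafeStringEqual and redactSecret are hypothetical helpers written for this example; they are not the constantTimeEqual from channel-telegram, and JavaScript engines give no hard timing guarantees, so treat the comparison as illustrating the shape of the technique.

```typescript
// Compares two strings without returning early on the first mismatch, so
// the time taken does not depend on how many leading characters matched.
function timingSafeStringEqual(a: string, b: string): boolean {
  // Fold the length difference into the result instead of returning early.
  let diff = a.length ^ b.length;
  const len = Math.max(a.length, b.length);
  for (let i = 0; i < len; i++) {
    // charCodeAt returns NaN past the end; "| 0" turns that into 0.
    diff |= (a.charCodeAt(i) | 0) ^ (b.charCodeAt(i) | 0);
  }
  return diff === 0;
}

// Replaces every occurrence of a secret before a message is logged or rethrown.
function redactSecret(message: string, secret: string): string {
  if (secret.length === 0) return message;
  return message.split(secret).join("[REDACTED]");
}
```

A verifyWebhook implementation would compare the provider-supplied header against the stored secret with the first helper, and wrap outbound API errors with the second before rethrowing.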
See packages/channels/channel-telegram/README.md for a concrete end-to-end
example including bot setup, config shape, and a Cloudflare Quick Tunnel
smoke test.
Dynamic capability state
The SDK makes a sharp distinction between two tiers of "agent configuration":
- Code tier: the set of capability types wired in at defineAgent time. This is genuinely compile-time: you can't invoke a function that isn't bundled into the worker.
- State tier: per-DO runtime state (accounts, credentials, enabled flags, tunable parameters). This lives in ConfigStore and CapabilityStorage, and is editable without a redeploy.
Anything that a human operator or the agent itself needs to mutate
should live in the state tier, not baked into a defineAgent
closure. The Telegram channel is the reference implementation of this
pattern — its bot accounts are stored per-DO, exposed to the agent via
configNamespaces, to the UI via capability_action / onAction, and
to clients via broadcastState / capability_state. A user can add
a Telegram bot by pasting a token into the UI; no env vars, no
redeploy.
To adopt the pattern in a new capability:
- Store state in CapabilityStorage (scoped per-capability automatically). Use a small storage wrapper class for type safety; see TelegramAccountStore in packages/channel-telegram for a minimal example.
- Expose configNamespaces for agent-driven CRUD. Use a pattern-matched namespace (/^my-thing:(.+)$/) for per-item read / update / delete, plus an exact-match namespace for listing. The prompt-scheduler capability is the reference.
- Expose onAction for UI-driven CRUD. Route the mutation helpers through a single internal function so agent and UI paths share one code path and stay consistent.
- Broadcast state on every mutation via ctx.broadcastState("sync", redactedView, "global"). Also broadcast from onConnect so newly connecting UIs hydrate immediately.
- Never echo credentials back. Define a redacted view type and ensure both the namespace get and the state broadcast convert to it. Tokens, secrets, and keys stay server-side.
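A compact sketch of the steps above: one store, one mutation path, a redacted view, and a broadcast on every change. BotAccount, AccountStore, and the Broadcast callback are illustrative assumptions; in a real capability the map would be backed by CapabilityStorage and the callback would be ctx.broadcastState.

```typescript
interface BotAccount {
  id: string;
  token: string;
  enabled: boolean;
}

// Redacted view: the only shape the agent and the UI ever see.
interface BotAccountView {
  id: string;
  enabled: boolean;
  hasToken: boolean;
}

function redact(account: BotAccount): BotAccountView {
  return { id: account.id, enabled: account.enabled, hasToken: account.token.length > 0 };
}

// Stand-in for ctx.broadcastState(event, data, scope).
type Broadcast = (event: string, data: unknown, scope: string) => void;

class AccountStore {
  private accounts = new Map<string, BotAccount>();
  private broadcast: Broadcast;

  constructor(broadcast: Broadcast) {
    this.broadcast = broadcast;
  }

  // Single mutation path shared by the agent (config namespace) and the UI (onAction).
  upsert(account: BotAccount): BotAccountView {
    this.accounts.set(account.id, account);
    this.sync();
    return redact(account);
  }

  remove(id: string): boolean {
    const existed = this.accounts.delete(id);
    if (existed) this.sync();
    return existed;
  }

  list(): BotAccountView[] {
    return Array.from(this.accounts.values()).map((a) => redact(a));
  }

  // Also suitable for calling from onConnect so new clients hydrate at once.
  sync(): void {
    this.broadcast("sync", this.list(), "global");
  }
}

// Pattern-matched namespace for per-item access, as in /^my-thing:(.+)$/.
const ITEM_NAMESPACE = /^my-thing:(.+)$/;

function itemIdFromNamespace(namespace: string): string | null {
  const match = ITEM_NAMESPACE.exec(namespace);
  return match?.[1] ?? null;
}
```

Because upsert and remove are the only writers, the agent path and the UI path cannot drift apart, and the token never appears in a return value or a broadcast.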
The runtime currently does NOT support a universal enabled: boolean
flag on capabilities — that's a documented follow-up. For now a
capability that wants to be toggleable should encode an enabled
field in its own config schema and check it inside tools() /
promptSections() / hooks.
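One way to encode such a toggle, as a sketch only. The Tool and MyThingConfig shapes and the return types of tools() and promptSections() are simplified assumptions; the runtime's real signatures may differ.

```typescript
interface Tool {
  name: string;
}

// The capability's own config schema carries the flag.
interface MyThingConfig {
  enabled: boolean;
}

function makeToggleableCapability(getConfig: () => MyThingConfig) {
  const allTools: Tool[] = [{ name: "my_thing_lookup" }];
  return {
    id: "my-thing",
    // When disabled, contribute nothing rather than failing at call time.
    tools(): Tool[] {
      return getConfig().enabled ? allTools : [];
    },
    promptSections(): string[] {
      return getConfig().enabled ? ["my-thing is available."] : [];
    },
  };
}
```

Reading the config on every call, rather than once at construction, lets a state-tier change take effect without a redeploy.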
