la-machina-engine
v0.21.0
Published
Headless, multi-provider LLM agent engine for workflow automation. Pause/resume, MCP, skills, R2/Workers compatible.
la-machina-engine
Headless, multi-provider LLM agent engine for workflow automation.
A library, not a CLI. You import it, give it a task, and it runs a bounded agent loop — streaming an LLM, dispatching tools, spawning subagents, persisting a durable transcript — until the task is done, paused, or fails. Memory learns across runs. Runs pause mid-execution and resume later with full state. Storage is pluggable — local filesystem in dev, Cloudflare R2 in production, same code.
Built for embedding inside a workflow orchestrator (e.g. an n8n-style DAG runner where each node needs an LLM brain). If you want a terminal chatbot, use Claude Code. If you want the brain that runs inside each node of a production workflow, use this.
npm install la-machina-engine
Status
Published on npm; production-ready core, evolving feature surface.
- 1553 unit + integration tests pass (plus 8 pre-existing, unrelated Bun-timer failures)
- Zero top-level node: imports, so it runs on Node.js AND Cloudflare Workers
- 14 live workflow tests (W1–W14) verified against OpenRouter, real R2, real MCP servers
- Pause/resume + async runs + webhooks + state.json + R2 binding storage adapter
- MCP support: stdio, http (Streamable + Workers-safe binding transport), sse — with auth refresh + sampling
- Skills: disk-backed default + per-run override (inline body or HTTPS url)
- Subagent gate propagation (opt-in) — parent can pause when a child's tool is gated
Table of Contents
- Design Principles
- Install
- Quick Start
- CLI Test Harness
- Core Concepts
- Async API (start / waitFor / webhooks)
- Multi-Provider Support
- Agent Hierarchy
- Configuration Reference
- What's Implemented
- What's Deferred
- Architecture
- Development
- License
Design Principles
- Zero-config works. initEngine() with no arguments runs, given ANTHROPIC_API_KEY in the environment.
- Every knob has a default. No config option is required; all are overridable.
- Headless. No terminal UI, no React, no Ink. Plain Node/Workers library.
- Cloud-native. Storage is pluggable — local filesystem in dev, Cloudflare R2 in production.
- Pausable. Runs can suspend mid-turn via a gate callback and resume later with full state.
- Workers-compatible. Zero top-level node: imports. All platform-specific code is lazy-loaded.
- Multi-provider. Anthropic native + Vercel AI SDK (OpenAI, Google, OpenRouter, 75+ providers).
- Error-isolated. A misbehaving tool cannot crash the agent loop. The loop never throws.
- TypeScript-first. Strict mode, Zod-inferred types, discriminated unions for all results.
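To make the error-isolation principle concrete, here is a minimal sketch of a wrapper that converts a tool's throw or rejection into a result value the loop can hand back to the model. It is illustrative only: invokeIsolated and ToolResult are hypothetical names, not the engine's internal API.

```typescript
// Hypothetical result type: the caller always receives a value, never an exception.
type ToolResult =
  | { ok: true; output: string }
  | { ok: false; error: string }

// Run a tool handler and turn any synchronous throw or async rejection into { ok: false }.
async function invokeIsolated(
  handler: (input: unknown) => string | Promise<string>,
  input: unknown,
): Promise<ToolResult> {
  try {
    const output = await handler(input)
    return { ok: true, output }
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err)
    return { ok: false, error: message }
  }
}

// A deliberately misbehaving tool (hypothetical).
const brokenTool = (): string => {
  throw new Error('disk full')
}

invokeIsolated(brokenTool, {}).then((result) => console.log(result))
// { ok: false, error: 'disk full' }
```

The error text becomes data the model can react to on its next turn, instead of an exception that unwinds the loop.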
Install
npm install la-machina-engine
Requires Node 20+. Also works in Bun and in Cloudflare Workers (with R2 storage).
The package is published as a single bundled module (~330 KB ESM, ~340 KB CJS) with full TypeScript types plus two Node-only subpaths (/node-tools, /node-mcp). Built with tsup, CI publishes with Sigstore provenance.
0.7.x → 0.8.0 migration
0.8.0 makes the engine fully runtime-agnostic. The default bundle no longer auto-registers Bash or auto-stubs MCP stdio servers based on process.versions.node probing — the caller now decides which Tools the engine sees per environment. See plans/031-runtime-agnostic-engine.md for the full design.
Two changes consumers may need:
// Workers callers: expose Bash via capabilityStub (the model sees the tool;
// the engine pauses with handoff_to_runner when it is called):
import { initEngine, capabilityStub } from 'la-machina-engine'
initEngine({
  tools: { custom: [capabilityStub({ name: 'Bash', description: '...' })] },
})

// Node callers (a separate module): real Bash via the new node-tools subpath:
import { initEngine } from 'la-machina-engine'
import { createBashTool } from 'la-machina-engine/node-tools'
initEngine({ tools: { custom: [createBashTool()] } })

// Stdio MCP on Node (same module as the Bash example): pass a transport factory:
import { createStdioTransportFactory } from 'la-machina-engine/node-mcp'
initEngine({
  mcp: {
    servers: { local: { type: 'stdio', command: 'npx', args: ['mcp-server-x'] /* , ... */ } },
    stdioTransport: createStdioTransportFactory(),
  },
})
Without mcp.stdioTransport, configuring a stdio MCP server fails at connect with a clean error pointing at the subpath (see plans/031-runtime-agnostic-engine.md).
Quick Start
import { initEngine } from 'la-machina-engine'
const engine = initEngine() // uses ANTHROPIC_API_KEY from env
const response = await engine.run({
nodeId: 'node_xyz',
task: 'Summarize the contents of data.csv and propose 3 insights.',
// runId optional — auto-generated as run_<uuid> if omitted
})
if (response.status === 'done') {
console.log(response.data) // text string (or JSON object if outputFormat: 'json')
console.log(`runId: ${response.runId}`)
console.log(`${response.meta.turns} turns, ${response.meta.tokensUsed.input + response.meta.tokensUsed.output} tokens`)
}
With OpenRouter (multi-provider)
import { initEngine, Engine } from 'la-machina-engine'
const apiKey = 'sk-or-...' // your OpenRouter key
const engine = new Engine(
  initEngine({
    model: {
      provider: 'proxy',
      modelId: 'google/gemini-2.5-pro',
      apiKey,
      baseURL: 'https://openrouter.ai/api',
    },
  }).config,
  {
    // Replace the x-api-key header with the Bearer token OpenRouter expects
    fetch: async (input, init = {}) => {
      const headers = new Headers(init.headers ?? {})
      headers.delete('x-api-key')
      headers.set('Authorization', 'Bearer ' + apiKey)
      return fetch(input, { ...init, headers })
    },
  },
)
With R2 cloud storage
const engine = initEngine({
storage: {
provider: 'r2',
rootPath: 'my-project',
workspaceId: 'production',
r2: {
bucket: 'my-bucket',
region: 'auto',
accessKeyId: '...',
secretAccessKey: '...',
endpoint: 'https://xxx.r2.cloudflarestorage.com',
},
},
})
CLI Test Harness
A minimal interactive CLI for testing the engine directly:
node cli.mjs # interactive REPL
node cli.mjs "your task here" # one-shot
node cli.mjs --model anthropic/claude-sonnet-4 "task"
The REPL keeps a multi-turn conversation across prompts. Commands: /clear (reset), /turns (info).
Env vars: OPENROUTER_API_KEY (required), ENGINE_MODEL, ENGINE_STORAGE, ENGINE_MAX_TURNS.
Response Format
engine.run() and engine.resume() return EngineResponse directly — one flat shape for every status.
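On the client side, this flat shape fits a discriminated union keyed on status, so TypeScript narrows data and meta in each branch. The types below are a hypothetical local mirror of the JSON examples in this section (not the package's exported EngineResponse type), and fields the examples don't show are left out.

```typescript
// Hypothetical client-side mirror of the EngineResponse JSON shown in this section.
interface EngineError {
  code: string
  message: string
}

interface PendingToolCall {
  toolName: string
  toolUseId: string
  input: Record<string, unknown>
  calledAt?: string
}

interface BaseMeta {
  nodeId: string
  turns?: number
  tokensUsed?: { input: number; output: number }
  durationMs?: number
  transcript?: { path: string; lastShardIndex: number }
}

interface BaseResponse {
  runId: string
  errors: EngineError[]
  timestamp: number
}

type EngineResponseLike =
  | (BaseResponse & { status: 'done'; data: unknown; meta: BaseMeta & { output?: string } })
  | (BaseResponse & {
      status: 'paused'
      data: unknown
      meta: BaseMeta & { pauseReason: string; pendingToolCall?: PendingToolCall; snapshot?: unknown }
    })
  | (BaseResponse & { status: 'failed'; data: null; meta: BaseMeta })

// Narrowing on status exposes only the fields each branch carries.
function summarize(res: EngineResponseLike): string {
  switch (res.status) {
    case 'done':
      return `done after ${res.meta.turns ?? 0} turns`
    case 'paused':
      return `paused (${res.meta.pauseReason}) on ${res.meta.pendingToolCall?.toolName ?? 'unknown tool'}`
    case 'failed':
      return `failed: ${res.errors[0]?.code ?? 'UNKNOWN'}`
  }
}
```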
const response = await engine.run({
nodeId: 'n1',
task: '...',
// runId optional — auto-generated if omitted
})
// response.runId, response.status, response.data, response.meta, response.errors, response.timestamp
One shape for every status: clients always read response.data and track response.runId.
Done — Text Mode (default)
{
"runId": "run_abc",
"status": "done",
"data": "The analysis shows revenue grew 15% year-over-year with strongest performance in Q4.",
"meta": {
"nodeId": "analyze",
"turns": 5,
"tokensUsed": { "input": 12500, "output": 3200 },
"durationMs": 8500,
"output": "The analysis shows revenue grew 15% year-over-year with strongest performance in Q4.",
"transcript": { "path": "projects/run_abc/nodes/analyze", "lastShardIndex": 0 }
},
"errors": [],
"timestamp": 1712966400000
}
Done — JSON Mode with Schema
import { z } from 'zod'
const result = await engine.run({
  nodeId: 'extract',
  task: 'Fetch example.com and extract pricing tiers',
  outputFormat: 'json',
  outputSchema: z.object({
    tiers: z.array(z.object({ name: z.string(), price: z.number() })),
  }),
})
{
"runId": "run_abc",
"status": "done",
"data": {
"tiers": [
{ "name": "Starter", "price": 29 },
{ "name": "Pro", "price": 99 },
{ "name": "Enterprise", "price": 299 }
]
},
"meta": {
"nodeId": "extract",
"turns": 3,
"tokensUsed": { "input": 8000, "output": 1500 },
"durationMs": 12000,
"output": "{\"tiers\":[{\"name\":\"Starter\",\"price\":29},{\"name\":\"Pro\",\"price\":99},{\"name\":\"Enterprise\",\"price\":299}]}",
"transcript": { "path": "projects/run_abc/nodes/extract", "lastShardIndex": 0 }
},
"errors": [],
"timestamp": 1712966400000
}
Failed — JSON Mode, Parse / Schema Error
When outputFormat: 'json' is requested but the final model output is
empty, not valid JSON, or fails the supplied outputSchema, the run
terminates as status: 'failed' with a typed code (engine ≥ 0.21.0). The
old permissive fallback (status: 'done' with data falling back to raw
text) is removed — a strict-JSON request that can't satisfy the contract
fails at the engine boundary so callers never mistake an empty/invalid
output for a successful structured result.
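The strict contract can be sketched as a function that returns either parsed data or one of the two typed failure codes, with no fallback to raw text. This is an illustration, not the engine's implementation: checkJsonOutput and the validate callback are hypothetical, and validate stands in for the Zod outputSchema the engine actually accepts.

```typescript
type JsonOutcome =
  | { status: 'done'; data: unknown }
  | { status: 'failed'; code: 'ERR_JSON_OUTPUT_PARSE' | 'ERR_JSON_OUTPUT_SCHEMA'; message: string }

// validate stands in for schema validation: it returns an error message, or null when valid.
function checkJsonOutput(raw: string, validate?: (value: unknown) => string | null): JsonOutcome {
  if (raw.trim() === '') {
    return { status: 'failed', code: 'ERR_JSON_OUTPUT_PARSE', message: 'Output is empty' }
  }
  let parsed: unknown
  try {
    parsed = JSON.parse(raw)
  } catch (err) {
    const detail = err instanceof Error ? err.message : String(err)
    return { status: 'failed', code: 'ERR_JSON_OUTPUT_PARSE', message: `Output is not valid JSON: ${detail}` }
  }
  const problem = validate ? validate(parsed) : null
  if (problem !== null) {
    return { status: 'failed', code: 'ERR_JSON_OUTPUT_SCHEMA', message: `Output failed schema validation: ${problem}` }
  }
  // Success always carries parsed JSON; raw text is never passed through as data.
  return { status: 'done', data: parsed }
}

// Hypothetical hand-written validator for { tiers: [...] }.
const hasTiers = (v: unknown): string | null =>
  typeof v === 'object' && v !== null && Array.isArray((v as { tiers?: unknown }).tiers)
    ? null
    : 'expected an object with a tiers array'
```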
Raw parse failure:
{
"runId": "run_abc",
"status": "failed",
"data": null,
"meta": {
"nodeId": "extract",
"turns": 0,
"tokensUsed": { "input": 0, "output": 0 },
"durationMs": 6000,
"transcript": { "path": "projects/run_abc/nodes/extract", "lastShardIndex": 0 }
},
"errors": [
{
"code": "ERR_JSON_OUTPUT_PARSE",
"message": "Output is not valid JSON: …"
}
],
"timestamp": 1712966400000
}
Schema mismatch (parse succeeded, validation failed):
{
"runId": "run_abc",
"status": "failed",
"data": null,
"errors": [
{
"code": "ERR_JSON_OUTPUT_SCHEMA",
"message": "Output failed schema validation: …"
}
]
}
In both cases the engine still appends a json_parse_failure inspect
event (unchanged from Plan 026) — the failure is now visible BOTH in
inspect AND on the public run result.
Paused — Human Approval Needed
data is the content the human reviews — not internal details like paths. Path info is in meta.pendingToolCall.
{
"runId": "run_abc",
"status": "paused",
"data": "# Q4 Revenue Summary\n\nTotal revenue: $705,000...",
"meta": {
"nodeId": "write-report",
"turns": 3,
"tokensUsed": { "input": 8500, "output": 2100 },
"durationMs": 9200,
"snapshot": {
"version": 1,
"status": "paused",
"runId": "run_abc",
"nodeId": "write-report",
"pausedAt": "2026-04-13T14:30:00.000Z",
"pauseReason": "gate_required",
"messageCount": 6,
"lastShardIndex": 3,
"lastMessageUuid": "a1b2c3d4-...",
"pendingToolCall": {
"toolName": "Write",
"toolUseId": "toolu_abc123",
"input": { "path": "reports/q4-summary.md", "content": "# Q4 Revenue Summary..." },
"calledAt": "2026-04-13T14:30:00.000Z"
},
"tokensUsedSoFar": { "input": 8500, "output": 2100 },
"turnsUsed": 3
},
"pendingToolCall": {
"toolName": "Write",
"toolUseId": "toolu_abc123",
"input": { "path": "reports/q4-summary.md", "content": "..." },
"calledAt": "2026-04-13T14:30:00.000Z"
},
"pauseReason": "gate_required",
"transcript": { "path": "projects/run_abc/nodes/write-report", "lastShardIndex": 3 }
},
"errors": [],
"timestamp": 1712966400000
}
data = the content the human reviews, taken from the blocked tool call's input (here, the Write call's content field).
meta.snapshot = pass to engine.resume() to continue.
meta.pendingToolCall = shortcut to see what tool was blocked.
Paused — Topic Selection (Custom Tool)
When the agent tries to Write a JSON file with choices, data is the JSON content string:
{
"runId": "blog-001",
"status": "paused",
"data": "{\"topics\":[{\"title\":\"AI Trade Wars\",\"angle\":\"startup impact\"},{\"title\":\"EU Migration Reform\",\"angle\":\"policy analysis\"},{\"title\":\"Climate Summit\",\"angle\":\"developing nations\"}]}",
"meta": {
"nodeId": "research",
"turns": 2,
"tokensUsed": { "input": 5000, "output": 1200 },
"durationMs": 7500,
"snapshot": { "..." },
"pendingToolCall": {
"toolName": "Write",
"toolUseId": "toolu_xyz",
"input": { "path": "topics.json", "content": "{\"topics\":[...]}" },
"calledAt": "2026-04-13T10:00:00.000Z"
},
"pauseReason": "gate_required"
},
"errors": [],
"timestamp": 1712966400000
}
Resume with the user's choice:
await engine.resume({
snapshot: response.meta.snapshot,
gateAnswer: 'Approved. User selected Topic 1: AI Trade Wars.',
})
Failed — Max Turns Exceeded
{
"runId": "run_abc",
"status": "failed",
"data": null,
"meta": {
"nodeId": "complex-task",
"turns": 0,
"tokensUsed": { "input": 0, "output": 0 },
"durationMs": 45000,
"transcript": { "path": "projects/run_abc/nodes/complex-task", "lastShardIndex": 8 }
},
"errors": [
{
"code": "ERR_MAX_TURNS",
"message": "Run exceeded max turns"
}
],
"timestamp": 1712966400000
}
Failed — API Error After Retries
{
"runId": "run_abc",
"status": "failed",
"data": null,
"meta": {
"nodeId": "task",
"turns": 0,
"tokensUsed": { "input": 0, "output": 0 },
"durationMs": 12000,
"transcript": { "path": "projects/run_abc/nodes/task", "lastShardIndex": 0 }
},
"errors": [
{
"code": "ERR_RATE_LIMIT",
"message": "429 Too Many Requests"
}
],
"timestamp": 1712966400000
}
Failed — Max Tokens Recovery Exhausted
{
"runId": "run_abc",
"status": "failed",
"data": null,
"meta": {
"nodeId": "long-task",
"turns": 0,
"tokensUsed": { "input": 0, "output": 0 },
"durationMs": 30000,
"transcript": { "path": "projects/run_abc/nodes/long-task", "lastShardIndex": 5 }
},
"errors": [
{
"code": "ERR_MAX_TOKENS",
"message": "max_tokens recovery exhausted after 3 attempts"
}
],
"timestamp": 1712966400000
}
Error Codes Reference
| Code | When | Retryable? |
|------|------|-----------|
| ERR_MAX_TURNS | Run exceeded execution.maxTurns | No — increase maxTurns |
| ERR_MAX_TOKENS | max_tokens recovery failed after 3 attempts | No — reduce task scope |
| ERR_RUN_TIMEOUT | Run exceeded execution.runTimeoutMs | No — increase timeout |
| ERR_RATE_LIMIT | 429 after retry backoff exhausted | Yes — wait and retry |
| ERR_API | 500/502/503 after retries | Yes — transient |
| ERR_API_OVERLOADED | 529 five consecutive times | Yes — wait longer |
| ERR_AUTH | 401/403 invalid API key | No — fix credentials |
| ERR_CONFIG | Invalid configuration | No — fix config |
| ERR_STREAM_PARSE | Malformed API response | No — provider issue |
| ERR_STREAM_INCOMPLETE | Stream ended without message_stop | Yes — transient |
| ERR_UNEXPECTED_STOP | Unknown stop reason from API | No — investigate |
| ERR_JSON_OUTPUT_PARSE | outputFormat: 'json' returned empty / non-JSON output (engine ≥ 0.21.0) | No — adjust task / prompt |
| ERR_JSON_OUTPUT_SCHEMA | JSON output failed outputSchema validation (engine ≥ 0.21.0) | No — adjust schema or task |
| SCHEMA_VALIDATION_FAILED | (Legacy inspect-only code; pre-0.21.0) JSON output doesn't match outputSchema | No — adjust schema or task |
| JSON_PARSE_FAILED | (Legacy inspect-only code; pre-0.21.0) Model didn't return valid JSON | No — adjust task |
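A workflow runner can turn the Retryable? column into a lookup that decides whether to re-queue a failed node. The sketch below transcribes the non-legacy rows; RETRYABLE and shouldRequeue are hypothetical names, and it assumes unknown codes should not be retried.

```typescript
// Retryable? column from the table above, transcribed as data (legacy inspect-only codes omitted).
const RETRYABLE: Record<string, boolean> = {
  ERR_MAX_TURNS: false,
  ERR_MAX_TOKENS: false,
  ERR_RUN_TIMEOUT: false,
  ERR_RATE_LIMIT: true,
  ERR_API: true,
  ERR_API_OVERLOADED: true,
  ERR_AUTH: false,
  ERR_CONFIG: false,
  ERR_STREAM_PARSE: false,
  ERR_STREAM_INCOMPLETE: true,
  ERR_UNEXPECTED_STOP: false,
  ERR_JSON_OUTPUT_PARSE: false,
  ERR_JSON_OUTPUT_SCHEMA: false,
}

interface ErrorEntry {
  code: string
  message: string
}

// Re-queue only when the first error is a known transient code; unknown codes are not retried.
function shouldRequeue(errors: ErrorEntry[]): boolean {
  const first = errors[0]
  return first !== undefined && RETRYABLE[first.code] === true
}
```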
Workflow Runner Integration
// Step 1: Run — runId optional, auto-generated if omitted
const response = await engine.run({
nodeId: 'extract',
task: 'Fetch pricing from example.com',
outputFormat: 'json',
outputSchema: pricingSchema,
})
switch (response.status) {
case 'done':
// data is already typed/validated per outputSchema (JSON mode)
// or a plain text string (text mode)
passToNextNode(response.data)
break
case 'paused':
// Client only needs to remember the runId
saveToApprovalQueue({
runId: response.runId,
pendingAction: response.meta.pendingToolCall?.toolName,
data: response.data, // what to show the human
})
notifyHuman('Approval needed')
break
case 'failed':
logErrors(response.errors) // [{ code, message }]
retryOrEscalate(response)
break
}
// Step 2: Later, resume — just pass the runId
const resumed = await engine.resume({
runId: response.runId,
gateAnswer: 'Approved by manager',
})
Core Concepts
The Run Lifecycle
engine.run({ runId, nodeId, task })
│
├─ Build: storage, client, tools, memory, prompt, transcript
├─ preRun hook
├─ agentLoop:
│ while (!done) {
│ normalize messages (strip blocks, ensure alternation, tool pairing)
│ API streamMessage (with reactive recovery on max_tokens/413)
│ collect text + thinking + tool_use blocks
│ dispatch tools via StreamingToolExecutor (parallel safe, serial unsafe)
│ truncate results > 100K chars
│ postTurn + stopHooks (can prevent continuation)
│ }
├─ postRun hook (always fires)
└─ return: done | paused | failed
Storage Adapter
Three backends, same interface and the same relative layout on all of them:
| Adapter | Backend | Use |
|---------|---------|-----|
| LocalStorageAdapter | node:fs/promises (lazy import) | Dev, tests |
| R2StorageAdapter | Cloudflare R2 via S3 protocol | Node / anywhere with S3 creds |
| R2BindingStorageAdapter | Cloudflare R2 native binding (env.BUCKET) | Cloudflare Workers (provider: 'r2-binding') |
Path layout (identical across all three backends):
{rootPath}/workspaces/{workspaceId}/.claude/ ← tenant root
├── memory/ ← tenant-shared, survives across runs
├── skills/ ← (if config.skills.autoload)
└── projects/{runId}/nodes/{nodeId}/
├── state.json, snapshot.json, 000000.jsonl, meta.json
└── subagents/{agentId}/… ← recursive, same shape
workspaces/ is a namespace guard (keeps engine data separate from
anything else in a shared bucket/filesystem); .claude/ marks
engine-owned content. Both cost one directory level each.
The workspace IS the tenant boundary. One workspaceId per
tenant; nothing is shared across workspaces. The previous
global storage scope was removed in v0.5.0 — see migration note
below.
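Because the layout is deterministic, a host can compute where a node's files live without asking the engine. The helpers below are a hypothetical sketch that joins the segments from the tree above; they assume forward-slash keys (as in R2 and on POSIX filesystems) and do not escape IDs.

```typescript
interface NodeLocation {
  rootPath: string
  workspaceId: string
  runId: string
  nodeId: string
}

// Tenant root: {rootPath}/workspaces/{workspaceId}/.claude
function tenantRoot(rootPath: string, workspaceId: string): string {
  return `${rootPath}/workspaces/${workspaceId}/.claude`
}

// Node directory relative to the tenant root (the form seen in meta.transcript.path).
function nodeDirRelative(runId: string, nodeId: string): string {
  return `projects/${runId}/nodes/${nodeId}`
}

// Full key for a file inside a node directory, e.g. state.json or 000000.jsonl.
function nodeFileKey(loc: NodeLocation, file: string): string {
  return `${tenantRoot(loc.rootPath, loc.workspaceId)}/${nodeDirRelative(loc.runId, loc.nodeId)}/${file}`
}

console.log(
  nodeFileKey({ rootPath: 'my-project', workspaceId: 'production', runId: 'run_abc', nodeId: 'analyze' }, 'state.json'),
)
// my-project/workspaces/production/.claude/projects/run_abc/nodes/analyze/state.json
```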
Migration from pre-0.5.0: if you had data at {rootPath}/.claude/ (the old global scope), move it under your workspace root:
mv {rootPath}/.claude {rootPath}/workspaces/{workspaceId}/.claude
config.memory.scope: 'global' still parses but emits a deprecation warning and is rewritten to 'workspace'; it will be rejected outright in v1.0.0.
Smart Memory
Per-workspace learning across runs:
- Profile — agent identity
- Rules — behavioral constraints (always/never/when)
- Lessons — facts learned from prior runs (token-budgeted)
- Episodes — session-level observations (JSONL per session)
Modes: off (stateless), read-only (recall only), read-write (self-improving).
Skills
Markdown docs the model can pull on demand via the SkillPage tool. Two resolution modes, both drive the same runtime contract:
1. Disk-backed (default) — one directory per skill:
{storage-root}/workspaces/{ws}/.claude/skills/
├── memo-style/
│ ├── SKILL.md ← required — name + description + body
│ └── pages/
│ └── examples.md ← optional multi-page skill
└── brand-voice/
└── SKILL.md
Enable via config.skills.autoload: true. The engine lists directory entries at run start, emits name + description into the system prompt, and lazy-loads bodies when the model calls SkillPage.
2. Per-run override — bind a specific skill bundle to one engine.run() / engine.resumeAsync() call without touching storage:
await engine.run({
runId, nodeId, task,
skills: [
{
name: 'memo-style',
description: 'Internal memo format.',
body: '# memo-style\n\n## TL;DR\n...', // inline = zero-latency
},
{
name: 'brand-voice',
description: 'Company tone and voice.',
url: 'https://cdn.acme.com/skills/brand-voice/v3/SKILL.md', // lazy fetch, cached per run
headers: { Authorization: 'Bearer ...' },
pages: {
examples: { url: 'https://cdn.acme.com/skills/brand-voice/v3/examples.md' },
},
},
],
})
Override replaces disk discovery for that run: the model sees exactly the skills you list, nothing from config.skills.path. Useful in per-node workflow engines where each node needs a different bundle.
Security: set config.skills.allowedHosts (e.g. ['cdn.acme.com']) to restrict URL fetches. Undefined = open (dev default). Requests outside the allowlist throw before hitting the network.
Caching: within one run, each URL is fetched at most once — subsequent SkillPage calls for the same skill/page are served from memory. Cache is per InlineSkillSource instance, so a fresh engine.run() always re-reads.
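The allowlist check and the per-run cache can be sketched together. SkillUrlCache is a hypothetical class, not the engine's InlineSkillSource; the fetch function is injected so the sketch runs offline, and hosts are matched exactly (no wildcard or subdomain matching is assumed).

```typescript
type Fetcher = (url: string, headers?: Record<string, string>) => Promise<string>

class SkillUrlCache {
  // One cache per instance; a new instance (a new run) starts empty.
  private readonly cache = new Map<string, Promise<string>>()
  private readonly fetcher: Fetcher
  private readonly allowedHosts: string[] | undefined // undefined = open, like the dev default

  constructor(fetcher: Fetcher, allowedHosts?: string[]) {
    this.fetcher = fetcher
    this.allowedHosts = allowedHosts
  }

  // Throws before any network call when the host is not allowlisted.
  private assertAllowed(url: string): void {
    if (this.allowedHosts === undefined) return
    const host = new URL(url).hostname
    if (!this.allowedHosts.includes(host)) {
      throw new Error(`Skill URL host not allowed: ${host}`)
    }
  }

  // Fetch each URL at most once per instance; concurrent callers share one promise.
  get(url: string, headers?: Record<string, string>): Promise<string> {
    this.assertAllowed(url)
    let pending = this.cache.get(url)
    if (pending === undefined) {
      pending = this.fetcher(url, headers)
      this.cache.set(url, pending)
    }
    return pending
  }
}
```

In this sketch a rejected fetch also stays cached for the rest of the run; whether the engine retries a failed skill fetch is not covered here.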
Tools (22 built-in)
| Tool | Safe? | Description |
|------|-------|-------------|
| Bash | No | Shell execution via /bin/sh -c (Node.js only; since 0.8.0, registered explicitly via createBashTool() from la-machina-engine/node-tools) |
| Read | Yes | File read with line numbers, PDF, images |
| Write | No | Atomic file write |
| Edit | No | String replacement with uniqueness check |
| Glob | Yes | File pattern matching |
| Grep | Yes | Regex search (ripgrep when available, JS fallback) |
| WebFetch | Yes | HTTP fetch with HTML-to-text |
| WebSearch | Yes | DuckDuckGo web search |
| Agent | No | Spawn subagent (depth-bounded) |
| SendMessage | No | Inter-agent communication |
| Sleep | Yes | Delay for rate limiting |
| ToolSearch | Yes | Search registered tools |
| Memorize | No | Write to smart memory |
| Recall | Yes | Read from smart memory |
| TaskCreate/Get/List/Update | Mixed | Task tracking |
| NotebookEdit | No | Jupyter notebook editing |
| ListMcpResources | Yes | MCP resource browsing |
| ReadMcpResource | Yes | MCP resource reading |
| SkillPage | Yes | Lazy skill page loading |
"Safe" = isConcurrencySafe — safe tools run in parallel via the StreamingToolExecutor.
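One simple policy consistent with "parallel safe, serial unsafe" is to group consecutive concurrency-safe calls into one batch and give each unsafe call a batch of its own, keeping the model's order. planBatches is a hypothetical planner, not StreamingToolExecutor's actual algorithm; the safe set is copied from the Safe? column, with the Mixed Task tools left out.

```typescript
interface ToolCall {
  id: string
  name: string
}

// Tools marked "Yes" in the Safe? column above.
const CONCURRENCY_SAFE = new Set([
  'Read', 'Glob', 'Grep', 'WebFetch', 'WebSearch', 'Sleep', 'ToolSearch',
  'Recall', 'ListMcpResources', 'ReadMcpResource', 'SkillPage',
])

// Calls inside one batch may run in parallel; batches run one after another.
function planBatches(calls: ToolCall[]): ToolCall[][] {
  const batches: ToolCall[][] = []
  let current: ToolCall[] = []
  for (const call of calls) {
    if (CONCURRENCY_SAFE.has(call.name)) {
      current.push(call)
      continue
    }
    if (current.length > 0) {
      batches.push(current)
      current = []
    }
    batches.push([call]) // unsafe tools run alone
  }
  if (current.length > 0) batches.push(current)
  return batches
}
```

For Read, Grep, Write, Glob this yields three batches: [Read, Grep] in parallel, then Write alone, then Glob.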
Hooks (8 slots)
| Hook | When | Can block? |
|------|------|-----------|
| preRun | Before agent loop starts | No |
| postRun | After run completes (always fires) | No |
| preTurn | Before each API call | No |
| postTurn | After each tool dispatch | No |
| preToolCall | Before each tool execution | No |
| postToolCall | After each tool execution | No |
| gateBeforeTool | Before tool dispatch — can pause the run | Yes (pause) |
| stopHooks | After each turn — can stop the run | Yes (stop) |
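Since gateBeforeTool is the only hook that can pause a run, it helps to keep the gated-tool policy in one place. The callback shape (toolName) => { allow, reason } mirrors the gateBeforeTool in the Node.js HITL example in this README; makeToolGate is a hypothetical helper, and the real hook may receive additional arguments such as the tool input.

```typescript
type GateDecision = { allow: true } | { allow: false; reason: string }

// Hypothetical helper: pause before any tool named in gated, allow everything else.
function makeToolGate(gated: string[], reason = 'human approval'): (toolName: string) => GateDecision {
  const blocked = new Set(gated)
  return (toolName: string): GateDecision =>
    blocked.has(toolName) ? { allow: false, reason } : { allow: true }
}

const gate = makeToolGate(['Write', 'Edit', 'Bash'])
console.log(gate('Read')) // { allow: true }
console.log(gate('Write')) // { allow: false, reason: 'human approval' }
```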
Async API
engine.run() and engine.resume() are synchronous in the workflow sense: the promise they return resolves only once the run reaches a terminal state (done | paused | failed). For long-running work (multi-minute tasks, HITL workflows with human wait time, Cloudflare Workers / Durable Object hosts), the engine ships a parallel async API.
The async API is additive: sync calls still work exactly as before. Async just adds dispatch, polling, webhooks, and durable state.
Methods
| Method | Purpose |
|---|---|
| engine.start(opts) | Schedule a run in the background. Returns { runId, nodeId, status } immediately. |
| engine.resumeAsync(opts) | Async version of resume(). Same options + optional webhook. |
| engine.getStatus(runId, nodeId?) | Read current state. Returns EngineResponse (provisional while running, final when terminal). |
| engine.waitFor(runId, opts?) | Poll until terminal. Returns the final EngineResponse. Respects timeoutMs. |
| engine.cancelRun(runId, nodeId?) | Abort a running run. Marks state as cancelled. |
| engine.retryWebhook(runId, deliveryId) | Re-fire a past webhook delivery (useful after downstream downtime). |
| engine.recoverOrphanedRuns({ staleThresholdMs }) | Scan state.json files on startup and mark stale-heartbeat runs as failed. |
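waitFor() is described as polling until the run is terminal, bounded by timeoutMs. Hosts that want their own loop (for example, to report progress between polls) can follow the same pattern. pollUntilSettled, its default interval, and the choice to stop on any status other than queued or running are assumptions of this sketch.

```typescript
interface StatusLike {
  status: string
}

// Statuses that mean "keep polling" in this sketch.
const IN_FLIGHT = new Set(['queued', 'running'])

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

// Poll read() until the status leaves queued/running, or throw once the deadline would pass.
async function pollUntilSettled<T extends StatusLike>(
  read: () => Promise<T>,
  opts: { intervalMs?: number; timeoutMs?: number } = {},
): Promise<T> {
  const intervalMs = opts.intervalMs ?? 1000
  const deadline = Date.now() + (opts.timeoutMs ?? 300_000)
  for (;;) {
    const current = await read()
    if (!IN_FLIGHT.has(current.status)) return current
    if (Date.now() + intervalMs > deadline) {
      throw new Error(`timed out while status was ${current.status}`)
    }
    await sleep(intervalMs)
  }
}
```

With the engine, read would be () => engine.getStatus(runId, nodeId).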
state.json — durable per-run state
Every async run writes a state.json file alongside the transcript:
projects/{runId}/nodes/{nodeId}/
├── 000000.jsonl # transcript shards
├── meta.json # transcript metadata
├── snapshot.json # pause snapshot (if paused)
└── state.json # async run state + full response
Shape:
{
version: 1,
runId: 'run_abc',
nodeId: 'node_1',
status: 'queued' | 'running' | 'paused' | 'done' | 'failed' | 'cancelled' | 'not_found',
startedAt: 1700000000000,
lastHeartbeat: 1700000012345,
progress: {
turns: number, // advances as the agent loop runs
tokensUsed: { input, output }, // cumulative across turns
currentActivity: // what the loop is doing RIGHT NOW
'idle' | 'streaming' | 'tool_dispatch' | 'compacting',
lastTool?: string, // set when currentActivity === 'tool_dispatch'
},
response: EngineResponse | null, // populated on terminal; same shape as sync run()
webhook?: {
url, events, secret?, headers?,
deliveries: [{ id, event, attempt, status, httpCode?, error? }, ...]
}
}
getStatus() reads this file and returns:
- the embedded response once the run is terminal,
- a provisional snapshot with real-time progress fields while work is in flight, or
- status: 'not_found' with errors[0].code === 'NOT_FOUND' if no state file exists.
Heartbeat: the agent loop updates progress at each turn boundary (streaming start, tool dispatch, turn end). Writes are throttled to at most one per 500ms AND only when activity changes, so R2 costs stay predictable even on long runs.
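The write policy can be expressed as a small gate in front of each state.json write. HeartbeatThrottle is a hypothetical sketch with an injected clock; it reads the rule literally, so a change that arrives inside the 500 ms window is skipped rather than deferred, which may differ from what the engine does.

```typescript
type Activity = 'idle' | 'streaming' | 'tool_dispatch' | 'compacting'

class HeartbeatThrottle {
  private lastWriteAt = Number.NEGATIVE_INFINITY
  private lastActivity: Activity | null = null
  private readonly minIntervalMs: number

  constructor(minIntervalMs = 500) {
    this.minIntervalMs = minIntervalMs
  }

  // True when this update should be written: activity changed AND the interval has elapsed.
  shouldWrite(activity: Activity, now: number): boolean {
    if (activity === this.lastActivity) return false
    if (now - this.lastWriteAt < this.minIntervalMs) return false
    this.lastActivity = activity
    this.lastWriteAt = now
    return true
  }
}
```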
Webhooks
Async runs deliver status changes to a URL you own. Pass a webhook
object to start() / resumeAsync() and the engine POSTs the final
EngineResponse whenever the run reaches a terminal or pause state.
await engine.start({
runId: 'run_abc',
nodeId: 'node_1',
task: 'long task',
webhook: {
url: 'https://your-app.com/hooks/la-machina',
secret: 'shared-hmac-secret', // optional — enables X-LaMachina-Signature
events: ['paused', 'done', 'failed'], // default: all three
headers: { 'X-Tenant': 'acme' }, // optional — passed through per request
},
})
Events — what fires and when
Three events, mapped 1:1 from EngineResponse.status:
| Event | Fires when | data field | meta.pauseReason |
|---|---|---|---|
| done | Model reached end_turn cleanly | Output string (or parsed JSON in structured-output mode) | — |
| paused | Gate callback returned { allow: false }, OR run needs runner handoff | null | gate_required or handoff_to_runner |
| failed | Anything threw (API 5xx after retries, max turns, timeout, cancel, runner unreachable) | null | — (errors[0] has the cause) |
queued, running, and not_found never fire webhooks; they're only
observable via getStatus() polling. Webhooks fire only on terminal
and pause states.
When webhooks do vs don't fire
| API call | Webhooks? | Why |
|---|---|---|
| engine.start({webhook}) | ✓ fires on terminal/pause | |
| engine.resumeAsync({webhook}) | ✓ fires on terminal/pause | |
| engine.run() | never | Caller already has the response in hand |
| engine.resume() | never | Same — synchronous, caller holds the result |
| engine.cancelRun(runId) | in-flight run aborts and fires failed | Cancellation is a normal failure path |
Webhooks are for the async surface exclusively. Anything running synchronously returns its response directly.
Request shape
POST {webhook.url} with body = JSON.stringify(EngineResponse) and:
| Header | Value | Notes |
|---|---|---|
| Content-Type | application/json | |
| X-LaMachina-Event | status.done, status.paused, or status.failed | Event-type routing on the receiver |
| X-LaMachina-RunId | Run ID from your start() call | Correlate with client-side state |
| X-LaMachina-Delivery | UUID identifying the delivery | Use this for idempotency: same delivery ID = retry of the same logical event |
| X-LaMachina-Timestamp | Unix ms | Covered by the HMAC — lets receivers reject replays |
| X-LaMachina-Signature | sha256=<hex> | Only when secret is set; see "Verifying the signature" below |
| (user headers) | whatever you passed in webhook.headers | Merged last, cannot override engine headers |
Request timeout is 30 s by default. The engine aborts slower receivers and treats them as a retryable network failure.
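For testing a receiver, it helps to generate requests the way this table describes them. The sketch below builds engine-style headers with node:crypto, signing ${timestamp}.${body} with HMAC-SHA256 as described under "Verifying the signature". buildWebhookHeaders is hypothetical, and header-name collisions are checked case-sensitively here, unlike real HTTP headers.

```typescript
import { createHmac, randomUUID } from 'node:crypto'

type WebhookEvent = 'done' | 'paused' | 'failed'

// Build the headers for one delivery attempt of body (already JSON-serialized).
function buildWebhookHeaders(opts: {
  body: string
  event: WebhookEvent
  runId: string
  deliveryId?: string
  secret?: string
  userHeaders?: Record<string, string>
  now?: number
}): Record<string, string> {
  const ts = String(opts.now ?? Date.now())
  const engineHeaders: Record<string, string> = {
    'Content-Type': 'application/json',
    'X-LaMachina-Event': `status.${opts.event}`,
    'X-LaMachina-RunId': opts.runId,
    'X-LaMachina-Delivery': opts.deliveryId ?? randomUUID(),
    'X-LaMachina-Timestamp': ts,
  }
  if (opts.secret !== undefined) {
    // Signature covers the timestamp and the exact body bytes.
    const hex = createHmac('sha256', opts.secret).update(`${ts}.${opts.body}`).digest('hex')
    engineHeaders['X-LaMachina-Signature'] = `sha256=${hex}`
  }
  // User headers are spread first, so engine headers win on any collision.
  return { ...opts.userHeaders, ...engineHeaders }
}
```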
Payload — one schema for every event
The body is always an EngineResponse (the same shape engine.run()
returns). The event type determines which fields are meaningful:
done payload:
{
"runId": "run_abc",
"status": "done",
"data": "The analysis is complete. Revenue grew 15% YoY.",
"meta": {
"nodeId": "analyze",
"turns": 5,
"tokensUsed": { "input": 12500, "output": 3200, "cacheReadInput": 8000 },
"durationMs": 8500,
"output": "The analysis is complete. Revenue grew 15% YoY.",
"transcript": { "path": "projects/run_abc/nodes/analyze", "lastShardIndex": 2 }
},
"errors": [],
"timestamp": 1712966400000
}
paused payload:
{
"runId": "run_abc",
"status": "paused",
"data": null,
"meta": {
"nodeId": "publish",
"pauseReason": "gate_required",
"turns": 3,
"tokensUsed": { "input": 8200, "output": 1900 },
"pendingToolCall": {
"toolName": "Publish",
"toolUseId": "toolu_01abc",
"input": { "post": { "title": "...", "body": "..." } }
},
"transcript": { "path": "projects/run_abc/nodes/publish", "lastShardIndex": 1 }
},
"errors": [],
"timestamp": 1712966400000
}
Use pendingToolCall.input to render an approval UI, then call
engine.resumeAsync({ runId, gateAnswer: 'approve', webhook: {...} })
to continue. A separate done (or failed) webhook will fire for the
resumed run.
failed payload:
{
"runId": "run_abc",
"status": "failed",
"data": null,
"meta": {
"nodeId": "n1",
"cancelled": true // present when the failure was engine.cancelRun()
},
"errors": [
{ "code": "CANCELLED", "message": "Run was cancelled by client" }
// Other codes: RUN_FAILED, RESUME_FAILED, ERR_RUNNER_UNREACHABLE, ERR_MAX_TURNS, ORPHANED, …
],
"timestamp": 1712966400000
}
The errors[] array holds {code, message} pairs; use errors[0].code
for programmatic routing, message for display.
Verifying the signature
When webhook.secret is set, the engine signs
${X-LaMachina-Timestamp}.${body} with HMAC-SHA256 and sets
X-LaMachina-Signature: sha256=<hex>. Verify in Node:
import { createHmac, timingSafeEqual } from 'node:crypto'
function verifyLaMachinaWebhook(req: Request, rawBody: string, secret: string): boolean {
const ts = req.headers.get('x-lamachina-timestamp')
const sig = req.headers.get('x-lamachina-signature')
if (!ts || !sig) return false
// Reject replays older than 5 minutes
if (Math.abs(Date.now() - Number(ts)) > 5 * 60_000) return false
const expected =
'sha256=' +
createHmac('sha256', secret).update(`${ts}.${rawBody}`).digest('hex')
// Constant-time comparison
const a = Buffer.from(sig)
const b = Buffer.from(expected)
return a.length === b.length && timingSafeEqual(a, b)
}
On Cloudflare Workers (Web Crypto, no node:crypto):
async function verifyLaMachinaWebhook(req: Request, rawBody: string, secret: string): Promise<boolean> {
  const ts = req.headers.get('x-lamachina-timestamp')
  const sig = req.headers.get('x-lamachina-signature')
  if (!ts || !sig || !sig.startsWith('sha256=')) return false
  // Reject replays older than 5 minutes (and non-numeric timestamps)
  const age = Math.abs(Date.now() - Number(ts))
  if (!Number.isFinite(age) || age > 5 * 60_000) return false
  // Decode the hex digest: HMAC-SHA256 is 32 bytes = 64 hex chars
  const hex = sig.slice('sha256='.length)
  if (!/^[0-9a-f]{64}$/i.test(hex)) return false
  const sigBytes = new Uint8Array(32)
  for (let i = 0; i < 32; i++) sigBytes[i] = parseInt(hex.slice(i * 2, i * 2 + 2), 16)
  const enc = new TextEncoder()
  const key = await crypto.subtle.importKey(
    'raw',
    enc.encode(secret),
    { name: 'HMAC', hash: 'SHA-256' },
    false,
    ['verify'],
  )
  // Let Web Crypto check the MAC instead of comparing strings with ===
  return crypto.subtle.verify('HMAC', key, sigBytes, enc.encode(`${ts}.${rawBody}`))
}
Always verify against the raw bytes you read from the request. Re-serializing the parsed JSON will produce different bytes and the signature won't match.
Idempotency — receivers MUST handle duplicates
Retries of the same logical event may send the same payload to your
endpoint multiple times (network flaps, receiver returns 5xx, etc.).
De-duplicate on either:
- X-LaMachina-Delivery: reject a second delivery with the same ID, or
- runId + status + timestamp: simpler, event-level dedup.
Pattern: insert the delivery ID into a short-TTL cache (Redis, R2, DB unique constraint); on collision return 200 without reprocessing.
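An in-process version of this pattern is shown below. DeliveryDeduper is hypothetical, and an in-memory Map only works for a single receiver instance; deployments with several instances need a shared store such as Redis, R2, or a DB unique constraint.

```typescript
class DeliveryDeduper {
  private readonly seen = new Map<string, number>() // deliveryId -> expiry time (ms)
  private readonly ttlMs: number

  constructor(ttlMs = 24 * 60 * 60 * 1000) {
    this.ttlMs = ttlMs
  }

  // True the first time an ID is seen within the TTL; false for duplicates.
  firstDelivery(deliveryId: string, now: number = Date.now()): boolean {
    for (const [id, expiresAt] of this.seen) {
      if (expiresAt <= now) this.seen.delete(id) // drop expired entries
    }
    if (this.seen.has(deliveryId)) return false
    this.seen.set(deliveryId, now + this.ttlMs)
    return true
  }
}
```

In a handler, a false result means: respond 200 and skip processing.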
Retry schedule
Fixed schedule per delivery attempt:
attempt 1: immediate
attempt 2: +10 s (after the previous attempt's failure)
attempt 3: +60 s
attempt 4: +5 min
attempt 5: +30 min
→ give up
Retry decisions:
| Receiver response | Retry? |
|---|---|
| 2xx | No — delivered |
| 408 Request Timeout | Yes |
| 429 Rate Limited | Yes |
| 5xx (500–599) | Yes |
| 410 Gone | No — give up immediately (resource intentionally removed) |
| Other 4xx (400/401/403/404/…) | No — payload/auth bug; retrying won't help |
| Network error / timeout | Yes |
Every attempt — success or failure — is appended to
state.webhook.deliveries[] in state.json, including the HTTP status,
error message, delivery ID, timestamps, and attempt number. Inspect
via engine.getStatus(runId) or read state.json directly from R2.
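The schedule and the decision table translate into two small functions, useful for estimating when the next attempt will reach a receiver. Both are hypothetical transcriptions: attempt is the 1-based number of the attempt that just failed, null stands for a network error or timeout, and 3xx responses, which the table does not mention, are treated as not retryable.

```typescript
// Delay before attempts 2..5, measured from the previous attempt's failure.
const RETRY_DELAYS_MS = [10_000, 60_000, 5 * 60_000, 30 * 60_000]
const MAX_ATTEMPTS = RETRY_DELAYS_MS.length + 1 // 5

// httpStatus === null means a network error or timeout.
function isRetryable(httpStatus: number | null): boolean {
  if (httpStatus === null) return true
  if (httpStatus >= 200 && httpStatus < 300) return false // delivered
  if (httpStatus === 408 || httpStatus === 429) return true
  if (httpStatus >= 500 && httpStatus <= 599) return true
  return false // 410, other 4xx, and (by assumption) 3xx
}

// Delay before the next attempt, or null when delivery stops.
function nextDelayMs(attempt: number, httpStatus: number | null): number | null {
  if (attempt < 1 || attempt >= MAX_ATTEMPTS || !isRetryable(httpStatus)) return null
  return RETRY_DELAYS_MS[attempt - 1]
}
```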
Manual replay
If the receiver was down and the engine has already given up (5 attempts exhausted, or 4xx stopped retries), replay any past delivery:
const status = await engine.getStatus(runId)
const missed = status.meta.webhook?.deliveries.find((d) => d.status === 'failed')
if (missed) {
await engine.retryWebhook(runId, missed.id)
}
retryWebhook fires a fresh POST with a new delivery ID (so
receivers that already processed the original ID won't reject it as a
dup — this is a deliberate re-issuance, not a network retry) and
continues the retry schedule from attempt 1.
Correlated pause → resume
When a run emits paused, the client typically gathers a decision and
calls resumeAsync({runId, gateAnswer, webhook}). The resumed run
will emit another webhook on completion — usually done, sometimes
paused again if the model hits a second gate, or failed if resume
fails. Receivers should track runId state across events:
| State sequence | Meaning |
|---|---|
| paused → done | Happy-path HITL — approved and completed |
| paused → paused → done | Multi-step approval — each gate wake fires its own event |
| paused → failed (CANCELLED) | User rejected at the gate and cancelled the run |
| paused → (no follow-up) | Orphaned — caller never called resumeAsync |
Use runId as your correlation key across all events for a run.
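A receiver can keep each run's event sequence and map it onto the rows of this table. RunTracker is a hypothetical in-memory sketch keyed by runId; it assumes deliveries are already de-duplicated, and treats a run as orphaned when its last event is paused and older than a caller-chosen threshold.

```typescript
type WebhookStatus = 'done' | 'paused' | 'failed'

interface RunHistory {
  events: WebhookStatus[]
  lastEventAt: number
}

// Hypothetical receiver-side tracker; feed it only de-duplicated deliveries.
class RunTracker {
  private readonly runs = new Map<string, RunHistory>()

  record(runId: string, status: WebhookStatus, at: number): void {
    const history: RunHistory = this.runs.get(runId) ?? { events: [], lastEventAt: at }
    history.events.push(status)
    history.lastEventAt = at
    this.runs.set(runId, history)
  }

  // Map a run's event sequence onto the rows of the table above.
  classify(runId: string, now: number, orphanAfterMs: number): string {
    const history = this.runs.get(runId)
    if (history === undefined || history.events.length === 0) return 'unknown'
    const last = history.events[history.events.length - 1]
    const pauses = history.events.filter((e) => e === 'paused').length
    if (last === 'paused') {
      // No follow-up within the threshold: treat the run as orphaned.
      return now - history.lastEventAt > orphanAfterMs ? 'orphaned' : 'awaiting decision'
    }
    if (last === 'failed') return pauses > 0 ? 'failed after pause' : 'failed'
    if (pauses > 1) return 'multi-step approval completed'
    if (pauses === 1) return 'approved and completed'
    return 'completed without approval'
  }
}
```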
What's NOT a webhook event (deliberate omissions)
These are intentionally out of scope:
- Per-turn progress — too chatty. Poll getStatus(runId) for live turn / token / activity updates (the heartbeat writes state.json every ~500 ms when activity changes).
- Per-tool dispatch — that's what preToolCall / postToolCall hooks are for (in-process, synchronous).
- Subagent lifecycle — the parent's terminal/pause state is what fires; child runs are opaque to external receivers.
- Resume started / resume completed — resumeAsync() returns immediately with {runId, nodeId, status: 'running'}; the next webhook you'll see is the resumed run's terminal state.
If you need finer-grained updates, use getStatus() polling — it
reads the heartbeat-updated state.json and gives you
turns / tokensUsed / currentActivity / lastTool in real time
without any webhook-driven traffic.
Node.js example — sync HITL and async HITL together
import { initEngine, Engine } from 'la-machina-engine'
const { config } = initEngine({
model: { provider: 'anthropic', apiKey: process.env.ANTHROPIC_API_KEY },
storage: { provider: 'r2', rootPath: 'tenants/acme', workspaceId: 'default', r2: { ... } },
hooks: {
gateBeforeTool: (toolName) =>
toolName === 'Write' ? { allow: false, reason: 'human approval' } : { allow: true },
},
})
const engine = new Engine(config)
// Async run with webhook — returns immediately
const { runId } = await engine.start({
runId: 'run_' + Date.now(),
nodeId: 'n1',
task: 'Refactor the config module.',
webhook: { url: 'https://app.example.com/hooks', secret: process.env.HOOK_SECRET },
})
// Later: client polls
const current = await engine.getStatus(runId, 'n1')
if (current.status === 'paused') {
// Human approves → resume (with no gate) asynchronously
await engine.resumeAsync({
runId,
nodeId: 'n1',
snapshot: current.meta.snapshot,
webhook: { url: 'https://app.example.com/hooks', secret: process.env.HOOK_SECRET },
})
const final = await engine.waitFor(runId, { nodeId: 'n1', timeoutMs: 300_000 })
console.log('final:', final.status, final.data)
}
// Startup: recover any runs that crashed mid-execution
const orphaned = await engine.recoverOrphanedRuns({ staleThresholdMs: 5 * 60_000 })
console.log('recovered', orphaned.length, 'orphaned runs')
MCP — auth refresh + sampling
Two opt-in features for MCP server integrations (Plan 018):
headersProvider — refresh OAuth tokens between requests:
mcp: {
servers: {
github: {
type: 'http',
url: 'https://mcp.github.example.com',
headers: { 'X-Tenant': 'acme' }, // static
headersProvider: async () => ({ // dynamic, called per send
Authorization: `Bearer ${await refreshGithubToken()}`,
}),
},
},
}
The provider is called before every MCP request; its result merges over the static headers. On HTTP 401 the engine invokes the provider a second time and retries the request once. Without this hook, a long run dies the moment its bearer token expires (~1 hour for OAuth).
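To make the merge-then-retry rule concrete, here is a standalone sketch of the semantics just described: provider output merged over static headers, and on HTTP 401 one more provider call plus exactly one retry. `sendWithAuthRefresh` and its `send` callback are hypothetical names for illustration, not engine exports.

```typescript
type HeaderMap = Record<string, string>
type Send = (headers: HeaderMap) => Promise<{ status: number }>

// Hypothetical illustration of the documented headersProvider behaviour.
async function sendWithAuthRefresh(
  staticHeaders: HeaderMap,
  headersProvider: () => Promise<HeaderMap>,
  send: Send,
): Promise<{ status: number }> {
  // Provider output is spread last, so it wins on key collisions.
  const first = await send({ ...staticHeaders, ...(await headersProvider()) })
  if (first.status !== 401) return first
  // On 401: ask the provider again (fresh token) and retry exactly once.
  return send({ ...staticHeaders, ...(await headersProvider()) })
}
```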
allowSampling — let an MCP server request LLM completions through the engine:
mcp: {
servers: {
research_tools: {
type: 'http',
url: 'https://mcp.research.example.com',
allowSampling: true, // OFF by default
},
},
}
// Optional — provide a custom handler. Default routes to engine's own ModelAdapter.
new Engine(config, {
samplingHandler: async (request, context) => {
// request.messages, request.maxTokens, request.systemPrompt, ...
// context.serverName, context.depth, context.runId
return {
role: 'assistant',
model: 'cheap-model-for-mcp',
content: { type: 'text', text: '...' },
stopReason: 'endTurn',
}
},
})
When allowSampling: false (the default), the engine omits the sampling capability from its MCP handshake — servers that try to call sampling/createMessage get a "method not supported" error from the SDK directly.
When allowSampling: true, the engine installs a request handler that routes to either your custom samplingHandler or a built-in default that uses the engine's own model. The default handler refuses recursive sampling past DEFAULT_SAMPLING_MAX_DEPTH = 3 to prevent loops. Token usage from sampling counts against any tokenBudget you've set on the parent run.
Off-by-default is deliberate: sampling consumes your LLM budget. Opt in per server only when you've vetted the MCP.
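The depth guard is easiest to reason about as a wrapper around a handler. A minimal sketch, assuming a handler shaped like the `samplingHandler` example above and assuming `context.depth` counts nested sampling calls; `withDepthLimit` is hypothetical, and the constant simply mirrors `DEFAULT_SAMPLING_MAX_DEPTH = 3`.

```typescript
// Mirrors DEFAULT_SAMPLING_MAX_DEPTH = 3 from the text.
const MAX_SAMPLING_DEPTH = 3

// Context fields copied from the samplingHandler example; `depth` semantics assumed.
interface SamplingContext {
  serverName: string
  depth: number
  runId: string
}
type SamplingHandler<Req, Res> = (request: Req, context: SamplingContext) => Promise<Res>

// Hypothetical wrapper: refuse sampling once nesting goes past the limit.
function withDepthLimit<Req, Res>(
  inner: SamplingHandler<Req, Res>,
  maxDepth: number = MAX_SAMPLING_DEPTH,
): SamplingHandler<Req, Res> {
  return async (request, context) => {
    if (context.depth > maxDepth) {
      throw new Error(`sampling refused for ${context.serverName}: depth ${context.depth} > ${maxDepth}`)
    }
    return inner(request, context)
  }
}
```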
Cloudflare Workers — three building blocks
A Worker deployment needs three pieces beyond the standard engine:
- Storage: native R2 binding via storage.provider: 'r2-binding' — avoids the @aws-sdk/client-s3 bundle and its ListObjectsV2 hang on the Workers runtime.
- Agent loop lifetime: Durable Objects — the default fire-and-forget executor can't survive a Worker request return, so wrap work in ctx.waitUntil() inside a DO, or provide a custom BackgroundExecutor.
- MCP transport: preferBindingTransport: true — makes the engine's MCP client use plain POST JSON-RPC instead of the SDK's Streamable-HTTP SSE client (which hangs on Workers after initialize).
Storage — R2 binding provider
import { initEngine, Engine } from 'la-machina-engine'
const { config } = initEngine({
model: { provider: 'anthropic', apiKey: env.ANTHROPIC_API_KEY },
storage: {
provider: 'r2-binding',
rootPath: 'tenants/acme',
workspaceId: 'default',
r2Binding: env.STORAGE, // the R2Bucket binding from wrangler.toml
},
})
const engine = new Engine(config)
No S3 credentials, no endpoint URL — the binding handles auth. Works with wrangler dev --local (Miniflare emulates R2 in-memory).
wrangler.toml:
[[r2_buckets]]
binding = "STORAGE"
bucket_name = "la-machina"
preview_bucket_name = "la-machina-preview"
Agent loop lifetime — Durable Objects
Each runId maps to a DO via idFromName(runId). The DO calls engine.start() inside ctx.waitUntil(), which keeps the isolate alive past the Worker request's return. Resumes route to the same DO so they pick up the paused snapshot.
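import { DurableObject } from 'cloudflare:workers'
// Env, StartBody and buildEngine are defined elsewhere in examples/cloudflare-worker-ts (not shown here).
export class RunDurableObject extends DurableObject<Env> {
  override async fetch(req: Request): Promise<Response> {
    const body = (await req.json()) as StartBody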
this.ctx.waitUntil(this.doRun(body)) // keeps DO alive until done
return new Response(null, { status: 202 })
}
private async doRun(body: StartBody): Promise<void> {
const engine = buildEngine(this.env, body.rootPath)
await engine.start({
runId: body.runId,
nodeId: body.nodeId,
task: body.task,
...(body.webhook ? { webhook: body.webhook } : {}),
})
await engine.waitFor(body.runId, { nodeId: body.nodeId, pollIntervalMs: 500 })
}
}
Alternative (advanced): implement BackgroundExecutor and pass it via EngineInternals.backgroundExecutor if you want engine.start() itself to schedule into a DO from the Worker fetch handler. See examples/cloudflare-worker-ts/src/runDO.ts for the common-case pattern.
MCP on Workers — preferBindingTransport
initEngine({
// ...
mcp: {
servers: {
flow: {
type: 'http',
url: 'https://your-mcp-server.com/mcp',
preferBindingTransport: true, // ← Workers-safe
},
},
},
})
When this flag is set, the engine's MCP client uses BindingHttpTransport — a stateless POST-only JSON-RPC transport. No long-lived SSE reader, no streaming notifications (not needed for tool calling).
On Node, leave the flag off to keep the full Streamable-HTTP feature set.
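For a feel of what "stateless POST-only JSON-RPC" means on the wire, here is a minimal sketch of one MCP `tools/call` request over plain `fetch`. It is not `BindingHttpTransport` itself: `callToolOnce` is hypothetical, and it assumes the server accepts a standalone request (real servers usually expect an `initialize` exchange first) and answers with an `application/json` body rather than an SSE stream.

```typescript
interface JsonRpcResponse {
  jsonrpc: '2.0'
  id: number
  result?: unknown
  error?: { code: number; message: string }
}

let nextId = 1

// Hypothetical one-shot MCP tool call: one POST, one JSON body back, no stream kept open.
async function callToolOnce(
  url: string,
  name: string,
  args: Record<string, unknown>,
  fetchImpl: typeof fetch = fetch,
): Promise<unknown> {
  const res = await fetchImpl(url, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      // Streamable-HTTP servers expect both types; this sketch only handles JSON replies.
      Accept: 'application/json, text/event-stream',
    },
    body: JSON.stringify({ jsonrpc: '2.0', id: nextId++, method: 'tools/call', params: { name, arguments: args } }),
  })
  if (!res.ok) throw new Error(`MCP HTTP ${res.status}`)
  if (!(res.headers.get('content-type') ?? '').includes('application/json')) {
    throw new Error('expected a JSON response (SSE replies are not handled in this sketch)')
  }
  const msg = (await res.json()) as JsonRpcResponse
  if (msg.error) throw new Error(`MCP error ${msg.error.code}: ${msg.error.message}`)
  return msg.result
}
```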
Working reference
A complete TypeScript example is at examples/cloudflare-worker-ts/:
- src/env.ts — builds an Engine with r2-binding + preferBindingTransport
- src/runDO.ts — RunDurableObject with ctx.waitUntil()
- src/index.ts — POST /sync, POST /async/start, GET /async/status/:runId, POST /async/resume/:runId, POST /demo/webhook receiver with HMAC verification
- mcp-server/server.mjs — local HTTP MCP server for the memo-pipeline demo
- test-client.sh — end-to-end curl demo
Run:
cd examples/cloudflare-worker-ts
cp .dev.vars.example .dev.vars && $EDITOR .dev.vars
bunx wrangler dev --local
./test-client.sh
Everything else (state.json, webhooks, polling, resume, recovery) works unchanged.
External APIs — the ApiCall built-in
When you configure one or more services via config.api, the engine
auto-registers an ApiCall tool that lets the model call tenant-scoped
external HTTP APIs without ever seeing credentials. The model picks
a service name from a closed enum; the engine injects auth from
your env map (or a resolveAuth callback for dynamic schemes).
const { config } = initEngine({
  // …model, storage, etc.
  api: {
    services: [
      {
        name: 'widgets',
        baseUrl: 'https://api.acme.example/v1',
        auth: { type: 'bearer', tokenRef: 'widgets:token' },
        allowedPaths: [/^\/widgets(\/\d+)?$/], // optional safety rail
      },
    ],
    env: { 'widgets:token': 'sk_real_token' }, // loaded from your vault
  },
})
const engine = new Engine(config)
The model sees ApiCall with a service enum locked to your list.
When it calls ApiCall({ service: 'widgets', method: 'POST',
path: '/widgets', body: {...} }), the engine resolves the bearer
token from env, attaches it as Authorization: Bearer ..., and
fetches. The token never enters the model's context, the transcript,
state.json, logs, or any response field — a dedicated test suite
(apiCallSecretIsolation.test.ts) enforces this.
Multi-tenant SaaS — pass per-tenant services via RunOptions.api
instead of config.api, so one engine instance serves many tenants:
await engine.run({
task: '...',
api: {
services: tenantServices,
env: tenantEnv,
},
})
Auth types (the first four are zero-code, the last is the escape hatch):
| Type | Shape | Header produced | Use for |
|---|---|---|---|
| none | { type: 'none' } | — | Public APIs |
| bearer | { type: 'bearer', tokenRef } | Authorization: Bearer <env[tokenRef]> | OpenAI, GitHub PAT, Airtable |
| header | { type: 'header', name, valueRef } | <name>: <env[valueRef]> | Anthropic (x-api-key), any single-header API |
| basic | { type: 'basic', userRef, passRef } | Authorization: Basic <base64(user:pass)> | Twilio, Bitbucket |
| custom | { type: 'custom', id } | Whatever resolveAuth returns | OAuth refresh, HMAC signing, JWT minting |
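The table can be read as a small resolver from auth config plus the secret `env` map to outgoing headers. A minimal sketch under that reading; `authHeaders` is hypothetical (not the engine's code), and it simply refuses `custom`, which the text routes through `resolveAuth`.

```typescript
type ApiAuth =
  | { type: 'none' }
  | { type: 'bearer'; tokenRef: string }
  | { type: 'header'; name: string; valueRef: string }
  | { type: 'basic'; userRef: string; passRef: string }
  | { type: 'custom'; id: string }

// Hypothetical resolver mirroring the table; secrets are looked up by reference in `env`.
function authHeaders(auth: ApiAuth, env: Record<string, string>): Record<string, string> {
  const need = (ref: string): string => {
    const value = env[ref]
    if (value === undefined) throw new Error(`missing secret for ref "${ref}"`) // never echo values
    return value
  }
  switch (auth.type) {
    case 'none':
      return {}
    case 'bearer':
      return { Authorization: `Bearer ${need(auth.tokenRef)}` }
    case 'header':
      return { [auth.name]: need(auth.valueRef) }
    case 'basic':
      // btoa exists in Workers and Node 16+; it only accepts Latin-1 input.
      return { Authorization: `Basic ${btoa(`${need(auth.userRef)}:${need(auth.passRef)}`)}` }
    case 'custom':
      throw new Error(`custom auth "${auth.id}" must be handled by resolveAuth`)
  }
}
```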
For custom, supply resolveAuth(auth, ctx): an async function the
engine calls per dispatch. The ctx carries serviceName, method,
path so HMAC-style schemes can sign the request context.
api: {
services: [{ name: 'gdrive', baseUrl: '...', auth: { type: 'custom', id: 'oauth:google' } }],
resolveAuth: async (auth, ctx) => {
if (auth.type === 'custom' && auth.id === 'oauth:google') {
const token = await oauthCache.getFreshAccessToken(tenantId)
return { Authorization: `Bearer ${token}` }
}
return {}
},
}
Safety rails enforced per-call: service enum lockdown, per-service
allowedPaths + allowedMethods, maxBodyBytes cap,
maxResponseBytes cap, case-insensitive auth-header sanitizer (the
model cannot spoof Authorization via input.headers).
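Two of those rails, the allowlists and the header sanitizer, can be sketched as plain functions. The names `checkCall` and `mergeHeaders` are hypothetical, the assumption that `allowedMethods` holds uppercase verbs is ours, and the real checks live inside the engine; this only illustrates the order of operations.

```typescript
interface ServiceRails {
  allowedPaths?: RegExp[]
  allowedMethods?: string[] // assumed to contain uppercase verbs, e.g. ['GET', 'POST']
  maxBodyBytes?: number
}

// Hypothetical pre-dispatch check: returns an error message, or null if the call may proceed.
function checkCall(rails: ServiceRails, method: string, path: string, bodyBytes: number): string | null {
  if (rails.allowedMethods && !rails.allowedMethods.includes(method.toUpperCase())) return `method ${method} not allowed`
  if (rails.allowedPaths && !rails.allowedPaths.some((re) => re.test(path))) return `path ${path} not allowed`
  if (rails.maxBodyBytes !== undefined && bodyBytes > rails.maxBodyBytes) return `body exceeds ${rails.maxBodyBytes} bytes`
  return null
}

// Drop model-supplied headers that collide case-insensitively with auth headers,
// then apply the engine-resolved auth headers last so they always win.
function mergeHeaders(modelHeaders: Record<string, string>, auth: Record<string, string>): Record<string, string> {
  const blocked = new Set(Object.keys(auth).map((k) => k.toLowerCase()))
  blocked.add('authorization') // strip even when this service sends no auth
  const out: Record<string, string> = {}
  for (const [key, value] of Object.entries(modelHeaders)) {
    if (!blocked.has(key.toLowerCase())) out[key] = value
  }
  return { ...out, ...auth }
}
```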
Observability: onRequest / onResponse hooks fire around each
dispatch with { service, method, path, status, latencyMs, bytesIn }
— no secrets — for metering, billing, audit logs.
Raw-body uploads (0.9.0): set bodyEncoding: 'raw' on the call to
send a string body verbatim — no JSON.stringify. Use for upload APIs
that take binary or text-blob bodies (Drive media upload, S3
PutObject, image push, log ingestion, etc.). The caller-supplied
Content-Type header wins; default falls back to text/plain.
ApiCall({
service: 'gdrive',
method: 'PATCH',
path: `/upload/drive/v3/files/${fileId}?uploadType=media`,
headers: { 'Content-Type': 'text/markdown' },
body: '# Hello world',
bodyEncoding: 'raw',
})
bodyEncoding: 'raw' requires the body to be a string — passing an
object returns ERR_API_RAW_BODY_NOT_STRING and never makes the
fetch. The default 'json' encoding behaves exactly as in 0.8.x.
maxBodyBytes is enforced under both encodings.
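The encoding rules above fit in one small function. A minimal sketch, not the engine's code: `encodeBody` is hypothetical, `BODY_TOO_LARGE` is a placeholder error code (the real one isn't documented here), and the `application/json` default for JSON bodies is an assumption. The size cap is measured in UTF-8 bytes on the final string under both encodings.

```typescript
type Encoded = { ok: true; body: string; contentType: string } | { ok: false; code: string }

// Hypothetical encoder mirroring the documented json/raw behaviour.
function encodeBody(
  body: unknown,
  encoding: 'json' | 'raw',
  headers: Record<string, string>,
  maxBodyBytes: number,
): Encoded {
  const callerType = Object.entries(headers).find(([k]) => k.toLowerCase() === 'content-type')?.[1]
  let text: string
  let contentType: string
  if (encoding === 'raw') {
    // Non-string raw bodies fail before any fetch is attempted.
    if (typeof body !== 'string') return { ok: false, code: 'ERR_API_RAW_BODY_NOT_STRING' }
    text = body // sent verbatim, no JSON.stringify
    contentType = callerType ?? 'text/plain'
  } else {
    text = JSON.stringify(body ?? null)
    contentType = callerType ?? 'application/json' // assumed default
  }
  if (new TextEncoder().encode(text).byteLength > maxBodyBytes) {
    return { ok: false, code: 'BODY_TOO_LARGE' } // placeholder code
  }
  return { ok: true, body: text, contentType }
}
```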
Disabling: tools.disabled: ['ApiCall'] turns it off even when
services are configured. Absent config.api → tool never registered,
no prompt mention.
Tool-result offload — keep the context lean on chatty runs
Research-style runs often fire a dozen WebSearch / WebFetch /
ApiCall invocations and each one can return tens of KB. The raw
payloads flood the main context even before the compactor kicks in.
Tool-result offload is the preventive fix: when a tool returns
more bytes than your threshold, the engine stores the full content
under the agent's log path and replaces the in-context message with
a short deterministic summary + a ref token. A built-in FetchData
tool rehydrates the original payload on demand — same UX as
SkillPage for skills.
Off by default. Flip it on at the engine level:
const { config } = initEngine({
  compaction: {
    toolResultOffload: {
      enabled: true,
      thresholdBytes: 2048, // default: 2048 (2 KB)
      maxPreviewChars: 500, // default: 500
    },
  },
})
const engine = new Engine(config)
When enabled, every tool result whose body exceeds thresholdBytes
lands at
projects/{runId}/nodes/{nodeId}/toolResults/{toolUseId}.json
(subagents offload under their own subagents/{agentId}/toolResults/…),
and the model sees a summary like:
[WebSearch] Array of 10 items (14.2 KB).
First item preview: {"title":"…","url":"…"}
Use FetchData with ref="toolu_abc" to read the full array.
The model calls FetchData({ ref: "toolu_abc" }) when it actually
needs the raw bytes. One extra round-trip, and only when the
information is actually required.
Per-run override (SaaS pattern — one engine, different thresholds per tenant or per task):
await engine.run({
task: '…',
compaction: {
toolResultOffload: { enabled: true, thresholdBytes: 4096 },
},
})
Behavioural invariants:
- Error tool results are never offloaded. The model needs them verbatim to adapt; replacing them with a ref would break debugging.
- FetchData's own output is never re-offloaded. Re-offloading it would trap the model in a hydrate loop.
- Each agent sees only its own refs. A subagent can't FetchData a parent's offloaded blob, because FetchData is bound to each agent's log path at construction.
- Refs survive resume. Blobs live in the same storage adapter as transcripts and snapshots; resuming a paused run still has access to every offloaded payload.
- Strict > at the threshold. A result of exactly thresholdBytes stays inline; only larger results are offloaded.
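The invariants translate into a short decision function. A minimal sketch with hypothetical names (`shouldOffload`, `offloadKey`, the `ToolResult` shape); it assumes the threshold is compared against the UTF-8 byte length of the result and shows only the top-level storage key, not the subagent variant.

```typescript
interface ToolResult {
  toolName: string
  toolUseId: string
  content: string
  isError: boolean
}

// Hypothetical decision function encoding the invariants above.
function shouldOffload(result: ToolResult, thresholdBytes = 2048): boolean {
  if (result.isError) return false // errors stay verbatim
  if (result.toolName === 'FetchData') return false // avoid hydrate loops
  const bytes = new TextEncoder().encode(result.content).byteLength // assumed UTF-8 measurement
  return bytes > thresholdBytes // strict: exactly thresholdBytes stays inline
}

// Storage key layout from the text (top-level agent only).
function offloadKey(runId: string, nodeId: string, toolUseId: string): string {
  return `projects/${runId}/nodes/${nodeId}/toolResults/${toolUseId}.json`
}
```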
Pluggable summarizer — the default is deterministic (shape-aware for JSON arrays and top-level objects, first-N-chars for arbitrary text). Users wanting semantic summaries can plug in their own:
toolResultOffload: {
enabled: true,
summarizer: async (ctx) => {
// ctx = { toolName, toolInput, rawContent, rawBytes, ref, maxPreviewChars }
// Return a string; it MUST include `ctx.ref` so the model can
// call FetchData to rehydrate the full content.
return myCustomSummary(ctx)
},
}
Coming in a future release: an engine-shipped LLM summarizer (call a small/fast model to write a real summary instead of the deterministic one). Track via Plan 021's "Deferred with triggers" list. The summarizer callback IS the extension point the LLM version will plug into, so users implementing custom summarizers today are on the stable surface.
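As a starting point for a custom summarizer, here is a deterministic, shape-aware one in the spirit of the default (arrays get a count and first-item preview, objects get their keys, anything else a text prefix). It is an illustration, not the engine's default implementation; the `SummarizerContext` interface just restates the `ctx` fields listed in the example above, and the output wording is our own.

```typescript
// Restates the ctx fields from the summarizer example.
interface SummarizerContext {
  toolName: string
  toolInput: unknown
  rawContent: string
  rawBytes: number
  ref: string
  maxPreviewChars: number
}

// Illustrative shape-aware summarizer; not the engine's built-in one.
async function shapeAwareSummary(ctx: SummarizerContext): Promise<string> {
  const size = `${(ctx.rawBytes / 1024).toFixed(1)} KB`
  const cap = (s: string) => s.slice(0, ctx.maxPreviewChars)
  let headline = `Text (${size}).`
  let preview = `Preview: ${cap(ctx.rawContent)}`
  try {
    const parsed: unknown = JSON.parse(ctx.rawContent)
    if (Array.isArray(parsed)) {
      headline = `Array of ${parsed.length} items (${size}).`
      preview = parsed.length > 0 ? `First item preview: ${cap(String(JSON.stringify(parsed[0])))}` : 'Empty array.'
    } else if (parsed !== null && typeof parsed === 'object') {
      headline = `Object with keys: ${cap(Object.keys(parsed).join(', '))} (${size}).`
      preview = ''
    }
  } catch {
    // Not JSON: keep the plain-text preview.
  }
  // The summary MUST contain ctx.ref so the model can call FetchData to rehydrate.
  return [`[${ctx.toolName}] ${headline}`, preview, `Use FetchData with ref="${ctx.ref}" to read the full content.`]
    .filter((line) => line.length > 0)
    .join('\n')
}
```

Plugging it in is just `toolResultOffload: { enabled: true, summarizer: shapeAwareSummary }`.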
Disabling: tools.disabled: ['FetchData'] removes the tool even
when offload is enabled. Absent config.compaction.toolResultOffload
→ tool not registered, no threshold checks, no storage writes, no
prompt mention.
When offload actually saves tokens. Offload wins when (a) a run
makes many tool calls and (b) the model rarely needs the full payload
of most of them — e.g. a research run with 12 WebSearch calls where
only 2 are deep-read for quotes. If the model needs the full content
of every tool call (a single-shot "fetch this large thing and answer")
offload costs an extra FetchData round-trip without saving anything.
Rule of thumb: leave it off for single-tool-call workflows, turn it
on for multi-call research / browsing / repeated-API flows. Benchmark
on your workload — the live test at
scripts/workflows/w17-offload-live.mjs shows how.
Knowledge base — SearchKnowledge + ReadKnowledge
When the model needs to look things up in your tenant's docs without
loading whole files into context, opt into the knowledge base. Two
built-in tools — SearchKnowledge (token-overlap-ranked snippets)
and ReadKnowledge (one section or a whole file) — let an agent
walk a per-tenant vault on demand.
Layout. Each tenant gets a folder at
workspaces/{workspaceId}/knowledge/, sibling to .claude/. Top-level
subfolders are bases — independent corpora each with their
own pre-built _index.json:
workspaces/acme-corp/
├── .claude/ # engine state — transcripts, memory, …
└── knowledge/
├── hr-policies/ # base: "hr-policies"
│ ├── _index.json # built by writeKnowledgeIndex()
│ ├── handbook.md
│ └── remote-work.md
└── sales-playbook/ # base: "sales-playbook"
├── _index.json
└── q1/
            └── pricing.md
Build the index when the corpus changes — for each base, the
indexer walks its subtree, splits markdown at heading boundaries,
tokenises section bodies, and writes one _index.json per base:
import { writeKnowledgeIndex, R2StorageAdapter } from 'la-machina-engine'
const k = new R2StorageAdapter(r2Config, 'workspaces/acme-corp/knowledge')
await writeKnowledgeIndex({ adapter: k, base: 'hr-policies' })
await writeKnowledgeIndex({ adapter: k, base: 'sales-playbook' })
Forgot to build the index? Both tools fall back to an in-memory
build on first call when _index.json is missing or corrupted. The
fallback caches for the rest of the run, so subsequent searches are
free. This makes the index a performance optimisation (skip the walk
on every fresh run), not a setup requirement — drop files into the
folder and the agent can discover them immediately. Pre-build with
writeKnowledgeIndex() for production-scale corpora where the
first-call cost matters.
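The indexer's core step, splitting markdown at heading boundaries into addressable sections, can be sketched as below. The `file#slug` ref format follows the `hr-policies/handbook.md#benefits` example; the slug rule (lowercase, non-alphanumerics collapsed to `-`) and treating text before the first heading as an untitled section are assumptions. This sketch also doesn't skip `#` lines inside fenced code blocks.

```typescript
interface Section {
  ref: string // e.g. "hr-policies/handbook.md#benefits"
  heading: string
  body: string
}

// Assumed slug rule; the engine's real rule may differ.
const slugify = (s: string): string =>
  s.toLowerCase().replace(/[^a-z0-9]+/g, '-').replace(/^-+|-+$/g, '')

// Split a markdown file into sections at ATX headings (#, ##, ...).
function splitSections(filePath: string, markdown: string): Section[] {
  const sections: Section[] = []
  let heading = ''
  let lines: string[] = []
  const flush = () => {
    const body = lines.join('\n').trim()
    if (heading || body) {
      sections.push({ ref: heading ? `${filePath}#${slugify(heading)}` : filePath, heading, body })
    }
  }
  for (const line of markdown.split('\n')) {
    const match = /^#{1,6}\s+(.*)$/.exec(line)
    if (match) {
      flush() // close the previous section
      heading = (match[1] ?? '').trim()
      lines = []
    } else {
      lines.push(line)
    }
  }
  flush()
  return sections
}
```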
Configure the engine to enable the tools (off by default):
const { config } = initEngine({
  storage: { provider: 'r2', /* … */ },
  knowledge: {
    enabled: true, // engine-level capability flag
    maxSearchResults: 5, // top-K per SearchKnowledge call
    maxReadBytes: 10_000, // ReadKnowledge truncation cap
  },
})
const engine = new Engine(config)
Per-run scoping. Folders are runtime-only — pass them via
RunOptions.knowledge.folders. Sub-paths inside a base work too
(e.g., 'sales-playbook/q1' only sees Q1 content):
await engine.run({
task: 'What is our 401k match rate?',
knowledge: {
folders: ['hr-policies', 'sales-playbook/q1'],
external: [
// External file links — fetched on demand, never indexed.
// `headers` are runtime-only and NEVER persist anywhere.
{
name: 'product-catalog',
description: 'Product catalog CSV with unit pricing',
url: 'https://api.acme.example/catalog.csv',
format: 'csv',
headers: { Authorization: 'Bearer sk_real_token' },
},
],
},
})
The model then calls:
- SearchKnowledge({ query: '401k matching' }) → top-K ranked snippets, each with a ref like hr-policies/handbook.md#benefits
- ReadKnowledge({ ref: 'hr-policies/handbook.md#benefits' }) → full body of that section
- ReadKnowledge({ ref: 'ext:product-catalog' }) → fetches the registered URL with its headers, runs the csv extractor, returns text
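Token-overlap ranking is deterministic and LLM-free, so it is easy to sketch. The stop-word list, the ASCII-only tokenizer, and the score (fraction of query tokens found in a section) are illustrative assumptions, not the rules in `src/knowledge/tokenize.ts`. Note there is no stemming: "matching" does not match "match".

```typescript
// Illustrative stop-word list; the engine's list is not documented here.
const STOP_WORDS = new Set(['the', 'a', 'an', 'of', 'is', 'our', 'what', 'to', 'and', 'in'])

// Lowercase, split on non-alphanumerics (ASCII only), drop stop-words, dedupe.
function tokenize(text: string): string[] {
  const seen = new Set<string>()
  for (const token of text.toLowerCase().split(/[^a-z0-9]+/)) {
    if (token && !STOP_WORDS.has(token)) seen.add(token)
  }
  return Array.from(seen)
}

// Fraction of query tokens that appear in the section.
function scoreOverlap(query: string[], doc: Set<string>): number {
  if (query.length === 0) return 0
  return query.filter((t) => doc.has(t)).length / query.length
}

// Rank sections, drop non-matches, break ties by ref for stable output.
function topK(query: string, sections: { ref: string; body: string }[], k: number) {
  const q = tokenize(query)
  return sections
    .map((s) => ({ ref: s.ref, score: scoreOverlap(q, new Set(tokenize(s.body))) }))
    .filter((r) => r.score > 0)
    .sort((a, b) => b.score - a.score || a.ref.localeCompare(b.ref))
    .slice(0, k)
}
```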
Format support. Native: md, txt, json, csv, html (script/
style stripped, entities decoded, whitespace collapsed). Optional:
pdf (via pdf-parse) and docx (via mammoth). Both have
requiresNode: true — on Workers without those packages installed,
they return a structured ERR_KNOWLEDGE_FORMAT_UNSUPPORTED error.
Path safety. All folder + ref strings flow through one validator
in src/knowledge/scope.ts that rejects absolute paths, traversal
(..), unsafe characters, and out-of-base file refs. A dedicated
test (scope.test.ts) pins the behaviour — every weakening would
open a tenant-boundary hole.
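The kinds of checks described above can be sketched as a segment-by-segment validator. This is a hypothetical stand-in for `src/knowledge/scope.ts` (same function names for readability, not its actual rules): it rejects absolute and backslash paths, empty, `.` and `..` segments, and characters outside a conservative allowlist, and it only accepts a file ref that sits strictly under an allowed folder.

```typescript
// Conservative allowlist per path segment (assumed; the real rules may differ).
const SAFE_SEGMENT = /^[A-Za-z0-9._-]+$/

// Returns the path segments, or null if the ref is unsafe.
function parseFolderRef(ref: string): string[] | null {
  if (ref.length === 0 || ref.startsWith('/') || ref.includes('\\')) return null // absolute or Windows-style
  const segments = ref.split('/')
  for (const seg of segments) {
    if (seg === '' || seg === '.' || seg === '..') return null // empty segment or traversal
    if (!SAFE_SEGMENT.test(seg)) return null // unsafe characters (also rejects "C:")
  }
  return segments
}

// A file ref is in scope only if it lies strictly under one of the allowed folders.
function refInScope(fileRef: string, allowedFolders: string[]): boolean {
  const file = parseFolderRef(fileRef)
  if (!file) return false
  return allowedFolders.some((folder) => {
    const prefix = parseFolderRef(folder)
    return prefix !== null && prefix.length < file.length && prefix.every((seg, i) => seg === file[i])
  })
}
```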
External link headers — non-persistence guarantee. External
headers live entirely inside the tool factory closure and on the
init.headers of one fetch call per request. They never reach the
LLM, the transcript, state.json, snapshots, or any storage write.
A sentinel-based test suite (externalLinkSecrets.test.ts) and the
live R2 test (scripts/workflows/w20-knowledge-r2.mjs) verify this:
the live test seeds a known sentinel into the Authorization header,
runs against real R2, and reads every transcript shard back from the
bucket asserting zero leaks.
Composing with offload. If a ReadKnowledge result exceeds your
compaction.toolResultOffload.thresholdBytes, the offload pipeline
takes over — the body is written under toolResults/, the model
sees a summary + ref, and FetchData rehydrates on demand. The two
features compose without any extra wiring.
Disabling. tools.disabled: ['SearchKnowledge', 'ReadKnowledge']
turns the tools off even when knowledge is enabled. Absent
config.knowledge.enabled → no adapter built, no tools registered,
no prompt mention.
Codebase layout
The knowledge subsystem is small and self-contained. If you need to extend it (new format extractor, custom scorer, alternative index schema), these are the files involved:
src/
├── knowledge/ # subsystem (self-contained)
│ ├── types.ts # V1-suffixed public types (KnowledgeFolderRefV1, KnowledgeExternalLinkV1, KnowledgeFormatV1, ResolvedKnowledgeConfigV1, RunKnowledgeOptionsV1, SectionEntryV1, KnowledgeIndexV1, …)
│ ├── scope.ts # parseFolderRef / parseKnowledgeRef / refInScope — load-bearing path safety
│ ├── tokenize.ts # tokenize() + scoreOverlap() — deterministic, no LLM
│ ├── indexer.ts # buildKnowledgeIndex() / writeKnowledgeIndex() — section split + wiki-link extraction
│ └── extractors.ts # getExtractor(format) — md/txt/json/csv/html native; pdf/docx lazy-import
│
├── tools/
│ ├── searchKnowledge.ts # createSearchKnowledgeTool() — token-overlap ranked snippets
│ └── readKnowledge.ts # createReadKnowledgeTool() — section / file / ext: ref dispatch
│
├── storage/
│ ├── interface.ts # adds optional `EngineStorage.knowledge?`
│ └── factory.ts # builds the knowledge adapter at workspaces/{ws}/knowledge/ when enabled
│
├── config/
│ ├── types.ts # ResolvedConfig.knowledge?: ResolvedKnowledgeConfigV1
│ ├── schema.ts # KnowledgeConfigResolved zod schema (scalars only — no folders/headers)
│ └── merge.ts # KNOWLEDGE_DEFAULTS + fillKnowledgeDefaults()
│
├── engine/
│ ├── engine.ts # resolveKnowledgeRuntime() + buildToolRegistry knowledge wire-up
│ └── types.ts # adds `knowledge?: RunKnowledgeOptionsV1` to RunOptions/ResumeOptions
│
└── index.ts # public exports: writeKnowledgeIndex, buildKnowledgeIndex,
# createSearchKnowledgeTool, createReadKnowledgeTool, getExtractor,
# KnowledgeFormatV1, KnowledgeIndexV1, RunKnowledgeOptionsV1, …
test/
├── unit/
│ ├── knowledge/
│ │ ├── tokenize.test.ts # 15 tests — stop-words, dedup, scoring
│ │ ├── indexer.test.ts # 16 tests — section split, wiki-links, recursion
│ │ ├── scope.test.ts # path-safety pin (every weakening = tenant-boundary hole)
│ │ ├── extractors.test.ts # 17 tests — all formats including pdf/docx fallbacks
│ │ └── externalLinkSecrets.test.ts # 7 sentinel-based non-persistence tests
│ ├── tools/
│ │ ├── searchKnowledge.test.ts # caching, multi-base, sub-path, cap, factory rejection
│ │ └── readKnowledge.test.ts # 18 tests — all three ref kinds + error paths
│ └── config/
│ └── knowledgeSchema.test.ts # 13 tests — defaults, partials, header rejection
│
└── integration/engine/
├── knowledgeE2E.test.ts # 6 scenarios — registration, round-trip, disabled, sub-path, subagent inheritance
├── knowledgeWithOffload.test.ts # large ReadKnowledge → offload blob + clean transcript
└── knowledgeMultiBase.test.ts # multi-base ranking, base-prefixed refs, indexed + external mix
scripts/workflows/
├── w20-knowledge-r2.mjs # live R2 — vault search/read + external link
    └── w21-external-files-knowledge.mjs # live R2 — md/json/csv/html external file round-trip
src/knowledge/ is the only directory you need to touch to add a
new format. Append a new KnowledgeExtractorV1 to extractors.ts
and add the type to KnowledgeFormatV1 in types.ts — everything
else is dispatched off getExtractor(format).
Sync vs. async — when to use which
| Scenario | Use |
|---|---|
| Simple task, < 60s | engine.run() (sync) |
| HITL where you can block the caller | engine.run() + engine.resume() |
| Long task, client can't block | engine.start() + getStatus / waitFor |
| HITL in a web app (user closes tab) | engine.start() + webhook on paused |
| Cloudflare Workers (any non-trivial run) | storage.provider: 'r2-binding' + DO + preferBindingTransport |
| Worker needs Bash / stdio MCP | Async + config.runner → handoff to Node (see below) |
| Server crash recovery | engine.recoverOrphanedRuns() on startup |
Runner contract — Node-only tools on Workers
Cloudflare Workers can't spawn processes, which means no Bash, no
stdio-based MCP, no ripgrep. When the engine detects this, it
replaces each such tool with a capability stub — same name, same
description (so the model still sees it in the catalogue), but calling
the stub returns isError: true with a structured message.
You have two options when a Worker run needs those tools:
- Sync run (engine.run()): the stub executes, the model adapts its answer ("I couldn't run Bash in this environment, so here's what I can tell you…"), and the run completes with status: 'done' and meta.capabilitiesMissing: ['Bash', …], so callers can detect missing capabilities and decide whether to retry elsewhere.
- Async run (engine.start() / engine.resumeAsync()) with config.runner set: the engine intercepts the stub call, pauses with reason 'handoff_to_runner', and POSTs { runId } to your runner. The runner is a separate Node process that reads the snapshot from the same R2 bucket, resumes with real tools registered, and writes the final state back. The Worker's engine.waitFor(runId) resolves with status 'done' once the runner finishes.
The engine ships no runner package — you build yours against the
HTTP contract below. A ~100-line reference implementation you can
fork lives at examples/runner-node/.
Configuring the Worker side
const { config } = initEngine({
  // …storage, model, etc.
  runner: {
    url: 'https://runner.tenant-a.internal/continue',
    secret: process.env.RUNNER_SECRET, // shared with the runner
  },
})
const engine = new Engine(config)
Leave runner unset to disable handoff entirely — stubbed tools then
fall back to the sync-style graceful degradation even on async runs.
The HTTP contract
A runner must implement two endpoints:
POST /continue — called by the engine when an async run hits a
Node-only tool.
Headers:
Authorization: Bearer <secret> # MUST match config.runner.secret
Content-Type: application/json
Body:
{ "runId": string }
Response:
202 Accepted — runner accepted, will process in background
401 Unauthorized — bad bearer
  400 Bad Request — missing / malformed runId
Behavior:
- Verify the bearer token.
- Call engine.resumeAsync({ runId }) on a runner-side engine configured with:
  - The same R2 bucket + rootPath + workspaceId as the Worker
  - The real Node-only tools registered (Bash, stdio MCP, etc.)
  - The same LLM provider config
  - No config.runner (the runner doesn't hand off further)
- Return 202 immediately; the engine's own background executor finishes the run and writes state back to R2.
GET /health — returns 200 when the runner accepts /continue.
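A minimal `/continue` and `/health` handler following the contract, written against the standard `Request` / `Response` classes (global in Node 18+). The `resume` callback stands in for `engine.resumeAsync({ runId })` on your runner-side engine; the function names, the constant-time secret comparison, and the 500 on resume failure (not part of the documented contract) are illustrative assumptions, not the reference implementation in examples/runner-node/.

```typescript
import { createHash, timingSafeEqual } from 'node:crypto'

// Constant-time secret comparison; hashing first makes both buffers the same length.
function secretMatches(given: string, expected: string): boolean {
  const a = createHash('sha256').update(given).digest()
  const b = createHash('sha256').update(expected).digest()
  return timingSafeEqual(a, b)
}

// `resume` stands in for engine.resumeAsync({ runId }) on the runner-side engine.
async function handleContinue(
  req: Request,
  secret: string,
  resume: (runId: string) => Promise<unknown>,
): Promise<Response> {
  const auth = req.headers.get('authorization') ?? ''
  if (!auth.startsWith('Bearer ') || !secretMatches(auth.slice(7), secret)) {
    return new Response('unauthorized', { status: 401 })
  }
  let runId: unknown
  try {
    runId = ((await req.json()) as { runId?: unknown }).runId
  } catch {
    return new Response('malformed JSON body', { status: 400 })
  }
  if (typeof runId !== 'string' || runId.length === 0) {
    return new Response('missing or malformed runId', { status: 400 })
  }
  try {
    // resumeAsync returns once the run is scheduled; the background executor finishes it.
    await resume(runId)
  } catch {
    return new Response('resume failed', { status: 500 }) // not in the documented contract
  }
  return new Response(null, { status: 202 })
}

function handleHealth(): Response {
  return new Response('ok', { status: 200 })
}
```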
POST failures
If the runner POST throws (network error) or returns non-2xx, the
engine flips the run to status: 'failed' with error code
ERR_RUNNER_UNREACHABLE before finalizing — callers never see a
silent hang. Rotate the bearer secret by updating both ends and
redeploying; in-flight runs during the rotation fail with
ERR_RUNNER_UNREACHABLE and can be retried.
Per-tenant isolation
Deployment concern, not engine concern. Run one runner process per tenant when secrets must not be shared or when tenants need resource isolation. Each tenant's Worker points at its matching runner URL
