@agent-workflow/engine
v1.0.0
Published
Agent Workflow Protocol — reference engine (definition validation and execution)
Publishable npm package: definition-time validation for Agent Workflow Protocol workflow documents per docs/engine-profile.md, an append-only execution history port (SQLite or in-memory), a linear graph runner, and a general graph walker with switch, interrupt / resume, parallel join policies (all / any / n_of_m), wait (duration / until; signal needs a host), set_state, agent_delegate (in-process mock A2A), and subworkflow (nested runs with depth limit; register child defs via registerWorkflowRef).
Entrypoint (CLI)
From the repository root (after npm install):
node packages/engine/src/cli.mjs validate path/to/workflow.json
Or use the package bin name when linked:
npx workflows-engine validate path/to/workflow.json
Validate an operator MCP manifest (Cursor-style mcpServers subset for stdio servers):
node packages/engine/src/cli.mjs mcp-manifest validate path/to/mcp.json
See docs/architecture/arc42-assets/contracts/mcp-operator-manifest.md and the exports validateMcpOperatorManifest, readAndValidateMcpOperatorManifestFile, and resolveMcpOperatorManifestPath from the package entrypoint.
MCP stdio adapter
Run from repository root:
npm run engine:mcp:stdio
Or from the engine workspace:
npm run mcp:stdio --workspace=@agent-workflow/engine
Or invoke the bin entrypoint directly:
npx workflows-engine-mcp
No-install npm usage for MCP hosts:
# consume the latest alpha channel publish (use -p: package ships two bins)
npx -y -p @agent-workflow/engine@alpha workflows-engine-mcp
# consume a pinned, reproducible package version
npx -y -p @agent-workflow/engine@<version> workflows-engine-mcp
Operator setup (default for MCP clients): register the published package; the host runs npx and does not need this repository on disk. Use -y (non-interactive) and -p so npm selects the workflows-engine-mcp bin when both workflows-engine and workflows-engine-mcp are present:
{
"mcpServers": {
"workflow-engine": {
"command": "npx",
"args": ["-y", "-p", "@agent-workflow/engine@alpha", "workflows-engine-mcp"]
}
}
}
Development setup: point node at packages/engine/src/mcp-stdio-server.mjs inside your clone when working on the adapter or engine.
This starts a dedicated MCP stdio adapter layer with tools workflow_start, workflow_status, workflow_resume, workflow_submit_activity, workflow_signal, workflow_cancel, and workflow_list. The adapter maps MCP request DTOs to the stable application port (createWorkflowApplicationPort) and translates engine failures into structured MCP tool errors with stable error codes.
Cooperative cancel: workflow_cancel appends ExecutionCancelled at host pause points (awaiting_signal, awaiting_activity, interrupted). It does not interrupt an in-process node that is actively executing inside the same call stack.
Operator smoke runbook: docs/architecture/arc42-assets/runbooks/mcp-stdio-host-smoke.md.
Execution history store (workflows-engine-mcp)
By default the MCP stdio bin uses an in-memory execution history store (non-persistent; history is lost when the process exits). For durable runs across restarts, enable SQLite:
workflows-engine-mcp --store sqlite --store-path /path/to/runs.sqlite
Environment equivalents (CLI flags override env):
| Variable | Role |
|----------|------|
| WORKFLOW_ENGINE_STORE | memory (default) or sqlite |
| WORKFLOW_ENGINE_STORE_PATH | SQLite database file path when store is sqlite |
On startup the process logs the selected backend to stderr, for example [engine-mcp-stdio] execution history store: sqlite (/path/to/runs.sqlite).
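The precedence described above (CLI flags override environment variables, with memory as the default) can be sketched as a small resolver. This is an illustration only, not the bin's actual argument parser: resolveStoreConfig and its return shape are hypothetical, and requiring a path for sqlite is an assumption.

```javascript
// Hypothetical helper: resolve the history store backend from CLI args and env.
// Mirrors the documented precedence: flags > env > default ("memory").
function resolveStoreConfig(argv, env) {
  // Return the value that follows a flag such as "--store sqlite".
  const flagValue = (name) => {
    const i = argv.indexOf(name);
    return i !== -1 && i + 1 < argv.length ? argv[i + 1] : undefined;
  };

  const store = flagValue("--store") ?? env.WORKFLOW_ENGINE_STORE ?? "memory";
  const path = flagValue("--store-path") ?? env.WORKFLOW_ENGINE_STORE_PATH;

  if (store !== "memory" && store !== "sqlite") {
    throw new Error(`unknown store backend: ${store}`);
  }
  // Assumption: a sqlite store without a path is treated as a configuration error.
  if (store === "sqlite" && !path) {
    throw new Error("sqlite store requires a database path");
  }
  return store === "sqlite" ? { store, path } : { store };
}
```

For example, resolveStoreConfig(["--store", "sqlite", "--store-path", "/path/to/runs.sqlite"], { WORKFLOW_ENGINE_STORE: "memory" }) selects sqlite because the flag wins over the environment variable.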
Shared-host risks: A SQLite file is a single-writer append log. Do not point multiple MCP engine processes at the same database file unless you accept coordination risk (WAL helps readers but concurrent writers on the same executionId can still corrupt monotonic seq assumptions). Restrict filesystem permissions on the database path; backups and migration are operator responsibilities. Treat the file like any local secret-adjacent artifact on shared machines.
Migrating from in-memory: Stop the in-memory server (history in that process cannot be exported). Start a new process with --store sqlite --store-path <path> (or env equivalents). In-flight executions cannot be resumed across the switch; clients must use new execution_id values or replay from exported history if you built a custom export path via the library store API.
Requires Node.js ≥ 22.5.0 (node:sqlite). See Execution history below for library embedder details and table layout.
Assistant hosts that own LLM/tool credentials should pass activity_execution_mode: "host_mediated" and complete activities via workflow_submit_activity. End-user guide: docs/user/host-mediated-activities.md (ADR-0002).
Engine-direct tool_call execution (optional)
By default, workflows-engine-mcp uses the in-process stub executor for activity placeholders when no operator activity config is set. That default is intended for local demo and smoke tests only — not a silent production fallback. For production, configure at least one real sub-executor (see Composite activity routing below).
To run tool_call nodes against real MCP stdio servers, enable engine-direct configuration:
- Environment: set WORKFLOW_ENGINE_MCP_CONFIG to the absolute or relative path of an operator MCP manifest JSON file.
- CLI: pass --mcp-config <path> after the bin name (overrides WORKFLOW_ENGINE_MCP_CONFIG when both are set).
The manifest matches the schema validated by workflows-engine mcp-manifest validate (Cursor-style mcpServers with stdio command / args / env). Workflow nodes use tool_call with config.server (manifest key) and config.tool (MCP tool name). If the file is missing or invalid JSON/schema, the process exits with code 1 before accepting MCP traffic; errors are written to stderr.
Security, credentials, and trust boundaries for this profile are documented in ADR-0003: Engine-direct MCP activity execution. Host-mediated completion via workflow_submit_activity is unchanged when you use activity_execution_mode: host_mediated; after a submit, continuations still use the same port-level executor when engine-direct is enabled.
Engine-direct llm_call execution (LlmActivityExecutor)
LlmActivityExecutor implements the ActivityExecutor port for llm_call nodes. It reads node.config (model, system_prompt, user_prompt / prompt, optional output_schema) and resolves provider credentials from operator config — not workflow JSON (RFC-07 §7.3).
import { LlmActivityExecutor, runGraphWorkflow } from "@agent-workflow/engine";
const activityExecutor = new LlmActivityExecutor({
operatorConfig: {
apiKeyEnv: "OPENAI_API_KEY", // or apiKeySecretRef: "env:OPENAI_API_KEY" / "file:.secrets/key"
baseUrl: "https://api.openai.com/v1", // optional OpenAI-compatible base
},
});
Inject a custom LlmProvider for tests or non-OpenAI backends; the default uses fetch against /chat/completions with no extra SDK. Structured outputs are validated with AJV when output_schema is set. In-process executor failures use LLM_OUTPUT_VALIDATION_FAILED; at the activity boundary (in-process and host-mediated submit) schema violations surface as OUTPUT_SCHEMA_VIOLATION on ActivityFailed. Other stable executor codes: LLM_CONFIG_INVALID, LLM_CREDENTIALS_MISSING, LLM_PROVIDER_ERROR.
Prompt resolution (buildLlmChatMessages)
LlmActivityExecutor builds the provider chat payload from node.config and the current workflow state via buildLlmChatMessages:
| Message | Source |
|---------|--------|
| system | config.system_prompt when present — passed through as a literal string |
| user | config.user_prompt or config.prompt (alias) when present — literal string; otherwise JSON.stringify(state) when state has keys; otherwise the fixed fallback "Respond according to the system instructions." |
No jq or state templating. Unlike agent_delegate, subworkflow, and engine-direct tool_call nodes (which resolve config.input_mapping with jq), llm_call prompts are not evaluated against workflow state. Placeholders in system_prompt or user_prompt are sent to the provider unchanged.
When prompts must be derived from state (for example inserting ticket_text into a user message), use one of:
- A preceding set_state node to shape state, then omit user_prompt so the executor serializes state into the user message (lighthouse classify uses this pattern).
- activity_execution_mode: "host_mediated" so the host templates prompts from node.config and state before calling its LLM and submitting the outcome (see docs/user/host-mediated-activities.md).
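The resolution rules in the message table can be restated as a short function. This is a sketch of the documented behaviour, not the source of buildLlmChatMessages: the name sketchChatMessages is hypothetical, and the { role, content } message shape is an assumption based on the OpenAI-compatible /chat/completions default.

```javascript
// Illustrative restatement of the message table (hypothetical helper).
function sketchChatMessages(config, state) {
  const messages = [];

  // system: passed through as a literal string when present.
  if (typeof config.system_prompt === "string") {
    messages.push({ role: "system", content: config.system_prompt });
  }

  // user: literal user_prompt (or its alias prompt), else serialized state,
  // else the fixed fallback sentence.
  const literal = config.user_prompt ?? config.prompt;
  let user;
  if (typeof literal === "string") {
    user = literal; // no templating: placeholders are sent unchanged
  } else if (state && Object.keys(state).length > 0) {
    user = JSON.stringify(state);
  } else {
    user = "Respond according to the system instructions.";
  }
  messages.push({ role: "user", content: user });
  return messages;
}
```

Calling it with a user_prompt that contains a placeholder such as {{ticket_text}} leaves the placeholder in the message, which is why the set_state pattern above omits user_prompt entirely.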
Engine-direct step execution (StepActivityExecutor)
StepActivityExecutor implements the ActivityExecutor port for step nodes. It reads node.config.handler (a string URN) and looks up the implementation in an operator-provided StepHandlerRegistry registered at bootstrap — not in workflow JSON.
import { StepActivityExecutor, StepHandlerRegistry, runGraphWorkflow } from "@agent-workflow/engine";
const registry = new StepHandlerRegistry();
registry.register("urn:my-app:handlers:lookup-customer", async (ctx) => {
return { customerId: "c-1", name: "Ada" };
});
const activityExecutor = new StepActivityExecutor({ registry: registry.createFrozenCopy() });
v1 sandboxing: handlers run in the same Node.js process as the engine (in-process dispatch). There is no worker isolation, VM sandbox, or resource cap in this profile milestone, so treat registered handlers as trusted operator code. Isolated worker processes are deferred to a later release.
WORKFLOW_ENGINE_STEP_HANDLERS is static output only. The MCP stdio bin reads this env var as a JSON map of handler URN → fixed output object. Each entry is wrapped as an async handler that returns that object unchanged — there is no way to supply programmatic logic (no I/O, no state inspection, no side effects). Use this for smoke tests and conformance-style fixtures. For real handler code, register async functions via StepHandlerRegistry.register at library bootstrap (see example above) and pass the frozen registry to StepActivityExecutor / createWorkflowApplicationPort.
Stable failure codes: STEP_CONFIG_INVALID (missing/invalid handler), HANDLER_NOT_FOUND (URN not registered), HANDLER_ERROR (handler threw). Success merges handler output into workflow state per state_schema reducers.
Composite activity routing (CompositeActivityExecutor)
CompositeActivityExecutor is the production router for step, llm_call, and tool_call nodes. It dispatches by ctx.node.type to optional sub-executors (StepActivityExecutor, LlmActivityExecutor, McpManifestActivityExecutor). When a routed node type has no configured sub-executor, execution fails with stable code COMPOSITE_EXECUTOR_NOT_CONFIGURED (unless an explicit fallback is injected — see demo profile below).
import {
buildCompositeActivityExecutor,
LlmActivityExecutor,
McpManifestActivityExecutor,
StepActivityExecutor,
} from "@agent-workflow/engine";
const activityExecutor = buildCompositeActivityExecutor({
step: new StepActivityExecutor({ registry }),
llm_call: new LlmActivityExecutor({ operatorConfig: { apiKeyEnv: "OPENAI_API_KEY" } }),
tool_call: new McpManifestActivityExecutor({ manifest }),
});
Pass the composite (or any custom ActivityExecutor) to createWorkflowApplicationPort({ activityExecutor }) or runGraphWorkflow({ activityExecutor }).
workflows-engine-mcp wiring: when any operator config is present, the stdio bin loads a composite via loadProductionActivityExecutor:
| Env var | Sub-executor |
|---------|----------------|
| WORKFLOW_ENGINE_MCP_CONFIG / --mcp-config | tool_call → McpManifestActivityExecutor |
| WORKFLOW_ENGINE_LLM_CONFIG (inline JSON or file path) | llm_call → LlmActivityExecutor |
| WORKFLOW_ENGINE_STEP_HANDLERS (inline JSON or file path; static URN → output map only — not programmatic handlers) | step → StepActivityExecutor |
When none of these are set, the MCP adapter omits activityExecutor and the walker uses StubActivityExecutor (demo/local only). Set WORKFLOW_ENGINE_PROFILE=demo to add stub fallback inside a partial composite (unconfigured node types return {} instead of COMPOSITE_EXECUTOR_NOT_CONFIGURED). Do not rely on stub fallback in production deployments.
On successful startup, workflows-engine-mcp writes a structured activity routing summary to stderr so operators can see which node types are production-configured vs missing before workflows fail at runtime:
[engine-mcp-stdio] activity routing: llm_call=production, tool_call=missing, step=missing
[engine-mcp-stdio] demo stub fallback: inactive
Route values: production (env/config present), missing (partial composite, no demo fallback; hits COMPOSITE_EXECUTOR_NOT_CONFIGURED at runtime), stub(demo) (unconfigured but covered by WORKFLOW_ENGINE_PROFILE=demo fallback), stub(default) (no operator activity config; full in-process stub mode).
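The dispatch-by-type rule and the route values in the startup summary can be sketched as two small functions. Both are hypothetical helpers written from the description in this section, not the CompositeActivityExecutor or loadProductionActivityExecutor source.

```javascript
const ROUTED_TYPES = ["llm_call", "tool_call", "step"];

// Pick a sub-executor by node type, or fail with the documented stable code.
function routeActivity(nodeType, subExecutors, fallback) {
  const executor = subExecutors[nodeType];
  if (executor) return executor;
  if (fallback) return fallback; // e.g. a stub injected by the demo profile
  const err = new Error(`no sub-executor configured for node type "${nodeType}"`);
  err.code = "COMPOSITE_EXECUTOR_NOT_CONFIGURED";
  throw err;
}

// Build a summary string in the same spirit as the startup log line.
function routingSummary(subExecutors, demoProfile) {
  const anyConfigured = ROUTED_TYPES.some((type) => subExecutors[type]);
  return ROUTED_TYPES.map((type) => {
    let route;
    if (subExecutors[type]) route = "production";
    else if (!anyConfigured) route = "stub(default)"; // no operator config at all
    else route = demoProfile ? "stub(demo)" : "missing";
    return `${type}=${route}`;
  }).join(", ");
}
```

With only an llm_call executor configured and no demo profile, routingSummary reports tool_call and step as missing, matching the example log line above.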
Delegate routing (CompositeDelegateExecutor)
agent_delegate nodes run through a DelegateExecutor port (executeDelegate(ctx)). Omit delegateExecutor to use MockA2ADelegateExecutor (offline demos). Production adapters:
| Executor | Protocol | Wiring |
|----------|----------|--------|
| A2ADelegateExecutor | a2a | HTTP submit + poll (operatorConfig.baseUrl, apiKeyEnv) |
| McpDelegateExecutor | mcp | Operator manifest delegateAgents → MCP stdio tools/call |
| SdkDelegateExecutor | sdk | In-process Map<agent_id, handler> (extension point for embedded agents) |
Route multiple protocols with buildCompositeDelegateExecutor({ a2a, mcp, sdk }). Stable failure codes include DELEGATE_AGENT_NOT_FOUND, DELEGATE_PROTOCOL_ERROR, and DELEGATE_PROTOCOL_UNSUPPORTED. See A2A delegate mapping and MCP operator manifest.
Operator constraints:
- A2ADelegateExecutor runs submit + poll inside the control-plane call (workflow_start, resume, or in-process continuation). Poll may block up to pollTimeoutMs (default 120s); MCP stdio hosts often time out sooner. Poll throws on A2A input-required; it does not yield.
- McpDelegateExecutor is single-shot (one stdio tools/call; no status poll loop). For long-running, interactive, or input-required delegates, pass activity_execution_mode: "host_mediated" and complete via workflow_submit_activity (host-mediated guide, A2A input-required migration).
import {
buildCompositeDelegateExecutor,
McpDelegateExecutor,
SdkDelegateExecutor,
} from "@agent-workflow/engine";
const delegateExecutor = buildCompositeDelegateExecutor({
mcp: new McpDelegateExecutor({ manifest }),
sdk: new SdkDelegateExecutor({
handlers: {
"urn:my-app:agents:local": async (input) => ({ patch: input.task }),
},
}),
});
workflows-engine-mcp wiring: when any operator config is present, the stdio bin loads a composite via loadProductionDelegateExecutor:
| Env var | Sub-executor |
|---------|----------------|
| WORKFLOW_ENGINE_A2A_CONFIG (inline JSON or file path) | a2a → A2ADelegateExecutor |
| WORKFLOW_ENGINE_MCP_CONFIG / --mcp-config (delegateAgents in manifest) | mcp → McpDelegateExecutor |
When none of these are set, the MCP adapter omits delegateExecutor and the walker uses MockA2ADelegateExecutor (demo/local only). Set WORKFLOW_ENGINE_PROFILE=demo to add mock fallback inside a partial composite (unconfigured protocols succeed with in-process mock output instead of DELEGATE_PROTOCOL_UNSUPPORTED). Do not rely on mock fallback in production deployments.
Invalid WORKFLOW_ENGINE_A2A_CONFIG (missing baseUrl) or an invalid operator manifest fails at MCP stdio startup with readable stderr messages (same pattern as activity bootstrap).
Host compatibility constraints for no-install use
- Node runtime: Node.js >=22.5.0 is required (uses node:sqlite).
- Process launch model: Host must be able to spawn npx and execute package bin commands.
- Transport expectation: Host must communicate over MCP stdio with piped stdin/stdout.
- Stderr behavior: Treat stderr as logs/diagnostics; do not parse protocol frames from stderr.
MCP tool contracts (minimum set)
- workflow_start
  - args: { execution_id?: string, definition: object, input: object, activity_execution_mode?: "in_process" | "host_mediated", allow_existing_execution_id?: boolean }
  - returns: { execution_id, status, final_state?, result?, error?, node_id?, state?, parallel_span? }
  - notes: if execution_id is omitted, the engine generates a stable UUID and returns it for follow-up calls. Reusing an execution_id that already has persisted history is rejected with DUPLICATE_EXECUTION_ID unless allow_existing_execution_id: true (replay/idempotency continuation). With activity_execution_mode: "host_mediated", the engine returns status: "awaiting_activity" after recording ActivityRequested for the next activity node; the host completes it via workflow_submit_activity.
- workflow_status
  - args: { execution_id: string }
  - returns: { execution_id, phase, current_node_id?, last_error? }
  - notes: phase / current_node_id / last_error are projected deterministically from persisted execution history (including resume/checkpoint-driven progress), not adapter-local mutable state. Phase awaiting_activity indicates the last non-checkpoint event is ActivityRequested (host-mediated pending).
- workflow_resume
  - args: { execution_id: string, definition: object, resume_payload: object, activity_execution_mode?: "in_process" | "host_mediated" }
  - returns: { execution_id, status, final_state?, result?, error?, node_id?, state?, parallel_span? }
  - notes: resume payloads are validated against the interrupt node resume_schema; invalid or stale resume attempts return structured tool errors.
- workflow_submit_activity
  - args: { execution_id: string, definition: object, input: object, node_id: string, outcome: { ok: true, result?: object } | { ok: false, error: string, code?: string }, parallel_span?: object, activity_execution_mode?: "in_process" | "host_mediated" }
  - returns: same shape as workflow_resume results, plus optional code when status is failed from submit validation (usually surfaced as a tool error instead).
  - notes: append activity success/failure after a host-mediated yield; definition must match the canonical hash bound at the latest CheckpointWritten (definitionHash); mismatch returns SUBMIT_VALIDATION_ERROR. input must match the original workflow_start (replay). For activities under a parallel branch, pass parallel_span matching the parallel_span returned from workflow_start / prior submit.
- workflow_signal
  - args: { execution_id, definition, input, signal_name, payload?, activity_execution_mode? }
  - returns: same shape as workflow_resume results when continuing after a signal wait.
  - notes: delivers DeliverSignal / SignalReceived for a pending wait node with config.kind: signal. Signal payload keys merge into workflow state via state_schema reducers. Unknown execution ids return EXECUTION_NOT_FOUND.
- workflow_cancel
  - args: { execution_id, reason? }
  - returns: { execution_id, status: "cancelled" | "failed", reason?, error?, code? }
  - notes: cooperative cancel at pause points; unknown execution ids return EXECUTION_NOT_FOUND; terminal runs return CANCEL_NOT_ALLOWED.
- workflow_list
  - args: { phase?, definition_name?, updated_after?, updated_before?, limit?, cursor? }
  - returns: { executions: [{ execution_id, phase, definition_name?, updated_at? }], next_cursor? }
  - notes: lists persisted executions newest-first; default page size 50, max 100.
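The workflow_status notes say that phase is projected from persisted history rather than from adapter state. A minimal sketch of such a projection follows. Only the awaiting_activity rule is stated in this README; the other event-to-phase mappings are assumptions inferred from the event names used elsewhere in this document, awaiting_signal is omitted because its triggering event is not named here, and sketchProjectPhase is a hypothetical helper.

```javascript
// Project a phase from history rows of the form { kind, name, payload }.
function sketchProjectPhase(history) {
  // Checkpoints are bookkeeping; the phase follows the last other event.
  const events = history.filter(
    (row) => row.kind === "event" && row.name !== "CheckpointWritten"
  );
  const last = events[events.length - 1];
  if (!last) return undefined; // no history: the adapter reports EXECUTION_NOT_FOUND

  switch (last.name) {
    case "ActivityRequested": // documented rule (host-mediated pending)
      return "awaiting_activity";
    case "InterruptRaised": // assumption
      return "interrupted";
    case "ExecutionCompleted": // assumption
      return "completed";
    case "ExecutionFailed": // assumption
      return "failed";
    case "ExecutionCancelled": // assumption
      return "cancelled";
    default:
      return "running";
  }
}
```

Because the function reads only the stored rows, two adapter processes looking at the same history would report the same phase.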
Structured adapter error codes:
- VALIDATION_ERROR: MCP request payload fails contract validation.
- EXECUTION_NOT_FOUND: requested execution id has no persisted history (workflow_status / store lookups).
- DUPLICATE_EXECUTION_ID: workflow_start reused an execution_id that already has history without allow_existing_execution_id: true.
- INVALID_RESUME_PAYLOAD: resume payload fails schema, definition hash mismatch vs latest checkpoint, or resume is stale/not allowed.
- ACTIVITY_SUBMIT_NOT_AWAITING: cannot submit: execution missing or last event is not ActivityRequested.
- ACTIVITY_SUBMIT_NODE_MISMATCH: node_id does not match the pending activity.
- ACTIVITY_SUBMIT_PARALLEL_MISMATCH: parallel_span missing or does not match the pending ActivityRequested.
- SUBMIT_VALIDATION_ERROR: submit request failed definition/store validation before append.
- SIGNAL_NOT_AWAITING / SIGNAL_NAME_MISMATCH / SIGNAL_VALIDATION_ERROR: signal delivery rejected.
- CANCEL_NOT_ALLOWED / CANCEL_VALIDATION_ERROR: cancel rejected (terminal run or invalid args).
- ENGINE_FAILURE: engine reported workflow failure that is not an adapter contract issue.
- INTERNAL_ERROR: unexpected adapter failure.
File argument: Path to a file containing canonical JSON (RFC-03: normalized JSON, not YAML at runtime).
Stdin: Omit the file argument or pass - to read JSON from stdin.
Exit codes:
| Code | Meaning |
|------|---------|
| 0 | Document is valid against the bundled workflow schema. |
| 1 | JSON parsed but schema validation failed (details on stderr). |
| 2 | Usage error, I/O failure, or JSON parse error. |
On validation failure, stderr lists each AJV error with instancePath, keyword, schemaPath, params, and message where present so documents can be fixed without guessing.
Library API
The package exports:
- validateWorkflowDefinition(data): returns { ok: true } or { ok: false, errors } where errors is AJV's ErrorObject[] (includes instancePath, keyword, schemaPath, etc.).
- compileWorkflowValidator(): returns a reusable (data) => { ok: true } | { ok: false, errors } function; the compiled schema is cached per process.
- findWorkflowRepoRoot(startDir?): locates the workflows monorepo root (lighthouse fixture + root package.json named workflows) for tests and examples; schema loading prefers the bundled packages/engine/schemas/workflow-definition.json when present.
- runGraphWorkflow(...) / resumeGraphWorkflow(...) / submitActivityOutcome(...): general graph walker with switch, interrupt, parallel joins, and composition nodes (see General graph orchestration below).
Linear orchestration
API: runLinearWorkflow({ definition, input, executionId, store, stubActivityOutputs?, activityExecutor? }) → Promise<{ status: 'completed'|'failed', finalState?, result?, error? }>.
Phases: validate (bundled workflow schema + reject state_schema.properties.*.reducer === "custom") → start (ExecutionStarted) → walk each node on the unique chain from __start__ through exactly one start … end → complete (ExecutionCompleted with jq result) or fail (ExecutionFailed, plus FailNode command when failure happens after start).
Graph rules: Edges must form a single linear path covering every node: exactly one edge from __start__, at most one outgoing edge per node, no cycles, exactly one start and one end. switch and interrupt nodes are rejected by this runner (use the general graph walker). Unknown topology errors throw from computeLinearNodePath or return { status: 'failed', error } from runLinearWorkflow.
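The graph rules above amount to a simple walk from __start__. The sketch below checks them over an edge list; it is not the exported computeLinearNodePath (which takes nodes and an outgoing map), and sketchLinearPath plus the target field name on edges are assumptions made for illustration.

```javascript
// nodes: [{ id, type }], edges: [{ source, target }]
function sketchLinearPath(nodes, edges) {
  // At most one outgoing edge per source (including __start__).
  const outgoing = new Map();
  for (const { source, target } of edges) {
    if (outgoing.has(source)) {
      throw new Error(`more than one outgoing edge from ${source}`);
    }
    outgoing.set(source, target);
  }
  if (!outgoing.has("__start__")) throw new Error("missing edge from __start__");

  const byId = new Map(nodes.map((node) => [node.id, node]));
  const path = [];
  const seen = new Set();
  let current = outgoing.get("__start__");
  while (current !== undefined) {
    if (!byId.has(current)) throw new Error(`edge targets unknown node ${current}`);
    if (seen.has(current)) throw new Error(`cycle detected at ${current}`);
    seen.add(current);
    path.push(current);
    current = outgoing.get(current);
  }

  // The single chain must cover every node and run from start to end.
  if (path.length !== nodes.length) throw new Error("edges do not cover every node");
  if (byId.get(path[0]).type !== "start") throw new Error("path must begin at a start node");
  if (byId.get(path[path.length - 1]).type !== "end") throw new Error("path must finish at an end node");
  return path;
}
```

A three-node definition (start, step, end) wired in sequence yields the node ids in execution order; adding a back edge or a second outgoing edge makes the function throw.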
Activity boundary: step, llm_call, and tool_call are executed through an ActivityExecutor port (executeActivity(ctx) → success with output or failure with error / optional code). The walker only calls this port (no MCP, HTTP, or provider SDKs inside the runner). StubActivityExecutor is the default when activityExecutor is omitted: deterministic, returns {} or per-node outputs from an outputsByNodeId map (library demos and tests). CompositeActivityExecutor routes production workloads to configured sub-executors; pass activityExecutor to inject real adapters. stubActivityOutputs only affects the default stub used when activityExecutor is omitted.
Retry: Per-node retry settings (max_attempts, initial_interval, backoff_coefficient, max_interval, non_retryable_errors) are applied for step, llm_call, and tool_call in the graph walker and linear runner. Intermediate failures emit ActivityFailed with attempt (1-based) and willRetry: true; the next attempt emits ActivityRequested with an incremented attempt. Replay uses the last ActivityCompleted per node and does not re-invoke the activity port.
Timeout: Per-node timeout duration strings (500ms, 30s, 1m, 1h) are enforced for the same activity node types. In-process execution races the activity port against the deadline and records ActivityFailed with code TIMEOUT when exceeded. McpManifestActivityExecutor passes min(node timeout, defaultTimeoutMs) to MCP tools/call. Host-mediated runs include timeoutMs on ActivityRequested so the host can fail the submit with code TIMEOUT if work exceeds the deadline (see docs/user/host-mediated-activities.md).
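The duration strings listed above can be converted to milliseconds with a small parser, and the min(node timeout, defaultTimeoutMs) rule follows directly. This is a sketch under the assumption that only integer values with the four units shown (ms, s, m, h) are accepted; parseDurationMs and effectiveToolTimeoutMs are hypothetical names, not package exports.

```javascript
// Convert "500ms", "30s", "1m", or "1h" to milliseconds.
function parseDurationMs(text) {
  const match = /^(\d+)(ms|s|m|h)$/.exec(text);
  if (!match) throw new Error(`unsupported duration: ${text}`);
  const unitMs = { ms: 1, s: 1000, m: 60000, h: 3600000 };
  return Number(match[1]) * unitMs[match[2]];
}

// Deadline handed to an MCP tools/call: the smaller of the node timeout
// and the executor default. A node without a timeout uses the default.
function effectiveToolTimeoutMs(nodeTimeout, defaultTimeoutMs) {
  if (nodeTimeout === undefined) return defaultTimeoutMs;
  return Math.min(parseDurationMs(nodeTimeout), defaultTimeoutMs);
}
```

For instance, a node timeout of "30s" with a default of 10000 ms gives an effective deadline of 10000 ms, while "500ms" with the same default gives 500 ms.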
Node types in this runner: start, end, and step / llm_call / tool_call behind the activity executor. Default stub outputs are {}; override per node id with stubActivityOutputs[nodeId] (merged into state via reducers).
State: Initial state is a shallow copy of input. After each non-end node, outputs are merged using state_schema.properties.<key>.reducer: overwrite (default), append (array concat), merge (deep object merge). After each merge, state is validated with Ajv against state_schema (reducer annotations are stripped for compilation only).
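The three reducers can be illustrated with a compact merge function. It is a sketch of the semantics described above, not the exported applyOutputWithReducers: sketchApplyReducers is a hypothetical name, schema validation after the merge is left out, and wrapping a non-array value for append is an assumption.

```javascript
function isPlainObject(value) {
  return value !== null && typeof value === "object" && !Array.isArray(value);
}

// Recursively merge b into a without mutating either argument.
function deepMerge(a, b) {
  const out = { ...a };
  for (const [key, value] of Object.entries(b)) {
    out[key] =
      isPlainObject(value) && isPlainObject(out[key])
        ? deepMerge(out[key], value)
        : value;
  }
  return out;
}

// Apply a node output to state using state_schema.properties.<key>.reducer.
function sketchApplyReducers(state, output, stateSchema) {
  const next = { ...state };
  for (const [key, value] of Object.entries(output)) {
    const reducer = stateSchema?.properties?.[key]?.reducer ?? "overwrite";
    if (reducer === "append") {
      // Array concat; a single non-array value is appended as one element.
      next[key] = [].concat(next[key] ?? [], value);
    } else if (reducer === "merge" && isPlainObject(next[key]) && isPlainObject(value)) {
      next[key] = deepMerge(next[key], value);
    } else {
      next[key] = value; // overwrite (default)
    }
  }
  return next;
}
```

Given state { tags: ["a"], meta: { x: 1 } } and output { tags: ["b"], meta: { y: 2 } }, an append reducer on tags and a merge reducer on meta produce { tags: ["a", "b"], meta: { x: 1, y: 2 } }.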
end node config.output_mapping: Must be a jq program string. It is evaluated with jq’s input root = the current workflow state object (after all reducer updates from prior nodes). If output_mapping is omitted, the runner uses . (identity). Evaluation uses the jq-wasm package (WebAssembly jq — no native compile).
History (command/event names): Appends include at least ExecutionStarted; for each node ScheduleNode (command) and NodeScheduled (event); for activities ActivityRequested then ActivityCompleted or ActivityFailed; CompleteNode (command); StateUpdated (event) after state changes; terminal ExecutionCompleted or ExecutionFailed. On activity failure the runner records ActivityFailed, then FailNode (reason: "activity_failed"), then ExecutionFailed. Payloads include executionId and nodeId where applicable.
Helpers (also exported): assertNoCustomReducers(definition), applyOutputWithReducers(state, output, stateSchema), computeLinearNodePath(nodes, outgoingMap).
General graph orchestration
API: runGraphWorkflow({ definition, input, executionId, store, stubActivityOutputs?, activityExecutor?, activityExecutionMode? }) and resumeGraphWorkflow({ definition, executionId, store, resumePayload, stubActivityOutputs?, activityExecutor?, activityExecutionMode? }). activityExecutionMode defaults to "in_process" (run ActivityExecutor immediately). With "host_mediated", the walker returns { status: 'awaiting_activity', nodeId, state, parallelSpan? } after ActivityRequested (assistant-class hosts opt in explicitly — see ADR-0002). Continue by appending the outcome and calling runGraphWorkflow again, or use submitActivityOutcome({ definition, executionId, store, input, nodeId, outcome, expectedParallelSpan?, ... }) (also exported) which validates the pending request and re-enters the walker. Parallel branches attach parallelSpan to ActivityRequested; submits for those nodes must pass the same expectedParallelSpan.
runGraphWorkflow supports node types start, end, step, llm_call, tool_call, switch, interrupt, parallel, wait, set_state, agent_delegate, and subworkflow. Phases and command/event names match the linear runner (ExecutionStarted, ScheduleNode, NodeScheduled, activity events, CompleteNode, StateUpdated, terminal ExecutionCompleted / ExecutionFailed), plus interrupt lifecycle: RaiseInterrupt, InterruptRaised, and on resume ResumeInterrupt, InterruptResumed. On entering an interrupt node the walker appends RaiseInterrupt / InterruptRaised (payload includes nodeId and a short prompt summary) and returns { status: 'interrupted', executionId, nodeId, state } without CompleteNode for that node until resumeGraphWorkflow runs.
switch routing: Successors come only from config.cases (first jq match wins; jq input root is the current workflow state object, same as the linear runner) and config.default when no case matches. If any cases exist and none match and default is omitted, the run fails with a clear error. Static edges whose source is the switch node id are ignored for routing (they may exist in documents; the engine does not follow them). This matches the routing guidance in docs/engine-profile.md (avoid duplicate routing channels).
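The routing rule (first matching case wins, then default, otherwise failure) can be sketched as follows. JavaScript predicates stand in for the jq expressions that the engine evaluates, and the field names when and target, like the function name sketchSwitchTarget, are hypothetical.

```javascript
// config: { cases: [{ when: (state) => boolean, target: string }], default?: string }
function sketchSwitchTarget(config, state) {
  const cases = config.cases ?? [];
  for (const candidate of cases) {
    // First match wins; later cases are not evaluated.
    if (candidate.when(state)) return candidate.target;
  }
  if (config.default !== undefined) return config.default;
  throw new Error("switch: no case matched and no default is configured");
}
```

Static edges leaving the switch node play no part in this decision, which is why the sketch never looks at an edge list.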
Static edges (non-switch): Exactly one outgoing edge from __start__, and from each of start, step, llm_call, tool_call, and interrupt; none from end. The walker does not require outgoing edges from switch nodes.
Resume: resumeGraphWorkflow loads history, takes the latest StateUpdated payload state, validates resumePayload with Ajv against the interrupt node’s config.resume_schema (reducer annotations stripped the same way as workflow state_schema), merges resume fields into state (overwrite keys), then continues from the single static successor of the interrupt node. Invalid resume appends FailNode (reason: "resume_validation_failed" when schema fails) and ExecutionFailed. If the last event is not InterruptRaised, resume fails with FailNode / ExecutionFailed and reason: "resume_not_allowed".
Checkpoint policy: checkpointing is on by default (definition.checkpointing omitted ⇒ after_each_node). Disable or tune via definition.checkpointing.strategy (alias policy):
| Strategy | Effect |
|----------|--------|
| after_each_node (default) | Emit CheckpointWritten on each eligible boundary below. |
| every_n_nodes / interval | Emit every n eligible boundaries (n or interval integer ≥ 1). |
| disabled / off / none | No CheckpointWritten events. |
Eligible boundaries (when enabled and the interval counter allows):
- after each StateUpdated (activity nodes, switch, parallel branch progress, wait, set_state, agent_delegate, subworkflow, and post-resume continuation),
- after InterruptRaised,
- after InterruptResumed state is recorded.
Parallel branches may attach parallelSpan on the checkpoint payload. Each checkpoint includes workflowVersion, definitionHash (SHA-256 of canonical JSON with lexicographically sorted object keys; see packages/engine/src/canonical-json.mjs and RFC-03), lastAppliedEventSeq, nodeId, and stateRef (inline_state snapshot today). Resume, activity submit, and graph continuation verify caller definition against the latest checkpoint hash when a checkpoint exists.
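Canonical JSON with lexicographically sorted object keys can be sketched as a recursive serializer. This is an illustration of the idea only; the authoritative rules live in packages/engine/src/canonical-json.mjs and RFC-03, and may differ in details such as number formatting. The function name canonicalJson is hypothetical, and the input is assumed to be plain JSON data (no undefined values, functions, or dates).

```javascript
// Serialize JSON data with object keys sorted at every level, no whitespace.
function canonicalJson(value) {
  if (Array.isArray(value)) {
    // Array order is significant and is preserved.
    return `[${value.map(canonicalJson).join(",")}]`;
  }
  if (value !== null && typeof value === "object") {
    const keys = Object.keys(value).sort();
    const members = keys.map(
      (key) => `${JSON.stringify(key)}:${canonicalJson(value[key])}`
    );
    return `{${members.join(",")}}`;
  }
  return JSON.stringify(value); // strings, numbers, booleans, null
}
```

Two definitions that differ only in key order serialize to the same string, so a SHA-256 digest of that string (for example via createHash from node:crypto) stays stable across formatting changes.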
Recovery loading: hydrateReplayContext({ startMode: "safe_point" }) prefers the latest valid CheckpointWritten boundary and starts replay from lastAppliedEventSeq + 1. If checkpoints are absent or invalid, hydration falls back to genesis replay (startSeq = 1).
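The safe_point rule can be expressed as a backwards scan over history. The sketch below is not hydrateReplayContext; sketchReplayStartSeq is a hypothetical helper, and the validity check is passed in as a callback because this README does not define what makes a checkpoint invalid.

```javascript
// history: rows ordered by seq, each { seq, kind, name, payload }.
function sketchReplayStartSeq(history, isValidCheckpoint) {
  for (let i = history.length - 1; i >= 0; i--) {
    const row = history[i];
    if (row.name === "CheckpointWritten" && isValidCheckpoint(row)) {
      // Resume replay just after the last event the checkpoint covers.
      return row.payload.lastAppliedEventSeq + 1;
    }
  }
  return 1; // genesis replay when no usable checkpoint exists
}
```

If the newest checkpoint fails validation, the scan keeps going and uses the next older valid one before falling back to sequence 1.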
Workflow references
subworkflow nodes resolve config.workflow_ref at runtime through src/orchestrator/workflow-ref-resolver.mjs.
- registerWorkflowRef(workflowRef, definition): register a parsed child definition object before running a parent that references workflowRef. Registrations last for the process lifetime.
- resolveWorkflowRef(workflowRef, { versionPin? }): resolve a registry URN, built-in URN (monorepo checkout), or HTTP(S) URL. Optional versionPin (from config.version_pin) must match the SHA-256 hash of the child definition's canonical JSON.
- computeWorkflowDefinitionHash(definition): SHA-256 of canonical JSON (same algorithm as checkpoint definitionHash).
- setWorkflowRefFetchImpl(fetch): inject fetch for tests or restricted hosts.
- clearWorkflowRefs(): reset registry and fetch cache (tests and long-lived hosts).
Exported from the package entrypoint (@agent-workflow/engine).
Monorepo checkout: one built-in reference resolves from disk when the workflows repository root is discoverable (findWorkflowRepoRoot()): urn:awp:wf:unit-tests → examples/r3-unit-tests-child.workflow.json.
Published npm install: the tarball ships only src/, schemas/, and this README (examples/ is not bundled). Built-in URNs do not resolve on disk. Choose one of:
- registerWorkflowRef: load child JSON from your artifact store (or bundle it in your app) and register each workflow_ref before runGraphWorkflow / MCP workflow_start.
- HTTP(S) workflow_ref: point config.workflow_ref at a hosted definition URL; the engine fetches and caches definitions keyed by ref + version_pin. Pin with config.version_pin set to computeWorkflowDefinitionHash(childDefinition) so cache hits and refetches are verified.
Operator-oriented summary: arc42 cross-cutting — workflow reference resolution. Release notes: alpha — known limitations.
Execution history
Port: ExecutionHistoryStore (documented in src/persistence/types.mjs) — append-only, per-executionId ordering.
- append(executionId, { kind: 'command' | 'event', name, payload }) → assigned seq (integer, starts at 1 per execution).
- readRange(executionId, fromSeq?, toSeq?): inclusive bounds when provided; rows ordered by seq ascending.
- listByExecution(executionId): all rows for that execution.
- listExecutions({ phase?, definitionName?, updatedAfter?, updatedBefore?, limit?, cursor? }): list execution summaries across ids (newest updatedAt first). phase matches status projection values (running, completed, failed, interrupted, awaiting_activity, awaiting_signal, cancelled). Pagination: default limit 50, max 100; cursor is updatedAt|executionId from a prior page’s nextCursor.
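The pagination rules for listExecutions can be illustrated with a small in-memory sketch. `listPage` and `compareNewestFirst` are hypothetical helpers; the sketch assumes `updatedAt` is an ISO 8601 UTC string (so string order matches time order) and that ties are broken by `executionId` in descending order, neither of which is confirmed by the text above.

```javascript
// Hypothetical sketch of cursor pagination over execution summaries.
// Orders newest updatedAt first; ties fall back to executionId (an assumption).
function compareNewestFirst(a, b) {
  if (a.updatedAt !== b.updatedAt) return a.updatedAt < b.updatedAt ? 1 : -1;
  if (a.executionId === b.executionId) return 0;
  return a.executionId < b.executionId ? 1 : -1;
}

function listPage(summaries, { limit = 50, cursor } = {}) {
  const pageSize = Math.min(Math.max(1, limit), 100); // default 50, capped at 100
  let rows = [...summaries].sort(compareNewestFirst);
  if (cursor) {
    // Cursor format: updatedAt|executionId of the last row on the prior page.
    const sep = cursor.indexOf("|");
    const last = { updatedAt: cursor.slice(0, sep), executionId: cursor.slice(sep + 1) };
    rows = rows.filter((row) => compareNewestFirst(last, row) < 0);
  }
  const items = rows.slice(0, pageSize);
  const tail = items[items.length - 1];
  const nextCursor =
    rows.length > pageSize ? `${tail.updatedAt}|${tail.executionId}` : undefined;
  return { items, nextCursor };
}
```

A caller would pass each page's `nextCursor` back as `cursor` until it comes back undefined.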
Adapters:
- SqliteExecutionHistoryStore: uses built-in node:sqlite (DatabaseSync). Pass { path } for a file or :memory:; pass { database } to inject a DatabaseSync (caller closes it if needed). Requires Node.js ≥ 22.5.0. Call close() to release a store-opened connection.
- MemoryExecutionHistoryStore: array-backed; same monotonic semantics for tests.
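For readers implementing their own adapter, the port's append and read semantics can be sketched with an array-backed class. `SketchMemoryHistoryStore` is a hypothetical illustration, not the shipped MemoryExecutionHistoryStore; the row field names (`createdAt` and so on) are assumptions.

```javascript
// Hypothetical array-backed store mirroring the port described above.
class SketchMemoryHistoryStore {
  #rows = new Map(); // executionId -> rows in ascending seq order

  append(executionId, { kind, name, payload }) {
    if (kind !== "command" && kind !== "event") {
      throw new TypeError(`kind must be "command" or "event", got ${kind}`);
    }
    const rows = this.#rows.get(executionId) ?? [];
    const seq = rows.length + 1; // starts at 1 and grows by 1 per execution
    rows.push(
      Object.freeze({ executionId, seq, kind, name, payload, createdAt: new Date().toISOString() }),
    );
    this.#rows.set(executionId, rows);
    return seq;
  }

  // Bounds are inclusive when provided; omitted bounds cover the whole history.
  readRange(executionId, fromSeq = 1, toSeq = Infinity) {
    return this.listByExecution(executionId).filter(
      (row) => row.seq >= fromSeq && row.seq <= toSeq,
    );
  }

  // Returns a copy so callers cannot reorder or truncate the stored log.
  listByExecution(executionId) {
    return [...(this.#rows.get(executionId) ?? [])];
  }
}
```

Rows are only ever pushed, never updated or removed, which keeps the append-only contract visible in the code.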
Storage layout (SQLite table history):
| Column | Type | Role |
|----------------|---------|------|
| execution_id | TEXT | Correlation key for one run |
| seq | INTEGER | Monotonic sequence per execution (part of primary key) |
| kind | TEXT | command or event |
| name | TEXT | Command/event name (protocol taxonomies) |
| payload_json | TEXT | JSON-serialized payload object |
| created_at | TEXT | ISO 8601 timestamp when the row was appended |
| record_schema_version | INTEGER | Persisted row envelope version (not document.schema); see below |
Primary key: (execution_id, seq). Historical rows are not updated or deleted by the adapter API.
Envelope versioning: Each row is stamped with record_schema_version (currently 1). Opening an existing database without this column runs ALTER TABLE … ADD COLUMN … DEFAULT 1. Replay, resume, and status paths call assertHistoryReadableByEngine: rows newer than this engine build fail fast so hosts upgrade @agent-workflow/engine instead of corrupting replay. Policy and read rules: docs/persistence-history-record-versioning.md.
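The fail-fast rule can be sketched as follows. `assertRowsReadable` and the error wording are hypothetical; the engine's assertHistoryReadableByEngine may have a different signature and error type. The sketch treats a missing version as 1, matching the DEFAULT 1 migration described above.

```javascript
// Hypothetical sketch of a fail-fast readability check for history rows.
const SUPPORTED_RECORD_SCHEMA_VERSION = 1;

function assertRowsReadable(rows, supported = SUPPORTED_RECORD_SCHEMA_VERSION) {
  for (const row of rows) {
    // Rows without the column are treated as version 1 (the migration default).
    const version = row.record_schema_version ?? 1;
    if (version > supported) {
      throw new Error(
        `history row seq=${row.seq} has record_schema_version ${version}; ` +
          `this build reads up to ${supported}. Upgrade the engine before replaying.`,
      );
    }
  }
}
```

Checking before replay begins means a host sees one clear error instead of a partially applied history.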
Concurrency: Treat the store as single-writer per process for a given execution (and avoid multiple processes appending to the same file for the same execution id). SQLite assigns seq inside a transaction (BEGIN IMMEDIATE … COMMIT) that reads MAX(seq) for the execution and then inserts the next row, so ordering stays monotonic for that writer.
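The transactional seq assignment described above might be written like this. It is a sketch, not the adapter's source: `appendRow` is a hypothetical name, and `db` is assumed to expose `exec()` and `prepare()` in the style of node:sqlite's DatabaseSync. The column list follows the storage layout table.

```javascript
// Hypothetical sketch: assign the next seq inside one write transaction.
// `db` is assumed to behave like node:sqlite's DatabaseSync.
function appendRow(db, executionId, { kind, name, payload }) {
  db.exec("BEGIN IMMEDIATE"); // take the write lock before reading MAX(seq)
  try {
    const { maxSeq } = db
      .prepare("SELECT COALESCE(MAX(seq), 0) AS maxSeq FROM history WHERE execution_id = ?")
      .get(executionId);
    const seq = Number(maxSeq) + 1;
    db.prepare(
      "INSERT INTO history (execution_id, seq, kind, name, payload_json, created_at, record_schema_version) " +
        "VALUES (?, ?, ?, ?, ?, ?, 1)",
    ).run(executionId, seq, kind, name, JSON.stringify(payload), new Date().toISOString());
    db.exec("COMMIT");
    return seq;
  } catch (error) {
    db.exec("ROLLBACK"); // leave the log untouched on any failure
    throw error;
  }
}
```

Reading MAX(seq) and inserting inside the same immediate transaction is what keeps the sequence gap-free for a single writer.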
Why node:sqlite: Avoids native addon builds (for example on Windows without a full C++ toolchain). On some Node versions the module may log an experimental/RC warning; behavior is still suitable for this append-only log.
Example:
import { validateWorkflowDefinition, SqliteExecutionHistoryStore } from "@agent-workflow/engine";
import { readFileSync } from "node:fs";

// Parse and validate a workflow definition before running it.
const data = JSON.parse(readFileSync("workflow.json", "utf8"));
const result = validateWorkflowDefinition(data);
if (!result.ok) console.error(result.errors);

// Open (or create) a file-backed history store and append the first command.
const history = new SqliteExecutionHistoryStore({ path: "./runs.sqlite" });
const seq = history.append("run-1", { kind: "command", name: "StartRun", payload: {} });
console.log(seq, history.listByExecution("run-1"));

// Release the connection the store opened.
history.close();

Stable “valid definition” boundary (for later engine stories)
- Schema contract: The engine validates against the same file as CI and scripts/validate-workflows.mjs: schemas/workflow-definition.json (JSON Schema Draft 2020-12).
- Ajv options: allErrors: true, strict: false, identical to scripts/validate-workflows.mjs to avoid drift from “repo truth.”
- Engine-specific limits: None beyond the schema and JSON parse rules. The engine does not enforce file size limits, document.schema version bumps, or trace companions; only the workflow definition JSON shape is checked.
- Resolution rule: The engine loads schemas/workflow-definition.json from the published package (packages/engine/schemas/ next to src/, kept in sync with the repo canonical schema). In a full workflows monorepo checkout it can fall back to the root schemas/ copy. findWorkflowRepoRoot() locates the monorepo root via examples/lighthouse-customer-routing.workflow.json and the root package.json named workflows (for tests and fixtures), not via schema path alone.
Tests
npm test --workspace=@agent-workflow/engine

Packaging verification guidance
From packages/engine, validate the publish payload before release:
npm pack --dry-run

Check that:
- tarball metadata resolves to @agent-workflow/engine with the intended version/tag source,
- both binaries are present: src/cli.mjs and src/mcp-stdio-server.mjs,
- bundled workflow schema is present: schemas/workflow-definition.json,
- runtime/library entrypoint is present: src/index.mjs,
- payload is minimal (runtime src/, bundled schemas/, package docs), with no test fixtures or unrelated repository files.
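The checklist above could be automated with a small helper that inspects the list of tarball paths, for example one collected from the file entries that `npm pack --dry-run --json` reports. `checkPackPayload` and the allow-list patterns are assumptions made for illustration; in particular, "package docs" is interpreted here as README and LICENSE files, and test fixtures placed under src/ would not be detected.

```javascript
// Hypothetical sketch: compare tarball-relative paths with the release checklist.
const REQUIRED_FILES = [
  "src/cli.mjs",
  "src/mcp-stdio-server.mjs",
  "src/index.mjs",
  "schemas/workflow-definition.json",
];

// Assumed allow-list for a minimal payload; adjust to the package's real layout.
const ALLOWED_PATTERNS = [/^src\//, /^schemas\//, /^package\.json$/, /^README(\.md)?$/i, /^LICENSE/i];

function checkPackPayload(paths) {
  const present = new Set(paths);
  const missing = REQUIRED_FILES.filter((file) => !present.has(file));
  const unexpected = paths.filter((path) => !ALLOWED_PATTERNS.some((re) => re.test(path)));
  return { ok: missing.length === 0 && unexpected.length === 0, missing, unexpected };
}
```

Returning the missing and unexpected lists, rather than only a boolean, makes a failed release check easier to act on.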
