@graph-compose/execution-kernel
v1.0.0
Published
Shared execution kernel — graph building, scheduling, validation, and state primitives for workflow DAG execution
Readme
@graph-compose/execution-kernel
The execution kernel is the foundation layer for DAG-based workflow orchestration in Graph Compose. It provides the core primitives (graph building, dependency scheduling, validation, state management, and a pluggable orchestrator) that higher-level packages build on.
If you just want to run HTTP workflows, use @graph-compose/runtime instead. The runtime is a batteries-included package that wires the kernel together with sensible defaults. The kernel is for you if you need to:
- Add custom node types beyond HTTP
- Change how nodes are scheduled or executed
- Plug in lifecycle hooks (logging, persistence, error conversion)
- Build your own orchestrator on top of the kernel's primitives
Installation
npm install @graph-compose/execution-kernelPeer Dependencies
The kernel uses Temporal for durable execution:
npm install @temporalio/workflow @temporalio/commonThe built-in HTTP activity optionally uses axios:
npm install axios # only needed if using the built-in httpCall activityHow It Works
The kernel executes a WorkflowGraph (a JSON definition of nodes and their dependencies) by repeatedly finding nodes whose dependencies are satisfied, executing them in parallel, and recording results, until every node is complete.
WorkflowGraph (JSON)
│
▼
┌─────────────┐
│ Validation │ assertNoCycles, assertDependencyTargetsExist, ...
└──────┬──────┘
▼
┌─────────────┐
│ buildGraph │ WorkflowGraph → RuntimeGraph (graphlib DAG)
└──────┬──────┘
▼
┌──────────────────────────────────────────┐
│ Execution Loop │
│ │
│ while (!stateManager.flowIsFinished()) │
│ nodes = getNextReadyNodes(graph) │
│ for each node: │
│ handler = handlers.get(node.type) │
│ handler.execute(node, ctx) │
│ plugins.onBatchComplete(nodeIds) │
│ │
│ plugins.onAfterExecution() │
│ return stateManager.getAllResults() │
└──────────────────────────────────────────┘Package Layout
src/
orchestrator.ts WorkflowOrchestrator, the extensible base class
plugins.ts NodeHandler and ExecutionPlugin interfaces
graph.ts buildGraph, DAG construction from WorkflowGraph
scheduler.ts getNextReadyNodes, topological batch scheduling
state.ts IWorkflowStateManager interface + base implementation
validation.ts Composable validators (cycles, deps, expressions)
types.ts Core domain types (BaseRuntimeNode, RuntimeGraph)
handlers/ Built-in HttpNodeHandler
temporal/ Temporal bindings (queries, reference workflow)
activities/ Activity implementations + type contractsCore Concepts
WorkflowOrchestrator
The central class. It validates a workflow graph, builds a DAG, then runs the execution loop dispatching nodes to registered handlers.
import { WorkflowOrchestrator } from "@graph-compose/execution-kernel";
const orchestrator = new WorkflowOrchestrator(
workflowGraph, // WorkflowGraph - the JSON workflow definition
workflowInfo, // { workflowId, orgId } - metadata for the run
activities, // { resolveExpression, httpCall } - Temporal activity proxies
{
context: { userId: "user_123" }, // variables available via {{ context.* }}
validate: myCustomValidator, // optional - replaces default validation
nodeHandlers: [myCustomHandler], // optional - additional node type handlers
plugins: [myPlugin], // optional - lifecycle hooks
},
);
const result = await orchestrator.buildWorkflow();
// => { context, results: { nodeId: { data, statusCode, headers } }, workflowId }The orchestrator ships with a built-in HttpNodeHandler registered for type: "http" nodes. You can register additional handlers for custom node types, or override the HTTP handler entirely.
Graph Building
buildGraph converts a WorkflowGraph JSON definition into a RuntimeGraph (a directed graph powered by graphlib). Edges represent dependency relationships.
import { buildGraph } from "@graph-compose/execution-kernel";
const graph = buildGraph(workflowGraph, {
// All options are optional:
multigraph: true, // enable labeled edges (for DEPENDENCY vs PROTECTS etc.)
transformNode: (raw) => MySchema.parse(raw), // validate/enrich nodes during construction
nodeFilter: (node) => node.type !== "comment", // exclude certain node types
});
graph.nodes(); // ["fetch_user", "enrich", "notify"]
graph.node("fetch_user"); // the full node object
graph.inEdges("enrich"); // [{ v: "fetch_user", w: "enrich" }]Scheduling
getNextReadyNodes is a pure function that returns all nodes whose dependencies are satisfied and that haven't been executed yet. The orchestrator calls it each iteration of the execution loop.
import { getNextReadyNodes } from "@graph-compose/execution-kernel";
const ready = getNextReadyNodes(graph, executedNodeIds, {
// Optional filters:
edgeTypeFilter: (edge) => edge.name === "dependency", // for multigraphs
nodeFilter: (node) => node.type !== "error_boundary", // exclude certain types
});Nodes returned in the same batch have no dependencies on each other and can execute concurrently.
Validation
The kernel provides individual validator functions that can be composed into a pipeline:
import {
composeValidators,
assertDependencyTargetsExist,
assertNoCycles,
assertExpressionsAreValid,
} from "@graph-compose/execution-kernel";
// Compose your own validation pipeline
const validate = composeValidators(
assertDependencyTargetsExist, // every dependency references an existing node
assertNoCycles, // the graph is acyclic
assertExpressionsAreValid, // all {{ }} expressions parse as valid JSONata
myCustomValidator, // your own checks (e.g. node type restrictions)
);
validate(workflowGraph); // throws on first failureThe WorkflowValidator type is simply (workflow: WorkflowGraph) => void. Throw to reject, return to accept.
State Management
IWorkflowStateManager is the interface the orchestrator uses to track node execution state. The kernel provides a default implementation (WorkflowStateManager) for the simple three-state model (pending / executed / executed_and_failed).
import type { IWorkflowStateManager } from "@graph-compose/execution-kernel";IWorkflowStateManager methods:
| Method | Returns | Description |
|--------|---------|-------------|
| getExecutedNodeIds() | string[] | IDs of all non-pending nodes |
| flowIsFinished() | boolean | true when every node has reached a terminal state |
| setExecutionResult(nodeId, result) | Promise<void> | Record a successful node execution |
| getAllResults() | WorkflowResults & { workflowId } | Final aggregated results |
| getWorkflowState() | GraphWorkflowState | Snapshot with context, executed list, and results |
| getNodeResult(nodeId) | NodeResult | Result for a specific node (throws if not available) |
| getNodeState(nodeId) | unknown | Full state object for a node |
To use a custom state model (e.g. with additional states like streaming or awaiting_signal), implement IWorkflowStateManager and override createStateManager on your orchestrator subclass.
Extension Model
The orchestrator supports two complementary strategies for customization.
Strategy 1: Plugins (composition, no subclassing)
Use this when you want to add new node types or hook into lifecycle events.
NodeHandler: custom node types
A NodeHandler is responsible for executing a single node type. It must:
- Perform the node's work
- Record the result via
ctx.stateManager.setExecutionResult() - Throw on unrecoverable failure
import type { NodeHandler, NodeExecutionContext } from "@graph-compose/execution-kernel";
interface SlackNode {
id: string;
type: "slack";
dependencies?: string[];
channel: string;
message: string;
}
const slackHandler: NodeHandler<SlackNode> = {
type: "slack",
async execute(node: SlackNode, ctx: NodeExecutionContext): Promise<void> {
// ctx.activities contains your Temporal activity proxies
// ctx.stateManager lets you read results from upstream nodes
// ctx.workflowInfo has { workflowId, orgId }
const upstream = ctx.stateManager.getAllResults();
const response = await sendSlackMessage(node.channel, node.message);
await ctx.stateManager.setExecutionResult(node.id, {
data: response,
statusCode: 200,
headers: {},
});
},
};
// Register at construction time
const orchestrator = new WorkflowOrchestrator(graph, info, activities, {
nodeHandlers: [slackHandler],
});
// Or register after construction
orchestrator.registerNodeHandler(slackHandler);ExecutionPlugin: lifecycle hooks
An ExecutionPlugin hooks into specific points in the workflow lifecycle at two levels: workflow-level (once per workflow) and per-node (once per node execution). Here's exactly when each hook fires:
buildWorkflow() called
│
▼
╔═══════════════════════════════════╗
║ onBeforeExecution ║ Called once, before the first node executes.
╚═══════════════════════════════════╝ Use for: setup, logging, initial state snapshots.
│
▼
┌───────────────────────────────────────────────────────┐
│ Execution Loop │
│ │
│ ┌─ getNextReadyNodes ◄──────────────────────────┐ │
│ │ │ │
│ ▼ │ │
│ For each ready node (in parallel): │ │
│ ╔═══════════════════════════════╗ │ │
│ ║ onBeforeNodeExecution ║ │ │
│ ╚══════════════╤═══════════════╝ │ │
│ ▼ │ │
│ handler.execute(node, ctx) │ │
│ │ │ │
│ ┌───────┴───────┐ │ │
│ (success) (error) │ │
│ │ │ │ │
│ ╔═════╧═════╗ ╔════╧════════════════════╗ │ │
│ ║ onAfter ║ ║ onNodeExecutionError ║ │ │
│ ║ Node ║ ║ ║ │ │
│ ║ Execution ║ ║ return void → observe ║ │ │
│ ╚═══════════╝ ║ return {handled:true} → ║ │ │
│ ║ error is swallowed ║ │ │
│ ╚═════════════════════════╝ │ │
│ ╔═════════════════════════╗ │ │
│ ║ onBatchComplete ║──────────────────────┘ │
│ ╚═════════════════════════╝ │
│ Loop repeats until all nodes reach a terminal state. │
└───────────────────────────────────────────────────────┘
│
▼ (success) ▼ (any error thrown)
╔══════════════════════╗ ╔══════════════════════════╗
║ onAfterExecution ║ ║ onExecutionError ║
╚══════════════════════╝ ╚══════════════════════════╝
Called once after all Called once when any
nodes complete unhandled error escapes
successfully. the loop. Can re-throw
Use for: cleanup, to convert the error
final persistence. (e.g. to ApplicationFailure).Workflow-level hooks fire once per workflow run:
import type { ExecutionPlugin, ExecutionPluginContext } from "@graph-compose/execution-kernel";
const loggingPlugin: ExecutionPlugin = {
name: "LoggingPlugin",
async onBeforeExecution(ctx: ExecutionPluginContext): Promise<void> {
console.log(`Starting workflow ${ctx.workflowInfo.workflowId}`);
},
async onBatchComplete(nodeIds: string[], ctx: ExecutionPluginContext): Promise<void> {
console.log(`Completed batch: ${nodeIds.join(", ")}`);
const state = ctx.stateManager.getWorkflowState();
await persistToDatabase(ctx.workflowInfo.workflowId, state);
},
async onAfterExecution(ctx: ExecutionPluginContext): Promise<void> {
console.log("Workflow complete");
},
async onExecutionError(error: unknown, ctx: ExecutionPluginContext): Promise<void> {
console.error("Workflow failed:", error);
},
};Per-node hooks fire for each individual node execution:
import type {
ExecutionPlugin,
NodePluginContext,
BaseRuntimeNode,
} from "@graph-compose/execution-kernel";
const nodeAuditPlugin: ExecutionPlugin = {
name: "NodeAuditPlugin",
async onBeforeNodeExecution(node: BaseRuntimeNode, ctx: NodePluginContext): Promise<void> {
console.log(`Starting node ${node.id} (type: ${node.type})`);
},
async onAfterNodeExecution(node: BaseRuntimeNode, ctx: NodePluginContext): Promise<void> {
const result = ctx.stateManager.getNodeResult(node.id);
console.log(`Node ${node.id} completed with status ${result.statusCode}`);
},
// Return void to observe the error (it continues propagating).
// Return { handled: true } to swallow it (stops propagation).
async onNodeExecutionError(
node: BaseRuntimeNode,
error: unknown,
ctx: NodePluginContext,
): Promise<void | { handled: true }> {
console.error(`Node ${node.id} failed:`, error);
// Returning void - we're just logging, not handling
},
};
orchestrator.registerPlugin(nodeAuditPlugin);Plugins are called in registration order. For onNodeExecutionError, the first plugin to return { handled: true } stops the error from propagating. Subsequent plugins still see it in observe mode.
All hooks are optional. Implement only what you need.
Strategy 2: Template Methods (inheritance)
Use this when you need to change the structural behavior of the orchestrator: how graphs are built, how nodes are scheduled, or how the execution loop itself works.
import { WorkflowOrchestrator, buildGraph } from "@graph-compose/execution-kernel";
import type { RuntimeGraph, IWorkflowStateManager } from "@graph-compose/execution-kernel";
interface CustomNode {
id: string;
type: string;
dependencies?: string[];
priority?: number;
}
class PriorityOrchestrator extends WorkflowOrchestrator<CustomNode> {
// Override graph construction to enrich nodes during build
protected buildGraph(workflow) {
return buildGraph<CustomNode>(workflow, {
multigraph: true,
transformNode: (raw) => ({ ...raw, priority: raw.priority ?? 0 }) as CustomNode,
});
}
// Override scheduling to respect priority ordering
protected getNextNodes(): CustomNode[] {
const ready = super.getNextNodes() as CustomNode[];
return ready.sort((a, b) => (b.priority ?? 0) - (a.priority ?? 0));
}
// Override state management to use a custom implementation
protected createStateManager(graph, options): IWorkflowStateManager {
return new MyCustomStateManager(graph, options);
}
}Available overrides:
| Method | Default Behavior | Override When... |
|--------|-----------------|------------------|
| buildGraph(workflow) | Builds a simple directed graph from dependencies | You need multigraph edges, node transforms, or filtering |
| createStateManager(graph, options) | Creates WorkflowStateManager (3-state model) | You have custom execution states or persistence needs |
| getNextNodes() | Returns all nodes with satisfied dependencies | You need priority ordering, concurrency limits, or custom scheduling |
| executeNodes(nodes) | Dispatches to handlers in parallel, runs onBatchComplete | You need sequential execution, rate limiting, etc. |
| runExecutionLoop() | while (!finished) { getNext → execute } | You need streaming, signal-driven pausing, or custom loop control |
Activities
The kernel provides two Temporal activity implementations and their type contracts.
resolveExpression
Resolves JSONata template expressions ({{ }}) in a node's URL, headers, and body against the current workflow state.
import { resolveExpression } from "@graph-compose/execution-kernel";
const output = await resolveExpression({
nodeId: "enrich",
jsonata: {
url: "https://api.example.com/users/{{results.fetch.data.id}}",
headers: { Authorization: "Bearer {{context.token}}" },
body: { name: "{{results.fetch.data.name}}" },
},
evaluationContext: currentWorkflowState,
});
// output.resolved.url => "https://api.example.com/users/42"
// output.warnings => [] (any expressions that resolved to undefined)httpCall
Executes an HTTP request using axios (optional peer dependency). Returns a NodeResult with data, statusCode, and headers.
Both activities are also available via the sub-path import @graph-compose/execution-kernel/activities.
Activity Type Contracts
If you're implementing your own activities (e.g. replacing httpCall with a custom HTTP client), implement the RuntimeActivityHandlers interface:
import type { RuntimeActivityHandlers, ActivityOptions } from "@graph-compose/execution-kernel";
const myActivities: RuntimeActivityHandlers = {
resolveExpression: async (input) => { /* ... */ },
httpCall: async (input, options?: ActivityOptions) => {
// options contains per-node retry/timeout config from node.activityConfig
// Use it to configure your HTTP client's retry behavior
const res = await myHttpClient.request({ ... });
return { data: res.body, statusCode: res.status, headers: res.headers };
},
};Per-Node Activity Options
The built-in HttpNodeHandler reads activityConfig from each node and passes it as ActivityOptions to the httpCall activity. This enables per-node retry policies and timeouts:
{
"id": "flaky_api",
"type": "http",
"dependencies": [],
"http": { "method": "GET", "url": "https://unreliable-api.com/data" },
"activityConfig": {
"retryPolicy": {
"maximumAttempts": 5,
"initialInterval": "1 second",
"backoffCoefficient": 2,
"maximumInterval": "30 seconds"
},
"startToCloseTimeout": "60 seconds"
}
}When no activityConfig is provided, the default proxy configuration is used (typically startToCloseTimeout: "30 seconds" with no retries). The ActivityOptions interface:
interface ActivityOptions {
retryPolicy?: {
backoffCoefficient?: number;
initialInterval?: string; // Duration string, e.g. "1 second"
maximumAttempts?: number;
maximumInterval?: string;
};
startToCloseTimeout?: string; // Duration string, e.g. "30 seconds"
scheduleToCloseTimeout?: string;
}Temporal Bindings
The temporal/ directory contains Temporal-specific wiring available via @graph-compose/execution-kernel/temporal.
defineWorkflowQueries
Registers Temporal query handlers on the running workflow so you can inspect execution state mid-run:
import * as wf from "@temporalio/workflow";
import { defineWorkflowQueries } from "@graph-compose/execution-kernel";
defineWorkflowQueries(wf, orchestrator.getStateManager());This registers three queries:
| Query Name | Returns | Description |
|------------|---------|-------------|
| getExecutionState | GraphWorkflowState | Context, executed node list, and all results |
| getNodeResult | NodeResult | Result for a specific node ID |
| getNodeState | node state object | Full execution state for a specific node |
Reference Workflow
temporal/workflow.ts exports runtimeWorkflow (aliased as httpWorkflow), a minimal Temporal workflow function that wires together the orchestrator, activities, and queries. The @graph-compose/runtime package uses its own version of this with additional validation; the kernel's copy serves as a reference implementation.
Type Reference
Core Types
import type {
BaseRuntimeNode, // { id: string; type: string; dependencies?: string[] }
RuntimeGraph, // graphlib Graph with typed node accessors
BaseExecutionState, // "pending" | "executed" | "executed_and_failed"
RuntimeNodeState, // { executionState, result, failureState }
} from "@graph-compose/execution-kernel";Plugin Types
import type {
NodeHandler, // { type: string; execute(node, ctx): Promise<void> }
NodeExecutionContext, // { stateManager, workflowInfo, activities }
ExecutionPlugin, // Workflow-level + per-node lifecycle hooks
ExecutionPluginContext, // { stateManager, workflowInfo }
NodePluginContext, // { stateManager, workflowInfo } - passed to per-node hooks
} from "@graph-compose/execution-kernel";Activity Types
import type {
RuntimeActivityHandlers, // { resolveExpression, httpCall }
HttpActivityInput, // input for httpCall
ActivityOptions, // per-node retry/timeout config
ExpressionResolutionInput, // input for resolveExpression
ExpressionResolutionOutput, // output from resolveExpression
ResolutionWarning, // { path, expression, error }
} from "@graph-compose/execution-kernel";Related Packages
| Package | Description |
|---------|-------------|
| @graph-compose/core | TypeScript types, Zod schemas, and validation utilities for workflow graphs |
| @graph-compose/runtime | Batteries-included HTTP workflow runtime built on this kernel |
Requirements
- Node.js 18+
- TypeScript 5+ (recommended)
- A running Temporal server (local or cloud)
License
This project is dual-licensed:
- AGPL-3.0 for open-source use. See LICENSE for details.
- Commercial License available for organizations that need an alternative to AGPL. Contact the maintainers for details.
