npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@graph-compose/execution-kernel

v1.0.0

Published

Shared execution kernel — graph building, scheduling, validation, and state primitives for workflow DAG execution

Readme

@graph-compose/execution-kernel

The execution kernel is the foundation layer for DAG-based workflow orchestration in Graph Compose. It provides the core primitives (graph building, dependency scheduling, validation, state management, and a pluggable orchestrator) that higher-level packages build on.

If you just want to run HTTP workflows, use @graph-compose/runtime instead. The runtime is a batteries-included package that wires the kernel together with sensible defaults. The kernel is for you if you need to:

  • Add custom node types beyond HTTP
  • Change how nodes are scheduled or executed
  • Plug in lifecycle hooks (logging, persistence, error conversion)
  • Build your own orchestrator on top of the kernel's primitives

Installation

npm install @graph-compose/execution-kernel

Peer Dependencies

The kernel uses Temporal for durable execution:

npm install @temporalio/workflow @temporalio/common

The built-in HTTP activity optionally uses axios:

npm install axios  # only needed if using the built-in httpCall activity

How It Works

The kernel executes a WorkflowGraph (a JSON definition of nodes and their dependencies) by repeatedly finding nodes whose dependencies are satisfied, executing them in parallel, and recording results, until every node is complete.

WorkflowGraph (JSON)
       │
       ▼
┌─────────────┐
│  Validation  │  assertNoCycles, assertDependencyTargetsExist, ...
└──────┬──────┘
       ▼
┌─────────────┐
│  buildGraph  │  WorkflowGraph → RuntimeGraph (graphlib DAG)
└──────┬──────┘
       ▼
┌──────────────────────────────────────────┐
│           Execution Loop                  │
│                                           │
│  while (!stateManager.flowIsFinished())   │
│    nodes = getNextReadyNodes(graph)       │
│    for each node:                         │
│      handler = handlers.get(node.type)    │
│      handler.execute(node, ctx)           │
│    plugins.onBatchComplete(nodeIds)       │
│                                           │
│  plugins.onAfterExecution()               │
│  return stateManager.getAllResults()       │
└──────────────────────────────────────────┘

Package Layout

src/
  orchestrator.ts       WorkflowOrchestrator, the extensible base class
  plugins.ts            NodeHandler and ExecutionPlugin interfaces
  graph.ts              buildGraph, DAG construction from WorkflowGraph
  scheduler.ts          getNextReadyNodes, topological batch scheduling
  state.ts              IWorkflowStateManager interface + base implementation
  validation.ts         Composable validators (cycles, deps, expressions)
  types.ts              Core domain types (BaseRuntimeNode, RuntimeGraph)
  handlers/             Built-in HttpNodeHandler
  temporal/             Temporal bindings (queries, reference workflow)
  activities/           Activity implementations + type contracts

Core Concepts

WorkflowOrchestrator

The central class. It validates a workflow graph, builds a DAG, then runs the execution loop dispatching nodes to registered handlers.

import { WorkflowOrchestrator } from "@graph-compose/execution-kernel";

const orchestrator = new WorkflowOrchestrator(
  workflowGraph,    // WorkflowGraph - the JSON workflow definition
  workflowInfo,     // { workflowId, orgId } - metadata for the run
  activities,       // { resolveExpression, httpCall } - Temporal activity proxies
  {
    context: { userId: "user_123" },    // variables available via {{ context.* }}
    validate: myCustomValidator,        // optional - replaces default validation
    nodeHandlers: [myCustomHandler],    // optional - additional node type handlers
    plugins: [myPlugin],                // optional - lifecycle hooks
  },
);

const result = await orchestrator.buildWorkflow();
// => { context, results: { nodeId: { data, statusCode, headers } }, workflowId }

The orchestrator ships with a built-in HttpNodeHandler registered for type: "http" nodes. You can register additional handlers for custom node types, or override the HTTP handler entirely.

Graph Building

buildGraph converts a WorkflowGraph JSON definition into a RuntimeGraph (a directed graph powered by graphlib). Edges represent dependency relationships.

import { buildGraph } from "@graph-compose/execution-kernel";

const graph = buildGraph(workflowGraph, {
  // All options are optional:
  multigraph: true,                        // enable labeled edges (for DEPENDENCY vs PROTECTS etc.)
  transformNode: (raw) => MySchema.parse(raw),  // validate/enrich nodes during construction
  nodeFilter: (node) => node.type !== "comment", // exclude certain node types
});

graph.nodes();              // ["fetch_user", "enrich", "notify"]
graph.node("fetch_user");   // the full node object
graph.inEdges("enrich");    // [{ v: "fetch_user", w: "enrich" }]

Scheduling

getNextReadyNodes is a pure function that returns all nodes whose dependencies are satisfied and that haven't been executed yet. The orchestrator calls it each iteration of the execution loop.

import { getNextReadyNodes } from "@graph-compose/execution-kernel";

const ready = getNextReadyNodes(graph, executedNodeIds, {
  // Optional filters:
  edgeTypeFilter: (edge) => edge.name === "dependency",  // for multigraphs
  nodeFilter: (node) => node.type !== "error_boundary",  // exclude certain types
});

Nodes returned in the same batch have no dependencies on each other and can execute concurrently.

Validation

The kernel provides individual validator functions that can be composed into a pipeline:

import {
  composeValidators,
  assertDependencyTargetsExist,
  assertNoCycles,
  assertExpressionsAreValid,
} from "@graph-compose/execution-kernel";

// Compose your own validation pipeline
const validate = composeValidators(
  assertDependencyTargetsExist,  // every dependency references an existing node
  assertNoCycles,                // the graph is acyclic
  assertExpressionsAreValid,     // all {{ }} expressions parse as valid JSONata
  myCustomValidator,             // your own checks (e.g. node type restrictions)
);

validate(workflowGraph);  // throws on first failure

The WorkflowValidator type is simply (workflow: WorkflowGraph) => void. Throw to reject, return to accept.

State Management

IWorkflowStateManager is the interface the orchestrator uses to track node execution state. The kernel provides a default implementation (WorkflowStateManager) for the simple three-state model (pending / executed / executed_and_failed).

import type { IWorkflowStateManager } from "@graph-compose/execution-kernel";

IWorkflowStateManager methods:

| Method | Returns | Description | |--------|---------|-------------| | getExecutedNodeIds() | string[] | IDs of all non-pending nodes | | flowIsFinished() | boolean | true when every node has reached a terminal state | | setExecutionResult(nodeId, result) | Promise<void> | Record a successful node execution | | getAllResults() | WorkflowResults & { workflowId } | Final aggregated results | | getWorkflowState() | GraphWorkflowState | Snapshot with context, executed list, and results | | getNodeResult(nodeId) | NodeResult | Result for a specific node (throws if not available) | | getNodeState(nodeId) | unknown | Full state object for a node |

To use a custom state model (e.g. with additional states like streaming or awaiting_signal), implement IWorkflowStateManager and override createStateManager on your orchestrator subclass.


Extension Model

The orchestrator supports two complementary strategies for customization.

Strategy 1: Plugins (composition, no subclassing)

Use this when you want to add new node types or hook into lifecycle events.

NodeHandler: custom node types

A NodeHandler is responsible for executing a single node type. It must:

  1. Perform the node's work
  2. Record the result via ctx.stateManager.setExecutionResult()
  3. Throw on unrecoverable failure
import type { NodeHandler, NodeExecutionContext } from "@graph-compose/execution-kernel";

interface SlackNode {
  id: string;
  type: "slack";
  dependencies?: string[];
  channel: string;
  message: string;
}

const slackHandler: NodeHandler<SlackNode> = {
  type: "slack",

  async execute(node: SlackNode, ctx: NodeExecutionContext): Promise<void> {
    // ctx.activities contains your Temporal activity proxies
    // ctx.stateManager lets you read results from upstream nodes
    // ctx.workflowInfo has { workflowId, orgId }

    const upstream = ctx.stateManager.getAllResults();
    const response = await sendSlackMessage(node.channel, node.message);

    await ctx.stateManager.setExecutionResult(node.id, {
      data: response,
      statusCode: 200,
      headers: {},
    });
  },
};

// Register at construction time
const orchestrator = new WorkflowOrchestrator(graph, info, activities, {
  nodeHandlers: [slackHandler],
});

// Or register after construction
orchestrator.registerNodeHandler(slackHandler);

ExecutionPlugin: lifecycle hooks

An ExecutionPlugin hooks into specific points in the workflow lifecycle at two levels: workflow-level (once per workflow) and per-node (once per node execution). Here's exactly when each hook fires:

buildWorkflow() called
  │
  ▼
╔═══════════════════════════════════╗
║  onBeforeExecution                ║  Called once, before the first node executes.
╚═══════════════════════════════════╝  Use for: setup, logging, initial state snapshots.
  │
  ▼
┌───────────────────────────────────────────────────────┐
│  Execution Loop                                       │
│                                                       │
│  ┌─ getNextReadyNodes ◄──────────────────────────┐   │
│  │                                                │   │
│  ▼                                                │   │
│  For each ready node (in parallel):               │   │
│    ╔═══════════════════════════════╗               │   │
│    ║  onBeforeNodeExecution       ║               │   │
│    ╚══════════════╤═══════════════╝               │   │
│                   ▼                               │   │
│         handler.execute(node, ctx)                │   │
│                   │                               │   │
│          ┌───────┴───────┐                        │   │
│     (success)        (error)                      │   │
│          │               │                        │   │
│    ╔═════╧═════╗   ╔════╧════════════════════╗    │   │
│    ║ onAfter   ║   ║ onNodeExecutionError    ║    │   │
│    ║ Node      ║   ║                         ║    │   │
│    ║ Execution ║   ║ return void → observe   ║    │   │
│    ╚═══════════╝   ║ return {handled:true} → ║    │   │
│                    ║   error is swallowed    ║    │   │
│                    ╚═════════════════════════╝    │   │
│  ╔═════════════════════════╗                      │   │
│  ║  onBatchComplete        ║──────────────────────┘   │
│  ╚═════════════════════════╝                          │
│  Loop repeats until all nodes reach a terminal state. │
└───────────────────────────────────────────────────────┘
  │
  ▼ (success)                     ▼ (any error thrown)
╔══════════════════════╗    ╔══════════════════════════╗
║  onAfterExecution    ║    ║  onExecutionError        ║
╚══════════════════════╝    ╚══════════════════════════╝
Called once after all         Called once when any
nodes complete                unhandled error escapes
successfully.                 the loop. Can re-throw
Use for: cleanup,             to convert the error
final persistence.            (e.g. to ApplicationFailure).

Workflow-level hooks fire once per workflow run:

import type { ExecutionPlugin, ExecutionPluginContext } from "@graph-compose/execution-kernel";

const loggingPlugin: ExecutionPlugin = {
  name: "LoggingPlugin",

  async onBeforeExecution(ctx: ExecutionPluginContext): Promise<void> {
    console.log(`Starting workflow ${ctx.workflowInfo.workflowId}`);
  },

  async onBatchComplete(nodeIds: string[], ctx: ExecutionPluginContext): Promise<void> {
    console.log(`Completed batch: ${nodeIds.join(", ")}`);
    const state = ctx.stateManager.getWorkflowState();
    await persistToDatabase(ctx.workflowInfo.workflowId, state);
  },

  async onAfterExecution(ctx: ExecutionPluginContext): Promise<void> {
    console.log("Workflow complete");
  },

  async onExecutionError(error: unknown, ctx: ExecutionPluginContext): Promise<void> {
    console.error("Workflow failed:", error);
  },
};

Per-node hooks fire for each individual node execution:

import type {
  ExecutionPlugin,
  NodePluginContext,
  BaseRuntimeNode,
} from "@graph-compose/execution-kernel";

const nodeAuditPlugin: ExecutionPlugin = {
  name: "NodeAuditPlugin",

  async onBeforeNodeExecution(node: BaseRuntimeNode, ctx: NodePluginContext): Promise<void> {
    console.log(`Starting node ${node.id} (type: ${node.type})`);
  },

  async onAfterNodeExecution(node: BaseRuntimeNode, ctx: NodePluginContext): Promise<void> {
    const result = ctx.stateManager.getNodeResult(node.id);
    console.log(`Node ${node.id} completed with status ${result.statusCode}`);
  },

  // Return void to observe the error (it continues propagating).
  // Return { handled: true } to swallow it (stops propagation).
  async onNodeExecutionError(
    node: BaseRuntimeNode,
    error: unknown,
    ctx: NodePluginContext,
  ): Promise<void | { handled: true }> {
    console.error(`Node ${node.id} failed:`, error);
    // Returning void - we're just logging, not handling
  },
};

orchestrator.registerPlugin(nodeAuditPlugin);

Plugins are called in registration order. For onNodeExecutionError, the first plugin to return { handled: true } stops the error from propagating. Subsequent plugins still see it in observe mode.

All hooks are optional. Implement only what you need.

Strategy 2: Template Methods (inheritance)

Use this when you need to change the structural behavior of the orchestrator: how graphs are built, how nodes are scheduled, or how the execution loop itself works.

import { WorkflowOrchestrator, buildGraph } from "@graph-compose/execution-kernel";
import type { RuntimeGraph, IWorkflowStateManager } from "@graph-compose/execution-kernel";

interface CustomNode {
  id: string;
  type: string;
  dependencies?: string[];
  priority?: number;
}

class PriorityOrchestrator extends WorkflowOrchestrator<CustomNode> {

  // Override graph construction to enrich nodes during build
  protected buildGraph(workflow) {
    return buildGraph<CustomNode>(workflow, {
      multigraph: true,
      transformNode: (raw) => ({ ...raw, priority: raw.priority ?? 0 }) as CustomNode,
    });
  }

  // Override scheduling to respect priority ordering
  protected getNextNodes(): CustomNode[] {
    const ready = super.getNextNodes() as CustomNode[];
    return ready.sort((a, b) => (b.priority ?? 0) - (a.priority ?? 0));
  }

  // Override state management to use a custom implementation
  protected createStateManager(graph, options): IWorkflowStateManager {
    return new MyCustomStateManager(graph, options);
  }
}

Available overrides:

| Method | Default Behavior | Override When... | |--------|-----------------|------------------| | buildGraph(workflow) | Builds a simple directed graph from dependencies | You need multigraph edges, node transforms, or filtering | | createStateManager(graph, options) | Creates WorkflowStateManager (3-state model) | You have custom execution states or persistence needs | | getNextNodes() | Returns all nodes with satisfied dependencies | You need priority ordering, concurrency limits, or custom scheduling | | executeNodes(nodes) | Dispatches to handlers in parallel, runs onBatchComplete | You need sequential execution, rate limiting, etc. | | runExecutionLoop() | while (!finished) { getNext → execute } | You need streaming, signal-driven pausing, or custom loop control |


Activities

The kernel provides two Temporal activity implementations and their type contracts.

resolveExpression

Resolves JSONata template expressions ({{ }}) in a node's URL, headers, and body against the current workflow state.

import { resolveExpression } from "@graph-compose/execution-kernel";

const output = await resolveExpression({
  nodeId: "enrich",
  jsonata: {
    url: "https://api.example.com/users/{{results.fetch.data.id}}",
    headers: { Authorization: "Bearer {{context.token}}" },
    body: { name: "{{results.fetch.data.name}}" },
  },
  evaluationContext: currentWorkflowState,
});
// output.resolved.url => "https://api.example.com/users/42"
// output.warnings => [] (any expressions that resolved to undefined)

httpCall

Executes an HTTP request using axios (optional peer dependency). Returns a NodeResult with data, statusCode, and headers.

Both activities are also available via the sub-path import @graph-compose/execution-kernel/activities.

Activity Type Contracts

If you're implementing your own activities (e.g. replacing httpCall with a custom HTTP client), implement the RuntimeActivityHandlers interface:

import type { RuntimeActivityHandlers, ActivityOptions } from "@graph-compose/execution-kernel";

const myActivities: RuntimeActivityHandlers = {
  resolveExpression: async (input) => { /* ... */ },
  httpCall: async (input, options?: ActivityOptions) => {
    // options contains per-node retry/timeout config from node.activityConfig
    // Use it to configure your HTTP client's retry behavior
    const res = await myHttpClient.request({ ... });
    return { data: res.body, statusCode: res.status, headers: res.headers };
  },
};

Per-Node Activity Options

The built-in HttpNodeHandler reads activityConfig from each node and passes it as ActivityOptions to the httpCall activity. This enables per-node retry policies and timeouts:

{
  "id": "flaky_api",
  "type": "http",
  "dependencies": [],
  "http": { "method": "GET", "url": "https://unreliable-api.com/data" },
  "activityConfig": {
    "retryPolicy": {
      "maximumAttempts": 5,
      "initialInterval": "1 second",
      "backoffCoefficient": 2,
      "maximumInterval": "30 seconds"
    },
    "startToCloseTimeout": "60 seconds"
  }
}

When no activityConfig is provided, the default proxy configuration is used (typically startToCloseTimeout: "30 seconds" with no retries). The ActivityOptions interface:

interface ActivityOptions {
  retryPolicy?: {
    backoffCoefficient?: number;
    initialInterval?: string;     // Duration string, e.g. "1 second"
    maximumAttempts?: number;
    maximumInterval?: string;
  };
  startToCloseTimeout?: string;     // Duration string, e.g. "30 seconds"
  scheduleToCloseTimeout?: string;
}

Temporal Bindings

The temporal/ directory contains Temporal-specific wiring available via @graph-compose/execution-kernel/temporal.

defineWorkflowQueries

Registers Temporal query handlers on the running workflow so you can inspect execution state mid-run:

import * as wf from "@temporalio/workflow";
import { defineWorkflowQueries } from "@graph-compose/execution-kernel";

defineWorkflowQueries(wf, orchestrator.getStateManager());

This registers three queries:

| Query Name | Returns | Description | |------------|---------|-------------| | getExecutionState | GraphWorkflowState | Context, executed node list, and all results | | getNodeResult | NodeResult | Result for a specific node ID | | getNodeState | node state object | Full execution state for a specific node |

Reference Workflow

temporal/workflow.ts exports runtimeWorkflow (aliased as httpWorkflow), a minimal Temporal workflow function that wires together the orchestrator, activities, and queries. The @graph-compose/runtime package uses its own version of this with additional validation; the kernel's copy serves as a reference implementation.


Type Reference

Core Types

import type {
  BaseRuntimeNode,     // { id: string; type: string; dependencies?: string[] }
  RuntimeGraph,        // graphlib Graph with typed node accessors
  BaseExecutionState,  // "pending" | "executed" | "executed_and_failed"
  RuntimeNodeState,    // { executionState, result, failureState }
} from "@graph-compose/execution-kernel";

Plugin Types

import type {
  NodeHandler,            // { type: string; execute(node, ctx): Promise<void> }
  NodeExecutionContext,   // { stateManager, workflowInfo, activities }
  ExecutionPlugin,        // Workflow-level + per-node lifecycle hooks
  ExecutionPluginContext, // { stateManager, workflowInfo }
  NodePluginContext,      // { stateManager, workflowInfo } - passed to per-node hooks
} from "@graph-compose/execution-kernel";

Activity Types

import type {
  RuntimeActivityHandlers,     // { resolveExpression, httpCall }
  HttpActivityInput,           // input for httpCall
  ActivityOptions,             // per-node retry/timeout config
  ExpressionResolutionInput,   // input for resolveExpression
  ExpressionResolutionOutput,  // output from resolveExpression
  ResolutionWarning,           // { path, expression, error }
} from "@graph-compose/execution-kernel";

Related Packages

| Package | Description | |---------|-------------| | @graph-compose/core | TypeScript types, Zod schemas, and validation utilities for workflow graphs | | @graph-compose/runtime | Batteries-included HTTP workflow runtime built on this kernel |

Requirements

  • Node.js 18+
  • TypeScript 5+ (recommended)
  • A running Temporal server (local or cloud)

License

This project is dual-licensed:

  • AGPL-3.0 for open-source use. See LICENSE for details.
  • Commercial License available for organizations that need an alternative to AGPL. Contact the maintainers for details.