@magic-marker/nurt
v0.2.0
Nurt
[!CAUTION] This documentation and library are mostly Claude-generated. The library has plenty of tests and is used in a minimal scope, but further cleanup is required at some point.
A type-safe, zero-dependency DAG flow execution engine for TypeScript. Nurt lets you define directed acyclic graphs of async steps, execute them with automatic parallelism, and observe progress in real time. It supports dynamic step spawning, nested subgraphs, cross-boundary dependencies, and fine-grained error handling.
The name comes from the Polish word for "flow" (nurt).
Install
npm install @marker/nurt
Quick Start
import { flow } from "@marker/nurt";
const result = await flow("my-flow")
.step("fetch", async () => {
const data = await fetchData();
return { items: data };
})
.step("process", ["fetch"], async (input) => {
// input.fetch is typed as { items: ... }
return { count: input.fetch.items.length };
})
.step("save", ["process"], async (input) => {
await saveResult(input.process.count);
return { saved: true };
})
.build()
.run().result;
console.log(result.status); // "success"
Core Concepts
Flow
An immutable DAG blueprint created by the builder. Defines steps, their dependencies, and groups. Validated at build time (cycle detection, parent existence). Can spawn multiple concurrent runs.
FlowRun
A single execution of a flow. Tracks step statuses, outputs, and timing. Provides hooks for observability and a snapshot() method for serialization.
Steps
Async functions that receive their parents' outputs as typed input. Steps execute as soon as all their dependencies complete. Multiple independent steps run in parallel automatically.
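To make the scheduling rule concrete, here is a minimal, self-contained sketch of dependency-driven execution. It is not Nurt's implementation: `StepDef` and `runDag` are hypothetical names, and the sketch assumes an acyclic graph with no error handling. Each step starts once all of its parents have resolved, so independent branches overlap naturally.

```typescript
// Hypothetical sketch; not Nurt's actual scheduler.
type StepFn = (input: Record<string, unknown>) => Promise<unknown>;

interface StepDef {
  name: string;
  parents: string[];
  run: StepFn;
}

// Starts every step as soon as all of its parents have resolved.
// Assumes the graph is acyclic and that step names are unique.
async function runDag(steps: StepDef[]): Promise<Map<string, unknown>> {
  const byName = new Map(steps.map((s) => [s.name, s] as const));
  const started = new Map<string, Promise<unknown>>();

  const start = (name: string): Promise<unknown> => {
    const existing = started.get(name);
    if (existing) return existing; // each step runs at most once
    const step = byName.get(name);
    if (!step) throw new Error(`unknown step: ${name}`);
    // Wait for all parents, then hand their outputs to the step keyed by name.
    const promise = Promise.all(step.parents.map(start)).then((outputs) => {
      const input: Record<string, unknown> = {};
      step.parents.forEach((parent, i) => {
        input[parent] = outputs[i];
      });
      return step.run(input);
    });
    started.set(name, promise);
    return promise;
  };

  const outputs = await Promise.all(steps.map((s) => start(s.name)));
  return new Map(steps.map((s, i) => [s.name, outputs[i]] as const));
}
```

In a diamond graph (a, then b and c, then d), b and c both begin as soon as a resolves, and d waits for both.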
Groups
Typed collection points for dynamically-added members. An arbiter step can decide at runtime which members to spawn into a group. Downstream steps wait for all group members to complete.
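The add-then-seal behaviour of a group can be pictured with a small collector. This is an illustrative sketch only: `GroupCollector` is a hypothetical class, not part of Nurt, and it omits error handling. Downstream code awaits `results()`, which resolves only after the group has been sealed and every member has finished.

```typescript
// Hypothetical sketch of a sealable group; not Nurt's internal data structure.
class GroupCollector<T> {
  private readonly members: Promise<T>[] = [];
  private sealed = false;
  private resolveSealed: () => void = () => {};
  private readonly sealedSignal: Promise<void>;

  constructor() {
    this.sealedSignal = new Promise<void>((resolve) => {
      this.resolveSealed = resolve;
    });
  }

  // Starts a member immediately and tracks its result.
  add(member: () => Promise<T>): void {
    if (this.sealed) throw new Error("group is sealed");
    this.members.push(member());
  }

  // Declares that no further members will be added.
  seal(): void {
    this.sealed = true;
    this.resolveSealed();
  }

  // Resolves with all member outputs, but only once the group is sealed.
  async results(): Promise<T[]> {
    await this.sealedSignal;
    return Promise.all(this.members);
  }
}
```

Sealing matters because a consumer cannot otherwise tell "all members so far are done" apart from "no more members will arrive".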
Subgraphs
Group members can be entire nested flows (DAGs within DAGs). A subgraph member contains its own Flow that executes as a child FlowRun. Subgraphs can reference steps outside their boundary via externalDeps.
API Reference
flow(name)
Creates a new FlowBuilder.
import { flow } from "@marker/nurt";
const builder = flow("my-flow");
FlowBuilder
Chainable builder. Each .step() call returns a new builder with an expanded type registry.
.step(name, handler) - Root step
.step("start", async () => {
return { documentId: "doc-1" };
})
.step(name, parents, handler) - Step with dependencies
.step("process", ["start"], async (input) => {
// input.start is typed from the "start" step's return type
return { processed: true };
})
.step(name, parents, options) - Step with options
.step("save", ["process"], {
execute: async (input) => ({ saved: true }),
terminal: true, // determines run success/failure
allowFailures: true, // runs even if parents fail
transform: (raw) => transform(raw), // transform parent outputs
})
.step(name, parents, inputMapper, handler | options) - Step with typed input mapping
Cherry-pick or restructure parent outputs with full type inference before passing to the handler.
.step("summarize", ["userStep", "configStep"],
(input) => ({
name: input.userStep.name,
maxRetries: input.configStep.maxRetries,
}),
async (mapped) => {
// mapped is typed as { name: string; maxRetries: number }
return `${mapped.name}: ${mapped.maxRetries} retries`;
},
)
The input mapper can also be paired with an options object instead of a plain handler:
.step("save", ["process"],
(input) => input.process.data,
{
execute: async (data) => ({ saved: true }),
terminal: true,
allowFailures: true,
},
)
.group<T>(name, options) - Declare a group
.group<ReviewOutput>("reviews", { dependsOn: ["arbiter"] })
The group adds T[] to the type registry. Downstream steps receive an array of member outputs.
.step(name, [group(...)], handler) - Depend on a group
import { group } from "@marker/nurt";
.step("merge", [group("reviews")], async (input) => {
// input.reviews is ReviewOutput[]
return { total: input.reviews.length };
})
.build() - Create the Flow
Validates the DAG (cycle detection, parent existence) and returns an immutable Flow.
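For readers unfamiliar with these checks, the sketch below shows one common way to perform them: Kahn's algorithm for cycle detection, combined with a parent-existence pass. It illustrates the technique, not Nurt's source. `GraphNode` and `validateDag` are hypothetical names, and plain `Error` stands in for the library's dedicated error classes.

```typescript
// Hypothetical sketch; Nurt's own validation may differ.
interface GraphNode {
  name: string;
  parentNames: string[];
}

// Returns a topological order, or throws on an unknown parent or a cycle.
function validateDag(nodes: GraphNode[]): string[] {
  const names = new Set(nodes.map((n) => n.name));
  const remaining = new Map<string, number>(); // unmet parent count per node
  const children = new Map<string, string[]>();

  for (const node of nodes) {
    remaining.set(node.name, node.parentNames.length);
    for (const parent of node.parentNames) {
      if (!names.has(parent)) {
        throw new Error(`unknown parent "${parent}" for step "${node.name}"`);
      }
      children.set(parent, [...(children.get(parent) ?? []), node.name]);
    }
  }

  // Kahn's algorithm: repeatedly remove nodes that have no unmet parents.
  const queue = nodes.filter((n) => n.parentNames.length === 0).map((n) => n.name);
  const order: string[] = [];
  while (queue.length > 0) {
    const name = queue.shift() as string;
    order.push(name);
    for (const child of children.get(name) ?? []) {
      const left = (remaining.get(child) ?? 0) - 1;
      remaining.set(child, left);
      if (left === 0) queue.push(child);
    }
  }

  // Any node never removed is part of, or downstream of, a cycle.
  if (order.length !== nodes.length) throw new Error("cycle detected");
  return order;
}
```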
Flow
const myFlow = flow("example").step(...).build();
// Create a run
const run = myFlow.run();
const run2 = myFlow.run({ failFast: true }); // multiple concurrent runs OK
RunOptions
interface RunOptions {
failFast?: boolean; // abort entire run on first error (default: false)
hooks?: FlowHooks; // lifecycle callbacks
injectedSteps?: Map<string, unknown>; // pre-resolved step outputs
}
FlowRun
The execution instance. Created by flow.run().
const run = myFlow.run({ hooks: { ... } });
run.runId; // "run-1" (unique per run)
run.result; // Promise<FlowRunResult> - resolves when done
run.steps; // readonly StepRecord[] - current state snapshot
run.snapshot(); // FlowSnapshot - JSON-serializable state
run.abort(); // signal all steps to stop
// Dynamic group control
run.spawnGroup("reviews", members); // add members + auto-seal
run.addGroupMember("reviews", member); // add one member
run.sealGroup("reviews"); // seal (no more members)
StepContext
Available inside every step handler as the second argument.
.step("my-step", async (input, ctx) => {
ctx.runId; // current run ID
ctx.signal; // AbortSignal (check ctx.signal.aborted)
ctx.history; // shared History store
ctx.run; // RunHandle for dynamic control
ctx.history.set("key", "value");
ctx.history.get<string>("key"); // "value"
// Spawn group members from inside a step
ctx.run.spawnGroup("reviews", [
{ name: "grammar", execute: async () => ({ ... }) },
]);
})
FlowHooks
const run = myFlow.run({
hooks: {
onChange: () => {
// fires on ANY state change (including subgraph events)
updateUI(run.snapshot());
},
onStepStart: (step) => console.log(`started: ${step.name}`),
onStepComplete: (step) =>
console.log(`done: ${step.name} in ${step.durationMs}ms`),
onStepError: (step) => console.log(`failed: ${step.name}: ${step.error}`),
onStepAdded: (step) => console.log(`dynamic step: ${step.name}`),
onRunComplete: (result) => console.log(`run ${result.status}`),
},
});
Hooks are isolated from execution: if a hook throws, the run continues unaffected.
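One plausible way to get this kind of isolation is to route every hook call through a guard that catches exceptions. The helper below is a hypothetical sketch (`callHookSafely` is not a Nurt export), and whether the library logs or silently drops hook errors is not stated here.

```typescript
// Hypothetical sketch of hook isolation; not taken from Nurt's source.
type Hook<A extends unknown[]> = (...args: A) => void;

// Invokes an optional hook and never lets its exception reach the caller.
function callHookSafely<A extends unknown[]>(
  hook: Hook<A> | undefined,
  ...args: A
): void {
  if (!hook) return;
  try {
    hook(...args);
  } catch {
    // Deliberately swallowed so the run can continue; a real engine
    // might report the error somewhere instead of discarding it.
  }
}
```

With a guard like this, a faulty `onStepStart` handler cannot change the outcome of the step it observes.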
FlowRunResult
const result = await run.result;
result.runId; // "run-1"
result.status; // "success" | "error"
result.startedAt; // timestamp
result.completedAt; // timestamp
result.steps; // StepRecord[] with status, output, timing
result.history; // ReadonlyMap<string, unknown>
StepRecord
interface StepRecord {
name: string;
parentNames: string[];
status: "pending" | "running" | "success" | "error" | "skipped";
startedAt?: number;
completedAt?: number;
durationMs?: number;
output?: unknown;
error?: string;
}
Patterns
Linear Pipeline
const result = await flow("pipeline")
.step("extract", async () => ({ text: "hello world" }))
.step("transform", ["extract"], async (input) => ({
upper: input.extract.text.toUpperCase(),
}))
.step("load", ["transform"], async (input) => ({
saved: true,
text: input.transform.upper,
}))
.build()
.run().result;
Parallel Fan-Out / Fan-In
Steps with the same parent run in parallel automatically.
const result = await flow("parallel")
.step("start", async () => ({ data: [1, 2, 3] }))
.step("branch-a", ["start"], async (input) => ({
sum: input.start.data.reduce((a, b) => a + b, 0),
}))
.step("branch-b", ["start"], async (input) => ({
count: input.start.data.length,
}))
.step("merge", ["branch-a", "branch-b"], async (input) => ({
average: input["branch-a"].sum / input["branch-b"].count,
}))
.build()
.run().result;
// branch-a and branch-b execute concurrently
Input Mapping
Use the 4-argument form of .step() to cherry-pick values and restructure parent outputs before they reach the handler. The input mapper receives fully-typed parent outputs and returns a custom shape that becomes the handler's input.
const result = await flow("mapped")
.step("user", async () => ({
name: "Alice",
age: 30,
preferences: { theme: "dark" },
}))
.step("config", async () => ({
maxRetries: 3,
timeout: 5000,
debug: false,
}))
// Cherry-pick only what's needed from multiple parents
.step(
"greeting",
["user", "config"],
(input) => ({
name: input.user.name,
theme: input.user.preferences.theme,
timeout: input.config.timeout,
}),
async (mapped) => {
// mapped is { name: string; theme: string; timeout: number }
return `Hello ${mapped.name} (${mapped.theme} theme, ${mapped.timeout}ms timeout)`;
},
)
.build()
.run().result;
This is especially useful when parent outputs are large or deeply nested and your step only needs a few values. Unlike the transform field in the options object (which is untyped), the positional input mapper provides full type inference on both sides.
Dynamic Group Spawning (Arbiter Pattern)
An arbiter step decides at runtime which members to add to a group.
type ReviewOutput = { tool: string; comments: string[] };
const reviewFlow = flow("review")
.step("start", async () => ({ wordCount: 1200 }))
.step("arbiter", ["start"], async (input, ctx) => {
const tools =
input.start.wordCount > 500
? ["grammar", "tone", "clarity"]
: ["grammar"];
ctx.run.spawnGroup(
"reviews",
tools.map((tool) => ({
name: `review-${tool}`,
execute: async () => ({
tool,
comments: [`Found issue in ${tool}`],
}),
})),
);
return { selectedTools: tools };
})
.group<ReviewOutput>("reviews", { dependsOn: ["arbiter"] })
.step("synthesize", [group("reviews")], async (input) => ({
total: input.reviews.flatMap((r) => r.comments).length,
}))
.build();
const result = await reviewFlow.run().result;
Subgraph Members (Nested DAGs)
A group member can contain an entire flow with branching and parallelism.
const analysisFlow = flow("deep-analysis")
.step("extract", async () => ({ claims: ["A", "B"] }))
.step("verify", ["extract"], async (input) => ({
verified: input.extract.claims.length,
}))
.step("check-dates", ["extract"], async () => ({
issues: 0,
}))
.step("report", ["verify", "check-dates"], {
execute: async (input) => ({
result: `${input.verify.verified} verified, ${input["check-dates"].issues} date issues`,
}),
terminal: true,
})
.build();
// DAG: extract -> [verify, check-dates] -> report
run.spawnGroup("reviews", [
{ name: "grammar", execute: async () => ({ ... }) }, // single member
{ name: "deep-analysis", flow: analysisFlow }, // subgraph member
]);
Pipeline Helper
Shorthand for linear subgraphs.
import { pipeline } from "@marker/nurt";
run.spawnGroup("reviews", [
pipeline("tone-check", [
{ name: "detect", execute: async () => ({ issues: ["too formal"] }) },
{ name: "classify", execute: async (input) => ({ severity: "medium" }) },
{
name: "suggest",
execute: async (input) => ({ fix: "use simpler words" }),
},
]),
]);
// Creates: detect -> classify -> suggest (terminal)
Cross-Boundary Dependencies
A subgraph step can depend on a step outside the subgraph via externalDeps.
const clarityFlow = flow("clarity")
.step("extract", async () => ({ issues: ["vague intro"] }))
.step("nlp-data", async () => ({})) // placeholder for external injection
.step("assess", ["extract"], async (input) => ({ ... }))
.step("cross-ref", ["nlp-data"], async (input) => ({
// input["nlp-data"] will contain the NLP step's output
refs: input["nlp-data"].entities.length,
}))
.step("refine", ["assess", "cross-ref"], {
execute: async (input) => ({ ... }),
terminal: true,
})
.build();
// In the parent flow, nlp-process runs at the top level
// externalDeps maps the subgraph's "nlp-data" step to the parent's "nlp-process" step
run.spawnGroup("reviews", [
{
name: "clarity",
flow: clarityFlow,
externalDeps: { "nlp-data": "nlp-process" },
},
]);
// The subgraph waits for "nlp-process" to complete, then injects its output
// as the pre-resolved "nlp-data" step inside the child run
Error Handling with allowFailures
By default, if a step fails, its dependents are skipped. With allowFailures: true, a step runs even if parents failed, receiving StepResult<T> wrappers.
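The wrapper can be modelled as a discriminated union, which lets TypeScript narrow on `status` without a cast. The type below is an assumption inferred from the fields used in this README (`status`, `value`, `error`). The exported `StepResult` may be defined differently, and `unwrapOr` is a hypothetical helper.

```typescript
// Assumed shape, inferred from the README's usage; check the library's own typings.
type StepResult<T> =
  | { status: "success"; value: T }
  | { status: "error"; error: string };

// Returns the parent's value, or a fallback if the parent failed.
function unwrapOr<T>(result: StepResult<T>, fallback: T): T {
  return result.status === "success" ? result.value : fallback;
}
```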
import type { StepResult } from "@marker/nurt";
const result = await flow("resilient")
.step("risky", async () => {
throw new Error("network timeout");
})
.step("handler", ["risky"], {
allowFailures: true,
execute: async (input) => {
// input.risky is StepResult<T>, not T
// Named "risky" to avoid shadowing the outer "result"
const risky = input.risky as StepResult<unknown>;
if (risky.status === "error") {
return { fallback: true, error: risky.error };
}
return { fallback: false, value: risky.value };
},
})
.build()
.run().result;
// result.steps[0].status = "error"
// result.steps[1].status = "success" (ran despite parent failure)
Terminal Steps
Terminal steps determine the run's final status. If no steps are marked terminal, all steps are considered.
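The rule can be written as a small pure function. This is a sketch under assumptions: `StepOutcome` and `resolveRunStatus` are hypothetical names, and treating every non-"success" status (including "skipped") as a failure is a guess rather than documented behaviour.

```typescript
// Hypothetical sketch of the status rule; not Nurt's actual code.
type StepStatus = "pending" | "running" | "success" | "error" | "skipped";

interface StepOutcome {
  name: string;
  status: StepStatus;
  terminal?: boolean;
}

// If any step is terminal, only terminal steps count; otherwise all steps do.
function resolveRunStatus(steps: StepOutcome[]): "success" | "error" {
  const terminals = steps.filter((s) => s.terminal);
  const considered = terminals.length > 0 ? terminals : steps;
  // Assumption: anything other than "success" fails the run.
  return considered.every((s) => s.status === "success") ? "success" : "error";
}
```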
const result = await flow("with-terminal")
.step("main", async () => ({ data: "ok" }))
.step("save", ["main"], {
execute: async () => ({ saved: true }),
terminal: true,
})
.step("notify", async () => {
throw new Error("email service down");
})
.build()
.run().result;
// result.status = "success"
// Only "save" (terminal) determines status. "notify" failed but doesn't affect it.
Shared State via History
Steps can share data through the ctx.history store, accessible across all steps in a run.
const result = await flow("with-history")
.step("producer", async (_, ctx) => {
ctx.history.set("config", { maxRetries: 3 });
return { produced: true };
})
.step("consumer", ["producer"], async (_, ctx) => {
const config = ctx.history.get<{ maxRetries: number }>("config");
return { retries: config?.maxRetries };
})
.build()
.run().result;
// result.history.get("config") = { maxRetries: 3 }
Snapshots for Serialization
run.snapshot() returns a JSON-serializable representation of the entire flow state, including nested subgraphs. Useful for sending state to a frontend for visualization.
const run = myFlow.run({
hooks: {
onChange: () => {
const snapshot = run.snapshot();
// snapshot.flow.name, snapshot.flow.steps, snapshot.flow.groups
// snapshot.run.status, snapshot.run.runId
// Each step has: name, status, output, durationMs, error, parentNames
// Groups have: members with type, status, subgraph (recursive FlowSnapshot)
sendToFrontend(JSON.stringify(snapshot));
},
},
});
Executable Classes
Steps can be class instances implementing the Executable interface.
import type { Executable, StepContext } from "@marker/nurt";
class MyTool implements Executable<{ data: string }, { result: number }> {
async execute(
input: { data: string },
ctx: StepContext,
): Promise<{ result: number }> {
return { result: input.data.length };
}
}
flow("with-class")
.step("start", async () => ({ data: "hello" }))
.step("tool", ["start"], new MyTool())
.build();
Error Types
| Error | When |
| -------------------- | --------------------------------------------------------------------- |
| CycleDetectedError | .build() detects a cycle in the DAG |
| DuplicateStepError | .step() or .group() uses an already-registered name |
| UnknownParentError | .step() references a parent that doesn't exist |
| UnsealedGroupError | Run completes with a group that was never sealed |
| UnfilledSlotError | A pipeline slot was not provided an implementation |
| StepExecutionError | Wraps an error thrown by a step handler. Has .stepName and .cause |
Graph Utilities
Low-level DAG utilities, useful for custom tooling or analysis.
import {
validateAcyclic,
topologicalSort,
getReadySteps,
getSkippableSteps,
getReadyWithFailures,
} from "@marker/nurt";
const nodes = [
{ name: "a", parentNames: [] },
{ name: "b", parentNames: ["a"] },
{ name: "c", parentNames: ["a"] },
{ name: "d", parentNames: ["b", "c"] },
];
validateAcyclic(nodes); // throws CycleDetectedError if cyclic
topologicalSort(nodes); // ["a", "b", "c", "d"]
const statuses = new Map([
["a", "success"],
["b", "success"],
["c", "running"],
["d", "pending"],
]);
getReadySteps(nodes, statuses); // [] (c still running, d waits)
License
MIT
