@magic-marker/nurt

v0.2.0

A type-safe, zero-dependency DAG execution engine for TypeScript

Nurt

[!CAUTION] This documentation and library are mostly Claude-generated. The library has plenty of tests and is used in a minimal scope, but further cleanup is still required.

A type-safe, zero-dependency DAG flow execution engine for TypeScript. Nurt lets you define directed acyclic graphs of async steps, execute them with automatic parallelism, and observe progress in real time. It supports dynamic step spawning, nested subgraphs, cross-boundary dependencies, and fine-grained error handling.

The name comes from the Polish word for "flow" (nurt).

Install

npm install @marker/nurt

Quick Start

import { flow } from "@marker/nurt";

const result = await flow("my-flow")
  .step("fetch", async () => {
    const data = await fetchData();
    return { items: data };
  })
  .step("process", ["fetch"], async (input) => {
    // input.fetch is typed as { items: ... }
    return { count: input.fetch.items.length };
  })
  .step("save", ["process"], async (input) => {
    await saveResult(input.process.count);
    return { saved: true };
  })
  .build()
  .run().result;

console.log(result.status); // "success"

Core Concepts

Flow

An immutable DAG blueprint created by the builder. Defines steps, their dependencies, and groups. Validated at build time (cycle detection, parent existence). Can spawn multiple concurrent runs.

FlowRun

A single execution of a flow. Tracks step statuses, outputs, and timing. Provides hooks for observability and a snapshot() method for serialization.

Steps

Async functions that receive their parents' outputs as typed input. Steps execute as soon as all their dependencies complete. Multiple independent steps run in parallel automatically.
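
The scheduling rule described here can be illustrated without the library. The following is a minimal sketch, not Nurt's actual implementation: `DagNode`, `runDag`, `sleep`, and `timed` are hypothetical names, the graph is assumed to be already validated as acyclic, and failure handling, skipping, and abort support are left out. Each node starts once its parents' promises resolve, so nodes that share a parent overlap in time.

```typescript
// Hypothetical node shape for this sketch; Nurt's internal types may differ.
type DagNode = {
  name: string;
  parentNames: string[];
  run: () => Promise<unknown>;
};

// Runs every node as soon as all of its parents have resolved.
function runDag(nodes: DagNode[]): Promise<Map<string, unknown>> {
  const outputs = new Map<string, unknown>();
  const started = new Map<string, Promise<void>>();

  const startNode = (node: DagNode): Promise<void> => {
    const existing = started.get(node.name);
    if (existing) return existing; // each node runs at most once

    const parentRuns = node.parentNames.map((parentName) => {
      const parentNode = nodes.find((n) => n.name === parentName);
      if (!parentNode) throw new Error(`unknown parent: ${parentName}`);
      return startNode(parentNode);
    });

    const promise = Promise.all(parentRuns)
      .then(() => node.run())
      .then((value) => {
        outputs.set(node.name, value);
      });
    started.set(node.name, promise);
    return promise;
  };

  return Promise.all(nodes.map(startNode)).then(() => outputs);
}

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));
const timeline: string[] = [];

// Wraps a delay so that start and end events are recorded in `timeline`.
const timed = (label: string, ms: number) => async () => {
  timeline.push(`start:${label}`);
  await sleep(ms);
  timeline.push(`end:${label}`);
  return label;
};

const dagFinished = runDag([
  { name: "a", parentNames: [], run: timed("a", 5) },
  { name: "b", parentNames: ["a"], run: timed("b", 20) },
  { name: "c", parentNames: ["a"], run: timed("c", 10) },
  { name: "d", parentNames: ["b", "c"], run: timed("d", 5) },
]);
```

Once `dagFinished` resolves, `timeline` shows both `start:b` and `start:c` before either of their end events, and `start:d` only after both have ended.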

Groups

Typed collection points for dynamically-added members. An arbiter step can decide at runtime which members to spawn into a group. Downstream steps wait for all group members to complete.

Subgraphs

Group members can be entire nested flows (DAGs within DAGs). A subgraph member contains its own Flow that executes as a child FlowRun. Subgraphs can reference steps outside their boundary via externalDeps.

API Reference

flow(name)

Creates a new FlowBuilder.

import { flow } from "@marker/nurt";

const builder = flow("my-flow");

FlowBuilder

Chainable builder. Each .step() call returns a new builder with an expanded type registry.
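
To show what an "expanded type registry" can look like in TypeScript, here is a minimal sketch of the general pattern. `MiniBuilder`, `OutputRegistry`, and `collect` are hypothetical names for illustration only; Nurt's real builder also tracks parents, groups, and options, and may be typed differently.

```typescript
// Hypothetical miniature builder; it only illustrates the typing pattern.
type OutputRegistry = Record<string, unknown>;

class MiniBuilder<R extends OutputRegistry = {}> {
  private readonly handlers: Record<string, () => unknown>;

  constructor(handlers: Record<string, () => unknown> = {}) {
    this.handlers = handlers;
  }

  // Returns a NEW builder whose registry type also maps `stepName`
  // to the handler's return type. The original builder is not mutated.
  step<N extends string, O>(
    stepName: N,
    handler: () => O,
  ): MiniBuilder<R & Record<N, O>> {
    return new MiniBuilder<R & Record<N, O>>({
      ...this.handlers,
      [stepName]: handler,
    });
  }

  // Runs every handler and returns the outputs under the accumulated type.
  collect(): R {
    const out: Record<string, unknown> = {};
    for (const [stepName, handler] of Object.entries(this.handlers)) {
      out[stepName] = handler();
    }
    return out as R;
  }
}

const collected = new MiniBuilder()
  .step("user", () => ({ name: "Alice" }))
  .step("config", () => ({ maxRetries: 3 }))
  .collect();

// Both accesses type-check without casts because each step() widened R.
const summaryLine = `${collected.user.name}: ${collected.config.maxRetries} retries`;
```

Because every call returns a fresh builder type, a misspelled step name such as `collected.usr` would be rejected at compile time in this sketch.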

.step(name, handler) - Root step

.step("start", async () => {
  return { documentId: "doc-1" };
})

.step(name, parents, handler) - Step with dependencies

.step("process", ["start"], async (input) => {
  // input.start is typed from the "start" step's return type
  return { processed: true };
})

.step(name, parents, options) - Step with options

.step("save", ["process"], {
  execute: async (input) => ({ saved: true }),
  terminal: true,        // determines run success/failure
  allowFailures: true,   // runs even if parents fail
  transform: (raw) => transform(raw), // transform parent outputs
})

.step(name, parents, inputMapper, handler | options) - Step with typed input mapping

Cherry-pick or restructure parent outputs with full type inference before passing to the handler.

.step("summarize", ["userStep", "configStep"], 
  (input) => ({
    name: input.userStep.name,
    maxRetries: input.configStep.maxRetries,
  }),
  async (mapped) => {
    // mapped is typed as { name: string; maxRetries: number }
    return `${mapped.name}: ${mapped.maxRetries} retries`;
  },
)

The input mapper can also be paired with an options object instead of a plain handler:

.step("save", ["process"],
  (input) => input.process.data,
  {
    execute: async (data) => ({ saved: true }),
    terminal: true,
    allowFailures: true,
  },
)

.group<T>(name, options) - Declare a group

.group<ReviewOutput>("reviews", { dependsOn: ["arbiter"] })

The group adds T[] to the type registry. Downstream steps receive an array of member outputs.

.step(name, [group(...)], handler) - Depend on a group

import { group } from "@marker/nurt";

.step("merge", [group("reviews")], async (input) => {
  // input.reviews is ReviewOutput[]
  return { total: input.reviews.length };
})

.build() - Create the Flow

Validates the DAG (cycle detection, parent existence) and returns an immutable Flow.
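
The two checks named here (parent existence and cycle detection) can be sketched with a depth-first search. This is an illustration under assumptions, not the library's code: `GraphNode`, `validateGraph`, and `tryValidate` are hypothetical names, and plain `Error` objects stand in for the error classes listed under Error Types.

```typescript
type GraphNode = { name: string; parentNames: string[] };

function validateGraph(nodes: GraphNode[]): void {
  const byName = new Map<string, GraphNode>();
  for (const node of nodes) byName.set(node.name, node);

  // 1. Every referenced parent must be a declared node.
  for (const node of nodes) {
    for (const parentName of node.parentNames) {
      if (!byName.has(parentName)) {
        throw new Error(`Unknown parent "${parentName}" for step "${node.name}"`);
      }
    }
  }

  // 2. Depth-first search: meeting a node that is still "visiting"
  //    means we followed parent links back to where we started.
  const state = new Map<string, "visiting" | "done">();
  const visit = (nodeName: string): void => {
    const current = state.get(nodeName);
    if (current === "done") return;
    if (current === "visiting") {
      throw new Error(`Cycle detected at step "${nodeName}"`);
    }
    state.set(nodeName, "visiting");
    const node = byName.get(nodeName);
    if (node) node.parentNames.forEach(visit);
    state.set(nodeName, "done");
  };
  nodes.forEach((node) => visit(node.name));
}

// Helper for the demo: returns "ok" or the validation message.
function tryValidate(nodes: GraphNode[]): string {
  try {
    validateGraph(nodes);
    return "ok";
  } catch (err) {
    return err instanceof Error ? err.message : String(err);
  }
}

const validCheck = tryValidate([
  { name: "a", parentNames: [] },
  { name: "b", parentNames: ["a"] },
]);
const cyclicCheck = tryValidate([
  { name: "x", parentNames: ["y"] },
  { name: "y", parentNames: ["x"] },
]);
const missingCheck = tryValidate([{ name: "solo", parentNames: ["ghost"] }]);
```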

Flow

const myFlow = flow("example").step(...).build();

// Create a run
const run = myFlow.run();
const run2 = myFlow.run({ failFast: true }); // multiple concurrent runs OK

RunOptions

interface RunOptions {
  failFast?: boolean; // abort entire run on first error (default: false)
  hooks?: FlowHooks; // lifecycle callbacks
  injectedSteps?: Map<string, unknown>; // pre-resolved step outputs
}

FlowRun

The execution instance. Created by flow.run().

const run = myFlow.run({ hooks: { ... } });

run.runId;              // "run-1" (unique per run)
run.result;             // Promise<FlowRunResult> - resolves when done
run.steps;              // readonly StepRecord[] - current state snapshot

run.snapshot();         // FlowSnapshot - JSON-serializable state
run.abort();            // signal all steps to stop

// Dynamic group control
run.spawnGroup("reviews", members);  // add members + auto-seal
run.addGroupMember("reviews", member); // add one member
run.sealGroup("reviews");             // seal (no more members)
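
Sealing matters because, without it, "all current members are finished" cannot be told apart from "more members may still arrive". The bookkeeping can be sketched as follows. `GroupTracker` and its methods are hypothetical and only model the state, not Nurt's internals or its execution of members.

```typescript
class GroupTracker<T> {
  private sealed = false;
  private readonly pending = new Set<string>();
  private readonly outputs: T[] = [];

  addMember(memberName: string): void {
    if (this.sealed) {
      throw new Error(`group is sealed; cannot add "${memberName}"`);
    }
    this.pending.add(memberName);
  }

  completeMember(memberName: string, output: T): void {
    if (!this.pending.delete(memberName)) {
      throw new Error(`unknown or already completed member "${memberName}"`);
    }
    this.outputs.push(output);
  }

  seal(): void {
    this.sealed = true;
  }

  // Downstream steps may start only when this returns true.
  isComplete(): boolean {
    return this.sealed && this.pending.size === 0;
  }

  results(): T[] {
    return [...this.outputs];
  }
}

const reviews = new GroupTracker<{ tool: string }>();
reviews.addMember("grammar");
reviews.completeMember("grammar", { tool: "grammar" });
const beforeSeal = reviews.isComplete(); // false: more members could still arrive

reviews.addMember("tone");
reviews.seal();
const afterSeal = reviews.isComplete(); // false: "tone" is still pending

reviews.completeMember("tone", { tool: "tone" });
const finalState = reviews.isComplete(); // true: sealed and nothing pending
```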

StepContext

Available inside every step handler as the second argument.

.step("my-step", async (input, ctx) => {
  ctx.runId;           // current run ID
  ctx.signal;          // AbortSignal (check ctx.signal.aborted)
  ctx.history;         // shared History store
  ctx.run;             // RunHandle for dynamic control

  ctx.history.set("key", "value");
  ctx.history.get<string>("key"); // "value"

  // Spawn group members from inside a step
  ctx.run.spawnGroup("reviews", [
    { name: "grammar", execute: async () => ({ ... }) },
  ]);
})
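
Checking `ctx.signal.aborted` is cooperative: a handler has to look at the signal between units of work. The sketch below uses only the standard `AbortController` and `AbortSignal` APIs; `doubleUntilAborted` and its `afterEach` callback are hypothetical helpers, and the abort is triggered by hand rather than by `run.abort()`.

```typescript
// Processes items one by one and stops early once the signal is aborted.
function doubleUntilAborted(
  items: number[],
  signal: AbortSignal,
  afterEach: (item: number) => void,
): number[] {
  const processed: number[] = [];
  for (const item of items) {
    if (signal.aborted) break; // cooperative cancellation point
    processed.push(item * 2);
    afterEach(item);
  }
  return processed;
}

const controller = new AbortController();
const partial = doubleUntilAborted([1, 2, 3, 4], controller.signal, (item) => {
  // Simulate an external abort arriving after the second item.
  if (item === 2) controller.abort();
});
// partial is [2, 4]: items 3 and 4 were never processed
```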

FlowHooks

const run = myFlow.run({
  hooks: {
    onChange: () => {
      // fires on ANY state change (including subgraph events)
      updateUI(run.snapshot());
    },
    onStepStart: (step) => console.log(`started: ${step.name}`),
    onStepComplete: (step) =>
      console.log(`done: ${step.name} in ${step.durationMs}ms`),
    onStepError: (step) => console.log(`failed: ${step.name}: ${step.error}`),
    onStepAdded: (step) => console.log(`dynamic step: ${step.name}`),
    onRunComplete: (result) => console.log(`run ${result.status}`),
  },
});

Hooks are isolated from execution -- if a hook throws, the run continues unaffected.
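
One common way to get this kind of isolation is to wrap every hook call in a try/catch. The following is a sketch of that idea only; `HookFn` and `callHookSafely` are hypothetical names, and how Nurt reports or logs a failing hook is not stated here.

```typescript
type HookFn<A> = (arg: A) => void;

// Calls a hook if one is registered and contains anything it throws.
// Returns false when the hook failed, true otherwise.
function callHookSafely<A>(hook: HookFn<A> | undefined, arg: A): boolean {
  if (!hook) return true;
  try {
    hook(arg);
    return true;
  } catch {
    return false; // a real engine might log the error here
  }
}

const completedSteps: string[] = [];
const noisyHook: HookFn<string> = (stepName) => {
  throw new Error(`hook failed for ${stepName}`);
};

for (const stepName of ["fetch", "process", "save"]) {
  callHookSafely(noisyHook, stepName); // throws internally, but is contained
  completedSteps.push(stepName); // the simulated run keeps going
}
```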

FlowRunResult

const result = await run.result;

result.runId; // "run-1"
result.status; // "success" | "error"
result.startedAt; // timestamp
result.completedAt; // timestamp
result.steps; // StepRecord[] with status, output, timing
result.history; // ReadonlyMap<string, unknown>

StepRecord

interface StepRecord {
  name: string;
  parentNames: string[];
  status: "pending" | "running" | "success" | "error" | "skipped";
  startedAt?: number;
  completedAt?: number;
  durationMs?: number;
  output?: unknown;
  error?: string;
}

Patterns

Linear Pipeline

const result = await flow("pipeline")
  .step("extract", async () => ({ text: "hello world" }))
  .step("transform", ["extract"], async (input) => ({
    upper: input.extract.text.toUpperCase(),
  }))
  .step("load", ["transform"], async (input) => ({
    saved: true,
    text: input.transform.upper,
  }))
  .build()
  .run().result;

Parallel Fan-Out / Fan-In

Steps with the same parent run in parallel automatically.

const result = await flow("parallel")
  .step("start", async () => ({ data: [1, 2, 3] }))
  .step("branch-a", ["start"], async (input) => ({
    sum: input.start.data.reduce((a, b) => a + b, 0),
  }))
  .step("branch-b", ["start"], async (input) => ({
    count: input.start.data.length,
  }))
  .step("merge", ["branch-a", "branch-b"], async (input) => ({
    average: input["branch-a"].sum / input["branch-b"].count,
  }))
  .build()
  .run().result;
// branch-a and branch-b execute concurrently

Input Mapping

Use the 4-argument form of .step() to cherry-pick values and restructure parent outputs before they reach the handler. The input mapper receives fully-typed parent outputs and returns a custom shape that becomes the handler's input.

const result = await flow("mapped")
  .step("user", async () => ({
    name: "Alice",
    age: 30,
    preferences: { theme: "dark" },
  }))
  .step("config", async () => ({
    maxRetries: 3,
    timeout: 5000,
    debug: false,
  }))
  // Cherry-pick only what's needed from multiple parents
  .step(
    "greeting",
    ["user", "config"],
    (input) => ({
      name: input.user.name,
      theme: input.user.preferences.theme,
      timeout: input.config.timeout,
    }),
    async (mapped) => {
      // mapped is { name: string; theme: string; timeout: number }
      return `Hello ${mapped.name} (${mapped.theme} theme, ${mapped.timeout}ms timeout)`;
    },
  )
  .build()
  .run().result;

This is especially useful when parent outputs are large or deeply nested and your step only needs a few values. Unlike the transform field in the options object (which is untyped), the positional input mapper provides full type inference on both sides.

Dynamic Group Spawning (Arbiter Pattern)

An arbiter step decides at runtime which members to add to a group.

type ReviewOutput = { tool: string; comments: string[] };

const reviewFlow = flow("review")
  .step("start", async () => ({ wordCount: 1200 }))
  .step("arbiter", ["start"], async (input, ctx) => {
    const tools =
      input.start.wordCount > 500
        ? ["grammar", "tone", "clarity"]
        : ["grammar"];

    ctx.run.spawnGroup(
      "reviews",
      tools.map((tool) => ({
        name: `review-${tool}`,
        execute: async () => ({
          tool,
          comments: [`Found issue in ${tool}`],
        }),
      })),
    );

    return { selectedTools: tools };
  })
  .group<ReviewOutput>("reviews", { dependsOn: ["arbiter"] })
  .step("synthesize", [group("reviews")], async (input) => ({
    total: input.reviews.flatMap((r) => r.comments).length,
  }))
  .build();

const result = await reviewFlow.run().result;

Subgraph Members (Nested DAGs)

A group member can contain an entire flow with branching and parallelism.

const analysisFlow = flow("deep-analysis")
  .step("extract", async () => ({ claims: ["A", "B"] }))
  .step("verify", ["extract"], async (input) => ({
    verified: input.extract.claims.length,
  }))
  .step("check-dates", ["extract"], async () => ({
    issues: 0,
  }))
  .step("report", ["verify", "check-dates"], {
    execute: async (input) => ({
      result: `${input.verify.verified} verified, ${input["check-dates"].issues} date issues`,
    }),
    terminal: true,
  })
  .build();
// DAG: extract -> [verify, check-dates] -> report

run.spawnGroup("reviews", [
  { name: "grammar", execute: async () => ({ ... }) },      // single member
  { name: "deep-analysis", flow: analysisFlow },              // subgraph member
]);

Pipeline Helper

Shorthand for linear subgraphs.

import { pipeline } from "@marker/nurt";

run.spawnGroup("reviews", [
  pipeline("tone-check", [
    { name: "detect", execute: async () => ({ issues: ["too formal"] }) },
    { name: "classify", execute: async (input) => ({ severity: "medium" }) },
    {
      name: "suggest",
      execute: async (input) => ({ fix: "use simpler words" }),
    },
  ]),
]);
// Creates: detect -> classify -> suggest (terminal)
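
The shape produced by a linear helper like this can be sketched as a chain in which each stage depends on the previous one and the last stage is terminal. `chainStages` and `ChainedStage` are hypothetical names; the sketch shows only the wiring, not how `pipeline` builds its subgraph.

```typescript
type ChainedStage = { name: string; parentNames: string[]; terminal: boolean };

// Turns an ordered list of stage names into a linear dependency chain.
function chainStages(stageNames: string[]): ChainedStage[] {
  let previous: string | undefined;
  return stageNames.map((stageName, index) => {
    const parentNames: string[] = previous === undefined ? [] : [previous];
    previous = stageName;
    return {
      name: stageName,
      parentNames,
      terminal: index === stageNames.length - 1, // only the last stage
    };
  });
}

const chained = chainStages(["detect", "classify", "suggest"]);
// detect has no parents, classify depends on detect,
// suggest depends on classify and is the only terminal stage
```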

Cross-Boundary Dependencies

A subgraph step can depend on a step outside the subgraph via externalDeps.

const clarityFlow = flow("clarity")
  .step("extract", async () => ({ issues: ["vague intro"] }))
  .step("nlp-data", async () => ({}))  // placeholder for external injection
  .step("assess", ["extract"], async (input) => ({ ... }))
  .step("cross-ref", ["nlp-data"], async (input) => ({
    // input["nlp-data"] will contain the NLP step's output
    refs: input["nlp-data"].entities.length,
  }))
  .step("refine", ["assess", "cross-ref"], {
    execute: async (input) => ({ ... }),
    terminal: true,
  })
  .build();

// In the parent flow, nlp-process runs at the top level
// externalDeps maps the subgraph's "nlp-data" step to the parent's "nlp-process" step
run.spawnGroup("reviews", [
  {
    name: "clarity",
    flow: clarityFlow,
    externalDeps: { "nlp-data": "nlp-process" },
  },
]);
// The subgraph waits for "nlp-process" to complete, then injects its output
// as the pre-resolved "nlp-data" step inside the child run
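
The mapping step can be pictured as translating parent-run outputs into a map of pre-resolved child steps, similar in shape to the `injectedSteps` option of `RunOptions`. This is a sketch under that assumption; `resolveExternalDeps` is a hypothetical helper and the sample NLP output is invented for illustration.

```typescript
// Builds the child's pre-resolved outputs from the parent's finished steps.
// Keys of `externalDeps` are child step names, values are parent step names.
function resolveExternalDeps(
  externalDeps: Record<string, string>,
  parentOutputs: ReadonlyMap<string, unknown>,
): Map<string, unknown> {
  const injected = new Map<string, unknown>();
  for (const [childStep, parentStep] of Object.entries(externalDeps)) {
    if (!parentOutputs.has(parentStep)) {
      throw new Error(`parent step "${parentStep}" has not produced output yet`);
    }
    injected.set(childStep, parentOutputs.get(parentStep));
  }
  return injected;
}

// Invented sample data standing in for the parent's "nlp-process" output.
const parentOutputs = new Map<string, unknown>([
  ["nlp-process", { entities: ["Warsaw", "Vistula"] }],
]);

const injectedSteps = resolveExternalDeps(
  { "nlp-data": "nlp-process" },
  parentOutputs,
);
// injectedSteps now holds the NLP output under the child name "nlp-data"
```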

Error Handling with allowFailures

By default, if a step fails, its dependents are skipped. With allowFailures: true, a step runs even if parents failed, receiving StepResult<T> wrappers.

import type { StepResult } from "@marker/nurt";

const result = await flow("resilient")
  .step("risky", async () => {
    throw new Error("network timeout");
  })
  .step("handler", ["risky"], {
    allowFailures: true,
    execute: async (input) => {
      // input.risky is StepResult<T>, not T
      const result = input.risky as StepResult<unknown>;
      if (result.status === "error") {
        return { fallback: true, error: result.error };
      }
      return { fallback: false, value: result.value };
    },
  })
  .build()
  .run().result;

// result.steps[0].status = "error"
// result.steps[1].status = "success" (ran despite parent failure)
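
If the wrapper is modelled as a discriminated union, TypeScript can narrow it by `status`, so the `value` and `error` fields are only reachable on the matching branch. The type below is an assumed shape inferred from the fields used above (`status`, `value`, `error`); the `StepResult` exported by the library may differ, and `valueOr` is a hypothetical helper.

```typescript
// Assumed shape for illustration; check the library's own type definition.
type StepResult<T> =
  | { status: "success"; value: T }
  | { status: "error"; error: string };

// Returns the parent's value, or a fallback when the parent failed.
function valueOr<T>(result: StepResult<T>, fallback: T): T {
  // Narrowing on `status` makes `result.value` safe to read here.
  return result.status === "success" ? result.value : fallback;
}

const failedParent: StepResult<number> = {
  status: "error",
  error: "network timeout",
};
const healthyParent: StepResult<number> = { status: "success", value: 42 };

const fromFailed = valueOr(failedParent, 0); // 0
const fromHealthy = valueOr(healthyParent, 0); // 42
```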

Terminal Steps

Terminal steps determine the run's final status. If no steps are marked terminal, all steps are considered.

const result = await flow("with-terminal")
  .step("main", async () => ({ data: "ok" }))
  .step("save", ["main"], {
    execute: async () => ({ saved: true }),
    terminal: true,
  })
  .step("notify", async () => {
    throw new Error("email service down");
  })
  .build()
  .run().result;

// result.status = "success"
// Only "save" (terminal) determines status. "notify" failed but doesn't affect it.
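
The status rule can be written down as a small pure function. This is a sketch of the rule as described above, with one labeled assumption: a deciding step that was skipped is treated as a non-success. `FinishedStep` and `deriveRunStatus` are hypothetical names.

```typescript
type FinishedStep = {
  name: string;
  status: "success" | "error" | "skipped";
  terminal?: boolean;
};

function deriveRunStatus(steps: FinishedStep[]): "success" | "error" {
  const terminals = steps.filter((s) => s.terminal === true);
  // With no terminal steps, every step takes part in the decision.
  const deciding = terminals.length > 0 ? terminals : steps;
  // Assumption: anything other than "success" on a deciding step fails the run.
  return deciding.every((s) => s.status === "success") ? "success" : "error";
}

const withTerminal = deriveRunStatus([
  { name: "main", status: "success" },
  { name: "save", status: "success", terminal: true },
  { name: "notify", status: "error" },
]); // "success": only "save" decides

const withoutTerminal = deriveRunStatus([
  { name: "main", status: "success" },
  { name: "notify", status: "error" },
]); // "error": all steps decide
```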

Shared State via History

Steps can share data through the ctx.history store, accessible across all steps in a run.

const result = await flow("with-history")
  .step("producer", async (_, ctx) => {
    ctx.history.set("config", { maxRetries: 3 });
    return { produced: true };
  })
  .step("consumer", ["producer"], async (_, ctx) => {
    const config = ctx.history.get<{ maxRetries: number }>("config");
    return { retries: config?.maxRetries };
  })
  .build()
  .run().result;

// result.history.get("config") = { maxRetries: 3 }
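
A store with this interface can be approximated as a thin wrapper over `Map<string, unknown>`. The sketch below is not the library's `History` class; `HistoryStore` and `asReadonly` are hypothetical. It is included mainly to point out that a generic `get<T>` of this kind is an unchecked assertion, so the caller is responsible for asking for the type that was actually stored.

```typescript
class HistoryStore {
  private readonly entries = new Map<string, unknown>();

  set(key: string, value: unknown): void {
    this.entries.set(key, value);
  }

  // The type argument is NOT verified at runtime; it is a caller-side promise.
  get<T>(key: string): T | undefined {
    return this.entries.get(key) as T | undefined;
  }

  // Read-only view, comparable to the ReadonlyMap exposed on a run result.
  asReadonly(): ReadonlyMap<string, unknown> {
    return this.entries;
  }
}

const store = new HistoryStore();
store.set("config", { maxRetries: 3 });

const storedConfig = store.get<{ maxRetries: number }>("config");
const retries = storedConfig?.maxRetries ?? 0; // 3
const absent = store.get<string>("never-set"); // undefined
```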

Snapshots for Serialization

run.snapshot() returns a JSON-serializable representation of the entire flow state, including nested subgraphs. Useful for sending state to a frontend for visualization.

const run = myFlow.run({
  hooks: {
    onChange: () => {
      const snapshot = run.snapshot();
      // snapshot.flow.name, snapshot.flow.steps, snapshot.flow.groups
      // snapshot.run.status, snapshot.run.runId
      // Each step has: name, status, output, durationMs, error, parentNames
      // Groups have: members with type, status, subgraph (recursive FlowSnapshot)
      sendToFrontend(JSON.stringify(snapshot));
    },
  },
});

Executable Classes

Steps can be class instances implementing the Executable interface.

import type { Executable, StepContext } from "@marker/nurt";

class MyTool implements Executable<{ data: string }, { result: number }> {
  async execute(
    input: { data: string },
    ctx: StepContext,
  ): Promise<{ result: number }> {
    return { result: input.data.length };
  }
}

flow("with-class")
  .step("start", async () => ({ data: "hello" }))
  .step("tool", ["start"], new MyTool())
  .build();

Error Types

| Error | When |
| -------------------- | --------------------------------------------------------------------- |
| CycleDetectedError | .build() detects a cycle in the DAG |
| DuplicateStepError | .step() or .group() uses an already-registered name |
| UnknownParentError | .step() references a parent that doesn't exist |
| UnsealedGroupError | Run completes with a group that was never sealed |
| UnfilledSlotError | A pipeline slot was not provided an implementation |
| StepExecutionError | Wraps an error thrown by a step handler. Has .stepName and .cause |

Graph Utilities

Low-level DAG utilities, useful for custom tooling or analysis.

import {
  validateAcyclic,
  topologicalSort,
  getReadySteps,
  getSkippableSteps,
  getReadyWithFailures,
} from "@marker/nurt";

const nodes = [
  { name: "a", parentNames: [] },
  { name: "b", parentNames: ["a"] },
  { name: "c", parentNames: ["a"] },
  { name: "d", parentNames: ["b", "c"] },
];

validateAcyclic(nodes); // throws CycleDetectedError if cyclic
topologicalSort(nodes); // ["a", "b", "c", "d"]

const statuses = new Map([
  ["a", "success"],
  ["b", "success"],
  ["c", "running"],
  ["d", "pending"],
]);
getReadySteps(nodes, statuses); // [] (c still running, d waits)
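
For readers who want to see what such utilities compute, here is a minimal sketch using Kahn's algorithm for the ordering and a simple filter for readiness. `topoSort`, `readySteps`, `SketchNode`, and `SketchStatus` are hypothetical stand-ins, not the exported functions, and the readiness rule (pending, with every parent successful) is an assumption consistent with the example above.

```typescript
type SketchNode = { name: string; parentNames: string[] };
type SketchStatus = "pending" | "running" | "success" | "error" | "skipped";

// Kahn's algorithm: repeatedly emit nodes whose parents were all emitted.
function topoSort(nodes: SketchNode[]): string[] {
  const remaining = new Map<string, number>();
  nodes.forEach((n) => remaining.set(n.name, n.parentNames.length));

  const queue = nodes.filter((n) => n.parentNames.length === 0).map((n) => n.name);
  const order: string[] = [];

  while (queue.length > 0) {
    const current = queue.shift() as string;
    order.push(current);
    for (const n of nodes) {
      if (!n.parentNames.includes(current)) continue;
      const left = (remaining.get(n.name) ?? 0) - 1;
      remaining.set(n.name, left);
      if (left === 0) queue.push(n.name);
    }
  }

  // Nodes on a cycle never reach zero remaining parents.
  if (order.length !== nodes.length) throw new Error("cycle detected");
  return order;
}

// Assumed rule: a step is ready when it is pending and all parents succeeded.
function readySteps(
  nodes: SketchNode[],
  statuses: Map<string, SketchStatus>,
): string[] {
  return nodes
    .filter(
      (n) =>
        statuses.get(n.name) === "pending" &&
        n.parentNames.every((p) => statuses.get(p) === "success"),
    )
    .map((n) => n.name);
}

const diamond: SketchNode[] = [
  { name: "a", parentNames: [] },
  { name: "b", parentNames: ["a"] },
  { name: "c", parentNames: ["a"] },
  { name: "d", parentNames: ["b", "c"] },
];

const sortedNames = topoSort(diamond); // ["a", "b", "c", "d"]

const liveStatuses = new Map<string, SketchStatus>([
  ["a", "success"],
  ["b", "success"],
  ["c", "running"],
  ["d", "pending"],
]);
const readyWhileRunning = readySteps(diamond, liveStatuses); // []

liveStatuses.set("c", "success");
const readyAfterC = readySteps(diamond, liveStatuses); // ["d"]
```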

License

MIT