ex-flow

v1.1.0

Dependency-graph execution planner using Kahn's Algorithm with priority-aware batching

ex-flow 📊

Priority-aware DAG execution planner for TypeScript based on Kahn's Algorithm.

ex-flow is a TypeScript DAG scheduler and dependency-graph planner powered by Kahn's Algorithm. It creates deterministic execution batches and a full topological sequence, with cycle detection and observability-ready diagnostics.

Use it for workflow orchestration, job scheduling, CI dependency graphs, and ETL pipelines that require predictable ordering and clear runtime error context.

ex-flow helps you model task dependencies and produce:

  • batches: execution levels where nodes in the same batch can run in parallel
  • fullSequence: flattened ordered list across all batches

Within each batch, tasks are sorted by priority (higher first).
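To make the batches and fullSequence outputs concrete, here is a minimal, self-contained sketch of level-based Kahn's Algorithm with priority sorting inside each batch. It does not call ex-flow and is not the library's source; the names SketchNode and planLevels are hypothetical, and treating a missing priority as 0 is an assumption.

```typescript
// Illustrative sketch only; not ex-flow's implementation.
type SketchNode = { id: string; dependsOn: string[]; priority?: number };

function planLevels(nodes: SketchNode[]): { batches: string[][]; fullSequence: string[] } {
  const byId = new Map<string, SketchNode>();
  const inDegree = new Map<string, number>();
  const dependents = new Map<string, string[]>();

  for (const node of nodes) {
    byId.set(node.id, node);
    inDegree.set(node.id, node.dependsOn.length);
    dependents.set(node.id, []);
  }
  for (const node of nodes) {
    for (const dep of node.dependsOn) {
      const list = dependents.get(dep);
      if (!list) throw new Error(`Unknown dependency: ${dep}`);
      list.push(node.id);
    }
  }

  // Higher priority first; a missing priority counts as 0 (assumption).
  const byPriorityDesc = (a: string, b: string): number =>
    (byId.get(b)?.priority ?? 0) - (byId.get(a)?.priority ?? 0);

  const batches: string[][] = [];
  let ready = nodes.filter((n) => n.dependsOn.length === 0).map((n) => n.id);

  while (ready.length > 0) {
    // Every node that is ready at the same time forms one batch.
    const batch = ready.slice().sort(byPriorityDesc);
    batches.push(batch);
    const next: string[] = [];
    for (const id of batch) {
      for (const child of dependents.get(id) ?? []) {
        const remaining = (inDegree.get(child) ?? 0) - 1;
        inDegree.set(child, remaining);
        if (remaining === 0) next.push(child);
      }
    }
    ready = next;
  }

  // Flatten the batches in order to get the full sequence.
  const fullSequence = ([] as string[]).concat(...batches);
  if (fullSequence.length !== nodes.length) throw new Error("Cycle detected");
  return { batches, fullSequence };
}

const sketchPlan = planLevels([
  { id: "A", dependsOn: [], priority: 2 },
  { id: "B", dependsOn: ["A"], priority: 1 },
  { id: "C", dependsOn: ["A"], priority: 3 },
  { id: "D", dependsOn: ["B", "C"], priority: 2 },
]);
console.log(sketchPlan.batches); // [["A"], ["C", "B"], ["D"]]
console.log(sketchPlan.fullSequence); // ["A", "C", "B", "D"]
```

B and C become ready together once A is released, so they share a batch, and C comes first because its priority is higher.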

This library is designed for teams searching for a TypeScript DAG scheduler, topological sort engine, dependency graph execution planner, and a React-friendly workflow orchestration helper.

Features

  • Deterministic topological execution planning with cycle detection
  • Priority-aware ordering inside each batch
  • Explicit error codes for common graph/config issues
  • Configurable output cloning strategy: shallow, deep, or custom
  • External tie-breaker support for equal priorities
  • Optional scheduling constraints: concurrency caps, resource caps, deadline and weight strategies
  • Scheduler telemetry via resolveExecutionDetails()
  • Structured diagnostics via ExFlowRuntimeError
  • Fairness controls (aging, maxDeferralRounds) for constrained throughput scheduling
  • Preset profiles for enterprise-ready defaults

Use Cases

  • Workflow orchestration where tasks have hard dependency ordering
  • Worker runtime scheduling with per-resource-class limits
  • CI or release pipelines that need deterministic execution batches
  • ETL or data processing stages with priority, deadline, and fairness constraints

Why ex-flow

  • Deterministic Kahn-style topological planning with clear batch outputs
  • Multi-layer tie resolution chain for stable and explainable ordering
  • Runtime diagnostics and observability adapters for production operations
  • Throughput and fairness controls to reduce starvation under constraints

Installation

pnpm add ex-flow

or

npm install ex-flow

Quick Start

import { ExFlow, type ExNode } from "ex-flow";

type TaskData = { name: string };

const data: ExNode<TaskData>[] = [
  { id: "A", dependsOn: [], data: { name: "Task A" }, priority: 2 },
  { id: "B", dependsOn: ["A"], data: { name: "Task B" }, priority: 1 },
  { id: "C", dependsOn: ["A"], data: { name: "Task C" }, priority: 3 },
  { id: "D", dependsOn: ["B", "C"], data: { name: "Task D" }, priority: 2 },
];

const flow = new ExFlow<TaskData>();
for (const node of data) {
  flow.addEntity(node);
}

const plan = flow.resolveExecutionPlan();
console.log(plan.batches);
console.log(plan.fullSequence);

React Integration (Tree-Shakable)

For React apps, import from the React-only entrypoint:

import { useExFlow } from "ex-flow/react";

This keeps React utilities isolated from the core ex-flow entry and avoids mixing React exports into existing imports.

React Hook (useExFlow)

useExFlow<T>() wraps a stable ExFlow instance and exposes helper functions for common React workflows.

import { useExFlow } from "ex-flow/react";

type TaskData = { label: string };

const { addEntities, mapDataToNodes, addFromData, resolvePlan, resolveDetails, getLastMetrics } =
  useExFlow<TaskData>({ schedulerMode: "throughput", log: "error" });

Hook options (UseExFlowOptions<T>):

  • Accepts all ExFlowOptions<T> fields.
  • log?: "debug" | "error" (default: "debug"): selects which console method is used for serialized diagnostics logs in non-production.

| Option             | Type                 | Default                 | Description                                                                                            |
| ------------------ | -------------------- | ----------------------- | ------------------------------------------------------------------------------------------------------ |
| All ExFlow options | ExFlowOptions<T>     | Same as ExFlow defaults | Pass-through scheduler/config options (for example schedulerMode, concurrencyCap, resourceCaps).       |
| log                | "debug" \| "error"   | "debug"                 | Controls diagnostics logging method in non-production (console.debug or console.error).                |

Hook API:

| Method         | Input         | Returns                   | Purpose                                                                        |
| -------------- | ------------- | ------------------------- | ------------------------------------------------------------------------------ |
| addEntities    | ExNode<T>[]   | void                      | Add pre-built nodes directly into the internal ExFlow instance.                |
| mapDataToNodes | items, mapper | ExNode<T>[]               | Convert arbitrary source data into ExFlow nodes without mutating ExFlow state. |
| addFromData    | items, mapper | ExNode<T>[]               | Map source data to nodes and add them immediately in one step.                 |
| resolvePlan    | -             | ExecutionPlan<T>          | Resolve current graph state into execution batches and full sequence.          |
| resolveDetails | -             | ExFlowExecutionDetails<T> | Resolve plan and include runtime scheduling metrics.                           |
| getLastMetrics | -             | ExFlowMetrics \| null     | Read the latest metrics snapshot from the last resolve call.                   |

Example mapping plain data into ExNode before adding:

type TaskInput = {
  key: string;
  deps?: string[];
  label: string;
  priority?: number;
};

// Call the hook inside a React component; `tasks` stands for your own TaskInput[] source data.
const { mapDataToNodes, addEntities, resolvePlan } = useExFlow<{ label: string }>();

const nodes = mapDataToNodes<TaskInput>(tasks, (item) => ({
  id: item.key,
  dependsOn: item.deps ?? [],
  data: { label: item.label },
  priority: item.priority,
}));

addEntities(nodes);
const plan = resolvePlan();

Example map+add in one call:

type TaskInput = {
  key: string;
  deps?: string[];
  label: string;
  priority?: number;
};

const { addFromData, resolveDetails } = useExFlow<{ label: string }>();

addFromData<TaskInput>(tasks, (item) => ({
  id: item.key,
  dependsOn: item.deps ?? [],
  data: { label: item.label },
  priority: item.priority,
}));

const { plan, metrics } = resolveDetails();
console.log(plan.fullSequence);
console.log(metrics);

Duplicate id caution (addFromData):

  • addFromData calls addEntity for each mapped node.
  • If your mapper produces repeated id values, ExFlow throws [EXFLOW_DUPLICATE_NODE].
  • Prefer stable, globally unique ids from source data (for example database id or UUID).
  • If source data can repeat, dedupe before mapping or compose ids with a namespace key.
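The last two bullets can be sketched in plain TypeScript, independent of ex-flow. The helper names namespacedId and dedupeById, the SourceRow shape, and the "source:key" id format are all hypothetical choices for illustration.

```typescript
// Hypothetical source row; `source` acts as the namespace key.
type SourceRow = { source: string; key: string; label: string };

// Compose a namespaced id so equal keys from different sources do not collide.
function namespacedId(row: SourceRow): string {
  return `${row.source}:${row.key}`;
}

// Keep the first row for each id and drop later repeats.
function dedupeById<R>(rows: R[], getId: (row: R) => string): R[] {
  const seen = new Set<string>();
  const unique: R[] = [];
  for (const row of rows) {
    const id = getId(row);
    if (seen.has(id)) continue;
    seen.add(id);
    unique.push(row);
  }
  return unique;
}

const sourceRows: SourceRow[] = [
  { source: "billing", key: "1", label: "Charge card" },
  { source: "email", key: "1", label: "Send receipt" },
  { source: "billing", key: "1", label: "Charge card (repeat)" },
];

const uniqueRows = dedupeById(sourceRows, namespacedId);
console.log(uniqueRows.map(namespacedId)); // ["billing:1", "email:1"]
```

Running the deduplicated rows through your mapper afterwards means each mapped node carries a distinct id.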

Debug diagnostics logging:

  • In non-production environments, useExFlow logs serialized diagnostics with serializeExFlowError when addEntities, addFromData, resolvePlan, or resolveDetails throw.
  • The console method is selected by the log option: log: "debug" uses console.debug, log: "error" uses console.error.
  • This helps inspect code, message, and structured diagnostics fields quickly while debugging.

API

new ExFlow<T>(options?)

Creates an execution planner.

Options:

  • cloneMode?: "shallow" | "deep" | "custom"
  • cloneFn?: (data: T) => T (required when cloneMode is custom)
  • priorityAscending?: boolean (default: false, so higher priority runs first)
  • tieBreaker?: (a, b) => number (used when priority values are equal)
  • concurrencyCap?: number
  • resourceCaps?: Record<string, number>
  • deadlineStrategy?: "earliest-first" | "latest-first"
  • weightStrategy?: "higher-first" | "lower-first"
  • schedulerMode?: "level" | "throughput" (default: level)
  • tieFallbackPolicy?: "insertion" | "id-asc" | "id-desc" (default: insertion)
  • fairnessPolicy?: "none" | "aging" (default: none)
  • maxDeferralRounds?: number
  • requireResourceCapForAllClasses?: boolean
  • presetName?: "stable-enterprise" | "high-throughput" | "strict-fairness"

Ordering semantics for priorityAscending, deadlineStrategy, and weightStrategy:

  • They are not overlapping switches; they are evaluated as a tie-resolution chain.
  • priorityAscending always controls the primary priority direction.
  • deadlineStrategy is applied only when two nodes have equal priority.
  • weightStrategy is applied only when both priority and deadline comparisons are tied.

Quick example:

  • Node A: priority=10, deadline=100, weight=1
  • Node B: priority=9, deadline=1, weight=999
  • With default priorityAscending: false, Node A still runs before Node B because priority is the first comparator.
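The chain described above can be pictured as a single comparator that falls through to the next criterion only on a tie. This is a sketch under assumptions, not ex-flow's code: compareByChain, RankedNode, and ChainOptions are hypothetical names, the node fields deadline and weight are assumed from the quick example, and node C is added here to show the deadline step.

```typescript
type RankedNode = { id: string; priority: number; deadline: number; weight: number };
type ChainOptions = {
  priorityAscending: boolean;
  deadlineStrategy?: "earliest-first" | "latest-first";
  weightStrategy?: "higher-first" | "lower-first";
};

function compareByChain(a: RankedNode, b: RankedNode, options: ChainOptions): number {
  // 1. Priority is always the primary comparator.
  if (a.priority !== b.priority) {
    return options.priorityAscending ? a.priority - b.priority : b.priority - a.priority;
  }
  // 2. Deadline is consulted only when priorities are equal.
  if (options.deadlineStrategy && a.deadline !== b.deadline) {
    return options.deadlineStrategy === "earliest-first"
      ? a.deadline - b.deadline
      : b.deadline - a.deadline;
  }
  // 3. Weight is consulted only when priority and deadline are both tied.
  if (options.weightStrategy && a.weight !== b.weight) {
    return options.weightStrategy === "higher-first" ? b.weight - a.weight : a.weight - b.weight;
  }
  return 0;
}

const rankedNodes: RankedNode[] = [
  { id: "A", priority: 10, deadline: 100, weight: 1 },
  { id: "B", priority: 9, deadline: 1, weight: 999 },
  { id: "C", priority: 10, deadline: 50, weight: 1 }, // hypothetical extra node
];
const chainOptions: ChainOptions = {
  priorityAscending: false,
  deadlineStrategy: "earliest-first",
  weightStrategy: "higher-first",
};

const rankedOrder = rankedNodes
  .slice()
  .sort((a, b) => compareByChain(a, b, chainOptions))
  .map((n) => n.id);
console.log(rankedOrder); // ["C", "A", "B"]
```

B's early deadline and large weight never come into play because it loses on priority; C beats A only because their priorities tie and C's deadline is earlier.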

createExFlowConfigBuilder<T>()

Builds ExFlow options with a fluent API.

Example:

import { ExFlow, createExFlowConfigBuilder } from "ex-flow";

type Task = { name: string; meta: { tags: string[] } };

const options = createExFlowConfigBuilder<Task>()
  .withPreset("high-throughput")
  .withSchedulerMode("throughput")
  .withPriorityAscending(true)
  .withTieFallbackPolicy("id-asc")
  .withFairnessPolicy("aging")
  .withMaxDeferralRounds(2)
  .withConcurrencyCap(2)
  .withResourceCaps({ cpu: 1 })
  .requireResourceCapForAllClasses()
  .withDeadlineStrategy("earliest-first")
  .withWeightStrategy("higher-first")
  .useCustomClone((data) => ({
    ...data,
    meta: { ...data.meta, tags: [...data.meta.tags] },
  }))
  .build();

const flow = new ExFlow<Task>(options);

throughput mode allows newly-ready nodes to be scheduled between constrained sub-batches, which can improve total throughput while preserving dependency correctness.
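The difference between the two modes under a concurrency cap can be sketched as follows. This is an interpretation of the description above, not ex-flow's scheduler: planWithCap and its types are hypothetical, priorities and resource caps are left out, and dependency validation is omitted for brevity.

```typescript
type CapNode = { id: string; dependsOn: string[] };
type CapMode = "level" | "throughput";

function planWithCap(nodes: CapNode[], cap: number, mode: CapMode): string[][] {
  if (cap < 1) throw new Error("cap must be at least 1");
  const remainingDeps = new Map<string, number>();
  const dependents = new Map<string, string[]>();
  for (const n of nodes) {
    remainingDeps.set(n.id, n.dependsOn.length);
    dependents.set(n.id, []);
  }
  for (const n of nodes) {
    for (const dep of n.dependsOn) dependents.get(dep)?.push(n.id);
  }

  const rounds: string[][] = [];
  let ready = nodes.filter((n) => n.dependsOn.length === 0).map((n) => n.id);
  let unlocked: string[] = [];

  while (ready.length > 0) {
    // Emit at most `cap` nodes per round; the rest are deferred.
    const round = ready.slice(0, cap);
    ready = ready.slice(cap);
    rounds.push(round);
    for (const id of round) {
      for (const child of dependents.get(id) ?? []) {
        const left = (remainingDeps.get(child) ?? 0) - 1;
        remainingDeps.set(child, left);
        if (left === 0) unlocked.push(child);
      }
    }
    // "throughput": newly ready nodes join the queue right away.
    // "level": they wait until the current level is fully drained.
    if (mode === "throughput" || ready.length === 0) {
      ready = ready.concat(unlocked);
      unlocked = [];
    }
  }
  return rounds;
}

const capGraph: CapNode[] = [
  { id: "A", dependsOn: [] },
  { id: "B", dependsOn: [] },
  { id: "C", dependsOn: [] },
  { id: "D", dependsOn: ["A"] },
];
console.log(planWithCap(capGraph, 2, "level")); // [["A", "B"], ["C"], ["D"]]
console.log(planWithCap(capGraph, 2, "throughput")); // [["A", "B"], ["C", "D"]]
```

In this toy graph, throughput mode fills the second round with D instead of leaving a slot idle, while D still runs only after its dependency A.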

resolveExecutionDetails()

Returns both plan and scheduler metrics:

  • plan: same shape as resolveExecutionPlan()
  • metrics:
    • schedulerMode
    • rounds
    • emittedNodes
    • deferredNodes
    • maxReadyQueueSize
    • constraintHits.concurrencyCap
    • constraintHits.resourceCaps

addEntity(node)

Adds a graph node.

  • TypeScript enforces that input data must not declare exFlowPriority.
  • Throws [EXFLOW_DUPLICATE_NODE] when id already exists.
  • Throws [EXFLOW_RESERVED_FIELD] when input data already contains exFlowPriority.

You can model this explicitly with the exported SafeTask<T> helper type.

resolveExecutionPlan()

Builds the execution plan.

  • Throws [EXFLOW_UNKNOWN_DEPENDENCY] when a dependency id does not exist.
  • Throws [EXFLOW_CYCLE_DETECTED] when the graph has a cycle.

Cycle errors now include a detected cycle path in the message when available.

Structured diagnostics are available through ExFlowRuntimeError.diagnostics.
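For intuition on how a Kahn-style planner can notice a cycle and report a path, here is a self-contained sketch. It is not ex-flow's implementation, and findUnresolvedIds and findCyclePath are hypothetical names; the idea is that any node never released by the algorithm is on, or downstream of, a cycle.

```typescript
type GraphNode = { id: string; dependsOn: string[] };

// Returns the ids that Kahn's algorithm can never release.
function findUnresolvedIds(nodes: GraphNode[]): string[] {
  const remaining = new Map<string, number>();
  const dependents = new Map<string, string[]>();
  for (const n of nodes) {
    remaining.set(n.id, n.dependsOn.length);
    dependents.set(n.id, []);
  }
  for (const n of nodes) {
    for (const dep of n.dependsOn) {
      const list = dependents.get(dep);
      if (!list) throw new Error(`Unknown dependency "${dep}" on node "${n.id}"`);
      list.push(n.id);
    }
  }
  const queue = nodes.filter((n) => n.dependsOn.length === 0).map((n) => n.id);
  const released = new Set<string>();
  while (queue.length > 0) {
    const id = queue.shift() as string;
    released.add(id);
    for (const child of dependents.get(id) ?? []) {
      const left = (remaining.get(child) ?? 0) - 1;
      remaining.set(child, left);
      if (left === 0) queue.push(child);
    }
  }
  return nodes.filter((n) => !released.has(n.id)).map((n) => n.id);
}

// Follow unresolved dependencies until an id repeats; that stretch is one cycle.
function findCyclePath(nodes: GraphNode[], unresolved: string[]): string[] {
  if (unresolved.length === 0) return [];
  const stuck = new Set(unresolved);
  const byId = new Map(nodes.map((n) => [n.id, n] as [string, GraphNode]));
  const path: string[] = [];
  let current = unresolved[0];
  while (path.indexOf(current) === -1) {
    path.push(current);
    const node = byId.get(current) as GraphNode;
    // Every unresolved node has at least one unresolved dependency.
    current = node.dependsOn.filter((dep) => stuck.has(dep))[0];
  }
  return path.slice(path.indexOf(current)).concat(current);
}

const cyclicGraph: GraphNode[] = [
  { id: "A", dependsOn: ["B"] },
  { id: "B", dependsOn: ["A"] },
  { id: "C", dependsOn: ["A"] },
];
const unresolvedIds = findUnresolvedIds(cyclicGraph);
console.log(unresolvedIds); // ["A", "B", "C"]
console.log(findCyclePath(cyclicGraph, unresolvedIds)); // ["A", "B", "A"]
```

C is not part of the cycle but is still unresolved because it depends on A, which is why reporting both the cycle path and the full set of unresolved ids is useful.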

Diagnostics Serialization (Logging & Observability)

Use serializeExFlowError to normalize runtime errors before forwarding to your observability stack.

import {
  ExFlow,
  ExFlowRuntimeError,
  serializeExFlowError,
  type ExFlowObservabilityEvent,
} from "ex-flow";

const flow = new ExFlow<{ name: string }>();

try {
  flow.addEntity({ id: "A", dependsOn: ["B"], data: { name: "Task A" } });
  flow.addEntity({ id: "B", dependsOn: ["A"], data: { name: "Task B" } });
  flow.resolveExecutionPlan();
} catch (error) {
  const event: ExFlowObservabilityEvent = serializeExFlowError(error);

  // Example mapping
  console.log(
    JSON.stringify({
      level: "error",
      service: "scheduler-service",
      error_code: event.code,
      error_name: event.name,
      message: event.message,
      diagnostics: event.diagnostics,
      ts: event.timestamp,
    }),
  );

  if (error instanceof ExFlowRuntimeError) {
    // Domain-specific handling for cycle errors
    if (error.code === "EXFLOW_CYCLE_DETECTED") {
      console.error("Cycle path:", error.diagnostics?.cyclePath);
    }
  }
}

Recommended observability fields:

  • error_code: EXFLOW_* code for alert grouping
  • diagnostics.cyclePath: direct cycle troubleshooting
  • diagnostics.unresolvedNodeIds: impact scope
  • diagnostics.invalidOptionField + invalidOptionValue: config hygiene dashboards

Integration: API Server

Example with OpenTelemetry-style attributes:

import { ExFlow, serializeExFlowError, toOpenTelemetryAttributes } from "ex-flow";

const flow = new ExFlow<{ name: string }>();

try {
  flow.resolveExecutionPlan();
} catch (error) {
  const event = serializeExFlowError(error);
  const attributes = toOpenTelemetryAttributes(event);

  // Example: span.recordException + span.setAttributes
  console.log("otel.attributes", attributes);
}

Integration: Worker Runtime

Example with Datadog-style log fields:

import { ExFlow, serializeExFlowError, toDatadogLogFields } from "ex-flow";

const flow = new ExFlow<{ name: string }>();

try {
  flow.resolveExecutionPlan();
} catch (error) {
  const event = serializeExFlowError(error);
  const logFields = toDatadogLogFields(event);

  // Example: logger.error(logFields)
  console.log("worker.log", JSON.stringify(logFields));
}

Integration: Custom Mapper Factory

Use createDiagnosticsMapper when your team needs custom field names or value transforms.

import { createDiagnosticsMapper, serializeExFlowError } from "ex-flow";

const mapDiagnostics = createDiagnosticsMapper({
  keyPrefix: "scheduler",
  separator: "_",
  fieldNameMap: {
    code: "err_code",
    message: "err_message",
    cyclePath: "cycle",
  },
  staticFields: {
    team: "platform",
  },
  valueTransform: (value, key) => {
    if (key === "invalidOptionValue" && value !== undefined && value !== null) {
      return `value:${String(value)}`;
    }
    return value;
  },
});

try {
  // ex-flow logic
} catch (error) {
  const event = serializeExFlowError(error);
  const payload = mapDiagnostics(event);
  console.log("custom.payload", payload);
}

resolveExecutionPlan() returns:

  • batches: ExFlowResultItem<T>[][]
  • fullSequence: ExFlowResultItem<T>[]

Clone Modes and Immutability

shallow (default)

Fastest mode. Top-level object is cloned, but nested references are shared.

deep

Uses structuredClone to isolate nested references.

  • Throws [EXFLOW_DEEP_CLONE_UNAVAILABLE] when runtime does not support structuredClone.

custom

Uses your custom clone function.

  • Throws [EXFLOW_CUSTOM_CLONE_FN_REQUIRED] if cloneFn is missing.

Example:

type Task = { name: string; meta: { tags: string[] } };

const flow = new ExFlow<Task>({
  cloneMode: "custom",
  cloneFn: (data) => ({
    ...data,
    meta: {
      ...data.meta,
      tags: [...data.meta.tags],
    },
  }),
});
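Why the choice of clone mode matters can be seen with plain TypeScript objects, without calling ex-flow. The sketch below compares a shallow copy with a copy made the same way as the cloneFn above; the variable names are hypothetical.

```typescript
type TaskPayload = { name: string; meta: { tags: string[] } };

const originalTask: TaskPayload = { name: "Task A", meta: { tags: ["etl"] } };

// Shallow copy: a new top-level object, but `meta` is still the same reference.
const shallowCopy: TaskPayload = { ...originalTask };

// Custom clone, same shape as the cloneFn above: `meta` and `tags` are copied too.
const customCopy: TaskPayload = {
  ...originalTask,
  meta: { ...originalTask.meta, tags: [...originalTask.meta.tags] },
};

// Mutate nested data on the original after both copies were taken.
originalTask.meta.tags.push("nightly");

console.log(shallowCopy.meta.tags); // ["etl", "nightly"]
console.log(customCopy.meta.tags); // ["etl"]
```

The shallow copy sees the later mutation because it shares the nested array, so shallow mode is a reasonable fit only when nested data is treated as read-only.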

Error Codes

Exported as EXFLOW_ERROR:

  • EXFLOW_DUPLICATE_NODE
  • EXFLOW_RESERVED_FIELD
  • EXFLOW_UNKNOWN_DEPENDENCY
  • EXFLOW_CYCLE_DETECTED
  • EXFLOW_CUSTOM_CLONE_FN_REQUIRED
  • EXFLOW_DEEP_CLONE_UNAVAILABLE
  • EXFLOW_INVALID_OPTION

Compatibility Matrix

Ordering determinism and scheduling behavior by mode:

| Mode       | Ready-node release                 | Tie fallback default | Throughput profile            |
| ---------- | ---------------------------------- | -------------------- | ----------------------------- |
| level      | strict level-by-level              | insertion order      | predictable phase boundaries  |
| throughput | unlocks between constrained rounds | insertion order      | higher utilization under caps |

Tie resolution priority chain:

  1. priorityAscending / priority value
  2. deadlineStrategy (if set)
  3. weightStrategy (if set)
  4. tieBreaker (if set)
  5. tieFallbackPolicy
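The last link of the chain, tieFallbackPolicy, decides the order when everything before it ties. A minimal sketch of the three policies, assuming plain code-unit string comparison for ids (the library's actual comparison is not documented here) and using the hypothetical helper orderWithFallback:

```typescript
type FallbackPolicy = "insertion" | "id-asc" | "id-desc";
type QueuedNode = { id: string; priority: number };

function orderWithFallback(nodes: QueuedNode[], policy: FallbackPolicy): string[] {
  return nodes
    .map((node, index) => ({ node, index })) // remember insertion order
    .sort((a, b) => {
      // Higher priority first, as with the default priorityAscending: false.
      if (a.node.priority !== b.node.priority) return b.node.priority - a.node.priority;
      if (policy === "id-asc") return a.node.id < b.node.id ? -1 : a.node.id > b.node.id ? 1 : 0;
      if (policy === "id-desc") return a.node.id > b.node.id ? -1 : a.node.id < b.node.id ? 1 : 0;
      return a.index - b.index; // "insertion"
    })
    .map((entry) => entry.node.id);
}

const queuedNodes: QueuedNode[] = [
  { id: "b", priority: 1 },
  { id: "c", priority: 1 },
  { id: "a", priority: 1 },
  { id: "z", priority: 5 },
];

console.log(orderWithFallback(queuedNodes, "insertion")); // ["z", "b", "c", "a"]
console.log(orderWithFallback(queuedNodes, "id-asc")); // ["z", "a", "b", "c"]
console.log(orderWithFallback(queuedNodes, "id-desc")); // ["z", "c", "b", "a"]
```

Insertion order depends on how nodes were added, whereas the id-based policies give the same result regardless of insertion order, which is the usual reason to prefer them for reproducible plans.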

Migration Notes

Suggested upgrade path for existing consumers:

  1. Preserve old behavior:
     • keep schedulerMode: "level"
     • keep tieFallbackPolicy: "insertion"
  2. Adopt deterministic id-based fallback (optional):
     • set tieFallbackPolicy: "id-asc"
  3. Adopt throughput mode safely:
     • start with schedulerMode: "throughput"
     • add concurrencyCap and resourceCaps
     • consider fairnessPolicy: "aging" + maxDeferralRounds
  4. Enforce resource governance in production:
     • enable requireResourceCapForAllClasses: true
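As a configuration sketch, the upgrade path above might translate into option objects like these, each intended to be passed as new ExFlow<T>(options). The option names come from the API section; the numeric values and the resource class names cpu and io are placeholders, not recommendations.

```typescript
// Step 1: preserve previous behavior.
const conservativeOptions = {
  schedulerMode: "level",
  tieFallbackPolicy: "insertion",
} as const;

// Step 2: opt in to deterministic id-based tie fallback.
const deterministicOptions = {
  schedulerMode: "level",
  tieFallbackPolicy: "id-asc",
} as const;

// Steps 3 and 4: throughput mode with caps, aging fairness, and resource governance.
const throughputOptions = {
  schedulerMode: "throughput",
  tieFallbackPolicy: "id-asc",
  concurrencyCap: 4, // placeholder value
  resourceCaps: { cpu: 2, io: 2 }, // placeholder resource classes
  fairnessPolicy: "aging",
  maxDeferralRounds: 3, // placeholder value
  requireResourceCapForAllClasses: true,
} as const;

console.log(conservativeOptions, deterministicOptions, throughputOptions);
```

Moving one step at a time and comparing the resulting plans between steps makes it easier to attribute any ordering change to a single option.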

License

MIT