ex-flow
v1.1.0
Dependency-graph execution planner using Kahn's Algorithm with priority-aware batching
ex-flow 📊
Priority-aware DAG execution planner for TypeScript based on Kahn's Algorithm.
ex-flow is a TypeScript DAG scheduler and dependency-graph planner powered by Kahn's Algorithm. It creates deterministic execution batches and a full topological sequence, with cycle detection and observability-ready diagnostics.
Use it for workflow orchestration, job scheduling, CI dependency graphs, and ETL pipelines that require predictable ordering and clear runtime error context.
ex-flow helps you model task dependencies and produce:
- batches: execution levels where nodes in the same batch can run in parallel
- fullSequence: flattened ordered list across all batches
Within each batch, tasks are sorted by priority (higher first).
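To make the two outputs concrete, here is a minimal, self-contained sketch of Kahn-style level batching with priority ordering. It illustrates the general technique only and is not ex-flow's implementation; `SketchNode` and `planBatches` are hypothetical names, and a missing priority is assumed to default to 0.

```typescript
// Hypothetical node shape for this sketch; not the library's ExNode type.
type SketchNode = { id: string; dependsOn: string[]; priority?: number };

// Kahn-style level batching: each batch holds the nodes whose dependencies
// were all satisfied by earlier batches, sorted by priority (higher first).
function planBatches(nodes: SketchNode[]): { batches: string[][]; fullSequence: string[] } {
  const inDegree = new Map<string, number>();
  const dependents = new Map<string, string[]>();
  const priority = new Map<string, number>();

  for (const node of nodes) {
    inDegree.set(node.id, node.dependsOn.length);
    priority.set(node.id, node.priority ?? 0); // assumption: default priority is 0
    for (const dep of node.dependsOn) {
      dependents.set(dep, [...(dependents.get(dep) ?? []), node.id]);
    }
  }

  const byPriorityDesc = (a: string, b: string): number =>
    (priority.get(b) ?? 0) - (priority.get(a) ?? 0);

  const batches: string[][] = [];
  let ready = nodes.filter((n) => n.dependsOn.length === 0).map((n) => n.id);

  while (ready.length > 0) {
    const batch = [...ready].sort(byPriorityDesc);
    batches.push(batch);
    const next: string[] = [];
    for (const id of batch) {
      for (const child of dependents.get(id) ?? []) {
        const remaining = (inDegree.get(child) ?? 0) - 1;
        inDegree.set(child, remaining);
        if (remaining === 0) next.push(child);
      }
    }
    ready = next;
  }

  // Flatten the batches in order to get the full sequence.
  const fullSequence = ([] as string[]).concat(...batches);
  if (fullSequence.length !== nodes.length) {
    throw new Error("Cycle or unknown dependency: not every node could be scheduled");
  }
  return { batches, fullSequence };
}

const sketchPlan = planBatches([
  { id: "A", dependsOn: [], priority: 2 },
  { id: "B", dependsOn: ["A"], priority: 1 },
  { id: "C", dependsOn: ["A"], priority: 3 },
  { id: "D", dependsOn: ["B", "C"], priority: 2 },
]);
console.log(sketchPlan.batches); // [["A"], ["C", "B"], ["D"]]
console.log(sketchPlan.fullSequence); // ["A", "C", "B", "D"]
```

B and C become ready together once A is done, so they share a batch, and C is listed first because its priority is higher.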
This library is designed for teams that need a TypeScript DAG scheduler, topological sort engine, dependency graph execution planner, or React-friendly workflow orchestration helper.
Features
- Deterministic topological execution planning with cycle detection
- Priority-aware ordering inside each batch
- Explicit error codes for common graph/config issues
- Configurable output cloning strategy: shallow, deep, or custom
- External tie-breaker support for equal priorities
- Optional scheduling constraints: concurrency caps, resource caps, deadline and weight strategies
- Scheduler telemetry via resolveExecutionDetails()
- Structured diagnostics via ExFlowRuntimeError
- Fairness controls (aging, maxDeferralRounds) for constrained throughput scheduling
- Preset profiles for enterprise-ready defaults
Use Cases
- Workflow orchestration where tasks have hard dependency ordering
- Worker runtime scheduling with per-resource-class limits
- CI or release pipelines that need deterministic execution batches
- ETL or data processing stages with priority, deadline, and fairness constraints
Why ex-flow
- Deterministic Kahn-style topological planning with clear batch outputs
- Multi-layer tie resolution chain for stable and explainable ordering
- Runtime diagnostics and observability adapters for production operations
- Throughput and fairness controls to reduce starvation under constraints
Installation
pnpm add ex-flow
or
npm install ex-flow

Quick Start
import { ExFlow, type ExNode } from "ex-flow";

type TaskData = { name: string };

const data: ExNode<TaskData>[] = [
  { id: "A", dependsOn: [], data: { name: "Task A" }, priority: 2 },
  { id: "B", dependsOn: ["A"], data: { name: "Task B" }, priority: 1 },
  { id: "C", dependsOn: ["A"], data: { name: "Task C" }, priority: 3 },
  { id: "D", dependsOn: ["B", "C"], data: { name: "Task D" }, priority: 2 },
];

const flow = new ExFlow<TaskData>();
for (const node of data) {
  flow.addEntity(node);
}

const plan = flow.resolveExecutionPlan();
console.log(plan.batches);
console.log(plan.fullSequence);

React Integration (Tree-Shakable)
For React apps, import from the React-only entrypoint:
import { useExFlow } from "ex-flow/react";

This keeps React utilities isolated from the core ex-flow entry and avoids mixing React exports into existing imports.
React Hook (useExFlow)
useExFlow<T>() wraps a stable ExFlow instance and exposes helper functions for common React workflows.
import { useExFlow } from "ex-flow/react";
type TaskData = { label: string };
const { addEntities, mapDataToNodes, addFromData, resolvePlan, resolveDetails, getLastMetrics } =
  useExFlow<TaskData>({ schedulerMode: "throughput", log: "error" });

Hook options (UseExFlowOptions<T>):
- Accepts all ExFlowOptions<T> fields.
- log?: "debug" | "error" (default: "debug"): selects which console method is used for serialized diagnostics logs in non-production.
| Option | Type | Default | Description |
| ------------------ | -------------------- | ----------------------- | ------------------------------------------------------------------------------------------------------ |
| All ExFlow options | ExFlowOptions<T> | Same as ExFlow defaults | Pass-through scheduler/config options (for example schedulerMode, concurrencyCap, resourceCaps). |
| log | "debug" \| "error" | "debug" | Controls diagnostics logging method in non-production (console.debug or console.error). |
Hook API:
| Method | Input | Returns | Purpose |
| ---------------- | ----------------- | --------------------------- | ------------------------------------------------------------------------------ |
| addEntities | ExNode<T>[] | void | Add pre-built nodes directly into the internal ExFlow instance. |
| mapDataToNodes | items, mapper | ExNode<T>[] | Convert arbitrary source data into ExFlow nodes without mutating ExFlow state. |
| addFromData | items, mapper | ExNode<T>[] | Map source data to nodes and add them immediately in one step. |
| resolvePlan | - | ExecutionPlan<T> | Resolve current graph state into execution batches and full sequence. |
| resolveDetails | - | ExFlowExecutionDetails<T> | Resolve plan and include runtime scheduling metrics. |
| getLastMetrics | - | ExFlowMetrics \| null | Read the latest metrics snapshot from the last resolve call. |
Example mapping plain data into ExNode before adding:
type TaskInput = {
  key: string;
  deps?: string[];
  label: string;
  priority?: number;
};

// `tasks` is a TaskInput[] supplied by the caller.
const { mapDataToNodes, addEntities, resolvePlan } = useExFlow<{ label: string }>();
const nodes = mapDataToNodes<TaskInput>(tasks, (item) => ({
  id: item.key,
  dependsOn: item.deps ?? [],
  data: { label: item.label },
  priority: item.priority,
}));
addEntities(nodes);
const plan = resolvePlan();

Example map+add in one call:
type TaskInput = {
  key: string;
  deps?: string[];
  label: string;
  priority?: number;
};

const { addFromData, resolveDetails } = useExFlow<{ label: string }>();
addFromData<TaskInput>(tasks, (item) => ({
  id: item.key,
  dependsOn: item.deps ?? [],
  data: { label: item.label },
  priority: item.priority,
}));
const { plan, metrics } = resolveDetails();
console.log(plan.fullSequence);
console.log(metrics);

Duplicate id caution (addFromData):
- addFromData calls addEntity for each mapped node.
- If your mapper produces repeated id values, ExFlow throws [EXFLOW_DUPLICATE_NODE].
- Prefer stable, globally unique ids from source data (for example database id or UUID).
- If source data can repeat, dedupe before mapping or compose ids with a namespace key.
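The last two points can be sketched as a small pre-processing step that runs before the mapper. This is an illustration only: `SourceTask`, `toNodeId`, and `dedupeById` are hypothetical helpers and are not part of ex-flow.

```typescript
// Hypothetical source record; `source` and `key` are illustrative field names.
type SourceTask = { source: string; key: string; label: string };

// Compose a namespaced id so equal keys from different sources do not collide.
const toNodeId = (item: SourceTask): string => `${item.source}:${item.key}`;

// Keep the first record seen for each id and drop later repeats.
function dedupeById<T>(items: T[], getId: (item: T) => string): T[] {
  const seen = new Set<string>();
  const unique: T[] = [];
  for (const item of items) {
    const id = getId(item);
    if (!seen.has(id)) {
      seen.add(id);
      unique.push(item);
    }
  }
  return unique;
}

const rawTasks: SourceTask[] = [
  { source: "billing", key: "1", label: "Export invoices" },
  { source: "crm", key: "1", label: "Sync contacts" },
  { source: "billing", key: "1", label: "Export invoices (repeat)" },
];

const uniqueTasks = dedupeById(rawTasks, toNodeId);
console.log(uniqueTasks.map(toNodeId)); // ["billing:1", "crm:1"]
```

The deduplicated array can then be passed to addFromData with a mapper that uses the same id function, so each id reaches addEntity at most once.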
Debug diagnostics logging:
- In non-production environments, useExFlow logs serialized diagnostics with serializeExFlowError when addEntities, addFromData, resolvePlan, or resolveDetails throw.
- Log level is controlled by the log option: log: "debug" uses console.debug, log: "error" uses console.error.
- This helps inspect code, message, and structured diagnostics fields quickly while debugging.
API
new ExFlow<T>(options?)
Creates an execution planner.
Options:
- cloneMode?: "shallow" | "deep" | "custom"
- cloneFn?: (data: T) => T (required when cloneMode is custom)
- priorityAscending?: boolean (default: false, so higher priority runs first)
- tieBreaker?: (a, b) => number (used when priority values are equal)
- concurrencyCap?: number
- resourceCaps?: Record<string, number>
- deadlineStrategy?: "earliest-first" | "latest-first"
- weightStrategy?: "higher-first" | "lower-first"
- schedulerMode?: "level" | "throughput" (default: level)
- tieFallbackPolicy?: "insertion" | "id-asc" | "id-desc" (default: insertion)
- fairnessPolicy?: "none" | "aging" (default: none)
- maxDeferralRounds?: number
- requireResourceCapForAllClasses?: boolean
- presetName?: "stable-enterprise" | "high-throughput" | "strict-fairness"
Ordering semantics for priorityAscending, deadlineStrategy, and weightStrategy:
- They are not overlapping switches; they are evaluated as a tie-resolution chain.
- priorityAscending always controls the primary priority direction.
- deadlineStrategy is applied only when two nodes have equal priority.
- weightStrategy is applied only when both priority and deadline comparisons are tied.
Quick example:
- Node A: priority=10, deadline=100, weight=1
- Node B: priority=9, deadline=1, weight=999
- With default priorityAscending: false, Node A still runs before Node B because priority is the first comparator.
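A tie-resolution chain of this kind can be sketched as a list of comparators where each later comparator is consulted only if every earlier one returned 0. The code below is a standalone illustration, not ex-flow's internals; `Candidate`, `chain`, and the handling of missing deadline or weight values are assumptions made for the sketch.

```typescript
// Hypothetical item shape; field names mirror the example above.
type Candidate = { id: string; priority: number; deadline?: number; weight?: number };
type Comparator<T> = (a: T, b: T) => number;

// Safe numeric comparison (avoids NaN from Infinity - Infinity).
const compareNumbers = (x: number, y: number): number => (x < y ? -1 : x > y ? 1 : 0);

// Returns the first non-zero result, so later comparators only break ties.
function chain<T>(...comparators: Comparator<T>[]): Comparator<T> {
  return (a, b) => {
    for (const compare of comparators) {
      const result = compare(a, b);
      if (result !== 0) return result;
    }
    return 0;
  };
}

// Higher priority first (the priorityAscending: false direction).
const byPriorityHigherFirst: Comparator<Candidate> = (a, b) =>
  compareNumbers(b.priority, a.priority);
// Earliest deadline first; assumption: a missing deadline sorts last.
const byDeadlineEarliestFirst: Comparator<Candidate> = (a, b) =>
  compareNumbers(a.deadline ?? Infinity, b.deadline ?? Infinity);
// Higher weight first; assumption: a missing weight counts as 0.
const byWeightHigherFirst: Comparator<Candidate> = (a, b) =>
  compareNumbers(b.weight ?? 0, a.weight ?? 0);

const order = chain(byPriorityHigherFirst, byDeadlineEarliestFirst, byWeightHigherFirst);

const nodeA: Candidate = { id: "A", priority: 10, deadline: 100, weight: 1 };
const nodeB: Candidate = { id: "B", priority: 9, deadline: 1, weight: 999 };
const nodeC: Candidate = { id: "C", priority: 10, deadline: 50, weight: 1 };

const sortedIds = [nodeB, nodeA, nodeC].sort(order).map((n) => n.id);
console.log(sortedIds); // ["C", "A", "B"]
```

Node B has the earliest deadline and the largest weight but still sorts last, because the deadline comparator is reached only for A and C, which tie on priority.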
createExFlowConfigBuilder<T>()
Builds ExFlow options with a fluent API.
Example:
import { ExFlow, createExFlowConfigBuilder } from "ex-flow";

type Task = { name: string; meta: { tags: string[] } };

const options = createExFlowConfigBuilder<Task>()
  .withPreset("high-throughput")
  .withSchedulerMode("throughput")
  .withPriorityAscending(true)
  .withTieFallbackPolicy("id-asc")
  .withFairnessPolicy("aging")
  .withMaxDeferralRounds(2)
  .withConcurrencyCap(2)
  .withResourceCaps({ cpu: 1 })
  .requireResourceCapForAllClasses()
  .withDeadlineStrategy("earliest-first")
  .withWeightStrategy("higher-first")
  .useCustomClone((data) => ({
    ...data,
    meta: { ...data.meta, tags: [...data.meta.tags] },
  }))
  .build();

const flow = new ExFlow<Task>(options);

throughput mode allows newly-ready nodes to be scheduled between constrained sub-batches, which can improve total throughput while preserving dependency correctness.
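The difference between the two modes under a concurrency cap can be illustrated with a small standalone simulation. This is one plausible reading of the description above, not ex-flow's scheduler: `CapNode` and `planRounds` are hypothetical, and the sketch assumes that level mode unlocks dependents only after a whole level has been emitted, while throughput mode unlocks them after every capped round.

```typescript
// Hypothetical node shape for this sketch.
type CapNode = { id: string; dependsOn: string[]; priority: number };

function planRounds(nodes: CapNode[], cap: number, mode: "level" | "throughput"): string[][] {
  if (cap < 1) throw new Error("cap must be at least 1");
  const done = new Set<string>(); // nodes whose dependents may be unlocked
  const scheduled = new Set<string>(); // nodes already placed in a round
  const rounds: string[][] = [];

  // Unscheduled nodes with all dependencies done, higher priority first.
  const collectReady = (): CapNode[] =>
    nodes
      .filter((n) => !scheduled.has(n.id) && n.dependsOn.every((d) => done.has(d)))
      .sort((a, b) => b.priority - a.priority);

  let queue = collectReady();
  let currentLevel: string[] = [];
  while (queue.length > 0) {
    const round = queue.slice(0, cap).map((n) => n.id);
    rounds.push(round);
    round.forEach((id) => scheduled.add(id));
    currentLevel.push(...round);

    const levelDrained = queue.length <= cap;
    if (mode === "throughput" || levelDrained) {
      // Unlock dependents: after every round in throughput mode,
      // only after the whole level in level mode.
      currentLevel.forEach((id) => done.add(id));
      currentLevel = [];
      queue = collectReady();
    } else {
      queue = queue.slice(cap);
    }
  }
  return rounds;
}

const capNodes: CapNode[] = [
  { id: "A", dependsOn: [], priority: 1 },
  { id: "B", dependsOn: [], priority: 1 },
  { id: "C", dependsOn: [], priority: 1 },
  { id: "D", dependsOn: ["A"], priority: 5 },
];

const levelRounds = planRounds(capNodes, 2, "level");
const throughputRounds = planRounds(capNodes, 2, "throughput");
console.log(levelRounds); // [["A", "B"], ["C"], ["D"]]
console.log(throughputRounds); // [["A", "B"], ["D", "C"]]
```

In this sketch, throughput mode needs two rounds instead of three because D becomes eligible as soon as A has been emitted, and it never runs before its dependency.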
resolveExecutionDetails()
Returns both plan and scheduler metrics:
- plan: same shape as resolveExecutionPlan()
- metrics:
  - schedulerMode
  - rounds
  - emittedNodes
  - deferredNodes
  - maxReadyQueueSize
  - constraintHits.concurrencyCap
  - constraintHits.resourceCaps
addEntity(node)
Adds a graph node.
- TypeScript enforces that input data must not declare exFlowPriority.
- Throws [EXFLOW_DUPLICATE_NODE] when id already exists.
- Throws [EXFLOW_RESERVED_FIELD] when input data already contains exFlowPriority.
You can model this explicitly with the exported SafeTask<T> helper type.
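A reserved-field rule like this usually has a compile-time half and a runtime half. The sketch below shows the general pattern only: `WithoutReservedField` and `hasReservedField` are hypothetical names, and the actual definition of SafeTask<T> and the library's runtime check may differ.

```typescript
// Hypothetical helper type: rejects object literals that declare `exFlowPriority`.
type WithoutReservedField<T> = T & { exFlowPriority?: never };

const RESERVED_FIELD = "exFlowPriority";

// Runtime counterpart for data that bypasses the type check (for example parsed JSON).
function hasReservedField(data: object): boolean {
  return Object.prototype.hasOwnProperty.call(data, RESERVED_FIELD);
}

const okData: WithoutReservedField<{ name: string }> = { name: "Task A" };
// The next line would not compile, because `exFlowPriority` must be `never`:
// const badData: WithoutReservedField<{ name: string }> = { name: "Task X", exFlowPriority: 1 };

const untyped: unknown = JSON.parse('{"name":"Task B","exFlowPriority":5}');
const untypedHasField =
  typeof untyped === "object" && untyped !== null && hasReservedField(untyped);

console.log(hasReservedField(okData)); // false
console.log(untypedHasField); // true
```

The type catches mistakes in code you write, while the runtime check covers data whose shape is only known at run time.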
resolveExecutionPlan()
Builds the execution plan.
- Throws [EXFLOW_UNKNOWN_DEPENDENCY] when a dependency id does not exist.
- Throws [EXFLOW_CYCLE_DETECTED] when the graph has a cycle.
Cycle errors now include a detected cycle path in the message when available.
Structured diagnostics are available through ExFlowRuntimeError.diagnostics.
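Kahn's Algorithm reveals that a cycle exists (some nodes never reach zero unresolved dependencies) but does not by itself name the path. One common way to recover a path is a depth-first search over the dependency edges, sketched below. `DependencyMap` and `findCyclePath` are hypothetical names, and this is not necessarily how ex-flow computes its cycle path.

```typescript
// Hypothetical adjacency input: node id -> ids it depends on.
type DependencyMap = Record<string, string[]>;

// Depth-first search with "visiting" / "done" states; returns one cycle path or null.
function findCyclePath(graph: DependencyMap): string[] | null {
  const state = new Map<string, "visiting" | "done">();
  const stack: string[] = [];

  const visit = (id: string): string[] | null => {
    if (state.get(id) === "done") return null;
    if (state.get(id) === "visiting") {
      // Slice the current path from the first occurrence of `id` and close the loop.
      return [...stack.slice(stack.indexOf(id)), id];
    }
    state.set(id, "visiting");
    stack.push(id);
    for (const dep of graph[id] ?? []) {
      const cycle = visit(dep);
      if (cycle) return cycle;
    }
    stack.pop();
    state.set(id, "done");
    return null;
  };

  for (const id of Object.keys(graph)) {
    const cycle = visit(id);
    if (cycle) return cycle;
  }
  return null;
}

const cyclic = findCyclePath({ A: ["B"], B: ["A"] });
const acyclic = findCyclePath({ A: [], B: ["A"] });
console.log(cyclic); // ["A", "B", "A"]
console.log(acyclic); // null
```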
Diagnostics Serialization (Logging & Observability)
Use serializeExFlowError to normalize runtime errors before forwarding to your observability stack.
import {
  ExFlow,
  ExFlowRuntimeError,
  serializeExFlowError,
  type ExFlowObservabilityEvent,
} from "ex-flow";

const flow = new ExFlow<{ name: string }>();
try {
  flow.addEntity({ id: "A", dependsOn: ["B"], data: { name: "Task A" } });
  flow.addEntity({ id: "B", dependsOn: ["A"], data: { name: "Task B" } });
  flow.resolveExecutionPlan();
} catch (error) {
  const event: ExFlowObservabilityEvent = serializeExFlowError(error);
  // Example mapping
  console.log(
    JSON.stringify({
      level: "error",
      service: "scheduler-service",
      error_code: event.code,
      error_name: event.name,
      message: event.message,
      diagnostics: event.diagnostics,
      ts: event.timestamp,
    }),
  );
  if (error instanceof ExFlowRuntimeError) {
    // Domain-specific handling for cycle errors
    if (error.code === "EXFLOW_CYCLE_DETECTED") {
      console.error("Cycle path:", error.diagnostics?.cyclePath);
    }
  }
}

Recommended observability fields:
- error_code: EXFLOW_* code for alert grouping
- diagnostics.cyclePath: direct cycle troubleshooting
- diagnostics.unresolvedNodeIds: impact scope
- diagnostics.invalidOptionField + invalidOptionValue: config hygiene dashboards
Integration: API Server
Example with OpenTelemetry-style attributes:
import { ExFlow, serializeExFlowError, toOpenTelemetryAttributes } from "ex-flow";

const flow = new ExFlow<{ name: string }>();
try {
  flow.resolveExecutionPlan();
} catch (error) {
  const event = serializeExFlowError(error);
  const attributes = toOpenTelemetryAttributes(event);
  // Example: span.recordException + span.setAttributes
  console.log("otel.attributes", attributes);
}

Integration: Worker Runtime
Example with Datadog-style log fields:
import { ExFlow, serializeExFlowError, toDatadogLogFields } from "ex-flow";

const flow = new ExFlow<{ name: string }>();
try {
  flow.resolveExecutionPlan();
} catch (error) {
  const event = serializeExFlowError(error);
  const logFields = toDatadogLogFields(event);
  // Example: logger.error(logFields)
  console.log("worker.log", JSON.stringify(logFields));
}

Integration: Custom Mapper Factory
Use createDiagnosticsMapper when your team needs custom field names or value transforms.
import { createDiagnosticsMapper, serializeExFlowError } from "ex-flow";

const mapDiagnostics = createDiagnosticsMapper({
  keyPrefix: "scheduler",
  separator: "_",
  fieldNameMap: {
    code: "err_code",
    message: "err_message",
    cyclePath: "cycle",
  },
  staticFields: {
    team: "platform",
  },
  valueTransform: (value, key) => {
    if (key === "invalidOptionValue" && value !== undefined && value !== null) {
      return `value:${String(value)}`;
    }
    return value;
  },
});

try {
  // ex-flow logic
} catch (error) {
  const event = serializeExFlowError(error);
  const payload = mapDiagnostics(event);
  console.log("custom.payload", payload);
}

resolveExecutionPlan() returns:
- batches: ExFlowResultItem<T>[][]
- fullSequence: ExFlowResultItem<T>[]
Clone Modes and Immutability
shallow (default)
Fastest mode. Top-level object is cloned, but nested references are shared.
deep
Uses structuredClone to isolate nested references.
- Throws [EXFLOW_DEEP_CLONE_UNAVAILABLE] when runtime does not support structuredClone.
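The practical difference between the two modes shows up when nested data is mutated after cloning. The sketch below uses plain object spread for the shallow copy and the global structuredClone (available in Node.js 17+ and modern browsers) for the deep copy. It demonstrates the language-level behavior only; that ex-flow's shallow mode is equivalent to a spread is an assumption.

```typescript
type Task = { name: string; meta: { tags: string[] } };

const original: Task = { name: "build", meta: { tags: ["ci"] } };

// Shallow: a new top-level object, but `meta` still points at the same nested object.
const shallowCopy: Task = { ...original };

// Deep: structuredClone copies nested objects and arrays as well.
const deepCopy: Task = structuredClone(original);

// Mutate nested data on the original after both copies were taken.
original.meta.tags.push("release");

console.log(shallowCopy.meta.tags); // ["ci", "release"]
console.log(deepCopy.meta.tags); // ["ci"]
```

If consumers of the plan may mutate nested fields, deep or custom cloning avoids this kind of shared-reference surprise at the cost of extra copying.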
custom
Uses your custom clone function.
- Throws [EXFLOW_CUSTOM_CLONE_FN_REQUIRED] if cloneFn is missing.
Example:
type Task = { name: string; meta: { tags: string[] } };

const flow = new ExFlow<Task>({
  cloneMode: "custom",
  cloneFn: (data) => ({
    ...data,
    meta: {
      ...data.meta,
      tags: [...data.meta.tags],
    },
  }),
});

Error Codes
Exported as EXFLOW_ERROR:
- EXFLOW_DUPLICATE_NODE
- EXFLOW_RESERVED_FIELD
- EXFLOW_UNKNOWN_DEPENDENCY
- EXFLOW_CYCLE_DETECTED
- EXFLOW_CUSTOM_CLONE_FN_REQUIRED
- EXFLOW_DEEP_CLONE_UNAVAILABLE
- EXFLOW_INVALID_OPTION
Compatibility Matrix
Ordering determinism and scheduling behavior by mode:
| Mode | Ready-node release | Tie fallback default | Throughput profile |
| ------------ | ---------------------------------- | -------------------- | ----------------------------- |
| level | strict level-by-level | insertion order | predictable phase boundaries |
| throughput | unlocks between constrained rounds | insertion order | higher utilization under caps |
Tie resolution priority chain:
1. priorityAscending / priority value
2. deadlineStrategy (if set)
3. weightStrategy (if set)
4. tieBreaker (if set)
5. tieFallbackPolicy
Migration Notes
Suggested upgrade path for existing consumers:
- Preserve old behavior:
  - keep schedulerMode: "level"
  - keep tieFallbackPolicy: "insertion"
- Adopt deterministic id-based fallback (optional):
  - set tieFallbackPolicy: "id-asc"
- Adopt throughput mode safely:
  - start with schedulerMode: "throughput"
  - add concurrencyCap and resourceCaps
  - consider fairnessPolicy: "aging" + maxDeferralRounds
- Enforce resource governance in production:
  - enable requireResourceCapForAllClasses: true
License
MIT
