dagflowjs
v0.5.0
Published
A lightweight, type-safe DAG (Directed Acyclic Graph) execution engine for TypeScript
Maintainers
Readme
dagflowjs
A lightweight, type-safe DAG (Directed Acyclic Graph) execution engine for TypeScript. Execute complex workflows with dependency management, parallel execution, retries, timeouts, and comprehensive metrics.
Features
- 🔄 Dependency Management: Define node dependencies and execute in the correct order
- ⚡ Parallel Execution: Automatically runs independent nodes in parallel
- 🔁 Retry Logic: Configurable retries with exponential backoff
- ⏱️ Timeouts: Set timeouts per node to prevent hanging operations
- 📊 Metrics: Track execution time, attempts, and success rates
- ✅ Validation: Pre-execution validation hooks
- 🧹 Cleanup: Post-execution cleanup hooks
- 🛡️ Error Strategies: Choose to fail or skip nodes on error
- 📝 TypeScript: Full type safety with generic context types
Installation
npm install dagflowjsExample
import { DagEngine } from 'dagflowjs';
interface OrderContext {
orderId: string;
paymentId?: string;
inventoryReserved: boolean;
shippingLabel?: string;
orderStatus: 'pending' | 'processing' | 'completed';
}
const engine = new DagEngine<OrderContext>({
logger: {
info: (msg) => console.log(`[INFO] ${msg}`),
warn: (msg) => console.warn(`[WARN] ${msg}`),
error: (msg) => console.error(`[ERROR] ${msg}`),
},
});
engine
.addNode({
id: 'validate-order',
execute: async (ctx) => {
return { orderStatus: 'processing' as const };
},
})
.addNode({
id: 'process-payment',
dependsOn: ['validate-order'],
config: { maxRetries: 3, timeoutMs: 30000 },
shouldRun: async (ctx) => {
// Gate: blocks dependents if false
return ctx.orderStatus === 'processing';
},
validate: async (ctx) => {
// Validation: skips node but allows dependents
return !!ctx.orderId;
},
execute: async (ctx) => {
return { paymentId: 'pay_12345' };
},
})
.addNode({
id: 'reserve-inventory',
dependsOn: ['validate-order'],
validate: async (ctx) => {
return ctx.orderStatus === 'processing';
},
execute: async (ctx) => {
return { inventoryReserved: true };
},
})
.addNode({
id: 'create-shipping-label',
dependsOn: ['process-payment', 'reserve-inventory'],
shouldRun: async (ctx) => {
return !!ctx.paymentId && ctx.inventoryReserved === true;
},
execute: async (ctx) => {
return { shippingLabel: 'LABEL_67890' };
},
})
.addNode({
id: 'complete-order',
dependsOn: ['create-shipping-label'],
validate: async (ctx) => {
return !!ctx.shippingLabel;
},
execute: async (ctx) => {
return { orderStatus: 'completed' as const };
},
});
const result = await engine.execute({
orderId: 'ORD_001',
inventoryReserved: false,
orderStatus: 'pending',
});
if (result.success) {
console.log('Success:', result.context);
console.log('Metrics:', result.metrics);
}API Reference
DagEngine<T>
The main engine class for executing DAG workflows.
Constructor
new DagEngine<T>(deps: DagNodeDeps)Methods
addNode(node: DagNode<T>): this- Add a node to the workflowexecute(initial: T): Promise<DagResult<T>>- Execute the workflow
DagNode<T, Patch>
Interface for defining workflow nodes.
interface DagNode<T, Patch = Partial<T>> {
id: string;
dependsOn?: string[];
config?: DagNodeConfig;
shouldRun?(ctx: T): boolean | Promise<boolean>;
execute(ctx: Readonly<T>, deps: DagNodeDeps, signal: AbortSignal): Promise<Patch>;
validate?(ctx: T): boolean | Promise<boolean>;
cleanup?(ctx: T): void | Promise<void>;
}Hooks
shouldRun: Determines if a node should execute. Iffalse, the node is skipped and all dependent nodes are blocked (marked as "blocked" in metrics). Use this for gating logic that should prevent an entire branch of the workflow from executing.validate: Validates the context before execution. Iffalse, the node is skipped but dependent nodes can still run. Use this for conditional execution where other nodes might still be valid.cleanup: Called after execution (whether successful or failed). Useful for logging, notifications, or resource cleanup.
DagNodeConfig
Configuration options for a node.
interface DagNodeConfig {
timeoutMs?: number; // Node timeout in milliseconds
maxRetries?: number; // Maximum retry attempts (default: 0)
retryDelayMs?: number; // Base delay between retries (default: 500ms)
onError?: 'fail' | 'skip' | 'skip-dependents'; // Error handling strategy (default: 'fail')
}DagResult<T>
Result of workflow execution.
interface DagResult<T> {
success: boolean;
context: T;
metrics: DagMetrics;
error?: Error;
}Development
- Install dependencies:
npm install- Run the unit tests:
npm run test- Build the library:
npm run buildLicense
MIT
