# API Chainer
A production-grade, DAG-based API orchestration engine for Node.js.
## Features
- DAG-Based Execution: Define workflows as Directed Acyclic Graphs with automatic dependency resolution
- Parallel Execution: Concurrent task execution with configurable concurrency limits
- Built-in Executors: HTTP and function executors out of the box
- Extensible: Register custom executors for any task type
- Retry & Timeout: Per-task retry policies with fixed or exponential backoff
- Error Modes: `fail-fast` or `continue` execution on failures
- Template Variables: Dynamic value interpolation using `{{nodeId.path}}` syntax
- Observability: Pluggable logger, metrics, and tracer interfaces
- Lifecycle Events: Subscribe to workflow and node events
- Type-Safe: Full TypeScript support with comprehensive type definitions
## Installation

```bash
npm install api-chainer
```

## Quick Start

```ts
import { ApiChainer } from 'api-chainer';
const chainer = new ApiChainer({
concurrency: 5,
errorMode: 'fail-fast',
});
const result = await chainer.run({
tasks: [
// First task: fetch user data
{
id: 'getUser',
type: 'http',
request: {
method: 'GET',
url: 'https://api.example.com/users/123',
},
},
// Second task: fetch user's posts (depends on first task)
{
id: 'getPosts',
type: 'http',
dependsOn: ['getUser'],
request: {
method: 'GET',
url: 'https://api.example.com/users/{{getUser.data.id}}/posts',
},
},
// Third task: process results with a function
{
id: 'processData',
type: 'function',
dependsOn: ['getUser', 'getPosts'],
handler: async (inputs, results) => {
return {
user: results.getUser.data,
postCount: results.getPosts.data.length,
};
},
},
],
});
console.log(result.success); // true or false
console.log(result.results); // { getUser: {...}, getPosts: {...}, processData: {...} }
```

## Workflow Definition
### Task Types
#### HTTP Tasks

```ts
{
id: 'fetchData',
type: 'http',
request: {
method: 'GET' | 'POST' | 'PUT' | 'PATCH' | 'DELETE',
url: 'https://api.example.com/data',
headers: { 'Authorization': 'Bearer {{inputs.token}}' },
params: { page: 1 },
body: { name: '{{inputs.name}}' },
responseType: 'json' | 'text' | 'blob',
},
policies: {
timeout: 5000,
retry: { attempts: 3, backoff: 'exponential', delayMs: 100 },
},
}
```

#### Function Tasks

```ts
{
id: 'transform',
type: 'function',
dependsOn: ['fetchData'],
handler: async (inputs, results) => {
// inputs = original workflow input
// results = { fetchData: { status, data, ... } }
return transformedData;
},
}
```

### Template Variables
Use `{{path}}` syntax to reference values:

- `{{inputs.propertyName}}` - Access the workflow input
- `{{nodeId.data.path}}` - Access the result of a completed node

```ts
{
url: 'https://api.example.com/users/{{getUser.data.id}}',
headers: { 'Authorization': 'Bearer {{inputs.token}}' },
body: { userId: '{{getUser.data.id}}', name: '{{inputs.name}}' },
}
```

## Configuration

```ts
const chainer = new ApiChainer({
// Max concurrent task executions
concurrency: 10,
// Error handling: 'fail-fast' stops on first error, 'continue' runs independent branches
errorMode: 'fail-fast',
// Default timeout for all tasks (ms)
defaultTimeout: 30000,
// HTTP executor defaults
http: {
baseUrl: 'https://api.example.com',
defaultHeaders: { 'X-API-Key': 'your-key' },
},
// Observability
observability: {
logger: customLogger,
metrics: customMetrics,
tracer: customTracer,
},
});
```

## Lifecycle Events

```ts
chainer.lifecycle.on('workflow:start', (event) => {
console.log(`Workflow started: ${event.traceId}`);
});
chainer.lifecycle.on('node:success', (event) => {
console.log(`Node ${event.nodeId} completed in ${event.durationMs}ms`);
});
chainer.lifecycle.on('node:failure', (event) => {
console.error(`Node ${event.nodeId} failed:`, event.error);
});
chainer.lifecycle.on('workflow:end', (event) => {
console.log(`Workflow completed: success=${event.success}, duration=${event.durationMs}ms`);
});
```

## Custom Executors

```ts
import { Executor, DagNode, ContextSnapshot } from 'api-chainer';
class DatabaseExecutor implements Executor {
async execute(node: DagNode, context: ContextSnapshot, signal: AbortSignal) {
const config = node.config as { query: string };
// Execute database query...
return { rows: [/* query results */] };
}
}
chainer.registerExecutor('database', new DatabaseExecutor());
```
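Once registered, tasks of that type can appear in a workflow like any built-in task. A minimal sketch, assuming the executor reads its settings from the task's `config` field (as in the cast above); the query string is purely illustrative:

```ts
const result = await chainer.run({
  tasks: [
    {
      id: 'loadUsers',
      type: 'database', // routed to the registered DatabaseExecutor
      config: { query: 'SELECT id, name FROM users' }, // hypothetical query
    },
  ],
});

// The executor's return value becomes results.loadUsers
console.log(result.results.loadUsers);
```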
## Error Handling

### Fail-Fast Mode (default)
Stops workflow execution immediately when any task fails.
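A minimal sketch (the failing URL is hypothetical): with `fail-fast`, a task error aborts the rest of the workflow, so downstream tasks never run.

```ts
const failFast = new ApiChainer({ errorMode: 'fail-fast' });

const result = await failFast.run({
  tasks: [
    {
      id: 'flaky',
      type: 'http',
      request: { method: 'GET', url: 'https://api.example.com/might-404' }, // hypothetical endpoint
    },
    {
      id: 'afterFlaky',
      type: 'function',
      dependsOn: ['flaky'],
      handler: async (_inputs, results) => results.flaky, // never runs if 'flaky' fails
    },
  ],
});

// If 'flaky' fails: result.success === false and result.errors.flaky holds the error
```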
### Continue Mode
Continues executing independent branches even if some tasks fail. Dependent tasks are automatically skipped.
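A sketch of enabling continue mode; the snippet that follows then reports which nodes failed and which were skipped:

```ts
// Independent branches keep running; only tasks downstream of a failure are skipped.
const chainer = new ApiChainer({ errorMode: 'continue' });
```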
```ts
const result = await chainer.run(workflow);
if (!result.success) {
console.log('Failed nodes:', Object.keys(result.errors));
console.log('Skipped:', result.stats.skipped);
}
```

## Result Format

```ts
interface WorkflowResult {
success: boolean;
results: Record<string, unknown>;
errors?: Record<string, Error>;
durationMs: number;
traceId: string;
stats: {
total: number;
completed: number;
failed: number;
skipped: number;
};
}
```
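For example, a run summary built from the fields above (a sketch; `workflow` stands in for any workflow definition):

```ts
const result = await chainer.run(workflow);

console.log(
  `[${result.traceId}] ${result.stats.completed}/${result.stats.total} completed, ` +
    `${result.stats.failed} failed, ${result.stats.skipped} skipped in ${result.durationMs}ms`
);
```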
## License

MIT
