@quarry-systems/drift-parallel-exec
v0.0.1-alpha.2
Parallel execution plugin for Drift - enables dependency-aware parallel task execution with automatic result merging.
Migration note (drift-core v0.4.0+):
ParallelExecutor has moved into drift-core. This package re-exports it unchanged for backward compatibility and continues to provide the plugin wrapper for service injection. For new projects, consider using the graph-level fork/join API (.fork()/.join()) built into ManagedCyclicGraph; see Fork/Join (Graph-Level) below.
Installation
npm install @quarry-systems/drift-parallel-exec
Features
- Dependency-aware scheduling: Tasks are grouped by dependency level and executed in parallel within each level
- Automatic result merging: Results from parallel tasks are automatically merged into the context
- Error handling: Continue on error or fail-fast modes
- Concurrency control: Limit the number of concurrent tasks
- Timeout support: Set per-task timeouts
- Debug logging: Optional detailed logging for debugging
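To make "grouped by dependency level" concrete, here is a minimal sketch of how tasks could be sorted into levels. It is an illustration only, not the plugin's scheduler: `TaskSpec` and `groupByLevel` are hypothetical names introduced for this example.

```typescript
// Illustrative sketch only: NOT the plugin's actual scheduler.
// TaskSpec is a hypothetical, stripped-down stand-in for a task definition.
interface TaskSpec {
  id: string;
  dependencies: string[];
}

// Group tasks into dependency levels: level 0 has no dependencies,
// and each later level depends only on tasks in earlier levels.
function groupByLevel(tasks: TaskSpec[]): string[][] {
  const levels: string[][] = [];
  const done = new Set<string>();
  let remaining = [...tasks];

  while (remaining.length > 0) {
    // A task is ready once every dependency sits in an earlier level.
    const ready = remaining.filter((t) => t.dependencies.every((d) => done.has(d)));
    if (ready.length === 0) {
      // Nothing can make progress: a cycle or a reference to an unknown task.
      throw new Error('Cycle or unknown dependency among: ' + remaining.map((t) => t.id).join(', '));
    }
    levels.push(ready.map((t) => t.id));
    ready.forEach((t) => done.add(t.id));
    remaining = remaining.filter((t) => !done.has(t.id));
  }
  return levels;
}

const levels = groupByLevel([
  { id: 'task-1', dependencies: [] },
  { id: 'task-2', dependencies: ['task-1'] },
  { id: 'task-3', dependencies: ['task-1'] },
]);
console.log(levels); // [['task-1'], ['task-2', 'task-3']]
```

Tasks that land in the same level have no dependencies on each other, which is what makes it safe to run them in parallel.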
Usage
Standalone (Recommended)
The simplest way to use parallel execution is with the executeParallel helper:
import { executeParallel, type ParallelTask } from '@quarry-systems/drift-parallel-exec';
import { propose, commit } from '@quarry-systems/drift-core';
import type { Ctx } from '@quarry-systems/drift-contracts';

// Define your tasks
const tasks: ParallelTask[] = [
  {
    id: 'task-1',
    handler: async (ctx) => {
      // Do work...
      let next = propose(ctx, {
        op: 'set',
        path: 'data.results.task-1',
        value: { completed: true, value: 'result-1' },
        ts: Date.now(),
        by: 'task-1',
      });
      return commit(next, 'task-1');
    },
    dependencies: [], // No dependencies - runs first
  },
  {
    id: 'task-2',
    handler: async (ctx) => {
      // Do work...
      let next = propose(ctx, {
        op: 'set',
        path: 'data.results.task-2',
        value: { completed: true, value: 'result-2' },
        ts: Date.now(),
        by: 'task-2',
      });
      return commit(next, 'task-2');
    },
    dependencies: ['task-1'], // Runs after task-1
  },
  {
    id: 'task-3',
    handler: async (ctx) => {
      // Do work...
      let next = propose(ctx, {
        op: 'set',
        path: 'data.results.task-3',
        value: { completed: true, value: 'result-3' },
        ts: Date.now(),
        by: 'task-3',
      });
      return commit(next, 'task-3');
    },
    dependencies: ['task-1'], // Also runs after task-1 (parallel with task-2)
  },
];

// Execute
const result = await executeParallel(tasks, initialContext, {
  debug: true,
  continueOnError: true,
});
console.log(`Completed: ${result.successCount}/${tasks.length}`);
console.log('Results:', result.ctx.data.results);
With ParallelExecutor Class
For more control, use the ParallelExecutor class directly:
import { ParallelExecutor } from '@quarry-systems/drift-parallel-exec';
import { propose, commit } from '@quarry-systems/drift-core';

const executor = new ParallelExecutor({
  mergeStrategy: 'results-only',
  continueOnError: true,
  maxConcurrency: 4,
  taskTimeout: 30000, // 30 seconds
  debug: true,
});
const result = await executor.execute(tasks, initialContext);
As a Drift Plugin
Register as a plugin to access the executor via Drift's service injection:
import { Manager } from '@quarry-systems/drift-core';
import { createParallelExecutionPlugin } from '@quarry-systems/drift-parallel-exec';

const plugin = createParallelExecutionPlugin({
  debug: true,
  continueOnError: true,
});
// The plugin provides a 'parallelExecutor' service
const executor = plugin.services?.['parallelExecutor'];
Fork/Join (Graph-Level)
As of drift-core v0.4.0, graph-level parallelism is a first-class feature. Rather than placing all parallel work inside a single node's action handler, you can express it directly in the graph structure:
import { ManagedCyclicGraph } from '@quarry-systems/drift-core';

const graph = new ManagedCyclicGraph('parallel-workflow')
  .node('start', { label: 'Start' })
  // Creates fork node + edges to all three branches
  .fork('fan-out', ['branch-a', 'branch-b', 'branch-c'], {
    mergeStrategy: 'results-only'
  })
  .node('branch-a', { label: 'Branch A' })
  .node('branch-b', { label: 'Branch B' })
  .node('branch-c', { label: 'Branch C' })
  // Creates join node + edges from all three branches
  .join('merge', ['branch-a', 'branch-b', 'branch-c'])
  .node('end', { label: 'End' })
  .edge('start', 'fan-out')
  .edge('merge', 'end')
  .start('start')
  .build();
Advantages over task-level parallelism:
- Each branch is a real graph node with its own execution trace entry
- ForkStart, BranchStart, BranchEnd, and ForkComplete events are emitted automatically
- No plugin required: ParallelExecutor is used internally by the Manager
- Graph structure is serializable and introspectable
When to continue using ParallelExecutor directly:
- Tasks have dependencies on each other (task DAG)
- Branch count is determined at runtime
- You need snapshotPerLevel checkpointing within a single step
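As an illustration of the "branch count is determined at runtime" case, the sketch below builds a task list from an input array and adds a final task that depends on all of them. `SketchCtx`, `SketchTask`, and `buildTasks` are hypothetical local names that only mirror the `id` / `handler` / `dependencies` shape used in this README; the real `ParallelTask` type may differ, and real handlers would normally use `propose` and `commit` as shown in Usage.

```typescript
// Hypothetical local types mirroring the task shape used in this README
// (id, handler, dependencies). The real ParallelTask type may differ.
type SketchCtx = { data: { results: Record<string, unknown> } };

interface SketchTask {
  id: string;
  handler: (ctx: SketchCtx) => Promise<SketchCtx>;
  dependencies: string[];
}

// Build one independent task per input file, plus a summary task
// that depends on every per-file task.
function buildTasks(files: string[]): SketchTask[] {
  const perFile: SketchTask[] = files.map((file, i) => ({
    id: `scan-${i}`,
    handler: async (ctx) => ({
      data: { results: { ...ctx.data.results, [`scan-${i}`]: { file } } },
    }),
    dependencies: [],
  }));

  const summary: SketchTask = {
    id: 'summary',
    handler: async (ctx) => ({
      data: { results: { ...ctx.data.results, summary: { scanned: files.length } } },
    }),
    dependencies: perFile.map((t) => t.id),
  };

  return [...perFile, summary];
}

// The number of parallel tasks is only known once the input is known.
const runtimeTasks = buildTasks(['a.ts', 'b.ts', 'c.ts']);
console.log(runtimeTasks.map((t) => `${t.id} <- [${t.dependencies.join(', ')}]`));
```

A static fork/join graph fixes its branches when the graph is built, which is why a task list assembled like this is a better fit for the executor.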
For full documentation see the Parallel Execution guide.
Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| mergeStrategy | 'deep' \| 'shallow' \| 'results-only' | 'results-only' | How to merge task results into context |
| continueOnError | boolean | true | Continue executing other tasks if one fails |
| maxConcurrency | number | 0 (unlimited) | Maximum concurrent tasks per level |
| taskTimeout | number | 0 (no timeout) | Timeout per task in milliseconds |
| debug | boolean | false | Enable debug logging |
| snapshotPerLevel | boolean | false | Save snapshot after each dependency level |
| callbacks | ParallelExecutionCallbacks | undefined | Integration callbacks for tracking |
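The `maxConcurrency` and `taskTimeout` options can be pictured with the following sketch, which treats 0 as "unlimited" and "no timeout" as in the table above. `withTimeout` and `runWithLimit` are hypothetical helpers written for this example; they are not part of the package's API and the real implementation may work differently.

```typescript
// Sketch of the semantics described for taskTimeout and maxConcurrency.
// Both helpers are hypothetical and not part of the package's API.

// Reject if the work takes longer than `ms` milliseconds; 0 means no timeout.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  if (ms <= 0) return work;
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms);
    work.then(
      (value) => { clearTimeout(timer); resolve(value); },
      (err) => { clearTimeout(timer); reject(err); },
    );
  });
}

// Run jobs with at most `limit` in flight at once; 0 means unlimited.
// Results are returned in the same order as the jobs.
async function runWithLimit<T>(jobs: Array<() => Promise<T>>, limit: number): Promise<T[]> {
  const results: T[] = new Array(jobs.length);
  const workers = limit > 0 ? Math.min(limit, jobs.length) : jobs.length;
  let next = 0;

  async function worker(): Promise<void> {
    while (next < jobs.length) {
      const index = next++; // claim the next unstarted job
      results[index] = await jobs[index]();
    }
  }

  await Promise.all(Array.from({ length: workers }, () => worker()));
  return results;
}

// Usage: five short jobs, at most two running at a time.
let inFlight = 0;
let peak = 0;
const job = (n: number) => async () => {
  inFlight++;
  peak = Math.max(peak, inFlight);
  await new Promise<void>((r) => setTimeout(r, 5));
  inFlight--;
  return n * 2;
};
const limited = runWithLimit([1, 2, 3, 4, 5].map(job), 2);
```

In the executor the limit applies within a dependency level, so a sketch like this would run once per level rather than once for the whole task list.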
Import Path
ParallelExecutor is now also available directly from drift-core:
// ✅ Preferred (no plugin dependency needed)
import { ParallelExecutor, executeParallel } from '@quarry-systems/drift-core';
import type { ParallelTask } from '@quarry-systems/drift-core';

// ✅ Also works (re-exported for backward compatibility)
import { ParallelExecutor, executeParallel } from '@quarry-systems/drift-parallel-exec';
Manager Integration
For full integration with Drift's Manager (snapshots, events, tracking), use createManagedParallelExecutor:
import { createManagedParallelExecutor } from '@quarry-systems/drift-parallel-exec';
import type { Manager } from '@quarry-systems/drift-core';

// Inside a node handler
async function parallelTasksHandler(node, ctx, manager: Manager) {
  const runId = ctx.runId;
  const nodeId = node.id;

  const executor = createManagedParallelExecutor({
    debug: true,
    continueOnError: true,
    // Save snapshot after each dependency level completes
    saveSnapshot: async (ctx, metadata) => {
      await manager.save(runId, nodeId, ctx, 'running', metadata.level, undefined, undefined, {
        parallelProgress: {
          level: metadata.level,
          completedTasks: metadata.completedTasks,
          pendingTasks: metadata.pendingTasks,
        },
      });
    },
    // Emit events for task tracking
    onTaskStart: async (taskId, ctx) => {
      manager.emit('ParallelTaskStart', { runId, nodeId, taskId }, ctx);
    },
    onTaskComplete: async (taskId, success, duration, error) => {
      manager.emit('ParallelTaskComplete', {
        runId, nodeId, taskId, success, duration, error: error?.message
      }, ctx);
    },
    onLevelComplete: async (level, totalLevels, ctx) => {
      manager.emit('ParallelLevelComplete', {
        runId, nodeId, level, totalLevels
      }, ctx);
    },
  });

  const result = await executor.execute(tasks, ctx);
  return result.ctx;
}
This gives you:
- ✅ Snapshots after each dependency level (resume capability)
- ✅ Task-level events for monitoring
- ✅ Level-complete events for progress tracking
- ✅ Full integration with Drift's event system
Merge Strategies
- results-only: Only merge the ctx.data.results object (recommended for most use cases)
- shallow: Shallow merge all keys in ctx.data
- deep: Deep merge with special handling for the results object
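The difference between results-only and shallow can be sketched as below. This is an assumption about the behavior, written from the one-line descriptions above: `Data`, `mergeResultsOnly`, and `mergeShallow` are hypothetical names, the package's actual merge logic may differ, and deep is left out because its special handling of `results` is not specified here.

```typescript
// Sketch of how 'results-only' and 'shallow' could behave.
// Assumption: the package's real merge logic may differ from this.
type Data = { results: Record<string, unknown>; [key: string]: unknown };

// 'results-only': combine the keys under data.results from each branch
// and leave every other key of the base data untouched.
function mergeResultsOnly(base: Data, branches: Data[]): Data {
  const results = { ...base.results };
  for (const branch of branches) Object.assign(results, branch.results);
  return { ...base, results };
}

// 'shallow': copy every top-level key of data; later branches overwrite
// earlier ones, including the whole `results` object.
function mergeShallow(base: Data, branches: Data[]): Data {
  return Object.assign({}, base, ...branches);
}

const base: Data = { results: {}, note: 'start' };
const branchA: Data = { results: { a: 1 }, note: 'from-a' };
const branchB: Data = { results: { b: 2 }, note: 'from-b' };

const onlyResults = mergeResultsOnly(base, [branchA, branchB]);
const shallow = mergeShallow(base, [branchA, branchB]);

console.log(onlyResults); // results: { a: 1, b: 2 }, note: 'start'
console.log(shallow);     // results: { b: 2 }, note: 'from-b'
```

In this sketch the shallow merge loses branch A's result because the last branch replaces `results` wholesale, which hints at why writing to `data.results.<task-id>` and using results-only is the safer default.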
Example: LLM Workflow
This plugin is ideal for parallel LLM task execution:
import { executeParallel, propose, commit, type ParallelTask } from '@quarry-systems/drift-core';

// `llm` (your LLM client) and `ctx` (the current context) are assumed to be defined elsewhere.
const tasks: ParallelTask[] = [
  {
    id: 'analyze',
    handler: async (ctx) => {
      const result = await llm.complete('Analyze this code...');
      let next = propose(ctx, {
        op: 'set', path: 'data.results.analyze', value: result, ts: Date.now(), by: 'analyze',
      });
      return commit(next, 'analyze');
    },
    dependencies: [],
  },
  {
    id: 'review',
    handler: async (ctx) => {
      const result = await llm.complete('Review this code...');
      let next = propose(ctx, {
        op: 'set', path: 'data.results.review', value: result, ts: Date.now(), by: 'review',
      });
      return commit(next, 'review');
    },
    dependencies: [],
  },
  {
    id: 'synthesize',
    handler: async (ctx) => {
      // Access results from previous tasks
      const analysis = ctx.data.results['analyze'];
      const review = ctx.data.results['review'];
      const result = await llm.complete(`Synthesize: ${analysis}, ${review}`);
      let next = propose(ctx, {
        op: 'set', path: 'data.results.synthesize', value: result, ts: Date.now(), by: 'synthesize',
      });
      return commit(next, 'synthesize');
    },
    dependencies: ['analyze', 'review'], // Waits for both
  },
];

const result = await executeParallel(tasks, ctx, { debug: true });
// All 3 tasks complete, with 'analyze' and 'review' running in parallel
License
Dual-licensed under AGPL-3.0 and Commercial License. See LICENSE files for details.
Building
Run nx build drift-parallel-exec to build the library.
Running unit tests
Run nx test drift-parallel-exec to execute the unit tests via Vitest.
