@asaidimu/utils-pipeline
v1.3.17
An execution engine utility.
A production-grade, asynchronous, type-safe pipeline engine featuring conditional routing, checkpoint-based pause/resume mechanisms, concurrent execution, and atomic state transactions.
Features
- Type-safe State Management: Unified store with atomic updates across steps and stages.
- Conditional Routing: Dynamically jump between stages, terminate early, or suspend execution.
- Pause & Resume: Suspend a pipeline at any stage, persist its state and artifacts, and resume later—even after a process restart.
- Concurrent Execution: Parallelize steps within a stage or run multiple sub-pipelines simultaneously.
- Atomic Transactions: State patches are committed atomically at stage boundaries, ensuring consistency.
- Deep Observability: Lifecycle events for every pipeline, stage, and step with full ancestry paths.
- Abort Support: Built-in support for AbortSignal to cancel long-running operations.
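To make the "concurrent execution" and "atomic transactions" features concrete, here is a minimal, self-contained sketch of the general idea: run all steps of a stage in parallel against one frozen snapshot, then commit their patches in a single merge, or commit nothing if any step fails. It is not the library's implementation; `runStageAtomically`, `Step`, and `StageResult` are hypothetical names used only for illustration.

```typescript
// Illustrative sketch only; none of these names come from @asaidimu/utils-pipeline.
type Patch<S> = Partial<S>;
type Step<S> = (snapshot: Readonly<S>) => Promise<Patch<S>>;

interface StageResult<S> {
  committed: boolean;
  state: S;
}

// Runs every step concurrently against the same frozen snapshot. Patches are
// merged and applied once, after all steps have resolved. If any step rejects,
// no patch is applied and the original state is returned unchanged.
async function runStageAtomically<S extends object>(
  state: S,
  steps: Step<S>[],
): Promise<StageResult<S>> {
  const snapshot: Readonly<S> = Object.freeze({ ...state });
  try {
    const patches = await Promise.all(steps.map((step) => step(snapshot)));
    // Later patches win when two steps write the same key.
    const next = Object.assign({}, state, ...patches) as S;
    return { committed: true, state: next };
  } catch {
    return { committed: false, state };
  }
}
```

Because every step reads the same snapshot, no step can observe a sibling's half-finished write; the trade-off is that steps within one stage cannot depend on each other's output.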
Installation
bun add @asaidimu/utils-pipeline
# or
npm install @asaidimu/utils-pipeline
Routing Sequential Pipeline (RSP)
The Routing Sequential Pipeline is the primary engine for complex, stateful workflows.
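As a mental model for routing, the following self-contained sketch shows one way a router's return value could drive stage selection: a string jumps to the named stage, `undefined` advances to the next stage in order, and a pause object stops the run. It is a simplification under stated assumptions, not the package's engine: `SketchStage`, `runRouted`, and `maxHops` are hypothetical, stages are synchronous, and the `pause` string is assumed to name the stage to resume at.

```typescript
// Hypothetical types for illustration; the real definitions are richer.
type PauseSignal = { pause: string; timeout?: number };
type RouterResult = string | PauseSignal | undefined;

interface SketchStage<S> {
  id: string;
  run: (state: S) => S;
  router?: (state: S) => RouterResult;
}

type Outcome<S> =
  | { status: "succeeded"; state: S; visited: string[] }
  | { status: "paused"; state: S; visited: string[]; resumeAt: string };

function runRouted<S>(
  stages: SketchStage<S>[],
  initial: S,
  startAt?: string,
  maxHops = 100, // guard against routers that never leave a loop
): Outcome<S> {
  let index = startAt ? stages.findIndex((s) => s.id === startAt) : 0;
  if (index < 0) throw new Error(`Unknown stage: ${startAt}`);
  let state = initial;
  const visited: string[] = [];

  for (let hops = 0; index < stages.length; hops++) {
    if (hops >= maxHops) throw new Error("Routing exceeded maxHops");
    const stage = stages[index];
    if (!stage) break;
    state = stage.run(state);
    visited.push(stage.id);

    const next = stage.router?.(state);
    if (next === undefined) {
      index += 1; // natural advance
    } else if (typeof next === "string") {
      index = stages.findIndex((s) => s.id === next); // jump
      if (index < 0) throw new Error(`Unknown stage: ${next}`);
    } else {
      return { status: "paused", state, visited, resumeAt: next.pause };
    }
  }
  return { status: "succeeded", state, visited };
}
```

The `maxHops` guard is a design choice of this sketch: a router that can route a stage back to itself needs some bound, otherwise a state that never satisfies the exit condition loops forever.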
Basic Usage
import {
  PipelineFactory,
  type RoutingPipelineDefinition,
} from "@asaidimu/utils-pipeline";

interface MyState {
  counter: number;
}

const definition: RoutingPipelineDefinition<MyState> = {
  id: "my-pipeline",
  label: "My Business Process",
  stages: [
    {
      id: "init",
      order: 1,
      label: "Initialization",
      steps: {
        setup: {
          id: "setup",
          label: "Setup Data",
          action: async (ctx) => ({ counter: 1 }),
        },
      },
    },
    {
      id: "process",
      order: 2,
      label: "Main Processing",
      steps: {
        work: {
          id: "work",
          label: "Do Work",
          action: async (ctx) => {
            // Read the current counter from the store, then return a patch
            const counter = await ctx.use((c) => c.select((s) => s.counter));
            return { counter: counter + 1 };
          },
        },
      },
      // Conditional routing after this stage completes
      router: (state) => (state.counter > 5 ? "end" : "process"),
    },
    {
      id: "end",
      order: 3,
      label: "Finalize",
      steps: {
        cleanup: {
          id: "cleanup",
          label: "Cleanup",
          action: async () => ({}),
        },
      },
    },
  ],
};

const factory = new PipelineFactory(definition, {
  // createStore is your own helper (not shown); it must return a DataStore instance
  storeFactory: async (runId) => createStore(runId),
  initialStateFactory: () => ({ counter: 0 }),
});

// Prepare and run
const context = await factory.prepare();
const result = await context.run();
if (result.ok && result.value.status === "succeeded") {
  console.log("Pipeline finished:", result.value.finalState);
}
Pause and Resume
A router can signal a pause, which suspends the pipeline and writes a checkpoint.
// Inside a stage definition:
router: (state) => {
  if (state.needsApproval) {
    return { pause: "processing-stage", timeout: 86400000 }; // 24h timeout
  }
  return undefined; // natural advance
},

// Later, to resume:
const result = await factory.resume(runId);
if (result.ok) {
  const context = result.value;
  await context.run();
}
Sequential Pipeline (Simple)
For simpler, linear workflows without conditional routing or persistence, use the standard Pipeline class.
import { Pipeline, Result } from "@asaidimu/utils-pipeline";
const pipeline = new Pipeline([
{ key: "step1", order: 0, action: async () => Result.ok("First") },
{ key: "step2", order: 1, action: async () => Result.ok("Second") },
]);
const result = await pipeline.execute("step1", null);
API Reference
PipelineFactory
- prepare(entry?, runId?): Creates a new RunContext.
- resume(runId): Reconstructs a RunContext from a persisted checkpoint.
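The README does not describe what a persisted checkpoint contains. As a rough illustration of what resuming from one involves, the sketch below stores a serialized record keyed by run ID and refuses to load it once it has expired. Every name here (`Checkpoint`, `MemoryCheckpointStore`, `resumeAt`, `expiresAt`) is hypothetical, and treating the pause timeout as an expiry time is an assumption of this sketch.

```typescript
// Hypothetical checkpoint shape; the library's real format is not documented here.
interface Checkpoint<S> {
  runId: string;
  resumeAt: string; // stage to continue from (assumed)
  state: S;
  savedAt: number; // epoch milliseconds
  expiresAt?: number; // assumed: savedAt + pause timeout
}

class MemoryCheckpointStore {
  private readonly records = new Map<string, string>();

  // Serialize to JSON so the record could survive a process restart if the
  // Map were swapped for a file or database.
  save<S>(checkpoint: Checkpoint<S>): void {
    this.records.set(checkpoint.runId, JSON.stringify(checkpoint));
  }

  // Returns undefined for unknown run IDs and for expired checkpoints.
  load<S>(runId: string, now: number = Date.now()): Checkpoint<S> | undefined {
    const raw = this.records.get(runId);
    if (raw === undefined) return undefined;
    const checkpoint = JSON.parse(raw) as Checkpoint<S>;
    if (checkpoint.expiresAt !== undefined && now >= checkpoint.expiresAt) {
      return undefined;
    }
    return checkpoint;
  }
}
```

Round-tripping through JSON means the state must be serializable: functions, class instances, and `Date` objects would not survive unchanged.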
RunContext
- run(): Starts or continues execution.
- abort(): Signals the pipeline to terminate at the next stage boundary.
- on(event, handler): Subscribes to lifecycle events.
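One common use of `run()` together with `abort()` is enforcing a deadline. The helper below is a sketch that relies only on the two method names listed above; the `RunContextLike` interface, the `runWithDeadline` name, and the assumption that `abort()` takes no arguments and returns nothing are illustrative, not taken from the package's typings.

```typescript
// Structural stand-in for the parts of RunContext this sketch needs.
interface RunContextLike<R> {
  run(): Promise<R>;
  abort(): void; // assumed signature
}

// Starts the run and requests an abort if it has not settled within `ms`.
// The timer is always cleared so it cannot fire after the run has finished.
async function runWithDeadline<R>(
  context: RunContextLike<R>,
  ms: number,
): Promise<R> {
  const timer = setTimeout(() => context.abort(), ms);
  try {
    return await context.run();
  } finally {
    clearTimeout(timer);
  }
}
```

Since `abort()` is documented as taking effect at the next stage boundary, a deadline like this bounds when cancellation is requested, not when the run actually stops.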
Lifecycle Events
- pipeline:start / pipeline:success / pipeline:failure / pipeline:paused
- stage:start / stage:success / stage:failure / stage:paused
- step:start / step:success / step:failure
- router:evaluated
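To illustrate how handlers registered per event name can receive an ancestry path, here is a small stand-alone emitter. The event names are the ones listed above; everything else is an assumption made for the sketch, including the `LifecycleEmitter` class, the `string[]` path payload, and the unsubscribe function returned by `on`.

```typescript
type LifecycleEvent =
  | "pipeline:start" | "pipeline:success" | "pipeline:failure" | "pipeline:paused"
  | "stage:start" | "stage:success" | "stage:failure" | "stage:paused"
  | "step:start" | "step:success" | "step:failure"
  | "router:evaluated";

// Assumed payload: the ancestry path, e.g. ["my-pipeline", "process", "work"].
type PathHandler = (path: string[]) => void;

class LifecycleEmitter {
  private readonly handlers = new Map<LifecycleEvent, Set<PathHandler>>();

  // Registers a handler and returns a function that removes it again.
  on(event: LifecycleEvent, handler: PathHandler): () => void {
    const set = this.handlers.get(event) ?? new Set<PathHandler>();
    set.add(handler);
    this.handlers.set(event, set);
    return () => {
      set.delete(handler);
    };
  }

  // Calls every handler for the event with its own copy of the path.
  emit(event: LifecycleEvent, path: string[]): void {
    for (const handler of this.handlers.get(event) ?? []) {
      handler([...path]);
    }
  }
}

const emitter = new LifecycleEmitter();
const off = emitter.on("step:start", (path) => console.log(path.join(" > ")));
emitter.emit("step:start", ["my-pipeline", "process", "work"]);
off();
```

The path printed in the usage lines is built from stage and step identifiers, which is why readable labels and IDs pay off when reading event logs.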
Best Practices
- Atomic Steps: Each step should perform a single, focused operation.
- Pure Store Interaction: Steps should ideally only interact with the world via the provided store and context.
- Handle Cancellation: Long-running steps should check pcxt.signal or pass it to underlying async calls.
- Descriptive Labels: Use human-readable labels for stages and steps, as they appear in event paths.
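The cancellation advice above can be sketched with the standard AbortSignal API. In this example the signal is passed in as a plain parameter; how a step obtains it from the pipeline context is not assumed here. `delay` and `pollUntilDone` are hypothetical helpers, and the loop stands in for any long-running step body.

```typescript
// Resolves after `ms`, or rejects as soon as the signal is aborted.
function delay(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal.aborted) {
      reject(new Error("Step cancelled"));
      return;
    }
    const onAbort = () => {
      clearTimeout(timer);
      reject(new Error("Step cancelled"));
    };
    const timer = setTimeout(() => {
      signal.removeEventListener("abort", onAbort);
      resolve();
    }, ms);
    signal.addEventListener("abort", onAbort, { once: true });
  });
}

// A long-running loop that checks the signal before each unit of work and
// also passes it down to the underlying async call.
async function pollUntilDone(
  signal: AbortSignal,
  attempts: number,
  intervalMs = 10,
): Promise<number> {
  let completed = 0;
  for (let i = 0; i < attempts; i++) {
    if (signal.aborted) throw new Error("Step cancelled");
    await delay(intervalMs, signal);
    completed++;
  }
  return completed;
}
```

Checking `signal.aborted` covers the gaps between awaits, while passing the signal into `delay` lets cancellation interrupt a wait that is already in progress.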
License
MIT
