@verist/pipeline
v0.0.6
Published
Pipeline composition for Verist workflow steps
Maintainers
Readme
@verist/pipeline
Sequential pipeline composition for Verist workflow steps.
Installation
bun add @verist/pipeline @verist/coreUsage
import { z } from "zod";
import { defineStep, createContextFactory } from "@verist/core";
import { definePipeline, runPipeline } from "@verist/pipeline";
const parse = defineStep({
name: "parse",
input: z.object({ text: z.string() }),
delta: z.object({ markdown: z.string() }),
run: async (input) => ({
delta: { markdown: input.text },
events: [{ type: "parsed" }],
}),
});
const extract = defineStep({
name: "extract",
input: z.object({ markdown: z.string() }),
delta: z.object({ claims: z.array(z.string()) }),
run: async (input) => ({
delta: { claims: [input.markdown] },
events: [{ type: "extracted" }],
}),
});
const pipeline = definePipeline({
name: "process-document",
workflowVersion: "1.0.0",
stages: [
{ step: parse },
{ step: extract, wire: (prev) => ({ markdown: prev.markdown }) },
],
});
const result = await runPipeline({
pipeline,
input: { text: "Hello" },
contextFactory: createContextFactory({}),
workflowId: "doc-processing",
});
if (result.ok) {
console.log(result.output);
} else if (result.suspendedAt) {
console.log(`Suspended at ${result.suspendedAt}`);
} else {
console.log(`Failed at ${result.error.stepName}`);
}Stage Options
interface PipelineStageConfig {
step: Step<any, any, any>;
wire?: (prevDelta: unknown, pipelineInput: unknown) => unknown;
onError?: "fail" | "continue"; // default "fail"
}Behavior
- Stages execute sequentially and share the same
runId. - If
runIdis omitted, it defaults tocrypto.randomUUID()(Node 20+, Bun, Deno, browsers). - Control commands (
invoke,fanout) are not allowed and throw immediately. - Blocking commands (
suspend,review) stop the pipeline and setsuspendedAt. At most one blocking command per stage. onError: "continue"acknowledges the error and proceeds. The pipeline emits apipeline.stage_erroraudit event to maintain the evidence trail. The previous delta is carried forward, and the error is recorded inStageResult.error.
Result Shape
interface PipelineResult<TOutput = unknown> {
ok: boolean;
runId: string;
stages: StageResult[];
output?: TOutput;
error?: PipelineError;
suspendedAt?: string;
}