@kognitivedev/workflows
v0.2.28
Graph-based workflow engine with suspend/resume, persistence, streaming, and visualization.
Table of Contents
- Installation
- Quick Start
- Steps
- Control Flow
- Suspend and Resume
- Streaming
- Retry Policies
- Error Handling
- Agent Steps
- OCR Steps
- Persistence
- Visualization
- State Management
- API Reference
Installation
bun add @kognitivedev/workflows zod
Quick Start
import { createWorkflow, createStep } from "@kognitivedev/workflows";
const greet = createStep({
id: "greet",
execute: async (input: { name: string }) => ({ message: `Hello, ${input.name}!` }),
});
const workflow = createWorkflow({ name: "greeting" }).then(greet).build();
const result = await workflow.execute({ name: "Alice" });
console.log(result.output); // { message: "Hello, Alice!" }
console.log(result.status); // "completed"
console.log(result.runId); // unique run identifier
Steps
A step is the smallest unit of work. Create one with createStep():
import { createStep } from "@kognitivedev/workflows";
import { z } from "zod";
const processOrder = createStep({
id: "process-order",
description: "Validate and process an incoming order",
inputSchema: z.object({ orderId: z.string(), amount: z.number() }),
outputSchema: z.object({ confirmed: z.boolean(), total: z.number() }),
execute: async (input, ctx) => {
return { confirmed: true, total: input.amount * 1.1 };
},
retryPolicy: { maxRetries: 3, baseDelayMs: 500 },
});
The execute function receives the input and a StepContext:
- ctx.state: Map<string, unknown> with outputs from all previous steps
- ctx.suspend(data?): pauses execution; returns resume data when continued
- ctx.emit(event, data): emits custom events for streaming
- ctx.abortSignal: AbortSignal for cancellation
- ctx.resourceId: resource/project identifier
- ctx.traceId: distributed tracing ID
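A step body can be exercised against a hand-rolled context. The sketch below uses a simplified stand-in for StepContext (not the package's actual type) to show a step reporting progress via emit and honoring abortSignal:

```typescript
// Simplified stand-in for the package's StepContext (illustrative only).
type MiniStepContext = {
  emit: (event: string, data: unknown) => void;
  abortSignal: AbortSignal;
};

// A step body that reports progress and bails out on cancellation.
async function pollUntilDone(input: { attempts: number }, ctx: MiniStepContext) {
  for (let i = 1; i <= input.attempts; i++) {
    if (ctx.abortSignal.aborted) return { done: false, polled: i - 1 };
    ctx.emit("progress", { percent: Math.round((i / input.attempts) * 100) });
  }
  return { done: true, polled: input.attempts };
}

const events: unknown[] = [];
const controller = new AbortController();
const out = await pollUntilDone(
  { attempts: 4 },
  { emit: (_event, data) => events.push(data), abortSignal: controller.signal },
);
```

The same body would drop into createStep()'s execute unchanged, since it only touches fields the real context provides.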
Control Flow
Sequential
Chain steps with .then(). Each step receives the previous step's output:
const workflow = createWorkflow({ name: "transform" })
.then(addPrefix)
.then(addSuffix)
  .build();
Branching
Route execution based on a condition. Returns a key to select a branch:
const workflow = createWorkflow({ name: "router" })
.then(classifyStep)
.branch(
(input) => input.category, // returns "billing" or "support"
{ billing: billingStep, support: supportStep }
)
  .build();
If the key doesn't match any branch, the step is skipped and execution continues.
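The skip-on-no-match behavior can be sketched in plain TypeScript (a hypothetical helper, not the library's internals):

```typescript
// Hypothetical sketch of branch resolution: pick a step by key,
// or pass the input through unchanged when no branch matches.
type Step<I, O> = (input: I) => Promise<O>;

async function runBranch<I>(
  input: I,
  selector: (input: I) => string,
  branches: Record<string, Step<I, unknown>>,
): Promise<unknown> {
  const key = selector(input);
  const step = branches[key];
  if (!step) return input; // no match: skip and continue
  return step(input);
}

const billing: Step<{ category: string }, string> = async () => "handled by billing";
const hit = await runBranch({ category: "billing" }, (i) => i.category, { billing });
const miss = await runBranch({ category: "sales" }, (i) => i.category, { billing });
```

Here hit runs the billing step, while miss falls through with the input intact, which is what lets execution continue to the next node.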
Parallel
Execute steps concurrently. All receive the same input, output is an array:
const workflow = createWorkflow({ name: "analyze" })
.parallel([sentimentStep, toxicityStep, languageStep])
.build();
const result = await workflow.execute({ text: "Hello" });
// result.output = [sentimentResult, toxicityResult, languageResult]
ForEach
Apply a step to each item in an array:
const workflow = createWorkflow({ name: "batch" })
.foreach(
(input: number[]) => input, // selector extracts the array
doubleStep // applied to each item
)
.build();
const result = await workflow.execute([1, 2, 3]);
// result.output = [2, 4, 6]
Nested Workflows
Embed a child workflow. Suspend/resume propagates to the parent:
const childWorkflow = createWorkflow({ name: "doubler" }).then(doubleStep).build();
const parentWorkflow = createWorkflow({ name: "parent" })
.then(addOneStep)
.workflow(childWorkflow) // optionally: .workflow(child, { id: "custom-id" })
.build();
const result = await parentWorkflow.execute({ value: 5 });
// 5 + 1 = 6, then 6 * 2 = 12
Combining
All control flow operators can be freely combined:
const workflow = createWorkflow({ name: "pipeline" })
.then(validateOrder)
.parallel([checkInventory, calculateTax])
.then(mergeResults)
.branch((s) => s.status, { in_stock: fulfillStep, backordered: backorderStep })
.then(sendConfirmation)
  .build();
Suspend and Resume
Any step can pause execution by calling ctx.suspend(). The workflow returns with status: "suspended" and a snapshot. Resume later with new data:
const reviewStep = createStep({
id: "human-review",
execute: async (input, ctx) => {
const feedback = await ctx.suspend({ draft: input.draft });
return { feedback, draft: input.draft };
},
});
const workflow = createWorkflow({ name: "review" })
.then(draftStep)
.then(reviewStep)
.then(reviseStep)
.build();
// Start — suspends at reviewStep
const result = await workflow.execute({ topic: "AI agents" });
console.log(result.status); // "suspended"
console.log(result.snapshot); // WorkflowSnapshot with state + suspend data
// Resume with human feedback
const resumed = await workflow.resume(result.snapshot!, "Looks great, approved");
console.log(resumed.status); // "completed"
console.log(resumed.output); // revised result
When a nested workflow suspends, the parent also suspends. Resume on the parent and it routes to the child automatically.
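The underlying pattern is a resumable step runner. A toy sketch, assuming suspension is signalled by throwing a sentinel (the package's real snapshot format and mechanism differ):

```typescript
// Toy suspend/resume: a step throws Suspend, the runner captures a
// snapshot, and resume re-enters at the suspended step with new data.
class Suspend { constructor(public data: unknown) {} }

type ToyStep = (input: unknown, resumeData?: unknown) => unknown;
type Snapshot = { stepIndex: number; input: unknown; suspendData: unknown };
type RunResult =
  | { status: "completed"; output: unknown }
  | { status: "suspended"; snapshot: Snapshot };

function run(steps: ToyStep[], input: unknown, from = 0, resumeData?: unknown): RunResult {
  let current = input;
  for (let i = from; i < steps.length; i++) {
    try {
      current = steps[i](current, i === from ? resumeData : undefined);
    } catch (e) {
      if (e instanceof Suspend) {
        return { status: "suspended", snapshot: { stepIndex: i, input: current, suspendData: e.data } };
      }
      throw e;
    }
  }
  return { status: "completed", output: current };
}

const steps: ToyStep[] = [
  (input) => ({ draft: `draft about ${input}` }),
  (input: any, resumeData) => {
    if (resumeData === undefined) throw new Suspend({ draft: input.draft }); // pause for review
    return { ...input, feedback: resumeData };
  },
];

const first = run(steps, "AI agents");
const resumed =
  first.status === "suspended"
    ? run(steps, first.snapshot.input, first.snapshot.stepIndex, "approved")
    : first;
```

The snapshot only needs the suspended step's index and pending input, which is why a persisted snapshot is enough to resume a run on a different process.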
Streaming
Use workflow.stream() to receive real-time events as steps complete:
const { result, stream } = await workflow.stream(input, {
streamModes: ["values", "debug", "custom"],
});
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
console.log(value.event, value.data);
}
const finalResult = await result;
Stream modes:
| Mode | Event | Description |
|------|-------|-------------|
| "values" | values | Full state snapshot after each step |
| "updates" | updates | Only the new step's output delta |
| "debug" | debug | Step entry/exit events with timing |
| "custom" | custom | Application events from ctx.emit() |
A result event is always emitted at the end.
Emitting custom events:
const step = createStep({
id: "process",
execute: async (input, ctx) => {
ctx.emit("progress", { percent: 50 });
// ...
ctx.emit("progress", { percent: 100 });
return result;
},
});
Resume with streaming: workflow.resumeStream(snapshot, resumeData, options) works identically.
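The event envelopes can be exercised against a local ReadableStream standing in for what stream() returns (the payloads below are illustrative, not the library's exact shapes):

```typescript
// A stand-in stream emitting the same { event, data } envelope that the
// reading loop above consumes; payload contents are illustrative.
type WorkflowEvent = { event: string; data: unknown };

const stream = new ReadableStream<WorkflowEvent>({
  start(controller) {
    controller.enqueue({ event: "debug", data: { stepId: "greet", phase: "enter" } });
    controller.enqueue({ event: "values", data: { greet: { message: "Hello" } } });
    controller.enqueue({ event: "result", data: { status: "completed" } });
    controller.close();
  },
});

const seen: string[] = [];
const reader = stream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  seen.push(value!.event);
}
```

Consuming via getReader() rather than for-await keeps the code portable across runtimes that lack async iteration on ReadableStream.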
Retry Policies
Steps can automatically retry on failure with exponential backoff:
const step = createStep({
id: "flaky-api",
execute: async (input) => {
const res = await fetch(input.url);
if (!res.ok) throw new Error(`HTTP ${res.status}`);
return res.json();
},
retryPolicy: { maxRetries: 3, baseDelayMs: 500 },
});
When retries occur, result.steps[i].retryMetadata contains { totalAttempts, attempts[] } with error details per attempt.
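A sketch of the retry loop such a policy implies, assuming the delay doubles per attempt as baseDelayMs * 2^attempt (the package's exact schedule may differ):

```typescript
type RetryPolicy = { maxRetries: number; baseDelayMs: number };

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

// Retry fn up to maxRetries times after the initial attempt,
// doubling the backoff delay between attempts.
async function withRetry<T>(fn: () => Promise<T>, policy: RetryPolicy): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= policy.maxRetries; attempt++) {
    try {
      return await fn();
    } catch (e) {
      lastError = e;
      if (attempt < policy.maxRetries) await sleep(policy.baseDelayMs * 2 ** attempt);
    }
  }
  throw lastError;
}

let calls = 0;
const flaky = async () => {
  calls++;
  if (calls < 3) throw new Error("transient");
  return "ok";
};
const value = await withRetry(flaky, { maxRetries: 3, baseDelayMs: 1 });
```

With { maxRetries: 3, baseDelayMs: 500 } this schedule waits 500 ms, 1 s, then 2 s between the four total attempts.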
Error Handling
Step errors are caught and recorded — the workflow continues to the next node:
const result = await workflow.execute(input);
const step = result.steps[0];
if (step.status === "error") {
console.log(step.error!.message); // error message
console.log(step.error!.type); // "TypeError", "Error", etc.
console.log(step.error!.code); // custom error code if set
console.log(step.error!.stack); // full stack trace
}
Agent Steps
agentStep() creates a step that invokes an AI agent (must have a generate() method):
import { agentStep } from "@kognitivedev/workflows";
// Text
const classifyStep = agentStep({
id: "classify",
agent: classifierAgent,
buildMessages: (input: { text: string }) => [
{ role: "user", content: `Classify: ${input.text}` },
],
});
// Vision / multimodal
const visionStep = agentStep({
id: "describe-image",
agent: visionAgent,
buildMessages: (input: { imageUrl: string }) => [{
role: "user",
content: [
{ type: "text", text: "Describe this image." },
{ type: "image", image: input.imageUrl },
],
}],
});
// Custom output extraction
const extractStep = agentStep({
id: "extract",
agent: extractorAgent,
buildMessages: (input) => [{ role: "user", content: input.text }],
extractOutput: (result) => JSON.parse(result.text),
});
Default output is { text, toolCalls, usage?, steps?, runId? }.
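Since the agent only needs a generate() method, the buildMessages → generate → extractOutput pipeline can be exercised with a duck-typed stub (the stub's result shape here is an assumption):

```typescript
// Minimal duck-typed agent: anything exposing generate() qualifies.
type Message = { role: string; content: string };

const stubAgent = {
  async generate(messages: Message[]) {
    // Echo a canned JSON payload; a real agent would call a model here.
    return { text: `{"label":"billing","chars":${messages[0].content.length}}`, toolCalls: [] };
  },
};

// Mirrors what agentStep() wires together.
const buildMessages = (input: { text: string }): Message[] => [
  { role: "user", content: `Classify: ${input.text}` },
];
const extractOutput = (result: { text: string }) => JSON.parse(result.text);

const raw = await stubAgent.generate(buildMessages({ text: "invoice overdue" }));
const output = extractOutput(raw);
```

This also makes agent steps easy to unit test: swap the real agent for a stub returning fixed text.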
OCR Steps
createOcrStep() creates a workflow step that uploads a file to the managed documents API and waits for a parsing result.
import { createOcrStep } from "@kognitivedev/workflows";
const ocrStep = createOcrStep<{ data: Uint8Array; filename: string; mimeType?: string }, { text: string }>({
id: "ocr-cloud",
buildSource: (input) => ({
data: input.data,
filename: input.filename,
mimeType: input.mimeType,
}),
clientConfig: {
baseUrl: "http://localhost:3001",
apiKey: process.env.KOGNITIVE_API_KEY,
},
cloud: {
tier: "cost_effective",
},
});
The step uses the cloud documents SDK and does not perform local OCR.
Cloud mode requires apiKey and sends the file as multipart form data to the OCR endpoint.
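The upload itself amounts to assembling multipart form data from the buildSource result. A generic sketch (the field names and metadata shape are assumptions, not the SDK's actual contract):

```typescript
// Build a multipart body from raw bytes, roughly as a cloud client would.
// The "file" and "tier" field names are illustrative assumptions.
const data = new Uint8Array([0x25, 0x50, 0x44, 0x46]); // "%PDF" magic bytes
const blob = new Blob([data], { type: "application/pdf" });

const form = new FormData();
form.append("file", blob, "invoice.pdf");
form.append("tier", "cost_effective");

// A real request would POST this body to the OCR endpoint with the API key header.
const uploaded = form.get("file") as Blob;
```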
Persistence
WorkflowStore Interface
interface WorkflowStore {
saveSnapshot(snapshot: WorkflowSnapshot): Promise<void>;
getSnapshot(runId: string): Promise<WorkflowSnapshot | null>;
listSnapshots(workflowName: string): Promise<WorkflowSnapshot[]>;
deleteSnapshot(runId: string): Promise<void>;
saveEvent?(event: WorkflowTimelineEvent): Promise<void>;
}
InMemoryWorkflowStore
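Internally such a store amounts to a Map keyed by runId. A minimal sketch of implementing the interface (anything beyond the four required methods is an assumption):

```typescript
type WorkflowSnapshot = { workflowName: string; runId: string; [k: string]: unknown };

// Minimal Map-backed implementation of the WorkflowStore interface.
class MapWorkflowStore {
  private snapshots = new Map<string, WorkflowSnapshot>();

  async saveSnapshot(snapshot: WorkflowSnapshot): Promise<void> {
    this.snapshots.set(snapshot.runId, snapshot);
  }
  async getSnapshot(runId: string): Promise<WorkflowSnapshot | null> {
    return this.snapshots.get(runId) ?? null;
  }
  async listSnapshots(workflowName: string): Promise<WorkflowSnapshot[]> {
    return [...this.snapshots.values()].filter((s) => s.workflowName === workflowName);
  }
  async deleteSnapshot(runId: string): Promise<void> {
    this.snapshots.delete(runId);
  }
}

const store = new MapWorkflowStore();
await store.saveSnapshot({ workflowName: "review", runId: "run-1" });
const found = await store.getSnapshot("run-1");
const listed = await store.listSnapshots("review");
```

Any object satisfying the interface can be passed as options.store, so custom backends (Redis, SQLite, etc.) follow the same shape.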
For development and testing:
import { InMemoryWorkflowStore } from "@kognitivedev/workflows";
const store = new InMemoryWorkflowStore();
const result = await workflow.execute(input, { store });
await store.getSnapshot(result.runId);
await store.listSnapshots("my-workflow");
await store.deleteSnapshot(result.runId);
store.clear(); // remove all
console.log(store.size); // count
PgWorkflowStore (PostgreSQL)
Production-ready persistence with Drizzle ORM. Auto-checkpoints at every step:
import { PgWorkflowStore } from "@kognitivedev/workflows";
const store = new PgWorkflowStore({ db, schema, projectId: "my-project" });
const result = await workflow.execute(input, { store });
// Browse checkpoints
const checkpoints = await store.getCheckpoints(result.runId);
const checkpoint = await store.getCheckpointAt(result.runId, 2);
// Rollback to step index 1 (deletes later checkpoints)
const snapshot = await store.rollbackToStep(result.runId, 1);
const resumed = await workflow.resume(snapshot, newData, { store });
// Run tracking
const run = await store.getRun(result.runId);
const runs = await store.listRuns("my-workflow", { status: "completed", limit: 10 });
// Timeline events (auto-emitted: "node_enter", "node_exit")
const events = await store.getEvents(result.runId);
Visualization
toGraph() returns React Flow-compatible node/edge data:
const graph = workflow.toGraph();
console.log(graph.nodes);
// [{ id: "__start", type: "step", label: "Start" },
// { id: "step1", type: "step", label: "step1" },
// { id: "__end", type: "step", label: "End" }]
console.log(graph.edges);
// [{ source: "__start", target: "step1" },
// { source: "step1", target: "__end" }]Node types: step, branch, parallel, foreach, workflow. Branch edges include label with the branch key. Nested workflows include data.graph with the child's graph.
Lower-level: toGraphJSON(name, nodes) converts raw WorkflowNode[] directly.
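For a linear workflow the conversion is mechanical. A sketch of building React Flow-style nodes and edges from an ordered step list, mirroring the output shown above (a hypothetical helper, not the library's toGraphJSON):

```typescript
type GraphNode = { id: string; type: string; label: string };
type GraphEdge = { source: string; target: string };

// Build a linear graph with synthetic __start/__end sentinel nodes.
function linearGraph(stepIds: string[]): { nodes: GraphNode[]; edges: GraphEdge[] } {
  const nodes: GraphNode[] = [
    { id: "__start", type: "step", label: "Start" },
    ...stepIds.map((id) => ({ id, type: "step", label: id })),
    { id: "__end", type: "step", label: "End" },
  ];
  const edges: GraphEdge[] = [];
  for (let i = 0; i < nodes.length - 1; i++) {
    edges.push({ source: nodes[i].id, target: nodes[i + 1].id });
  }
  return { nodes, edges };
}

const graph = linearGraph(["step1"]);
```

Branch and parallel nodes fan edges out instead of chaining them, which is where the non-step node types come in.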
State Management
Workflow state is a Map<string, unknown> that accumulates step outputs:
- __input: the original workflow input
- stepId: each step's output is stored under its ID
const step = createStep({
id: "send-welcome",
execute: async (input, ctx) => {
const user = ctx.state.get("fetch-user") as { name: string };
return { sent: true, to: user.name };
},
});
State values must be JSON-serializable when using persistence.
API Reference
Exports
| Export | Type | Description |
|--------|------|-------------|
| createWorkflow(config) | function | Create a workflow builder |
| createStep(definition) | function | Create a step definition |
| agentStep(config) | function | Create an agent-powered step |
| createOcrStep(config) | function | Create an OCR step (manual processor or cloud extraction) |
| executeWorkflowWithStream() | function | Low-level streaming execution |
| toGraphJSON(name, nodes) | function | Convert nodes to graph |
| InMemoryWorkflowStore | class | In-memory persistence |
| PgWorkflowStore | class | PostgreSQL persistence |
Workflow Methods
| Method | Returns | Description |
|--------|---------|-------------|
| execute(input, options?) | Promise<WorkflowRunResult> | Run workflow |
| stream(input, options?) | Promise<{ result, stream }> | Run with streaming |
| resume(snapshot, data?, options?) | Promise<WorkflowRunResult> | Resume suspended workflow |
| resumeStream(snapshot, data?, options?) | Promise<{ result, stream }> | Resume with streaming |
| toGraph() | WorkflowGraph | Get visualization graph |
Key Types
| Type | Description |
|------|-------------|
| StepDefinition<TInput, TOutput> | Step config: id, description?, inputSchema?, outputSchema?, execute, retryPolicy? |
| StepContext | Step runtime: state, suspend(), emit(), abortSignal, resourceId, traceId? |
| WorkflowRunResult<T> | Result: output, steps[], runId, status, snapshot? |
| StepResult | Per-step: stepId, output, durationMs, status, error?, retryMetadata? |
| WorkflowSnapshot | Suspended state: workflowName, runId, currentStepId, state, suspendedAt, resumeData? |
| WorkflowExecuteOptions | Options: runId?, store?, abortSignal?, resourceId?, traceId? |
| RetryPolicy | Retry config: maxRetries, baseDelayMs |
| CheckpointData | Checkpoint: stepId, stepIndex, stateSnapshot, stepStatus, error?, retryMetadata? |
| WorkflowRunData | Run record: workflowName, runId, status, input?, output?, totalSteps, totalDurationMs? |
| WorkflowGraph | Graph: nodes: GraphNode[], edges: GraphEdge[] |
| StreamMode | "values" \| "updates" \| "debug" \| "custom" |
License
MIT
