@kognitivedev/workflows
v0.2.11
Published
Graph-based workflow engine with suspend/resume and persistence
Maintainers
Readme
@kognitivedev/workflows
Graph-based workflow engine with suspend/resume, persistence, streaming, and visualization.
Table of Contents
- Installation
- Quick Start
- Steps
- Control Flow
- Suspend and Resume
- Streaming
- Retry Policies
- Error Handling
- Agent Steps
- Persistence
- Visualization
- State Management
- API Reference
Installation
bun add @kognitivedev/workflows zodQuick Start
import { createWorkflow, createStep } from "@kognitivedev/workflows";
const greet = createStep({
id: "greet",
execute: async (input: { name: string }) => ({ message: `Hello, ${input.name}!` }),
});
const workflow = createWorkflow({ name: "greeting" }).then(greet).build();
const result = await workflow.execute({ name: "Alice" });
console.log(result.output); // { message: "Hello, Alice!" }
console.log(result.status); // "completed"
console.log(result.runId); // unique run identifierSteps
A step is the smallest unit of work. Create one with createStep():
import { createStep } from "@kognitivedev/workflows";
import { z } from "zod";
const processOrder = createStep({
id: "process-order",
description: "Validate and process an incoming order",
inputSchema: z.object({ orderId: z.string(), amount: z.number() }),
outputSchema: z.object({ confirmed: z.boolean(), total: z.number() }),
execute: async (input, ctx) => {
return { confirmed: true, total: input.amount * 1.1 };
},
retryPolicy: { maxRetries: 3, baseDelayMs: 500 },
});The execute function receives the input and a StepContext:
ctx.state—Map<string, unknown>with outputs from all previous stepsctx.suspend(data?)— pauses execution, returns resume data when continuedctx.emit(event, data)— emits custom events for streamingctx.abortSignal—AbortSignalfor cancellationctx.resourceId— resource/project identifierctx.traceId— distributed tracing ID
Control Flow
Sequential
Chain steps with .then(). Each step receives the previous step's output:
const workflow = createWorkflow({ name: "transform" })
.then(addPrefix)
.then(addSuffix)
.build();Branching
Route execution based on a condition. Returns a key to select a branch:
const workflow = createWorkflow({ name: "router" })
.then(classifyStep)
.branch(
(input) => input.category, // returns "billing" or "support"
{ billing: billingStep, support: supportStep }
)
.build();If the key doesn't match any branch, the step is skipped and execution continues.
Parallel
Execute steps concurrently. All receive the same input, output is an array:
const workflow = createWorkflow({ name: "analyze" })
.parallel([sentimentStep, toxicityStep, languageStep])
.build();
const result = await workflow.execute({ text: "Hello" });
// result.output = [sentimentResult, toxicityResult, languageResult]ForEach
Apply a step to each item in an array:
const workflow = createWorkflow({ name: "batch" })
.foreach(
(input: number[]) => input, // selector extracts the array
doubleStep // applied to each item
)
.build();
const result = await workflow.execute([1, 2, 3]);
// result.output = [2, 4, 6]Nested Workflows
Embed a child workflow. Suspend/resume propagates to the parent:
const childWorkflow = createWorkflow({ name: "doubler" }).then(doubleStep).build();
const parentWorkflow = createWorkflow({ name: "parent" })
.then(addOneStep)
.workflow(childWorkflow) // optionally: .workflow(child, { id: "custom-id" })
.build();
const result = await parentWorkflow.execute({ value: 5 });
// 5 + 1 = 6, then 6 * 2 = 12Combining
All control flow operators can be freely combined:
const workflow = createWorkflow({ name: "pipeline" })
.then(validateOrder)
.parallel([checkInventory, calculateTax])
.then(mergeResults)
.branch((s) => s.status, { in_stock: fulfillStep, backordered: backorderStep })
.then(sendConfirmation)
.build();Suspend and Resume
Any step can pause execution by calling ctx.suspend(). The workflow returns with status: "suspended" and a snapshot. Resume later with new data:
const reviewStep = createStep({
id: "human-review",
execute: async (input, ctx) => {
const feedback = await ctx.suspend({ draft: input.draft });
return { feedback, draft: input.draft };
},
});
const workflow = createWorkflow({ name: "review" })
.then(draftStep)
.then(reviewStep)
.then(reviseStep)
.build();
// Start — suspends at reviewStep
const result = await workflow.execute({ topic: "AI agents" });
console.log(result.status); // "suspended"
console.log(result.snapshot); // WorkflowSnapshot with state + suspend data
// Resume with human feedback
const resumed = await workflow.resume(result.snapshot!, "Looks great, approved");
console.log(resumed.status); // "completed"
console.log(resumed.output); // revised resultWhen a nested workflow suspends, the parent also suspends. Resume on the parent and it routes to the child automatically.
Streaming
Use workflow.stream() to receive real-time events as steps complete:
const { result, stream } = await workflow.stream(input, {
streamModes: ["values", "debug", "custom"],
});
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
console.log(value.event, value.data);
}
const finalResult = await result;Stream modes:
| Mode | Event | Description |
|------|-------|-------------|
| "values" | values | Full state snapshot after each step |
| "updates" | updates | Only the new step's output delta |
| "debug" | debug | Step entry/exit events with timing |
| "custom" | custom | Application events from ctx.emit() |
A result event is always emitted at the end.
Emitting custom events:
const step = createStep({
id: "process",
execute: async (input, ctx) => {
ctx.emit("progress", { percent: 50 });
// ...
ctx.emit("progress", { percent: 100 });
return result;
},
});Resume with streaming: workflow.resumeStream(snapshot, resumeData, options) works identically.
Retry Policies
Steps can automatically retry on failure with exponential backoff:
const step = createStep({
id: "flaky-api",
execute: async (input) => {
const res = await fetch(input.url);
if (!res.ok) throw new Error(`HTTP ${res.status}`);
return res.json();
},
retryPolicy: { maxRetries: 3, baseDelayMs: 500 },
});When retries occur, result.steps[i].retryMetadata contains { totalAttempts, attempts[] } with error details per attempt.
Error Handling
Step errors are caught and recorded — the workflow continues to the next node:
const result = await workflow.execute(input);
const step = result.steps[0];
if (step.status === "error") {
console.log(step.error!.message); // error message
console.log(step.error!.type); // "TypeError", "Error", etc.
console.log(step.error!.code); // custom error code if set
console.log(step.error!.stack); // full stack trace
}Agent Steps
agentStep() creates a step that invokes an AI agent (must have a generate() method):
import { agentStep } from "@kognitivedev/workflows";
// Text
const classifyStep = agentStep({
id: "classify",
agent: classifierAgent,
buildMessages: (input: { text: string }) => [
{ role: "user", content: `Classify: ${input.text}` },
],
});
// Vision / multimodal
const visionStep = agentStep({
id: "describe-image",
agent: visionAgent,
buildMessages: (input: { imageUrl: string }) => [{
role: "user",
content: [
{ type: "text", text: "Describe this image." },
{ type: "image", image: input.imageUrl },
],
}],
});
// Custom output extraction
const extractStep = agentStep({
id: "extract",
agent: extractorAgent,
buildMessages: (input) => [{ role: "user", content: input.text }],
extractOutput: (result) => JSON.parse(result.text),
});Default output is { text, toolCalls, usage?, steps?, runId? }.
Persistence
WorkflowStore Interface
interface WorkflowStore {
saveSnapshot(snapshot: WorkflowSnapshot): Promise<void>;
getSnapshot(runId: string): Promise<WorkflowSnapshot | null>;
listSnapshots(workflowName: string): Promise<WorkflowSnapshot[]>;
deleteSnapshot(runId: string): Promise<void>;
saveEvent?(event: WorkflowTimelineEvent): Promise<void>;
}InMemoryWorkflowStore
For development and testing:
import { InMemoryWorkflowStore } from "@kognitivedev/workflows";
const store = new InMemoryWorkflowStore();
const result = await workflow.execute(input, { store });
await store.getSnapshot(result.runId);
await store.listSnapshots("my-workflow");
await store.deleteSnapshot(result.runId);
store.clear(); // remove all
console.log(store.size); // countPgWorkflowStore (PostgreSQL)
Production-ready persistence with Drizzle ORM. Auto-checkpoints at every step:
import { PgWorkflowStore } from "@kognitivedev/workflows";
const store = new PgWorkflowStore({ db, schema, projectId: "my-project" });
const result = await workflow.execute(input, { store });
// Browse checkpoints
const checkpoints = await store.getCheckpoints(result.runId);
const checkpoint = await store.getCheckpointAt(result.runId, 2);
// Rollback to step index 1 (deletes later checkpoints)
const snapshot = await store.rollbackToStep(result.runId, 1);
const resumed = await workflow.resume(snapshot, newData, { store });
// Run tracking
const run = await store.getRun(result.runId);
const runs = await store.listRuns("my-workflow", { status: "completed", limit: 10 });
// Timeline events (auto-emitted: "node_enter", "node_exit")
const events = await store.getEvents(result.runId);Visualization
toGraph() returns React Flow-compatible node/edge data:
const graph = workflow.toGraph();
console.log(graph.nodes);
// [{ id: "__start", type: "step", label: "Start" },
// { id: "step1", type: "step", label: "step1" },
// { id: "__end", type: "step", label: "End" }]
console.log(graph.edges);
// [{ source: "__start", target: "step1" },
// { source: "step1", target: "__end" }]Node types: step, branch, parallel, foreach, workflow. Branch edges include label with the branch key. Nested workflows include data.graph with the child's graph.
Lower-level: toGraphJSON(name, nodes) converts raw WorkflowNode[] directly.
State Management
Workflow state is a Map<string, unknown> that accumulates step outputs:
__input— the original workflow inputstepId— each step's output is stored under its ID
const step = createStep({
id: "send-welcome",
execute: async (input, ctx) => {
const user = ctx.state.get("fetch-user") as { name: string };
return { sent: true, to: user.name };
},
});State values must be JSON-serializable when using persistence.
API Reference
Exports
| Export | Type | Description |
|--------|------|-------------|
| createWorkflow(config) | function | Create a workflow builder |
| createStep(definition) | function | Create a step definition |
| agentStep(config) | function | Create an agent-powered step |
| executeWorkflowWithStream() | function | Low-level streaming execution |
| toGraphJSON(name, nodes) | function | Convert nodes to graph |
| InMemoryWorkflowStore | class | In-memory persistence |
| PgWorkflowStore | class | PostgreSQL persistence |
Workflow Methods
| Method | Returns | Description |
|--------|---------|-------------|
| execute(input, options?) | Promise<WorkflowRunResult> | Run workflow |
| stream(input, options?) | Promise<{ result, stream }> | Run with streaming |
| resume(snapshot, data?, options?) | Promise<WorkflowRunResult> | Resume suspended workflow |
| resumeStream(snapshot, data?, options?) | Promise<{ result, stream }> | Resume with streaming |
| toGraph() | WorkflowGraph | Get visualization graph |
Key Types
| Type | Description |
|------|-------------|
| StepDefinition<TInput, TOutput> | Step config: id, description?, inputSchema?, outputSchema?, execute, retryPolicy? |
| StepContext | Step runtime: state, suspend(), emit(), abortSignal, resourceId, traceId? |
| WorkflowRunResult<T> | Result: output, steps[], runId, status, snapshot? |
| StepResult | Per-step: stepId, output, durationMs, status, error?, retryMetadata? |
| WorkflowSnapshot | Suspended state: workflowName, runId, currentStepId, state, suspendedAt, resumeData? |
| WorkflowExecuteOptions | Options: runId?, store?, abortSignal?, resourceId?, traceId? |
| RetryPolicy | Retry config: maxRetries, baseDelayMs |
| CheckpointData | Checkpoint: stepId, stepIndex, stateSnapshot, stepStatus, error?, retryMetadata? |
| WorkflowRunData | Run record: workflowName, runId, status, input?, output?, totalSteps, totalDurationMs? |
| WorkflowGraph | Graph: nodes: GraphNode[], edges: GraphEdge[] |
| StreamMode | "values" \| "updates" \| "debug" \| "custom" |
License
MIT
