npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@kognitivedev/workflows

v0.2.11

Published

Graph-based workflow engine with suspend/resume and persistence

Readme

@kognitivedev/workflows

Graph-based workflow engine with suspend/resume, persistence, streaming, and visualization.

Table of Contents


Installation

bun add @kognitivedev/workflows zod

Quick Start

import { createWorkflow, createStep } from "@kognitivedev/workflows";

const greet = createStep({
  id: "greet",
  execute: async (input: { name: string }) => ({ message: `Hello, ${input.name}!` }),
});

const workflow = createWorkflow({ name: "greeting" }).then(greet).build();

const result = await workflow.execute({ name: "Alice" });
console.log(result.output);  // { message: "Hello, Alice!" }
console.log(result.status);  // "completed"
console.log(result.runId);   // unique run identifier

Steps

A step is the smallest unit of work. Create one with createStep():

import { createStep } from "@kognitivedev/workflows";
import { z } from "zod";

const processOrder = createStep({
  id: "process-order",
  description: "Validate and process an incoming order",
  inputSchema: z.object({ orderId: z.string(), amount: z.number() }),
  outputSchema: z.object({ confirmed: z.boolean(), total: z.number() }),
  execute: async (input, ctx) => {
    return { confirmed: true, total: input.amount * 1.1 };
  },
  retryPolicy: { maxRetries: 3, baseDelayMs: 500 },
});

The execute function receives the input and a StepContext:

  • ctx.stateMap<string, unknown> with outputs from all previous steps
  • ctx.suspend(data?) — pauses execution, returns resume data when continued
  • ctx.emit(event, data) — emits custom events for streaming
  • ctx.abortSignalAbortSignal for cancellation
  • ctx.resourceId — resource/project identifier
  • ctx.traceId — distributed tracing ID

Control Flow

Sequential

Chain steps with .then(). Each step receives the previous step's output:

const workflow = createWorkflow({ name: "transform" })
  .then(addPrefix)
  .then(addSuffix)
  .build();

Branching

Route execution based on a condition. Returns a key to select a branch:

const workflow = createWorkflow({ name: "router" })
  .then(classifyStep)
  .branch(
    (input) => input.category,                         // returns "billing" or "support"
    { billing: billingStep, support: supportStep }
  )
  .build();

If the key doesn't match any branch, the step is skipped and execution continues.

Parallel

Execute steps concurrently. All receive the same input, output is an array:

const workflow = createWorkflow({ name: "analyze" })
  .parallel([sentimentStep, toxicityStep, languageStep])
  .build();

const result = await workflow.execute({ text: "Hello" });
// result.output = [sentimentResult, toxicityResult, languageResult]

ForEach

Apply a step to each item in an array:

const workflow = createWorkflow({ name: "batch" })
  .foreach(
    (input: number[]) => input,  // selector extracts the array
    doubleStep                    // applied to each item
  )
  .build();

const result = await workflow.execute([1, 2, 3]);
// result.output = [2, 4, 6]

Nested Workflows

Embed a child workflow. Suspend/resume propagates to the parent:

const childWorkflow = createWorkflow({ name: "doubler" }).then(doubleStep).build();

const parentWorkflow = createWorkflow({ name: "parent" })
  .then(addOneStep)
  .workflow(childWorkflow)           // optionally: .workflow(child, { id: "custom-id" })
  .build();

const result = await parentWorkflow.execute({ value: 5 });
// 5 + 1 = 6, then 6 * 2 = 12

Combining

All control flow operators can be freely combined:

const workflow = createWorkflow({ name: "pipeline" })
  .then(validateOrder)
  .parallel([checkInventory, calculateTax])
  .then(mergeResults)
  .branch((s) => s.status, { in_stock: fulfillStep, backordered: backorderStep })
  .then(sendConfirmation)
  .build();

Suspend and Resume

Any step can pause execution by calling ctx.suspend(). The workflow returns with status: "suspended" and a snapshot. Resume later with new data:

const reviewStep = createStep({
  id: "human-review",
  execute: async (input, ctx) => {
    const feedback = await ctx.suspend({ draft: input.draft });
    return { feedback, draft: input.draft };
  },
});

const workflow = createWorkflow({ name: "review" })
  .then(draftStep)
  .then(reviewStep)
  .then(reviseStep)
  .build();

// Start — suspends at reviewStep
const result = await workflow.execute({ topic: "AI agents" });
console.log(result.status);   // "suspended"
console.log(result.snapshot);  // WorkflowSnapshot with state + suspend data

// Resume with human feedback
const resumed = await workflow.resume(result.snapshot!, "Looks great, approved");
console.log(resumed.status);  // "completed"
console.log(resumed.output);  // revised result

When a nested workflow suspends, the parent also suspends. Resume on the parent and it routes to the child automatically.


Streaming

Use workflow.stream() to receive real-time events as steps complete:

const { result, stream } = await workflow.stream(input, {
  streamModes: ["values", "debug", "custom"],
});

const reader = stream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  console.log(value.event, value.data);
}

const finalResult = await result;

Stream modes:

| Mode | Event | Description | |------|-------|-------------| | "values" | values | Full state snapshot after each step | | "updates" | updates | Only the new step's output delta | | "debug" | debug | Step entry/exit events with timing | | "custom" | custom | Application events from ctx.emit() |

A result event is always emitted at the end.

Emitting custom events:

const step = createStep({
  id: "process",
  execute: async (input, ctx) => {
    ctx.emit("progress", { percent: 50 });
    // ...
    ctx.emit("progress", { percent: 100 });
    return result;
  },
});

Resume with streaming: workflow.resumeStream(snapshot, resumeData, options) works identically.


Retry Policies

Steps can automatically retry on failure with exponential backoff:

const step = createStep({
  id: "flaky-api",
  execute: async (input) => {
    const res = await fetch(input.url);
    if (!res.ok) throw new Error(`HTTP ${res.status}`);
    return res.json();
  },
  retryPolicy: { maxRetries: 3, baseDelayMs: 500 },
});

When retries occur, result.steps[i].retryMetadata contains { totalAttempts, attempts[] } with error details per attempt.


Error Handling

Step errors are caught and recorded — the workflow continues to the next node:

const result = await workflow.execute(input);
const step = result.steps[0];

if (step.status === "error") {
  console.log(step.error!.message);  // error message
  console.log(step.error!.type);     // "TypeError", "Error", etc.
  console.log(step.error!.code);     // custom error code if set
  console.log(step.error!.stack);    // full stack trace
}

Agent Steps

agentStep() creates a step that invokes an AI agent (must have a generate() method):

import { agentStep } from "@kognitivedev/workflows";

// Text
const classifyStep = agentStep({
  id: "classify",
  agent: classifierAgent,
  buildMessages: (input: { text: string }) => [
    { role: "user", content: `Classify: ${input.text}` },
  ],
});

// Vision / multimodal
const visionStep = agentStep({
  id: "describe-image",
  agent: visionAgent,
  buildMessages: (input: { imageUrl: string }) => [{
    role: "user",
    content: [
      { type: "text", text: "Describe this image." },
      { type: "image", image: input.imageUrl },
    ],
  }],
});

// Custom output extraction
const extractStep = agentStep({
  id: "extract",
  agent: extractorAgent,
  buildMessages: (input) => [{ role: "user", content: input.text }],
  extractOutput: (result) => JSON.parse(result.text),
});

Default output is { text, toolCalls, usage?, steps?, runId? }.


Persistence

WorkflowStore Interface

interface WorkflowStore {
  saveSnapshot(snapshot: WorkflowSnapshot): Promise<void>;
  getSnapshot(runId: string): Promise<WorkflowSnapshot | null>;
  listSnapshots(workflowName: string): Promise<WorkflowSnapshot[]>;
  deleteSnapshot(runId: string): Promise<void>;
  saveEvent?(event: WorkflowTimelineEvent): Promise<void>;
}

InMemoryWorkflowStore

For development and testing:

import { InMemoryWorkflowStore } from "@kognitivedev/workflows";

const store = new InMemoryWorkflowStore();
const result = await workflow.execute(input, { store });

await store.getSnapshot(result.runId);
await store.listSnapshots("my-workflow");
await store.deleteSnapshot(result.runId);
store.clear();        // remove all
console.log(store.size); // count

PgWorkflowStore (PostgreSQL)

Production-ready persistence with Drizzle ORM. Auto-checkpoints at every step:

import { PgWorkflowStore } from "@kognitivedev/workflows";

const store = new PgWorkflowStore({ db, schema, projectId: "my-project" });

const result = await workflow.execute(input, { store });

// Browse checkpoints
const checkpoints = await store.getCheckpoints(result.runId);
const checkpoint = await store.getCheckpointAt(result.runId, 2);

// Rollback to step index 1 (deletes later checkpoints)
const snapshot = await store.rollbackToStep(result.runId, 1);
const resumed = await workflow.resume(snapshot, newData, { store });

// Run tracking
const run = await store.getRun(result.runId);
const runs = await store.listRuns("my-workflow", { status: "completed", limit: 10 });

// Timeline events (auto-emitted: "node_enter", "node_exit")
const events = await store.getEvents(result.runId);

Visualization

toGraph() returns React Flow-compatible node/edge data:

const graph = workflow.toGraph();

console.log(graph.nodes);
// [{ id: "__start", type: "step", label: "Start" },
//  { id: "step1", type: "step", label: "step1" },
//  { id: "__end", type: "step", label: "End" }]

console.log(graph.edges);
// [{ source: "__start", target: "step1" },
//  { source: "step1", target: "__end" }]

Node types: step, branch, parallel, foreach, workflow. Branch edges include label with the branch key. Nested workflows include data.graph with the child's graph.

Lower-level: toGraphJSON(name, nodes) converts raw WorkflowNode[] directly.


State Management

Workflow state is a Map<string, unknown> that accumulates step outputs:

  • __input — the original workflow input
  • stepId — each step's output is stored under its ID
const step = createStep({
  id: "send-welcome",
  execute: async (input, ctx) => {
    const user = ctx.state.get("fetch-user") as { name: string };
    return { sent: true, to: user.name };
  },
});

State values must be JSON-serializable when using persistence.


API Reference

Exports

| Export | Type | Description | |--------|------|-------------| | createWorkflow(config) | function | Create a workflow builder | | createStep(definition) | function | Create a step definition | | agentStep(config) | function | Create an agent-powered step | | executeWorkflowWithStream() | function | Low-level streaming execution | | toGraphJSON(name, nodes) | function | Convert nodes to graph | | InMemoryWorkflowStore | class | In-memory persistence | | PgWorkflowStore | class | PostgreSQL persistence |

Workflow Methods

| Method | Returns | Description | |--------|---------|-------------| | execute(input, options?) | Promise<WorkflowRunResult> | Run workflow | | stream(input, options?) | Promise<{ result, stream }> | Run with streaming | | resume(snapshot, data?, options?) | Promise<WorkflowRunResult> | Resume suspended workflow | | resumeStream(snapshot, data?, options?) | Promise<{ result, stream }> | Resume with streaming | | toGraph() | WorkflowGraph | Get visualization graph |

Key Types

| Type | Description | |------|-------------| | StepDefinition<TInput, TOutput> | Step config: id, description?, inputSchema?, outputSchema?, execute, retryPolicy? | | StepContext | Step runtime: state, suspend(), emit(), abortSignal, resourceId, traceId? | | WorkflowRunResult<T> | Result: output, steps[], runId, status, snapshot? | | StepResult | Per-step: stepId, output, durationMs, status, error?, retryMetadata? | | WorkflowSnapshot | Suspended state: workflowName, runId, currentStepId, state, suspendedAt, resumeData? | | WorkflowExecuteOptions | Options: runId?, store?, abortSignal?, resourceId?, traceId? | | RetryPolicy | Retry config: maxRetries, baseDelayMs | | CheckpointData | Checkpoint: stepId, stepIndex, stateSnapshot, stepStatus, error?, retryMetadata? | | WorkflowRunData | Run record: workflowName, runId, status, input?, output?, totalSteps, totalDurationMs? | | WorkflowGraph | Graph: nodes: GraphNode[], edges: GraphEdge[] | | StreamMode | "values" \| "updates" \| "debug" \| "custom" |


License

MIT