npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

orinocoflow

v0.3.3

Published

Minimalist TypeScript workflow engine for AI pipelines

Readme

orinocoflow

Minimalist TypeScript workflow engine for AI pipelines. Define workflows as JSON, supply handlers, get streaming execution events.

Orinocoflow birdy

What do you get?

  • Workflows as data — JSON/YAML files you can diff, version, visualise, and hand to an LLM
  • Versioned schema — optional orinocoflow_version on each workflow for engine compatibility; omit it if you do not need an explicit marker. (A generic version key is not read — use orinocoflow_version.)
  • No framework lock-in — zero AI dependencies; handlers are plain async functions
  • HITL without a database — suspend to a JSON snapshot, store it wherever you want, resume later
  • CLI toolingoflow viz, oflow compile, oflow simulate work without writing code
  • Declarative routing — conditional edges are data expressions, not hidden function logic
  • Built-in retry escalationmaxRetries + onExhausted on any conditional edge, tracked automatically
  • Node specs — optional YAML/JSON files that describe what each node type expects and produces; useful for documentation, test scaffolding, and LLM-assisted workflow authoring

Examples: Full runnable scripts, CLI commands, and file-by-file notes are in README-examples.md.

Install

Requirements: Node.js 18+.

npm install orinocoflow

Quick start

After npm install orinocoflow, save as main.mts (or main.ts with "module": "NodeNext" in tsconfig) and run with npx tsx main.mts or compile with tsc and run with node:

import { parse, runWorkflow } from "orinocoflow";

const workflow = parse({
  orinocoflow_version: "1.0",
  graph_id: "my_pipeline",
  entry_point: "fetch",
  nodes: [
    { id: "fetch", type: "integration" },
    { id: "draft", type: "llm" },
    { id: "publish", type: "local_script" },
  ],
  edges: [
    { from: "fetch", to: "draft", type: "standard" },
    { from: "draft", to: "publish", type: "standard" },
  ],
});

const result = await runWorkflow(workflow, { url: "https://example.com/docs/guide" }, {
  handlers: {
    integration: async (_node, state) => {
      const u = new URL(String(state.url));
      return { ...state, host: u.hostname, path: u.pathname || "/" };
    },
    llm: async (_node, state) => ({
      ...state,
      draft: `Page on ${state.host}: path has ${String(state.path).length} chars (${state.path})`,
    }),
    local_script: async (_node, state) => {
      console.log("Publishing →", state.draft);
      return { ...state, published: true };
    },
  },
});

if (result.status === "completed") {
  console.log("Final state:", result.state);
}

From a git clone of this repo (not from the npm tarball): the same logic lives in examples/quick-start.ts, which imports from the local src tree for development — run npx tsx examples/quick-start.ts from the repository root after npm install. For file-by-file notes and more demos, see README-examples.md.

Programmatic compile (files on disk)

The package exports orinocoflow/compile for loading workflows without the CLI:

import { compileFile, transformYamlToWorkflow } from "orinocoflow/compile";

const workflow = await compileFile("pipeline.yaml");
// or: transformYamlToWorkflow(parsedYamlDoc)  // object already in memory

compileFile accepts .yaml, .yml, or .json paths and returns a typed Workflow.

Run the examples

Examples are not in the npm tarball; clone the repo, npm install, and use cwd at the repo root. Full commands, file lists, and behavior notes: README-examples.md (includes quick-start, parallel, HN Roast, PR pipeline, odt-pipeline + mock.yaml, and node-specs/).

Illustrative CLI usage (after npm run build, oflow works like npx tsx src/cli/index.ts during development):

oflow compile examples/odt-pipeline.yaml
oflow viz examples/odt-pipeline.yaml
oflow simulate examples/odt-pipeline.yaml examples/mock.yaml

Mock data for simulate

oflow simulate <workflow> <mock-file> expects YAML or JSON with a top-level handlers object. Each key is a node id; the value is an object merged into workflow state when that node runs.

  • First visit to a node uses the key equal to its id (e.g. verify:).
  • Nth visit (loops, retries): use nodeId.N (e.g. verify.2:) to override state on the second invocation. See examples/mock.yaml and README-examples.md.

Streaming

Two interfaces — pick whichever fits your code style.

Callback (onEvent) — pass a handler in options, execution proceeds normally:

const result = await runWorkflow(workflow, initialState, {
  handlers,
  onEvent(event) {
    switch (event.type) {
      case "workflow_start":  console.log(`run ${event.workflowId} from ${event.entryPoint}`); break;
      case "node_start":    console.log(`starting ${event.nodeId}`); break;
      case "node_complete": console.log(`done in ${event.durationMs}ms`); break;
      case "edge_taken":    console.log(`→ ${event.from} to ${event.to}`); break;
      case "parallel_fork": console.log(`parallel ${event.from} → [${event.targets.join(", ")}] → ${event.join}`); break;
      case "parallel_join": console.log(`parallel join ${event.join}`); break;
      case "parallel_branch_error": console.error(`branch ${event.branchEntry}:`, event.error); break;
      case "workflow_complete": console.log("final state:", event.finalState); break;
      case "workflow_suspended": console.log(`suspended at ${event.nodeId}`); break;
      case "workflow_resume": console.log("resuming after interrupt"); break;
      case "error":         console.error(event.error); break;
    }
  },
});

Async iterable (runWorkflowStream) — iterate events with for await. If the run aborts or hits a fatal error, the async iterator rejects (same errors as runWorkflow, e.g. WorkflowAbortedError).

import { parse, runWorkflowStream } from "orinocoflow";

for await (const event of runWorkflowStream(workflow, initialState, { handlers })) {
  switch (event.type) {
    case "workflow_start":  console.log(`run ${event.workflowId} from ${event.entryPoint}`); break;
    case "node_start":    console.log(`starting ${event.nodeId}`); break;
    case "node_complete": console.log(`done in ${event.durationMs}ms`); break;
    case "edge_taken":    console.log(`→ ${event.from} to ${event.to}`); break;
    case "parallel_fork": console.log(`parallel ${event.from} → [${event.targets.join(", ")}] → ${event.join}`); break;
    case "parallel_join": console.log(`parallel join ${event.join}`); break;
    case "parallel_branch_error": console.error(`branch ${event.branchEntry}:`, event.error); break;
    case "workflow_complete": console.log("final state:", event.finalState); break;
    case "workflow_suspended": console.log(`suspended at ${event.nodeId}`); break;
    case "workflow_resume": console.log("resuming after interrupt"); break;
    case "error":         console.error(event.error); break;
  }
}

JSON schema

Workflow

{
  "orinocoflow_version": "1.0",
  "graph_id": "my_pipeline",
  "entry_point": "node_id",
  "nodes": [ ... ],
  "edges": [ ... ]
}

orinocoflow_version is optional; when present it documents which engine/schema shape you target. Required fields are graph_id, entry_point, nodes, and edges.

Node

Any object with id and type. Extra fields are passed through to your handler.

{ "id": "draft_content", "type": "llm", "model": "gpt-4" }

Built-in node type: "sub_workflow" — requires a workflow_id field pointing to an entry in the registry option.

Edges

Standard — always routes to to:

{ "from": "fetch", "to": "draft", "type": "standard" }

Conditional — evaluates condition against the current state, routes to true or false branch:

{
  "from": "draft",
  "type": "conditional",
  "condition": { "field": "confidence_score", "operator": "<", "value": 75 },
  "routes": { "true": "human_review", "false": "publish" }
}

Supported operators: < > <= >= === !== includes startsWith endsWith

Parallel (fork / join) — run multiple standard-only linear branches concurrently, then continue once at join:

{
  "from": "fanout",
  "type": "parallel",
  "targets": ["branch_a", "branch_b"],
  "join": "merge"
}

Each target must reach join through a straight chain of standard edges (no conditionals or nested parallel on those paths). Only those branch tips may have a standard edge into join — no other shortcuts. compile / validateParallelWorkflow enforce this. Branches use structuredClone of state; use parallelMerge: "strict" (default, top-level key conflicts throw) or "overwrite". On the first branch failure, sibling branches are aborted between nodes (best-effort); the trace may include parallel_branch_error. If a branch does not converge to join at runtime, the engine throws ParallelBranchDidNotConvergeError.

Node spec (optional)

A node spec describes the contract for a node type — what config it accepts, what state fields it reads, and what it writes. Specs are purely documentary; the runtime does not enforce them.

node_type: fetch
description: Fetches the top HN story and its top comments
outputs:
  - name: story_title
    type: string
    description: Title of the top HN story
  - name: comments
    type: array
    description: Up to 5 top comments as plain strings
node_type: llm
description: Drafts a roast of the story using Claude
config:
  model:
    type: string
    required: false
    description: Model override (defaults to claude-haiku)
inputs:
  - name: story_title
    type: string
    required: true
  - name: comments
    type: array
    required: true
outputs:
  - name: roast
    type: string
    description: Short punchy hot take

Parse and validate a spec with parseNodeSpec:

import { parseNodeSpec } from "orinocoflow";
import { parse as yamlParse } from "yaml";
import { readFileSync } from "node:fs";

const spec = parseNodeSpec(yamlParse(readFileSync("node-specs/llm.yaml", "utf8")));
console.log(spec.node_type);   // "llm"
console.log(spec.outputs);     // [{ name: "roast", type: "string", ... }]

See examples/node-specs/ for complete examples covering the fetch, llm, interrupt, and output node types from the HN Roast workflow.

API

// Parse raw JSON/YAML into a typed Workflow
parse(raw: unknown): Workflow

// Validate parallel regions and single-outgoing rule (also runs inside compile / runWorkflow)
validateParallelWorkflow(workflow: Workflow): void

// Parse raw JSON/YAML into a typed NodeSpec
parseNodeSpec(raw: unknown): NodeSpec

// Run workflow — returns WorkflowResult (completed or suspended)
runWorkflow(workflow, initialState, options): Promise<WorkflowResult>

// Resume a suspended workflow from a SuspendedExecution snapshot
resumeWorkflow(snapshot, resumeOptions): Promise<WorkflowResult>

// Stream workflow events via for-await
runWorkflowStream(workflow, initialState, options): AsyncIterable<WorkflowEvent>

WorkflowResult

Both runWorkflow and resumeWorkflow return a result with a trace: an array of every WorkflowEvent emitted during that call (useful for logging, tests, or UI replay).

  • { status: "completed"; state; trace }
  • { status: "suspended"; snapshot; trace }

RunOptions

{
  handlers:  Record<string, (node: WorkflowNode, state: WorkflowState) => Promise<WorkflowState>>;
  registry?: Record<string, unknown>;              // workflow_id → raw JSON for sub_workflow nodes
  maxSteps?: number;                               // cycle guard, default 1000
  signal?:   AbortSignal;                          // cancellation
  onEvent?:  (event: WorkflowEvent) => void;       // callback-based streaming
  parallelMerge?: "strict" | "overwrite";           // after parallel fork, default "strict"
}

Handlers are matched by node.type, falling back to node.id.

ResumeOptions

Same execution knobs as RunOptions where relevant, plus optional state to merge onto the snapshot:

{
  handlers:  Record<string, (node: WorkflowNode, state: WorkflowState) => Promise<WorkflowState>>;
  state?:     WorkflowState;                       // merged on top of snapshot.state (wins on key clashes)
  registry?: Record<string, unknown>;
  maxSteps?: number;
  signal?:   AbortSignal;
  onEvent?:  (event: WorkflowEvent) => void;
}

Advanced / low-level exports

For custom tooling or tests, the package also exports error classes (NodeNotFoundError, HandlerError, WorkflowCycleError, WorkflowAbortedError, InvalidEdgeError, SubWorkflowNotFoundError, WorkflowConfigurationError), Zod schemas (WorkflowSchema, EdgeSchema, …), and router helpers evaluateOperator / resolveNextNode.

Sub-workflows

Register sub-workflow JSON in registry, then reference via workflow_id on a sub_workflow node. The child workflow streams events with the parent node ID as a prefix ("parent_node/child_node"), and its final state merges into the parent's state.

await runWorkflow(mainWorkflow, {}, {
  handlers,
  registry: {
    "review_context_01": reviewContextJson,
  },
});

Human-in-the-loop (pause / resume)

orinocoflow does not store sessions. Add an interrupt node anywhere in your workflow to pause execution. runWorkflow returns { status: "suspended", snapshot } — a plain JSON-serializable SuspendedExecution object. You are responsible for storing it and retrieving it when the external event arrives.

The SessionStore interface defines the contract (get / set / delete by session id). orinocoflow ships MemorySessionStore (Map-backed) for dev/testing.

Resume with merged state:

const resumed = await resumeWorkflow(snapshot, {
  handlers,
  state: { approved: true }, // merged onto snapshot.state
});

interrupt nodes require no handler — the engine handles suspension automatically. When a run suspends at an interrupt that was reached via a single edge, the snapshot includes enteredViaEdge mirroring the last edge_taken into that node (unprefixed from/to, edge type, and any retry metadata). A full store-and-resume walkthrough (workflow JSON + parse + runWorkflow + MemorySessionStore) is in README-examples.md — Human-in-the-loop pattern. A live demo with network + Claude: HN Roast.

Retry limits on conditional edges

Add maxRetries and onExhausted to any ConditionalEdge to automatically escalate after N loopbacks:

{
  "from": "qe",
  "type": "conditional",
  "condition": { "field": "passed", "operator": "===", "value": true },
  "routes": { "true": "done", "false": "coder" },
  "maxRetries": 3,
  "onExhausted": "human_review"
}

The engine tracks retry counts in state.__retries__ (a reserved namespace). Once the loopback branch has been taken maxRetries times, routing falls through to onExhausted instead. Retry state survives pause/resume cycles automatically since it lives in the workflow state.

The edge_taken event includes retriesExhausted: true and onExhausted: "<nodeId>" when the limit is hit.

Development

From a clone of this repo:

npm install
npm run build    # generates dist/; required for bin/oflow and local package resolution
npm test         # vitest
npm run typecheck

The published npm tarball contains dist/, README.md, and README-examples.md (prepublishOnly runs build). Contributors use npx tsx … paths as in Run the examples when iterating without reinstalling.

Cancellation

const controller = new AbortController();
setTimeout(() => controller.abort(), 5000);

await runWorkflow(workflow, state, { handlers, signal: controller.signal });
// throws WorkflowAbortedError if cancelled mid-run