
@bratsos/workflow-engine

v0.7.0


Type-safe, distributed workflow engine for AI-orchestrated processes with suspend/resume, parallel execution, and cost tracking


@bratsos/workflow-engine

A type-safe, distributed workflow engine for AI-orchestrated processes. Features long-running job support, suspend/resume semantics, parallel execution, and integrated AI cost tracking.



Features

| Feature | Description |
|---------|-------------|
| Type-Safe | Full TypeScript inference from input to output across all stages |
| Async-First | Native support for long-running operations (batch jobs that take hours/days) |
| AI-Native | Built-in tracking of prompts, responses, tokens, and costs |
| Event-Driven | Transactional outbox pattern for reliable event delivery |
| Parallel Execution | Run independent stages concurrently |
| Resume Capability | Automatic state persistence and recovery from failures |
| Distributed | Job queue with priority support and stale lock recovery |
| Environment-Agnostic | Pure command kernel runs on Node.js, serverless, edge, or any runtime |


Requirements

  • TypeScript >= 5.0.0
  • Zod >= 4.0.0
  • PostgreSQL >= 14 (for Prisma persistence)

Optional Peer Dependencies

# For Google AI
npm install @google/genai

# For OpenAI
npm install openai

# For Anthropic
npm install @anthropic-ai/sdk

# For Prisma persistence (recommended)
npm install @prisma/client

Installation

# Core library
npm install @bratsos/workflow-engine zod

# Node.js host (long-running worker processes)
npm install @bratsos/workflow-engine-host-node

# Serverless host (Cloudflare Workers, AWS Lambda, Vercel Edge, etc.)
npm install @bratsos/workflow-engine-host-serverless

Getting Started

1. Database Setup

The engine requires persistence tables. Add these to your Prisma schema:

// schema.prisma

enum Status {
  PENDING
  RUNNING
  SUSPENDED
  COMPLETED
  FAILED
  CANCELLED
  SKIPPED
}

model WorkflowRun {
  id            String   @id @default(cuid())
  createdAt     DateTime @default(now())
  updatedAt     DateTime @updatedAt
  version       Int      @default(1)
  workflowId    String
  workflowName  String
  workflowType  String
  status        Status   @default(PENDING)
  startedAt     DateTime?
  completedAt   DateTime?
  duration      Int?
  input         Json
  output        Json?
  config        Json           @default("{}")
  totalCost     Float          @default(0)
  totalTokens   Int            @default(0)
  priority      Int            @default(5)
  metadata      Json?

  stages        WorkflowStage[]
  logs          WorkflowLog[]
  artifacts     WorkflowArtifact[]

  @@index([status])
  @@index([workflowId])
}

model WorkflowStage {
  id              String              @id @default(cuid())
  createdAt       DateTime            @default(now())
  updatedAt       DateTime            @updatedAt
  version         Int                 @default(1)
  workflowRunId   String
  workflowRun     WorkflowRun         @relation(fields: [workflowRunId], references: [id], onDelete: Cascade)
  stageId         String
  stageName       String
  stageNumber     Int
  executionGroup  Int
  status          Status              @default(PENDING)
  startedAt       DateTime?
  completedAt     DateTime?
  duration        Int?
  inputData       Json?
  outputData      Json?
  config          Json?
  suspendedState  Json?
  resumeData      Json?
  nextPollAt      DateTime?
  pollInterval    Int?
  maxWaitUntil    DateTime?
  metrics         Json?
  embeddingInfo   Json?
  errorMessage    String?

  logs            WorkflowLog[]

  @@unique([workflowRunId, stageId])
  @@index([status])
  @@index([nextPollAt])
}

model WorkflowLog {
  id              String          @id @default(cuid())
  createdAt       DateTime        @default(now())
  workflowRunId   String?
  workflowRun     WorkflowRun?    @relation(fields: [workflowRunId], references: [id], onDelete: Cascade)
  workflowStageId String?
  workflowStage   WorkflowStage?  @relation(fields: [workflowStageId], references: [id], onDelete: Cascade)
  level           String
  message         String
  metadata        Json?

  @@index([workflowRunId])
  @@index([workflowStageId])
}

model WorkflowArtifact {
  id            String   @id @default(cuid())
  createdAt     DateTime @default(now())
  workflowRunId String
  workflowRun   WorkflowRun @relation(fields: [workflowRunId], references: [id], onDelete: Cascade)
  key           String
  type          String
  data          Json
  size          Int

  @@unique([workflowRunId, key])
  @@index([workflowRunId])
}

model AICall {
  id            String   @id @default(cuid())
  createdAt     DateTime @default(now())
  topic         String
  callType      String
  modelKey      String
  modelId       String
  prompt        String   @db.Text
  response      String   @db.Text
  inputTokens   Int
  outputTokens  Int
  cost          Float

  @@index([topic])
}

model JobQueue {
  id            String    @id @default(cuid())
  createdAt     DateTime  @default(now())
  updatedAt     DateTime  @updatedAt
  workflowRunId String
  stageId       String
  status        Status    @default(PENDING)
  priority      Int       @default(5)
  attempt       Int       @default(0)
  maxAttempts   Int       @default(3)
  workerId      String?
  lockedAt      DateTime?
  nextPollAt    DateTime?
  payload       Json?
  lastError     String?

  @@index([status, priority])
  @@index([nextPollAt])
}

model OutboxEvent {
  id              String    @id @default(cuid())
  createdAt       DateTime  @default(now())
  workflowRunId   String
  sequence        Int
  eventType       String
  payload         Json
  causationId     String
  occurredAt      DateTime
  publishedAt     DateTime?
  retryCount      Int       @default(0)
  dlqAt           DateTime?

  @@unique([workflowRunId, sequence])
  @@index([publishedAt])
  @@map("outbox_events")
}

model IdempotencyKey {
  id          String   @id @default(cuid())
  createdAt   DateTime @default(now())
  key         String
  commandType String
  result      Json

  @@unique([key, commandType])
  @@map("idempotency_keys")
}

Run the migration:

npx prisma migrate dev --name add-workflow-tables
npx prisma generate

2. Define Your First Stage

import { defineStage } from "@bratsos/workflow-engine";
import { z } from "zod";

export const extractTextStage = defineStage({
  id: "extract-text",
  name: "Extract Text",
  schemas: {
    input: z.object({ url: z.string().url() }),
    output: z.object({ text: z.string(), wordCount: z.number() }),
    config: z.object({ maxLength: z.number().default(50000) }),
  },
  async execute(ctx) {
    const response = await fetch(ctx.input.url);
    const text = (await response.text()).slice(0, ctx.config.maxLength);
    ctx.log("INFO", "Extraction complete", { length: text.length });
    return {
      output: { text, wordCount: text.split(/\s+/).length },
    };
  },
});

3. Build a Workflow

import { WorkflowBuilder } from "@bratsos/workflow-engine";
import { z } from "zod";
import { extractTextStage } from "./stages/extract-text";
import { summarizeStage } from "./stages/summarize";

export const documentProcessorWorkflow = new WorkflowBuilder(
  "document-processor",
  "Document Processor",
  "Extracts and summarizes documents",
  z.object({ url: z.string().url() }),
  z.object({ url: z.string().url() }),
)
  .pipe(extractTextStage)
  .pipe(summarizeStage)
  .build();

4. Create the Kernel

The kernel is the core command dispatcher. It's environment-agnostic -- no timers, no signals, no global state.

import { createKernel } from "@bratsos/workflow-engine/kernel";
import {
  createPrismaWorkflowPersistence,
  createPrismaJobQueue,
} from "@bratsos/workflow-engine";
import { PrismaClient } from "@prisma/client";
import { documentProcessorWorkflow } from "./workflows/document-processor";

const prisma = new PrismaClient();

const kernel = createKernel({
  persistence: createPrismaWorkflowPersistence(prisma),
  blobStore: myBlobStore,         // BlobStore implementation
  jobTransport: createPrismaJobQueue(prisma),
  eventSink: myEventSink,         // EventSink implementation
  scheduler: myScheduler,         // Scheduler implementation
  clock: { now: () => new Date() },
  registry: {
    getWorkflow: (id) =>
      id === "document-processor" ? documentProcessorWorkflow : undefined,
  },
});

5. Choose a Host

Option A: Node.js Worker (Recommended for Production)

import { createNodeHost } from "@bratsos/workflow-engine-host-node";

const host = createNodeHost({
  kernel,
  jobTransport: createPrismaJobQueue(prisma),
  workerId: "worker-1",
  orchestrationIntervalMs: 10_000,
  jobPollIntervalMs: 1_000,
});

// Start polling loops + signal handlers
await host.start();

// Queue a workflow
await kernel.dispatch({
  type: "run.create",
  idempotencyKey: crypto.randomUUID(),
  workflowId: "document-processor",
  input: { url: "https://example.com/doc.pdf" },
});

Option B: Serverless (Cloudflare Workers, Lambda, etc.)

import { createServerlessHost } from "@bratsos/workflow-engine-host-serverless";

const host = createServerlessHost({
  kernel,
  jobTransport,
  workerId: "my-worker",
});

// Handle a single job from a queue message
const result = await host.handleJob(msg);

// Run maintenance from a cron trigger
const tick = await host.runMaintenanceTick();

Core Concepts

Stages

A stage is the atomic unit of work. Every stage has typed input, output, and config schemas.

Stage Modes:

| Mode | Use Case |
|------|----------|
| sync (default) | Most stages - execute and return immediately |
| async-batch | Long-running batch APIs (OpenAI Batch, Google Batch, etc.) |

Workflows

Workflows are built as a linear pipeline of execution groups. Each group contains one or more stages. Sequential stages (.pipe()) form single-stage groups. Parallel stages (.parallel()) form multi-stage groups where all stages run concurrently.

new WorkflowBuilder(id, name, description, inputSchema, outputSchema)
  .pipe(stageA)              // Group 0: stageA runs first
  .pipe(stageB)              // Group 1: stageB runs after stageA
  .parallel([stageC, stageD]) // Group 2: stageC and stageD run concurrently
  .pipe(stageE)              // Group 3: stageE runs after both complete
  .build();

The output of each execution group is stored in the workflow context keyed by stage ID. For parallel groups, the merged output is an object keyed by each stage's ID:

// After group 2 completes, stageE receives:
ctx.require("stageC") // output of stageC
ctx.require("stageD") // output of stageD

When a workflow completes, the final execution group's output is persisted in WorkflowRun.output and included in the workflow:completed event.

Kernel

The Kernel is a pure command dispatcher. All operations are expressed as typed commands:

// Create a run
const { workflowRunId } = await kernel.dispatch({
  type: "run.create",
  idempotencyKey: "unique-key",
  workflowId: "my-workflow",
  input: { data: "hello" },
});

// Cancel a run
await kernel.dispatch({
  type: "run.cancel",
  workflowRunId,
  reason: "User requested",
});

// Rerun from a specific stage
await kernel.dispatch({
  type: "run.rerunFrom",
  workflowRunId,
  fromStageId: "extract-text",
});

The kernel depends on 7 port interfaces (injected at creation):

| Port | Purpose |
|------|---------|
| Persistence | Runs, stages, logs, outbox, idempotency CRUD |
| BlobStore | Large payload storage (put/get/has/delete/list) |
| JobTransport | Job queue (enqueue/dequeue/complete/suspend/fail) |
| EventSink | Async event publishing |
| Scheduler | Deferred command triggers |
| Clock | Injectable time source |
| WorkflowRegistry | Workflow definition lookup |
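
For tests, each port can be satisfied by a small in-memory double, in the spirit of the FakeClock and CollectingEventSink helpers the package ships. The interface shapes below are assumptions inferred from this README (the kernel config shows Clock as `{ now: () => new Date() }`), not the package's actual type declarations:

```typescript
// Minimal in-memory doubles for two of the ports. Interface shapes are
// assumptions for illustration, not the package's real declarations.

interface Clock { now(): Date; }
interface EventSink { publish(event: { type: string; payload: unknown }): Promise<void>; }

class FixedClock implements Clock {
  constructor(private at: Date) {}
  now(): Date { return this.at; }    // deterministic time for tests
}

class CollectingSink implements EventSink {
  events: { type: string; payload: unknown }[] = [];
  async publish(event: { type: string; payload: unknown }) {
    this.events.push(event);         // record instead of delivering
  }
}

const clock = new FixedClock(new Date("2025-01-01T00:00:00Z"));
const sink = new CollectingSink();
await sink.publish({ type: "workflow:completed", payload: { workflowRunId: "run-1" } });
```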

Hosts

Hosts wrap the kernel with environment-specific process management:

Node Host (@bratsos/workflow-engine-host-node): Long-running worker process with polling loops, signal handling (SIGTERM/SIGINT), and continuous job dequeuing.

Serverless Host (@bratsos/workflow-engine-host-serverless): Stateless single-invocation methods for queue-driven environments. Consumers wire platform-specific glue (ack/retry/waitUntil) around the host methods.

Persistence

| Interface | Purpose |
|-----------|---------|
| Persistence | Workflow runs, stages, logs, outbox, idempotency |
| JobTransport | Distributed job queue with priority and retries |
| BlobStore | Large payload storage |
| AICallLogger | AI call tracking with cost aggregation |

Built-in implementations:

  • createPrismaWorkflowPersistence(prisma) - PostgreSQL via Prisma
  • createPrismaJobQueue(prisma) - PostgreSQL with FOR UPDATE SKIP LOCKED
  • createPrismaAICallLogger(prisma) - PostgreSQL

Common Patterns

Accessing Previous Stage Output

Use ctx.require() for type-safe access to any previous stage's output:

export const analyzeStage = defineStage({
  id: "analyze",
  name: "Analyze Content",
  schemas: {
    input: "none",
    output: AnalysisOutputSchema,
    config: ConfigSchema,
  },
  async execute(ctx) {
    const extracted = ctx.require("extract-text");  // Throws if missing
    const summary = ctx.optional("summarize");       // Returns undefined if missing
    return { output: { /* ... */ } };
  },
});

Parallel Execution

Parallel stages run concurrently in the same execution group. Their outputs are keyed by stage ID in the workflow context:

const workflow = new WorkflowBuilder(/* ... */)
  .pipe(extractStage)
  .parallel([
    sentimentAnalysisStage,   // id: "sentiment"
    keywordExtractionStage,   // id: "keywords"
    languageDetectionStage,   // id: "language"
  ])
  .pipe(aggregateResultsStage)
  .build();

// In aggregateResultsStage:
async execute(ctx) {
  const sentiment = ctx.require("sentiment");   // output of sentimentAnalysisStage
  const keywords = ctx.require("keywords");     // output of keywordExtractionStage
  const language = ctx.require("language");     // output of languageDetectionStage
  // ...
}

Stage ID Utilities

Use createStageIds or defineStageIds for type-safe stage ID constants with autocomplete:

import { createStageIds, defineStageIds } from "@bratsos/workflow-engine";

// From an existing workflow
const STAGES = createStageIds(myWorkflow);
STAGES.EXTRACT_TEXT    // "extract-text" (autocomplete + type-safe)
STAGES.SUMMARIZE       // "summarize"

// Or define upfront
const STAGES = defineStageIds(["extract-text", "summarize"] as const);

AI Integration

import { createAIHelper } from "@bratsos/workflow-engine";

async execute(ctx) {
  const ai = createAIHelper(
    `workflow.${ctx.workflowRunId}.stage.${ctx.stageId}`,
    aiCallLogger,
  );

  const { text, cost } = await ai.generateText("gemini-2.5-flash", "Summarize: " + ctx.input.text);

  const { object: analysis } = await ai.generateObject(
    "gemini-2.5-flash",
    "Analyze: " + ctx.input.text,
    z.object({ sentiment: z.enum(["positive", "negative", "neutral"]) })
  );

  return { output: { text, analysis } };
}

Long-Running Batch Jobs

import { defineAsyncBatchStage } from "@bratsos/workflow-engine";

export const batchStage = defineAsyncBatchStage({
  id: "batch-process",
  name: "Batch Processing",
  mode: "async-batch",
  schemas: { input: InputSchema, output: OutputSchema, config: ConfigSchema },

  async execute(ctx) {
    if (ctx.resumeState) {
      return { output: await fetchBatchResults(ctx.resumeState.batchId) };
    }

    const batch = await submitBatch(ctx.input.prompts);
    return {
      suspended: true,
      state: {
        batchId: batch.id,
        submittedAt: new Date().toISOString(),
        pollInterval: 3600000,
        maxWaitTime: 86400000,
      },
      pollConfig: { pollInterval: 3600000, maxWaitTime: 86400000, nextPollAt: new Date(Date.now() + 3600000) },
    };
  },

  async checkCompletion(state) {
    const status = await checkBatchStatus(state.batchId);
    if (status === "completed") {
      const output = await fetchBatchResults(state.batchId);
      return { ready: true, output };
    }
    if (status === "failed") return { ready: false, error: "Batch failed" };
    return { ready: false };
  },
});

Config Presets

import { withAIConfig, withStandardConfig } from "@bratsos/workflow-engine";
import { z } from "zod";

const MyConfigSchema = withAIConfig(z.object({ customField: z.string() }));

Best Practices

Schema Design

// Good: Strict schemas with descriptions and defaults
const ConfigSchema = z.object({
  modelKey: z.string().default("gemini-2.5-flash").describe("AI model to use"),
  maxRetries: z.number().min(0).max(10).default(3),
});

Logging

async execute(ctx) {
  const items = ctx.input.items; // whatever collection this stage iterates over
  ctx.log("INFO", "Starting processing", { itemCount: items.length });

  for (const [index, item] of items.entries()) {
    ctx.onProgress({
      progress: (index + 1) / items.length,
      message: `Processing item ${index + 1}/${items.length}`,
    });
  }
}

Error Handling

async execute(ctx) {
  try {
    const result = await processDocument(ctx.input);
    return { output: result };
  } catch (error) {
    ctx.log("ERROR", "Processing failed", {
      error: error instanceof Error ? error.message : String(error),
    });
    throw error;
  }
}

API Reference

Kernel Commands

| Command | Description | Key Fields |
|---------|-------------|------------|
| run.create | Create a new workflow run | idempotencyKey, workflowId, input, config?, priority? |
| run.claimPending | Claim pending runs for processing | workerId, maxClaims? |
| run.transition | Advance to next stage group | workflowRunId |
| run.cancel | Cancel a running workflow (cascades to stages + jobs) | workflowRunId, reason? |
| run.rerunFrom | Rerun from a specific stage (cleans up artifacts) | workflowRunId, fromStageId |
| job.execute | Execute a single stage (multi-phase transactions) | idempotencyKey?, workflowRunId, workflowId, stageId, config |
| stage.pollSuspended | Poll suspended stages (per-stage transactions) | maxChecks? (returns resumedWorkflowRunIds) |
| lease.reapStale | Release stale job leases | staleThresholdMs |
| run.reapStuck | Fail runs stuck RUNNING with no activity | stuckThresholdMs? |
| outbox.flush | Publish pending events | maxEvents? |
| plugin.replayDLQ | Replay dead-letter queue events | maxEvents? |

Idempotency behavior:

  • Replaying the same idempotencyKey returns cached results.
  • If the same key is already executing, dispatch throws IdempotencyInProgressError.
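
The behavior above can be sketched with an in-memory cache standing in for the IdempotencyKey table (the two-state entry and the error name mirror this README; the storage is a plain Map for illustration):

```typescript
// Sketch of idempotent dispatch: a key replays its cached result, and a key
// that is still executing throws. In-memory Map stands in for the DB table.

class IdempotencyInProgressError extends Error {}

type Entry = { state: "in-progress" } | { state: "done"; result: unknown };

class IdempotentDispatcher {
  private cache = new Map<string, Entry>();

  async dispatch(key: string, handler: () => Promise<unknown>): Promise<unknown> {
    const existing = this.cache.get(key);
    if (existing && existing.state === "done") {
      return existing.result;                       // replay: cached result
    }
    if (existing && existing.state === "in-progress") {
      throw new IdempotencyInProgressError(`key ${key} is already executing`);
    }
    this.cache.set(key, { state: "in-progress" });  // claim the key
    const result = await handler();
    this.cache.set(key, { state: "done", result }); // cache for replays
    return result;
  }
}

const dispatcher = new IdempotentDispatcher();
let calls = 0;
const first = await dispatcher.dispatch("k1", async () => ({ runId: `run-${++calls}` }));
const replay = await dispatcher.dispatch("k1", async () => ({ runId: `run-${++calls}` }));
// The handler ran once; the second dispatch returned the cached result.
```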

Transaction behavior:

  • Most commands execute inside a single database transaction (handler + outbox events).
  • job.execute uses multi-phase transactions: Phase 1 commits RUNNING status immediately, Phase 2 runs stageDef.execute() outside any transaction, Phase 3 commits the final status. This avoids holding a database connection during long-running stage execution.
  • stage.pollSuspended uses per-stage transactions: checkCompletion() runs outside any transaction (external HTTP calls to batch providers), then DB updates + outbox events are committed in a short transaction per stage. This prevents P2028 timeout errors when batch APIs are slow.
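
The multi-phase shape of job.execute can be sketched like this, with an in-memory map standing in for the WorkflowStage table and a no-op transaction marker standing in for real DB transactions. The point is that the stage's own work (phase 2) happens while no transaction is open:

```typescript
// Sketch of the three-phase `job.execute` pattern. `transaction` is a marker
// here; real code would open and commit a short DB transaction in each phase.

type StageRow = { status: string; output?: unknown };

const db = new Map<string, StageRow>(); // stands in for the WorkflowStage table

function transaction(fn: () => void): void {
  fn(); // placeholder for begin/commit
}

async function executeJob(stageId: string, work: () => Promise<unknown>) {
  // Phase 1: commit RUNNING immediately, in a short transaction.
  transaction(() => { db.set(stageId, { status: "RUNNING" }); });

  // Phase 2: run the stage outside any transaction -- it may take hours,
  // and no database connection is held while it runs.
  const output = await work();

  // Phase 3: commit the final status in a second short transaction.
  transaction(() => { db.set(stageId, { status: "COMPLETED", output }); });
}

await executeJob("extract-text", async () => ({ wordCount: 42 }));
```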

Cancellation semantics:

  • run.cancel is authoritative: it marks the run as CANCELLED, cascades to all non-terminal stages (setting them to CANCELLED and clearing nextPollAt), and cancels all queued/suspended jobs via jobTransport.cancelByRun().
  • stage.pollSuspended skips stages whose run has been cancelled.
  • job.execute re-checks run status after stage execution. If the run was cancelled during execution, the result is discarded and a ghost: true flag is returned. Hosts use this flag to prevent retries.
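
A host's retry decision under those semantics might look like the sketch below. The result shape is an assumption based on the description above; only the `ghost: true` flag itself comes from this README:

```typescript
// Sketch: a ghost result (run cancelled mid-execution) is dropped,
// never retried; only genuine failures go back to the queue.

type ExecuteResult = { ghost?: boolean; failed?: boolean };

function shouldRetry(result: ExecuteResult): boolean {
  if (result.ghost) return false;  // cancelled during execution: discard
  return result.failed === true;   // real failure: eligible for retry
}

const ghostCase = shouldRetry({ ghost: true, failed: true }); // dropped
const failCase = shouldRetry({ failed: true });               // retried
```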

Node Host Config

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| kernel | Kernel | required | Kernel instance |
| jobTransport | JobTransport | required | Job queue |
| workerId | string | required | Unique worker ID |
| orchestrationIntervalMs | number | 10000 | Orchestration poll interval |
| jobPollIntervalMs | number | 1000 | Job dequeue interval |
| staleLeaseThresholdMs | number | 60000 | Stale lease timeout |

Serverless Host

| Method | Description |
|--------|-------------|
| handleJob(msg) | Execute a single pre-dequeued job. Returns { outcome, error? } |
| processAvailableJobs(opts?) | Dequeue and process jobs. Returns { processed, succeeded, failed } |
| runMaintenanceTick() | Claim, poll, reap, flush in one call. Returns structured result |

Core Exports

// Stage definition
import { defineStage, defineAsyncBatchStage } from "@bratsos/workflow-engine";

// Workflow building
import { WorkflowBuilder, Workflow } from "@bratsos/workflow-engine";

// Kernel
import { createKernel, type Kernel, type KernelConfig } from "@bratsos/workflow-engine/kernel";

// Kernel types
import type { KernelCommand, CommandResult, KernelEvent } from "@bratsos/workflow-engine/kernel";

// Port interfaces
import type { Persistence, BlobStore, JobTransport, EventSink, Scheduler, Clock } from "@bratsos/workflow-engine/kernel";

// Plugins
import { definePlugin, createPluginRunner } from "@bratsos/workflow-engine/kernel";

// Persistence (Prisma)
import { createPrismaWorkflowPersistence, createPrismaJobQueue, createPrismaAICallLogger } from "@bratsos/workflow-engine";

// AI Helper
import { createAIHelper, type AIHelper } from "@bratsos/workflow-engine";

// Stage ID utilities
import { createStageIds, defineStageIds, isValidStageId, assertValidStageId } from "@bratsos/workflow-engine";

// Testing
import { InMemoryWorkflowPersistence, InMemoryJobQueue } from "@bratsos/workflow-engine/testing";
import { FakeClock, InMemoryBlobStore, CollectingEventSink, NoopScheduler } from "@bratsos/workflow-engine/kernel/testing";

Troubleshooting

"Workflow not found in registry"

Ensure the workflow is registered in the registry passed to createKernel:

const kernel = createKernel({
  // ...
  registry: {
    getWorkflow(id) {
      const workflows = { "my-workflow": myWorkflow };
      return workflows[id];
    },
  },
});

"Stage X depends on Y which was not found"

Verify all dependencies are included in the workflow:

.pipe(extractStage)   // Must be piped before
.pipe(analyzeStage)   // analyze can now access extract's output

Jobs stuck in "RUNNING"

A worker likely crashed. The stale lease recovery (lease.reapStale command) automatically releases jobs. In Node host, this runs on each orchestration tick. For serverless, call runMaintenanceTick() from a cron trigger.
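
The recovery logic amounts to: any job locked longer than the threshold is returned to PENDING so another worker can claim it. A minimal self-contained sketch of that rule (the in-memory job shape is illustrative, not the package's types):

```typescript
// Sketch of stale-lease recovery: jobs locked past the threshold are
// released back to the queue; fresh leases are left alone.

type Job = { id: string; status: string; workerId?: string; lockedAt?: number };

function reapStaleLeases(jobs: Job[], now: number, staleThresholdMs: number): number {
  let released = 0;
  for (const job of jobs) {
    if (
      job.status === "RUNNING" &&
      job.lockedAt !== undefined &&
      now - job.lockedAt > staleThresholdMs
    ) {
      job.status = "PENDING";     // back to the queue
      job.workerId = undefined;   // crashed worker no longer owns it
      job.lockedAt = undefined;
      released++;
    }
  }
  return released;
}

const jobs: Job[] = [
  { id: "a", status: "RUNNING", workerId: "w1", lockedAt: 0 },      // stale
  { id: "b", status: "RUNNING", workerId: "w2", lockedAt: 55_000 }, // fresh
];
const released = reapStaleLeases(jobs, 60_000, 30_000);
```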


License

MIT