@bratsos/workflow-engine
v0.7.0
Type-safe, distributed workflow engine for AI-orchestrated processes with suspend/resume, parallel execution, and cost tracking
A type-safe, distributed workflow engine for AI-orchestrated processes. Features long-running job support, suspend/resume semantics, parallel execution, and integrated AI cost tracking.
Table of Contents
- Features
- Requirements
- Installation
- Getting Started
- Core Concepts
- Common Patterns
- Best Practices
- API Reference
- Troubleshooting
Features
| Feature | Description |
|---------|-------------|
| Type-Safe | Full TypeScript inference from input to output across all stages |
| Async-First | Native support for long-running operations (batch jobs that take hours/days) |
| AI-Native | Built-in tracking of prompts, responses, tokens, and costs |
| Event-Driven | Transactional outbox pattern for reliable event delivery |
| Parallel Execution | Run independent stages concurrently |
| Resume Capability | Automatic state persistence and recovery from failures |
| Distributed | Job queue with priority support and stale lock recovery |
| Environment-Agnostic | Pure command kernel runs on Node.js, serverless, edge, or any runtime |
Requirements
- TypeScript >= 5.0.0
- Zod >= 4.0.0
- PostgreSQL >= 14 (for Prisma persistence)
Optional Peer Dependencies
# For Google AI
npm install @google/genai
# For OpenAI
npm install openai
# For Anthropic
npm install @anthropic-ai/sdk
# For Prisma persistence (recommended)
npm install @prisma/client
Installation
# Core library
npm install @bratsos/workflow-engine zod
# Node.js host (long-running worker processes)
npm install @bratsos/workflow-engine-host-node
# Serverless host (Cloudflare Workers, AWS Lambda, Vercel Edge, etc.)
npm install @bratsos/workflow-engine-host-serverless
Getting Started
1. Database Setup
The engine requires persistence tables. Add these to your Prisma schema:
// schema.prisma
enum Status {
PENDING
RUNNING
SUSPENDED
COMPLETED
FAILED
CANCELLED
SKIPPED
}
model WorkflowRun {
id String @id @default(cuid())
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
version Int @default(1)
workflowId String
workflowName String
workflowType String
status Status @default(PENDING)
startedAt DateTime?
completedAt DateTime?
duration Int?
input Json
output Json?
config Json @default("{}")
totalCost Float @default(0)
totalTokens Int @default(0)
priority Int @default(5)
metadata Json?
stages WorkflowStage[]
logs WorkflowLog[]
artifacts WorkflowArtifact[]
@@index([status])
@@index([workflowId])
}
model WorkflowStage {
id String @id @default(cuid())
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
version Int @default(1)
workflowRunId String
workflowRun WorkflowRun @relation(fields: [workflowRunId], references: [id], onDelete: Cascade)
stageId String
stageName String
stageNumber Int
executionGroup Int
status Status @default(PENDING)
startedAt DateTime?
completedAt DateTime?
duration Int?
inputData Json?
outputData Json?
config Json?
suspendedState Json?
resumeData Json?
nextPollAt DateTime?
pollInterval Int?
maxWaitUntil DateTime?
metrics Json?
embeddingInfo Json?
errorMessage String?
logs WorkflowLog[]
@@unique([workflowRunId, stageId])
@@index([status])
@@index([nextPollAt])
}
model WorkflowLog {
id String @id @default(cuid())
createdAt DateTime @default(now())
workflowRunId String?
workflowRun WorkflowRun? @relation(fields: [workflowRunId], references: [id], onDelete: Cascade)
workflowStageId String?
workflowStage WorkflowStage? @relation(fields: [workflowStageId], references: [id], onDelete: Cascade)
level String
message String
metadata Json?
@@index([workflowRunId])
@@index([workflowStageId])
}
model WorkflowArtifact {
id String @id @default(cuid())
createdAt DateTime @default(now())
workflowRunId String
workflowRun WorkflowRun @relation(fields: [workflowRunId], references: [id], onDelete: Cascade)
key String
type String
data Json
size Int
@@unique([workflowRunId, key])
@@index([workflowRunId])
}
model AICall {
id String @id @default(cuid())
createdAt DateTime @default(now())
topic String
callType String
modelKey String
modelId String
prompt String @db.Text
response String @db.Text
inputTokens Int
outputTokens Int
cost Float
@@index([topic])
}
model JobQueue {
id String @id @default(cuid())
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
workflowRunId String
stageId String
status Status @default(PENDING)
priority Int @default(5)
attempt Int @default(0)
maxAttempts Int @default(3)
workerId String?
lockedAt DateTime?
nextPollAt DateTime?
payload Json?
lastError String?
@@index([status, priority])
@@index([nextPollAt])
}
model OutboxEvent {
id String @id @default(cuid())
createdAt DateTime @default(now())
workflowRunId String
sequence Int
eventType String
payload Json
causationId String
occurredAt DateTime
publishedAt DateTime?
retryCount Int @default(0)
dlqAt DateTime?
@@unique([workflowRunId, sequence])
@@index([publishedAt])
@@map("outbox_events")
}
model IdempotencyKey {
id String @id @default(cuid())
createdAt DateTime @default(now())
key String
commandType String
result Json
@@unique([key, commandType])
@@map("idempotency_keys")
}
Run the migration:
npx prisma migrate dev --name add-workflow-tables
npx prisma generate
2. Define Your First Stage
import { defineStage } from "@bratsos/workflow-engine";
import { z } from "zod";
export const extractTextStage = defineStage({
id: "extract-text",
name: "Extract Text",
schemas: {
input: z.object({ url: z.string().url() }),
output: z.object({ text: z.string(), wordCount: z.number() }),
config: z.object({ maxLength: z.number().default(50000) }),
},
async execute(ctx) {
const response = await fetch(ctx.input.url);
const text = (await response.text()).slice(0, ctx.config.maxLength);
ctx.log("INFO", "Extraction complete", { length: text.length });
return {
output: { text, wordCount: text.split(/\s+/).length },
};
},
});
3. Build a Workflow
import { WorkflowBuilder } from "@bratsos/workflow-engine";
import { z } from "zod";
import { extractTextStage } from "./stages/extract-text";
import { summarizeStage } from "./stages/summarize";
export const documentProcessorWorkflow = new WorkflowBuilder(
"document-processor",
"Document Processor",
"Extracts and summarizes documents",
z.object({ url: z.string().url() }),
z.object({ url: z.string().url() }),
)
.pipe(extractTextStage)
.pipe(summarizeStage)
.build();
4. Create the Kernel
The kernel is the core command dispatcher. It's environment-agnostic -- no timers, no signals, no global state.
import { createKernel } from "@bratsos/workflow-engine/kernel";
import {
createPrismaWorkflowPersistence,
createPrismaJobQueue,
} from "@bratsos/workflow-engine";
import { PrismaClient } from "@prisma/client";
import { documentProcessorWorkflow } from "./workflows/document-processor";
const prisma = new PrismaClient();
const kernel = createKernel({
persistence: createPrismaWorkflowPersistence(prisma),
blobStore: myBlobStore, // BlobStore implementation
jobTransport: createPrismaJobQueue(prisma),
eventSink: myEventSink, // EventSink implementation
scheduler: myScheduler, // Scheduler implementation
clock: { now: () => new Date() },
registry: {
getWorkflow: (id) =>
id === "document-processor" ? documentProcessorWorkflow : undefined,
},
});
5. Choose a Host
Option A: Node.js Worker (Recommended for Production)
import { createNodeHost } from "@bratsos/workflow-engine-host-node";
const host = createNodeHost({
kernel,
jobTransport: createPrismaJobQueue(prisma),
workerId: "worker-1",
orchestrationIntervalMs: 10_000,
jobPollIntervalMs: 1_000,
});
// Start polling loops + signal handlers
await host.start();
// Queue a workflow
await kernel.dispatch({
type: "run.create",
idempotencyKey: crypto.randomUUID(),
workflowId: "document-processor",
input: { url: "https://example.com/doc.pdf" },
});
Option B: Serverless (Cloudflare Workers, Lambda, etc.)
import { createServerlessHost } from "@bratsos/workflow-engine-host-serverless";
const host = createServerlessHost({
kernel,
jobTransport,
workerId: "my-worker",
});
// Handle a single job from a queue message
const result = await host.handleJob(msg);
// Run maintenance from a cron trigger
const tick = await host.runMaintenanceTick();
Core Concepts
Stages
A stage is the atomic unit of work. Every stage has typed input, output, and config schemas.
Stage Modes:
| Mode | Use Case |
|------|----------|
| sync (default) | Most stages - execute and return immediately |
| async-batch | Long-running batch APIs (OpenAI Batch, Google Batch, etc.) |
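The async-batch mode in the table above relies on a suspend-and-poll cycle: the stage suspends with some state, and a later poll decides whether it is ready, still waiting, or failed. The sketch below models one such poll in plain TypeScript with no engine dependency. `SuspendedStageSketch`, `pollOnce`, and the timeout rule are hypothetical names and assumptions for illustration, not the engine's implementation (which drives polling through the stage.pollSuspended command and uses an async checkCompletion).

```typescript
// Engine-free model of one suspended stage. Field names echo the WorkflowStage
// columns (nextPollAt, pollInterval, maxWaitUntil); the logic is an assumption.
type CompletionCheck<T> =
  | { ready: true; output: T }
  | { ready: false; error?: string };

interface SuspendedStageSketch<S, T> {
  state: S;
  nextPollAt: number; // epoch milliseconds
  pollInterval: number; // milliseconds
  maxWaitUntil: number; // epoch milliseconds
  checkCompletion(state: S): CompletionCheck<T>; // synchronous here for brevity
}

type PollOutcome<T> =
  | { status: "not-due" }
  | { status: "waiting"; nextPollAt: number }
  | { status: "completed"; output: T }
  | { status: "failed"; error: string };

function pollOnce<S, T>(stage: SuspendedStageSketch<S, T>, now: number): PollOutcome<T> {
  // Skip stages whose next poll time has not arrived yet.
  if (now < stage.nextPollAt) return { status: "not-due" };
  const check = stage.checkCompletion(stage.state);
  if (check.ready) return { status: "completed", output: check.output };
  if (check.error !== undefined) return { status: "failed", error: check.error };
  // Assumed rule: give up once the maximum wait deadline has passed.
  if (now >= stage.maxWaitUntil) return { status: "failed", error: "Timed out waiting for batch" };
  stage.nextPollAt = now + stage.pollInterval; // schedule the next check
  return { status: "waiting", nextPollAt: stage.nextPollAt };
}
```

Passing `now` in explicitly, rather than reading the system clock, keeps the function deterministic, in the same spirit as the injectable Clock port.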
Workflows
Workflows are built as a linear pipeline of execution groups. Each group contains one or more stages. Sequential stages (.pipe()) form single-stage groups. Parallel stages (.parallel()) form multi-stage groups where all stages run concurrently.
new WorkflowBuilder(id, name, description, inputSchema, outputSchema)
.pipe(stageA) // Group 0: stageA runs first
.pipe(stageB) // Group 1: stageB runs after stageA
.parallel([stageC, stageD]) // Group 2: stageC and stageD run concurrently
.pipe(stageE) // Group 3: stageE runs after both complete
.build();
The output of each execution group is stored in the workflow context keyed by stage ID. For parallel groups, the merged output is an object keyed by each stage's ID:
// After group 2 completes, stageE receives:
ctx.require("stageC") // output of stageC
ctx.require("stageD") // output of stageD
When a workflow completes, the final execution group's output is persisted in WorkflowRun.output and included in the workflow:completed event.
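To make the execution-group model concrete, here is a minimal, engine-free sketch of how groups could be run: groups execute one after another, stages inside a group run concurrently, and every output is stored under its stage ID. `MiniStage`, `MiniContext`, and `runGroups` are hypothetical names; the real engine adds persistence, job queues, and typed schemas on top of this idea.

```typescript
// Hypothetical miniature of the context a stage sees.
interface MiniContext {
  input: unknown;
  require(stageId: string): unknown; // throws if the stage has no output yet
  optional(stageId: string): unknown; // undefined if the stage has no output yet
}

interface MiniStage {
  id: string;
  run(ctx: MiniContext): Promise<unknown>;
}

async function runGroups(groups: MiniStage[][], input: unknown): Promise<Record<string, unknown>> {
  const outputs: Record<string, unknown> = {};
  const ctx: MiniContext = {
    input,
    require(stageId) {
      if (!(stageId in outputs)) throw new Error(`Stage "${stageId}" has no output yet`);
      return outputs[stageId];
    },
    optional: (stageId) => outputs[stageId],
  };
  for (const group of groups) {
    // Stages in one group start together; the next group waits for all of them.
    const results = await Promise.all(group.map((stage) => stage.run(ctx)));
    group.forEach((stage, index) => {
      outputs[stage.id] = results[index];
    });
  }
  return outputs;
}

// One sequential stage, a parallel pair, then a stage that reads both results.
const demoGroups: MiniStage[][] = [
  [{ id: "stageA", run: async (ctx) => String(ctx.input).toUpperCase() }],
  [
    { id: "stageC", run: async (ctx) => `${String(ctx.require("stageA"))}!` },
    { id: "stageD", run: async (ctx) => String(ctx.require("stageA")).length },
  ],
  [{ id: "stageE", run: async (ctx) => [ctx.require("stageC"), ctx.require("stageD")] }],
];
```

Because outputs are only written after the whole group settles, two stages in the same parallel group cannot read each other's output in this sketch, only the output of earlier groups.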
Kernel
The Kernel is a pure command dispatcher. All operations are expressed as typed commands:
// Create a run
const { workflowRunId } = await kernel.dispatch({
type: "run.create",
idempotencyKey: "unique-key",
workflowId: "my-workflow",
input: { data: "hello" },
});
// Cancel a run
await kernel.dispatch({
type: "run.cancel",
workflowRunId,
reason: "User requested",
});
// Rerun from a specific stage
await kernel.dispatch({
type: "run.rerunFrom",
workflowRunId,
fromStageId: "extract-text",
});
The kernel depends on 7 port interfaces (injected at creation):
| Port | Purpose |
|------|---------|
| Persistence | Runs, stages, logs, outbox, idempotency CRUD |
| BlobStore | Large payload storage (put/get/has/delete/list) |
| JobTransport | Job queue (enqueue/dequeue/complete/suspend/fail) |
| EventSink | Async event publishing |
| Scheduler | Deferred command triggers |
| Clock | Injectable time source |
| WorkflowRegistry | Workflow definition lookup |
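As an illustration of what a port implementation might look like, the sketch below is an in-memory blob store exposing the five operations named in the table (put/get/has/delete/list). The interface shape, parameter types, and the names `BlobStoreSketch` and `createMemoryBlobStore` are assumptions; check the BlobStore type exported from the kernel entry point before adapting it. For tests, the package already exports an InMemoryBlobStore from its kernel testing entry point.

```typescript
// Assumed shape: only the method names come from the port table above.
interface BlobStoreSketch {
  put(key: string, data: unknown): Promise<void>;
  get(key: string): Promise<unknown>;
  has(key: string): Promise<boolean>;
  delete(key: string): Promise<void>;
  list(prefix: string): Promise<string[]>;
}

function createMemoryBlobStore(): BlobStoreSketch {
  // Values are stored as JSON strings so callers never share object references.
  const blobs = new Map<string, string>();
  return {
    async put(key, data) {
      blobs.set(key, JSON.stringify(data));
    },
    async get(key) {
      const raw = blobs.get(key);
      if (raw === undefined) throw new Error(`Blob not found: ${key}`);
      return JSON.parse(raw);
    },
    async has(key) {
      return blobs.has(key);
    },
    async delete(key) {
      blobs.delete(key);
    },
    async list(prefix) {
      return Array.from(blobs.keys())
        .filter((key) => key.startsWith(prefix))
        .sort();
    },
  };
}
```

Keeping the methods async even for an in-memory map means the same calling code would work unchanged against a network-backed store.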
Hosts
Hosts wrap the kernel with environment-specific process management:
Node Host (@bratsos/workflow-engine-host-node): Long-running worker process with polling loops, signal handling (SIGTERM/SIGINT), and continuous job dequeuing.
Serverless Host (@bratsos/workflow-engine-host-serverless): Stateless single-invocation methods for queue-driven environments. Consumers wire platform-specific glue (ack/retry/waitUntil) around the host methods.
Persistence
| Interface | Purpose |
|-----------|---------|
| Persistence | Workflow runs, stages, logs, outbox, idempotency |
| JobTransport | Distributed job queue with priority and retries |
| BlobStore | Large payload storage |
| AICallLogger | AI call tracking with cost aggregation |
Built-in implementations:
- createPrismaWorkflowPersistence(prisma) - PostgreSQL via Prisma
- createPrismaJobQueue(prisma) - PostgreSQL with FOR UPDATE SKIP LOCKED
- createPrismaAICallLogger(prisma) - PostgreSQL
Common Patterns
Accessing Previous Stage Output
Use ctx.require() for type-safe access to any previous stage's output:
export const analyzeStage = defineStage({
id: "analyze",
name: "Analyze Content",
schemas: {
input: "none",
output: AnalysisOutputSchema,
config: ConfigSchema,
},
async execute(ctx) {
const extracted = ctx.require("extract-text"); // Throws if missing
const summary = ctx.optional("summarize"); // Returns undefined if missing
return { output: { /* ... */ } };
},
});
Parallel Execution
Parallel stages run concurrently in the same execution group. Their outputs are keyed by stage ID in the workflow context:
const workflow = new WorkflowBuilder(/* ... */)
.pipe(extractStage)
.parallel([
sentimentAnalysisStage, // id: "sentiment"
keywordExtractionStage, // id: "keywords"
languageDetectionStage, // id: "language"
])
.pipe(aggregateResultsStage)
.build();
// In aggregateResultsStage:
async execute(ctx) {
const sentiment = ctx.require("sentiment"); // output of sentimentAnalysisStage
const keywords = ctx.require("keywords"); // output of keywordExtractionStage
const language = ctx.require("language"); // output of languageDetectionStage
// ...
}
Stage ID Utilities
Use createStageIds or defineStageIds for type-safe stage ID constants with autocomplete:
import { createStageIds, defineStageIds } from "@bratsos/workflow-engine";
// From an existing workflow
const STAGES = createStageIds(myWorkflow);
STAGES.EXTRACT_TEXT // "extract-text" (autocomplete + type-safe)
STAGES.SUMMARIZE // "summarize"
// Or define upfront
const STAGES = defineStageIds(["extract-text", "summarize"] as const);
AI Integration
import { createAIHelper } from "@bratsos/workflow-engine";
async execute(ctx) {
const ai = createAIHelper(
`workflow.${ctx.workflowRunId}.stage.${ctx.stageId}`,
aiCallLogger,
);
const { text, cost } = await ai.generateText("gemini-2.5-flash", "Summarize: " + ctx.input.text);
const { object: analysis } = await ai.generateObject(
"gemini-2.5-flash",
"Analyze: " + ctx.input.text,
z.object({ sentiment: z.enum(["positive", "negative", "neutral"]) })
);
return { output: { text, analysis } };
}
Long-Running Batch Jobs
import { defineAsyncBatchStage } from "@bratsos/workflow-engine";
export const batchStage = defineAsyncBatchStage({
id: "batch-process",
name: "Batch Processing",
mode: "async-batch",
schemas: { input: InputSchema, output: OutputSchema, config: ConfigSchema },
async execute(ctx) {
if (ctx.resumeState) {
return { output: await fetchBatchResults(ctx.resumeState.batchId) };
}
const batch = await submitBatch(ctx.input.prompts);
return {
suspended: true,
state: {
batchId: batch.id,
submittedAt: new Date().toISOString(),
pollInterval: 3600000,
maxWaitTime: 86400000,
},
pollConfig: { pollInterval: 3600000, maxWaitTime: 86400000, nextPollAt: new Date(Date.now() + 3600000) },
};
},
async checkCompletion(state) {
const status = await checkBatchStatus(state.batchId);
if (status === "completed") {
const output = await fetchBatchResults(state.batchId);
return { ready: true, output };
}
if (status === "failed") return { ready: false, error: "Batch failed" };
return { ready: false };
},
});
Config Presets
import { withAIConfig, withStandardConfig } from "@bratsos/workflow-engine";
import { z } from "zod";
const MyConfigSchema = withAIConfig(z.object({ customField: z.string() }));
Best Practices
Schema Design
// Good: Strict schemas with descriptions and defaults
const ConfigSchema = z.object({
modelKey: z.string().default("gemini-2.5-flash").describe("AI model to use"),
maxRetries: z.number().min(0).max(10).default(3),
});
Logging
async execute(ctx) {
ctx.log("INFO", "Starting processing", { itemCount: items.length });
for (const [index, item] of items.entries()) {
ctx.onProgress({
progress: (index + 1) / items.length,
message: `Processing item ${index + 1}/${items.length}`,
});
}
}
Error Handling
async execute(ctx) {
try {
const result = await processDocument(ctx.input);
return { output: result };
} catch (error) {
ctx.log("ERROR", "Processing failed", {
error: error instanceof Error ? error.message : String(error),
});
throw error;
}
}
API Reference
Kernel Commands
| Command | Description | Key Fields |
|---------|-------------|------------|
| run.create | Create a new workflow run | idempotencyKey, workflowId, input, config?, priority? |
| run.claimPending | Claim pending runs for processing | workerId, maxClaims? |
| run.transition | Advance to next stage group | workflowRunId |
| run.cancel | Cancel a running workflow (cascades to stages + jobs) | workflowRunId, reason? |
| run.rerunFrom | Rerun from a specific stage (cleans up artifacts) | workflowRunId, fromStageId |
| job.execute | Execute a single stage (multi-phase transactions) | idempotencyKey?, workflowRunId, workflowId, stageId, config |
| stage.pollSuspended | Poll suspended stages (per-stage transactions) | maxChecks? (returns resumedWorkflowRunIds) |
| lease.reapStale | Release stale job leases | staleThresholdMs |
| run.reapStuck | Fail runs stuck RUNNING with no activity | stuckThresholdMs? |
| outbox.flush | Publish pending events | maxEvents? |
| plugin.replayDLQ | Replay dead-letter queue events | maxEvents? |
Idempotency behavior:
- Replaying the same idempotencyKey returns cached results.
- If the same key is already executing, dispatch throws IdempotencyInProgressError.
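The two idempotency rules above can be sketched with an in-memory cache. This is a hypothetical model only: `createIdempotentDispatcher` and `IdempotencyInProgressSketch` are illustrative names, and keying on command type plus key is an assumption based on the unique constraint in the IdempotencyKey table. The engine persists keys in the database rather than in process memory.

```typescript
// Illustrative stand-in for the engine's IdempotencyInProgressError.
class IdempotencyInProgressSketch extends Error {
  constructor(key: string) {
    super(`Command with key "${key}" is already executing`);
    this.name = "IdempotencyInProgressSketch";
  }
}

function createIdempotentDispatcher<R>() {
  const completed = new Map<string, R>(); // finished commands and their results
  const inFlight = new Set<string>(); // commands currently executing

  return async function dispatch(
    commandType: string,
    idempotencyKey: string,
    handler: () => Promise<R>,
  ): Promise<R> {
    const cacheKey = `${commandType}:${idempotencyKey}`;
    // Rule 1: a replay returns the cached result without running the handler.
    if (completed.has(cacheKey)) return completed.get(cacheKey) as R;
    // Rule 2: a concurrent duplicate is rejected instead of running twice.
    if (inFlight.has(cacheKey)) throw new IdempotencyInProgressSketch(idempotencyKey);
    inFlight.add(cacheKey);
    try {
      const result = await handler();
      completed.set(cacheKey, result);
      return result;
    } finally {
      inFlight.delete(cacheKey);
    }
  };
}
```

In this sketch a failed handler leaves nothing cached, so the same key can be retried; whether the engine behaves the same way is not stated in this README.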
Transaction behavior:
- Most commands execute inside a single database transaction (handler + outbox events).
- job.execute uses multi-phase transactions: Phase 1 commits RUNNING status immediately, Phase 2 runs stageDef.execute() outside any transaction, Phase 3 commits the final status. This avoids holding a database connection during long-running stage execution.
- stage.pollSuspended uses per-stage transactions: checkCompletion() runs outside any transaction (external HTTP calls to batch providers), then DB updates + outbox events are committed in a short transaction per stage. This prevents P2028 timeout errors when batch APIs are slow.
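The three-phase shape described for job.execute can be sketched against a fake database that merely counts open transactions. `FakeDb`, `createFakeDb`, and `executeJobSketch` are hypothetical names; the point is only that the slow stage body runs while no transaction is open.

```typescript
type StageStatusSketch = "PENDING" | "RUNNING" | "COMPLETED" | "FAILED";

interface FakeDb {
  status: StageStatusSketch;
  openTransactions: number;
  transaction<T>(fn: () => T): T;
}

function createFakeDb(): FakeDb {
  const db: FakeDb = {
    status: "PENDING",
    openTransactions: 0,
    transaction(fn) {
      // Track how many transactions are open while fn runs.
      db.openTransactions++;
      try {
        return fn();
      } finally {
        db.openTransactions--;
      }
    },
  };
  return db;
}

async function executeJobSketch(db: FakeDb, stageBody: () => Promise<void>): Promise<StageStatusSketch> {
  // Phase 1: short transaction that only records RUNNING.
  db.transaction(() => {
    db.status = "RUNNING";
  });
  // Phase 2: the potentially slow stage body runs with no transaction open.
  let finalStatus: StageStatusSketch = "COMPLETED";
  try {
    await stageBody();
  } catch {
    finalStatus = "FAILED";
  }
  // Phase 3: second short transaction that records the final status.
  db.transaction(() => {
    db.status = finalStatus;
  });
  return finalStatus;
}
```

The trade-off is that a crash during phase 2 leaves the stage marked RUNNING, which is why a separate recovery path (such as the lease.reapStale and run.reapStuck commands listed above) is needed.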
Cancellation semantics:
- run.cancel is authoritative: it marks the run as CANCELLED, cascades to all non-terminal stages (setting them to CANCELLED and clearing nextPollAt), and cancels all queued/suspended jobs via jobTransport.cancelByRun().
- stage.pollSuspended skips stages whose run has been cancelled.
- job.execute re-checks run status after stage execution. If the run was cancelled during execution, the result is discarded and a ghost: true flag is returned. Hosts use this flag to prevent retries.
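The post-execution re-check behind the ghost flag might look roughly like the following. `RunRecordSketch`, `ExecuteResultSketch`, and `executeWithCancelCheck` are hypothetical; only the idea of discarding the result and returning ghost: true when the run was cancelled mid-execution comes from the text above.

```typescript
interface RunRecordSketch {
  status: "RUNNING" | "CANCELLED";
}

type ExecuteResultSketch<T> = { ghost: true } | { ghost: false; output: T };

async function executeWithCancelCheck<T>(
  run: RunRecordSketch,
  stageBody: () => Promise<T>,
): Promise<ExecuteResultSketch<T>> {
  const output = await stageBody();
  // Re-read the run status only after the stage body has finished.
  if (run.status === "CANCELLED") {
    // The run was cancelled while the stage was executing: drop the output.
    return { ghost: true };
  }
  return { ghost: false, output };
}
```

A host following this pattern would treat a ghost result as final and neither retry the job nor store its output.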
Node Host Config
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| kernel | Kernel | required | Kernel instance |
| jobTransport | JobTransport | required | Job queue |
| workerId | string | required | Unique worker ID |
| orchestrationIntervalMs | number | 10000 | Orchestration poll interval |
| jobPollIntervalMs | number | 1000 | Job dequeue interval |
| staleLeaseThresholdMs | number | 60000 | Stale lease timeout |
Serverless Host
| Method | Description |
|--------|-------------|
| handleJob(msg) | Execute a single pre-dequeued job. Returns { outcome, error? } |
| processAvailableJobs(opts?) | Dequeue and process jobs. Returns { processed, succeeded, failed } |
| runMaintenanceTick() | Claim, poll, reap, flush in one call. Returns structured result |
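The platform glue around handleJob could be sketched as below. Everything except the `{ outcome, error? }` return shape is an assumption: the message interface is loosely modeled on queue consumers that expose ack() and retry(), the "failed" outcome value is hypothetical, and whether handleJob expects the whole message or only its body should be checked against the serverless host's types.

```typescript
// Assumed shapes for illustration; only `{ outcome, error? }` comes from the table above.
interface JobResultSketch {
  outcome: string;
  error?: string;
}

interface HostSketch {
  handleJob(msg: unknown): Promise<JobResultSketch>;
}

interface QueueMessageSketch {
  body: unknown;
  ack(): void; // tell the queue the message is done
  retry(): void; // ask the queue to redeliver the message later
}

async function consumeMessage(host: HostSketch, message: QueueMessageSketch): Promise<void> {
  try {
    const result = await host.handleJob(message.body);
    // "failed" is a hypothetical outcome value used to decide on redelivery.
    if (result.outcome === "failed") {
      message.retry();
    } else {
      message.ack();
    }
  } catch {
    // An unexpected throw is treated like a failed job.
    message.retry();
  }
}
```

Keeping ack and retry in the consumer rather than in the host is what lets the same host methods be reused across different queue platforms.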
Core Exports
// Stage definition
import { defineStage, defineAsyncBatchStage } from "@bratsos/workflow-engine";
// Workflow building
import { WorkflowBuilder, Workflow } from "@bratsos/workflow-engine";
// Kernel
import { createKernel, type Kernel, type KernelConfig } from "@bratsos/workflow-engine/kernel";
// Kernel types
import type { KernelCommand, CommandResult, KernelEvent } from "@bratsos/workflow-engine/kernel";
// Port interfaces
import type { Persistence, BlobStore, JobTransport, EventSink, Scheduler, Clock } from "@bratsos/workflow-engine/kernel";
// Plugins
import { definePlugin, createPluginRunner } from "@bratsos/workflow-engine/kernel";
// Persistence (Prisma)
import { createPrismaWorkflowPersistence, createPrismaJobQueue, createPrismaAICallLogger } from "@bratsos/workflow-engine";
// AI Helper
import { createAIHelper, type AIHelper } from "@bratsos/workflow-engine";
// Stage ID utilities
import { createStageIds, defineStageIds, isValidStageId, assertValidStageId } from "@bratsos/workflow-engine";
// Testing
import { InMemoryWorkflowPersistence, InMemoryJobQueue } from "@bratsos/workflow-engine/testing";
import { FakeClock, InMemoryBlobStore, CollectingEventSink, NoopScheduler } from "@bratsos/workflow-engine/kernel/testing";
Troubleshooting
"Workflow not found in registry"
Ensure the workflow is registered in the registry passed to createKernel:
const kernel = createKernel({
// ...
registry: {
getWorkflow(id) {
const workflows = { "my-workflow": myWorkflow };
return workflows[id];
},
},
});
"Stage X depends on Y which was not found"
Verify all dependencies are included in the workflow:
.pipe(extractStage) // Must be piped before
.pipe(analyzeStage) // analyze can now access extract's output
Jobs stuck in "RUNNING"
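A worker likely crashed while holding the job's lease. Stale lease recovery (the lease.reapStale command) releases such jobs automatically once the lease is older than the stale threshold (staleLeaseThresholdMs, 60000 ms by default on the Node host). On the Node host, this runs on each orchestration tick. For serverless, call runMaintenanceTick() from a cron trigger.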
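Conceptually, reaping stale leases amounts to finding jobs that are still RUNNING but whose lock is older than a threshold, and handing them back to the queue. The sketch below shows that idea on an in-memory array; `LeasedJobSketch` and `reapStaleLeases` are hypothetical, and resetting the job to PENDING is an assumption about what "release" means here.

```typescript
// Field names echo the JobQueue columns (status, workerId, lockedAt).
interface LeasedJobSketch {
  id: string;
  status: "PENDING" | "RUNNING";
  workerId: string | null;
  lockedAt: number | null; // epoch milliseconds
}

function reapStaleLeases(jobs: LeasedJobSketch[], now: number, staleThresholdMs: number): string[] {
  const released: string[] = [];
  for (const job of jobs) {
    const isStale =
      job.status === "RUNNING" && job.lockedAt !== null && now - job.lockedAt > staleThresholdMs;
    if (isStale) {
      // Assumed release behavior: clear the lock so another worker can pick the job up.
      job.status = "PENDING";
      job.workerId = null;
      job.lockedAt = null;
      released.push(job.id);
    }
  }
  return released;
}
```

A threshold shorter than the longest legitimate stage execution would release jobs that are still being worked on, so it should be chosen with the slowest stage in mind.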
License
MIT
