# @bratsos/workflow-engine-host-node

v0.3.0
Node.js host for the @bratsos/workflow-engine command kernel. Provides process loops, signal handling, and continuous job processing.
## Installation

```bash
npm install @bratsos/workflow-engine-host-node
```

## Quick Start
```typescript
import { createKernel } from "@bratsos/workflow-engine/kernel";
import { createNodeHost } from "@bratsos/workflow-engine-host-node";
import { createPrismaJobQueue } from "@bratsos/workflow-engine";

const kernel = createKernel({ /* ... */ });
const jobTransport = createPrismaJobQueue(prisma);

const host = createNodeHost({
  kernel,
  jobTransport,
  workerId: "worker-1",
});

await host.start();
```

## API
### createNodeHost(config): NodeHost

Creates a new Node host instance.

#### NodeHostConfig
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| kernel | Kernel | required | Kernel instance to dispatch commands to |
| jobTransport | JobTransport | required | Job transport for dequeue/complete/suspend/fail |
| workerId | string | required | Unique worker identifier |
| orchestrationIntervalMs | number | 10_000 | Interval for claim/poll/reap/flush orchestration tick |
| jobPollIntervalMs | number | 1_000 | Interval for polling job queue when empty |
| staleLeaseThresholdMs | number | 60_000 | Time before a job lease is considered stale |
| maxClaimsPerTick | number | 10 | Max pending runs to claim per orchestration tick |
| maxSuspendedChecksPerTick | number | 10 | Max suspended stages to poll per tick |
| maxOutboxFlushPerTick | number | 100 | Max outbox events to flush per tick |
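The defaults can be overridden per worker. A sketch of a tuned configuration, assuming `kernel` and `jobTransport` exist as in Quick Start; the values are illustrative, not recommendations:

```typescript
// Illustrative tuning only -- appropriate values depend on job volume and DB load.
const host = createNodeHost({
  kernel,
  jobTransport,
  workerId: "worker-1",
  orchestrationIntervalMs: 5_000,  // orchestrate twice as often as the default
  jobPollIntervalMs: 500,          // poll an empty queue more aggressively
  staleLeaseThresholdMs: 120_000,  // tolerate longer-running jobs before reaping
  maxClaimsPerTick: 25,
  maxOutboxFlushPerTick: 500,
});
```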
#### NodeHost
| Method | Returns | Description |
|--------|---------|-------------|
| start() | Promise<void> | Start polling loops and register SIGTERM/SIGINT handlers |
| stop() | Promise<void> | Graceful shutdown -- clears timers and signal handlers |
| getStats() | HostStats | Runtime statistics |
#### HostStats
```typescript
interface HostStats {
  workerId: string;
  jobsProcessed: number;
  orchestrationTicks: number;
  isRunning: boolean;
  uptimeMs: number;
}
```

## How It Works
The host runs two concurrent loops:
1. **Orchestration timer** (every `orchestrationIntervalMs`):
   - `run.claimPending` -- claim pending runs and enqueue first-stage jobs
   - `stage.pollSuspended` -- check if suspended stages are ready to resume
   - `lease.reapStale` -- release stale job leases from crashed workers
   - `outbox.flush` -- publish pending events through the EventSink
2. **Job processing loop** (continuous):
   - Dequeue the next job from `jobTransport`
   - Dispatch `job.execute` to the kernel
   - On completion: mark complete, dispatch `run.transition`
   - On suspension: mark suspended with the next poll time
   - On failure: mark failed with a retry flag
   - Sleep for `jobPollIntervalMs` when the queue is empty
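The job processing loop can be sketched with a minimal in-memory transport. Everything here (`MiniTransport`, `jobLoop`, the method names) is illustrative rather than the package's real interface, and the sketch omits the suspension path the real host handles:

```typescript
interface Job { id: string; }

interface MiniTransport {
  dequeue(workerId: string): Promise<Job | null>;
  complete(jobId: string): Promise<void>;
  fail(jobId: string, retry: boolean): Promise<void>;
}

const sleep = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));

// Hypothetical single-worker loop mirroring the steps above (minus suspension).
async function jobLoop(
  transport: MiniTransport,
  execute: (job: Job) => Promise<void>,
  opts: { workerId: string; jobPollIntervalMs: number; shouldStop: () => boolean },
): Promise<number> {
  let processed = 0;
  while (!opts.shouldStop()) {
    const job = await transport.dequeue(opts.workerId);
    if (!job) {
      await sleep(opts.jobPollIntervalMs); // queue empty: back off before polling again
      continue;
    }
    try {
      await execute(job);                  // the real host dispatches job.execute here
      await transport.complete(job.id);    // then run.transition on completion
      processed++;
    } catch {
      await transport.fail(job.id, true);  // mark failed with the retry flag set
    }
  }
  return processed;
}
```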
Signal handlers (`SIGTERM`, `SIGINT`) automatically call `stop()` for graceful shutdown.
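The orchestration tick (the first loop above) boils down to four kernel dispatches per interval. A self-contained sketch; the `dispatch` signature and `limit` payloads are assumptions, not the kernel's actual API:

```typescript
type Dispatch = (command: string, payload?: Record<string, unknown>) => Promise<void>;

// Hypothetical tick runner: the four commands run in a fixed order each tick,
// bounded by the corresponding NodeHostConfig limits.
async function orchestrationTick(
  dispatch: Dispatch,
  limits: {
    maxClaimsPerTick: number;
    maxSuspendedChecksPerTick: number;
    maxOutboxFlushPerTick: number;
  },
): Promise<void> {
  await dispatch("run.claimPending", { limit: limits.maxClaimsPerTick });             // enqueue first-stage jobs
  await dispatch("stage.pollSuspended", { limit: limits.maxSuspendedChecksPerTick }); // resume ready stages
  await dispatch("lease.reapStale");                                                  // recover from crashed workers
  await dispatch("outbox.flush", { limit: limits.maxOutboxFlushPerTick });            // publish pending events
}
```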
## Worker Process Pattern

```typescript
// worker.ts
import { host } from "./setup";

await host.start();
// Host runs until SIGTERM/SIGINT or host.stop() is called
```

```bash
npx tsx worker.ts
```

## Multi-Worker
Multiple workers can share the same database. Each needs a unique `workerId`:

```typescript
// worker-1
createNodeHost({ kernel, jobTransport, workerId: "worker-1" });

// worker-2
createNodeHost({ kernel, jobTransport, workerId: "worker-2" });
```

Run claiming uses `FOR UPDATE SKIP LOCKED` in PostgreSQL to prevent race conditions.
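One way to keep `workerId` unique across processes is to derive it from the host name and pid. This naming scheme is just a convention, not something the package requires:

```typescript
import { hostname } from "node:os";

// e.g. "app-server-1-48213-worker": unique per process on a shared database
function defaultWorkerId(suffix = "worker"): string {
  return `${hostname()}-${process.pid}-${suffix}`;
}
```

With one of these per process, each worker can claim runs independently against the same database.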
## License
MIT
