@zaby-ai/ique
v0.1.4
Redis-backed workflow and queue engine
@zaby-ai/ique
A Redis-backed workflow and queue engine for Node.js.
@zaby-ai/ique combines durable, event-sourced workflow orchestration with a battle-tested job queue in a single, dependency-light package. Workflows are plain TypeScript classes decorated with @Workflow, @Activity, @Signal, @Query, and @Update. The queue layer offers priority, delay, deduplication, rate-limiting, flow dependencies, and sandboxed workers.
Table of Contents
- Requirements
- Installation
- Quick Start — Queue
- Quick Start — Workflows
- Configuration
- Queue API
- Workflow API
- Recovery and Observability
- CLI
- Scripts Reference
- TypeScript Setup
Requirements
- Node.js ≥ 18
- Redis ≥ 6 (Redis Cloud / Redis Stack / Upstash all work)
Installation
Install from the public registry:
npm install @zaby-ai/ique
# or
pnpm add @zaby-ai/ique
# or
yarn add @zaby-ai/ique
Because @zaby-ai/ique uses TypeScript decorators, you must also import reflect-metadata once at the top of your application entry-point and set the appropriate compiler options (see TypeScript Setup).
Quick Start — Queue
import "reflect-metadata";
import { Queue, Worker } from "@zaby-ai/ique";
// 1. Create a queue (connects to Redis on 127.0.0.1:6379 by default)
const queue = new Queue<{ url: string }>("email-queue");
// 2. Create a worker to process jobs
const worker = new Worker<{ url: string }, { sent: boolean }>(
queue,
async (job) => {
console.log(`Sending email to ${job.data.url}`);
return { sent: true };
},
{ autorun: true, concurrency: 5 },
);
worker.on("completed", (job, result) => console.log("Done", job.id, result));
worker.on("failed", (job, err) => console.error("Failed", job?.id, err));
// 3. Enqueue a job
await queue.add({ url: "user@example.com" });
// Graceful shutdown
process.on("SIGTERM", async () => {
await worker.close();
await queue.close();
});
Quick Start — Workflows
import "reflect-metadata";
import {
Activity,
Query,
Signal,
Update,
Workflow,
WorkflowEngine,
type ExecutionContext,
} from "@zaby-ai/ique";
@Workflow({ name: "order.fulfillment", maxDurationMs: 10 * 60_000 })
class OrderFulfillmentWorkflow {
@Activity({ retries: 3, timeoutMs: 5_000 })
async reserve(ctx: ExecutionContext): Promise<{ orderId: string }> {
const orderId = ctx.input.orderId as string;
ctx.setMemory("orderId", orderId);
return { orderId };
}
@Activity({ retries: 2, timeoutMs: 8_000 })
async ship(_ctx: ExecutionContext, previous: { orderId: string }): Promise<{ tracking: string }> {
return { tracking: `TRK-${previous.orderId}` };
}
@Signal({ name: "cancel" })
async onCancel(ctx: ExecutionContext): Promise<void> {
ctx.setMemory("cancelled", true);
}
@Query({ name: "status" })
async getStatus(ctx: ExecutionContext) {
return {
step: ctx.state.currentStep,
status: ctx.state.status,
cancelled: ctx.getMemory("cancelled") ?? false,
};
}
@Update({ name: "addNote" })
async addNote(ctx: ExecutionContext, payload: { note: string }) {
const notes = ctx.getMemory<string[]>("notes") ?? [];
notes.push(payload.note);
ctx.setMemory("notes", notes);
return { notes };
}
}
const engine = new WorkflowEngine(); // defaults: localhost Redis, prefix "ique"
const { workflowId } = await engine.run(OrderFulfillmentWorkflow, {
input: { orderId: "ORD-123" },
});
// Query workflow state synchronously
const status = await engine.queryWorkflow(OrderFulfillmentWorkflow, workflowId, "status");
console.log(status);
// Send a synchronous update
const result = await engine.updateWorkflow(
OrderFulfillmentWorkflow,
workflowId,
"addNote",
{ note: "Fragile item — handle with care" },
);
console.log(result);
// Send a fire-and-forget signal
await engine.sendSignal(workflowId, "cancel");
await engine.close();
Configuration
Redis Connection
Both Queue and WorkflowEngine accept a RedisConnectionOptions object as their last constructor argument:
import type { RedisConnectionOptions } from "@zaby-ai/ique";
const redisOptions: RedisConnectionOptions = {
host: "redis.example.com",
port: 6380,
password: "secret",
tls: true,
db: 0,
};
const queue = new Queue("my-queue", {}, redisOptions);
const engine = new WorkflowEngine(redisOptions, "myapp"); // second arg = key prefix
You can also pass a full Redis URL:
const redisOptions: RedisConnectionOptions = {
url: "rediss://:<password>@redis.example.com:6380",
};
Environment Variables
These variables are read automatically when no explicit redisOptions are provided:
| Variable | Default | Description |
|---|---|---|
| REDIS_URL | — | Full Redis connection URL (takes precedence) |
| REDIS_HOST | 127.0.0.1 | Redis host |
| REDIS_PORT | 6379 | Redis port |
| REDIS_PASSWORD | — | Redis password |
| REDIS_DB | 0 | Redis database number |
| REDIS_TLS | false | Enable TLS ("true" / "false") |
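The precedence described in the table can be pictured with a small sketch. The helper name `resolveRedisOptionsFromEnv` and the `ResolvedRedisOptions` shape are hypothetical, chosen for illustration only; the library's internal resolution logic is not shown in this README and may differ.

```typescript
// Hypothetical shape mirroring the fields used in the Redis Connection example.
interface ResolvedRedisOptions {
  url?: string;
  host?: string;
  port?: number;
  password?: string;
  db?: number;
  tls?: boolean;
}

type Env = Record<string, string | undefined>;

// Illustrative helper, not part of @zaby-ai/ique.
function resolveRedisOptionsFromEnv(env: Env): ResolvedRedisOptions {
  // REDIS_URL takes precedence over the individual variables.
  if (env.REDIS_URL) {
    return { url: env.REDIS_URL };
  }
  // Otherwise fall back to the documented defaults.
  const options: ResolvedRedisOptions = {
    host: env.REDIS_HOST ?? "127.0.0.1",
    port: Number(env.REDIS_PORT ?? "6379"),
    db: Number(env.REDIS_DB ?? "0"),
    tls: env.REDIS_TLS === "true",
  };
  if (env.REDIS_PASSWORD) {
    options.password = env.REDIS_PASSWORD;
  }
  return options;
}

// No variables set: the defaults from the table apply.
const fromDefaults = resolveRedisOptionsFromEnv({});
// REDIS_URL set: the other variables are ignored.
const fromUrl = resolveRedisOptionsFromEnv({
  REDIS_URL: "rediss://redis.example.com:6380",
  REDIS_HOST: "ignored.example.com",
});
console.log(fromDefaults, fromUrl);
```

In an application you would pass `process.env` as the argument; the sketch takes the environment as a parameter so it can be exercised without touching real variables.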
Queue API
Queue
const q = new Queue<JobData>(queueName, queueOptions, redisOptions);
| Method | Description |
|---|---|
| add(data, opts?) | Enqueue a single job |
| add(name, data, opts?) | Enqueue a named job |
| addBulk(items) | Enqueue multiple jobs atomically |
| addRepeatable(data, opts) | Schedule a recurring job (everyMs) |
| removeRepeatable(repeatId) | Cancel a recurring job |
| getJob(id) | Fetch a job by ID |
| getJobs(status, start?, end?) | List jobs by status |
| getJobCounts(...states) | Count jobs per state |
| drain(delayed?) | Remove all waiting (and optionally delayed) jobs |
| clean(gracePeriodMs, limit, state?) | Remove old finished jobs |
| obliterate({ force? }) | Delete the queue and all its data |
| pause() / resume() | Pause or resume processing |
| isPaused() | Check paused state |
| setGlobalConcurrency(n) | Enforce a global concurrency limit in Redis |
| setGlobalRateLimit(max, durationMs) | Enforce a rate limit |
| retryJobs({ count?, state?, timestamp? }) | Re-enqueue failed/completed jobs |
| close() | Close the Redis connection |
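To make the `(max, durationMs)` pair of `setGlobalRateLimit` concrete, here is a minimal in-memory sketch of a fixed-window limiter. This is an assumption about what such a limit could mean, not the library's implementation: the README only states that the limit is enforced, and the real mechanism lives in Redis. The class name `FixedWindowLimiter` is hypothetical.

```typescript
// Illustrative in-memory fixed-window limiter; not the library's Redis logic.
class FixedWindowLimiter {
  private readonly max: number;
  private readonly durationMs: number;
  private windowStart = 0;
  private used = 0;

  constructor(max: number, durationMs: number) {
    this.max = max;
    this.durationMs = durationMs;
  }

  // Returns true if a job may start at time `nowMs`, false if it must wait.
  tryAcquire(nowMs: number): boolean {
    // Open a fresh window once the current one has fully elapsed.
    if (nowMs - this.windowStart >= this.durationMs) {
      this.windowStart = nowMs;
      this.used = 0;
    }
    if (this.used >= this.max) {
      return false;
    }
    this.used += 1;
    return true;
  }
}

// At most 2 jobs per 1000 ms window.
const limiter = new FixedWindowLimiter(2, 1_000);
const decisions = [0, 100, 200, 1_000].map((t) => limiter.tryAcquire(t));
console.log(decisions); // [ true, true, false, true ]
```

The third call is rejected because the window that opened at 0 ms is already full; the fourth succeeds because a new window opens at 1000 ms.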
Job Options (QueueJobOptions)
await queue.add("send-email", data, {
jobId: "idempotency-key", // deterministic job ID
attempts: 3, // max retry attempts
backoffMs: 2_000, // delay between retries
delayMs: 5_000, // initial delay before first attempt
priority: 10, // higher = fewer slot-mates jump ahead
deduplication: { id: "key", ttl: 60_000 }, // skip duplicate enqueue
debounce: { id: "key", ttl: 5_000 }, // coalesce rapid calls
removeOnComplete: true,
removeOnFail: false,
});
Worker
const worker = new Worker<JobData, Result>(queue, handler, options);
await worker.start(); // starts polling (or use autorun: true)
await worker.stop(); // drain active jobs then stop
await worker.close(); // alias for stop()
await worker.pause(/* doNotWaitActive = false */);
worker.resume();
worker.setConcurrency(n);
Worker events: "active", "completed", "failed", "stalled", "paused", "resumed", "locksRenewed", "lockRenewalFailed".
Sandboxed Workers
Run untrusted or CPU-bound jobs in an isolated process or thread:
const worker = new Worker(queue, "./path/to/processor.ts", {
sandboxMode: "process", // or "thread"
sandboxTimeoutMs: 30_000,
concurrency: 4,
autorun: true,
});
The processor file must export a default async function (job: SandboxJob) => Promise<Result>.
Job
Jobs expose the following interface at runtime inside a worker handler:
async (job) => {
console.log(job.id, job.name, job.data, job.attemptsMade);
await job.updateProgress(50); // number or object
await job.updateProgress({ step: "download", pct: 50 });
await job.log("Step finished");
// return value becomes job.result
return { ok: true };
}
FlowProducer — Job Dependencies
Create trees of jobs where a parent only starts after all children complete:
import { FlowProducer } from "@zaby-ai/ique";
const flow = new FlowProducer({}, redisOptions);
await flow.add({
name: "process-order",
queueName: "orders",
data: { orderId: "ORD-1" },
children: [
{ name: "reserve-stock", queueName: "inventory", data: { sku: "A" } },
{ name: "reserve-stock", queueName: "inventory", data: { sku: "B" } },
],
});
await flow.close();
QueueEvents — Live Event Stream
Subscribe to queue events via Redis Streams:
import { QueueEvents } from "@zaby-ai/ique";
const events = new QueueEvents("email-queue", redisOptions);
events.on("completed", ({ jobId, returnValue }) => {
console.log(`Job ${jobId} completed`, returnValue);
});
events.on("failed", ({ jobId, failedReason }) => {
console.error(`Job ${jobId} failed:`, failedReason);
});
events.on("progress", ({ jobId, data }) => {
console.log(`Job ${jobId} progress:`, data);
});
await events.close();
Workflow API
Decorators
All decorators require reflect-metadata to be imported once at the process entry-point.
| Decorator | Target | Purpose |
|---|---|---|
| @Workflow(opts) | class | Marks a class as a workflow definition |
| @Activity(opts?) | method | Marks a step that is persisted and retried |
| @Signal(opts?) | method | Fire-and-forget input that mutates state |
| @Query(opts?) | method | Synchronous read-only state projection |
| @Update(opts?) | method | Synchronous state mutation with a response |
| @Compensate({ forActivity }) | method | Rollback handler for the named activity, run in reverse activity order when the workflow fails |
@Workflow options
@Workflow({
name: "my.workflow", // required — unique workflow name
maxDurationMs: 10 * 60_000, // optional execution timeout
heartbeatTtlMs: 30_000, // optional stale-detection threshold
maxEventHistory: 2_000, // optional event stream trim size
})
@Activity options
@Activity({
name: "customName", // optional override
retries: 3, // default: 0
timeoutMs: 5_000, // per-attempt timeout
heartbeatTimeoutMs: 60_000, // stale heartbeat threshold
})
WorkflowEngine
const engine = new WorkflowEngine(redisOptions?, prefix?);
| Method | Description |
|---|---|
| run(Class, input?) | Start a new workflow run |
| resume(Class, workflowId) | Resume an interrupted workflow |
| getState(workflowId) | Fetch the current workflow state |
| cancelWorkflow(workflowId, reason?) | Request cancellation |
| sendSignal(workflowId, name, payload?) | Fire an asynchronous signal |
| queryWorkflow(Class, workflowId, name, payload?) | Invoke a @Query handler |
| updateWorkflow(Class, workflowId, name, payload?) | Invoke an @Update handler |
| runReplayHistory(Class, workflowId) | Validate history against current class definition |
| runReplayHistories(Class, ids) | Validate multiple histories (async generator) |
| findStaleRunningWorkflows() | List workflows that missed their heartbeat |
| recoverStaleWorkflows(limit?) | Resume stale workflows |
| startHeartbeatMonitor(opts?) | Start periodic stale-workflow recovery |
| stopHeartbeatMonitor() | Stop the monitor |
| close() | Close all Redis connections |
WorkflowExecutionInput
await engine.run(MyWorkflow, {
input: { orderId: "ORD-1" }, // available as ctx.input
memory: { retryBudget: 5 }, // pre-seeded ctx memory
});
ExecutionContext
Every workflow method receives an ExecutionContext as its first argument:
// Identity
ctx.workflowId // string
ctx.state // full WorkflowState snapshot
ctx.input // the input passed to engine.run()
// Per-run memory (persisted between steps)
ctx.getMemory<T>(key)
ctx.setMemory(key, value)
ctx.memory // entire memory map
// Versioning
ctx.getVersion?.("change-id", minVersion, maxVersion) // → number
ctx.patched?.("patch-id") // → boolean
// Flow control
ctx.continueAsNew?.(newInput) // restart from scratch with new input
ctx.isCancelled?.() // → boolean
ctx.throwIfCancelled?.() // throws if cancellation was requested
// Child workflows
await ctx.startChildWorkflow?.(ChildClass, { input: { ... } })
// Runtime info (available inside @Query and @Update)
ctx.runtime?.historyLength // number
ctx.runtime?.isReplaying // boolean
Signals
Signals are fire-and-forget inputs delivered to a running workflow. They mutate ctx memory and are replayed automatically on resume.
@Signal({ name: "approve" })
async onApprove(ctx: ExecutionContext, payload: { userId: string }): Promise<void> {
ctx.setMemory("approvedBy", payload.userId);
}
// Caller side
await engine.sendSignal(workflowId, "approve", { userId: "ops-team" });
Queries
Queries read state synchronously without mutating it. They work on both running and completed workflows.
@Query({ name: "progress" })
async getProgress(ctx: ExecutionContext) {
return {
step: ctx.state.currentStep,
status: ctx.state.status,
};
}
// Caller side
const progress = await engine.queryWorkflow(MyWorkflow, workflowId, "progress");
Updates
Updates are synchronous mutations with a response value. State changes are persisted immediately.
@Update({ name: "approve" })
async approve(
ctx: ExecutionContext,
payload: { userId: string },
): Promise<{ ok: boolean }> {
ctx.setMemory("approvedBy", payload.userId);
return { ok: true };
}
// Caller side
const { ok } = await engine.updateWorkflow(
MyWorkflow,
workflowId,
"approve",
{ userId: "ops-team" },
) as { ok: boolean };
Built-in __workflow_metadata Query
Every workflow supports a built-in query that returns its definition metadata and runtime summary — no extra code required:
const metadata = await engine.queryWorkflow(
MyWorkflow,
workflowId,
"__workflow_metadata",
) as {
workflowId: string;
workflowName: string;
status: string;
historyLength: number;
definition: {
activityDefinitions: Array<{ name: string }>;
signalDefinitions: Array<{ name: string }>;
queryDefinitions: Array<{ name: string }>;
updateDefinitions: Array<{ name: string }>;
};
};
Child Workflows
Start a sub-workflow from inside a parent activity and store the reference:
@Activity({ retries: 1 })
async stepOne(ctx: ExecutionContext) {
const child = await ctx.startChildWorkflow?.(ChildWorkflowClass, {
input: { parentId: ctx.workflowId },
});
ctx.setMemory("childId", child?.workflowId);
return { started: true };
}
Versioning and continueAsNew
Use ctx.getVersion to safely introduce breaking changes to running workflows:
@Activity()
async process(ctx: ExecutionContext) {
const v = ctx.getVersion?.("add-validation", 1, 2) ?? 1;
if (v >= 2) {
// new code path
} else {
// legacy path
}
}
Use continueAsNew to restart long-running workflows and keep the event history bounded:
ctx.continueAsNew?.({ input: ctx.input, memory: ctx.memory });
Saga / Compensation
Annotate rollback methods with @Compensate to automatically undo an activity if a later step fails:
@Activity({ retries: 2 })
async chargeCard(_ctx: ExecutionContext): Promise<{ chargeId: string }> {
return { chargeId: "ch_abc" };
}
@Compensate({ forActivity: "chargeCard" })
async refundCard(ctx: ExecutionContext): Promise<void> {
ctx.setMemory("refunded", true);
}
Compensation runs in reverse activity order automatically on workflow failure.
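The reverse-order behaviour can be illustrated without the engine. The sketch below is a generic saga runner written for this explanation; `SagaStep` and `runSaga` are hypothetical names, and it assumes that only activities which already completed are compensated. How the library treats the failing activity itself is not specified here.

```typescript
// Illustrative types; none of these names come from @zaby-ai/ique.
interface SagaStep {
  name: string;
  run: () => Promise<void>;
  compensate?: () => Promise<void>;
}

// Runs steps in order. If one throws, compensates the steps that already
// completed, newest first, then rethrows the original error.
async function runSaga(steps: SagaStep[], log: string[]): Promise<void> {
  const completed: SagaStep[] = [];
  try {
    for (const step of steps) {
      await step.run();
      log.push(`ran ${step.name}`);
      completed.push(step);
    }
  } catch (err) {
    for (const step of completed.reverse()) {
      if (step.compensate) {
        await step.compensate();
        log.push(`compensated ${step.name}`);
      }
    }
    throw err;
  }
}

const sagaLog: string[] = [];
const sagaDone = runSaga(
  [
    { name: "reserveStock", run: async () => {}, compensate: async () => {} },
    { name: "chargeCard", run: async () => {}, compensate: async () => {} },
    {
      name: "ship",
      run: async () => {
        throw new Error("carrier unavailable");
      },
    },
  ],
  sagaLog,
).catch((err: Error) => {
  sagaLog.push(`failed: ${err.message}`);
});
```

Once `sagaDone` settles, `sagaLog` shows `chargeCard` being compensated before `reserveStock`, which is the reverse of the order in which they ran.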
Deterministic Replay
Validate that a workflow's persisted history is still consistent with the current code — useful after refactors and deployments:
// Validate a single workflow
await engine.runReplayHistory(MyWorkflow, workflowId);
// Validate many workflows (async generator)
const ids = ["wf-1", "wf-2", "wf-3"];
for await (const result of engine.runReplayHistories(MyWorkflow, ids)) {
if (result.error) {
console.error(`${result.workflowId}: ${result.error.message}`);
}
}
DeterminismViolationError is thrown (or yielded) when:
- Activity steps have been reordered or removed
- A new signal name is encountered that was not registered
- Terminal state does not match the last history event
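The first condition in the list, reordered or removed activity steps, can be pictured as a comparison between the activity names recorded in the history and the activities the current class defines. The following is a simplified sketch under that assumption; `ReplayMismatchError` and `checkActivityOrder` are hypothetical names, and the library's actual replay validation is likely more involved.

```typescript
// Hypothetical error type for this sketch; the library's own
// DeterminismViolationError may carry different fields.
class ReplayMismatchError extends Error {}

// Checks that the recorded activity names match, position by position,
// the start of the activity list defined by the current code.
function checkActivityOrder(recorded: string[], current: string[]): void {
  recorded.forEach((name, index) => {
    if (current[index] !== name) {
      const defined = current[index] ?? "nothing";
      throw new ReplayMismatchError(
        `history step ${index} is "${name}" but the code defines "${defined}"`,
      );
    }
  });
}

// A history that stopped after the first step still matches.
checkActivityOrder(["reserve"], ["reserve", "ship"]);

// Swapping the two activities in code makes the old history invalid.
let replayError = "";
try {
  checkActivityOrder(["reserve", "ship"], ["ship", "reserve"]);
} catch (err) {
  replayError = err instanceof Error ? err.message : String(err);
}
console.log(replayError);
```

A removed activity is caught the same way: the recorded name at that position no longer matches what the code defines.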
Recovery and Observability
Heartbeat Monitor
Automatically recover stale workflows that missed a heartbeat:
engine.startHeartbeatMonitor({
intervalMs: 30_000, // polling interval
limit: 200, // max workflows to scan per tick
});
// Stop when shutting down
engine.stopHeartbeatMonitor();
Manual Recovery
// List stale workflows
const stale = await engine.findStaleRunningWorkflows();
// Recover up to 50 stale workflows immediately
const recovered = await engine.recoverStaleWorkflows(50);
Workflow Inspector
import { WorkflowInspector } from "@zaby-ai/ique";
const inspector = new WorkflowInspector(redisOptions);
const state = await inspector.getState(workflowId);
const events = await inspector.getEvents(workflowId);
await inspector.close();
CLI
After building (npm run build), a small CLI is available:
# Inspect a workflow
npx ique-cli status <workflowId>
# List stale workflows
npx ique-cli stale
# Send a signal
npx ique-cli signal <workflowId> <signalName>
Redis is configured via the environment variables listed in the Configuration section.
Scripts Reference
npm run build # compile TypeScript to dist/
npm run typecheck # type-check without emitting
npm test # run all tests (requires Redis)
npm run test:unit # run unit tests only (no Redis required)
npm run verify:local # typecheck + unit tests + build
npm run verify:all # typecheck + all tests + build
npm run example:workflow # run the long-running workflow example
npm run example:interactive # run the interactive verification example
npm run example:redis # basic Redis smoke test
Stress / parity loop (runs LOOPS full verify cycles):
LOOPS=10 npm run parity:loop
TypeScript Setup
Enable decorator support in tsconfig.json:
{
"compilerOptions": {
"target": "ES2022",
"module": "NodeNext",
"moduleResolution": "NodeNext",
"experimentalDecorators": true,
"emitDecoratorMetadata": true,
"strict": true,
"outDir": "dist"
}
}
Import reflect-metadata once at the very top of your application entry-point, before any class that uses decorators:
import "reflect-metadata";
// rest of your app...
License
MIT
