npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@zaby-ai/ique

v0.1.4

Published

Redis-backed workflow and queue engine

Readme

@zaby-ai/ique

A Redis-backed workflow and queue engine for Node.js.

@zaby-ai/ique combines durable, event-sourced workflow orchestration with a battle-tested job queue in a single, dependency-light package. Workflows are plain TypeScript classes decorated with @Workflow, @Activity, @Signal, @Query, and @Update. The queue layer offers priority, delay, deduplication, rate-limiting, flow dependencies, and sandboxed workers.


Table of Contents


Requirements

  • Node.js ≥ 18
  • Redis ≥ 6 (Redis Cloud / Redis Stack / Upstash all work)

Installation

Install from the public registry:

npm install @zaby-ai/ique
# or
pnpm add @zaby-ai/ique
# or
yarn add @zaby-ai/ique

Because @zaby-ai/ique uses TypeScript decorators you must also import reflect-metadata once at the top of your application entry-point and have the appropriate compiler options set (see TypeScript Setup).


Quick Start — Queue

import "reflect-metadata";
import { Queue, Worker } from "@zaby-ai/ique";

// 1. Create a queue (connects to Redis on 127.0.0.1:6379 by default)
const queue = new Queue<{ url: string }>("email-queue");

// 2. Create a worker to process jobs
const worker = new Worker<{ url: string }, { sent: boolean }>(
  queue,
  async (job) => {
    console.log(`Sending email to ${job.data.url}`);
    return { sent: true };
  },
  { autorun: true, concurrency: 5 },
);

worker.on("completed", (job, result) => console.log("Done", job.id, result));
worker.on("failed", (job, err)    => console.error("Failed", job?.id, err));

// 3. Enqueue a job
await queue.add({ url: "[email protected]" });

// Graceful shutdown
process.on("SIGTERM", async () => {
  await worker.close();
  await queue.close();
});

Quick Start — Workflows

import "reflect-metadata";
import {
  Activity,
  Query,
  Signal,
  Update,
  Workflow,
  WorkflowEngine,
  type ExecutionContext,
} from "@zaby-ai/ique";

@Workflow({ name: "order.fulfillment", maxDurationMs: 10 * 60_000 })
class OrderFulfillmentWorkflow {
  @Activity({ retries: 3, timeoutMs: 5_000 })
  async reserve(ctx: ExecutionContext): Promise<{ orderId: string }> {
    const orderId = ctx.input.orderId as string;
    ctx.setMemory("orderId", orderId);
    return { orderId };
  }

  @Activity({ retries: 2, timeoutMs: 8_000 })
  async ship(_ctx: ExecutionContext, previous: { orderId: string }): Promise<{ tracking: string }> {
    return { tracking: `TRK-${previous.orderId}` };
  }

  @Signal({ name: "cancel" })
  async onCancel(ctx: ExecutionContext): Promise<void> {
    ctx.setMemory("cancelled", true);
  }

  @Query({ name: "status" })
  async getStatus(ctx: ExecutionContext) {
    return {
      step:      ctx.state.currentStep,
      status:    ctx.state.status,
      cancelled: ctx.getMemory("cancelled") ?? false,
    };
  }

  @Update({ name: "addNote" })
  async addNote(ctx: ExecutionContext, payload: { note: string }) {
    const notes = ctx.getMemory<string[]>("notes") ?? [];
    notes.push(payload.note);
    ctx.setMemory("notes", notes);
    return { notes };
  }
}

const engine = new WorkflowEngine(); // defaults: localhost Redis, prefix "ique"

const { workflowId } = await engine.run(OrderFulfillmentWorkflow, {
  input: { orderId: "ORD-123" },
});

// Query workflow state synchronously
const status = await engine.queryWorkflow(OrderFulfillmentWorkflow, workflowId, "status");
console.log(status);

// Send a synchronous update
const result = await engine.updateWorkflow(
  OrderFulfillmentWorkflow,
  workflowId,
  "addNote",
  { note: "Fragile item — handle with care" },
);
console.log(result);

// Send a fire-and-forget signal
await engine.sendSignal(workflowId, "cancel");

await engine.close();

Configuration

Redis Connection

Both Queue and WorkflowEngine accept a RedisConnectionOptions object as their last constructor argument:

import type { RedisConnectionOptions } from "@zaby-ai/ique";

const redisOptions: RedisConnectionOptions = {
  host:     "redis.example.com",
  port:     6380,
  password: "secret",
  tls:      true,
  db:       0,
};

const queue  = new Queue("my-queue", {}, redisOptions);
const engine = new WorkflowEngine(redisOptions, "myapp"); // second arg = key prefix

You can also pass a full Redis URL:

const redisOptions: RedisConnectionOptions = {
  url: "rediss://:<password>@redis.example.com:6380",
};

Environment Variables

These variables are read automatically when no explicit redisOptions are provided:

| Variable | Default | Description | |---|---|---| | REDIS_URL | — | Full Redis connection URL (takes precedence) | | REDIS_HOST | 127.0.0.1 | Redis host | | REDIS_PORT | 6379 | Redis port | | REDIS_PASSWORD | — | Redis password | | REDIS_DB | 0 | Redis database number | | REDIS_TLS | false | Enable TLS ("true" / "false") |


Queue API

Queue

const q = new Queue<JobData>(queueName, queueOptions, redisOptions);

| Method | Description | |---|---| | add(data, opts?) | Enqueue a single job | | add(name, data, opts?) | Enqueue a named job | | addBulk(items) | Enqueue multiple jobs atomically | | addRepeatable(data, opts) | Schedule a recurring job (everyMs) | | removeRepeatable(repeatId) | Cancel a recurring job | | getJob(id) | Fetch a job by ID | | getJobs(status, start?, end?) | List jobs by status | | getJobCounts(...states) | Count jobs per state | | drain(delayed?) | Remove all waiting (and optionally delayed) jobs | | clean(gracePeriodMs, limit, state?) | Remove old finished jobs | | obliterate({ force? }) | Delete the queue and all its data | | pause() / resume() | Pause or resume processing | | isPaused() | Check paused state | | setGlobalConcurrency(n) | Enforce a global concurrency limit in Redis | | setGlobalRateLimit(max, durationMs) | Enforce a rate limit | | retryJobs({ count?, state?, timestamp? }) | Re-enqueue failed/completed jobs | | close() | Close the Redis connection |

Job Options (QueueJobOptions)

await queue.add("send-email", data, {
  jobId:           "idempotency-key",   // deterministic job ID
  attempts:        3,                   // max retry attempts
  backoffMs:       2_000,               // delay between retries
  delayMs:         5_000,               // initial delay before first attempt
  priority:        10,                  // higher = fewer slot-mates jump ahead
  deduplication:   { id: "key", ttl: 60_000 },  // skip duplicate enqueue
  debounce:        { id: "key", ttl: 5_000  },   // coalesce rapid calls
  removeOnComplete: true,
  removeOnFail:     false,
});

Worker

const worker = new Worker<JobData, Result>(queue, handler, options);

await worker.start(); // starts polling (or use autorun: true)
await worker.stop();  // drain active jobs then stop
await worker.close(); // alias for stop()
await worker.pause(/* doNotWaitActive = false */);
worker.resume();
worker.setConcurrency(n);

Worker events: "active", "completed", "failed", "stalled", "paused", "resumed", "locksRenewed", "lockRenewalFailed".

Sandboxed Workers

Run untrusted or CPU-bound jobs in an isolated process or thread:

const worker = new Worker(queue, "./path/to/processor.ts", {
  sandboxMode:     "process",  // or "thread"
  sandboxTimeoutMs: 30_000,
  concurrency:     4,
  autorun:         true,
});

The processor file must export a default async function (job: SandboxJob) => Promise<Result>.

Job

Jobs expose the following interface at runtime inside a worker handler:

async (job) => {
  console.log(job.id, job.name, job.data, job.attemptsMade);
  await job.updateProgress(50);              // number or object
  await job.updateProgress({ step: "download", pct: 50 });
  await job.log("Step finished");
  // return value becomes job.result
  return { ok: true };
}

FlowProducer — Job Dependencies

Create trees of jobs where a parent only starts after all children complete:

import { FlowProducer } from "@zaby-ai/ique";

const flow = new FlowProducer({}, redisOptions);

await flow.add({
  name:      "process-order",
  queueName: "orders",
  data:      { orderId: "ORD-1" },
  children: [
    { name: "reserve-stock",  queueName: "inventory", data: { sku: "A" } },
    { name: "reserve-stock",  queueName: "inventory", data: { sku: "B" } },
  ],
});

await flow.close();

QueueEvents — Live Event Stream

Subscribe to queue events via Redis Streams:

import { QueueEvents } from "@zaby-ai/ique";

const events = new QueueEvents("email-queue", redisOptions);

events.on("completed", ({ jobId, returnValue }) => {
  console.log(`Job ${jobId} completed`, returnValue);
});

events.on("failed", ({ jobId, failedReason }) => {
  console.error(`Job ${jobId} failed:`, failedReason);
});

events.on("progress", ({ jobId, data }) => {
  console.log(`Job ${jobId} progress:`, data);
});

await events.close();

Workflow API

Decorators

All decorators require reflect-metadata to be imported once at the process entry-point.

| Decorator | Target | Purpose | |---|---|---| | @Workflow(opts) | class | Marks a class as a workflow definition | | @Activity(opts?) | method | Marks a step that is persisted and retried | | @Signal(opts?) | method | Fire-and-forget input that mutates state | | @Query(opts?) | method | Synchronous read-only state projection | | @Update(opts?) | method | Synchronous state mutation with a response | | @Compensate({ forActivity }) | method | Rollback handler run when its paired activity fails |

@Workflow options

@Workflow({
  name:           "my.workflow",   // required — unique workflow name
  maxDurationMs:  10 * 60_000,     // optional execution timeout
  heartbeatTtlMs: 30_000,          // optional stale-detection threshold
  maxEventHistory: 2_000,          // optional event stream trim size
})

@Activity options

@Activity({
  name:               "customName",  // optional override
  retries:            3,             // default: 0
  timeoutMs:          5_000,         // per-attempt timeout
  heartbeatTimeoutMs: 60_000,        // stale heartbeat threshold
})

WorkflowEngine

const engine = new WorkflowEngine(redisOptions?, prefix?);

| Method | Description | |---|---| | run(Class, input?) | Start a new workflow run | | resume(Class, workflowId) | Resume an interrupted workflow | | getState(workflowId) | Fetch the current workflow state | | cancelWorkflow(workflowId, reason?) | Request cancellation | | sendSignal(workflowId, name, payload?) | Fire an asynchronous signal | | queryWorkflow(Class, workflowId, name, payload?) | Invoke a @Query handler | | updateWorkflow(Class, workflowId, name, payload?) | Invoke an @Update handler | | runReplayHistory(Class, workflowId) | Validate history against current class definition | | runReplayHistories(Class, ids) | Validate multiple histories (async generator) | | findStaleRunningWorkflows() | List workflows that missed their heartbeat | | recoverStaleWorkflows(limit?) | Resume stale workflows | | startHeartbeatMonitor(opts?) | Start periodic stale-workflow recovery | | stopHeartbeatMonitor() | Stop the monitor | | close() | Close all Redis connections |

WorkflowExecutionInput

await engine.run(MyWorkflow, {
  input:  { orderId: "ORD-1" },       // available as ctx.input
  memory: { retryBudget: 5 },         // pre-seeded ctx memory
});

ExecutionContext

Every workflow method receives an ExecutionContext as its first argument:

// Identity
ctx.workflowId          // string
ctx.state               // full WorkflowState snapshot
ctx.input               // the input passed to engine.run()

// Per-run memory (persisted between steps)
ctx.getMemory<T>(key)
ctx.setMemory(key, value)
ctx.memory              // entire memory map

// Versioning
ctx.getVersion?.("change-id", minVersion, maxVersion)  // → number
ctx.patched?.("patch-id")                               // → boolean

// Flow control
ctx.continueAsNew?.(newInput)   // restart from scratch with new input
ctx.isCancelled?.()             // → boolean
ctx.throwIfCancelled?.()        // throws if cancellation was requested

// Child workflows
await ctx.startChildWorkflow?.(ChildClass, { input: { ... } })

// Runtime info (available inside @Query and @Update)
ctx.runtime?.historyLength   // number
ctx.runtime?.isReplaying     // boolean

Signals

Signals are fire-and-forget inputs delivered to a running workflow. They mutate ctx memory and are replayed automatically on resume.

@Signal({ name: "approve" })
async onApprove(ctx: ExecutionContext, payload: { userId: string }): Promise<void> {
  ctx.setMemory("approvedBy", payload.userId);
}

// Caller side
await engine.sendSignal(workflowId, "approve", { userId: "ops-team" });

Queries

Queries read state synchronously without mutating it. They work on both running and completed workflows.

@Query({ name: "progress" })
async getProgress(ctx: ExecutionContext) {
  return {
    step:   ctx.state.currentStep,
    status: ctx.state.status,
  };
}

// Caller side
const progress = await engine.queryWorkflow(MyWorkflow, workflowId, "progress");

Updates

Updates are synchronous mutations with a response value. State changes are persisted immediately.

@Update({ name: "approve" })
async approve(
  ctx: ExecutionContext,
  payload: { userId: string },
): Promise<{ ok: boolean }> {
  ctx.setMemory("approvedBy", payload.userId);
  return { ok: true };
}

// Caller side
const { ok } = await engine.updateWorkflow(
  MyWorkflow,
  workflowId,
  "approve",
  { userId: "ops-team" },
) as { ok: boolean };

Built-in __workflow_metadata Query

Every workflow supports a built-in query that returns its definition metadata and runtime summary — no extra code required:

const metadata = await engine.queryWorkflow(
  MyWorkflow,
  workflowId,
  "__workflow_metadata",
) as {
  workflowId: string;
  workflowName: string;
  status: string;
  historyLength: number;
  definition: {
    activityDefinitions:  Array<{ name: string }>;
    signalDefinitions:    Array<{ name: string }>;
    queryDefinitions:     Array<{ name: string }>;
    updateDefinitions:    Array<{ name: string }>;
  };
};

Child Workflows

Start a sub-workflow from inside a parent activity and store the reference:

@Activity({ retries: 1 })
async stepOne(ctx: ExecutionContext) {
  const child = await ctx.startChildWorkflow?.(ChildWorkflowClass, {
    input: { parentId: ctx.workflowId },
  });
  ctx.setMemory("childId", child?.workflowId);
  return { started: true };
}

Versioning and continueAsNew

Use ctx.getVersion to safely introduce breaking changes to running workflows:

@Activity()
async process(ctx: ExecutionContext) {
  const v = ctx.getVersion?.("add-validation", 1, 2) ?? 1;
  if (v >= 2) {
    // new code path
  } else {
    // legacy path
  }
}

Use continueAsNew to restart long-running workflows and keep the event history bounded:

ctx.continueAsNew?.({ input: ctx.input, memory: ctx.memory });

Saga / Compensation

Annotate rollback methods with @Compensate to automatically undo an activity if a later step fails:

@Activity({ retries: 2 })
async chargeCard(_ctx: ExecutionContext): Promise<{ chargeId: string }> {
  return { chargeId: "ch_abc" };
}

@Compensate({ forActivity: "chargeCard" })
async refundCard(ctx: ExecutionContext): Promise<void> {
  ctx.setMemory("refunded", true);
}

Compensation runs in reverse activity order automatically on workflow failure.

Deterministic Replay

Validate that a workflow's persisted history is still consistent with the current code — useful after refactors and deployments:

// Validate a single workflow
await engine.runReplayHistory(MyWorkflow, workflowId);

// Validate many workflows (async generator)
const ids = ["wf-1", "wf-2", "wf-3"];
for await (const result of engine.runReplayHistories(MyWorkflow, ids)) {
  if (result.error) {
    console.error(`${result.workflowId}: ${result.error.message}`);
  }
}

DeterminismViolationError is thrown (or yielded) when:

  • Activity steps have been reordered or removed
  • A new signal name is encountered that was not registered
  • Terminal state does not match the last history event

Recovery and Observability

Heartbeat Monitor

Automatically recover stale workflows that missed a heartbeat:

engine.startHeartbeatMonitor({
  intervalMs: 30_000,   // polling interval
  limit:      200,      // max workflows to scan per tick
});

// Stop when shutting down
engine.stopHeartbeatMonitor();

Manual Recovery

// List stale workflows
const stale = await engine.findStaleRunningWorkflows();

// Recover up to 50 stale workflows immediately
const recovered = await engine.recoverStaleWorkflows(50);

Workflow Inspector

import { WorkflowInspector } from "@zaby-ai/ique";

const inspector = new WorkflowInspector(redisOptions);
const state = await inspector.getState(workflowId);
const events = await inspector.getEvents(workflowId);
await inspector.close();

CLI

After building (npm run build), a small CLI is available:

# Inspect a workflow
npx ique-cli status <workflowId>

# List stale workflows
npx ique-cli stale

# Send a signal
npx ique-cli signal <workflowId> <signalName>

Redis is configured via the environment variables listed in the Configuration section.


Scripts Reference

npm run build        # compile TypeScript to dist/
npm run typecheck    # type-check without emitting
npm test             # run all tests (requires Redis)
npm run test:unit    # run unit tests only (no Redis required)
npm run verify:local # typecheck + unit tests + build
npm run verify:all   # typecheck + all tests + build

npm run example:workflow     # run the long-running workflow example
npm run example:interactive  # run the interactive verification example
npm run example:redis        # basic Redis smoke test

Stress / parity loop (runs LOOPS full verify cycles):

LOOPS=10 npm run parity:loop

TypeScript Setup

Enable decorator support in tsconfig.json:

{
  "compilerOptions": {
    "target":                     "ES2022",
    "module":                     "NodeNext",
    "moduleResolution":           "NodeNext",
    "experimentalDecorators":     true,
    "emitDecoratorMetadata":      true,
    "strict":                     true,
    "outDir":                     "dist"
  }
}

Import reflect-metadata once at the very top of your application entry-point, before any class that uses decorators:

import "reflect-metadata";
// rest of your app...

License

MIT