bearmq

v0.6.1

Redis-backed durable job queue: BullMQ infrastructure, Trigger.dev ergonomics, step-based replay

bearmq 🐻

A Redis-backed durable job queue for TypeScript — BullMQ's infrastructure, Trigger.dev's ergonomics.

  • Runs straight off a plain Redis server. No orchestration server, no extra infra: workers are just Node processes.
  • Trigger.dev-style task() authoring with retries, lifecycle hooks, and typed payloads/results.
  • Durable, checkpointed execution: step.run() persists each step's result to Redis as it completes. On a retry or a worker crash, the run function replays from the top — completed steps return their cached result instantly, so execution effectively resumes where it left off.
  • Durable waits (wait.for / wait.until) that release the worker and survive restarts.
  • triggerAndWait child tasks, batchTrigger, cron schedules, rate limiting, priorities, idempotency keys, stalled-job recovery, graceful shutdown.

Quick start

# a Redis server is the only dependency
brew services start redis   # or: docker run -p 6379:6379 redis:7

import { configure, task } from "bearmq";

// db, stripe, and email below are placeholders for your own clients.
// Call configure() once, anywhere before the first trigger/worker
// (or just set REDIS_URL / BEARMQ_REDIS_URL).
configure({ connection: "redis://localhost:6379" });

export const { trigger, worker } = task({
  id: "process-order",
  retry: {
    maxAttempts: 10,
    factor: 1.8,
    minTimeoutInMs: 500,
    maxTimeoutInMs: 30_000,
    randomize: false,
  },
  run: async (payload: { orderId: string }, { ctx, step, wait }) => {
    // cached after first success — never re-runs on retry
    const order = await step.run("fetch-order", () => db.orders.find(payload.orderId));

    const charge = await step.run("charge-card", () => stripe.charge(order));

    // releases the worker; survives restarts; consumes no retry attempts
    await wait.for({ hours: 1 });

    // if THIS throws, the retry resumes here — fetch-order and
    // charge-card return their memoized results instantly
    await step.run("send-receipt", () => email.send(order.email, charge.id));

    return { chargeId: charge.id };
  },
});
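
To get a feel for what the retry options above imply, here is a minimal sketch of an exponential backoff schedule. The formula, the jitter range, and the idea that maxAttempts counts the first try are assumptions borrowed from common retry libraries, not something this README confirms; retryDelays is a hypothetical helper, not part of bearmq.

```typescript
// Assumed rule (not confirmed for bearmq):
//   delay(n) = min(maxTimeoutInMs, minTimeoutInMs * factor^(n - 1)), n = 1 for the first retry
interface RetryOptions {
  maxAttempts: number;
  factor: number;
  minTimeoutInMs: number;
  maxTimeoutInMs: number;
  randomize: boolean;
}

function retryDelays(opts: RetryOptions, random: () => number = Math.random): number[] {
  const delays: number[] = [];
  // Assumption: maxAttempts includes the first try, so there are maxAttempts - 1 retries.
  for (let retry = 1; retry < opts.maxAttempts; retry++) {
    const base = opts.minTimeoutInMs * Math.pow(opts.factor, retry - 1);
    // Assumption: randomize scales the delay by a factor in [1, 2).
    const jittered = opts.randomize ? base * (1 + random()) : base;
    delays.push(Math.round(Math.min(opts.maxTimeoutInMs, jittered)));
  }
  return delays;
}

const delays = retryDelays({
  maxAttempts: 10,
  factor: 1.8,
  minTimeoutInMs: 500,
  maxTimeoutInMs: 30_000,
  randomize: false,
});
console.log(delays);
```

Under these assumptions the first retries wait 500 ms, 900 ms, and 1620 ms, and the last ones are capped at 30 s.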

Wiring up the two sides

task() returns plain functions, so the queue side and the worker side are just imports from the same task file. Your API imports trigger; a separate worker process imports worker and calls it — that call starts processing.

// tasks/process-order.ts  — the task definition (shown above)
export const { trigger, worker } = task({ id: "process-order", run: async (payload) => { ... } });

// api.ts: enqueue only; nothing executes here
import { trigger } from "./tasks/process-order.js";

const handle = await trigger({ orderId: "ord_123" });
const result = await handle.result(); // optional: await the outcome

// worker.ts: run with `node worker.js` (or tsx); calling worker() starts it
import { worker } from "./tasks/process-order.js";

const w = worker({
  concurrency: 10,
  limiter: { max: 10, duration: 1000 }, // ≤10 job starts per second, queue-wide
  onError: (error) => console.error(error),
  onSuccess: (result) => console.log(result),
});

// optional graceful shutdown
process.on("SIGTERM", () => w.stop());

A worker belongs to the task it was destructured from — it only fetches and runs that task. A worker process that should handle several tasks starts one worker per task. With more than one task, destructured names collide, so either rename on import or export the whole handle:

// tasks/resize-image.ts — exporting the handle avoids name collisions
export const resizeImage = task({ id: "resize-image", run: async (payload) => { ... } });

// worker.ts: one process hosting several tasks
import { worker as processOrderWorker } from "./tasks/process-order.js";
import { resizeImage } from "./tasks/resize-image.js";

processOrderWorker({ concurrency: 10 });
resizeImage.worker({ concurrency: 2 });

The durability model (read this)

bearmq is at-least-once with step-level replay (the Inngest/Temporal model):

  • On every attempt the run function re-executes from the top. Completed steps return their persisted result without running; everything else runs again.
  • Side effects belong inside steps. Code between steps re-runs on every attempt.
  • A crash after a step's side effect but before its result is persisted re-runs that step — make steps idempotent.
  • Step keys are name + invocation counter, so loops and duplicate names replay correctly — as long as your code is deterministic up to each step. Don't branch on Math.random()/Date.now() outside a step.
  • Deploys that rename or reorder steps shift the keys of in-flight jobs; keep step names stable.
  • Step results cross Redis via superjson: Date, Map, Set, BigInt, undefined all survive. Functions/streams/sockets throw at persist time.
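
The replay rules above can be sketched with an in-memory model. This is not bearmq's implementation: a Map stands in for Redis, step.run is synchronous for brevity, and the "name:counter" key format plus the makeStep and runWithRetries helpers are hypothetical.

```typescript
// In-memory stand-in for the persisted step results (assumption: the real
// store is Redis, and the real step.run is async).
type StepStore = Map<string, unknown>;

function makeStep(store: StepStore) {
  // Counters reset on every attempt, so the same code path yields the same keys.
  const counters = new Map<string, number>();
  return {
    run<T>(name: string, fn: () => T): T {
      const n = counters.get(name) ?? 0;
      counters.set(name, n + 1);
      const key = `${name}:${n}`; // hypothetical key format: name + invocation counter
      if (store.has(key)) return store.get(key) as T; // completed earlier: skip fn
      const result = fn();
      store.set(key, result); // persisted only after fn succeeds
      return result;
    },
  };
}

type Step = ReturnType<typeof makeStep>;

function runWithRetries<T>(run: (step: Step) => T, maxAttempts: number): T {
  const store: StepStore = new Map();
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return run(makeStep(store)); // every attempt starts from the top
    } catch (err) {
      lastError = err;
    }
  }
  throw lastError;
}

const executions: string[] = [];
let failuresLeft = 1;

const outcome = runWithRetries((step) => {
  // Same step name twice: the counter keeps the two results apart.
  const items = [1, 2].map((id) =>
    step.run("fetch-item", () => {
      executions.push(`fetch-item(${id})`);
      return id * 10;
    }),
  );
  step.run("send-receipt", () => {
    executions.push("send-receipt");
    if (failuresLeft > 0) {
      failuresLeft--;
      throw new Error("smtp down");
    }
    return "sent";
  });
  return items;
}, 3);

console.log(outcome, executions);
```

In this model the second attempt re-enters the function from the top, but both fetch-item calls return their stored results, so only send-receipt executes again.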

API

task(definition)

const { trigger, batchTrigger, triggerAndWait, worker, schedule, unschedule } = task({
  id: "my-task",
  queue: "my-task",          // optional, defaults to id (one queue per task)
  retry: { maxAttempts: 3, factor: 2, minTimeoutInMs: 1000, maxTimeoutInMs: 30_000, randomize: true },
  run: async (payload, { ctx, step, wait }) => { ... },
  onStart:   (ctx) => {},            // every attempt
  onSuccess: (result, ctx) => {},
  onFailure: (error, ctx) => {},     // permanent failure only
});

Payload and result types infer from run — no explicit generics needed.
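
As an illustration of how this kind of inference can work in TypeScript, here is a small sketch. defineTask is a hypothetical stand-in, not bearmq's actual signature, and its trigger runs inline instead of enqueueing to Redis.

```typescript
// Hypothetical stand-in for task(): P and R are inferred from `run`.
function defineTask<P, R>(def: { id: string; run: (payload: P) => R }) {
  return {
    id: def.id,
    // Inline execution keeps the sketch self-contained; a real queue would enqueue here.
    trigger: (payload: P): R => def.run(payload),
  };
}

const sum = defineTask({
  id: "sum",
  run: (payload: { a: number; b: number }) => ({ total: payload.a + payload.b }),
});

// `out` is typed as { total: number } without any explicit generics.
const out = sum.trigger({ a: 2, b: 3 });
console.log(out.total);
```

Passing a payload of the wrong shape, such as sum.trigger({ a: "2" }), would be rejected at compile time in this sketch.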

Triggering

const handle = await trigger(payload, {
  delay: { minutes: 5 },        // or milliseconds as a number
  idempotencyKey: "order-123",  // same key within 24h → same job
  priority: 1,                  // lower number runs sooner; prioritized jobs run ahead of plain FIFO order
});
handle.id;
await handle.status();                       // "waiting" | "active" | "completed" | ...
await handle.result({ timeoutMs: 30_000 });  // resolves with the task's result, rejects on permanent failure

const handles = await batchTrigger([{ payload: a }, { payload: b, opts: { delay: 1000 } }]);
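
The idempotencyKey behaviour ("same key within 24h, same job") can be modelled roughly as follows. This is a sketch under assumptions: a Map stands in for Redis, the clock is passed in to keep it deterministic, and makeQueue and the job id format are hypothetical.

```typescript
const DAY_MS = 24 * 60 * 60 * 1000;

// In-memory stand-in for the Redis bookkeeping; not bearmq's real data model.
function makeQueue(ttlMs: number = DAY_MS) {
  const byKey = new Map<string, { jobId: string; expiresAt: number }>();
  let nextId = 1;
  return function enqueue(now: number, idempotencyKey?: string): string {
    if (idempotencyKey !== undefined) {
      const hit = byKey.get(idempotencyKey);
      // Key seen recently: hand back the existing job instead of creating one.
      if (hit !== undefined && hit.expiresAt > now) return hit.jobId;
    }
    const jobId = "job_" + nextId++;
    if (idempotencyKey !== undefined) {
      byKey.set(idempotencyKey, { jobId, expiresAt: now + ttlMs });
    }
    return jobId;
  };
}

const enqueue = makeQueue();
const first = enqueue(0, "order-123");
const duplicate = enqueue(60_000, "order-123"); // within 24h: same job id
const later = enqueue(DAY_MS + 1, "order-123"); // window elapsed: a new job
console.log(first, duplicate, later);
```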

Child tasks

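import { task, ChildFailedError } from "bearmq"; // assumes ChildFailedError is a named export

// childTask and flakyChild are other task() handles defined elsewhere
const parent = task({
  id: "parent",
  run: async (payload, { step }) => {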
    // durably parks the parent (frees the worker slot) until the child finishes;
    // the child's typed result is memoized like any step
    const result = await childTask.triggerAndWait({ n: 1 });

    try {
      await flakyChild.triggerAndWait({});
    } catch (err) {
      if (err instanceof ChildFailedError) { /* child exhausted its retries */ }
    }
  },
});

Workers

worker comes from the task() you defined — there is no standalone Worker class to construct. Each call processes that one task's queue and starts immediately (pass autostart: false to start later via w.start()).

import { worker } from "./tasks/process-order.js";

const w = worker({
  concurrency: 10,
  limiter: { max: 100, duration: 60_000 },  // queue-global rate limit
  lockDurationMs: 30_000,                   // job lock TTL, renewed every 1/3 of it; bounds crash-recovery latency
  autostart: true,
  onError: (error, ctx) => {},              // per-attempt failures + infra errors
  onSuccess: (result, ctx) => {},
});

await w.stop();                  // graceful: stops fetching, drains in-flight jobs
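
To make the limiter option concrete, here is a minimal fixed-window sketch of "at most max job starts per duration". Whether bearmq uses a fixed window, a sliding window, or a token bucket is not stated here, so treat the algorithm and the makeLimiter helper as assumptions.

```typescript
// Fixed-window limiter: allow up to `max` starts per `duration` milliseconds.
function makeLimiter(max: number, duration: number) {
  let windowStart = -Infinity;
  let count = 0;
  // `now` is injected so the example is deterministic.
  return function tryStart(now: number): boolean {
    if (now - windowStart >= duration) {
      windowStart = now; // open a new window
      count = 0;
    }
    if (count >= max) return false; // over the limit: the job has to wait
    count++;
    return true;
  };
}

const tryStart = makeLimiter(2, 1000);
const decisions = [tryStart(0), tryStart(10), tryStart(20), tryStart(1000)];
console.log(decisions);
```

In a real deployment the counter would have to live in Redis for the limit to be queue-global across worker processes; this sketch keeps it in one process.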

Workers killed without cleanup are handled automatically: their jobs' locks expire, and the stalled-job checker of any live worker re-delivers them (replaying past completed steps). After maxStalledCount re-deliveries, a job is marked failed.
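
The recovery loop described above can be sketched like this. It is a simplified in-memory model: the Job shape and checkStalled are hypothetical, and the exact point at which maxStalledCount trips is an assumption.

```typescript
type Job = {
  id: string;
  state: "waiting" | "active" | "failed";
  lockExpiresAt: number; // a live worker keeps pushing this forward
  stalledCount: number;
};

// One pass of a stalled-job checker over the jobs it can see.
function checkStalled(jobs: Job[], now: number, maxStalledCount: number): void {
  for (const job of jobs) {
    if (job.state !== "active" || job.lockExpiresAt > now) continue; // lock still held
    if (job.stalledCount >= maxStalledCount) {
      job.state = "failed"; // already re-delivered the maximum number of times
    } else {
      job.stalledCount++;
      job.state = "waiting"; // re-deliver: any worker may pick it up again
    }
  }
}

const job: Job = { id: "job_1", state: "active", lockExpiresAt: 30_000, stalledCount: 0 };

checkStalled([job], 10_000, 1);
const whileLocked = job.state; // lock not expired yet

checkStalled([job], 30_000, 1);
const afterFirstStall = job.state; // re-delivered once

// A second worker takes the job and is killed as well.
job.state = "active";
job.lockExpiresAt = 60_000;
checkStalled([job], 60_000, 1);
const afterSecondStall = job.state;

console.log(whileLocked, afterFirstStall, afterSecondStall);
```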

Cron schedules

await schedule({ cron: "0 9 * * *", tz: "America/New_York", payload: { ... }, key: "daily-digest" });
await unschedule("daily-digest");

Any running worker for the task executes due schedules. Instances are deduplicated across workers via deterministic job ids, so running N workers never double-fires a schedule.
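
A rough sketch of why deterministic job ids prevent double-firing: if every worker derives the same id for the same tick, an atomic "create unless it exists" lets only one of them enqueue it. The id format and helper names below are hypothetical, and a Set stands in for Redis.

```typescript
// Hypothetical id scheme: schedule key plus the tick's timestamp.
function scheduledJobId(scheduleKey: string, fireAtMs: number): string {
  return scheduleKey + ":" + fireAtMs;
}

// Add-if-absent models an atomic "create job unless it already exists".
const createdJobs = new Set<string>();
function tryEnqueue(jobId: string): boolean {
  if (createdJobs.has(jobId)) return false;
  createdJobs.add(jobId);
  return true;
}

// Three workers notice the same due tick of "daily-digest".
const tick = Date.UTC(2026, 0, 1, 14, 0, 0);
const attempts = ["worker-a", "worker-b", "worker-c"].map(() =>
  tryEnqueue(scheduledJobId("daily-digest", tick)),
);

console.log(attempts, createdJobs.size);
```

Only the first attempt creates a job; the other two see the existing id and back off.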

Operational notes

  • Retention: completed jobs (and their step memos) are trimmed automatically — defaults: keep 1000 / 24h, configurable via configure({ defaultKeepCompleted }).
  • Connections: one shared command connection per process plus one blocking connection per worker, created lazily — task() does no I/O at import time.
  • Redis: single-node Redis is the target. Keys are hash-tagged by queue (bear:{queue}:...) so a future cluster mode is possible, but cross-queue parent/child resume uses multi-slot writes today.
  • Very long steps: the lock is renewed on a timer, so long steps are safe while the process lives — but a hard-killed worker's recovered job can double-execute a step that was mid-flight. Generous lockDurationMs shrinks the window; idempotent steps remove it.
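
For context on the hash-tagged keys mentioned above: in Redis Cluster, when a key contains a non-empty {...} section, only the text between the first { and the next } is hashed to choose the slot. The sketch below extracts that tag; the key suffixes (wait, active) are made up for illustration and are not bearmq's real key names.

```typescript
// Returns the part of the key that Redis Cluster would hash.
function hashTag(key: string): string {
  const open = key.indexOf("{");
  if (open === -1) return key; // no tag: the whole key is hashed
  const close = key.indexOf("}", open + 1);
  if (close === -1 || close === open + 1) return key; // unclosed or empty tag
  return key.slice(open + 1, close);
}

// Keys of one queue share a tag, so they would land in the same slot.
const sameQueue = hashTag("bear:{orders}:wait") === hashTag("bear:{orders}:active");

// Keys of different queues have different tags and may land in different slots,
// which is why a cross-queue parent/child write can span slots.
const crossQueue = hashTag("bear:{orders}:wait") === hashTag("bear:{emails}:wait");

console.log(sameQueue, crossQueue);
```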

Development

pnpm install
brew services start redis   # tests need a real Redis (Lua + blocking commands)
pnpm test                   # vitest against localhost:6379 (REDIS_URL to override)
pnpm build                  # tsup → dist/
npx tsx examples/smoke.ts   # end-to-end durability demo: kill -9 a worker mid-run