bearmq
v0.6.1
Published
Redis-backed durable job queue: BullMQ infrastructure, Trigger.dev ergonomics, step-based replay
Downloads
362
Maintainers
Readme
bearmq 🐻
A Redis-backed durable job queue for TypeScript — BullMQ's infrastructure, Trigger.dev's ergonomics.
- Runs straight off a plain Redis server. No orchestration server, no extra infra: workers are just Node processes.
- Trigger.dev-style
task()authoring with retries, lifecycle hooks, and typed payloads/results. - Durable, checkpointed execution:
step.run()persists each step's result to Redis as it completes. On a retry or a worker crash, the run function replays from the top — completed steps return their cached result instantly, so execution effectively resumes where it left off. - Durable waits (
wait.for/wait.until) that release the worker and survive restarts. triggerAndWaitchild tasks,batchTrigger, cron schedules, rate limiting, priorities, idempotency keys, stalled-job recovery, graceful shutdown.
Quick start
# a Redis server is the only dependency
brew services start redis # or: docker run -p 6379:6379 redis:7import { configure, task } from "bearmq";
// once, anywhere before the first trigger/worker
// (or just set REDIS_URL / BEARMQ_REDIS_URL)
configure({ connection: "redis://localhost:6379" });
export const { trigger, worker } = task({
id: "process-order",
retry: {
maxAttempts: 10,
factor: 1.8,
minTimeoutInMs: 500,
maxTimeoutInMs: 30_000,
randomize: false,
},
run: async (payload: { orderId: string }, { ctx, step, wait }) => {
// cached after first success — never re-runs on retry
const order = await step.run("fetch-order", () => db.orders.find(payload.orderId));
const charge = await step.run("charge-card", () => stripe.charge(order));
// releases the worker; survives restarts; consumes no retry attempts
await wait.for({ hours: 1 });
// if THIS throws, the retry resumes here — fetch-order and
// charge-card return their memoized results instantly
await step.run("send-receipt", () => email.send(order.email, charge.id));
return { chargeId: charge.id };
},
});Wiring up the two sides
task() returns plain functions, so the queue side and the worker side are just imports from the same task file. Your API imports trigger; a separate worker process imports worker and calls it — that call starts processing.
// tasks/process-order.ts — the task definition (shown above)
export const { trigger, worker } = task({ id: "process-order", run: async (payload) => { ... } });// api.ts — enqueue only; nothing executes here
import { trigger } from "./tasks/process-order.js";
const handle = await trigger({ orderId: "ord_123" });
const result = await handle.result(); // optional: await the outcome// worker.ts — run with `node worker.js` (or tsx); calling worker() starts it
import { worker } from "./tasks/process-order.js";
const w = worker({
concurrency: 10,
limiter: { max: 10, duration: 1000 }, // ≤10 job starts per second, queue-wide
onError: (error) => console.error(error),
onSuccess: (result) => console.log(result),
});
// optional graceful shutdown
process.on("SIGTERM", () => w.stop());A worker belongs to the task it was destructured from — it only fetches and runs that task. A worker process that should handle several tasks starts one worker per task. With more than one task, destructured names collide, so either rename on import or export the whole handle:
// tasks/resize-image.ts — exporting the handle avoids name collisions
export const resizeImage = task({ id: "resize-image", run: async (payload) => { ... } });// worker.ts — one process hosting several tasks
import { worker as processOrderWorker } from "./tasks/process-order.js";
import { resizeImage } from "./tasks/resize-image.js";
processOrderWorker({ concurrency: 10 });
resizeImage.worker({ concurrency: 2 });The durability model (read this)
bearmq is at-least-once with step-level replay (the Inngest/Temporal model):
- On every attempt the run function re-executes from the top. Completed steps return their persisted result without running; everything else runs again.
- Side effects belong inside steps. Code between steps re-runs on every attempt.
- A crash after a step's side effect but before its result is persisted re-runs that step — make steps idempotent.
- Step keys are
name + invocation counter, so loops and duplicate names replay correctly — as long as your code is deterministic up to each step. Don't branch onMath.random()/Date.now()outside a step. - Deploys that rename or reorder steps shift the keys of in-flight jobs; keep step names stable.
- Step results cross Redis via superjson:
Date,Map,Set,BigInt,undefinedall survive. Functions/streams/sockets throw at persist time.
API
task(definition)
const { trigger, batchTrigger, triggerAndWait, worker, schedule, unschedule } = task({
id: "my-task",
queue: "my-task", // optional, defaults to id (one queue per task)
retry: { maxAttempts: 3, factor: 2, minTimeoutInMs: 1000, maxTimeoutInMs: 30_000, randomize: true },
run: async (payload, { ctx, step, wait }) => { ... },
onStart: (ctx) => {}, // every attempt
onSuccess: (result, ctx) => {},
onFailure: (error, ctx) => {}, // permanent failure only
});Payload and result types infer from run — no explicit generics needed.
Triggering
const handle = await trigger(payload, {
delay: { minutes: 5 }, // or milliseconds as a number
idempotencyKey: "order-123", // same key within 24h → same job
priority: 1, // lower runs sooner; beats the FIFO queue
});
handle.id;
await handle.status(); // "waiting" | "active" | "completed" | ...
await handle.result({ timeoutMs: 30_000 }); // resolves with R, rejects on permanent failure
const handles = await batchTrigger([{ payload: a }, { payload: b, opts: { delay: 1000 } }]);Child tasks
const parent = task({
id: "parent",
run: async (payload, { step }) => {
// durably parks the parent (frees the worker slot) until the child finishes;
// the child's typed result is memoized like any step
const result = await childTask.triggerAndWait({ n: 1 });
try {
await flakyChild.triggerAndWait({});
} catch (err) {
if (err instanceof ChildFailedError) { /* child exhausted its retries */ }
}
},
});Workers
worker comes from the task() you defined — there is no standalone Worker class to construct. Each call processes that one task's queue and starts immediately (pass autostart: false to start later via w.start()).
import { worker } from "./tasks/process-order.js";
const w = worker({
concurrency: 10,
limiter: { max: 100, duration: 60_000 }, // queue-global rate limit
lockDurationMs: 30_000, // renewed at 1/3 interval; crash recovery latency
autostart: true,
onError: (error, ctx) => {}, // per-attempt failures + infra errors
onSuccess: (result, ctx) => {},
});
await w.stop(); // graceful: stops fetching, drains in-flight jobsWorkers killed without cleanup are handled automatically: their jobs' locks expire and any worker's stalled checker re-delivers them (replaying past completed steps). After maxStalledCount re-deliveries a job is failed.
Cron schedules
await schedule({ cron: "0 9 * * *", tz: "America/New_York", payload: { ... }, key: "daily-digest" });
await unschedule("daily-digest");Any running worker for the task executes due schedules. Instances are deduplicated across workers via deterministic job ids, so running N workers never double-fires a schedule.
Operational notes
- Retention: completed jobs (and their step memos) are trimmed automatically — defaults: keep 1000 / 24h, configurable via
configure({ defaultKeepCompleted }). - Connections: one shared command connection per process plus one blocking connection per worker, created lazily —
task()does no I/O at import time. - Redis: single-node Redis is the target. Keys are hash-tagged by queue (
bear:{queue}:...) so a future cluster mode is possible, but cross-queue parent/child resume uses multi-slot writes today. - Very long steps: the lock is renewed on a timer, so long steps are safe while the process lives — but a hard-killed worker's recovered job can double-execute a step that was mid-flight. Generous
lockDurationMsshrinks the window; idempotent steps remove it.
Development
pnpm install
brew services start redis # tests need a real Redis (Lua + blocking commands)
pnpm test # vitest against localhost:6379 (REDIS_URL to override)
pnpm build # tsup → dist/
npx tsx examples/smoke.ts # end-to-end durability demo: kill -9 a worker mid-run