redis-streamq
v0.1.1
A small, dependency-light, type-safe event queue for Node.js backed by Redis Streams. Reliable at-least-once delivery, delayed jobs, retries with backoff, dead-letter queue, concurrency and graceful shutdown.
A small, type-safe event/job queue for Node.js backed by Redis Streams.
Use Redis as a standalone, reliable queue without pulling in a heavyweight
framework. You bring a Redis server and an ioredis connection; this library
gives you clean producer/consumer abstractions with the reliability features you
actually need in production.
import { Queue, Worker } from "redis-streamq";
const queue = new Queue("emails", { connection: "redis://localhost:6379" });
await queue.add("welcome", { to: "[email protected]" });
new Worker("emails", async (job) => {
await sendEmail(job.data.to);
}, { connection: "redis://localhost:6379" });
Why
- Reliable, at-least-once delivery built on Redis Streams consumer groups with explicit acknowledgements — a crashed worker never silently drops a job.
- Retries with backoff (fixed or exponential), per-job or per-worker.
- Dead-letter queue for jobs that exhaust their attempts, with inspection and one-call replay.
- Delayed / scheduled jobs via a sorted set that is promoted automatically.
- Stalled-job recovery: work abandoned by a dead worker is reclaimed and retried, bounded by maxStalledCount so poison messages can't loop forever.
- Concurrency control and graceful shutdown (in-flight jobs finish and ack before the process exits).
- Fully typed generic payloads and a typed event emitter.
- Tiny surface area. ioredis is the only (peer) dependency.
Where does redis-streamq fit?
Node.js has several excellent queue libraries. Each solves a slightly different problem. redis-streamq is for when you already run Redis and want a small, reliable queue built on Redis Streams — without adopting a larger job framework or standing up separate infrastructure.
What redis-streamq gives you
- A focused API: Queue (produce) and Worker (consume).
- At-least-once delivery via Redis Streams consumer groups and explicit acks.
- Production basics in one place: retries with backoff, a dead-letter queue (with inspection and replay), delayed jobs, stalled-job recovery, concurrency control, and graceful shutdown.
- One peer dependency (ioredis): you bring the Redis client and connection setup you already use (TLS, Sentinel, Cluster, etc.).
- TypeScript-first generics and a typed event emitter.
When redis-streamq is a good fit
- You want Streams semantics (consumer groups, pending entries, reclaim) with a minimal learning curve.
- Your workload needs reliable background jobs, not cron scheduling, job flows, or a built-in dashboard.
- You prefer a small dependency footprint and a library you can read through in an afternoon.
When another tool may suit you better
These are all solid choices — pick based on what your project needs:
- BullMQ — a full-featured Redis job system with repeatable/cron jobs, flows, rate limiting, and a rich ecosystem (e.g. Bull Board). A great option when you need those capabilities.
- bee-queue — a Redis-backed job queue with a straightforward API, well suited to many common job-processing workloads.
- Raw Redis lists (LPUSH/BRPOP): a simple pattern when you want the lightest possible Redis primitive and can design reliability yourself.
- Dedicated brokers (RabbitMQ, Amazon SQS, etc.): when messaging, routing, and broker-managed scaling are central to your architecture rather than extending an existing Redis deployment.
If your needs grow beyond what redis-streamq offers, moving to a fuller job system is a natural step — concepts like retries, dead-lettering, and delayed work translate directly.
Requirements
- Node.js >= 24
- Redis >= 6.2 (uses XAUTOCLAIM, available since 6.2)
- ioredis ^5.3.0 (peer dependency)
Installation
npm install redis-streamq ioredis
# or
pnpm add redis-streamq ioredis
# or
yarn add redis-streamq ioredis
ioredis is a peer dependency so the queue shares the exact client (and
version) your app already uses.
Quick start
Producer
import { Queue } from "redis-streamq";
interface EmailJob {
to: string;
subject: string;
}
const queue = new Queue<EmailJob>("emails", {
connection: "redis://localhost:6379",
});
await queue.add("welcome", { to: "[email protected]", subject: "Hi!" });
await queue.close();
Consumer
import { Worker } from "redis-streamq";
const worker = new Worker<EmailJob>(
"emails",
async (job) => {
await sendEmail(job.data.to, job.data.subject);
},
{ connection: "redis://localhost:6379", concurrency: 10 },
);
worker.on("completed", (job) => console.log("sent", job.id));
worker.on("failed", (job, err) => console.warn("failed", job?.id, err.message));
// Graceful shutdown
process.on("SIGTERM", () => worker.close().then(() => process.exit(0)));
The worker starts automatically. Pass autorun: false and call worker.run()
yourself if you'd rather control when processing begins.
Connecting to Redis
connection accepts three forms:
// 1. A connection string
new Queue("q", { connection: "redis://:password@localhost:6379/0" });
// 2. ioredis options
new Queue("q", { connection: { host: "localhost", port: 6379, db: 1 } });
// 3. An existing ioredis client (NOT closed for you on close())
import { Redis } from "ioredis";
const client = new Redis(process.env.REDIS_URL);
new Queue("q", { connection: client });
When you pass a string or options object, the library creates and owns the
client(s) and closes them on close(). When you pass your own client, you keep
ownership. Workers always create one extra connection internally for blocking
reads (a blocking command monopolizes its connection).
TLS / Redis Cluster / Sentinel: construct an ioredis client however you need
and pass it in as connection.
Common scenarios
Delayed and scheduled jobs
// Run 30 seconds from now
await queue.add("reminder", { id: 1 }, { delay: 30_000 });
// "Schedule" by computing the delay
const runAt = new Date("2026-01-01T00:00:00Z");
await queue.add("new-year", {}, { delay: runAt.getTime() - Date.now() });
Retries and backoff
Configure defaults on the worker, override per job:
const worker = new Worker("q", processor, {
connection,
attempts: 5, // default max attempts
backoff: { type: "exponential", delay: 1000, maxDelay: 60_000 },
});
// Per-job override at enqueue time
await queue.add("critical", data, {
attempts: 10,
backoff: { type: "fixed", delay: 2000 },
});
- fixed: waits delay ms between every attempt.
- exponential: waits delay * 2 ** (attemptsMade - 1) ms, capped by maxDelay.
A job is retried until attemptsMade reaches maxAttempts, after which it is
moved to the dead-letter queue.
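The two backoff formulas above can be written out as a small function to reason about retry timing. This is only a sketch, not the library's internal code: the names BackoffSketch and backoffDelayMs are hypothetical, and it assumes attemptsMade is 1 after the first failed attempt.

```typescript
// Hypothetical shape mirroring the backoff options shown above.
interface BackoffSketch {
  type: "fixed" | "exponential";
  delay: number; // base delay in ms
  maxDelay?: number; // optional cap in ms (exponential only)
}

// Delay before the next attempt. Assumes attemptsMade >= 1.
function backoffDelayMs(backoff: BackoffSketch, attemptsMade: number): number {
  if (backoff.type === "fixed") return backoff.delay;
  const raw = backoff.delay * 2 ** (attemptsMade - 1);
  return backoff.maxDelay === undefined ? raw : Math.min(raw, backoff.maxDelay);
}

const exp: BackoffSketch = { type: "exponential", delay: 1000, maxDelay: 60_000 };
const delays = [1, 2, 3, 4, 5, 6, 7].map((n) => backoffDelayMs(exp, n));
console.log(delays); // 1000, 2000, 4000, 8000, 16000, 32000, then capped at 60000
```

Under these assumptions, a job with attempts: 5 and this exponential backoff would wait 1 + 2 + 4 + 8 = 15 seconds in total across its four retries before being dead-lettered, plus whatever the promotion tick adds.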
Dead-letter queue
// Inspect failed jobs
const dead = await queue.getDeadLetters(50);
for (const entry of dead) {
console.log(entry.job.id, entry.failedReason, entry.deadLetteredAt);
}
// Count them
const { deadLetter } = await queue.getCounts();
// Requeue everything for another try (resets attempt counters)
const requeued = await queue.retryDeadLetter();
Concurrency
new Worker("q", processor, { connection, concurrency: 25 });
The worker never has more than concurrency jobs in flight. Scale horizontally
by running multiple worker processes: they form a single consumer group and
share the load.
Bulk enqueue
await queue.addBulk([
{ name: "task", data: { i: 1 } },
{ name: "task", data: { i: 2 }, opts: { delay: 5000 } },
]);
Graceful shutdown
async function shutdown() {
await worker.close(); // stops fetching, waits for in-flight jobs, acks, disconnects
await queue.close();
process.exit(0);
}
process.on("SIGINT", () => void shutdown());
process.on("SIGTERM", () => void shutdown());
Stalled / crashed worker recovery
If a worker dies mid-job (e.g. the pod is killed), the job stays in the consumer
group's pending list. Another worker reclaims it after stalledInterval ms and
re-processes it. After maxStalledCount reclaims the job is dead-lettered to
prevent an infinite crash loop.
new Worker("q", processor, {
connection,
stalledInterval: 30_000, // consider a job stalled after 30s unacked
maxStalledCount: 3, // give up after 3 reclaims
});
Events
Worker is a typed event emitter:
| Event | Arguments | When |
| --------------- | ------------------------------- | ----------------------------------------------- |
| ready | — | Connected and consumer group is ready |
| active | (job) | A job is about to be processed |
| completed | (job, result) | A job finished successfully |
| failed | (job, error) | An attempt failed (may still be retried) |
| dead-lettered | (job, error) | Attempts exhausted; moved to the dead-letter queue |
| stalled | (jobId) | A stalled job was reclaimed |
| error | (error) | A non-fatal error in the run loop |
| closed | — | The worker has fully stopped |
worker.on("completed", (job, result) => {});
worker.on("failed", (job, err) => {});
worker.on("error", (err) => {});
API reference
new Queue<T>(name, options)
| Option | Type | Default | Description |
| ------------------- | -------------------------- | ------- | ------------------------------------ |
| connection | string \| RedisOptions \| Redis | — | How to connect to Redis (required) |
| prefix | string | "req" | Key prefix for namespacing |
| defaultJobOptions | JobOptions | {} | Defaults applied to every add |
Methods: add(name, data, opts?), addBulk(jobs), getCounts(),
getWaitingCount(), getDelayedCount(), getDeadLetterCount(),
getDeadLetters(count?), retryDeadLetter(), drain(opts?), close().
new Worker<T, R>(name, processor, options)
| Option | Type | Default | Description |
| ----------------- | --------------- | ------- | --------------------------------------------- |
| connection | see above | — | How to connect to Redis (required) |
| prefix | string | "req" | Must match the producing queue |
| concurrency | number | 1 | Max jobs processed at once |
| attempts | number | 3 | Default max attempts |
| backoff | BackoffOptions | exponential, 1000 ms | Default backoff strategy |
| blockingTimeout | number (ms) | 5000 | Blocking read timeout per fetch loop |
| stalledInterval | number (ms) | 30000 | Unacked time before a job is considered stalled |
| maxStalledCount | number | 3 | Max reclaims before dead-lettering |
| autorun | boolean | true | Start processing immediately |
Methods: run(), close(), isRunning(), plus the events above.
JobOptions
| Field | Type | Description |
| --------- | --------------------------------- | ---------------------------------------- |
| delay | number (ms) | Delay before the job becomes available |
| attempts| number | Max attempts for this job |
| backoff | BackoffOptions | Backoff for this job |
| jobId | string | Custom id (for tracing; not dedup) |
Job<T>
{ id, name, data, attemptsMade, maxAttempts, backoff, timestamp } — the object
passed to your processor.
How it works
Each queue uses a small set of Redis keys, all derived from {prefix}:{name}:
| Key | Type | Purpose |
| ------------------------- | ------ | ---------------------------------------- |
| {prefix}:{name}:stream | stream | Ready-to-process jobs (consumer group) |
| {prefix}:{name}:delayed | zset | Delayed / retrying jobs (score = run-at) |
| {prefix}:{name}:dead | stream | Dead-lettered jobs |
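The key layout in the table above can be sketched as a small helper, which is handy when inspecting a queue by hand. The function name queueKeys is hypothetical and not part of the library; the default prefix "req" is taken from the options tables.

```typescript
// Hypothetical helper: builds the three key names from the table above.
function queueKeys(name: string, prefix = "req") {
  const base = `${prefix}:${name}`;
  return {
    stream: `${base}:stream`, // ready-to-process jobs (consumer group)
    delayed: `${base}:delayed`, // sorted set, score = run-at time
    dead: `${base}:dead`, // dead-lettered jobs
  };
}

const emailKeys = queueKeys("emails");
const billingKeys = queueKeys("invoices", "billing");
console.log(emailKeys.stream, emailKeys.delayed, billingKeys.dead);
```

With names like these you can look at a queue from redis-cli, for example XLEN on the stream key or ZCARD on the delayed key.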
- Producing an immediate job is an XADD; a delayed job is a ZADD into the sorted set.
- Workers form one consumer group and read with XREADGROUP ... BLOCK.
- A background tick promotes due delayed jobs into the stream (ZRANGEBYSCORE → XADD) atomically via a Lua script.
- On success the job is XACK'd and XDEL'd. On failure it is rescheduled into the delayed set (retry) or appended to the dead-letter stream, each as a single atomic Lua script so a job is never lost or duplicated by a crash.
- Stalled jobs are recovered with XAUTOCLAIM; XPENDING delivery counts bound retries via maxStalledCount.
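The promotion step described above can be modelled in plain TypeScript to show the rule it applies: every delayed entry whose run-at score is at or before the current time moves to the ready stream, in score order. This is an in-memory illustration only. The library is described as doing this inside Redis with a Lua script, and the names DelayedEntry and promoteDue are hypothetical.

```typescript
// In-memory stand-ins for the delayed sorted set and the ready stream.
interface DelayedEntry {
  runAt: number; // score: epoch ms at which the job becomes due
  jobId: string;
}

// Moves every entry with runAt <= now from `delayed` to `ready`,
// in run-at order. Returns the number of promoted jobs.
function promoteDue(delayed: DelayedEntry[], ready: string[], now: number): number {
  delayed.sort((a, b) => a.runAt - b.runAt);
  let count = 0;
  let head = delayed[0];
  while (head !== undefined && head.runAt <= now) {
    delayed.shift(); // remove from the delayed set
    ready.push(head.jobId); // append to the ready stream
    count++;
    head = delayed[0];
  }
  return count;
}

const delayed: DelayedEntry[] = [
  { runAt: 3000, jobId: "c" },
  { runAt: 1000, jobId: "a" },
  { runAt: 2000, jobId: "b" },
];
const ready: string[] = [];
const promoted = promoteDue(delayed, ready, 2000);
console.log(promoted, ready, delayed.length);
```

In this model the remove and the append happen in one synchronous step, which stands in for the atomicity that the Lua script provides against concurrent workers.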
Delivery semantics
This is an at-least-once queue. A job may be delivered more than once (e.g. a
worker crashes after doing the work but before acking). Make your processors
idempotent — key side effects on job.id.
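One way to key a side effect on job.id is to pass an id-derived key to the downstream system and let it ignore repeats. The sketch below uses a hypothetical in-memory FakeMailer to stand in for such a system; nothing here is part of redis-streamq, and a real processor would be async and would rely on durable deduplication (for example a unique constraint or a provider's idempotency key).

```typescript
// Hypothetical downstream service that ignores repeats of the same key.
class FakeMailer {
  readonly sent: string[] = [];
  private readonly keys = new Set<string>();

  // Returns true if the email was sent, false if the key was already seen.
  send(idempotencyKey: string, to: string): boolean {
    if (this.keys.has(idempotencyKey)) return false;
    this.keys.add(idempotencyKey);
    this.sent.push(to);
    return true;
  }
}

const mailer = new FakeMailer();

// Processor body keyed on job.id: redelivery of the same job is a no-op.
function processWelcome(job: { id: string; data: { to: string } }): boolean {
  return mailer.send(`welcome:${job.id}`, job.data.to);
}

const sampleJob = { id: "job-1", data: { to: "someone@example.test" } };
const first = processWelcome(sampleJob); // first delivery
const second = processWelcome(sampleJob); // simulated redelivery after a crash
console.log(first, second, mailer.sent.length);
```

The deduplication record lives with the side effect itself, so a duplicate delivery cannot repeat it even if the worker crashed before acking.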
Operational notes
- Attach an error listener. The worker emits error for transient, recoverable failures (e.g. a Redis blip during ack). If no listener is attached the error is logged to console.error instead of crashing your process, but you should handle it so you have visibility.
- Behaviour during a Redis outage. The underlying ioredis client queues commands while disconnected (its default), so queue.add() will wait for reconnection rather than reject. If you'd rather fail fast, pass an ioredis client created with enableOfflineQueue: false as the connection.
- The dead-letter queue is unbounded by design. Drain it periodically with getDeadLetters / retryDeadLetter, or trim the {prefix}:{name}:dead stream out of band if you don't need the history.
- Retry/delay latency. Delayed and retried jobs become available within ~1s (the promotion tick), so very short backoff delays are rounded up to roughly that granularity.
Development
npm install
npm run build # bundle ESM + CJS + d.ts into dist/ (tsup)
npm run typecheck # tsc --noEmit
npm test # unit tests (no Redis required)
# Run the integration tests against a real Redis:
REDIS_URL=redis://localhost:6379 npm test
A quick local Redis:
docker run --rm -p 6379:6379 redis:7