redis-streamq

v0.1.1

A small, dependency-light, type-safe event queue for Node.js backed by Redis Streams. Reliable at-least-once delivery, delayed jobs, retries with backoff, dead-letter queue, concurrency and graceful shutdown.

redis-streamq

A small, type-safe event/job queue for Node.js backed by Redis Streams.

Use Redis as a standalone, reliable queue without pulling in a heavyweight framework. You bring a Redis server and an ioredis connection; this library gives you clean producer/consumer abstractions with the reliability features you actually need in production.

import { Queue, Worker } from "redis-streamq";

const queue = new Queue("emails", { connection: "redis://localhost:6379" });
await queue.add("welcome", { to: "user@example.com" });

new Worker("emails", async (job) => {
  await sendEmail(job.data.to);
}, { connection: "redis://localhost:6379" });

Why

  • Reliable, at-least-once delivery built on Redis Streams consumer groups with explicit acknowledgements — a crashed worker never silently drops a job.
  • Retries with backoff (fixed or exponential), per-job or per-worker.
  • Dead-letter queue for jobs that exhaust their attempts, with inspection and one-call replay.
  • Delayed / scheduled jobs via a sorted set that is promoted automatically.
  • Stalled-job recovery — work abandoned by a dead worker is reclaimed and retried, bounded by maxStalledCount so poison messages can't loop forever.
  • Concurrency control and graceful shutdown (in-flight jobs finish and ack before the process exits).
  • Fully typed generic payloads and a typed event emitter.
  • Tiny surface area. ioredis is the only (peer) dependency.

Where does redis-streamq fit?

Node.js has several excellent queue libraries. Each solves a slightly different problem. redis-streamq is for when you already run Redis and want a small, reliable queue built on Redis Streams — without adopting a larger job framework or standing up separate infrastructure.

What redis-streamq gives you

  • A focused API: Queue (produce) and Worker (consume).
  • At-least-once delivery via Redis Streams consumer groups and explicit acks.
  • Production basics in one place: retries with backoff, a dead-letter queue (with inspection and replay), delayed jobs, stalled-job recovery, concurrency control, and graceful shutdown.
  • One peer dependency (ioredis) — you bring the Redis client and connection setup you already use (TLS, Sentinel, Cluster, etc.).
  • TypeScript-first generics and a typed event emitter.

When redis-streamq is a good fit

  • You want Streams semantics (consumer groups, pending entries, reclaim) with a minimal learning curve.
  • Your workload needs reliable background jobs, not cron scheduling, job flows, or a built-in dashboard.
  • You prefer a small dependency footprint and a library you can read through in an afternoon.

When another tool may suit you better

These are all solid choices — pick based on what your project needs:

  • BullMQ — a full-featured Redis job system with repeatable/cron jobs, flows, rate limiting, and a rich ecosystem (e.g. Bull Board). A great option when you need those capabilities.
  • bee-queue — a Redis-backed job queue with a straightforward API, well suited to many common job-processing workloads.
  • Raw Redis lists (LPUSH / BRPOP) — a simple pattern when you want the lightest possible Redis primitive and can design reliability yourself.
  • Dedicated brokers (RabbitMQ, Amazon SQS, etc.) — when messaging, routing, and broker-managed scaling are central to your architecture rather than extending an existing Redis deployment.

If your needs grow beyond what redis-streamq offers, moving to a fuller job system is a natural step — concepts like retries, dead-lettering, and delayed work translate directly.

Requirements

  • Node.js >= 24
  • Redis >= 6.2 (uses XAUTOCLAIM, available since 6.2)
  • ioredis ^5.3.0 (peer dependency)

Installation

npm install redis-streamq ioredis
# or
pnpm add redis-streamq ioredis
# or
yarn add redis-streamq ioredis

ioredis is a peer dependency so the queue shares the exact client (and version) your app already uses.

Quick start

Producer

import { Queue } from "redis-streamq";

interface EmailJob {
  to: string;
  subject: string;
}

const queue = new Queue<EmailJob>("emails", {
  connection: "redis://localhost:6379",
});

await queue.add("welcome", { to: "user@example.com", subject: "Hi!" });

await queue.close();

Consumer

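import { Worker } from "redis-streamq";
// EmailJob is the interface from the producer example; sendEmail stands in
// for your own mail-sending function.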

const worker = new Worker<EmailJob>(
  "emails",
  async (job) => {
    await sendEmail(job.data.to, job.data.subject);
  },
  { connection: "redis://localhost:6379", concurrency: 10 },
);

worker.on("completed", (job) => console.log("sent", job.id));
worker.on("failed", (job, err) => console.warn("failed", job?.id, err.message));

// Graceful shutdown
process.on("SIGTERM", () => worker.close().then(() => process.exit(0)));

The worker starts automatically. Pass autorun: false and call worker.run() yourself if you'd rather control when processing begins.

Connecting to Redis

connection accepts three forms:

// 1. A connection string
new Queue("q", { connection: "redis://:password@localhost:6379/0" });

// 2. ioredis options
new Queue("q", { connection: { host: "localhost", port: 6379, db: 1 } });

// 3. An existing ioredis client (NOT closed for you on close())
import { Redis } from "ioredis";
const client = new Redis(process.env.REDIS_URL);
new Queue("q", { connection: client });

When you pass a string or options object, the library creates and owns the client(s) and closes them on close(). When you pass your own client, you keep ownership. Workers always create one extra connection internally for blocking reads (a blocking command monopolizes its connection).

TLS / Redis Cluster / Sentinel: construct an ioredis client however you need and pass it in as connection.

Common scenarios

Delayed and scheduled jobs

// Run 30 seconds from now
await queue.add("reminder", { id: 1 }, { delay: 30_000 });

// "Schedule" by computing the delay
const runAt = new Date("2026-01-01T00:00:00Z");
await queue.add("new-year", {}, { delay: runAt.getTime() - Date.now() });
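
Because delay is a relative number of milliseconds, a run-at time that has already passed produces a negative value. The sketch below clamps the delay at zero. delayUntil is a hypothetical helper name, not part of redis-streamq, and how the library itself treats a negative delay is not stated here, so the clamp is a defensive assumption.

```typescript
// Hypothetical helper (not part of redis-streamq): convert an absolute
// run-at time into the relative `delay` in milliseconds.
function delayUntil(runAt: Date, nowMs: number = Date.now()): number {
  // Clamp at 0 so a run-at time in the past means "run as soon as possible".
  return Math.max(0, runAt.getTime() - nowMs);
}

// A fixed "now" keeps the example deterministic.
const fixedNow = Date.parse("2025-12-31T23:59:30Z");
console.log(delayUntil(new Date("2026-01-01T00:00:00Z"), fixedNow)); // 30000
console.log(delayUntil(new Date("2025-01-01T00:00:00Z"), fixedNow)); // 0
```

With such a helper, the scheduling call could read queue.add("new-year", {}, { delay: delayUntil(runAt) }).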

Retries and backoff

Configure defaults on the worker, override per job:

const worker = new Worker("q", processor, {
  connection,
  attempts: 5,                                   // default max attempts
  backoff: { type: "exponential", delay: 1000, maxDelay: 60_000 },
});

// Per-job override at enqueue time
await queue.add("critical", data, {
  attempts: 10,
  backoff: { type: "fixed", delay: 2000 },
});
  • fixed: waits delay ms between every attempt.
  • exponential: waits delay * 2 ** (attemptsMade - 1) ms, capped by maxDelay.

A job is retried until attemptsMade reaches maxAttempts, after which it is moved to the dead-letter queue.
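
The two strategies above can be sketched as a small pure function. This is an illustration of the stated formulas, not the library's source; BackoffSketch and backoffDelay are hypothetical names, and the real BackoffOptions type may carry more fields.

```typescript
// Illustrative type only; assumed shape based on the options shown above.
type BackoffSketch =
  | { type: "fixed"; delay: number }
  | { type: "exponential"; delay: number; maxDelay?: number };

// Wait (ms) before the next attempt, given how many attempts have already run.
function backoffDelay(backoff: BackoffSketch, attemptsMade: number): number {
  if (backoff.type === "fixed") return backoff.delay;
  const raw = backoff.delay * 2 ** (attemptsMade - 1);
  // No maxDelay means no cap in this sketch.
  return Math.min(raw, backoff.maxDelay ?? Infinity);
}

const expBackoff: BackoffSketch = { type: "exponential", delay: 1000, maxDelay: 60_000 };
for (let attemptsMade = 1; attemptsMade <= 7; attemptsMade++) {
  console.log(attemptsMade, backoffDelay(expBackoff, attemptsMade));
}
// 1 1000, 2 2000, 3 4000, 4 8000, 5 16000, 6 32000, 7 60000 (64000 capped)
```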

Dead-letter queue

// Inspect failed jobs
const dead = await queue.getDeadLetters(50);
for (const entry of dead) {
  console.log(entry.job.id, entry.failedReason, entry.deadLetteredAt);
}

// Count them
const { deadLetter } = await queue.getCounts();

// Requeue everything for another try (resets attempt counters)
const requeued = await queue.retryDeadLetter();

Concurrency

new Worker("q", processor, { connection, concurrency: 25 });

The worker never has more than concurrency jobs in flight. Scale horizontally by running multiple worker processes — they form a single consumer group and share the load.
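
One way to picture the in-flight limit is a simple slot counter. The sketch below is a hypothetical illustration of that idea (InFlightGate is not a redis-streamq export), and the library's actual mechanism may differ.

```typescript
// Hypothetical in-flight gate: a sketch of the idea, not the library's code.
class InFlightGate {
  private inFlight = 0;
  private readonly limit: number;

  constructor(concurrency: number) {
    this.limit = concurrency;
  }

  // Reserve a slot; returns false when `concurrency` jobs are already running.
  tryAcquire(): boolean {
    if (this.inFlight >= this.limit) return false;
    this.inFlight++;
    return true;
  }

  // Free a slot once a job has finished, whether it succeeded or failed.
  release(): void {
    if (this.inFlight > 0) this.inFlight--;
  }

  get active(): number {
    return this.inFlight;
  }
}

const gate = new InFlightGate(2);
console.log(gate.tryAcquire(), gate.tryAcquire(), gate.tryAcquire()); // true true false
gate.release();
console.log(gate.tryAcquire()); // true
```

A fetch loop built on such a gate would only read a new job after tryAcquire() succeeds, which is what keeps the number of running jobs at or below concurrency.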

Bulk enqueue

await queue.addBulk([
  { name: "task", data: { i: 1 } },
  { name: "task", data: { i: 2 }, opts: { delay: 5000 } },
]);

Graceful shutdown

async function shutdown() {
  await worker.close(); // stops fetching, waits for in-flight jobs, acks, disconnects
  await queue.close();
  process.exit(0);
}
process.on("SIGINT", () => void shutdown());
process.on("SIGTERM", () => void shutdown());

Stalled / crashed worker recovery

If a worker dies mid-job (e.g. the pod is killed), the job stays in the consumer group's pending list. Another worker reclaims it after stalledInterval ms and re-processes it. After maxStalledCount reclaims, the job is dead-lettered to prevent an infinite crash loop.

new Worker("q", processor, {
  connection,
  stalledInterval: 30_000, // consider a job stalled after 30s unacked
  maxStalledCount: 3,      // give up after 3 reclaims
});
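
The reclaim limit can be read as a simple decision per stalled job. The sketch below assumes the first maxStalledCount reclaims are retried and the next one is dead-lettered. The exact boundary inside the library is an assumption here, and onStalled is a hypothetical name.

```typescript
type StalledAction = "retry" | "dead-letter";

// `reclaimCount` is how many times the job has been reclaimed so far,
// including the current reclaim. Assumed boundary: reclaims 1..maxStalledCount
// are retried, anything beyond that is dead-lettered.
function onStalled(reclaimCount: number, maxStalledCount: number): StalledAction {
  return reclaimCount > maxStalledCount ? "dead-letter" : "retry";
}

for (let reclaim = 1; reclaim <= 4; reclaim++) {
  console.log(reclaim, onStalled(reclaim, 3));
}
// 1 retry, 2 retry, 3 retry, 4 dead-letter
```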

Events

Worker is a typed event emitter:

| Event | Arguments | When |
| --- | --- | --- |
| ready | — | Connected and consumer group is ready |
| active | (job) | A job is about to be processed |
| completed | (job, result) | A job finished successfully |
| failed | (job, error) | An attempt failed (may still be retried) |
| dead-lettered | (job, error) | Attempts exhausted; moved to the dead-letter queue |
| stalled | (jobId) | A stalled job was reclaimed |
| error | (error) | A non-fatal error in the run loop |
| closed | — | The worker has fully stopped |

worker.on("completed", (job, result) => {});
worker.on("failed", (job, err) => {});
worker.on("error", (err) => {});

API reference

new Queue<T>(name, options)

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| connection | string \| RedisOptions \| Redis | — | How to connect to Redis (required) |
| prefix | string | "req" | Key prefix for namespacing |
| defaultJobOptions | JobOptions | {} | Defaults applied to every add |

Methods: add(name, data, opts?), addBulk(jobs), getCounts(), getWaitingCount(), getDelayedCount(), getDeadLetterCount(), getDeadLetters(count?), retryDeadLetter(), drain(opts?), close().

new Worker<T, R>(name, processor, options)

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| connection | see above | — | How to connect to Redis (required) |
| prefix | string | "req" | Must match the producing queue |
| concurrency | number | 1 | Max jobs processed at once |
| attempts | number | 3 | Default max attempts |
| backoff | BackoffOptions | exp 1s | Default backoff strategy |
| blockingTimeout | number (ms) | 5000 | Blocking read timeout per fetch loop |
| stalledInterval | number (ms) | 30000 | Unacked time before a job is considered stalled |
| maxStalledCount | number | 3 | Max reclaims before dead-lettering |
| autorun | boolean | true | Start processing immediately |

Methods: run(), close(), isRunning(), plus the events above.

JobOptions

| Field | Type | Description |
| --- | --- | --- |
| delay | number (ms) | Delay before the job becomes available |
| attempts | number | Max attempts for this job |
| backoff | BackoffOptions | Backoff for this job |
| jobId | string | Custom id (for tracing; not dedup) |

Job<T>

{ id, name, data, attemptsMade, maxAttempts, backoff, timestamp } — the object passed to your processor.

How it works

Each queue uses a small set of Redis keys, all derived from {prefix}:{name}:

| Key | Type | Purpose |
| --- | --- | --- |
| {prefix}:{name}:stream | stream | Ready-to-process jobs (consumer group) |
| {prefix}:{name}:delayed | zset | Delayed / retrying jobs (score = run-at) |
| {prefix}:{name}:dead | stream | Dead-lettered jobs |

  • Producing an immediate job is an XADD; a delayed job is a ZADD into the sorted set.
  • Workers form one consumer group and read with XREADGROUP ... BLOCK.
  • A background tick promotes due delayed jobs into the stream (ZRANGEBYSCORE, then XADD) atomically via a Lua script.
  • On success the job is XACK'd and XDEL'd. On failure it is rescheduled into the delayed set (retry) or appended to the dead-letter stream — each as a single atomic Lua script so a job is never lost or duplicated by a crash.
  • Stalled jobs are recovered with XAUTOCLAIM; XPENDING delivery counts bound retries via maxStalledCount.
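
To make the promotion step above concrete, here is an in-memory model of it. Plain arrays stand in for the sorted set and the stream, and promoteDue is a hypothetical name. The real step runs inside Redis as a Lua script, so this only illustrates the logic, not the library's implementation.

```typescript
// In-memory stand-in for a delayed-set entry (zset member plus score).
interface DelayedEntry {
  jobId: string;
  runAt: number; // epoch milliseconds, used as the sort score
}

// Move every entry whose run-at time has passed from `delayed` to `stream`,
// earliest first, and return how many entries were promoted.
function promoteDue(delayed: DelayedEntry[], stream: string[], nowMs: number): number {
  delayed.sort((a, b) => a.runAt - b.runAt);
  let promoted = 0;
  while (true) {
    const head = delayed[0];
    if (head === undefined || head.runAt > nowMs) break;
    delayed.shift();
    stream.push(head.jobId);
    promoted++;
  }
  return promoted;
}

const delayedSet: DelayedEntry[] = [
  { jobId: "b", runAt: 2000 },
  { jobId: "a", runAt: 1000 },
  { jobId: "c", runAt: 9000 },
];
const readyStream: string[] = [];
console.log(promoteDue(delayedSet, readyStream, 2500)); // 2
console.log(readyStream); // [ 'a', 'b' ]
console.log(delayedSet.map((entry) => entry.jobId)); // [ 'c' ]
```

Running the removal and the append as one uninterrupted step is what the Lua script provides in Redis: no other client can observe a job that has left the delayed set but not yet reached the stream.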

Delivery semantics

This is an at-least-once queue. A job may be delivered more than once (e.g. a worker crashes after doing the work but before acking). Make your processors idempotent — key side effects on job.id.
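
A minimal sketch of keying a side effect on the job id follows. It is kept synchronous and in-memory for brevity; JobLike, processedIds, and makeIdempotent are hypothetical names, and a real deployment would need a shared store (for example a unique constraint in your database) so that every worker process sees the same record.

```typescript
// Minimal job shape for this sketch; the library's Job<T> has more fields.
interface JobLike<T> {
  id: string;
  data: T;
}

// Hypothetical dedup store; process memory is used only for illustration.
const processedIds = new Set<string>();

// Wrap a side effect so that a redelivered job id is skipped, not repeated.
// Returns true when the effect ran and false when the delivery was a duplicate.
function makeIdempotent<T>(
  effect: (job: JobLike<T>) => void,
): (job: JobLike<T>) => boolean {
  return (job) => {
    if (processedIds.has(job.id)) return false;
    effect(job);
    processedIds.add(job.id); // recorded only after the effect succeeded
    return true;
  };
}

const sentTo: string[] = [];
const handleEmail = makeIdempotent<{ to: string }>((job) => {
  sentTo.push(job.data.to);
});

const emailJob = { id: "job-1", data: { to: "a@example.com" } };
console.log(handleEmail(emailJob)); // true  (first delivery)
console.log(handleEmail(emailJob)); // false (redelivery skipped)
console.log(sentTo.length); // 1
```

In this sketch a crash between the effect and the record would still let the effect run twice, so where possible the effect and the record should be committed together, for instance in one database transaction.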

Operational notes

  • Attach an error listener. The worker emits error for transient, recoverable failures (e.g. a Redis blip during ack). If no listener is attached the error is logged to console.error instead of crashing your process — but you should handle it so you have visibility.
  • Behaviour during a Redis outage. The underlying ioredis client queues commands while disconnected (its default), so queue.add() will wait for reconnection rather than reject. If you'd rather fail fast, pass an ioredis client created with enableOfflineQueue: false as the connection.
  • The dead-letter queue is unbounded by design. Drain it periodically with getDeadLetters / retryDeadLetter, or trim the {prefix}:{name}:dead stream out of band if you don't need the history.
  • Retry/delay latency. Delayed and retried jobs become available within ~1s (the promotion tick), so very short backoff delays are rounded up to roughly that granularity.

Development

npm install
npm run build        # bundle ESM + CJS + d.ts into dist/ (tsup)
npm run typecheck    # tsc --noEmit
npm test             # unit tests (no Redis required)

# Run the integration tests against a real Redis:
REDIS_URL=redis://localhost:6379 npm test

A quick local Redis:

docker run --rm -p 6379:6379 redis:7

License

MIT