@aprismlab/thinrsmq

v0.1.2

thinrsmq (Node.js)

Thin Redis Streams message queue layer with retry, DLQ, and claim monitoring. Wire-compatible with the Go implementation.

Installation

npm install thinrsmq ioredis

Note: ioredis is a peer dependency, so it is not installed automatically. Install it alongside thinrsmq, as in the command above.

Quick Start

Producer

import Redis from 'ioredis';
import { Producer, withDefaults } from 'thinrsmq';

const redis = new Redis();
const config = withDefaults({ namespace: 'myapp' });
const producer = new Producer(redis, config);

// Publish a single message
const id = await producer.publish('orders', {
  type: 'order.created',
  payload: JSON.stringify({ orderId: '123', amount: 99.99 }),
  version: '1',
  producedAt: '', // auto-set when left empty
  traceId: 'trace-abc-123', // optional
  producer: 'order-service', // optional
});

console.log('Published message:', id);

// Publish a batch
const ids = await producer.publishBatch('orders', [
  { type: 'order.created', payload: '{"orderId":"124"}', version: '1', producedAt: '' },
  { type: 'order.created', payload: '{"orderId":"125"}', version: '1', producedAt: '' },
]);

console.log('Published batch:', ids);

Consumer

import Redis from 'ioredis';
import { Consumer, withDefaults } from 'thinrsmq';

const redis = new Redis();
const config = withDefaults({ namespace: 'myapp' });
const consumer = new Consumer(redis, config);

// Subscribe to a topic with a handler
await consumer.subscribe('orders', 'order-processor', async (msg) => {
  console.log('Processing order:', msg.payload);

  // Throw error to trigger retry
  // if (shouldRetry) throw new Error('Temporary failure');

  // Success - message will be acknowledged
});

// Optional: handle skipped messages (nil/trimmed or unknown version)
consumer.onSkip((id, reason) => {
  console.warn('Skipped message:', id, reason);
});

// Graceful shutdown
process.on('SIGTERM', async () => {
  await consumer.stop();
  await redis.quit();
});

Claim Monitor

The claim monitor detects idle messages in the pending entries list (PEL) and routes them to retry or DLQ based on attempt count.

import Redis from 'ioredis';
import { ClaimMonitor, withDefaults } from 'thinrsmq';

const redis = new Redis();
const config = withDefaults({
  namespace: 'myapp',
  monitor: {
    enabled: true,
    scanIntervalMs: 5000,
    minIdleTimeMs: 10000,
  },
});

const monitor = new ClaimMonitor(redis, config);
await monitor.start();

// Graceful shutdown
process.on('SIGTERM', async () => {
  await monitor.stop();
  await redis.quit();
});
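
The routing rule described at the top of this section can be sketched as two small pure functions. This is an illustration only: `routeIdleMessage` and `isClaimable` are hypothetical names, not part of the thinrsmq API, and the exact comparison against `retry.maxAttempts` is an assumption.

```typescript
// Hypothetical helpers for illustration; not exported by thinrsmq.
type Route = 'retry' | 'dlq';

// Assumption: a message goes to the DLQ once its attempt count
// has reached retry.maxAttempts; otherwise it is retried.
function routeIdleMessage(attempt: number, maxAttempts: number): Route {
  return attempt >= maxAttempts ? 'dlq' : 'retry';
}

// Only messages idle for at least monitor.minIdleTimeMs are considered.
function isClaimable(idleMs: number, minIdleTimeMs: number): boolean {
  return idleMs >= minIdleTimeMs;
}

console.log(routeIdleMessage(1, 3)); // "retry"
console.log(routeIdleMessage(3, 3)); // "dlq"
```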

Dead Letter Queue (DLQ)

import Redis from 'ioredis';
import { DLQ, withDefaults } from 'thinrsmq';

const redis = new Redis();
const config = withDefaults({ namespace: 'myapp' });
const dlq = new DLQ(redis, config);

// Peek at DLQ messages
const messages = await dlq.peek('orders', 10);
console.log('DLQ messages:', messages);

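// Replay a message (with guard).
// Assumes replay() takes the ID of a DLQ entry, such as one returned by peek().
if (messages.length > 0) {
  await dlq.replay('orders', messages[0].id);
}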

// Purge all DLQ messages for a topic
await dlq.purge('orders');

// Get DLQ size
const size = await dlq.size('orders');
console.log('DLQ size:', size);

Admin

import Redis from 'ioredis';
import { Admin, withDefaults } from 'thinrsmq';

const redis = new Redis();
const config = withDefaults({ namespace: 'myapp' });
const admin = new Admin(redis, config);

// Get pending stats for a stream
const pendingInfo = await admin.pendingStats('orders', 'order-processor');
console.log('Pending messages:', pendingInfo.count);

// Get consumer details
const consumerInfo = await admin.consumerInfo('orders', 'order-processor');
console.log('Consumers:', consumerInfo);

// Get stream details
const streamInfo = await admin.streamInfo('orders');
console.log('Stream length:', streamInfo.length);

Configuration

Configuration Reference

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| namespace | string | (required) | Namespace for all Redis keys |
| redis.address | string | "localhost:6379" | Redis server address |
| redis.password | string | "" | Redis password |
| redis.db | number | 0 | Redis database number |
| redis.poolSize | number | 10 | Connection pool size |
| redis.readTimeoutMs | number | 3000 | Read timeout in milliseconds |
| redis.writeTimeoutMs | number | 3000 | Write timeout in milliseconds |
| redis.useTls | boolean | false | Enable TLS for Redis connection |
| streams.defaultMaxLen | number | 10000 | Default MAXLEN for streams |
| consumer.batchSize | number | 10 | Number of messages to read per batch |
| consumer.blockMs | number | 5000 | Block duration for XREADGROUP |
| consumer.consumerName | string | "thinrsmq" | Prefix for consumer name generation |
| consumer.shutdownTimeoutMs | number | 30000 | Graceful shutdown timeout |
| retry.maxAttempts | number | 3 | Maximum retry attempts before DLQ |
| retry.baseDelayMs | number | 1000 | Base delay for exponential backoff |
| retry.maxDelayMs | number | 60000 | Maximum delay cap |
| retry.jitter | boolean | true | Add random jitter to delays |
| monitor.enabled | boolean | false | Enable claim monitor |
| monitor.scanIntervalMs | number | 10000 | Interval between PEL scans |
| monitor.minIdleTimeMs | number | 60000 | Minimum idle time to claim |
| monitor.claimBatchSize | number | 100 | Max messages to claim per scan |
| dlq.maxReplays | number | 3 | Max replays before freezing |

Using Environment Variables

import Redis from 'ioredis';
import { configFromEnv, withDefaults, Producer } from 'thinrsmq';

// Read Redis connection from environment variables
const envConfig = configFromEnv();
const config = withDefaults({ ...envConfig, namespace: 'myapp' });

const [host, port] = config.redis.address.split(':');
const redis = new Redis({
  host,
  port: parseInt(port, 10),
  password: config.redis.password || undefined,
  tls: config.redis.useTls ? { servername: host } : undefined,
});

const producer = new Producer(redis, config);

Environment Variables

| Variable | Description | Default |
|----------|-------------|---------|
| REDIS_HOST | Redis hostname | "localhost" |
| REDIS_PORT | Redis port | "6379" |
| REDIS_PASSWORD | Redis password | "" |
| REDIS_USE_TLS | Enable TLS ("true" or "1") | false |
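
To make the table concrete, here is a minimal sketch of how these variables could map onto the `redis.*` config fields. `readRedisEnvSketch` is a hypothetical stand-in, not the library's `configFromEnv()`, and joining host and port into `address` is an assumption based on the `"localhost:6379"` default in the configuration reference.

```typescript
// Hypothetical helper for illustration; configFromEnv() may differ in detail.
type EnvRedisSketch = { address: string; password: string; useTls: boolean };

function readRedisEnvSketch(env: Record<string, string | undefined>): EnvRedisSketch {
  // Fall back to the documented defaults when a variable is unset or empty.
  const host = env['REDIS_HOST'] || 'localhost';
  const port = env['REDIS_PORT'] || '6379';
  const tls = env['REDIS_USE_TLS'];
  return {
    address: `${host}:${port}`,
    password: env['REDIS_PASSWORD'] || '',
    // Only "true" and "1" enable TLS, as listed in the table.
    useTls: tls === 'true' || tls === '1',
  };
}

const fromEnv = readRedisEnvSketch({ REDIS_HOST: 'cache.internal', REDIS_USE_TLS: '1' });
console.log(fromEnv.address); // "cache.internal:6379"
console.log(fromEnv.useTls); // true
```

In a Node.js process you would pass `process.env` instead of the literal object.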

API Reference

Producer

new Producer(redis, config)

Creates a new producer instance.

  • redis: ioredis client instance
  • config: Configuration object

publish(topic: string, msg: Omit<Message, 'id'>): Promise<string>

Publishes a single message. Returns the stream entry ID.

  • topic: Topic name (stream will be created as {namespace}:{topic})
  • msg: Message object without the id field
    • type: Message type identifier
    • payload: Message payload (string)
    • version: Envelope version (use "1")
    • producedAt: ISO 8601 timestamp (auto-set if empty)
    • traceId: Optional trace ID
    • producer: Optional producer identifier

publishBatch(topic: string, msgs: Omit<Message, 'id'>[]): Promise<string[]>

Publishes multiple messages in a pipeline. Returns array of stream entry IDs.

Consumer

new Consumer(redis, config)

Creates a new consumer instance with a unique name.

subscribe(topic: string, group: string, handler: Handler): Promise<void>

Subscribes to a topic with a consumer group. Starts processing messages.

  • topic: Topic name
  • group: Consumer group name
  • handler: async (msg: Message) => void - Message handler function

Throw an error from the handler to trigger a retry. Returning without throwing acknowledges the message.

stop(): Promise<void>

Gracefully stops the consumer. Waits for in-flight messages to complete (up to shutdownTimeoutMs).

onSkip(handler: SkipHandler): void

Registers a callback for skipped messages (nil/trimmed or unknown version).

  • handler: (id: string, reason: string) => void

getConsumerName(): string

Returns the unique consumer name (format: {prefix}-{hostname}-{pid}-{uuid}).

ClaimMonitor

new ClaimMonitor(redis, config)

Creates a new claim monitor instance.

start(): Promise<void>

Starts the claim monitor. Periodically scans for idle messages and routes them to retry or DLQ.

stop(): Promise<void>

Stops the claim monitor.

scanOnce(): Promise<void>

Performs a single scan cycle. Useful for testing.

DLQ

new DLQ(redis, config)

Creates a new DLQ instance.

moveToDLQ(topic: string, msg: Message, failureMetadata): Promise<void>

Moves a message to the DLQ.

  • failureMetadata:
    • originalStream: Original stream key
    • failedAt: Failure timestamp
    • totalAttempts: Total retry attempts
    • lastError: Error message
    • consumerGroup: Consumer group name

peek(topic: string, count: number): Promise<DLQMessage[]>

Returns up to count messages from the DLQ (oldest first).

replay(topic: string, messageId: string): Promise<void>

Replays a DLQ message back to the original stream. Increments replay_count and freezes the message once replay_count >= dlq.maxReplays.

purge(topic: string): Promise<void>

Deletes all messages from the DLQ for a topic.

size(topic: string): Promise<number>

Returns the number of messages in the DLQ.

Admin

new Admin(redis, config)

Creates a new admin instance.

pendingStats(topic: string, group: string): Promise<PendingInfo>

Returns pending message statistics.

  • Returns: { count, minId, maxId, consumers }

consumerInfo(topic: string, group: string): Promise<ConsumerDetail[]>

Returns consumer details for a group.

  • Returns: Array of { name, pending, idle }

streamInfo(topic: string): Promise<StreamDetail>

Returns stream metadata.

  • Returns: { length, firstEntryId, lastEntryId, groups }

DLQ Operations

Admin also provides DLQ operations:

  • dlqSize(topic): Alias for DLQ.size()
  • dlqPeek(topic, count): Alias for DLQ.peek()
  • dlqReplay(topic, messageId): Alias for DLQ.replay()
  • dlqPurge(topic): Alias for DLQ.purge()

RetryStore

new RetryStore(redis, config)

Creates a new retry store instance.

initIfNotExists(topic: string, messageId: string): Promise<void>

Initializes the retry hash if it doesn't exist. Uses HSETNX, which only sets a field that is not already present, to enforce the single-writer rule.

get(topic: string, messageId: string): Promise<RetryInfo | null>

Gets retry information for a message.

  • Returns: { attempt, lastAttemptAt } or null

set(topic: string, messageId: string, info: RetryInfo): Promise<void>

Sets retry information.

delete(topic: string, messageId: string): Promise<void>

Deletes retry hash.

incrementAttempt(topic: string, messageId: string): Promise<number>

Increments attempt count and returns new value.

Helper Functions

configFromEnv(): { redis: Partial<RedisConfig> }

Reads Redis connection settings from environment variables.

withDefaults(config: Partial<Config>): Config

Merges provided config with defaults.
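
As a rough sketch of what such a merge could look like, the example below covers only the `retry` and `monitor` groups, using the defaults from the configuration reference. `withDefaultsSketch` and its types are hypothetical, and the per-group (nested) merge is an assumption inferred from the Claim Monitor example, which overrides only some `monitor` fields.

```typescript
// Hypothetical, reduced config shape for illustration only.
type RetrySketch = { maxAttempts: number; baseDelayMs: number; maxDelayMs: number; jitter: boolean };
type MonitorSketch = { enabled: boolean; scanIntervalMs: number; minIdleTimeMs: number; claimBatchSize: number };
type ConfigSketch = { namespace: string; retry: RetrySketch; monitor: MonitorSketch };
type ConfigInputSketch = {
  namespace: string;
  retry?: Partial<RetrySketch>;
  monitor?: Partial<MonitorSketch>;
};

// Default values copied from the configuration reference table.
const DEFAULT_RETRY: RetrySketch = { maxAttempts: 3, baseDelayMs: 1000, maxDelayMs: 60000, jitter: true };
const DEFAULT_MONITOR: MonitorSketch = {
  enabled: false,
  scanIntervalMs: 10000,
  minIdleTimeMs: 60000,
  claimBatchSize: 100,
};

// Assumption: each group is merged field by field, so a partial override
// keeps the defaults for the fields it does not mention.
function withDefaultsSketch(input: ConfigInputSketch): ConfigSketch {
  return {
    namespace: input.namespace,
    retry: { ...DEFAULT_RETRY, ...input.retry },
    monitor: { ...DEFAULT_MONITOR, ...input.monitor },
  };
}

const merged = withDefaultsSketch({ namespace: 'myapp', monitor: { enabled: true } });
console.log(merged.monitor.enabled); // true
console.log(merged.monitor.claimBatchSize); // 100
```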

validateConfig(config: Config): void

Validates configuration. Throws ConfigError on validation failure.

computeDelay(attempt: number, cfg: BackoffConfig): number

Computes exponential backoff delay in milliseconds.

  • Formula: min(baseDelay * 2^(attempt-1) + jitter, maxDelay)
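
The formula can be worked through with a small stand-alone sketch. `computeDelaySketch` is a hypothetical re-implementation, not the library function: the field names mirror the `retry.*` config keys, and the jitter range (a random amount below `baseDelayMs`) is an assumption, since the README does not specify it.

```typescript
// Hypothetical shape; the real BackoffConfig may use different field names.
type BackoffSketch = { baseDelayMs: number; maxDelayMs: number; jitter: boolean };

function computeDelaySketch(
  attempt: number,
  cfg: BackoffSketch,
  random: () => number = Math.random, // injectable so the result can be made deterministic
): number {
  // baseDelay * 2^(attempt-1)
  const exponential = cfg.baseDelayMs * Math.pow(2, attempt - 1);
  // Assumption: jitter is a random amount in [0, baseDelayMs).
  const jitter = cfg.jitter ? Math.floor(random() * cfg.baseDelayMs) : 0;
  // Cap the total at maxDelay.
  return Math.min(exponential + jitter, cfg.maxDelayMs);
}

const noJitter: BackoffSketch = { baseDelayMs: 1000, maxDelayMs: 60000, jitter: false };
console.log([1, 2, 3, 7].map((n) => computeDelaySketch(n, noJitter)));
// [ 1000, 2000, 4000, 60000 ]
```

With the default settings, attempt 7 would be 64000 ms before the cap, so it is clamped to `maxDelayMs`.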

Key Functions

streamKey(namespace: string, topic: string): string

Returns stream key: "{namespace}:{topic}"

dlqKey(namespace: string, topic: string): string

Returns DLQ key: "{namespace}:{topic}:dlq"

retryKey(namespace: string, topic: string, messageId: string): string

Returns retry hash key: "{namespace}:{topic}:retries:{messageId}"
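
For reference, the three key formats above can be reproduced with plain template strings. The functions below are a sketch written from the documented formats (the `Sketch` suffix marks them as illustrative, not the library's own exports), and the message ID is a made-up stream entry ID.

```typescript
// Illustrative re-implementations of the documented key formats.
function streamKeySketch(namespace: string, topic: string): string {
  return `${namespace}:${topic}`;
}

function dlqKeySketch(namespace: string, topic: string): string {
  return `${namespace}:${topic}:dlq`;
}

function retryKeySketch(namespace: string, topic: string, messageId: string): string {
  return `${namespace}:${topic}:retries:${messageId}`;
}

console.log(streamKeySketch('myapp', 'orders')); // "myapp:orders"
console.log(dlqKeySketch('myapp', 'orders')); // "myapp:orders:dlq"
console.log(retryKeySketch('myapp', 'orders', '1700000000000-0'));
// "myapp:orders:retries:1700000000000-0"
```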

Types

Message

type Message = {
  id: string;
  version: string;
  type: string;
  payload: string;
  traceId?: string;
  producedAt: string;
  producer?: string;
};

DLQMessage

type DLQMessage = Message & {
  originalId: string;
  originalStream: string;
  failedAt: string;
  totalAttempts: number;
  lastError: string;
  consumerGroup: string;
  replayCount: number;
};

Error Types

  • ThinrsmqError: Base error class
  • ConfigError: Configuration validation error
  • UnknownVersionError: Thrown when envelope version is not "1"
  • MessageTrimmedError: Thrown when message was trimmed from stream
  • MissingFieldError: Thrown when required field is missing

CLI Tools

CLI tools are provided for development and testing.

Producer CLI

npx tsx src/cli/producer.ts --topic orders --type order.created --payload '{"orderId":"123"}'

Consumer CLI

npx tsx src/cli/consumer.ts --topic orders --group order-processor

Admin CLI

# Pending stats
npx tsx src/cli/admin.ts --topic orders --group order-processor --command pending

# Consumer info
npx tsx src/cli/admin.ts --topic orders --group order-processor --command consumers

# Stream info
npx tsx src/cli/admin.ts --topic orders --command stream

Wire Format

Messages use envelope version "1" with the following fields:

| Field | Required | Description |
|-------|----------|-------------|
| v | Yes | Envelope version (always "1") |
| type | Yes | Message type identifier |
| payload | Yes | Message payload (string) |
| produced_at | Yes | ISO 8601 timestamp |
| trace_id | No | Trace ID for distributed tracing |
| producer | No | Producer identifier |

DLQ messages include additional enrichment fields:

  • original_id: Original stream entry ID
  • original_stream: Original stream key
  • failed_at: Failure timestamp
  • total_attempts: Total retry attempts
  • last_error: Error message (truncated to 1000 chars)
  • consumer_group: Consumer group name
  • replay_count: Number of times replayed from DLQ
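
The mapping between the `Message` type and the envelope fields can be sketched as a pair of conversion functions. All names here (`MessageSketch`, `toWireFields`, `fromWireFields`, `requireField`) are hypothetical, and plain `Error` is used where the library presumably raises `MissingFieldError` or `UnknownVersionError`; the real encoder and decoder may behave differently.

```typescript
// Hypothetical types and helpers for illustration; not the library's implementation.
type MessageSketch = {
  version: string;
  type: string;
  payload: string;
  producedAt: string;
  traceId?: string;
  producer?: string;
};

// Convert a message into the flat string fields stored in the stream entry.
function toWireFields(msg: MessageSketch): Record<string, string> {
  const fields: Record<string, string> = {
    v: msg.version,
    type: msg.type,
    payload: msg.payload,
    produced_at: msg.producedAt,
  };
  // Optional fields are only written when present.
  if (msg.traceId !== undefined) fields['trace_id'] = msg.traceId;
  if (msg.producer !== undefined) fields['producer'] = msg.producer;
  return fields;
}

function requireField(fields: Record<string, string | undefined>, key: string): string {
  const value = fields[key];
  if (value === undefined) throw new Error(`missing required field: ${key}`);
  return value;
}

// Rebuild a message from stream entry fields, rejecting unknown versions.
function fromWireFields(fields: Record<string, string | undefined>): MessageSketch {
  const version = requireField(fields, 'v');
  if (version !== '1') throw new Error(`unknown envelope version: ${version}`);
  const msg: MessageSketch = {
    version,
    type: requireField(fields, 'type'),
    payload: requireField(fields, 'payload'),
    producedAt: requireField(fields, 'produced_at'),
  };
  const traceId = fields['trace_id'];
  if (traceId !== undefined) msg.traceId = traceId;
  const producer = fields['producer'];
  if (producer !== undefined) msg.producer = producer;
  return msg;
}

const wire = toWireFields({
  version: '1',
  type: 'order.created',
  payload: '{"orderId":"123"}',
  producedAt: '2024-01-01T00:00:00.000Z',
  traceId: 'trace-abc-123',
});
console.log(Object.keys(wire)); // [ 'v', 'type', 'payload', 'produced_at', 'trace_id' ]
```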

Cross-Language Interoperability

This Node.js implementation is wire-compatible with the Go implementation. Messages produced by one can be consumed by the other seamlessly.

See Go README for the Go implementation.

License

MIT License. See LICENSE for details.