@aprismlab/thinrsmq
v0.1.2
Thin Redis Streams message queue layer with retry, DLQ, and claim monitoring
thinrsmq (Node.js)
Thin Redis Streams message queue layer with retry, DLQ, and claim monitoring. Wire-compatible with the Go implementation.
Installation
npm install thinrsmq ioredis
Note: ioredis is a peer dependency. You must install it separately.
Quick Start
Producer
import Redis from 'ioredis';
import { Producer, withDefaults } from 'thinrsmq';
const redis = new Redis();
const config = withDefaults({ namespace: 'myapp' });
const producer = new Producer(redis, config);
// Publish a single message
const id = await producer.publish('orders', {
  type: 'order.created',
  payload: JSON.stringify({ orderId: '123', amount: 99.99 }),
  version: '1',
  producedAt: '', // auto-set when empty
  traceId: 'trace-abc-123', // optional
  producer: 'order-service', // optional
});
console.log('Published message:', id);
// Publish a batch
const ids = await producer.publishBatch('orders', [
  { type: 'order.created', payload: '{"orderId":"124"}', version: '1', producedAt: '' },
  { type: 'order.created', payload: '{"orderId":"125"}', version: '1', producedAt: '' },
]);
console.log('Published batch:', ids);
Consumer
import Redis from 'ioredis';
import { Consumer, withDefaults } from 'thinrsmq';
const redis = new Redis();
const config = withDefaults({ namespace: 'myapp' });
const consumer = new Consumer(redis, config);
// Subscribe to a topic with a handler
await consumer.subscribe('orders', 'order-processor', async (msg) => {
  console.log('Processing order:', msg.payload);
  // Throw error to trigger retry
  // if (shouldRetry) throw new Error('Temporary failure');
  // Success - message will be acknowledged
});
// Optional: handle skipped messages (nil/trimmed or unknown version)
consumer.onSkip((id, reason) => {
  console.warn('Skipped message:', id, reason);
});
// Graceful shutdown
process.on('SIGTERM', async () => {
  await consumer.stop();
  await redis.quit();
});
Claim Monitor
The claim monitor detects idle messages in the pending entries list (PEL) and routes them to retry or DLQ based on attempt count.
import Redis from 'ioredis';
import { ClaimMonitor, withDefaults } from 'thinrsmq';
const redis = new Redis();
const config = withDefaults({
  namespace: 'myapp',
  monitor: {
    enabled: true,
    scanIntervalMs: 5000,
    minIdleTimeMs: 10000,
  },
});
const monitor = new ClaimMonitor(redis, config);
await monitor.start();
// Graceful shutdown
process.on('SIGTERM', async () => {
  await monitor.stop();
  await redis.quit();
});
Dead Letter Queue (DLQ)
import Redis from 'ioredis';
import { DLQ, withDefaults } from 'thinrsmq';
const redis = new Redis();
const config = withDefaults({ namespace: 'myapp' });
const dlq = new DLQ(redis, config);
// Peek at DLQ messages
const messages = await dlq.peek('orders', 10);
console.log('DLQ messages:', messages);
// Replay a message (with guard)
// Assumption: the id of a peeked message is the ID that replay() expects
if (messages.length > 0) {
  await dlq.replay('orders', messages[0].id);
}
// Purge all DLQ messages for a topic
await dlq.purge('orders');
// Get DLQ size
const size = await dlq.size('orders');
console.log('DLQ size:', size);
Admin
import Redis from 'ioredis';
import { Admin, withDefaults } from 'thinrsmq';
const redis = new Redis();
const config = withDefaults({ namespace: 'myapp' });
const admin = new Admin(redis, config);
// Get pending stats for a stream
const pendingInfo = await admin.pendingStats('orders', 'order-processor');
console.log('Pending messages:', pendingInfo.count);
// Get consumer details
const consumerInfo = await admin.consumerInfo('orders', 'order-processor');
console.log('Consumers:', consumerInfo);
// Get stream details
const streamInfo = await admin.streamInfo('orders');
console.log('Stream length:', streamInfo.length);
Configuration
Configuration Reference
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| namespace | string | (required) | Namespace for all Redis keys |
| redis.address | string | "localhost:6379" | Redis server address |
| redis.password | string | "" | Redis password |
| redis.db | number | 0 | Redis database number |
| redis.poolSize | number | 10 | Connection pool size |
| redis.readTimeoutMs | number | 3000 | Read timeout in milliseconds |
| redis.writeTimeoutMs | number | 3000 | Write timeout in milliseconds |
| redis.useTls | boolean | false | Enable TLS for Redis connection |
| streams.defaultMaxLen | number | 10000 | Default MAXLEN for streams |
| consumer.batchSize | number | 10 | Number of messages to read per batch |
| consumer.blockMs | number | 5000 | Block duration for XREADGROUP |
| consumer.consumerName | string | "thinrsmq" | Prefix for consumer name generation |
| consumer.shutdownTimeoutMs | number | 30000 | Graceful shutdown timeout |
| retry.maxAttempts | number | 3 | Maximum retry attempts before DLQ |
| retry.baseDelayMs | number | 1000 | Base delay for exponential backoff |
| retry.maxDelayMs | number | 60000 | Maximum delay cap |
| retry.jitter | boolean | true | Add random jitter to delays |
| monitor.enabled | boolean | false | Enable claim monitor |
| monitor.scanIntervalMs | number | 10000 | Interval between PEL scans |
| monitor.minIdleTimeMs | number | 60000 | Minimum idle time to claim |
| monitor.claimBatchSize | number | 100 | Max messages to claim per scan |
| dlq.maxReplays | number | 3 | Max replays before freezing |
Using Environment Variables
import Redis from 'ioredis';
import { configFromEnv, withDefaults, Producer } from 'thinrsmq';
// Read Redis connection from environment variables
const envConfig = configFromEnv();
const config = withDefaults({ ...envConfig, namespace: 'myapp' });
const [host, port] = config.redis.address.split(':');
const redis = new Redis({
  host,
  port: parseInt(port, 10),
  password: config.redis.password || undefined,
  tls: config.redis.useTls ? { servername: host } : undefined,
});
const producer = new Producer(redis, config);
Environment Variables
| Variable | Description | Default |
|----------|-------------|---------|
| REDIS_HOST | Redis hostname | "localhost" |
| REDIS_PORT | Redis port | "6379" |
| REDIS_PASSWORD | Redis password | "" |
| REDIS_USE_TLS | Enable TLS ("true" or "1") | false |
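The mapping from these variables to a config fragment can be sketched as follows. This is an illustration only, not the library's `configFromEnv`: the name `sketchConfigFromEnv`, the `SketchRedisEnvConfig` type, and the choice to pass the environment in as an argument are assumptions made so the example stays self-contained. The defaults and the accepted TLS values come from the table above.

```typescript
// Hypothetical types for illustration; not exported by thinrsmq.
type Env = Record<string, string | undefined>;

interface SketchRedisEnvConfig {
  address: string;
  password: string;
  useTls: boolean;
}

// Builds a redis config fragment from environment variables,
// falling back to the defaults listed in the table.
function sketchConfigFromEnv(env: Env): { redis: SketchRedisEnvConfig } {
  const host = env.REDIS_HOST ?? 'localhost';
  const port = env.REDIS_PORT ?? '6379';
  const tls = env.REDIS_USE_TLS;
  return {
    redis: {
      address: `${host}:${port}`,
      password: env.REDIS_PASSWORD ?? '',
      // TLS is enabled only for the literal values "true" or "1".
      useTls: tls === 'true' || tls === '1',
    },
  };
}

const defaults = sketchConfigFromEnv({});
console.log(defaults.redis.address); // localhost:6379
```

In a real process the argument would typically be `process.env`.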
API Reference
Producer
new Producer(redis, config)
Creates a new producer instance.
- redis: ioredis client instance
- config: Configuration object
publish(topic: string, msg: Omit<Message, 'id'>): Promise<string>
Publishes a single message. Returns the stream entry ID.
- topic: Topic name (stream will be created as {namespace}:{topic})
- msg: Message object without the id field
  - type: Message type identifier
  - payload: Message payload (string)
  - version: Envelope version (use "1")
  - producedAt: ISO 8601 timestamp (auto-set if empty)
  - traceId: Optional trace ID
  - producer: Optional producer identifier
publishBatch(topic: string, msgs: Omit<Message, 'id'>[]): Promise<string[]>
Publishes multiple messages in a pipeline. Returns array of stream entry IDs.
Consumer
new Consumer(redis, config)
Creates a new consumer instance with a unique name.
subscribe(topic: string, group: string, handler: Handler): Promise<void>
Subscribes to a topic with a consumer group. Starts processing messages.
- topic: Topic name
- group: Consumer group name
- handler: async (msg: Message) => void - Message handler function
Throw an error from the handler to trigger a retry. Returning without throwing acknowledges the message.
stop(): Promise<void>
Gracefully stops the consumer. Waits for in-flight messages to complete (up to shutdownTimeoutMs).
onSkip(handler: SkipHandler): void
Registers a callback for skipped messages (nil/trimmed or unknown version).
- handler: (id: string, reason: string) => void
getConsumerName(): string
Returns the unique consumer name (format: {prefix}-{hostname}-{pid}-{uuid}).
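The name format above can be illustrated with a short sketch. It only mirrors the documented `{prefix}-{hostname}-{pid}-{uuid}` layout; the function name `sketchConsumerName` is hypothetical, and the real implementation may obtain the hostname or UUID differently.

```typescript
import { hostname } from 'node:os';
import { randomUUID } from 'node:crypto';

// Hypothetical helper: assembles a name in the documented format.
// The default prefix matches the consumer.consumerName default.
function sketchConsumerName(prefix: string = 'thinrsmq'): string {
  return `${prefix}-${hostname()}-${process.pid}-${randomUUID()}`;
}

console.log(sketchConsumerName());
```

Because the UUID differs on every call, two consumers started in the same process still receive distinct names.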
ClaimMonitor
new ClaimMonitor(redis, config)
Creates a new claim monitor instance.
start(): Promise<void>
Starts the claim monitor. Periodically scans for idle messages and routes them to retry or DLQ.
stop(): Promise<void>
Stops the claim monitor.
scanOnce(): Promise<void>
Performs a single scan cycle. Useful for testing.
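The routing decision made during a scan can be sketched as two small pure functions. This is an assumption-laden illustration, not the monitor's code: the names `sketchIsClaimable` and `sketchRouteIdleMessage` are hypothetical, and the exact comparisons (`>=` in both cases) are guesses based on the descriptions of `monitor.minIdleTimeMs` and `retry.maxAttempts`.

```typescript
type SketchRoute = 'retry' | 'dlq';

// Assumption: a pending message qualifies once it has been idle
// for at least minIdleTimeMs.
function sketchIsClaimable(idleMs: number, minIdleTimeMs: number): boolean {
  return idleMs >= minIdleTimeMs;
}

// Assumption: a message that has used up maxAttempts goes to the DLQ,
// otherwise it is scheduled for another retry.
function sketchRouteIdleMessage(attempt: number, maxAttempts: number): SketchRoute {
  return attempt >= maxAttempts ? 'dlq' : 'retry';
}

console.log(sketchIsClaimable(15000, 10000)); // true
console.log(sketchRouteIdleMessage(1, 3)); // retry
console.log(sketchRouteIdleMessage(3, 3)); // dlq
```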
DLQ
new DLQ(redis, config)
Creates a new DLQ instance.
moveToDLQ(topic: string, msg: Message, failureMetadata): Promise<void>
Moves a message to the DLQ.
- failureMetadata:
  - originalStream: Original stream key
  - failedAt: Failure timestamp
  - totalAttempts: Total retry attempts
  - lastError: Error message
  - consumerGroup: Consumer group name
peek(topic: string, count: number): Promise<DLQMessage[]>
Returns up to count messages from the DLQ (oldest first).
replay(topic: string, messageId: string): Promise<void>
Replays a DLQ message back to the original stream. Increments replay_count and freezes the message once replay_count >= maxReplays.
purge(topic: string): Promise<void>
Deletes all messages from the DLQ for a topic.
size(topic: string): Promise<number>
Returns the number of messages in the DLQ.
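The replay guard described for replay() can be sketched as a pure state transition. The name `sketchNextReplayState` is hypothetical, and the sketch assumes the counter is compared against `dlq.maxReplays` after it is incremented; the library may instead check before incrementing.

```typescript
interface SketchReplayState {
  replayCount: number;
  frozen: boolean;
}

// Assumption: replay_count is incremented first, then compared to maxReplays.
function sketchNextReplayState(replayCount: number, maxReplays: number): SketchReplayState {
  const next = replayCount + 1;
  return { replayCount: next, frozen: next >= maxReplays };
}

console.log(sketchNextReplayState(0, 3)); // { replayCount: 1, frozen: false }
console.log(sketchNextReplayState(2, 3)); // { replayCount: 3, frozen: true }
```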
Admin
new Admin(redis, config)
Creates a new admin instance.
pendingStats(topic: string, group: string): Promise<PendingInfo>
Returns pending message statistics.
- Returns: { count, minId, maxId, consumers }
consumerInfo(topic: string, group: string): Promise<ConsumerDetail[]>
Returns consumer details for a group.
- Returns: Array of { name, pending, idle }
streamInfo(topic: string): Promise<StreamDetail>
Returns stream metadata.
- Returns: { length, firstEntryId, lastEntryId, groups }
DLQ Operations
Admin also provides DLQ operations:
- dlqSize(topic): Alias for DLQ.size()
- dlqPeek(topic, count): Alias for DLQ.peek()
- dlqReplay(topic, messageId): Alias for DLQ.replay()
- dlqPurge(topic): Alias for DLQ.purge()
RetryStore
new RetryStore(redis, config)
Creates a new retry store instance.
initIfNotExists(topic: string, messageId: string): Promise<void>
Initializes retry hash if it doesn't exist (HSETNX for single-writer rule).
get(topic: string, messageId: string): Promise<RetryInfo | null>
Gets retry information for a message.
- Returns: { attempt, lastAttemptAt } or null
set(topic: string, messageId: string, info: RetryInfo): Promise<void>
Sets retry information.
delete(topic: string, messageId: string): Promise<void>
Deletes retry hash.
incrementAttempt(topic: string, messageId: string): Promise<number>
Increments attempt count and returns new value.
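The HSETNX-based initialization mentioned for initIfNotExists can be sketched against a minimal client interface. Everything named here is an assumption for illustration: `HsetnxClient` is a structural stand-in for the one ioredis method used, `sketchInitIfNotExists` is a hypothetical name, and the hash field `attempt` with initial value 0 is a guess based on the `RetryInfo` shape. The key layout follows the documented retry key format.

```typescript
// Structural stand-in for the single ioredis method this sketch needs.
interface HsetnxClient {
  hsetnx(key: string, field: string, value: string | number): Promise<number>;
}

// Returns true when this call created the field, false when another
// writer had already initialized it. HSETNX makes the check-and-set atomic.
async function sketchInitIfNotExists(
  redis: HsetnxClient,
  namespace: string,
  topic: string,
  messageId: string,
): Promise<boolean> {
  const key = `${namespace}:${topic}:retries:${messageId}`;
  const created = await redis.hsetnx(key, 'attempt', 0); // field name is an assumption
  return created === 1;
}
```

Because HSETNX only writes when the field is absent, concurrent callers cannot reset a counter that another writer has already started.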
Helper Functions
configFromEnv(): { redis: Partial<RedisConfig> }
Reads Redis connection settings from environment variables.
withDefaults(config: Partial<Config>): Config
Merges provided config with defaults.
validateConfig(config: Config): void
Validates configuration. Throws ConfigError on validation failure.
computeDelay(attempt: number, cfg: BackoffConfig): number
Computes exponential backoff delay in milliseconds.
- Formula: min(baseDelay * 2^(attempt-1) + jitter, maxDelay)
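The formula can be worked through with a small sketch. It is not the library's `computeDelay`: the name `sketchComputeDelay`, the `BackoffSketchConfig` type, and the injectable `random` parameter are hypothetical, and the jitter range (a random value in `[0, baseDelayMs)`) is an assumption, since the README does not state how jitter is drawn.

```typescript
// Hypothetical stand-in for BackoffConfig; field names follow the retry.* config table.
interface BackoffSketchConfig {
  baseDelayMs: number;
  maxDelayMs: number;
  jitter: boolean;
}

// min(baseDelay * 2^(attempt-1) + jitter, maxDelay)
// Assumption: jitter is uniform in [0, baseDelayMs) when enabled.
function sketchComputeDelay(
  attempt: number,
  cfg: BackoffSketchConfig,
  random: () => number = Math.random,
): number {
  const exponential = cfg.baseDelayMs * 2 ** (attempt - 1);
  const jitter = cfg.jitter ? Math.floor(random() * cfg.baseDelayMs) : 0;
  return Math.min(exponential + jitter, cfg.maxDelayMs);
}

const noJitter: BackoffSketchConfig = { baseDelayMs: 1000, maxDelayMs: 60000, jitter: false };
console.log(sketchComputeDelay(1, noJitter)); // 1000
console.log(sketchComputeDelay(3, noJitter)); // 4000
console.log(sketchComputeDelay(10, noJitter)); // 60000 (capped by maxDelayMs)
```

With the default settings and jitter disabled, delays double from 1 s until they reach the 60 s cap.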
Key Functions
streamKey(namespace: string, topic: string): string
Returns stream key: "{namespace}:{topic}"
dlqKey(namespace: string, topic: string): string
Returns DLQ key: "{namespace}:{topic}:dlq"
retryKey(namespace: string, topic: string, messageId: string): string
Returns retry hash key: "{namespace}:{topic}:retries:{messageId}"
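The three key layouts can be reproduced with template strings, which is handy when inspecting keys with redis-cli. The `sketch*` names below are hypothetical stand-ins; in application code the exported `streamKey`, `dlqKey`, and `retryKey` functions should be used instead.

```typescript
// Hypothetical re-implementations that follow the documented key formats.
const sketchStreamKey = (namespace: string, topic: string): string =>
  `${namespace}:${topic}`;

const sketchDlqKey = (namespace: string, topic: string): string =>
  `${namespace}:${topic}:dlq`;

const sketchRetryKey = (namespace: string, topic: string, messageId: string): string =>
  `${namespace}:${topic}:retries:${messageId}`;

console.log(sketchStreamKey('myapp', 'orders')); // myapp:orders
console.log(sketchDlqKey('myapp', 'orders')); // myapp:orders:dlq
console.log(sketchRetryKey('myapp', 'orders', '1700000000000-0')); // myapp:orders:retries:1700000000000-0
```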
Types
Message
type Message = {
  id: string;
  version: string;
  type: string;
  payload: string;
  traceId?: string;
  producedAt: string;
  producer?: string;
};
DLQMessage
type DLQMessage = Message & {
  originalId: string;
  originalStream: string;
  failedAt: string;
  totalAttempts: number;
  lastError: string;
  consumerGroup: string;
  replayCount: number;
};
Error Types
- ThinrsmqError: Base error class
- ConfigError: Configuration validation error
- UnknownVersionError: Thrown when envelope version is not "1"
- MessageTrimmedError: Thrown when message was trimmed from stream
- MissingFieldError: Thrown when required field is missing
CLI Tools
CLI tools are provided for development and testing.
Producer CLI
npx tsx src/cli/producer.ts --topic orders --type order.created --payload '{"orderId":"123"}'
Consumer CLI
npx tsx src/cli/consumer.ts --topic orders --group order-processor
Admin CLI
# Pending stats
npx tsx src/cli/admin.ts --topic orders --group order-processor --command pending
# Consumer info
npx tsx src/cli/admin.ts --topic orders --group order-processor --command consumers
# Stream info
npx tsx src/cli/admin.ts --topic orders --command stream
Wire Format
Messages use envelope version "1" with the following fields:
| Field | Required | Description |
|-------|----------|-------------|
| v | Yes | Envelope version (always "1") |
| type | Yes | Message type identifier |
| payload | Yes | Message payload (string) |
| produced_at | Yes | ISO 8601 timestamp |
| trace_id | No | Trace ID for distributed tracing |
| producer | No | Producer identifier |
DLQ messages include additional enrichment fields:
- original_id: Original stream entry ID
- original_stream: Original stream key
- failed_at: Failure timestamp
- total_attempts: Total retry attempts
- last_error: Error message (truncated to 1000 chars)
- consumer_group: Consumer group name
- replay_count: Number of times replayed from DLQ
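The mapping between the camelCase message object and the snake_case wire fields can be sketched as below. This is an illustration under assumptions, not the library's encoder: `SketchMessage` and `sketchToWireFields` are hypothetical names, and the field order, the omission of unset optional fields, and the flat field/value list (the shape a Redis XADD call takes) are guesses. Only the field names come from the envelope table.

```typescript
// Hypothetical input type mirroring Message without the id field.
interface SketchMessage {
  version: string;
  type: string;
  payload: string;
  producedAt: string;
  traceId?: string;
  producer?: string;
}

// Flattens a message into alternating field/value strings.
function sketchToWireFields(msg: SketchMessage, now: () => Date = () => new Date()): string[] {
  const fields: string[] = [
    'v', msg.version,
    'type', msg.type,
    'payload', msg.payload,
    // An empty producedAt is replaced with the current time in ISO 8601.
    'produced_at', msg.producedAt || now().toISOString(),
  ];
  // Assumption: optional fields are left out entirely when not set.
  if (msg.traceId) fields.push('trace_id', msg.traceId);
  if (msg.producer) fields.push('producer', msg.producer);
  return fields;
}

console.log(sketchToWireFields({
  version: '1',
  type: 'order.created',
  payload: '{"orderId":"123"}',
  producedAt: '2024-01-01T00:00:00.000Z',
  traceId: 'trace-abc-123',
}));
```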
Cross-Language Interoperability
This Node.js implementation is wire-compatible with the Go implementation. Messages produced by one can be consumed by the other seamlessly.
See Go README for the Go implementation.
License
MIT License. See LICENSE for details.
