smart-request-queue
v1.2.1
Enterprise-grade request queue for NestJS with Redis support, retries, priorities, and rate limiting
Enterprise-grade request queue for NestJS with rate limiting, retries, priorities, Redis storage, and distributed Observable event streams.
Table of contents
- Features
- Installation
- Setup
- Basic usage
- Job options
- Cancel jobs
- Queue introspection
- Events
- Distributed events (multi-node)
- Interval formats
- Storage
- License
✨ Features
- ⚡ Rate limiting per queue (requestsCount + requestInterval)
- 🔑 Per-key isolation (per-user, per-tenant, per API key)
- 🧠 Priority-based execution (lower value = higher priority)
- ⏱️ Delayed jobs
- 🔁 Retry with exponential back-off
- ⛔ Job timeout handling
- 🛑 Cancel by job ID or key
- 🚦 In-memory or Redis storage
- 📦 Typed event system (queueEvents) + RxJS Observable access
- 📡 Distributed events across nodes via Redis Pub/Sub
- 📊 Queue size introspection via queue.size(name)
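The rate limit is expressed as requestsCount jobs per requestInterval, and the key option is described as isolating users or tenants from each other. This README does not say which algorithm the library uses, so the following is only a minimal sketch of one common approach: a sliding window tracked separately per key. SlidingWindowLimiter and tryAcquire are hypothetical names, not part of the smart-request-queue API, and the idea that each key gets its own budget is an assumption.

```typescript
// Hypothetical helper for illustration; not part of smart-request-queue.
class SlidingWindowLimiter {
  // Start times of recent jobs, tracked separately for each isolation key.
  private readonly starts = new Map<string, number[]>();
  private readonly requestsCount: number;
  private readonly intervalMs: number;

  constructor(requestsCount: number, intervalMs: number) {
    this.requestsCount = requestsCount;
    this.intervalMs = intervalMs;
  }

  /** Returns true and records the start if a job for `key` may run at `now` (ms). */
  tryAcquire(key: string, now: number): boolean {
    const windowStart = now - this.intervalMs;
    // Keep only the starts that still fall inside the current window.
    const recent = (this.starts.get(key) ?? []).filter((t) => t > windowStart);
    const allowed = recent.length < this.requestsCount;
    if (allowed) recent.push(now);
    this.starts.set(key, recent);
    return allowed;
  }
}

// Two requests per 1000 ms, with explicit timestamps so the outcome is deterministic.
const limiter = new SlidingWindowLimiter(2, 1000);
const decisions = [
  limiter.tryAcquire('user-123', 0),    // first slot
  limiter.tryAcquire('user-123', 100),  // second slot
  limiter.tryAcquire('user-123', 200),  // window is full
  limiter.tryAcquire('user-456', 200),  // different key, separate budget
  limiter.tryAcquire('user-123', 1001), // the start at t=0 has left the window
];
console.log(decisions); // [ true, true, false, true, true ]
```

A real queue would hold a rejected job until a slot frees up rather than drop it; the sketch only shows the admission decision.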
📦 Installation
npm install smart-request-queue
🏗️ Setup
In-memory (single instance)
import { Module } from '@nestjs/common';
import { QueueModule } from 'smart-request-queue';
@Module({
  imports: [
    QueueModule.forRoot({
      requests: [{ name: 'api', requestsCount: 5, requestInterval: '1sec' }],
    }),
  ],
})
export class AppModule {}
Redis (production)
QueueModule.forRoot({
  useRedis: true,
  redisOptions: { host: 'localhost', port: 6379 },
  requests: [{ name: 'api', requestsCount: 10, requestInterval: '1sec' }],
});
Async setup
QueueModule.forRootAsync({
  imports: [ConfigModule],
  inject: [ConfigService],
  useFactory: (config: ConfigService) => ({
    useRedis: true,
    redisOptions: { host: config.get('REDIS_HOST'), port: 6379 },
    requests: [{ name: 'api', requestsCount: 20, requestInterval: '1min' }],
  }),
});
Basic usage
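// Inject QueueService into a provider or controller
constructor(private readonly queue: QueueService) {}
// Then, inside an async method of that class:
const result = await this.queue.add({
  name: 'api',
  fn: async () => 'ok',
});
HTTP task (axios)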
const data = await this.queue.add({
  name: 'api',
  axios: {
    url: 'https://api.example.com/data',
    config: { method: 'GET' },
  },
});
Job options
| Option | Type | Default | Description |
| ------------ | -------------------- | ------- | ----------------------------------------------------- |
| name | string | — | Queue name (required) |
| fn | () => Promise<any> | — | Function to execute (mutually exclusive with axios) |
| axios | { url, config? } | — | Axios request (mutually exclusive with fn) |
| key | string | — | Isolation key (per-user, per-tenant) |
| priority | number | 5 | Lower = higher priority |
| delay | number | 0 | Delay before execution (ms) |
| retry | number | 3 | Max retry attempts |
| retryDelay | number | 1000 | Base delay for exponential back-off (ms) |
| timeout | number | — | Max execution time; rejects on exceedance (ms) |
| stop | boolean | false | Cancel job before execution |
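The priority, retry, and retryDelay options in the table are easier to reason about with concrete numbers. The sketch below assumes that ties in priority are broken by insertion order and that the back-off doubles on every attempt, starting from retryDelay. Neither detail is confirmed by this README, and PendingJob, byPriority, and backoffDelay are hypothetical helpers, not library exports.

```typescript
// Hypothetical helpers for illustration; not exported by smart-request-queue.
interface PendingJob {
  id: string;
  priority: number; // lower value runs first
  seq: number;      // insertion order, used as a tie-breaker (assumption)
}

/** Orders jobs by ascending priority, then by insertion order. */
function byPriority(a: PendingJob, b: PendingJob): number {
  return a.priority - b.priority || a.seq - b.seq;
}

/** Wait before retry number `attempt` (1-based), assuming the delay doubles each time. */
function backoffDelay(retryDelay: number, attempt: number): number {
  return retryDelay * Math.pow(2, attempt - 1);
}

const pending: PendingJob[] = [
  { id: 'report', priority: 5, seq: 0 },
  { id: 'checkout', priority: 1, seq: 1 },
  { id: 'sync', priority: 5, seq: 2 },
];
const runOrder = [...pending].sort(byPriority).map((job) => job.id);
console.log(runOrder); // [ 'checkout', 'report', 'sync' ]

// With retry: 3 and retryDelay: 500, the waits under this assumption are:
const waits = [1, 2, 3].map((attempt) => backoffDelay(500, attempt));
console.log(waits); // [ 500, 1000, 2000 ]
```

Under the same assumption, the default retryDelay of 1000 with three retries would wait 1000, 2000, and 4000 ms, so a failing job could occupy the queue for several seconds before it is reported as an error.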
await this.queue.add({
  name: 'api',
  fn: async () => fetchData(),
  key: 'user-123',
  priority: 1,
  delay: 5000,
  retry: 3,
  retryDelay: 500,
  timeout: 3000,
});
Cancel jobs
// Cancel a specific job by ID
await queue.stopJob('api', jobId);
// Cancel all jobs for a given key
await queue.stopByKey('api', 'user-123');
Queue introspection
const waiting = await queue.size('api');
console.log(`${waiting} jobs pending`);
Events
import { QueueEvent, queueEvents } from 'smart-request-queue';
queueEvents.on(QueueEvent.ADDED, (job) => console.log('added', job.id));
queueEvents.on(QueueEvent.COMPLETED, (result, job) =>
  console.log('done', job.id, result),
);
queueEvents.on(QueueEvent.ERROR, (error, job) =>
  console.error('failed', job.id, error),
);
queueEvents.on(QueueEvent.RETRY, (job) =>
  console.warn('retrying', job.id, job.retry),
);
queueEvents.on(QueueEvent.TIMEOUT, (error, job) =>
  console.warn('timeout', job?.id),
);
All events: ADDED · SCHEDULED · STARTED · BEFORE · AFTER · COMPLETED · RETRY · ERROR · TIMEOUT · STOPPED · REMOVED
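Handlers receive different arguments depending on the event: ADDED passes the job, while COMPLETED passes the result followed by the job. One way such a typed event system can be built is with a map from event name to argument tuple. The sketch below is an illustration only; TypedEvents, EventArgs, and JobInfo are hypothetical, and the library's real type definitions may differ.

```typescript
// Hypothetical types for illustration; not the library's actual definitions.
interface JobInfo {
  id: string;
}

// Maps each event name to the argument tuple its handlers receive.
type EventArgs = {
  added: [job: JobInfo];
  completed: [result: unknown, job: JobInfo];
  error: [error: Error, job: JobInfo];
};

class TypedEvents<M extends Record<string, unknown[]>> {
  private readonly handlers = new Map<string, Array<(...args: any[]) => void>>();

  /** Registers a handler whose parameters are checked against the event's tuple. */
  on<K extends keyof M & string>(event: K, handler: (...args: M[K]) => void): void {
    const list = this.handlers.get(event) ?? [];
    list.push(handler);
    this.handlers.set(event, list);
  }

  /** Removes a previously registered handler (matched by reference). */
  off<K extends keyof M & string>(event: K, handler: (...args: M[K]) => void): void {
    const list = this.handlers.get(event) ?? [];
    this.handlers.set(event, list.filter((h) => h !== handler));
  }

  /** Calls every handler for `event` with the given arguments. */
  emit<K extends keyof M & string>(event: K, ...args: M[K]): void {
    for (const handler of this.handlers.get(event) ?? []) handler(...args);
  }
}

const events = new TypedEvents<EventArgs>();
const log: string[] = [];
events.on('added', (job) => log.push(`added ${job.id}`));
events.on('completed', (result, job) => log.push(`done ${job.id}: ${String(result)}`));

events.emit('added', { id: 'job-1' });
events.emit('completed', 'ok', { id: 'job-1' });
console.log(log); // [ 'added job-1', 'done job-1: ok' ]
```

With this shape, passing a handler with the wrong parameter order for an event is a compile-time error rather than a runtime surprise.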
RxJS stream access
queueEvents
  .asObservable(QueueEvent.COMPLETED)
  .subscribe(([result, job]) => console.log(job.id, result));
Unsubscribe
const handler = (job) => console.log(job.id);
queueEvents.on(QueueEvent.ADDED, handler);
// later…
queueEvents.off(QueueEvent.ADDED, handler);
Distributed events (multi-node)
A Redis Pub/Sub bridge propagates queue events across instances and exposes them as Observable streams.
Enable
QueueModule.forRoot({
  useRedis: true,
  redisOptions: { host: 'localhost', port: 6379 },
  requests: [{ name: 'api', requestsCount: 5, requestInterval: '1sec' }],
  distributedEvents: {
    channel: 'myapp:queue-events', // optional, defaults to 'smart-request-queue:events'
    sourceId: 'node-1', // optional, defaults to randomUUID()
  },
});
Subscribe
import { Injectable } from '@nestjs/common';
import { DistributedEventsBridge, QueueEvent } from 'smart-request-queue';
import { filter } from 'rxjs';
@Injectable()
export class MetricsService {
  constructor(private readonly bridge: DistributedEventsBridge) {
    // All events (local + remote)
    bridge.events$.subscribe(({ event, sourceId, direction, timestamp }) => {
      console.log(direction, event, 'from', sourceId);
    });
    // Filtered stream for a specific event
    bridge.on$(QueueEvent.ERROR).subscribe(({ args }) => {
      console.error(args[0]);
    });
    // Remote events only
    bridge.events$
      .pipe(filter((e) => e.direction === 'received'))
      .subscribe((e) => console.log('remote event:', e.event));
  }
}
Notes:
- Self-published events are ignored on inbound replay using sourceId, so nothing is processed twice.
- bridge.close() unsubscribes from Redis, completes the stream, and disconnects owned connections.
- The bridge is only available when distributedEvents is configured.
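The first note matters because a process that subscribes to a Pub/Sub channel also receives the messages it published to that channel itself. A minimal sketch of filtering by sourceId follows, using an in-memory stand-in for Redis so it runs without a server. FakeChannel, BridgeSketch, and the Envelope shape are hypothetical and only illustrate the idea; they are not the library's DistributedEventsBridge.

```typescript
// Hypothetical message shape; the real wire format is not documented here.
type Envelope = { sourceId: string; event: string; args: unknown[] };

// In-memory stand-in for a Redis Pub/Sub channel: every subscriber sees every message.
class FakeChannel {
  private readonly subscribers: Array<(raw: string) => void> = [];

  subscribe(fn: (raw: string) => void): void {
    this.subscribers.push(fn);
  }

  publish(raw: string): void {
    for (const fn of this.subscribers) fn(raw);
  }
}

class BridgeSketch {
  readonly received: Envelope[] = [];
  private readonly channel: FakeChannel;
  private readonly sourceId: string;

  constructor(channel: FakeChannel, sourceId: string) {
    this.channel = channel;
    this.sourceId = sourceId;
    channel.subscribe((raw) => {
      const msg = JSON.parse(raw) as Envelope;
      // Skip messages this node published itself to avoid double-processing.
      if (msg.sourceId === this.sourceId) return;
      this.received.push(msg);
    });
  }

  /** Tags the event with this node's sourceId and broadcasts it. */
  publish(event: string, ...args: unknown[]): void {
    const msg: Envelope = { sourceId: this.sourceId, event, args };
    this.channel.publish(JSON.stringify(msg));
  }
}

const channel = new FakeChannel();
const node1 = new BridgeSketch(channel, 'node-1');
const node2 = new BridgeSketch(channel, 'node-2');

node1.publish('completed', { id: 'job-1' });
console.log(node1.received.length, node2.received.length); // 0 1
```

This also shows why two nodes must not share a sourceId: each would discard the other's events as its own.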
Interval formats
All requestInterval / retryDelay fields accept:
// Milliseconds (number)
requestInterval: 1000
// String shorthand
requestInterval: '500ms'
requestInterval: '2sec'
requestInterval: '1min'
requestInterval: '1hour'
requestInterval: '1day'
// Object
requestInterval: { value: 30, unit: 'sec' }
Storage
| Backend | Use case |
| ---------------- | ----------------------------------------------------------- |
| Memory (default) | Local dev, single-node deployments |
| Redis | Production, horizontal scaling, persistence across restarts |
License
MIT
