@philiprehberger/task-queue
v0.3.4
Published
In-process async job queue with concurrency control, priorities, and retries
Readme
@philiprehberger/task-queue
In-process async job queue with concurrency control, priorities, and retries
Note: This is an in-process queue using in-memory storage. For distributed job processing across multiple servers, use a Redis-backed solution like BullMQ
Installation
npm install @philiprehberger/task-queueUsage
Basic
import { createQueue } from '@philiprehberger/task-queue';
const queue = createQueue<{ to: string; subject: string }>({
concurrency: 3,
retries: 2,
});
queue.process(async (job) => {
await sendEmail(job.data.to, job.data.subject);
});
queue.add({ to: '[email protected]', subject: 'Welcome!' });Priority
queue.add(data, { priority: 'high' }); // processed first
queue.add(data, { priority: 'normal' }); // default
queue.add(data, { priority: 'low' }); // processed lastDelayed Jobs
queue.add(data, { delay: '5m' }); // process after 5 minutes
queue.add(data, { delay: 30000 }); // process after 30 secondsJob Timeout
// Queue-level default timeout
const queue = createQueue<MyData>({ concurrency: 3, timeout: 30000 });
// Per-job timeout override
queue.add(data, { timeout: 5000 });Max Queue Size
const queue = createQueue<MyData>({ maxSize: 1000 });
queue.add(data); // throws Error if queue already has 1000 pending jobsDeduplication
queue.add(data, { key: 'user:123:welcome' });
queue.add(data, { key: 'user:123:welcome' }); // updates priority/data if changedEvents
const queue = createQueue<MyData>(
{ concurrency: 5, retries: 3 },
{
onComplete: (job) => console.log(`Job ${job.id} done`),
onFailed: (job, error) => console.error(`Job ${job.id} failed:`, error),
onRetry: (job, error, attempt) => console.log(`Retrying ${job.id} (${attempt})`),
onDrained: () => console.log('All jobs done'),
},
);Pause / Resume
queue.pause();
queue.resume();Graceful Shutdown
process.on('SIGTERM', async () => {
await queue.drain(); // wait for active jobs to finish
process.exit(0);
});Clear Pending Jobs
const removed = queue.clear(); // removes all pending jobs, returns count removedQueue Stats
queue.size(); // total pending jobs
queue.active(); // currently processing
queue.pending(); // ready to process (not delayed)API
| Export | Type | Description |
|--------|------|-------------|
| createQueue(options?, events?) | Function | Create a new job queue; returns queue instance |
| parseDuration(input) | Function | Parse a duration string (e.g. '5m') or number to milliseconds |
Queue Instance Methods
| Method | Returns | Description |
|--------|---------|-------------|
| process(handler) | void | Register the job handler function |
| add(data, options?) | Job<T> | Add a job to the queue |
| pause() | void | Pause job processing |
| resume() | void | Resume job processing |
| drain() | Promise<void> | Wait for all active and pending jobs to complete |
| size() | number | Total pending jobs |
| active() | number | Currently processing jobs |
| pending() | number | Ready-to-process jobs (not delayed) |
| clear() | number | Remove all pending jobs; returns count removed |
| destroy() | void | Stop the queue and clear all state |
Development
npm install
npm run build
npm testLicense
MIT
