@flightdev/queue
v0.2.1
Agnostic background job queue for Flight Framework. Choose your backend: Redis, BullMQ, SQLite, Postgres, SQS.
Background job processing for Flight Framework. Run tasks asynchronously with retries, scheduling, and multiple adapters.
Table of Contents
- Features
- Installation
- Quick Start
- Adapters
- Defining Jobs
- Enqueueing Jobs
- Processing Jobs
- Scheduling
- Error Handling
- API Reference
- License
Features
- Multiple storage adapters (Redis, Memory, IndexedDB)
- Automatic retries with exponential backoff
- Job scheduling (delayed and recurring)
- Priority queues
- Concurrency control
- Dead letter queue for failed jobs
- Job batching
- Progress tracking
- Type-safe job definitions
Installation
npm install @flightdev/queue
# For Redis adapter
npm install ioredis
Quick Start
import { createQueue } from '@flightdev/queue';
import { redis } from '@flightdev/queue/redis';
const queue = createQueue(redis({
  url: process.env.REDIS_URL,
}));
// Define a job handler
queue.define('sendEmail', async (job) => {
  await sendEmail(job.data.to, job.data.subject, job.data.body);
  console.log(`Email sent to ${job.data.to}`);
});
// Enqueue a job
await queue.enqueue('sendEmail', {
  to: 'user@example.com',
  subject: 'Welcome!',
  body: 'Thanks for signing up.',
});
// Start processing
queue.process();
Adapters
Redis (Production)
Persistent, distributed queue using Redis.
import { redis } from '@flightdev/queue/redis';
const adapter = redis({
  url: process.env.REDIS_URL,
  prefix: 'myapp:queue:', // Key prefix
  maxRetries: 3,
});
Memory (Development)
In-memory queue for development and testing.
import { memory } from '@flightdev/queue/memory';
const adapter = memory();
IndexedDB (Client-Side)
Browser-based queue for offline-capable apps.
import { indexeddb } from '@flightdev/queue-indexeddb';
const adapter = indexeddb({
  dbName: 'myapp-queue',
});
Defining Jobs
Basic Definition
queue.define('processImage', async (job) => {
  const { imageUrl, size } = job.data;
  const result = await resizeImage(imageUrl, size);
  return result;
});
With Options
queue.define('sendNotification', async (job) => {
  await sendPushNotification(job.data);
}, {
  retries: 5,
  backoff: 'exponential',
  timeout: 30000, // 30 seconds
  priority: 'high',
});
Type-Safe Jobs
interface EmailJob {
  to: string;
  subject: string;
  body: string;
  attachments?: string[];
}
queue.define<EmailJob>('sendEmail', async (job) => {
  // job.data is typed as EmailJob
  await sendEmail(job.data.to, job.data.subject, job.data.body);
});
Enqueueing Jobs
Basic Enqueue
await queue.enqueue('sendEmail', {
  to: 'user@example.com',
  subject: 'Hello',
  body: 'World',
});
With Options
await queue.enqueue('sendEmail', data, {
  delay: 60000, // Delay 1 minute
  priority: 'high', // high, normal, low
  attempts: 5, // Max retry attempts
  backoff: 'exponential', // Retry strategy
  jobId: 'unique-id', // Dedupe by ID
});
Delayed Jobs
// Run in 5 minutes
await queue.enqueue('reminder', data, {
  delay: 5 * 60 * 1000,
});
// Run at specific time
await queue.enqueue('reminder', data, {
  runAt: new Date('2026-01-15T10:00:00Z'),
});
Batch Enqueue
await queue.enqueueMany('sendEmail', [
  { to: 'user1@example.com', subject: 'Hello' },
  { to: 'user2@example.com', subject: 'Hello' },
  { to: 'user3@example.com', subject: 'Hello' },
]);
Processing Jobs
Start Processing
// Process all queues
queue.process();
// Process specific queues with concurrency
queue.process({
  queues: ['sendEmail', 'processImage'],
  concurrency: 5,
});
Graceful Shutdown
process.on('SIGTERM', async () => {
  await queue.shutdown({ timeout: 30000 });
  process.exit(0);
});
Progress Updates
queue.define('processVideo', async (job) => {
  for (let i = 0; i <= 100; i += 10) {
    await processChunk(job.data, i);
    await job.updateProgress(i);
  }
});
// Listen to progress
queue.on('progress', (jobId, progress) => {
  console.log(`Job ${jobId}: ${progress}%`);
});
Scheduling
Recurring Jobs
import { schedule } from '@flightdev/queue';
// Run every hour
schedule(queue, 'cleanup', {}, {
  pattern: '0 * * * *', // Cron pattern
});
// Run every 5 minutes
schedule(queue, 'healthCheck', {}, {
  every: 5 * 60 * 1000,
});
Cron Patterns
| Pattern | Description |
|---------|-------------|
| 0 * * * * | Every hour |
| 0 0 * * * | Every day at midnight |
| 0 0 * * 0 | Every Sunday |
| */5 * * * * | Every 5 minutes |
| 0 9 * * 1-5 | 9am on weekdays |
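To make the five-field patterns in the table concrete, here is a minimal sketch of how a pattern can be checked against a point in time. `fieldMatches` and `cronMatches` are hypothetical helpers written for illustration; they are not part of `@flightdev/queue`, and the package's own parser may behave differently. The sketch assumes UTC and supports only the syntax that appears in the table (`*`, `*/n`, `a-b`, and plain numbers).

```typescript
// Hypothetical helpers for illustration; not part of @flightdev/queue.
// `min` is the smallest legal value of the field (0 for minutes, 1 for months).
function fieldMatches(field: string, value: number, min: number): boolean {
  if (field === '*') return true;
  if (field.startsWith('*/')) {
    const step = Number(field.slice(2));
    return Number.isInteger(step) && step > 0 && (value - min) % step === 0;
  }
  if (field.includes('-')) {
    const [low, high] = field.split('-').map(Number);
    return value >= low && value <= high;
  }
  return Number(field) === value;
}

// Returns true when `date`, read in UTC (an assumption), falls on a minute
// selected by `pattern`. All five fields must match in this simplified version.
function cronMatches(pattern: string, date: Date): boolean {
  const fields = pattern.trim().split(/\s+/);
  if (fields.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${fields.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = fields;
  return (
    fieldMatches(minute, date.getUTCMinutes(), 0) &&
    fieldMatches(hour, date.getUTCHours(), 0) &&
    fieldMatches(dayOfMonth, date.getUTCDate(), 1) &&
    fieldMatches(month, date.getUTCMonth() + 1, 1) && // cron months are 1-12
    fieldMatches(dayOfWeek, date.getUTCDay(), 0) // 0 = Sunday
  );
}

// 2026-01-15 is a Thursday, so the weekday pattern matches at 09:00 UTC.
console.log(cronMatches('0 9 * * 1-5', new Date('2026-01-15T09:00:00Z')));
```

Traditional cron treats day-of-month and day-of-week as alternatives when both are restricted; this sketch requires both to match, which makes no difference for the patterns listed above.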
Error Handling
Retry Strategy
queue.define('unreliableTask', handler, {
  retries: 3,
  backoff: 'exponential', // 1s, 2s, 4s, 8s...
  // or: 'linear' // 1s, 2s, 3s, 4s...
  // or: 'fixed' // 1s, 1s, 1s, 1s...
});
Dead Letter Queue
Jobs that exceed max retries go to the dead letter queue:
// Get failed jobs
const failed = await queue.getDeadJobs();
// Retry a dead job
await queue.retryDead(jobId);
// Clear dead jobs
await queue.clearDead();
Event Listeners
queue.on('completed', (job) => {
  console.log(`Job ${job.id} completed`);
});
queue.on('failed', (job, error) => {
  console.error(`Job ${job.id} failed:`, error);
});
queue.on('stalled', (job) => {
  console.warn(`Job ${job.id} stalled`);
});
API Reference
createQueue Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| concurrency | number | 1 | Max concurrent jobs |
| prefix | string | 'queue:' | Key prefix |
| defaultRetries | number | 3 | Default retry count |
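The `concurrency` option caps how many jobs run at the same time. As a rough illustration of what such a cap means in practice, the sketch below runs a list of async tasks with at most `limit` in flight. `runWithConcurrency` is a hypothetical helper, not part of `@flightdev/queue`, and the package may enforce its limit differently.

```typescript
// Hypothetical helper, not part of @flightdev/queue: runs async tasks
// with at most `limit` of them in flight at any moment.
async function runWithConcurrency<T>(
  tasks: Array<() => Promise<T>>,
  limit: number,
): Promise<T[]> {
  const results: T[] = new Array(tasks.length);
  let next = 0; // index of the next task to start

  // Each worker keeps pulling the next unstarted task until none are left.
  async function worker(): Promise<void> {
    while (next < tasks.length) {
      const index = next++;
      results[index] = await tasks[index]();
    }
  }

  const workerCount = Math.max(1, Math.min(limit, tasks.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results; // results keep the order of `tasks`
}
```

With `limit` set to 1, tasks run strictly one after another, which corresponds to the default shown in the table.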
queue.define Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| retries | number | 3 | Max retries |
| backoff | string | 'exponential' | Retry strategy |
| timeout | number | 30000 | Job timeout (ms) |
| priority | string | 'normal' | Queue priority |
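The three `backoff` strategies differ only in how the wait grows between retries. The following sketch reproduces the delay sequences noted in the Retry Strategy example (1s, 2s, 4s for exponential; 1s, 2s, 3s for linear; 1s, 1s, 1s for fixed). `backoffDelay` and its one-second `baseMs` default are assumptions made for illustration, not the package's API.

```typescript
type BackoffStrategy = 'exponential' | 'linear' | 'fixed';

// Hypothetical helper, not part of @flightdev/queue.
// `attempt` is the 1-based retry number; `baseMs` is an assumed base delay.
function backoffDelay(
  strategy: BackoffStrategy,
  attempt: number,
  baseMs: number = 1000,
): number {
  switch (strategy) {
    case 'exponential':
      return baseMs * Math.pow(2, attempt - 1); // 1s, 2s, 4s, 8s...
    case 'linear':
      return baseMs * attempt; // 1s, 2s, 3s, 4s...
    case 'fixed':
      return baseMs; // 1s, 1s, 1s, 1s...
  }
}

// Delays in milliseconds before retries 1 through 4 with exponential backoff.
console.log([1, 2, 3, 4].map((n) => backoffDelay('exponential', n)));
```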
queue.enqueue Options
| Option | Type | Description |
|--------|------|-------------|
| delay | number | Delay in ms |
| runAt | Date | Specific run time |
| priority | string | Job priority |
| attempts | number | Max attempts |
| backoff | string | Retry strategy |
| jobId | string | Unique job ID |
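The `delay`, `runAt`, and `priority` options together determine when a job becomes eligible and which eligible job goes first. The sketch below shows one plausible way to combine them. The names `resolveRunTime` and `compareJobs` are hypothetical, and two behaviors are assumptions rather than documented facts: `runAt` takes precedence when both `delay` and `runAt` are given, and ties in priority are broken by the earlier run time.

```typescript
type Priority = 'high' | 'normal' | 'low';

interface EnqueueTiming {
  delay?: number; // milliseconds from now
  runAt?: Date; // absolute run time
}

// Hypothetical helper: turns delay/runAt into an absolute timestamp (ms).
// Assumption: runAt wins when both options are present.
function resolveRunTime(opts: EnqueueTiming, now: number): number {
  if (opts.runAt) return opts.runAt.getTime();
  return now + (opts.delay ?? 0);
}

interface PendingJob {
  id: string;
  runTime: number;
  priority: Priority;
}

const PRIORITY_RANK: Record<Priority, number> = { high: 0, normal: 1, low: 2 };

// Hypothetical comparator: higher priority first, then earlier run time.
function compareJobs(a: PendingJob, b: PendingJob): number {
  const byPriority = PRIORITY_RANK[a.priority] - PRIORITY_RANK[b.priority];
  return byPriority !== 0 ? byPriority : a.runTime - b.runTime;
}

const pending: PendingJob[] = [
  { id: 'a', runTime: resolveRunTime({ delay: 60000 }, 0), priority: 'normal' },
  { id: 'b', runTime: resolveRunTime({}, 0), priority: 'low' },
  { id: 'c', runTime: resolveRunTime({ delay: 5000 }, 0), priority: 'high' },
];
console.log(pending.sort(compareJobs).map((job) => job.id));
```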
Queue Methods
| Method | Description |
|--------|-------------|
| define(name, handler, opts?) | Define job handler |
| enqueue(name, data, opts?) | Add job to queue |
| enqueueMany(name, items) | Batch enqueue |
| process(opts?) | Start processing |
| shutdown(opts?) | Graceful shutdown |
| getJob(id) | Get job by ID |
| getDeadJobs() | Get failed jobs |
| retryDead(id) | Retry failed job |
| clearDead() | Clear dead jobs |
| on(event, listener) | Subscribe to queue events |
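The relationship between `retries` and the dead letter queue can be pictured as simple bookkeeping on each failure. The sketch below is an illustration only: `JobRecord`, `recordFailure`, and the `deadJobs` array are hypothetical, and it assumes that `retries: 3` means one initial attempt plus up to three retries before a job is moved to the dead letter queue.

```typescript
interface JobRecord {
  id: string;
  failures: number; // how many times the handler has thrown so far
}

type FailureOutcome = 'retry' | 'dead';

// Hypothetical bookkeeping, not the package's implementation.
// Called each time a job's handler fails.
function recordFailure(
  job: JobRecord,
  retries: number,
  deadJobs: JobRecord[],
): FailureOutcome {
  job.failures += 1;
  if (job.failures > retries) {
    deadJobs.push(job); // retries exhausted: park the job for inspection
    return 'dead';
  }
  return 'retry';
}

const deadJobs: JobRecord[] = [];
const job: JobRecord = { id: 'job-1', failures: 0 };
const outcomes: FailureOutcome[] = [];
for (let i = 0; i < 4; i++) {
  outcomes.push(recordFailure(job, 3, deadJobs));
}
console.log(outcomes, deadJobs.length);
```

Under these assumptions the first three failures schedule a retry and the fourth moves the job to the dead list, which is the point at which `getDeadJobs()` would start returning it.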
License
MIT
