@atlex/queue
v0.1.11
Published
Background jobs, workers, and scheduling for Atlex
Downloads
915
Maintainers
Readme
@atlex/queue
Robust background job processing with multiple queue drivers, batching, scheduling, and failure handling for Express + Node.js applications.
Installation
npm install @atlex/queueFor BullMQ driver support:
npm install bull bullmqFor AWS SQS support:
npm install @aws-sdk/client-sqsQuick start
Define a job class:
import { Job } from '@atlex/queue'
export class SendEmail extends Job {
public static queue = 'emails'
public static tries = 3
public static timeout = 30
public constructor(
private email: string,
private subject: string,
) {
super()
}
public async handle(): Promise<void> {
console.log(`Sending email to ${this.email}: ${this.subject}`)
// Send email logic here
}
}Dispatch the job:
import { dispatch } from '@atlex/queue'
// Queue for background processing
await dispatch(new SendEmail('[email protected]', 'Welcome!'))
// Execute synchronously
await dispatch(new SendEmail('[email protected]', 'Welcome!')).dispatchSync()
// Execute after HTTP response
await dispatch(new SendEmail('[email protected]', 'Welcome!')).dispatchAfterResponse()Process jobs:
npx atlex queue:workFeatures
Multiple Drivers
- SyncDriver: Execute jobs synchronously (development)
- NullDriver: Discard jobs (testing)
- DatabaseDriver: SQL-backed queue with persistence
- BullMQDriver: Redis-backed with BullMQ library
- SqsDriver: AWS SQS integration
Job Features
- Automatic serialization/deserialization
- Constructor-based payload preservation
- Retry logic with configurable attempts
- Exponential backoff and custom delay strategies
- Job timeout enforcement
- Job middleware for wrapping execution
- Unique job constraints
- Batch processing support
- Job chaining
- Failed job tracking and retry commands
Failure Handling
- Failed job storage and inspection
- Retry mechanism with exponential backoff
- Failed job cleanup
- DatabaseFailedJobProvider for persistence
- Max attempts and timeout error distinction
Batching
- Batch multiple jobs together
then(),catch(),finally()callbacks- Atomic batch operations
- Batch callbacks after all jobs complete
- Batch cancellation
Scheduling
- Cron-based task scheduling
- Common schedule shortcuts (daily, weekly, monthly)
- Timezone support
- Schedule mutations and mutex locking
- CacheScheduleMutex, RedisScheduleMutex, FileScheduleMutex
Middleware
- Job middleware pipeline
- Execute code before/after job handling
- Conditional job processing
- Logging and monitoring hooks
Configuration
- Per-job settings (queue, connection, tries, timeout)
- Class-level defaults
- Runtime overrides
- Multiple named connections
Core APIs
Job Class
Define background jobs by extending Job:
import { Job } from '@atlex/queue'
export class ProcessImage extends Job {
// Static configuration
public static queue = 'images' // Target queue
public static connection = 'default' // Connection name
public static tries = 3 // Retry attempts
public static timeout = 120 // Seconds
public static maxExceptions = 2 // Max exceptions before failure
public static backoff = 'exponential' // Backoff strategy
public static retryUntil = null // Unix timestamp stop retrying
// Instance properties
public attempts = 0 // Current attempt number
public batchId: string | null = null // Batch membership
public chained: JobPayload[] = [] // Chained jobs
public constructor(
private imageId: string,
private format: string = 'jpeg',
) {
super()
}
/**
* Main job logic.
*/
public async handle(): Promise<void> {
const image = await Image.find(this.imageId)
await image.processAndStore(this.format)
}
/**
* Called before handle() executes. Return false to skip job.
*/
public async before(): Promise<boolean | void> {
const image = await Image.find(this.imageId)
if (!image) return false // Skip if image doesn't exist
}
/**
* Called after successful handle().
*/
public async after(): Promise<void> {
console.log('Job completed')
}
/**
* Called when job fails (after all retries exhausted).
*/
public async failed(error: Error): Promise<void> {
console.error(`Job failed: ${error.message}`)
// Notify user, log to external service, etc.
}
/**
* Custom middleware for this job.
*/
public middleware(): JobMiddleware[] {
return [
{
async handle(job, next) {
console.log('Before job')
try {
await next()
} finally {
console.log('After job')
}
},
},
]
}
/**
* Run after job succeeds (useful for chaining).
*/
public chained(): JobPayload[] {
return [new GenerateThumbnail(this.imageId), new NotifyUser(this.imageId)]
}
}Dispatching Jobs
import { dispatch, dispatchSync, dispatchAfterResponse } from '@atlex/queue'
// Queue job for background processing
await dispatch(new SendEmail('[email protected]', 'Hello'))
// Execute immediately (synchronous)
await dispatchSync(new SendEmail('[email protected]', 'Hello'))
// Execute after HTTP response sent (non-blocking to client)
await dispatchAfterResponse(new SendEmail('[email protected]', 'Hello'))QueueManager
Central job dispatcher and queue manager:
import { QueueManager } from '@atlex/queue'
const queueManager = app.make(QueueManager)
// Dispatch job
await queueManager.dispatch(job)
// Dispatch after response
await queueManager.dispatchAfterResponse(job)
// Dispatch synchronously
await queueManager.dispatchSync(job)
// Get driver for specific connection
const connection = queueManager.connection('redis')
// Mark job as failed
await queueManager.fail(job, error)
// Extend queue with custom driver
queueManager.extend('custom', (app) => new CustomDriver(app))PendingDispatch
Fluent interface for job dispatch with options:
const pending = dispatch(job)
// Delay job execution
pending.delay(60) // 60 seconds
pending.delay(Duration.minutes(5)) // 5 minutes
// Specify queue
pending.onQueue('emails')
// Specify connection
pending.onConnection('redis')
// Set timeout
pending.timeout(120) // 120 seconds
// Retry configuration
pending.retry(5)
// Ensure job uniqueness
pending.unique()
// Dispatch methods
await pending.dispatch() // Queue it
await pending.dispatchSync() // Execute now
await pending.dispatchAfterResponse() // Queue for after responseWorker
Run the queue worker to process jobs:
import { Worker, WorkerOptions } from '@atlex/queue'
const worker = new Worker(app)
await worker.run({
queue: 'default', // Queue to process
connection: 'default', // Connection name
memory: 128, // Max memory in MB
timeout: 300, // Job timeout in seconds
maxJobs: 0, // Max jobs to process (0 = unlimited)
sleep: 3, // Sleep time between polls (seconds)
})Run via CLI:
# Process default queue
npx atlex queue:work
# Process specific queue
npx atlex queue:work --queue=emails
# Process multiple queues
npx atlex queue:work --queue=emails --queue=notifications
# Set memory limit
npx atlex queue:work --memory=256
# Set timeout
npx atlex queue:work --timeout=180
# Stop after N jobs
npx atlex queue:work --max-jobs=100Queue Drivers
SyncDriver
Execute jobs synchronously immediately (useful for development/testing):
// config/queue.ts
export default {
default: 'sync',
connections: {
sync: {
driver: 'sync',
},
},
} as QueueConfigNullDriver
Discard jobs without processing (useful for testing):
export default {
default: 'null',
connections: {
null: {
driver: 'null',
},
},
} as QueueConfigDatabaseDriver
Store jobs in SQL database with polling:
export default {
default: 'database',
connections: {
database: {
driver: 'database',
table: 'jobs',
failedTable: 'failed_jobs',
},
},
} as QueueConfigCreate migrations:
import { JobsTableMigration } from '@atlex/orm'
// In migration
new JobsTableMigration().up()
new FailedJobsTableMigration().up()BullMQDriver
High-performance Redis queue using BullMQ:
export default {
default: 'bullmq',
connections: {
bullmq: {
driver: 'bullmq',
redis: {
host: 'localhost',
port: 6379,
db: 0,
},
},
},
} as QueueConfigSqsDriver
AWS SQS queue integration:
export default {
default: 'sqs',
connections: {
sqs: {
driver: 'sqs',
region: 'us-east-1',
queue: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue',
accessKeyId: process.env.AWS_ACCESS_KEY_ID,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
},
},
} as QueueConfigFailure Handling
DatabaseFailedJobProvider
Track and retry failed jobs:
import { DatabaseFailedJobProvider } from '@atlex/queue'
const provider = new DatabaseFailedJobProvider(db, 'failed_jobs')
// Get all failed jobs
const failed = await provider.all()
// Get specific failed job
const job = await provider.get(id)
// Retry a failed job
await provider.retry(id)
// Delete failed job
await provider.forget(id)
// Clear all failed jobs
await provider.flush()Failed Job Events
Failed jobs trigger specific error types:
import { MaxAttemptsExceededError, TimeoutExceededError } from '@atlex/queue'
export class MyJob extends Job {
public async failed(error: Error): Promise<void> {
if (error instanceof MaxAttemptsExceededError) {
console.error('Job exhausted all retries')
} else if (error instanceof TimeoutExceededError) {
console.error('Job execution timed out')
}
}
}Retry CLI Commands
# List failed jobs
npx atlex queue:failed
# Show failed job details
npx atlex queue:failed --show=id
# Retry failed job
npx atlex queue:retry id
# Flush all failed jobs
npx atlex queue:flushBatching
Group multiple jobs and handle completion:
import { app } from '@atlex/core'
const batch = app.make(Scheduler).batch()
// Add multiple jobs
batch
.add(new SendEmail('[email protected]', 'Hello'))
.add(new SendEmail('[email protected]', 'Hello'))
.add(new SendEmail('[email protected]', 'Hello'))
// Handle completion of all jobs
batch.then(async () => {
console.log('All emails sent!')
})
// Handle batch failure
batch.catch(async (error) => {
console.log('Batch failed:', error)
})
// Handle finally (always)
batch.finally(async () => {
console.log('Batch complete')
})
// Dispatch the batch
await batch.dispatch()Scheduling
Schedule tasks to run on intervals via cron:
import { Scheduler } from '@atlex/queue'
const scheduler = app.make(Scheduler)
// Schedule job to run daily at 2 AM
scheduler.call(new CleanupOldFiles()).daily()
// Schedule command every 5 minutes
scheduler.exec('analytics:process').everyFiveMinutes()
// Cron expression
scheduler.call(new SendDailyReport()).cron('0 9 * * 1-5') // Weekdays at 9 AM
// Advanced scheduling
scheduler
.call(new BackupDatabase())
.cron('0 0 * * *') // Daily at midnight
.timezone('America/New_York')Schedule Shortcuts
scheduler.call(job).everyMinute()
scheduler.call(job).everyFiveMinutes()
scheduler.call(job).everyTenMinutes()
scheduler.call(job).everyFifteenMinutes()
scheduler.call(job).everyThirtyMinutes()
scheduler.call(job).hourly()
scheduler.call(job).daily()
scheduler.call(job).weekly()
scheduler.call(job).monthly()
scheduler.call(job).quarterly()
scheduler.call(job).yearly()
scheduler.call(job).weekdays()
scheduler.call(job).weekends()
scheduler.call(job).sundays()
scheduler.call(job).mondays()
// ... etc for all daysConfiguration
Create config/queue.ts:
import type { QueueConfig } from '@atlex/queue'
export default {
// Default connection
default: process.env.QUEUE_CONNECTION || 'sync',
// Named connections
connections: {
sync: {
driver: 'sync',
},
database: {
driver: 'database',
table: 'jobs',
failedTable: 'failed_jobs',
},
redis: {
driver: 'bullmq',
redis: {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
db: process.env.REDIS_DB || 0,
password: process.env.REDIS_PASSWORD,
},
},
sqs: {
driver: 'sqs',
region: process.env.AWS_REGION || 'us-east-1',
queue: process.env.SQS_QUEUE_URL,
accessKeyId: process.env.AWS_ACCESS_KEY_ID,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
},
},
// Failed jobs
failed: {
driver: 'database',
table: 'failed_jobs',
},
} as QueueConfigComplete Examples
Email Job with Retry
import { Job } from '@atlex/queue'
import { mail } from '@atlex/mail'
export class SendEmailJob extends Job {
public static queue = 'emails'
public static tries = 5
public static timeout = 30
public static backoff = 'exponential'
public constructor(
private readonly email: string,
private readonly subject: string,
private readonly template: string,
private readonly data: Record<string, any> = {},
) {
super()
}
public async handle(): Promise<void> {
await mail().to(this.email).send(this.template, this.data)
}
public async failed(error: Error): Promise<void> {
// Log email failure to monitoring service
console.error(`Failed to send email to ${this.email}:`, error)
// Notify admin
await mail().to('[email protected]').send('email-failure-alert', {
email: this.email,
error: error.message,
})
}
}
// Dispatch
router.post('/signup', async (req, res) => {
const user = await User.create(req.body)
// Queue welcome email
await dispatch(new SendEmailJob(user.email, 'Welcome!', 'welcome', { name: user.name })).delay(5) // Delay 5 seconds
res.json({ message: 'Signed up! Check your email.' })
})Unique Job
Prevent duplicate processing:
export class ProcessPayment extends Job {
public static queue = 'payments'
public static tries = 3
public constructor(private orderId: string) {
super()
}
public async handle(): Promise<void> {
const order = await Order.find(this.orderId)
await order.processPayment()
}
public shouldBeUnique(): boolean {
return true // Unique per instance
}
}
// Queue multiple times, only processes once
await dispatch(new ProcessPayment('order-123'))
await dispatch(new ProcessPayment('order-123'))
await dispatch(new ProcessPayment('order-123'))Job Chaining
Execute jobs in sequence:
export class ImageProcessingJob extends Job {
public async handle(): Promise<void> {
const image = await Image.find(this.imageId)
await image.compress()
}
public chained(): JobPayload[] {
return [
new ResizeImage(this.imageId, { width: 500 }),
new GenerateThumbnail(this.imageId),
new NotifyUserComplete(this.imageId),
]
}
}
// Dispatch starts the chain
await dispatch(new ImageProcessingJob('img-456'))
// Next: ResizeImage → GenerateThumbnail → NotifyUserCompleteBatch Processing
Process multiple jobs with callbacks:
router.post('/send-newsletter', async (req, res) => {
const users = await User.all()
const batch = app.make(Scheduler).batch()
for (const user of users) {
batch.add(new SendNewsletter(user.email))
}
batch.then(async () => {
console.log(`Newsletter sent to ${users.length} users`)
})
batch.catch(async (error) => {
console.error('Newsletter batch failed:', error)
})
await batch.dispatch()
res.json({ message: 'Newsletter queued for sending' })
})Job Middleware
Wrap job execution with logging/monitoring:
export class MonitoredJob extends Job {
public middleware(): JobMiddleware[] {
return [
{
async handle(job, next) {
const startTime = Date.now()
console.log(`Starting job: ${job.constructor.name}`)
try {
await next()
const duration = Date.now() - startTime
console.log(`Job completed in ${duration}ms`)
} catch (error) {
console.error('Job failed:', error)
throw error
}
},
},
]
}
}API Overview
Job
Job.handle()- Main job logic (required)Job.failed(error)- Called on failureJob.before()- Called before handleJob.after()- Called after successJob.middleware()- Custom middlewareJob.chained()- Chained jobsJob.shouldBeUnique()- Uniqueness check
Dispatch
dispatch(job)- Queue jobdispatchSync(job)- Execute immediatelydispatchAfterResponse(job)- Queue after responsePendingDispatch.delay(seconds)- Delay executionPendingDispatch.onQueue(name)- Specify queuePendingDispatch.onConnection(name)- Specify connectionPendingDispatch.timeout(seconds)- Set timeoutPendingDispatch.retry(attempts)- Set retries
QueueManager
QueueManager.dispatch(job)- Queue jobQueueManager.dispatchSync(job)- Execute immediatelyQueueManager.dispatchAfterResponse(job)- Defer dispatchQueueManager.connection(name)- Get driverQueueManager.fail(job, error)- Mark failedQueueManager.extend(name, factory)- Add driver
Worker
Worker.run(options)- Start processingWorker.processJob(job)- Execute single job
Scheduler
Scheduler.call(job)- Schedule jobScheduler.exec(command)- Schedule commandScheduler.cron(expression)- Cron schedule
Failed Jobs
DatabaseFailedJobProvider.all()- List failedDatabaseFailedJobProvider.retry(id)- Retry jobDatabaseFailedJobProvider.forget(id)- Delete jobDatabaseFailedJobProvider.flush()- Clear all
Events
Jobs emit events during execution:
- Job dispatched
- Job executing
- Job succeeded
- Job failed
- Job timeout
- Job retry
Subscribe via event dispatcher:
dispatcher.on('job.succeeded', (event) => {
console.log('Job succeeded:', event.job)
})
dispatcher.on('job.failed', (event) => {
console.log('Job failed:', event.job, event.error)
})Documentation
For comprehensive documentation, visit atlex.dev/guide/queue
License
MIT
Part of Atlex — A modern framework for Node.js.
