@fluojs/queue
v1.0.0
Redis-backed background job processing with worker discovery and DLQ support for Fluo.
Redis-backed distributed job processing for fluo. It features decorator-based worker discovery, JSON-safe job serialization, and lifecycle-managed execution.
Installation
npm install @fluojs/queue @fluojs/redis

When to Use
- When you need to process long-running or resource-intensive tasks in the background.
- When you want to decouple expensive operations (e.g., sending emails, image processing) from the request-response cycle.
- When you need a distributed queue with retry logic, backoff, and dead-letter handling.
Quick Start
1. Define a Job and Worker
Create a job class and a worker class decorated with @QueueWorker.
import { QueueWorker } from '@fluojs/queue';

export class ProcessOrderJob {
  constructor(public readonly orderId: string) {}
}

@QueueWorker(ProcessOrderJob, { attempts: 3, backoff: { type: 'fixed', delayMs: 5000 } })
export class OrderWorker {
  async handle(job: ProcessOrderJob) {
    console.log(`Processing order: ${job.orderId}`);
    // Your logic here
  }
}

2. Register and Enqueue
Import QueueModule and inject QueueLifecycleService to enqueue jobs.
QueueModule.forRoot(...) is the supported root entrypoint for application-level queue registration.
import { Module, Inject } from '@fluojs/core';
import { QueueModule, QueueLifecycleService } from '@fluojs/queue';
import { RedisModule } from '@fluojs/redis';
// Assumed path: point this at wherever the job and worker from step 1 live.
import { ProcessOrderJob, OrderWorker } from './order.worker';

@Inject(QueueLifecycleService)
export class OrderService {
  constructor(private readonly queue: QueueLifecycleService) {}

  async placeOrder(id: string) {
    await this.queue.enqueue(new ProcessOrderJob(id));
  }
}

@Module({
  imports: [
    RedisModule.forRoot({ host: 'localhost', port: 6379 }),
    QueueModule.forRoot(),
  ],
  providers: [OrderService, OrderWorker],
})
export class AppModule {}

Common Patterns
Named Redis Client
Leave clientName unset to keep using the default @fluojs/redis client from your app. If your queues should use a non-default Redis connection, set clientName to the name registered with RedisModule.forRoot({ name, ... }).
QueueModule.forRoot({ clientName: 'jobs' })

@fluojs/queue resolves that Redis client during application bootstrap, then creates queue-owned duplicate connections for BullMQ. The shared @fluojs/redis client remains owned by RedisModule; Queue closes only the duplicate BullMQ connections it creates. Those duplicate connections are configured with BullMQ's required maxRetriesPerRequest: null worker setting so startup behavior matches BullMQ's runtime constraints.
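The ownership rule above can be illustrated with a small in-memory sketch. FakeRedisClient and QueueConnectionOwner are hypothetical stand-ins and are not part of @fluojs/queue or @fluojs/redis; only the maxRetriesPerRequest: null setting and the "close only what you created" rule come from the text.

```typescript
// Hypothetical stand-in for a Redis client that can clone itself.
// It only models duplicate() and quit(); it is not the @fluojs/redis client.
type SketchRedisOptions = { maxRetriesPerRequest?: number | null };

class FakeRedisClient {
  closed = false;
  readonly options: SketchRedisOptions;

  constructor(options: SketchRedisOptions = {}) {
    this.options = options;
  }

  // Returns a new connection that inherits options, with overrides applied.
  duplicate(overrides: SketchRedisOptions = {}): FakeRedisClient {
    return new FakeRedisClient({ ...this.options, ...overrides });
  }

  async quit(): Promise<void> {
    this.closed = true;
  }
}

// Hypothetical owner that tracks only the connections it created itself.
class QueueConnectionOwner {
  private readonly owned: FakeRedisClient[] = [];
  private readonly shared: FakeRedisClient;

  constructor(shared: FakeRedisClient) {
    this.shared = shared;
  }

  // Each queue resource gets its own duplicate of the shared client.
  createConnection(): FakeRedisClient {
    const connection = this.shared.duplicate({ maxRetriesPerRequest: null });
    this.owned.push(connection);
    return connection;
  }

  // Closes the duplicates and leaves the shared client untouched.
  async closeOwned(): Promise<void> {
    await Promise.all(this.owned.map((connection) => connection.quit()));
    this.owned.length = 0;
  }
}
```

After closeOwned() resolves, every duplicate reports closed while the shared client stays open for the rest of the application.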
Bootstrap and Shutdown Lifecycle
Queue discovers workers and creates queue-owned BullMQ resources during application bootstrap, but BullMQ worker processors are started only after the runtime marks the full application bootstrap/readiness sequence complete. Jobs enqueued by other onApplicationBootstrap() hooks can be accepted once the Queue service is initialized, and their processors run after the bootstrap-ready handoff instead of racing ahead of later async bootstrap hooks or application readiness.
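A minimal sketch of the bootstrap-ready handoff, assuming a simple in-memory gate. BootstrapGateSketch is a hypothetical name; in the real package BullMQ holds the accepted jobs in Redis and the worker processors are started later, which this sketch only approximates with an array.

```typescript
// Hypothetical gate: enqueue is accepted immediately, processing waits for "ready".
class BootstrapGateSketch<T> {
  private readonly pending: T[] = [];
  private ready = false;
  private readonly processor: (job: T) => void;

  constructor(processor: (job: T) => void) {
    this.processor = processor;
  }

  // Accept a job; run it right away only if the application is already ready.
  enqueue(job: T): void {
    if (this.ready) {
      this.processor(job);
    } else {
      this.pending.push(job);
    }
  }

  // Called once the runtime reports that bootstrap/readiness is complete.
  markReady(): void {
    this.ready = true;
    // Drain in enqueue order so early jobs are not reordered.
    for (const job of this.pending.splice(0)) {
      this.processor(job);
    }
  }
}
```

A job enqueued from an early bootstrap hook is held until markReady() is called, so its processor never observes a half-initialized application.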
Application shutdown marks Queue as stopping, rejects new enqueue attempts, closes queue-owned workers/queues/connections, and drains pending dead-letter writes. Worker shutdown is bounded by workerShutdownTimeoutMs so an active processor that never settles cannot block application shutdown indefinitely. When the timeout elapses, Queue logs the timeout and asks BullMQ to force-close the worker before continuing resource cleanup.
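The bounded worker shutdown can be sketched as a race between a graceful close and a timer. closeWorkerWithTimeout and ClosableWorkerSketch are hypothetical names; the close(force) shape is modeled on BullMQ's Worker.close, and the actual implementation inside @fluojs/queue may differ.

```typescript
// Hypothetical worker surface: close() waits for active jobs unless force is true.
interface ClosableWorkerSketch {
  close(force?: boolean): Promise<void>;
}

// Waits up to timeoutMs for a graceful close, then force-closes.
// Resolves to a label describing how the worker ended up closed.
async function closeWorkerWithTimeout(
  worker: ClosableWorkerSketch,
  timeoutMs: number,
  log: (message: string) => void = console.warn,
): Promise<'graceful' | 'forced'> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timedOut = new Promise<'timeout'>((resolve) => {
    timer = setTimeout(() => resolve('timeout'), timeoutMs);
  });
  const graceful = worker.close().then(() => 'closed' as const);

  try {
    const outcome = await Promise.race([graceful, timedOut]);
    if (outcome === 'closed') {
      return 'graceful';
    }
    log(`worker did not close within ${timeoutMs}ms; forcing close`);
    await worker.close(true);
    return 'forced';
  } finally {
    // Always clear the timer so it cannot keep the process alive.
    if (timer !== undefined) {
      clearTimeout(timer);
    }
  }
}
```

With the documented default, the equivalent call would pass 30_000 as timeoutMs; a processor that never settles then delays shutdown by at most that long.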
Distributed Retries
Workers can be configured with a maximum number of attempts and backoff strategies to handle transient failures automatically.
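To make the two backoff types concrete, here is a sketch of the retry delays they would produce. retryDelayMs and retrySchedule are hypothetical helpers, and the doubling formula is an assumption based on BullMQ's built-in exponential strategy; it is not confirmed by this README.

```typescript
// Mirrors the backoff option shape used in this README.
type BackoffSketch = { type: 'fixed' | 'exponential'; delayMs: number };

// Delay before retry number `retry` (1 = first retry after the initial run).
// Assumption: exponential backoff doubles the delay on each retry.
function retryDelayMs(backoff: BackoffSketch, retry: number): number {
  if (backoff.type === 'fixed') {
    return backoff.delayMs;
  }
  return backoff.delayMs * Math.pow(2, retry - 1);
}

// With attempts: N there are at most N - 1 retries after the first run.
function retrySchedule(backoff: BackoffSketch, attempts: number): number[] {
  const delays: number[] = [];
  for (let retry = 1; retry < attempts; retry += 1) {
    delays.push(retryDelayMs(backoff, retry));
  }
  return delays;
}
```

Under these assumptions, retrySchedule({ type: 'exponential', delayMs: 1000 }, 5) yields [1000, 2000, 4000, 8000], while retrySchedule({ type: 'fixed', delayMs: 5000 }, 3) yields [5000, 5000].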
@QueueWorker(MyJob, {
  attempts: 5,
  backoff: { type: 'exponential', delayMs: 1000 }
})

Dead-Letter Handling
When a worker exhausts its retry attempts, Queue appends a dead-letter record to Redis (fluo:queue:dead-letter:<jobName>) for manual inspection or recovery. Queue does not move the BullMQ job itself.
QueueModule.forRoot() keeps the most recent 1_000 dead-letter entries per job by default. Set defaultDeadLetterMaxEntries: false to opt out, or provide a smaller positive number when operators need a tighter retention budget.
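The retention rule can be sketched with an in-memory stand-in for the per-job Redis keys. DeadLetterStoreSketch and the record fields are hypothetical; only the key format, the 1_000 default, and the false opt-out come from the text above, and the actual Redis data structure is not documented here.

```typescript
// Hypothetical record shape; the real dead-letter record format may differ.
type DeadLetterSketch = { jobName: string; payload: unknown; failedReason: string };

// In-memory stand-in for the per-job dead-letter keys in Redis.
class DeadLetterStoreSketch {
  private readonly lists = new Map<string, DeadLetterSketch[]>();
  private readonly maxEntries: number | false;

  constructor(maxEntries: number | false = 1_000) {
    this.maxEntries = maxEntries;
  }

  // Key format taken from the README text.
  static keyFor(jobName: string): string {
    return `fluo:queue:dead-letter:${jobName}`;
  }

  // Appends a record, then drops the oldest entries beyond the budget.
  append(record: DeadLetterSketch): void {
    const key = DeadLetterStoreSketch.keyFor(record.jobName);
    const list = this.lists.get(key) ?? [];
    list.push(record);
    if (this.maxEntries !== false && list.length > this.maxEntries) {
      list.splice(0, list.length - this.maxEntries);
    }
    this.lists.set(key, list);
  }

  entries(jobName: string): readonly DeadLetterSketch[] {
    return this.lists.get(DeadLetterStoreSketch.keyFor(jobName)) ?? [];
  }
}
```

Passing false to the constructor mirrors defaultDeadLetterMaxEntries: false: nothing is trimmed, so the list grows until an operator clears it.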
Jobs must be JSON-serializable plain objects. Queue serializes the job payload before enqueueing and rehydrates the job prototype on the worker side.
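The serialize-then-rehydrate round trip might look like the following sketch. serializeJob and rehydrateJob are hypothetical helpers written for illustration, not functions exported by @fluojs/queue, and the job class is a copy of the Quick Start job with one extra method to show that the prototype is restored.

```typescript
class ProcessOrderJobSketch {
  readonly orderId: string;

  constructor(orderId: string) {
    this.orderId = orderId;
  }

  describe(): string {
    return `order ${this.orderId}`;
  }
}

// Producer side: only JSON-safe own properties survive this step.
function serializeJob(job: object): string {
  return JSON.stringify(job);
}

// Worker side: restore the prototype without re-running the constructor.
function rehydrateJob<T extends object>(
  jobClass: new (...args: any[]) => T,
  json: string,
): T {
  const data: unknown = JSON.parse(json);
  if (typeof data !== 'object' || data === null || Array.isArray(data)) {
    throw new TypeError('job payload must be a plain JSON object');
  }
  return Object.assign(Object.create(jobClass.prototype) as T, data);
}
```

Because the payload passes through JSON, values such as Date, Map, or class instances nested inside a job would arrive as strings or plain objects, which is why jobs are best kept to plain data fields.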
Treat low-level provider assembly as an internal implementation detail; the helpers behind it are not part of the documented root-barrel contract.
Public API Overview
Core
- QueueModule: Main entry point for queue registration.
- QueueModule.forRoot(options): Registers queue support for an application module.
- QueueLifecycleService: Primary service for enqueuing jobs (enqueue(job)).
- @QueueWorker(JobClass, options?): Decorator to mark a class as a job handler.
- QUEUE: Compatibility injection token for the queue facade.
- createQueuePlatformStatusSnapshot(...): Status snapshot helper for lifecycle/readiness diagnostics.
Types
- QueueModuleOptions: Global queue settings (clientName, default attempts, defaultBackoff, concurrency, rate limiting, dead-letter retention).
- QueueWorkerOptions: Per-job settings (attempts, backoff, concurrency, jobName, rate limiting).
- QueueBackoffOptions: Retry backoff settings (type, delayMs).
QueueModuleOptions also includes lifecycle and dead-letter retention controls such as workerShutdownTimeoutMs and defaultDeadLetterMaxEntries.
QueueModuleOptions lifecycle/status controls:
- workerShutdownTimeoutMs: maximum time to wait for active worker processors during shutdown before force-closing the BullMQ worker. Defaults to 30_000.
- defaultDeadLetterMaxEntries: maximum retained dead-letter records per job, or false to disable trimming. Defaults to 1_000.
createQueuePlatformStatusSnapshot(...) reports readiness as ready only after Queue reaches started; starting reports degraded readiness, and stopping/stopped report not-ready. Snapshot details include the Redis dependency id, lifecycle state, ready/discovered worker counts, pending dead-letter writes, the dead-letter drain timeout, and workerShutdownTimeoutMs.
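The state-to-readiness mapping described above can be written as a small lookup. The type and function names are hypothetical, and the state list covers only the four states named in this paragraph; the real snapshot helper may track others.

```typescript
// Lifecycle states and readiness values named in the paragraph above.
type QueueLifecycleStateSketch = 'starting' | 'started' | 'stopping' | 'stopped';
type ReadinessSketch = 'ready' | 'degraded' | 'not-ready';

// Maps a lifecycle state to the readiness a health check would report.
function readinessFor(state: QueueLifecycleStateSketch): ReadinessSketch {
  switch (state) {
    case 'started':
      return 'ready';
    case 'starting':
      return 'degraded';
    case 'stopping':
    case 'stopped':
      return 'not-ready';
  }
}
```

A readiness probe built on this mapping would keep traffic away from an instance until Queue reaches started, and again as soon as shutdown begins.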
Only singleton @QueueWorker() providers/controllers are registered. Request/transient workers are skipped during discovery.
Related Packages
- @fluojs/redis: Required as the backing store for job persistence.
- @fluojs/cron: For scheduled/recurring background tasks.
Example Sources
- packages/queue/src/module.test.ts: Worker discovery and enqueueing tests.
- packages/queue/src/public-surface.test.ts: Public API contract verification.
- packages/queue/src/status.test.ts: Queue lifecycle status snapshot tests.
