@fluojs/queue

v1.0.0

Redis-backed background job processing with worker discovery and DLQ support for Fluo.

Downloads

736

Readme

@fluojs/queue

Redis-backed distributed job processing for fluo. It features decorator-based worker discovery, JSON-safe job serialization, and lifecycle-managed execution.

Installation

npm install @fluojs/queue @fluojs/redis

When to Use

  • When you need to process long-running or resource-intensive tasks in the background.
  • When you want to decouple expensive operations (e.g., sending emails, image processing) from the request-response cycle.
  • When you need a distributed queue with retry logic, backoff, and dead-letter handling.

Quick Start

1. Define a Job and Worker

Create a job class and a worker class decorated with @QueueWorker.

import { QueueWorker } from '@fluojs/queue';

export class ProcessOrderJob {
  constructor(public readonly orderId: string) {}
}

@QueueWorker(ProcessOrderJob, { attempts: 3, backoff: { type: 'fixed', delayMs: 5000 } })
export class OrderWorker {
  async handle(job: ProcessOrderJob) {
    console.log(`Processing order: ${job.orderId}`);
    // Your logic here
  }
}

2. Register and Enqueue

Import QueueModule and inject QueueLifecycleService to enqueue jobs.

QueueModule.forRoot(...) is the supported root entrypoint for application-level queue registration.

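import { Module, Inject } from '@fluojs/core';
import { QueueModule, QueueLifecycleService } from '@fluojs/queue';
import { RedisModule } from '@fluojs/redis';
// Illustrative path: import the job and worker from wherever step 1 defined them.
import { OrderWorker, ProcessOrderJob } from './order.worker';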

@Inject(QueueLifecycleService)
export class OrderService {
  constructor(private readonly queue: QueueLifecycleService) {}

  async placeOrder(id: string) {
    await this.queue.enqueue(new ProcessOrderJob(id));
  }
}

@Module({
  imports: [
    RedisModule.forRoot({ host: 'localhost', port: 6379 }),
    QueueModule.forRoot(),
  ],
  providers: [OrderService, OrderWorker],
})
export class AppModule {}

Common Patterns

Named Redis Client

Leave clientName unset to keep using the default @fluojs/redis client from your app. If your queues should use a non-default Redis connection, set clientName to the name registered with RedisModule.forRoot({ name, ... }).

QueueModule.forRoot({ clientName: 'jobs' })

@fluojs/queue resolves that Redis client during application bootstrap, then creates queue-owned duplicate connections for BullMQ. The shared @fluojs/redis client remains owned by RedisModule; Queue closes only the duplicate BullMQ connections it creates. Those duplicate connections are configured with BullMQ's required maxRetriesPerRequest: null worker setting so startup behavior matches BullMQ's runtime constraints.

Bootstrap and Shutdown Lifecycle

Queue discovers workers and creates its queue-owned BullMQ resources during application bootstrap, but it starts BullMQ worker processors only after the runtime marks the full application bootstrap/readiness sequence complete. Jobs enqueued from other onApplicationBootstrap() hooks are accepted as soon as the Queue service is initialized. Their processors run after the bootstrap-ready handoff, so they do not race ahead of later async bootstrap hooks or application readiness.

Application shutdown marks Queue as stopping, rejects new enqueue attempts, closes queue-owned workers/queues/connections, and drains pending dead-letter writes. Worker shutdown is bounded by workerShutdownTimeoutMs so an active processor that never settles cannot block application shutdown indefinitely. When the timeout elapses, Queue logs the timeout and asks BullMQ to force-close the worker before continuing resource cleanup.
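The bounded shutdown described above can be sketched with a timeout race. This is a minimal, self-contained illustration and not the package's implementation: closeWithTimeout, close, and forceClose are hypothetical names, and only the 30_000 ms default is taken from the documented workerShutdownTimeoutMs setting.

```typescript
/**
 * Wait for `close()` for at most `timeoutMs`. If it has not settled by then,
 * log the timeout and call `forceClose()` so shutdown can continue.
 * All names here are hypothetical stand-ins for the worker being closed.
 */
async function closeWithTimeout(
  close: () => Promise<void>,
  forceClose: () => Promise<void>,
  timeoutMs = 30_000,
): Promise<'graceful' | 'forced'> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timedOut = new Promise<'forced'>((resolve) => {
    timer = setTimeout(() => resolve('forced'), timeoutMs);
  });
  try {
    // Whichever settles first wins: the graceful close or the timeout.
    const outcome = await Promise.race([
      close().then(() => 'graceful' as const),
      timedOut,
    ]);
    if (outcome === 'forced') {
      console.warn(`worker did not stop within ${timeoutMs} ms; forcing close`);
      await forceClose();
    }
    return outcome;
  } finally {
    // Always clear the timer so it cannot keep the process alive.
    clearTimeout(timer);
  }
}

// A processor that never settles, as in the scenario described above.
const neverSettles = () => new Promise<void>(() => {});
closeWithTimeout(neverSettles, async () => {}, 50).then((outcome) => {
  console.log(outcome); // forced
});
```

The race keeps the graceful path cheap: when close() settles first, the timer is cleared and forceClose() is never called.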

Distributed Retries

Workers can be configured with a maximum number of attempts and backoff strategies to handle transient failures automatically.

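// MyJob and MyWorker are placeholder names.
@QueueWorker(MyJob, {
  attempts: 5,
  backoff: { type: 'exponential', delayMs: 1000 },
})
export class MyWorker {
  async handle(job: MyJob) {}
}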
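To get a feel for what these settings mean in wall-clock terms, the following sketch computes the wait before each retry. It assumes that the exponential type doubles the base delay on every retry (delayMs * 2^(n - 1)), which is how BullMQ's built-in exponential backoff behaves; whether @fluojs/queue forwards delayMs to BullMQ unchanged is an assumption. BackoffSketch and retryDelayMs are hypothetical names, not part of the package.

```typescript
// Local stand-in for the documented backoff shape (type, delayMs).
type BackoffSketch = { type: 'fixed' | 'exponential'; delayMs: number };

/**
 * Delay before the next retry, given how many attempts have already failed.
 * Assumption: 'exponential' waits delayMs * 2^(failedAttempts - 1),
 * while 'fixed' always waits delayMs.
 */
function retryDelayMs(backoff: BackoffSketch, failedAttempts: number): number {
  if (!Number.isInteger(failedAttempts) || failedAttempts < 1) {
    throw new RangeError('failedAttempts must be a positive integer');
  }
  return backoff.type === 'fixed'
    ? backoff.delayMs
    : backoff.delayMs * 2 ** (failedAttempts - 1);
}

// With attempts: 5 there are at most four retries after the first failure.
const schedule = [1, 2, 3, 4].map((n) =>
  retryDelayMs({ type: 'exponential', delayMs: 1000 }, n),
);
console.log(schedule); // [ 1000, 2000, 4000, 8000 ]
```

Under that assumption, a job configured as above spends up to 15 seconds waiting between attempts before it is treated as exhausted.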

Dead-Letter Handling

When a worker exhausts its retry attempts, Queue appends a dead-letter record to Redis (fluo:queue:dead-letter:<jobName>) for manual inspection or recovery. Queue does not move the BullMQ job itself.

QueueModule.forRoot() keeps the most recent 1_000 dead-letter entries per job by default. Set defaultDeadLetterMaxEntries: false to opt out, or provide a smaller positive number when operators need a tighter retention budget.
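The retention rule can be illustrated with an in-memory list standing in for the Redis list. This is only a sketch of the "keep the most recent N" behavior: DeadLetterRecord, its fields, and appendDeadLetter are hypothetical, and the real record layout is not documented here.

```typescript
// Hypothetical record shape; the real dead-letter records live in Redis.
type DeadLetterRecord = { jobId: string; failedReason: string };

/**
 * Append a record and keep only the newest `maxEntries`.
 * Passing `false` disables trimming, mirroring defaultDeadLetterMaxEntries: false.
 */
function appendDeadLetter(
  list: DeadLetterRecord[],
  record: DeadLetterRecord,
  maxEntries: number | false = 1_000,
): DeadLetterRecord[] {
  const next = [...list, record];
  if (maxEntries === false) {
    return next;
  }
  if (!Number.isInteger(maxEntries) || maxEntries <= 0) {
    throw new RangeError('maxEntries must be a positive integer or false');
  }
  // slice(-n) keeps the last n items, or everything when the list is shorter.
  return next.slice(-maxEntries);
}

let letters: DeadLetterRecord[] = [];
for (let i = 1; i <= 5; i++) {
  letters = appendDeadLetter(letters, { jobId: `job-${i}`, failedReason: 'timeout' }, 3);
}
console.log(letters.map((r) => r.jobId)); // [ 'job-3', 'job-4', 'job-5' ]
```

With trimming disabled the list grows without bound, so opting out only makes sense when something else prunes the dead-letter records.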

Jobs must be JSON-serializable plain objects. Queue serializes the job payload before enqueueing and rehydrates the job prototype on the worker side.
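The round trip can be pictured as follows. This sketch only illustrates the general technique of JSON serialization followed by prototype rehydration; serializeJob and rehydrateJob are hypothetical helpers, not functions exported by the package, and the describe() method is added here purely to show that methods come back.

```typescript
class ProcessOrderJob {
  readonly orderId: string;

  constructor(orderId: string) {
    this.orderId = orderId;
  }

  describe(): string {
    return `order ${this.orderId}`;
  }
}

// Producer side: only JSON-safe data survives this step.
function serializeJob(job: object): string {
  return JSON.stringify(job);
}

// Worker side: parse the payload and reattach the job class prototype.
// The constructor is not re-run; fields are copied onto a fresh object.
function rehydrateJob<T extends object>(payload: string, jobClass: { prototype: T }): T {
  const data: unknown = JSON.parse(payload);
  return Object.assign(Object.create(jobClass.prototype) as T, data);
}

const wire = serializeJob(new ProcessOrderJob('A-42'));
const job = rehydrateJob(wire, ProcessOrderJob);
console.log(wire); // {"orderId":"A-42"}
console.log(job instanceof ProcessOrderJob, job.describe()); // true order A-42
```

Anything JSON cannot represent is lost or changed in transit: a Date arrives as a string, and functions are dropped. Keeping job fields to strings, numbers, booleans, arrays, and plain objects avoids surprises on the worker side.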

Low-level provider helpers are internal implementation details and are not part of the documented root-barrel contract, so application code should not depend on them.

Public API Overview

Core

  • QueueModule: Main entry point for queue registration.
  • QueueModule.forRoot(options): Registers queue support for an application module.
  • QueueLifecycleService: Primary service for enqueuing jobs (enqueue(job)).
  • @QueueWorker(JobClass, options?): Decorator to mark a class as a job handler.
  • QUEUE: Compatibility injection token for the queue facade.
  • createQueuePlatformStatusSnapshot(...): Status snapshot helper for lifecycle/readiness diagnostics.

Types

  • QueueModuleOptions: Global queue settings (clientName, default attempts, defaultBackoff, concurrency, rate limiting, dead-letter retention).
  • QueueWorkerOptions: Per-job settings (attempts, backoff, concurrency, jobName, rate limiting).
  • QueueBackoffOptions: Retry backoff settings (type, delayMs).

QueueModuleOptions also includes lifecycle and dead-letter retention controls:

  • workerShutdownTimeoutMs: maximum time to wait for active worker processors during shutdown before force-closing the BullMQ worker. Defaults to 30_000.
  • defaultDeadLetterMaxEntries: maximum retained dead-letter records per job, or false to disable trimming. Defaults to 1_000.

createQueuePlatformStatusSnapshot(...) reports readiness as ready only after Queue reaches started; starting reports degraded readiness, and stopping/stopped report not-ready. Snapshot details include the Redis dependency id, lifecycle state, ready/discovered worker counts, pending dead-letter writes, the dead-letter drain timeout, and workerShutdownTimeoutMs.
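The state-to-readiness mapping described above is small enough to write out. This sketch is not the helper's implementation: the type names and string literals are assumptions based on the wording of this section, and the real lifecycle may include states not listed here.

```typescript
// State and readiness names follow the prose above; the exact literals are assumed.
type QueueLifecycleState = 'starting' | 'started' | 'stopping' | 'stopped';
type Readiness = 'ready' | 'degraded' | 'not-ready';

function readinessFor(state: QueueLifecycleState): Readiness {
  switch (state) {
    case 'started':
      return 'ready';
    case 'starting':
      return 'degraded';
    case 'stopping':
    case 'stopped':
      return 'not-ready';
  }
}

const states: QueueLifecycleState[] = ['starting', 'started', 'stopping', 'stopped'];
console.log(states.map((s) => readinessFor(s)));
// [ 'degraded', 'ready', 'not-ready', 'not-ready' ]
```

A health endpoint built on this mapping would report ready only for started.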

Only singleton @QueueWorker() providers/controllers are registered. Request-scoped and transient workers are skipped during discovery.

Related Packages

  • @fluojs/redis: Required as the backing store for job persistence.
  • @fluojs/cron: For scheduled/recurring background tasks.

Example Sources

  • packages/queue/src/module.test.ts: Worker discovery and enqueueing tests.
  • packages/queue/src/public-surface.test.ts: Public API contract verification.
  • packages/queue/src/status.test.ts: Queue lifecycle status snapshot tests.