@recordset/adonis7-bullmq
v1.0.1
Published
BullMQ provider for AdonisJS 7
Maintainers
Readme
@recordset/adonis7-bullmq
BullMQ provider for AdonisJS 7. Manage queues, workers, and background jobs with Redis-backed queues.
Features
- Queue management with lazy initialization
- Worker lifecycle management with logging
- Job class pattern with
BaseJob - Auto-discovery of job files from a configurable directory
- AdonisJS 7 IoC container integration
make:jobscaffolding command- Graceful shutdown of all queues and workers
- Redis connection via environment variables
Installation
npm install @recordset/adonis7-bullmq bullmq
node ace configure @recordset/adonis7-bullmqThe configure command will:
- Register the provider and commands in
adonisrc.ts - Add Redis environment variables to
.env - Add env validations to
start/env.ts - Create
config/bullmq.ts
Usage
Creating a Job
node ace make:job send_emailThis generates app/jobs/send_email_job.ts:
import { BaseJob } from "@recordset/adonis7-bullmq/job";
import type { JobContext } from "@recordset/adonis7-bullmq/types";
export default class SendEmailJob extends BaseJob {
static jobName = "send_email";
async handle({ data, job }: JobContext): Promise<any> {
console.log(`Sending email to ${data.email}`);
return { sent: true };
}
async failed(job: any, error: Error): Promise<void> {
console.error(`SendEmailJob failed: ${error.message}`);
}
}Dispatching Jobs
import bullmq from "@recordset/adonis7-bullmq/services/main";
// Add a job to a queue
await bullmq.queue("send_email").add("send_email", {
email: "[email protected]",
subject: "Welcome!",
body: "Hello world",
});
// With job options
await bullmq.queue("send_email").add(
"send_email",
{ email: "[email protected]" },
{
delay: 5000, // Delay 5 seconds
priority: 1, // Higher priority
attempts: 5, // Retry 5 times
removeOnComplete: true,
},
);Listening to Queues
# Listen to all job queues
node ace bull:listen
# Listen to specific queues
node ace bull:listen --queue=send_email --queue=process_imageUsing BullMQManager Directly
import bullmq from "@recordset/adonis7-bullmq/services/main";
// Get or create a queue
const queue = bullmq.queue("notifications");
// Create a worker manually
const worker = bullmq.worker(
"notifications",
async (job) => {
console.log(job.data);
},
{ concurrency: 5 },
);
// Listen to queue events
const events = bullmq.events("notifications");
events.on("completed", ({ jobId }) => {
console.log(`Job ${jobId} completed`);
});
// Get all registered queues/workers
const queues = bullmq.getQueues();
const workers = bullmq.getWorkers();In Controllers
import { inject } from "@adonisjs/core";
import { HttpContext } from "@adonisjs/core/http";
import BullMQManager from "@recordset/adonis7-bullmq/bullmq_manager";
@inject()
export default class OrdersController {
constructor(private bullmq: BullMQManager) {}
async store({ request, response }: HttpContext) {
const order = await Order.create(request.body());
await this.bullmq.queue("process_order").add("process_order", {
orderId: order.id,
});
return response.created(order);
}
}Configuration
// config/bullmq.ts
import env from "#start/env";
import { defineConfig } from "@recordset/adonis7-bullmq/types";
export default defineConfig({
enabled: true,
connection: {
host: env.get("REDIS_HOST", "localhost"),
port: env.get("REDIS_PORT", 6379),
password: env.get("REDIS_PASSWORD", ""),
},
prefix: "bull",
jobsDirectory: "app/jobs",
defaultJobOptions: {
removeOnComplete: 100,
removeOnFail: 100,
attempts: 3,
backoff: {
type: "exponential",
delay: 1000,
},
},
});| Option | Type | Default | Description |
| --------------------- | --------- | ------------- | ----------------------------------- |
| enabled | boolean | true | Enable/disable BullMQ |
| connection.host | string | 'localhost' | Redis host |
| connection.port | number | 6379 | Redis port |
| connection.password | string | '' | Redis password |
| connection.db | number | 0 | Redis database number |
| prefix | string | 'bull' | Key prefix for Redis |
| jobsDirectory | string | 'app/jobs' | Directory for job files |
| defaultJobOptions | object | — | Default options applied to all jobs |
Ace Commands
| Command | Description |
| -------------------------- | ------------------------------------- |
| make:job <name> | Generate a new job class |
| bull:listen | Start workers for all discovered jobs |
| bull:listen --queue=name | Start worker for specific queue(s) |
API Reference
BullMQManager
| Method | Returns | Description |
| ----------------------------------- | --------------- | ------------------------------------- |
| queue(name) | Queue | Get or create a named queue |
| worker(name, processor, options?) | Worker | Get or create a worker |
| events(name) | QueueEvents | Get or create queue event listener |
| getQueues() | Queue[] | List all registered queues |
| getWorkers() | Worker[] | List all registered workers |
| shutdown() | Promise<void> | Close all queues, workers, and events |
BaseJob
| Property/Method | Description |
| -------------------- | ------------------------------------ |
| static jobName | Queue name for this job |
| handle(context) | Process the job (required) |
| failed(job, error) | Called when job fails (optional) |
| completed(job) | Called when job completes (optional) |
License
MIT
