@tamasha/bullmq-connection
v1.0.1
BullMQ connection package for job scheduling and management
A TypeScript package for managing BullMQ job queues with Redis, providing easy-to-use methods for job scheduling, cancellation, and management.
Features
Producer Features
- 🚀 Easy job scheduling with delay support
- ❌ Job cancellation by queue name and job ID
- 🔄 Recurring job support with cron patterns
- 📊 Job status monitoring
- 🏗️ TypeScript support with full type safety
- 🔧 Configurable Redis connection
- 🧹 Automatic job cleanup options
Consumer Features
- 🔄 Job processing with worker management
- 🎯 Multi-job type handling in single worker
- ⚡ Configurable concurrency and retry logic
- 🛡️ Built-in error handling and event logging
- 🔧 Flexible worker configuration
- 🛑 Graceful worker shutdown and cleanup
Installation
npm install @tamasha/bullmq-connection
Prerequisites
- Redis server running
- Node.js 16+
- TypeScript (for development)
Quick Start
Producer (Adding Jobs)
import { ProducerScheduling, generateUniqueId } from '@tamasha/bullmq-connection';
// Initialize (optional - uses defaults if not called)
ProducerScheduling.initialize({
redis: {
host: 'localhost',
port: 6379,
// password: 'your-redis-password', // if needed
}
});
// Add a job with delay
const job = await ProducerScheduling.addJob(
"notification",
"send-multicast-push-notification",
{ userId: '123', message: 'Hello!' },
{
delay: 5000, // 5 seconds
jobId: `MPN-${generateUniqueId()}`,
removeOnComplete: true,
removeOnFail: true,
}
);
// Cancel a job
const cancelled = await ProducerScheduling.cancelJob("notification", job.id!);
console.log('Job cancelled:', cancelled);
Consumer (Processing Jobs)
import { ConsumerScheduling, Job } from '@tamasha/bullmq-connection';
// Initialize consumer
ConsumerScheduling.initialize({
redis: {
host: 'localhost',
port: 6379,
}
});
// Start a worker to process jobs
const worker = ConsumerScheduling.startWorker(
'notification',
async (job: Job) => {
console.log('Processing job:', job.data);
// Your job processing logic here
const { userId, message } = job.data;
await sendNotification(userId, message);
return { success: true };
},
{
concurrency: 2, // Process 2 jobs at once
attempts: 3, // Retry failed jobs 3 times
}
);
// Gracefully stop the worker when done
await ConsumerScheduling.stopWorker(worker);
API Reference
ProducerScheduling
The main class for managing job queues.
Methods
initialize(config?: ProducerSchedulingConfig)
Initialize the ProducerScheduling with custom configuration.
ProducerScheduling.initialize({
redis: {
host: 'localhost',
port: 6379,
password: 'your-password',
db: 0,
},
defaultJobOptions: {
removeOnComplete: 10,
removeOnFail: 5,
}
});
addJob<T>(queueName: string, jobName: string, data: T, options?: JobOptions): Promise<Job<T>>
Add a job to the specified queue.
Parameters:
- queueName - Name of the queue
- jobName - Name of the job
- data - Job data/metadata
- options - Job options (delay, jobId, etc.)
Example:
const job = await ProducerScheduling.addJob(
"notification",
"send-push-notification",
{
userId: '12345',
message: 'Welcome!',
channels: ['push', 'email']
},
{
delay: 10000, // 10 seconds delay
jobId: `PUSH-${generateUniqueId()}`,
removeOnComplete: true,
removeOnFail: true,
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000,
}
}
);
cancelJob(queueName: string, jobId: string): Promise<boolean>
Cancel a job by queue name and job ID.
Parameters:
- queueName - Name of the queue
- jobId - ID of the job to cancel
Returns: Promise<boolean> - True if job was cancelled, false if not found
Example:
const cancelled = await ProducerScheduling.cancelJob("notification", "PUSH-123");
if (cancelled) {
console.log('Job successfully cancelled');
} else {
console.log('Job not found or already processed');
}
getJob<T>(queueName: string, jobId: string): Promise<Job<T> | null>
Get job status and details.
Example:
const job = await ProducerScheduling.getJob("notification", "PUSH-123");
if (job) {
console.log('Job status:', {
id: job.id,
name: job.name,
processedOn: job.processedOn,
finishedOn: job.finishedOn,
failedReason: job.failedReason,
});
}
getQueueInstance(queueName: string): Queue
Get the BullMQ Queue instance for advanced operations.
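As one illustration of an "advanced operation", the sketch below reads per-state job counts from a queue. It assumes the returned object is a standard BullMQ Queue, whose getJobCounts method resolves to a map of state name to count. QueueLike and summarizeQueue are hypothetical names introduced here so the snippet stands alone without a Redis connection.

```typescript
// Minimal structural type covering the one BullMQ Queue method used below.
// With the real package, pass ProducerScheduling.getQueueInstance("notification").
interface QueueLike {
  getJobCounts(...types: string[]): Promise<Record<string, number>>;
}

// Hypothetical helper: builds a one-line summary of the queue depth per state.
// States missing from the result are reported as 0.
async function summarizeQueue(queue: QueueLike): Promise<string> {
  const states = ["waiting", "active", "delayed", "failed"];
  const counts = await queue.getJobCounts(...states);
  return states.map((s) => `${s}=${counts[s] ?? 0}`).join(" ");
}
```

A summary like this could be logged periodically to spot a growing backlog before it becomes a problem.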
close(): Promise<void>
Close all queue connections and cleanup resources.
generateUniqueId(): string
Generate a unique ID for jobs.
ConsumerScheduling
The main class for managing job workers and processing.
Methods
initialize(config?: ConsumerSchedulingConfig)
Initialize the ConsumerScheduling with custom configuration.
ConsumerScheduling.initialize({
redis: {
host: 'localhost',
port: 6379,
password: 'your-password',
db: 0,
},
defaultWorkerOptions: {
concurrency: 2,
attempts: 3,
}
});
startWorker<T>(queueName: string, processor: JobProcessor<T>, options?: Partial<WorkerOptions>): Worker
Start a worker for a queue with a single job processor.
Parameters:
- queueName - Name of the queue to process
- processor - Function to process jobs
- options - Worker options (concurrency, attempts, etc.)
Example:
const worker = ConsumerScheduling.startWorker(
'email',
async (job: Job) => {
const { recipient, subject, body } = job.data;
await sendEmail(recipient, subject, body);
return { emailSent: true };
},
{
concurrency: 3,
attempts: 5,
backoff: {
type: 'exponential',
delay: 2000,
}
}
);
startMultiJobWorker<T>(queueName: string, handlers: JobHandler<T>[], options?: Partial<WorkerOptions>): Worker
Start a worker that can handle multiple job types in the same queue.
Parameters:
- queueName - Name of the queue to process
- handlers - Array of job handlers for different job types
- options - Worker options
Example:
const worker = ConsumerScheduling.startMultiJobWorker(
'notifications',
[
{
jobName: 'send-email',
processor: async (job: Job) => {
await sendEmail(job.data);
return { emailSent: true };
}
},
{
jobName: 'send-sms',
processor: async (job: Job) => {
await sendSMS(job.data);
return { smsSent: true };
}
}
],
{ concurrency: 2 }
);
stopWorker(worker: Worker): Promise<void>
Stop a specific worker gracefully.
stopAllWorkers(): Promise<void>
Stop all active workers.
getActiveWorkers(): Worker[]
Get all currently active workers.
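A possible use of the returned list is a quick health summary. The sketch below assumes the workers are standard BullMQ Worker instances, which expose name, isRunning() and isPaused(). WorkerLike and describeWorkers are hypothetical names used only for this illustration.

```typescript
// Structural subset of the BullMQ Worker API used in this sketch.
interface WorkerLike {
  name: string; // name of the queue the worker is bound to
  isRunning(): boolean;
  isPaused(): boolean;
}

// Hypothetical helper: describes each worker as "<queue>: paused|running|stopped".
function describeWorkers(workers: WorkerLike[]): string[] {
  return workers.map((w) => {
    const state = w.isPaused() ? "paused" : w.isRunning() ? "running" : "stopped";
    return `${w.name}: ${state}`;
  });
}
```

With the real package this would be called as describeWorkers(ConsumerScheduling.getActiveWorkers()).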
close(): Promise<void>
Close all worker connections and cleanup resources.
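The shutdown methods above can be combined into a single routine. The following is a minimal sketch, assuming stopAllWorkers lets in-flight jobs finish (as "gracefully" suggests) and that both classes expose the static close() methods documented here. ConsumerLike, ProducerLike and shutdown are hypothetical names, not part of the package.

```typescript
// Structural types for the documented static methods. With the real package,
// pass the ConsumerScheduling and ProducerScheduling classes themselves.
interface ConsumerLike {
  stopAllWorkers(): Promise<void>;
  close(): Promise<void>;
}
interface ProducerLike {
  close(): Promise<void>;
}

// Hypothetical helper: stop workers first, then release the worker
// connections, and finally the queue connections held by the producer.
async function shutdown(consumer: ConsumerLike, producer: ProducerLike): Promise<void> {
  await consumer.stopAllWorkers();
  await consumer.close();
  await producer.close();
}

// Typical wiring in a Node.js service (left as a comment so the sketch stays self-contained):
// process.once("SIGTERM", () => { void shutdown(ConsumerScheduling, ProducerScheduling); });
```

Stopping workers before closing connections is a design choice in this sketch: it avoids closing a connection that a running processor may still need.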
Job Options
The JobOptions interface extends BullMQ's JobsOptions with additional properties:
interface JobOptions {
delay?: number; // Delay in milliseconds
jobId?: string; // Custom job ID
removeOnComplete?: boolean | number; // Auto-remove on completion
removeOnFail?: boolean | number; // Auto-remove on failure
attempts?: number; // Number of retry attempts
backoff?: { // Retry backoff strategy
type: 'fixed' | 'exponential';
delay: number;
};
repeat?: { // Recurring job pattern
pattern: string; // Cron pattern
limit?: number; // Max repetitions
};
priority?: number; // Job priority
// ... other BullMQ options
}
Configuration
ProducerSchedulingConfig
interface ProducerSchedulingConfig {
redis?: {
host?: string; // Redis host (default: 'localhost')
port?: number; // Redis port (default: 6379)
password?: string; // Redis password
db?: number; // Redis database (default: 0)
lazyConnect?: boolean; // Lazy connection (default: true)
};
defaultJobOptions?: Partial<JobOptions>; // Default options for all jobs
}
ConsumerSchedulingConfig
interface ConsumerSchedulingConfig {
redis?: {
host?: string; // Redis host (default: 'localhost')
port?: number; // Redis port (default: 6379)
password?: string; // Redis password
db?: number; // Redis database (default: 0)
lazyConnect?: boolean; // Lazy connection (default: true)
};
defaultWorkerOptions?: Partial<WorkerOptions>; // Default options for all workers
}
Job Processing Types
type JobProcessor<T = any> = (job: Job<T>) => Promise<any>;
interface JobHandler<T = any> {
jobName: string; // Name of the job type to handle
processor: JobProcessor<T>; // Function to process this job type
}
Examples
Basic Usage
import { ProducerScheduling, generateUniqueId } from '@tamasha/bullmq-connection';
// Add a delayed notification job
const job = await ProducerScheduling.addJob(
"notification",
"send-multicast-push-notification",
{
userId: '12345',
message: 'Your order is ready!',
channels: ['push', 'sms']
},
{
delay: 30000, // 30 seconds
jobId: `MPN-${generateUniqueId()}`,
removeOnComplete: true,
removeOnFail: true,
}
);
console.log(`Job ${job.id} scheduled`);
Consumer Usage
import { ConsumerScheduling, Job } from '@tamasha/bullmq-connection';
// Start a simple worker
const worker = ConsumerScheduling.startWorker(
'notification',
async (job: Job) => {
const { userId, message, channels } = job.data;
console.log(`Processing notification for user ${userId}`);
// Process each channel
for (const channel of channels) {
await sendNotification(channel, userId, message);
}
return { processed: true, channels };
},
{
concurrency: 3,
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000,
}
}
);
// Stop worker when done
await ConsumerScheduling.stopWorker(worker);
Recurring Jobs
// Schedule a daily cleanup job
const cleanupJob = await ProducerScheduling.addJob(
"maintenance",
"cleanup-old-data",
{
tables: ['logs', 'temp_files'],
olderThanDays: 30
},
{
jobId: `CLEANUP-${generateUniqueId()}`,
repeat: {
pattern: '0 2 * * *', // Every day at 2 AM
},
removeOnComplete: 5,
removeOnFail: 3,
}
);
Multi-Job Worker
// Handle multiple job types in one worker
const multiWorker = ConsumerScheduling.startMultiJobWorker(
'email',
[
{
jobName: 'send-welcome-email',
processor: async (job: Job) => {
const { email, name } = job.data;
await sendWelcomeEmail(email, name);
return { welcomeEmailSent: true };
}
},
{
jobName: 'send-password-reset',
processor: async (job: Job) => {
const { email, resetToken } = job.data;
await sendPasswordResetEmail(email, resetToken);
return { resetEmailSent: true };
}
},
{
jobName: 'send-newsletter',
processor: async (job: Job) => {
const { recipients, subject, content } = job.data;
await sendNewsletter(recipients, subject, content);
return { newsletterSent: true, count: recipients.length };
}
}
],
{ concurrency: 2 }
);
Batch Processing
// Producer: Create multiple jobs with staggered delays
const jobs = [];
for (let i = 0; i < 10; i++) {
const job = await ProducerScheduling.addJob(
"batch-processing",
"process-data-chunk",
{
chunkId: i,
data: `chunk-${i}-data`,
},
{
jobId: `CHUNK-${i}-${generateUniqueId()}`,
delay: i * 1000, // Stagger by 1 second each
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000,
}
}
);
jobs.push(job);
}
// Consumer: Process batch jobs
const batchWorker = ConsumerScheduling.startWorker(
'batch-processing',
async (job: Job) => {
const { chunkId, data } = job.data;
console.log(`Processing chunk ${chunkId}`);
// Simulate processing
await processDataChunk(data);
return {
processed: true,
chunkId,
processedAt: new Date().toISOString()
};
},
{
concurrency: 3,
attempts: 5,
}
);
Error Handling
try {
const job = await ProducerScheduling.addJob(
"email",
"send-newsletter",
{ recipients: ['user@example.com'] }, // placeholder address
{ jobId: `EMAIL-${generateUniqueId()}` }
);
console.log('Job created:', job.id);
} catch (error) {
console.error('Failed to create job:', error);
}
// Cancel with error handling (jobId is the ID of a previously scheduled job)
const cancelled = await ProducerScheduling.cancelJob("email", jobId);
if (!cancelled) {
console.log('Job may have already been processed or does not exist');
}
Environment Variables
You can use environment variables for configuration:
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your-password
REDIS_DB=0
ProducerScheduling.initialize({
redis: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379', 10),
password: process.env.REDIS_PASSWORD,
db: parseInt(process.env.REDIS_DB || '0', 10),
}
});
Best Practices
- Always initialize before using if you need custom configuration
- Use unique job IDs to avoid conflicts
- Set appropriate cleanup options (removeOnComplete, removeOnFail)
- Handle errors gracefully when adding or cancelling jobs
- Close connections when your application shuts down
- Use delays wisely to avoid overwhelming your system
- Monitor job status for critical operations
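For the monitoring practice, the fields shown in the getJob example are enough to derive a coarse status. The sketch below is one way to do that. JobSnapshot, JobPhase and jobPhase are hypothetical names, and the mapping is an approximation rather than the package's own logic.

```typescript
// Subset of BullMQ Job fields shown in the getJob example.
interface JobSnapshot {
  processedOn?: number; // timestamp (ms) when processing started
  finishedOn?: number; // timestamp (ms) when processing ended
  failedReason?: string; // set when the job failed
}

type JobPhase = "pending" | "processing" | "completed" | "failed";

// Hypothetical helper: derives a coarse phase from the recorded fields alone.
function jobPhase(job: JobSnapshot): JobPhase {
  if (job.finishedOn !== undefined) {
    return job.failedReason ? "failed" : "completed";
  }
  return job.processedOn !== undefined ? "processing" : "pending";
}
```

A job waiting for a retry would be reported as "processing" by this approximation. When the exact state matters, BullMQ's job.getState() queries Redis for the authoritative value.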
Dependencies
- bullmq: ^4.15.0
- ioredis: ^5.3.2
License
MIT
Contributing
- Fork the repository
- Create your feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'Add some amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
