@oxlayer/capabilities-adapters-bullmq-scheduler
v0.1.4
BullMQ scheduler adapter for @oxlayer/capabilities
@oxlayer/pro-adapters-bullmq-scheduler
BullMQ scheduler adapter for @oxlayer/capabilities. Provides scheduled job execution using BullMQ's job schedulers with support for interval-based, cron-based, and custom scheduling strategies.
Features
- Schedule recurring jobs using "every" interval strategy
- Schedule jobs using cron expressions
- Support for custom repeat strategies (e.g., RRULE)
- Redis connection reuse from existing Redis client
- Job scheduler management (create, update, remove, list)
- Next run time tracking
- Timezone support
Installation
bun add @oxlayer/pro-adapters-bullmq-scheduler
Dependencies
Requires Redis and the Redis adapter:
bun add @oxlayer/capabilities-adapters-redis bullmq
Usage
Basic Usage with Redis Client
import { createBullMQScheduler } from '@oxlayer/pro-adapters-bullmq-scheduler';
import { createDefaultRedisClient } from '@oxlayer/capabilities-adapters-redis';

const redisClient = createDefaultRedisClient();
const scheduler = createBullMQScheduler({
  redisClient,
  queueName: 'scheduled-jobs',
});

// Schedule a job every minute
await scheduler.schedule('cleanup', {
  every: 60_000,
}, {
  name: 'cleanup-task',
  data: { type: 'logs' },
});

// Schedule with cron (9 AM weekdays)
await scheduler.schedule('reports', {
  pattern: '0 0 9 * * 1-5',
}, {
  name: 'daily-report',
  data: { format: 'pdf' },
});

// Get scheduler info
const info = await scheduler.get('cleanup');
console.log(info?.next); // Next run timestamp

// List all schedulers
const schedulers = await scheduler.list();

// Remove a scheduler
await scheduler.remove('cleanup');

await scheduler.close();
Using Connection Config
const scheduler = createBullMQScheduler({
  connection: {
    host: 'localhost',
    port: 6379,
    password: 'optional-password',
    db: 0,
  },
  queueName: 'my-jobs',
});
Advanced Scheduling Options
// Schedule with start/end dates
await scheduler.schedule('seasonal-job', {
  every: 24 * 60 * 60 * 1000, // Daily
  startDate: new Date('2024-01-01'),
  endDate: new Date('2024-12-31'),
  tz: 'America/New_York',
}, {
  name: 'yearly-task',
  data: { year: 2024 },
});

// Schedule with limit
await scheduler.schedule('limited-job', {
  every: 60_000,
  limit: 100, // Run only 100 times
}, {
  name: 'limited-task',
});
Custom Repeat Strategy (RRULE)
import { createBullMQScheduler, createRRuleStrategy } from '@oxlayer/pro-adapters-bullmq-scheduler';
import { rrulestr } from 'rrule';

const scheduler = createBullMQScheduler({
  redisClient,
  repeatStrategy: (millis, opts) => {
    // Start from whichever is later: the configured start date or the reference time
    const currentDate = opts.startDate && new Date(opts.startDate) > new Date(millis)
      ? new Date(opts.startDate)
      : new Date(millis);
    const rrule = rrulestr(opts.pattern);
    const next = rrule.after(currentDate, false);
    return next?.getTime();
  },
});

// Schedule using RRULE pattern
await scheduler.schedule('complex-schedule', {
  pattern: 'DTSTART=20240101T090000Z RRULE:FREQ=WEEKLY;BYDAY=MO,WE,FR',
}, {
  name: 'monday-wednesday-friday-task',
});
API Reference
BullMQScheduler
Main scheduler class.
Constructor
constructor(options: BullMQSchedulerOptions)
Options:
- redisClient - Redis client for connection reuse
- connection - Redis connection config (if not using redisClient)
- queueName - Queue name for scheduled jobs (default: 'scheduled-jobs')
- repeatStrategy - Custom repeat strategy function
- settings - Additional BullMQ queue settings
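When no existing Redis client is available, the connection option is often assembled from deployment configuration. The following is a minimal sketch of one way to do that; connectionFromEnv is a hypothetical helper, not part of this adapter, and the variable names REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, and REDIS_DB are assumptions chosen for illustration.

```typescript
// Shape of the `connection` option as documented in this README.
interface RedisConnectionConfig {
  host: string;
  port: number;
  password?: string;
  db?: number;
}

// Hypothetical helper (not part of the adapter): builds a `connection`
// object from environment-style key/value pairs. The key names are assumptions.
function connectionFromEnv(
  env: Record<string, string | undefined>,
): RedisConnectionConfig {
  const rawPort = env['REDIS_PORT'] ?? '6379';
  const port = Number(rawPort);
  if (!Number.isInteger(port) || port <= 0 || port > 65535) {
    throw new Error(`Invalid REDIS_PORT: ${rawPort}`);
  }

  const config: RedisConnectionConfig = {
    host: env['REDIS_HOST'] ?? 'localhost',
    port,
  };

  // Only set optional fields when a value was actually provided.
  const password = env['REDIS_PASSWORD'];
  if (password) config.password = password;

  const rawDb = env['REDIS_DB'];
  if (rawDb !== undefined) {
    const db = Number(rawDb);
    if (!Number.isInteger(db) || db < 0) {
      throw new Error(`Invalid REDIS_DB: ${rawDb}`);
    }
    config.db = db;
  }
  return config;
}
```

The returned object could then be passed as the connection option, for example connection: connectionFromEnv(process.env) in a Node.js or Bun process.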
Methods
schedule(key: string, repeat: RepeatOptions, job: ScheduledJob): Promise<void>
Schedule a job to repeat based on the provided options.
Parameters:
- key - Unique identifier for the scheduler
- repeat - Repeat options (every, pattern, or custom)
- job - Job definition with name, data, and opts
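Because the repeat argument is a union of an interval form and a pattern form, it can help to validate it before calling schedule. The sketch below is illustrative only: describeRepeat is a hypothetical helper, not something the adapter exports, and the specific checks are assumptions about what a caller might want to reject early.

```typescript
// Local copy of the RepeatOptions union from the Types section of this README.
type RepeatOptions =
  | { every: number; startDate?: Date; endDate?: Date; limit?: number; tz?: string }
  | { pattern: string; startDate?: Date; endDate?: Date; limit?: number; tz?: string };

// Hypothetical helper (not part of the adapter): validates a repeat option
// and returns a short human-readable summary, e.g. for logging.
function describeRepeat(repeat: RepeatOptions): string {
  if (
    repeat.startDate &&
    repeat.endDate &&
    repeat.startDate.getTime() > repeat.endDate.getTime()
  ) {
    throw new Error('startDate must not be after endDate');
  }
  if (repeat.limit !== undefined && (!Number.isInteger(repeat.limit) || repeat.limit <= 0)) {
    throw new Error('limit must be a positive integer');
  }
  const suffix = repeat.limit !== undefined ? `, at most ${repeat.limit} runs` : '';

  // The `in` check narrows the union to the interval form.
  if ('every' in repeat) {
    if (!(repeat.every > 0)) {
      throw new Error('every must be a positive number of milliseconds');
    }
    return `every ${repeat.every} ms${suffix}`;
  }

  // Otherwise this is the pattern form (cron or a custom strategy pattern).
  if (repeat.pattern.trim() === '') {
    throw new Error('pattern must not be empty');
  }
  return `pattern "${repeat.pattern}"${suffix}`;
}
```

Calling describeRepeat just before scheduler.schedule(key, repeat, job) would surface malformed options at the call site rather than inside the queue.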
update(key: string, repeat: RepeatOptions, job: ScheduledJob): Promise<void>
Update an existing scheduler, or create it if it doesn't exist.
remove(key: string): Promise<boolean>
Remove a scheduler by its key. Returns true if removed, false if not found.
get(key: string): Promise<JobScheduler | undefined>
Get a scheduler by its key.
Returns:
- key - Scheduler identifier
- repeat - Repeat options
- job - Job definition
- next - Next run timestamp
- created - Creation timestamp
list(options?: SchedulerListOptions): Promise<JobScheduler[]>
List all schedulers with optional pagination.
Options:
- start - Start index (default: 0)
- end - End index (default: 99)
- ascending - Sort order (default: true)
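The defaults above (start 0, end 99) suggest that both indexes are zero-based and inclusive, so the default window would hold up to 100 schedulers. This README does not state that explicitly, so the following in-memory sketch is an assumption about the semantics rather than a description of the adapter's implementation; pageOf and SchedulerSummary are hypothetical names, and ordering by next run time is also assumed.

```typescript
// Options shape as documented for list().
interface SchedulerListOptions {
  start?: number;
  end?: number;
  ascending?: boolean;
}

// Hypothetical subset of JobScheduler, enough to illustrate paging.
interface SchedulerSummary {
  key: string;
  next?: number;
}

// In-memory illustration of the assumed window semantics:
// sort by next run time, then take the inclusive range [start, end].
function pageOf(
  items: SchedulerSummary[],
  options: SchedulerListOptions = {},
): SchedulerSummary[] {
  const { start = 0, end = 99, ascending = true } = options;
  // Schedulers without a next run time sort last in ascending order.
  const rank = (s: SchedulerSummary): number => s.next ?? Number.MAX_SAFE_INTEGER;
  const sorted = [...items].sort((a, b) => rank(a) - rank(b));
  if (!ascending) sorted.reverse();
  return sorted.slice(start, end + 1);
}
```

Under these assumptions, a second page of 100 entries would be requested with start: 100 and end: 199.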
removeAll(): Promise<number>
Remove all schedulers. Returns number of schedulers removed.
close(): Promise<void>
Close the underlying queue connection.
getQueue(): Queue
Get the underlying BullMQ queue for advanced operations.
Types
BullMQSchedulerOptions
interface BullMQSchedulerOptions {
  redisClient?: RedisClient;
  connection?: {
    host: string;
    port: number;
    password?: string;
    db?: number;
  };
  queueName?: string;
  repeatStrategy?: (
    millis: number,
    opts: RepeatOptions,
    jobName: string,
  ) => number | undefined;
  settings?: {
    repeatStrategy?: (
      millis: number,
      opts: any,
      jobName: string,
    ) => number | undefined;
  };
}
RepeatOptions
type RepeatOptions =
  | { every: number; startDate?: Date; endDate?: Date; limit?: number; tz?: string }
  | { pattern: string; startDate?: Date; endDate?: Date; limit?: number; tz?: string };
ScheduledJob
interface ScheduledJob {
  name: string;
  data: any;
  opts?: any;
}
JobScheduler
interface JobScheduler {
  key: string;
  repeat: RepeatOptions;
  job: ScheduledJob;
  next?: number;
  created?: number;
}
Repeat Strategies
"Every" Strategy
Schedule jobs at fixed intervals:
{
  every: 60_000, // Every minute
}
"Pattern" Strategy (Cron)
Schedule jobs using cron expressions:
{
  pattern: '0 0 9 * * 1-5', // 9 AM weekdays
}
Custom Strategy
Implement custom scheduling logic:
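{
  repeatStrategy: (millis, opts) => {
    // Custom logic to calculate the next run time from the reference time `millis`
    const nextRunTimestamp = millis + 60_000; // Placeholder: one minute later
    return nextRunTimestamp;
  },
}
Cron Expression Format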
The adapter supports six-field cron expressions, that is, the five standard cron fields preceded by a seconds field:
* * * * * *
│ │ │ │ │ │
│ │ │ │ │ └─ Day of week (0-7, Sunday = 0 or 7)
│ │ │ │ └─── Month (1-12)
│ │ │ └───── Day of month (1-31)
│ │ └─────── Hour (0-23)
│ └───────── Minute (0-59)
└─────────── Second (0-59)
Timezone Support
Specify timezones for scheduled jobs:
await scheduler.schedule('tz-job', {
  every: 60 * 60 * 1000, // Hourly
  tz: 'America/New_York',
}, {
  name: 'hourly-task',
});
Best Practices
- Use unique keys: Each scheduler must have a unique key
- Set reasonable limits: Use limit for jobs that shouldn't run forever
- Handle job failures: Implement error handling in your workers
- Monitor schedulers: Check next run times to verify scheduling
- Clean up old schedulers: Remove schedulers that are no longer needed
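As an illustration of the monitoring advice above, the sketch below flags schedulers whose next run time is missing or already in the past. It is a hedged example rather than part of the adapter: findStalled, SchedulerInfo, and the 5-second grace period are assumptions, and the input is expected to resemble the JobScheduler objects returned by list().

```typescript
// Hypothetical subset of JobScheduler: only the fields this check needs.
interface SchedulerInfo {
  key: string;
  next?: number; // Next run timestamp in milliseconds, if known
}

// Hypothetical helper (not part of the adapter): returns the keys of
// schedulers that have no next run time, or whose next run time is more
// than `graceMs` milliseconds behind `now`.
function findStalled(
  schedulers: SchedulerInfo[],
  now: number,
  graceMs = 5_000,
): string[] {
  return schedulers
    .filter((s) => s.next === undefined || s.next < now - graceMs)
    .map((s) => s.key);
}
```

A periodic health check could call findStalled(await scheduler.list(), Date.now()) and log or alert on any keys it returns; whether a missing next value really indicates a problem (for example, after a limit has been reached) depends on the job.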
License
MIT
