@monque/core
v1.2.0
MongoDB-backed job scheduler with atomic locking, exponential backoff, and cron scheduling
A robust, type-safe MongoDB job queue for TypeScript with atomic locking, exponential backoff, and cron scheduling.
Features
- 🔒 Atomic Locking: Jobs are always acquired with findOneAndUpdate, which keeps job acquisition safe in distributed environments.
- 📈 Exponential Backoff: Built-in retry logic with configurable backoff strategies.
- 📅 Cron Scheduling: Native support for recurring jobs using standard cron syntax.
- 🔍 Type Safety: Fully typed job payloads and worker definitions.
- ⚡ Event-Driven: Comprehensive event system for monitoring and logging.
- 🛠️ Native Driver: Uses the native MongoDB driver for maximum performance and compatibility.
- 🛑 Graceful Shutdown: Ensures all in-progress jobs finish or are safely released before stopping.
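To illustrate the atomic locking feature, here is a minimal sketch of the general claim-a-job pattern built on findOneAndUpdate. It is not Monque's actual implementation: the helper buildClaim and the field names status, nextRunAt, lockedBy, and lockedAt are assumptions made up for this example.

```typescript
// Hypothetical claim arguments. The field names below are assumptions for
// illustration only and are not taken from Monque's schema.
interface ClaimArgs {
  filter: Record<string, unknown>;
  update: Record<string, unknown>;
  options: { sort: Record<string, 1 | -1>; returnDocument: 'after' };
}

// Build the three arguments for a single findOneAndUpdate call that both
// finds a due, unclaimed job and marks it as taken by this worker.
function buildClaim(workerId: string, now: Date): ClaimArgs {
  return {
    // Match only jobs that are still pending and already due.
    filter: { status: 'pending', nextRunAt: { $lte: now } },
    // Mark the matched job as owned by this worker in the same operation.
    update: { $set: { status: 'processing', lockedBy: workerId, lockedAt: now } },
    // Oldest due job first; return the document as it looks after the update.
    options: { sort: { nextRunAt: 1 }, returnDocument: 'after' },
  };
}

// With the native driver, the arguments would be passed along like this:
//   const { filter, update, options } = buildClaim('worker-1', new Date());
//   const job = await collection.findOneAndUpdate(filter, update, options);
```

Because MongoDB applies the match and the update to a single document atomically, a second worker running the same call can no longer match a job that the first worker has already switched away from 'pending'.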
Installation
Using Bun:
bun add @monque/core mongodb
Or using npm/yarn/pnpm:
npm install @monque/core mongodb
yarn add @monque/core mongodb
pnpm add @monque/core mongodb
Usage
import { Monque } from '@monque/core';
import { MongoClient } from 'mongodb';
const client = new MongoClient('mongodb://localhost:27017');
await client.connect();
const monque = new Monque(client.db('myapp'), {
  collectionName: 'jobs',
  pollInterval: 1000,
  maxRetries: 10,
  defaultConcurrency: 5,
});
await monque.initialize();
// Register workers
monque.register('send-email', async (job) => {
  await sendEmail(job.data.to, job.data.subject);
});
// Start processing
monque.start();
// Enqueue jobs
await monque.enqueue('send-email', { to: 'user@example.com', subject: 'Hello' });
// Schedule recurring jobs
await monque.schedule('0 9 * * *', 'daily-report', { type: 'summary' });
// Management
await monque.cancelJob('job-id');
const stats = await monque.getQueueStats();
// Graceful shutdown
await monque.stop();
API
new Monque(db, options?)
Creates a new Monque instance.
Options:
- collectionName: MongoDB collection name (default: 'monque_jobs')
- pollInterval: Polling interval in ms (default: 1000)
- maxRetries: Max retry attempts (default: 10)
- baseRetryInterval: Base backoff interval in ms (default: 1000)
- shutdownTimeout: Graceful shutdown timeout in ms (default: 30000)
- defaultConcurrency: Jobs per worker (default: 5)
- lockTimeout: Stale job threshold in ms (default: 1800000)
- recoverStaleJobs: Recover stale jobs on startup (default: true)
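To give a feel for how baseRetryInterval and maxRetries interact, here is a rough sketch of a typical exponential backoff calculation. The doubling formula, the helper name backoffDelay, and the maxDelay cap are assumptions for illustration; the exact formula Monque uses may differ.

```typescript
// Sketch of a common exponential backoff: base * 2^failCount, capped.
// Assumption: this doubling formula and the maxDelay cap are illustrative
// and are not confirmed to match Monque's internal strategy.
function backoffDelay(
  failCount: number,
  baseRetryInterval = 1000, // ms, same default as the option listed above
  maxDelay = 3_600_000, // hypothetical upper bound of one hour
): number {
  if (!Number.isInteger(failCount) || failCount < 0) {
    throw new RangeError('failCount must be a non-negative integer');
  }
  return Math.min(baseRetryInterval * 2 ** failCount, maxDelay);
}

// Delays in ms for the first few failures with the default base interval.
const delays = [0, 1, 2, 3].map((n) => backoffDelay(n));
console.log(delays); // [ 1000, 2000, 4000, 8000 ]
```

Under these assumptions the wait doubles after every failure, so with a 1000 ms base the delay grows quickly and an upper bound keeps late retries from drifting too far out.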
Methods
- initialize(): Set up collection and indexes
- enqueue(name, data, options?): Enqueue a job
- now(name, data): Enqueue for immediate processing
- schedule(cron, name, data): Schedule recurring job
- register(name, handler, options?): Register a worker
- start(): Start processing jobs
- stop(): Graceful shutdown
- isHealthy(): Check scheduler health
Management:
- getJob(id): Get job details
- getJobs(filter): List jobs
- getJobsWithCursor(options): Paginated list
- getQueueStats(filter?): Queue statistics
- cancelJob(id): Cancel a job
- retryJob(id): Retry a job
- rescheduleJob(id, date): Reschedule a job
- deleteJob(id): Delete a job
- cancelJobs(filter): Bulk cancel
- retryJobs(filter): Bulk retry
- deleteJobs(filter): Bulk delete
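One way stop() might be wired into a process signal handler is sketched below. The Stoppable interface, the makeShutdownHandler helper, and the injected exit callback are all hypothetical names introduced for this example; only stop() returning a promise comes from the usage shown earlier.

```typescript
// Anything with an async stop() method, such as a Monque instance.
interface Stoppable {
  stop(): Promise<void>;
}

// Build a handler that stops the scheduler once, then exits.
// `exit` is injected so the handler can be tried out without ending the process.
function makeShutdownHandler(
  scheduler: Stoppable,
  exit: (code: number) => void,
): () => Promise<void> {
  let stopping = false;
  return async () => {
    if (stopping) return; // ignore repeated signals while shutdown is running
    stopping = true;
    try {
      await scheduler.stop(); // resolves once the scheduler has shut down
      exit(0);
    } catch (err) {
      console.error('Shutdown failed:', err);
      exit(1);
    }
  };
}

// Possible wiring in a Node.js or Bun process:
//   const onSignal = makeShutdownHandler(monque, (code) => process.exit(code));
//   process.on('SIGTERM', onSignal);
//   process.on('SIGINT', onSignal);
```

The guard flag means a second SIGTERM or SIGINT during shutdown does not start a second stop() call.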
Events
monque.on('job:start', (job) => { /* job started */ });
monque.on('job:complete', ({ job, duration }) => { /* job completed */ });
monque.on('job:fail', ({ job, error, willRetry }) => { /* job failed */ });
monque.on('job:error', ({ error, job }) => { /* unexpected error; job may be undefined */ });
monque.on('job:cancelled', ({ job }) => { /* job cancelled */ });
monque.on('job:retried', ({ job, previousStatus }) => { /* job retried */ });
monque.on('job:deleted', ({ jobId }) => { /* job deleted */ });
monque.on('stale:recovered', ({ count }) => { /* stale jobs recovered */ });
Development
Running Tests
# Run tests once (fresh container each time)
bun run test
# Run tests in watch mode with container reuse (faster iteration)
bun run test:dev
# Or enable reuse globally in your shell profile
export TESTCONTAINERS_REUSE_ENABLE=true
bun run test:watch
When TESTCONTAINERS_REUSE_ENABLE=true, the MongoDB testcontainer persists between test runs, significantly speeding up local development. Ryuk (the testcontainers cleanup daemon) remains enabled as a safety net for orphaned containers.
To manually clean up reusable containers:
docker ps -q --filter label=org.testcontainers=true | while read -r id; do docker stop "$id"; done
License
ISC
