pg-cron-lock
v1.0.0
Distributed cron jobs using PostgreSQL Advisory Locks - prevent multiple instances from running the same scheduled task
🔐 pg-cron-lock
Distributed cron jobs for Node.js using PostgreSQL Advisory Locks
Prevent duplicate task execution across multiple application instances using PostgreSQL's native locking mechanism.
🎯 The Problem
In distributed environments, scheduled tasks run on every instance simultaneously:
- Instance 1: Sends daily email at 9:00 AM
- Instance 2: Sends daily email at 9:00 AM ❌ DUPLICATE
- Instance 3: Sends daily email at 9:00 AM ❌ DUPLICATE
Result: Users receive 3 identical emails, the same data is processed multiple times, and resources are wasted.
✨ The Solution
pg-cron-lock ensures only ONE instance executes each scheduled task:
+ Instance 1: ✅ Acquires lock → Sends email
+ Instance 2: ❌ Lock taken → Skips gracefully
+ Instance 3: ❌ Lock taken → Skips gracefully
Result: Perfect coordination, zero duplicates, optimal resource usage.
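To make the mechanism concrete, here is a minimal sketch of the try-lock pattern that PostgreSQL advisory locks enable. It is not pg-cron-lock's actual implementation: `runIfLockAcquired`, `createFakeDb`, and the injected `runQuery(sql, params)` function are hypothetical names used for illustration, and the fake database only mimics `pg_try_advisory_lock` and `pg_advisory_unlock` in memory so the example runs without a server. Against a real database, both statements would need to run on the same connection, because session-level advisory locks belong to the session that took them.

```javascript
// Hypothetical in-memory stand-in for PostgreSQL's advisory lock functions.
function createFakeDb() {
  const heldKeys = new Set();
  return async function runQuery(sql, params) {
    const key = params[0];
    if (sql.includes('pg_try_advisory_lock')) {
      // Non-blocking: report failure immediately if the key is already held.
      if (heldKeys.has(key)) return [{ locked: false }];
      heldKeys.add(key);
      return [{ locked: true }];
    }
    if (sql.includes('pg_advisory_unlock')) {
      return [{ unlocked: heldKeys.delete(key) }];
    }
    throw new Error(`Unsupported query in fake: ${sql}`);
  };
}

// Run `task` only if the advisory lock for `lockKey` could be taken.
async function runIfLockAcquired(runQuery, lockKey, task) {
  const rows = await runQuery('SELECT pg_try_advisory_lock($1) AS locked', [lockKey]);
  if (!rows[0].locked) {
    return { executed: false }; // another instance holds the lock: skip
  }
  try {
    return { executed: true, result: await task() };
  } finally {
    // Always release, even if the task throws.
    await runQuery('SELECT pg_advisory_unlock($1) AS unlocked', [lockKey]);
  }
}

// Two "instances" race for the same key; only one should run the task.
async function demo() {
  const runQuery = createFakeDb();
  const sendEmail = async () => {
    await new Promise((resolve) => setTimeout(resolve, 10));
    return 'email sent';
  };
  return Promise.all([
    runIfLockAcquired(runQuery, 100001, sendEmail),
    runIfLockAcquired(runQuery, 100001, sendEmail),
  ]);
}

demo().then((outcomes) => console.log(outcomes));
```

The key design point is that the lock attempt never blocks: an instance that loses the race returns immediately instead of queueing behind the winner, which is what makes skipping cheap.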
🚀 Quick Start
Installation
npm install pg-cron-lock
Basic Usage
const { PgCronLock } = require('pg-cron-lock');
const { Sequelize } = require('sequelize');
// Initialize with your existing Sequelize connection
const sequelize = new Sequelize('postgresql://user:pass@localhost:5432/db');
const cronLock = new PgCronLock({
database: { sequelize },
});
// Schedule a distributed task
cronLock
.schedule({
name: 'daily-data-processing',
schedule: '0 9 * * *', // Every day at 9 AM
handler: async (metadata) => {
console.log(`Processing on: ${metadata.nodeId}`);
// Your business logic here
await processData();
return { processed: 42 };
},
options: {
lockKey: 100001, // Unique identifier for this task
},
})
.start();
console.log('✅ Distributed cron job started!');
🏗️ Key Features
| Feature | Description | Benefit |
| --------------------------- | ----------------------------------- | --------------------------- |
| 🔒 Distributed Locking | PostgreSQL Advisory Locks | No external dependencies |
| ⏰ Standard Cron Syntax | Full node-cron compatibility | Easy migration |
| 🔄 Smart Retry Logic | Configurable backoff strategies | Handles transient failures |
| 📊 Built-in Monitoring | Execution stats & health checks | Production observability |
| 🛡️ TypeScript First | Complete type definitions | Better developer experience |
| ⚡ Zero Config | Works with existing Sequelize setup | Drop-in replacement |
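The "Smart Retry Logic" row mentions configurable backoff strategies without listing them. As a rough illustration of what such strategies typically compute, the sketch below derives the wait time before each retry. The helper `backoffDelay` and the strategy names `fixed`, `linear`, and `exponential` are assumptions made for this example, not documented pg-cron-lock options.

```javascript
// Hypothetical helper: delay (ms) to wait before retry number `attempt` (1-based).
function backoffDelay(strategy, baseDelayMs, attempt, maxDelayMs = Infinity) {
  if (!Number.isInteger(attempt) || attempt < 1) {
    throw new RangeError('attempt must be a positive integer');
  }
  let delay;
  switch (strategy) {
    case 'fixed':
      delay = baseDelayMs; // same wait every time
      break;
    case 'linear':
      delay = baseDelayMs * attempt; // grows by one base delay per attempt
      break;
    case 'exponential':
      delay = baseDelayMs * 2 ** (attempt - 1); // doubles on every attempt
      break;
    default:
      throw new Error(`Unknown backoff strategy: ${strategy}`);
  }
  // Cap the delay so late retries do not wait unreasonably long.
  return Math.min(delay, maxDelayMs);
}

const retrySchedule = [1, 2, 3, 4].map((attempt) =>
  backoffDelay('exponential', 5000, attempt, 30000)
);
console.log(retrySchedule);
```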
📖 Core Concepts
Task Configuration
cronLock.schedule({
name: 'unique-task-name', // Required: Task identifier
schedule: '0 */6 * * *', // Required: Standard cron expression
handler: async (metadata) => {
// Required: Your task logic
// metadata.nodeId - Current instance ID
// metadata.lockKey - Lock identifier
// metadata.taskName - Task name
return result; // Optional return value
},
options: {
lockKey: 200001, // Required: Unique lock identifier
timeout: 60000, // Optional: Max execution time (ms)
maxRetries: 3, // Optional: Retry attempts on failure
retryDelay: 5000, // Optional: Delay between retries (ms)
},
timezone: 'Europe/Paris', // Optional: Timezone for schedule
});
Manual Task Execution
// Execute with automatic lock management
const result = await cronLock.withLock(
'manual-task',
300001, // Lock key
async (metadata) => {
await performOperation();
return { success: true };
}
);
if (result) {
console.log('Task executed:', result);
} else {
console.log('Task skipped (lock not available)');
}
Task Management
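Because each task is described as needing a unique lockKey, two tasks that share a key would presumably compete for the same lock and skip each other. A small pre-flight check before starting tasks can catch accidental reuse. The sketch below is a hypothetical helper (`findDuplicateLockKeys` is not part of pg-cron-lock) that works on plain task definitions shaped like the objects passed to schedule().

```javascript
// Hypothetical pre-flight check; `tasks` mirrors the shape passed to schedule().
function findDuplicateLockKeys(tasks) {
  const namesByKey = new Map();
  for (const task of tasks) {
    const key = task.options.lockKey;
    const names = namesByKey.get(key) || [];
    names.push(task.name);
    namesByKey.set(key, names);
  }
  // Keep only the keys that more than one task claims.
  return [...namesByKey.entries()]
    .filter(([, names]) => names.length > 1)
    .map(([lockKey, names]) => ({ lockKey, names }));
}

const taskDefinitions = [
  { name: 'daily-data-processing', options: { lockKey: 100001 } },
  { name: 'cleanup', options: { lockKey: 200001 } },
  { name: 'report', options: { lockKey: 100001 } }, // reuses 100001 by mistake
];
console.log(findDuplicateLockKeys(taskDefinitions));
```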
// Control task lifecycle
cronLock.startAll(); // Start all scheduled tasks
cronLock.stopAll(); // Stop all scheduled tasks
cronLock.startTask('task-name'); // Start specific task
cronLock.stopTask('task-name'); // Stop specific task
await cronLock.triggerTask('task-name'); // Execute immediately
// Task information
const taskNames = cronLock.getTaskNames();
const task = cronLock.getTask('task-name');
📊 Monitoring & Health
Task Statistics
const stats = cronLock.getStats('task-name');
console.log(stats);
Output:
{
"taskName": "daily-processing",
"totalExecutions": 127,
"successfulExecutions": 125,
"failedExecutions": 2,
"skippedExecutions": 1045,
"averageDuration": 2340,
"lastExecution": "2024-01-15T09:00:00.000Z",
"lastSuccess": "2024-01-15T09:00:00.000Z",
"lastFailure": "2024-01-14T09:00:00.000Z",
"lastError": "Connection timeout"
}
System Health
const health = await cronLock.getHealthStatus();
console.log(health);
Output:
{
"databaseConnected": true,
"activeLocks": 3,
"scheduledTasks": 8,
"runningTasks": 1
}
⚙️ Advanced Configuration
Production Setup
const cronLock = new PgCronLock({
database: {
sequelize: sequelizeInstance,
},
// Custom logging
logger: {
info: (msg) => logger.info(msg),
error: (msg) => logger.error(msg),
warn: (msg) => logger.warn(msg),
},
// Global defaults
defaults: {
retryOnFailure: true,
maxRetries: 5,
retryDelay: 2000,
timeout: 300000, // 5 minutes
logSuccess: false, // Reduce log noise
logFailure: true,
},
// Prevent conflicts between applications
lockKeyPrefix: 'myapp',
// Debug mode
debug: process.env.NODE_ENV === 'development',
});
Error Handling
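The example in this section re-throws errors so that the task can be retried. As a rough mental model of retry handling governed by maxRetries and retryDelay, consider the following sketch. It is not taken from pg-cron-lock: `runWithRetries` and `sleep` are hypothetical helpers, and treating maxRetries as the number of additional attempts after the first one is an assumption.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical retry loop: one initial attempt plus up to `maxRetries` retries.
async function runWithRetries(handler, { maxRetries = 3, retryDelay = 1000 } = {}) {
  let lastError;
  for (let attempt = 0; attempt <= maxRetries; attempt += 1) {
    try {
      return await handler(attempt);
    } catch (error) {
      lastError = error;
      // Only wait if another attempt is still allowed.
      if (attempt < maxRetries) {
        await sleep(retryDelay);
      }
    }
  }
  // Every attempt failed: surface the most recent error to the caller.
  throw lastError;
}

// Example: a handler that fails twice, then succeeds.
let calls = 0;
async function flakyOperation() {
  calls += 1;
  if (calls < 3) throw new Error('Connection timeout');
  return { processed: 42 };
}

runWithRetries(flakyOperation, { maxRetries: 3, retryDelay: 10 })
  .then((result) => console.log(result, `after ${calls} calls`));
```

A handler that swallows its own errors would never reach the catch branch here, which is why re-throwing matters when retries are wanted.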
cronLock.schedule({
name: 'resilient-task',
schedule: '*/15 * * * *',
handler: async (metadata) => {
try {
await criticalOperation();
} catch (error) {
// Log and re-throw to trigger retry
console.error(`Task failed: ${error.message}`);
throw error;
}
},
options: {
lockKey: 400001,
maxRetries: 3,
retryDelay: 10000, // 10 seconds
// Callbacks for monitoring
onSuccess: (result, metadata) => {
metrics.increment('task.success');
},
onFailure: (error, metadata) => {
metrics.increment('task.failure');
alerting.send(`Task failed: ${error.message}`);
},
onSkipped: (metadata) => {
metrics.increment('task.skipped');
},
},
});
Graceful Shutdown
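The destroy() options used in this section combine a timeout with waiting for running tasks. Conceptually, a bounded wait of that kind can be modelled with Promise.race. The sketch below is only an illustration of the idea: `waitWithTimeout` is a hypothetical helper, and how pg-cron-lock implements its shutdown internally is not described in this README.

```javascript
// Hypothetical helper: wait for in-flight work, but give up after `timeoutMs`.
function waitWithTimeout(runningPromises, timeoutMs) {
  let timer;
  const timedOut = new Promise((resolve) => {
    timer = setTimeout(() => resolve('timeout'), timeoutMs);
  });
  // allSettled never rejects, so a failing task cannot abort the shutdown wait.
  const allSettled = Promise.allSettled(runningPromises).then(() => 'completed');
  return Promise.race([allSettled, timedOut]).finally(() => clearTimeout(timer));
}

// Example: a task that finishes well within the allowed time.
const quickTask = new Promise((resolve) => setTimeout(resolve, 20));
waitWithTimeout([quickTask], 1000).then((outcome) => console.log(outcome));
```

Resolving with a status string instead of rejecting on timeout leaves the caller free to decide whether to force termination or simply exit.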
// Handle shutdown signals
process.on('SIGTERM', async () => {
console.log('Received SIGTERM, shutting down gracefully...');
await cronLock.destroy({
timeout: 30000, // Wait up to 30s
waitForRunning: true, // Wait for running tasks
force: false, // Don't kill running tasks
});
process.exit(0);
});
process.on('SIGINT', async () => {
console.log('Received SIGINT, shutting down gracefully...');
await cronLock.destroy({ timeout: 10000, force: true });
process.exit(0);
});
🔧 Integration Examples
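One lightweight integration is to reduce the per-task statistics (the object shape shown under Task Statistics) to a success rate that dashboards or alerting can consume. The sketch below is a hypothetical helper: `summarizeStats` and the 0.95 default threshold are assumptions for illustration, and only the field names come from the sample output in this README.

```javascript
// Hypothetical helper; field names follow the sample getStats() output.
function summarizeStats(stats, minSuccessRate = 0.95) {
  // Skipped runs are ignored: they mean another instance held the lock.
  const finished = stats.successfulExecutions + stats.failedExecutions;
  const successRate = finished === 0 ? null : stats.successfulExecutions / finished;
  return {
    taskName: stats.taskName,
    successRate,
    // A task with no finished runs yet is not treated as unhealthy.
    healthy: successRate === null || successRate >= minSuccessRate,
  };
}

const sampleStats = {
  taskName: 'daily-processing',
  totalExecutions: 127,
  successfulExecutions: 125,
  failedExecutions: 2,
  skippedExecutions: 1045,
};
console.log(summarizeStats(sampleStats));
```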
Express.js API
const express = require('express');
const app = express();
// Health check endpoint
app.get('/health', async (req, res) => {
try {
const health = await cronLock.getHealthStatus();
const allStats = cronLock.getAllStats();
res.json({
status: health.databaseConnected ? 'healthy' : 'unhealthy',
cron: {
scheduledTasks: health.scheduledTasks,
runningTasks: health.runningTasks,
activeLocks: health.activeLocks,
},
tasks: allStats,
});
} catch (error) {
res.status(500).json({ error: error.message });
}
});
// Trigger task endpoint
app.post('/tasks/:name/trigger', async (req, res) => {
try {
const result = await cronLock.triggerTask(req.params.name);
res.json({ success: true, result });
} catch (error) {
res.status(500).json({ error: error.message });
}
});
📋 Requirements
- Node.js 14+ (ESM and CommonJS supported)
- PostgreSQL 11+ (uses Advisory Locks)
- Sequelize 6+ (peer dependency)
🤝 Contributing
Contributions are welcome!
- Fork the repository
- Create your feature branch: git checkout -b feature/my-feature
- Commit changes: git commit -m 'Add my feature'
- Push to branch: git push origin feature/my-feature
- Submit a Pull Request
📄 License
MIT License - see LICENSE for details.
🙏 Credits
- Built on node-cron
- Uses PostgreSQL Advisory Locks
- Integrates with Sequelize ORM
Ready to eliminate duplicate tasks in your distributed Node.js applications? Start with npm install pg-cron-lock 🚀
