cron-safe
v2.0.1
A robust wrapper around node-cron with automatic retries, overlap prevention, execution timeout, history tracking, and structured error handling
A robust wrapper around node-cron with automatic retries, overlap prevention, distributed locking, concurrency control, execution timeout, history tracking, metrics, persistent storage, and structured error handling.
Why cron-safe?
Standard node-cron jobs are vulnerable to:
- ❌ Silent failures — A network glitch fails a task, and it won't retry until the next schedule (potentially hours later)
- ❌ Overlapping executions — Long-running tasks stack up, causing memory leaks or data corruption
- ❌ Zombie tasks — Hanging tasks that never complete block all future executions
- ❌ No visibility — No way to see when tasks last ran or if they're currently running
- ❌ Unhandled rejections — Async errors crash your process or go unnoticed
- ❌ Duplicate runs — In multi-instance deployments, the same job runs on every server
- ❌ No persistence — History and retry state lost after restart
cron-safe wraps your tasks with a protective layer:
- ✅ Automatic retries with configurable delays
- ✅ Exponential/linear backoff — smart retry delays that grow over time
- ✅ Overlap prevention — ensures only one instance runs at a time
- ✅ Distributed locking — ensures only one instance runs across multiple servers (Redis, etc.)
- ✅ Concurrency control — allow N parallel executions with maxConcurrency
- ✅ Execution timeout — kills zombie tasks that run too long
- ✅ Execution history — audit log of past runs with status and duration
- ✅ Persistent storage — save history to database, survive restarts
- ✅ Metrics & observability — track runs, failures, avg duration, export to Prometheus/OpenTelemetry
- ✅ Dynamic schedule update — change cron expression at runtime without restart
- ✅ Next run predictor — know exactly when your job runs next
- ✅ Async trigger — manually trigger tasks and await results
- ✅ Notification hooks — integrate with Slack, email, or any notification system
- ✅ Lifecycle hooks — onStart, onSuccess, onRetry, onError, onTimeout
Installation
npm install cron-safe node-cron
Note: node-cron is a peer dependency. You must install it separately.
Quick Start
import { schedule } from 'cron-safe';
// Simple scheduled task
const task = schedule('*/5 * * * *', async () => {
const data = await fetchDataFromAPI();
await saveToDatabase(data);
return data; // Return value available via trigger()
});
// Stop when needed
task.stop();
Features
Automatic Retries
import { schedule } from 'cron-safe';
const task = schedule('0 * * * *', async () => {
await unreliableApiCall();
}, {
retries: 3, // Retry up to 3 times
retryDelay: 5000, // Wait 5 seconds between retries
onRetry: (error, attempt) => {
console.log(`Attempt ${attempt} failed:`, error.message);
},
onError: (error) => {
// Called after all retries are exhausted
alertOpsTeam('Critical task failed!', error);
},
});
Exponential & Linear Backoff
Smart retry delays that grow over time, preventing thundering herd problems:
import { schedule } from 'cron-safe';
const task = schedule('0 * * * *', async () => {
await unreliableApiCall();
}, {
retries: 5,
retryDelay: 1000, // Base delay: 1 second
backoffStrategy: 'exponential', // 2s, 4s, 8s, 16s, then 30s (32s capped by maxRetryDelay)
maxRetryDelay: 30000, // Cap at 30 seconds
onRetry: (error, attempt) => {
console.log(`Retry ${attempt}, next delay will be longer...`);
},
});
// Available strategies:
// - 'fixed': Same delay every time (default)
// - 'linear': delay * attempt (1s, 2s, 3s, 4s, 5s)
// - 'exponential': delay * 2^attempt (2s, 4s, 8s, 16s, 32s)
Overlap Prevention
import { schedule } from 'cron-safe';
// This task runs every minute but might take 90 seconds
const task = schedule('* * * * *', async () => {
await longRunningDataSync(); // Takes ~90 seconds
}, {
preventOverlap: true, // Skip if previous run still executing
onOverlapSkip: () => {
console.log('Skipped: previous execution still running');
},
});
Concurrency Control
Allow limited parallel executions instead of binary overlap prevention:
import { schedule } from 'cron-safe';
// Allow up to 3 concurrent executions
const task = schedule('* * * * *', async () => {
await processQueue();
}, {
maxConcurrency: 3, // Allow 3 parallel runs
onOverlapSkip: () => {
console.log('Max concurrency reached, skipping');
},
});
When both preventOverlap and maxConcurrency are set, maxConcurrency takes priority. Setting maxConcurrency: 1 is equivalent to preventOverlap: true.
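The relationship between the two options can be pictured as a slot counter. The sketch below is not cron-safe's internal code: ConcurrencyGate, tryEnter, and leave are hypothetical names, used only to illustrate why a limit of 1 behaves like preventOverlap: true.

```typescript
// Illustrative only: a slot counter that models maxConcurrency semantics.
// ConcurrencyGate, tryEnter, and leave are hypothetical, not cron-safe APIs.
class ConcurrencyGate {
  private running = 0;
  private readonly limit: number;

  constructor(limit: number) {
    if (!Number.isInteger(limit) || limit < 1) {
      throw new RangeError('limit must be a positive integer');
    }
    this.limit = limit;
  }

  // Takes a slot and returns true if one is free; false means "skip this run".
  tryEnter(): boolean {
    if (this.running >= this.limit) return false;
    this.running += 1;
    return true;
  }

  // Frees a slot when a run finishes (success, failure, or timeout).
  leave(): void {
    if (this.running > 0) this.running -= 1;
  }
}

// With a limit of 1, a second run is rejected while the first is active.
const gate = new ConcurrencyGate(1);
const first = gate.tryEnter();  // true
const second = gate.tryEnter(); // false: the case onOverlapSkip reports
gate.leave();
const third = gate.tryEnter();  // true again once the slot is free
```

Raising the limit to 3 simply lets three calls to tryEnter succeed before the fourth is skipped.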
Distributed Locking
Ensure only one instance runs across multiple servers (Docker, PM2 cluster, auto-scaling):
import { schedule, LockProvider } from 'cron-safe';
// Implement the LockProvider interface for your lock backend
const redisLockProvider: LockProvider = {
async acquire(key: string, ttl: number): Promise<string | null> {
// SET key uniqueId NX PX ttl
const lockId = `${Date.now()}-${Math.random().toString(36).slice(2)}`;
const result = await redis.set(key, lockId, 'PX', ttl, 'NX');
return result === 'OK' ? lockId : null;
},
async release(key: string, lockId: string): Promise<void> {
// Only release if we still own the lock.
// Note: GET followed by DEL is not atomic; in production, a Lua script (EVAL) closes that race.
const currentId = await redis.get(key);
if (currentId === lockId) {
await redis.del(key);
}
},
async extend(key: string, lockId: string, ttl: number): Promise<boolean> {
const currentId = await redis.get(key);
if (currentId === lockId) {
await redis.pexpire(key, ttl);
return true;
}
return false;
},
};
const task = schedule('0 * * * *', async () => {
await processOrders();
}, {
name: 'order-processor',
distributedLock: {
provider: redisLockProvider,
ttl: 120000, // Lock expires after 2 minutes
autoExtend: true, // Auto-extend lock for long-running tasks
extendInterval: 60000, // Extend every 60 seconds
},
});
The lock is automatically released after task completion (success or failure). If the process crashes, the TTL ensures the lock eventually expires as a safety net.
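For local development or unit tests, a provider that needs no Redis can be convenient. The following is a minimal sketch under stated assumptions: the LockProviderLike interface is declared locally and mirrors the acquire/release/extend shape shown above, while InMemoryLockStore and createInMemoryLockProvider are hypothetical helpers. It only coordinates runs inside a single process, so it gives no protection across servers.

```typescript
// Shape assumed from the LockProvider example in this README; declared locally
// so the sketch is self-contained.
interface LockProviderLike {
  acquire(key: string, ttl: number): Promise<string | null>;
  release(key: string, lockId: string): Promise<void>;
  extend?(key: string, lockId: string, ttl: number): Promise<boolean>;
}

// Hypothetical single-process lock table with TTL expiry.
class InMemoryLockStore {
  private readonly locks = new Map<string, { lockId: string; expiresAt: number }>();
  private readonly now: () => number;
  private counter = 0;

  // The clock is injectable so expiry can be tested without waiting.
  constructor(now: () => number = Date.now) {
    this.now = now;
  }

  tryAcquire(key: string, ttl: number): string | null {
    const held = this.locks.get(key);
    if (held && held.expiresAt > this.now()) return null; // still owned
    this.counter += 1;
    const lockId = `lock-${this.counter}`;
    this.locks.set(key, { lockId, expiresAt: this.now() + ttl });
    return lockId;
  }

  release(key: string, lockId: string): void {
    // Only the current owner may release; stale IDs are ignored.
    if (this.locks.get(key)?.lockId === lockId) this.locks.delete(key);
  }

  extend(key: string, lockId: string, ttl: number): boolean {
    const held = this.locks.get(key);
    if (!held || held.lockId !== lockId || held.expiresAt <= this.now()) return false;
    held.expiresAt = this.now() + ttl;
    return true;
  }
}

// Wraps the synchronous store in the promise-based provider shape.
function createInMemoryLockProvider(store: InMemoryLockStore): LockProviderLike {
  return {
    acquire: async (key, ttl) => store.tryAcquire(key, ttl),
    release: async (key, lockId) => store.release(key, lockId),
    extend: async (key, lockId, ttl) => store.extend(key, lockId, ttl),
  };
}
```

Under these assumptions, the result of createInMemoryLockProvider(new InMemoryLockStore()) could be passed as distributedLock.provider in a test setup, with the Redis-backed provider kept for real multi-instance deployments.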
Persistent Storage
Save execution history to a database so data survives restarts and crashes:
import { schedule, StorageAdapter, StoredRunRecord } from 'cron-safe';
// Implement the StorageAdapter interface for your database
const postgresAdapter: StorageAdapter = {
async saveRun(record: StoredRunRecord) {
await db.query('INSERT INTO cron_runs ...', record);
},
async updateRun(taskName: string, startedAt: string, updates: Partial<StoredRunRecord>) {
await db.query('UPDATE cron_runs SET ... WHERE task_name = $1 AND started_at = $2', ...);
},
async getRuns(taskName: string, limit: number) {
return db.query('SELECT * FROM cron_runs WHERE task_name = $1 ORDER BY started_at DESC LIMIT $2', taskName, limit);
},
async getLastIncompleteRun(taskName: string) {
return db.query("SELECT * FROM cron_runs WHERE task_name = $1 AND status = 'running' ORDER BY started_at DESC LIMIT 1", taskName);
},
};
const task = schedule('0 * * * *', async () => {
await generateReport();
}, {
name: 'hourly-report',
storage: postgresAdapter,
historyLimit: 100,
});
On startup, cron-safe automatically loads the last N history records from storage, preserving your audit trail across restarts.
Metrics & Observability
Track task performance in real-time:
import { schedule } from 'cron-safe';
const task = schedule('* * * * *', async () => {
await processData();
}, {
name: 'data-processor',
});
// Get metrics snapshot
const metrics = task.getMetrics();
console.log(metrics);
// {
// totalRuns: 150,
// totalSuccess: 145,
// totalFailures: 3,
// totalTimeouts: 2,
// totalRetries: 10,
// totalOverlapSkips: 5,
// currentRunning: 1,
// avgDuration: 2340, // ms
// lastRunAt: Date,
// lastStatus: 'success'
// }
Export to Prometheus / OpenTelemetry
import { schedule, MetricsProvider } from 'cron-safe';
const prometheusMetrics: MetricsProvider = {
recordEvent(taskName, event, duration) {
// Forward to your metrics system
cronRunsTotal.labels(taskName, event).inc();
if (duration !== undefined) { // also records a 0 ms duration
cronDurationHistogram.labels(taskName).observe(duration / 1000);
}
},
};
const task = schedule('* * * * *', processData, {
name: 'data-processor',
metricsProvider: prometheusMetrics,
});
Dynamic Schedule Update
Change the cron expression at runtime without losing state:
import { schedule } from 'cron-safe';
const task = schedule('*/5 * * * *', async () => {
await syncData();
}, {
name: 'data-sync',
});
// Later, update schedule via admin dashboard or feature flag
task.updateSchedule('*/30 * * * *'); // Slow down to every 30 minutes
// All history, metrics, and state are preserved
console.log(task.getHistory().length); // Still has previous history
console.log(task.getMetrics()); // Still has accumulated metrics
updateSchedule() throws an error if the new cron expression is invalid. The task must not be stopped when updating.
Execution Timeout (Safety Valve)
Prevent zombie tasks from blocking future executions:
import { schedule, TimeoutError } from 'cron-safe';
const task = schedule('*/5 * * * *', async () => {
await potentiallyHangingOperation();
}, {
executionTimeout: 30000, // 30 second timeout
onTimeout: (error) => {
console.error('Task timed out!', error.message);
// error instanceof TimeoutError === true
},
});
Execution History (Audit Log)
Track past executions with status, duration, and errors:
import { schedule } from 'cron-safe';
const task = schedule('0 * * * *', async () => {
return await generateReport();
}, {
historyLimit: 20, // Keep last 20 executions (default: 10)
});
// Check execution history
const history = task.getHistory();
console.log(history);
// [
// {
// startedAt: Date,
// endedAt: Date,
// duration: 1234, // ms
// status: 'success' | 'failed' | 'timeout',
// error?: Error,
// triggeredBy: 'schedule' | 'manual'
// },
// ...
// ]
// Find failed executions
const failures = history.filter(h => h.status === 'failed');
Next Run Predictor
Know exactly when your job runs next:
import { schedule } from 'cron-safe';
const task = schedule('0 9 * * *', async () => {
await sendDailyDigest();
});
const nextRun = task.nextRun();
console.log(`Next run: ${nextRun}`); // Date object or null if stopped
// Show in UI (nextRun is null when the task is stopped, so guard before using it)
if (nextRun) {
const timeUntilNext = nextRun.getTime() - Date.now();
console.log(`Next run in ${Math.round(timeUntilNext / 60000)} minutes`);
}
Async Trigger with Results
Manually trigger tasks and get results (great for testing):
import { schedule } from 'cron-safe';
const task = schedule('0 0 * * *', async () => {
const report = await generateDailyReport();
return report; // Return the result
});
// Manual trigger returns the result
const result = await task.trigger();
console.log('Report:', result);
// Respects overlap prevention
// If preventOverlap is true and task is running, returns undefined
Notification Hooks
Integrate with Slack, email, or any notification system. The notifier callback receives structured payloads when events occur:
import { schedule, NotificationPayload } from 'cron-safe';
const task = schedule('0 * * * *', async () => {
await processData();
}, {
name: 'hourly-processor',
notifier: (payload: NotificationPayload) => {
console.log(`[${payload.taskName}] ${payload.event} at ${payload.timestamp}`);
// payload.result - for success events
// payload.error - for error/timeout events
// payload.duration - execution time in ms
// payload.attemptsMade - number of attempts
},
notifyOn: {
success: true, // default: true
error: true, // default: true
timeout: true, // default: true
overlapSkip: false, // default: false
lockFailed: false, // default: false
},
});
Slack Webhook Adapter
import { schedule, NotificationPayload } from 'cron-safe';
const slackNotifier = async (payload: NotificationPayload) => {
const color = payload.event === 'success' ? 'good' : 'danger';
await fetch(process.env.SLACK_WEBHOOK_URL!, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
attachments: [{
color,
title: `Cron Job: ${payload.taskName}`,
text: `Event: ${payload.event.toUpperCase()}`,
fields: [
{ title: 'Duration', value: `${payload.duration}ms`, short: true },
{ title: 'Attempts', value: String(payload.attemptsMade), short: true },
],
ts: Math.floor(payload.timestamp.getTime() / 1000),
}],
}),
});
};
const task = schedule('0 6 * * *', dailyBackup, {
name: 'daily-backup',
notifier: slackNotifier,
notifyOn: { success: false, error: true }, // Only notify on failures
});
Email Adapter (using nodemailer)
import { schedule, NotificationPayload } from 'cron-safe';
import nodemailer from 'nodemailer';
const transporter = nodemailer.createTransport({
host: 'smtp.example.com',
auth: { user: 'user', pass: 'pass' },
});
const emailNotifier = async (payload: NotificationPayload) => {
if (payload.event !== 'error' && payload.event !== 'timeout') return;
await transporter.sendMail({
from: 'cron@example.com',
to: 'ops@example.com',
subject: `[ALERT] ${payload.taskName} ${payload.event}`,
text: `
Task: ${payload.taskName}
Event: ${payload.event}
Time: ${payload.timestamp}
Duration: ${payload.duration}ms
Error: ${payload.error?.message ?? 'N/A'}
Attempts: ${payload.attemptsMade}
`,
});
};
const task = schedule('0 0 * * *', nightlyCleanup, {
name: 'nightly-cleanup',
notifier: emailNotifier,
});
Full Lifecycle Hooks
import { schedule } from 'cron-safe';
const task = schedule('0 9 * * *', async () => {
return await generateDailyReport();
}, {
name: 'daily-report',
retries: 2,
retryDelay: 10000,
preventOverlap: true,
executionTimeout: 60000,
historyLimit: 50,
onStart: () => {
console.log('[daily-report] Starting execution');
},
onSuccess: (result) => {
console.log('[daily-report] Completed:', result);
},
onRetry: (error, attempt) => {
console.warn(`[daily-report] Retry ${attempt}:`, error.message);
},
onError: (error) => {
console.error('[daily-report] Failed permanently:', error);
sendSlackAlert('Daily report generation failed!');
},
onTimeout: (error) => {
console.error('[daily-report] Timed out:', error.message);
},
onOverlapSkip: () => {
console.warn('[daily-report] Skipped due to overlap');
},
});
API
schedule(cronExpression, task, options?)
Schedules a task with automatic retries, timeout, and overlap prevention.
Parameters:
| Parameter | Type | Description |
|-----------|------|-------------|
| cronExpression | string | A valid cron expression (e.g., '* * * * *') |
| task | () => T \| Promise<T> | The function to execute |
| options | CronSafeOptions<T> | Configuration options (see below) |
Returns: CronSafeTask<T>
CronSafeOptions<T>
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| name | string | undefined | Identifier for logging/debugging |
| retries | number | 0 | Number of retry attempts after failure |
| retryDelay | number | 0 | Base delay in ms between retries |
| backoffStrategy | 'fixed' \| 'linear' \| 'exponential' | 'fixed' | How delay grows between retries |
| maxRetryDelay | number | undefined | Maximum delay cap for backoff |
| preventOverlap | boolean | false | Skip execution if previous run is active |
| maxConcurrency | number | undefined | Max concurrent executions allowed |
| executionTimeout | number | undefined | Max execution time in ms before timeout |
| historyLimit | number | 10 | Max number of history entries to keep |
| distributedLock | DistributedLockConfig | — | Distributed locking configuration |
| storage | StorageAdapter | — | Persistent storage adapter |
| metricsProvider | MetricsProvider | — | External metrics export provider |
| onStart | () => void | — | Called when task starts |
| onSuccess | (result: T) => void | — | Called with result on success |
| onRetry | (error, attempt) => void | — | Called before each retry |
| onError | (error) => void | — | Called when all retries exhausted |
| onTimeout | (error: Error) => void | — | Called when task times out |
| onOverlapSkip | () => void | — | Called when execution is skipped |
| notifier | Notifier<T> | — | Callback for Slack/email/custom notifications |
| notifyOn | NotifyOn | { success: true, error: true, timeout: true, overlapSkip: false, lockFailed: false } | Which events trigger notifications |
| timezone | string | — | Timezone for cron schedule |
| scheduled | boolean | true | Start immediately or wait for .start() |
| runOnInit | boolean | false | Run task immediately on creation |
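How retryDelay, backoffStrategy, and maxRetryDelay combine can be worked through by hand. The sketch below follows the formulas given in the backoff comments earlier in this README (delay * attempt and delay * 2^attempt). It assumes attempt is 1-based and that the cap is applied as a simple minimum; computeRetryDelay is a hypothetical helper, not a cron-safe export.

```typescript
type BackoffStrategy = 'fixed' | 'linear' | 'exponential';

// Hypothetical helper. attempt is assumed to be 1-based:
// 1 for the first retry, 2 for the second, and so on.
function computeRetryDelay(
  strategy: BackoffStrategy,
  baseDelayMs: number,
  attempt: number,
  maxDelayMs?: number,
): number {
  let delay: number;
  switch (strategy) {
    case 'linear':
      delay = baseDelayMs * attempt;
      break;
    case 'exponential':
      delay = baseDelayMs * 2 ** attempt;
      break;
    default:
      delay = baseDelayMs; // 'fixed'
  }
  // Apply the cap only when one is configured.
  return maxDelayMs === undefined ? delay : Math.min(delay, maxDelayMs);
}

const attempts = [1, 2, 3, 4, 5];
const fixedDelays = attempts.map((n) => computeRetryDelay('fixed', 1000, n));
// [1000, 1000, 1000, 1000, 1000]
const linearDelays = attempts.map((n) => computeRetryDelay('linear', 1000, n));
// [1000, 2000, 3000, 4000, 5000]
const cappedExponential = attempts.map((n) => computeRetryDelay('exponential', 1000, n, 30000));
// [2000, 4000, 8000, 16000, 30000]
```

With a 1 second base and a 30 second cap, the fifth exponential delay would be 32 seconds uncapped, so the cap is what the task actually waits.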
DistributedLockConfig
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| provider | LockProvider | — | Lock provider implementation (required) |
| ttl | number | 60000 | Lock time-to-live in ms |
| autoExtend | boolean | false | Auto-extend lock while task is running |
| extendInterval | number | ttl / 2 | Interval in ms for auto-extending |
LockProvider
Interface to implement for your distributed lock backend:
| Method | Signature | Description |
|--------|-----------|-------------|
| acquire | (key: string, ttl: number) => Promise<string \| null> | Acquire lock, return lock ID or null |
| release | (key: string, lockId: string) => Promise<void> | Release the lock |
| extend? | (key: string, lockId: string, ttl: number) => Promise<boolean> | Extend lock TTL (optional) |
StorageAdapter
Interface to implement for persistent storage:
| Method | Signature | Description |
|--------|-----------|-------------|
| saveRun | (record: StoredRunRecord) => Promise<void> | Save a new run record |
| updateRun | (taskName, startedAt, updates) => Promise<void> | Update an existing run |
| getRuns | (taskName, limit) => Promise<StoredRunRecord[]> | Get recent runs |
| getLastIncompleteRun? | (taskName) => Promise<StoredRunRecord \| null> | Get crashed run (optional) |
MetricsProvider
Interface for metrics export:
| Method | Signature | Description |
|--------|-----------|-------------|
| recordEvent | (taskName, event, duration?) => void | Record a task event |
Events: 'start', 'success', 'failure', 'timeout', 'retry', 'overlapSkip'
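Because recordEvent is a single synchronous method, a provider can be very small. As an illustration, the sketch below counts events in memory, which may be useful in tests or while debugging. It assumes the signature and event names listed above; MetricsProviderLike and CountingMetricsProvider are hypothetical names declared locally, not cron-safe exports.

```typescript
// Event names copied from the list above; the interface is declared locally
// so the sketch does not depend on cron-safe's own type definitions.
type MetricsEvent = 'start' | 'success' | 'failure' | 'timeout' | 'retry' | 'overlapSkip';

interface MetricsProviderLike {
  recordEvent(taskName: string, event: MetricsEvent, duration?: number): void;
}

// Hypothetical in-memory provider: counts events and keeps durations per task.
class CountingMetricsProvider implements MetricsProviderLike {
  private readonly counts = new Map<string, number>();
  private readonly durations = new Map<string, number[]>();

  recordEvent(taskName: string, event: MetricsEvent, duration?: number): void {
    const key = `${taskName}:${event}`;
    this.counts.set(key, (this.counts.get(key) ?? 0) + 1);
    if (duration !== undefined) {
      const list = this.durations.get(taskName) ?? [];
      list.push(duration);
      this.durations.set(taskName, list);
    }
  }

  // Number of times the given event was recorded for the task.
  count(taskName: string, event: MetricsEvent): number {
    return this.counts.get(`${taskName}:${event}`) ?? 0;
  }

  // Mean of all recorded durations in ms, or undefined if none were recorded.
  averageDuration(taskName: string): number | undefined {
    const list = this.durations.get(taskName);
    if (!list || list.length === 0) return undefined;
    return list.reduce((sum, d) => sum + d, 0) / list.length;
  }
}
```

An instance would be passed as the metricsProvider option, and count or averageDuration could then be asserted on after triggering the task.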
CronSafeTask<T>
The object returned by schedule():
| Method | Returns | Description |
|--------|---------|-------------|
| start() | void | Start the scheduled task |
| stop() | void | Stop the scheduled task |
| getStatus() | 'scheduled' \| 'running' \| 'stopped' | Current status |
| trigger() | Promise<T \| undefined> | Execute immediately, returns result |
| getHistory() | RunHistory[] | Get execution history (newest first) |
| nextRun() | Date \| null | Next scheduled run time |
| getMetrics() | TaskMetrics | Get real-time metrics snapshot |
| updateSchedule(expr) | void | Change cron expression at runtime |
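Since nextRun() is documented as returning Date | null, callers need to handle the stopped case before doing date arithmetic. One possible way to render the value for a dashboard is sketched below; describeNextRun is a hypothetical helper and its wording is only an example.

```typescript
// Hypothetical helper: turns the value returned by nextRun() into display text.
// `now` is injectable so the function stays deterministic in tests.
function describeNextRun(nextRun: Date | null, now: Date = new Date()): string {
  if (nextRun === null) return 'not scheduled';
  const diffMs = nextRun.getTime() - now.getTime();
  if (diffMs <= 0) return 'due now';
  const minutes = Math.round(diffMs / 60000);
  if (minutes < 1) return 'in less than a minute';
  return minutes === 1 ? 'in 1 minute' : `in ${minutes} minutes`;
}

const referenceTime = new Date(0);
const stoppedText = describeNextRun(null, referenceTime);
// 'not scheduled'
const laterText = describeNextRun(new Date(90 * 60000), referenceTime);
// 'in 90 minutes'
```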
TaskMetrics
| Property | Type | Description |
|----------|------|-------------|
| totalRuns | number | Total number of runs started |
| totalSuccess | number | Number of successful completions |
| totalFailures | number | Number of failed runs |
| totalTimeouts | number | Number of timed out runs |
| totalRetries | number | Total retry attempts across all runs |
| totalOverlapSkips | number | Number of runs skipped due to overlap |
| currentRunning | number | Number of currently executing runs |
| avgDuration | number | Average execution duration in ms |
| lastRunAt | Date \| undefined | When the last run completed |
| lastStatus | string \| undefined | Status of the last completed run |
RunHistory
| Property | Type | Description |
|----------|------|-------------|
| startedAt | Date | When execution started |
| endedAt | Date \| undefined | When execution ended |
| duration | number \| undefined | Duration in milliseconds |
| status | 'running' \| 'success' \| 'failed' \| 'timeout' | Execution status |
| error | Error \| undefined | Error if failed/timeout |
| triggeredBy | 'schedule' \| 'manual' | How the run was triggered |
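The fields in this table are enough to build a small health summary from getHistory(). The sketch below declares a local RunHistoryLike type that mirrors the table and relies on the documented newest-first ordering; summarizeHistory and HistorySummary are hypothetical names, not part of cron-safe.

```typescript
// Mirrors the RunHistory table above; declared locally to keep the sketch self-contained.
interface RunHistoryLike {
  startedAt: Date;
  endedAt?: Date;
  duration?: number;
  status: 'running' | 'success' | 'failed' | 'timeout';
  error?: Error;
  triggeredBy: 'schedule' | 'manual';
}

interface HistorySummary {
  counts: Record<RunHistoryLike['status'], number>;
  avgDurationMs: number | undefined;
  lastFailure: RunHistoryLike | undefined;
}

// Hypothetical helper: counts runs per status, averages known durations,
// and picks the most recent failed or timed-out run.
function summarizeHistory(history: RunHistoryLike[]): HistorySummary {
  const counts = { running: 0, success: 0, failed: 0, timeout: 0 };
  let totalDuration = 0;
  let finished = 0;
  let lastFailure: RunHistoryLike | undefined;

  for (const run of history) {
    counts[run.status] += 1;
    if (run.duration !== undefined) {
      totalDuration += run.duration;
      finished += 1;
    }
    // History is newest first, so the first match is the most recent failure.
    if (!lastFailure && (run.status === 'failed' || run.status === 'timeout')) {
      lastFailure = run;
    }
  }

  return {
    counts,
    avgDurationMs: finished > 0 ? totalDuration / finished : undefined,
    lastFailure,
  };
}
```

Calling summarizeHistory(task.getHistory()) would then give one object suitable for a status endpoint or log line.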
validate(expression)
Validates a cron expression. Re-exported from node-cron.
import { validate } from 'cron-safe';
console.log(validate('* * * * *')); // true
console.log(validate('invalid')); // false
TimeoutError
Error class thrown when a task exceeds its execution timeout.
import { TimeoutError } from 'cron-safe';
// In your onError handler
onError: (error) => {
if (error instanceof TimeoutError) {
console.log('Task timed out');
}
}
Migration from node-cron
Before:
import cron from 'node-cron';
cron.schedule('* * * * *', async () => {
await myTask(); // Errors go unhandled!
});
After:
import { schedule } from 'cron-safe';
schedule('* * * * *', async () => {
await myTask();
}, {
retries: 3,
executionTimeout: 30000,
onError: (err) => console.error('Task failed:', err),
});
TypeScript
Full TypeScript support with strict types:
import { schedule, CronSafeOptions, CronSafeTask, RunHistory, TaskMetrics } from 'cron-safe';
interface ReportResult {
rowsProcessed: number;
duration: number;
}
const options: CronSafeOptions<ReportResult> = {
retries: 2,
executionTimeout: 60000,
historyLimit: 100,
onSuccess: (result) => {
// result is typed as ReportResult
console.log(`Processed ${result.rowsProcessed} rows`);
},
};
const task: CronSafeTask<ReportResult> = schedule('0 * * * *', async (): Promise<ReportResult> => {
return { rowsProcessed: 1000, duration: 5000 };
}, options);
// Trigger returns typed result
const result = await task.trigger();
if (result) {
console.log(result.rowsProcessed); // TypeScript knows this is a number
}
// History is also typed
const history: RunHistory[] = task.getHistory();
// Metrics snapshot
const metrics: TaskMetrics = task.getMetrics();
console.log(`Success rate: ${(metrics.totalSuccess / metrics.totalRuns * 100).toFixed(1)}%`);
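One caveat with the success-rate line: before the first run, totalRuns is 0 and the division produces NaN. A guarded variant might look like the sketch below; formatSuccessRate and the MetricsLike type are hypothetical and use only the two TaskMetrics fields involved.

```typescript
// Only the two TaskMetrics fields needed here; declared locally for self-containment.
interface MetricsLike {
  totalRuns: number;
  totalSuccess: number;
}

// Hypothetical helper: formats the success rate, avoiding NaN when nothing has run yet.
function formatSuccessRate(metrics: MetricsLike): string {
  if (metrics.totalRuns === 0) return 'n/a';
  return `${((metrics.totalSuccess / metrics.totalRuns) * 100).toFixed(1)}%`;
}

const beforeFirstRun = formatSuccessRate({ totalRuns: 0, totalSuccess: 0 });
// 'n/a'
const afterSomeRuns = formatSuccessRate({ totalRuns: 150, totalSuccess: 145 });
// '96.7%'
```

Because totalRuns counts runs that have started, a run still in progress lowers the figure slightly until it completes.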