nest-scheduler-engine
v3.1.0
nest-scheduler-engine
A production-grade, event-type-driven scheduling engine for NestJS applications. This package provides a flexible and scalable solution for scheduling and executing recurring and one-time events with built-in fault tolerance, retry logic, dead-letter handling, and RRULE support (RFC 5545).
Version 3.1.0 - Adds declarative Cron decorator and production features (priority/concurrency). See CHANGELOG.md for details and migration guide.
Features
Core Features
- Zero Infrastructure Ownership - Package never creates DB connections, queues, or caches. All are injected via adapters.
- Adapter Pattern - Works with any PostgreSQL client (pg, TypeORM, Knex, Prisma) and queue system (SQS, BullMQ, RabbitMQ).
- Ships Database Migrations - Includes migration files that consumers run against their own database.
- Data-Driven Scheduling - All scheduling decisions from DB, no hardcoded schedules.
- RRULE Support - RFC 5545 compliant recurrence rules for complex recurring schedules.
- Framework-Agnostic Core - Core logic is framework-agnostic with a NestJS wrapper module.
- Horizontal Scalability - Multiple instances can run concurrently with database-level locking.
- Persistent Storage - All scheduler state (event types, event instances, execution history, dead letters, and cron metadata) is persisted in the consumer database via the IDatabaseAdapter you provide. This ensures scheduled jobs survive process restarts and multiple instances coordinate safely using database locks and idempotency keys.
Production Features (v3.0.0+)
- Fault Tolerance - Circuit breaker pattern prevents cascading failures and protects downstream systems.
- Advanced Retry - Exponential backoff with jitter and capped delays for optimal retry behavior.
- Health Checks - Component-level health monitoring for database, queue, poller, and dispatcher.
- Graceful Shutdown - Ensures in-flight events complete before process termination.
- Comprehensive Error Handling - Custom error hierarchy for better error tracking and debugging.
- Contextual Logging - Correlation IDs and performance tracking throughout event lifecycle.
- Timeout Protection - Configurable timeouts prevent handlers from blocking indefinitely.
- Dead-Letter Queue - Failed events are preserved for analysis and manual retry.
- Observability Hooks - Lifecycle hooks for monitoring, alerting, and audit logging.
- Edge Case Handling - Comprehensive validation for dates, RRULE, payloads, state transitions, and more. See EDGE_CASES.md.
- Persistent DB-backed State - Scheduler persists event types, instances, execution history, and dead letters via the provided IDatabaseAdapter. Persistent state enables recovery after restarts, safe horizontal scaling, and idempotent scheduling operations (use the shipped migrations and tablePrefix to namespace tables).
Installation
npm install nest-scheduler-engine
# Peer dependencies (consumer must install)
npm install @nestjs/core @nestjs/common rxjs
Quick Start
1. Run Migrations
First, export and run the migrations against your database:
import { getMigrations } from 'nest-scheduler-engine';
const migrations = getMigrations({ tablePrefix: 'scheduler_' });
// Use your migration tool (Knex, TypeORM, Flyway, etc.)
// migrations is an array of { version, up, down }
Or use the CLI helper:
npx scheduler-engine migrations:export --output ./migrations/
2. Implement Adapters
Create adapter implementations for your infrastructure:
// adapters/pg-database.adapter.ts
import { Injectable } from '@nestjs/common';
import { Pool } from 'pg';
import { IDatabaseAdapter, ITransactionClient } from 'nest-scheduler-engine';
@Injectable()
export class PgDatabaseAdapter implements IDatabaseAdapter {
constructor(private readonly pool: Pool) {}
async query<T = any>(sql: string, params?: any[]): Promise<T[]> {
const result = await this.pool.query(sql, params);
return result.rows;
}
async transaction<T>(fn: (trx: ITransactionClient) => Promise<T>): Promise<T> {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
const trx = {
query: async <T = any>(sql: string, params?: any[]) => {
const result = await client.query(sql, params);
return result.rows;
},
};
const result = await fn(trx);
await client.query('COMMIT');
return result;
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
async ping(): Promise<boolean> {
try {
await this.pool.query('SELECT 1');
return true;
} catch {
return false;
}
}
}
// adapters/sqs-queue.adapter.ts
import { Injectable } from '@nestjs/common';
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import { IQueueAdapter, QueueMessage } from 'nest-scheduler-engine';
@Injectable()
export class SqsQueueAdapter implements IQueueAdapter {
constructor(
private readonly sqs: SQSClient,
private readonly queueUrl: string,
) {}
async publish(message: QueueMessage): Promise<void> {
await this.sqs.send(new SendMessageCommand({
QueueUrl: this.queueUrl,
MessageBody: JSON.stringify(message),
}));
}
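async subscribe(handler: (msg: QueueMessage) => Promise<void>): Promise<void> {
// Implement SQS polling (ReceiveMessageCommand) and invoke handler for each message.
// This is typically done in a separate worker process.
}
async ack(messageId: string): Promise<void> {
// Delete the message from SQS (DeleteMessageCommand). SQS deletes by ReceiptHandle,
// not MessageId, so keep a messageId -> ReceiptHandle map populated while polling.
}
async nack(messageId: string): Promise<void> {
// Make the message visible again (ChangeMessageVisibilityCommand with VisibilityTimeout: 0),
// or let the queue's redrive policy move it to a DLQ after maxReceiveCount receives.
}
}
3. Register the Module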
// app.module.ts
import { Module } from '@nestjs/common';
import { SchedulerModule } from 'nest-scheduler-engine';
import { Pool } from 'pg';
import { SQSClient } from '@aws-sdk/client-sqs';
import { PgDatabaseAdapter } from './adapters/pg-database.adapter';
import { SqsQueueAdapter } from './adapters/sqs-queue.adapter';
import { ReminderHandler } from './handlers/reminder.handler';
// DataSyncHandler and ReportHandler stand for further handlers of your own (hypothetical paths)
import { DataSyncHandler } from './handlers/data-sync.handler';
import { ReportHandler } from './handlers/report.handler';

@Module({
  imports: [
    SchedulerModule.forRoot({
      // Required adapters (assumes Pool and SQSClient are available as injectable providers)
      database: {
        useFactory: (pool: Pool) => new PgDatabaseAdapter(pool),
        inject: [Pool],
      },
      queue: {
        // QUEUE_URL must be set; the non-null assertion satisfies the string parameter type
        useFactory: (sqs: SQSClient) => new SqsQueueAdapter(sqs, process.env.QUEUE_URL!),
        inject: [SQSClient],
      },
      // Optional configuration
      config: {
        pollingIntervalMs: 5000,
        lockDurationMs: 30000,
        batchSize: 50,
        concurrency: 10,
        deadLetterEnabled: true,
        tablePrefix: 'scheduler_',
      },
      // Optional hooks
      hooks: {
        onEventScheduled: async (event) => {
          console.log('Event scheduled:', event.id);
        },
        onEventFailed: async (event, error, retryCount) => {
          console.error('Event failed:', event.id, error);
        },
        onEventDeadLettered: async (event) => {
          // Send alert to oncall
        },
      },
    }),
  ],
  providers: [
    // Your event handlers
    ReminderHandler,
    DataSyncHandler,
    ReportHandler,
  ],
})
export class AppModule {}
4. Create Event Handlers
// handlers/reminder.handler.ts
import { Injectable } from '@nestjs/common';
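import { EventHandler, IEventHandler, ExecutionContext, HandlerResult } from 'nest-scheduler-engine';
import { NotificationService } from '../notifications/notification.service'; // your own notification service (hypothetical path)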
@Injectable()
@EventHandler('REMINDER_NOTIFICATION')
export class ReminderHandler implements IEventHandler {
readonly eventType = 'REMINDER_NOTIFICATION';
constructor(
private readonly notificationService: NotificationService,
) {}
async handle(payload: Record<string, any>, ctx: ExecutionContext): Promise<HandlerResult> {
try {
await this.notificationService.send(payload.userId, payload.message);
return { success: true };
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : String(error),
};
}
}
}
5. Schedule Events
// some.service.ts
import { Injectable } from '@nestjs/common';
import { SchedulerService, ScheduleType } from 'nest-scheduler-engine';
@Injectable()
export class ReminderService {
constructor(private readonly scheduler: SchedulerService) {}
async scheduleReminder(userId: string, message: string, sendAt: Date) {
// First, create the event type (usually done once on app startup)
await this.scheduler.createEventType({
name: 'REMINDER_NOTIFICATION',
description: 'Send reminder notifications to users',
retryPolicy: {
maxRetries: 3,
delayMs: 5000,
backoff: 'exponential',
},
});
// Schedule a one-time event
const event = await this.scheduler.scheduleEvent({
eventTypeName: 'REMINDER_NOTIFICATION',
payload: { userId, message },
scheduleType: ScheduleType.ONE_TIME,
scheduledAt: sendAt,
});
return event;
}
async scheduleRecurringReport() {
// Schedule a recurring event using RRULE
const event = await this.scheduler.scheduleEvent({
eventTypeName: 'WEEKLY_REPORT',
payload: { reportType: 'sales' },
scheduleType: ScheduleType.RECURRING,
rrule: 'FREQ=WEEKLY;BYDAY=MO;BYHOUR=9;BYMINUTE=0', // Every Monday at 9:00 AM
});
return event;
}
}
Declarative Cron Jobs (new)
You can declare cron-style scheduled jobs using the @Cron method decorator. The decorator converts a standard 5-field cron expression into an RRULE and schedules it through the engine.
Example:
import { Injectable } from '@nestjs/common';
import { Cron } from 'nest-scheduler-engine/decorators/cron.decorator';
@Injectable()
export class JobsService {
@Cron('0 9 * * *', { name: 'DAILY_SUMMARY', timezone: 'America/Los_Angeles' })
async handleDailySummary() {
// business logic executed by the scheduler engine
}
}
Notes:
- The decorator converts the cron expression to an RRULE (RFC 5545) using an internal converter and schedules a recurring event.
- Each decorated method is registered as an event handler and mapped to a unique event type.
- Scheduling is idempotent: by default a cron job uses the idempotency key cron:<eventTypeName> to avoid duplicate scheduled instances on bootstrap.
- The discovery currently runs at module init and will create the event type and schedule the recurring event if it does not already exist.
- Advanced features (persistent cron metadata table, timezone persistence, missed-execution recovery and versioning) are planned and will be enabled progressively; basic decorator + idempotent scheduling is available now.
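To make the cron-to-RRULE step concrete, here is a deliberately small converter. It is a hypothetical sketch, not the package's internal converter: it accepts only a fixed minute and hour, "*" for day-of-month and month, and "*" or a comma-separated list of weekday numbers, which covers expressions like '0 9 * * *'. The timezone option is not modeled; the output uses the same RRULE form as the RRULE examples in this README.

```typescript
// Hypothetical, deliberately small converter (NOT the package's internal one).
// Supports: fixed minute and hour, "*" for day-of-month and month, and "*" or a
// comma-separated list of 0-7 for day-of-week (0 and 7 both mean Sunday).
const RRULE_WEEKDAYS = ["SU", "MO", "TU", "WE", "TH", "FR", "SA"];

function cronToRRule(cron: string): string {
  const fields = cron.trim().split(/\s+/);
  if (fields.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${fields.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = fields;
  const m = Number(minute);
  const h = Number(hour);
  if (!/^\d+$/.test(minute) || !/^\d+$/.test(hour) || m > 59 || h > 23) {
    throw new Error("Only fixed minute (0-59) and hour (0-23) values are supported");
  }
  if (dayOfMonth !== "*" || month !== "*") {
    throw new Error("Only '*' is supported for day-of-month and month");
  }
  const time = `BYHOUR=${h};BYMINUTE=${m}`;
  if (dayOfWeek === "*") {
    // Every day at the given time
    return `FREQ=DAILY;${time}`;
  }
  // Specific weekdays: map cron numbers to RFC 5545 weekday codes
  const days = dayOfWeek.split(",").map((d) => {
    if (!/^[0-7]$/.test(d)) throw new Error(`Unsupported day-of-week value: "${d}"`);
    return RRULE_WEEKDAYS[Number(d) % 7];
  });
  return `FREQ=WEEKLY;BYDAY=${days.join(",")};${time}`;
}
```

For example, '30 8 * * 1,5' becomes 'FREQ=WEEKLY;BYDAY=MO,FR;BYHOUR=8;BYMINUTE=30'. Rejecting anything outside the supported subset is safer than silently producing a different schedule.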
API Reference
SchedulerService
Main service for interacting with the scheduler.
Event Type Management
createEventType(input: CreateEventTypeInput): Promise<EventType>
getEventType(id: string): Promise<EventType | null>
getEventTypeByName(name: string): Promise<EventType | null>
listEventTypes(): Promise<EventType[]>
deleteEventType(id: string): Promise<void>
Event Instance Management
scheduleEvent(input: ScheduleEventInput): Promise<EventInstance>
getEvent(id: string): Promise<EventInstance | null>
listEvents(filters?: EventFilters): Promise<PaginatedResult<EventInstance>>
pauseEvent(id: string): Promise<EventInstance>
resumeEvent(id: string): Promise<EventInstance>
cancelEvent(id: string): Promise<void>
Execution History
getExecutions(eventId: string): Promise<EventExecution[]>
Dead Letters
listDeadLetters(): Promise<DeadLetter[]>
retryDeadLetter(id: string): Promise<EventInstance>
Bulk Scheduling (v2.1.0+)
scheduleEvents(inputs: ScheduleEventInput[], options?: BulkScheduleOptions): Promise<EventInstance[]>
Middleware
use(middleware: MiddlewareFunction | IMiddleware): void
High-ROI Features (v2.1.0+)
Priority + Concurrency Control
Control execution priority and limit concurrent executions per event type.
Create Event Types with Priority:
// High priority events execute first
const urgentEventType = await scheduler.createEventType({
name: 'URGENT_NOTIFICATION',
description: 'Critical user notifications',
priority: 10, // Higher = higher priority (default: 0)
maxConcurrency: 5, // Max 5 concurrent executions (default: null = unlimited)
});
// Low priority background jobs
const backgroundEventType = await scheduler.createEventType({
name: 'BACKGROUND_SYNC',
description: 'Background data sync',
priority: 1, // Lower priority
maxConcurrency: 2, // Limit to 2 concurrent executions
});
How it works:
- Polling engine orders events by priority DESC, next_run_at ASC
- Higher priority events are picked first
- maxConcurrency prevents resource exhaustion:
  - null or undefined = unlimited (default)
  - > 0 = maximum concurrent executions
  - 0 = event type disabled
- Prevents starvation of low-priority events through fair scheduling
- Distributed-safe: Uses database queries to track concurrency
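The ordering and maxConcurrency rules above can be modeled in memory as follows. This is a hypothetical sketch (DueEvent, selectBatch and the running-count map are illustrative names): the real engine applies these rules through database queries shared by all instances, and the fair-scheduling protection against starvation is not modeled here.

```typescript
// Hypothetical in-memory model of the polling selection rules.
interface DueEvent {
  id: string;
  eventType: string;
  priority: number;
  nextRunAt: Date;
}

// null/undefined = unlimited, 0 = disabled, > 0 = cap on concurrent executions.
type ConcurrencyLimits = Record<string, number | null | undefined>;

function selectBatch(
  due: DueEvent[],
  running: Record<string, number>, // executions currently in flight per event type
  limits: ConcurrencyLimits,
  batchSize: number,
): DueEvent[] {
  // ORDER BY priority DESC, next_run_at ASC (copy first so the input is not mutated)
  const ordered = due
    .slice()
    .sort((a, b) => b.priority - a.priority || a.nextRunAt.getTime() - b.nextRunAt.getTime());
  const inFlight: Record<string, number> = { ...running };
  const picked: DueEvent[] = [];
  for (const ev of ordered) {
    if (picked.length >= batchSize) break;
    const limit = limits[ev.eventType];
    const current = inFlight[ev.eventType] || 0;
    if (limit === 0) continue; // event type disabled
    if (limit != null && current >= limit) continue; // type is at capacity
    inFlight[ev.eventType] = current + 1; // count the pick against the cap
    picked.push(ev);
  }
  return picked;
}
```

Counting each pick against the cap within the same batch matters: without it, a single poll could hand out more executions of a type than maxConcurrency allows.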
Use Cases:
- Critical user notifications (high priority)
- Background cleanup jobs (low priority)
- Rate-limited API calls (maxConcurrency = 1)
- Resource-intensive tasks (maxConcurrency = 3)
Bulk Scheduling API
Schedule thousands of events efficiently in a single operation.
// Prepare batch of events
const events = users.map(user => ({
eventTypeName: 'WELCOME_EMAIL',
payload: { userId: user.id, email: user.email },
scheduleType: ScheduleType.ONE_TIME,
scheduledAt: new Date(Date.now() + 3600000), // 1 hour from now
idempotencyKey: `welcome-${user.id}`, // Prevent duplicates
}));
// FAIL_ALL mode: Transaction-safe, rolls back if any event fails
const allOrNothing = await scheduler.scheduleEvents(events, { mode: 'FAIL_ALL' });
// Alternatively, PARTIAL_SUCCESS mode: Continues on errors
const partial = await scheduler.scheduleEvents(events, { mode: 'PARTIAL_SUCCESS' });
Features:
- Transaction Support: FAIL_ALL mode uses single transaction
- Partial Success: PARTIAL_SUCCESS mode processes valid events
- Validation: All inputs validated before processing
- Idempotency: Detects duplicate idempotency keys in batch
- Performance: Optimized for 1000+ events
Options:
- mode: 'FAIL_ALL' (default): All-or-nothing transaction
- mode: 'PARTIAL_SUCCESS': Returns successful events, logs failures
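The difference between the two modes, together with in-batch duplicate detection of idempotency keys, can be sketched as a validation pass. This is a hypothetical illustration (planBulk and BulkInput are not package APIs): it covers only input validation, whereas the real FAIL_ALL mode also rolls back database errors inside its single transaction. Here PARTIAL_SUCCESS returns the failures alongside the accepted inputs so the caller can log them.

```typescript
// Hypothetical model of FAIL_ALL vs PARTIAL_SUCCESS validation; not the package's implementation.
type BulkMode = "FAIL_ALL" | "PARTIAL_SUCCESS";

interface BulkInput {
  eventTypeName: string;
  idempotencyKey?: string;
}

interface BulkOutcome<T> {
  scheduled: T[];
  failures: { index: number; reason: string }[];
}

function planBulk<T extends BulkInput>(inputs: T[], mode: BulkMode): BulkOutcome<T> {
  const seenKeys = new Set<string>();
  const scheduled: T[] = [];
  const failures: { index: number; reason: string }[] = [];
  inputs.forEach((input, index) => {
    let reason: string | null = null;
    if (!input.eventTypeName) {
      reason = "eventTypeName is required";
    } else if (input.idempotencyKey !== undefined && seenKeys.has(input.idempotencyKey)) {
      reason = `duplicate idempotencyKey "${input.idempotencyKey}" in batch`;
    }
    if (reason) {
      failures.push({ index, reason });
      return;
    }
    if (input.idempotencyKey !== undefined) seenKeys.add(input.idempotencyKey);
    scheduled.push(input);
  });
  if (mode === "FAIL_ALL" && failures.length > 0) {
    // All-or-nothing: nothing is scheduled if any input is invalid.
    throw new Error(`Bulk schedule rejected: ${failures.length} invalid input(s)`);
  }
  return { scheduled, failures };
}
```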
Use Cases:
- Onboarding campaigns (bulk welcome emails)
- Batch reporting jobs
- Mass reminders/notifications
- Data migration scheduling
Middleware System
Inject custom logic into the event execution pipeline.
Register Middleware:
// Logging middleware
scheduler.use(async (ctx, next) => {
console.log(`[START] ${ctx.eventType} - ${ctx.eventId}`);
const start = Date.now();
await next(); // Call next middleware or handler
const duration = Date.now() - start;
console.log(`[END] ${ctx.eventType} - ${duration}ms`);
});
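// Tracing middleware (e.g., OpenTelemetry)
import { trace, SpanStatusCode } from '@opentelemetry/api';
const tracer = trace.getTracer('scheduler');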
scheduler.use(async (ctx, next) => {
const span = tracer.startSpan(ctx.eventType, {
attributes: {
'event.id': ctx.eventId,
'event.type': ctx.eventType,
'event.attempt': ctx.attemptNumber,
'event.correlation_id': ctx.correlationId,
},
});
try {
await next();
span.setStatus({ code: SpanStatusCode.OK });
} catch (error) {
span.recordException(error as Error); // catch variables are `unknown` under strict TypeScript
span.setStatus({ code: SpanStatusCode.ERROR });
throw error;
} finally {
span.end();
}
});
// Authentication/authorization middleware
scheduler.use(async (ctx, next) => {
if (ctx.payload.requiresAuth) {
await validateAuthToken(ctx.payload.token);
}
await next();
});
Middleware Context:
interface MiddlewareContext {
eventId: string; // Unique event instance ID
eventType: string; // Event type name
payload: Record<string, any>; // Event payload
attemptNumber: number; // Current execution attempt
correlationId: string; // Correlation ID for tracing
scheduledAt: Date; // Original scheduled time
startedAt: Date; // Execution start time
}
Class-based Middleware:
class MetricsMiddleware implements IMiddleware {
async use(ctx: MiddlewareContext, next: NextFunction) {
metrics.increment('events.started', { type: ctx.eventType });
try {
await next();
metrics.increment('events.succeeded', { type: ctx.eventType });
} catch (error) {
metrics.increment('events.failed', { type: ctx.eventType });
throw error;
}
}
}
scheduler.use(new MetricsMiddleware());
Features:
- Execution Order: Middleware executes in registration order
- Pre/Post Processing: Code before next() runs before the rest of the chain; code after next() runs once the rest of the chain, including the handler, has finished
- Error Propagation: Errors bubble up through middleware chain
- Async Support: Full async/await support
- No Breaking Changes: Existing handlers work unchanged
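The ordering and error-propagation behavior listed above matches the common "onion" composition pattern. A minimal sketch, assuming a compose helper of our own (not the package's internals), shows why code after next() runs in reverse registration order:

```typescript
type Next = () => Promise<void>;
type Middleware<C> = (ctx: C, next: Next) => Promise<void>;

// Hypothetical onion-style chain: middleware enter in registration order, the handler
// runs innermost, and code after `await next()` runs in reverse order on the way out.
// A rejection anywhere propagates outward through every pending `await next()`.
function compose<C>(middleware: Middleware<C>[], handler: (ctx: C) => Promise<void>) {
  return (ctx: C): Promise<void> => {
    let lastIndex = -1;
    const dispatch = (i: number): Promise<void> => {
      if (i <= lastIndex) {
        return Promise.reject(new Error("next() called more than once"));
      }
      lastIndex = i;
      if (i === middleware.length) return handler(ctx);
      return middleware[i](ctx, () => dispatch(i + 1));
    };
    return dispatch(0);
  };
}
```

With two middleware A and B, the trace is A:before, B:before, handler, B:after, A:after.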
Use Cases:
- Distributed tracing (OpenTelemetry, Jaeger)
- Performance monitoring (execution times)
- Payload validation/transformation
- Authentication/authorization
- Custom logging and audit trails
- Analytics and metrics collection
Production Features
Error Handling
The package provides a comprehensive error hierarchy for better error tracking:
import {
SchedulerError, // Base error class
EventNotFoundError, // Event not found in database
HandlerNotFoundError, // No handler registered for event type
EventTypeNotFoundError, // Event type doesn't exist
InvalidEventStateError, // Invalid state transition
LockAcquisitionError, // Failed to acquire event lock
RetryExhaustedError, // Max retries reached
} from 'nest-scheduler-engine';
// All errors extend SchedulerError with context
try {
await schedulerService.getEvent('invalid-id');
} catch (error) {
if (error instanceof EventNotFoundError) {
console.log('Event not found:', error.eventId);
console.log('Stack trace:', error.stack);
}
}
Fault Tolerance
Circuit Breaker
Protects downstream systems from cascading failures:
import { CircuitBreaker } from 'nest-scheduler-engine';
const breaker = new CircuitBreaker({
failureThreshold: 5, // Open after 5 failures
resetTimeout: 60000, // Try again after 60s
});
const result = await breaker.execute(async () => {
return await externalApiCall();
});
States:
- CLOSED: Normal operation
- OPEN: Too many failures, reject immediately
- HALF_OPEN: Testing if service recovered
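The three states can be sketched as a small state machine. This is a hypothetical illustration (MiniCircuitBreaker is not the package's CircuitBreaker, whose internals may differ); it takes an injectable clock so the OPEN to HALF_OPEN transition can be tested, and for brevity it lets any number of trial calls through while HALF_OPEN.

```typescript
type BreakerState = "CLOSED" | "OPEN" | "HALF_OPEN";

// Hypothetical minimal breaker; use the package's CircuitBreaker in practice.
class MiniCircuitBreaker {
  private state: BreakerState = "CLOSED";
  private failures = 0;
  private openedAt = 0;
  private readonly failureThreshold: number;
  private readonly resetTimeoutMs: number;
  private readonly now: () => number;

  constructor(failureThreshold: number, resetTimeoutMs: number, now: () => number = () => Date.now()) {
    this.failureThreshold = failureThreshold;
    this.resetTimeoutMs = resetTimeoutMs;
    this.now = now;
  }

  // Current state; OPEN becomes HALF_OPEN once the reset timeout has elapsed.
  getState(): BreakerState {
    if (this.state === "OPEN" && this.now() - this.openedAt >= this.resetTimeoutMs) {
      this.state = "HALF_OPEN";
    }
    return this.state;
  }

  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.getState() === "OPEN") {
      throw new Error("Circuit is OPEN; call rejected");
    }
    try {
      const result = await fn();
      this.failures = 0; // success closes the circuit
      this.state = "CLOSED";
      return result;
    } catch (err) {
      this.failures += 1;
      // A failed trial in HALF_OPEN, or too many failures, (re)opens the circuit.
      if (this.state === "HALF_OPEN" || this.failures >= this.failureThreshold) {
        this.state = "OPEN";
        this.openedAt = this.now();
      }
      throw err;
    }
  }
}
```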
Advanced Retry Logic
import { retry, calculateRetryDelay } from 'nest-scheduler-engine';
// Retry with exponential backoff + jitter
const result = await retry(
async () => await unstableOperation(),
3, // Max 3 retries
1000, // Base delay 1s
'exponential', // Backoff strategy
);
// Calculate retry delays
const delay = calculateRetryDelay(
1000, // Base delay 1s
2, // Retry count
'exponential', // Strategy
true, // Add jitter
);
// Returns: ~4000ms ± 25% (exponential: 1s * 2^2 = 4s)
Features:
- Exponential or linear backoff
- Jitter (±25%) to prevent thundering herd
- Capped at 5 minutes maximum delay
- Automatic retryable error detection
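The delay arithmetic can be written out as a pure function. This is a hypothetical re-implementation for illustration: the exponential formula and the ±25% and 5-minute figures come from this README, while the linear formula base * (retryCount + 1) is an assumption, and the package's calculateRetryDelay may differ. The random source is injectable so the result is deterministic in tests.

```typescript
const MAX_DELAY_MS = 5 * 60 * 1000; // 5-minute cap
const JITTER_RATIO = 0.25; // ±25%

// Hypothetical sketch; not the package's calculateRetryDelay.
function retryDelay(
  baseDelayMs: number,
  retryCount: number,
  strategy: "exponential" | "linear",
  jitter: boolean,
  random: () => number = Math.random,
): number {
  const raw =
    strategy === "exponential"
      ? baseDelayMs * Math.pow(2, retryCount) // 1s, 2s, 4s, 8s, ...
      : baseDelayMs * (retryCount + 1); // assumption: 1s, 2s, 3s, ...
  // Scale by a random factor in [0.75, 1.25) so many retries do not fire at once.
  const factor = jitter ? 1 + (random() * 2 - 1) * JITTER_RATIO : 1;
  return Math.min(Math.round(raw * factor), MAX_DELAY_MS);
}
```

With baseDelayMs 1000 and retryCount 2, the exponential delay is 4000 ms before jitter, matching the example above; applying the cap after jitter guarantees the 5-minute ceiling is never exceeded.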
Health Checks
Monitor component health for observability:
import { Controller, Get, Injectable, Res } from '@nestjs/common';
import { Response } from 'express';
import { HealthCheckService } from 'nest-scheduler-engine';

@Injectable()
export class AppHealthService {
  constructor(private readonly healthCheck: HealthCheckService) {}

  async check() {
    const status = await this.healthCheck.check();
    console.log('Overall healthy:', status.healthy);
    console.log('Database:', status.components.database);
    console.log('Queue:', status.components.queue);
    console.log('Poller:', status.components.poller);
    console.log('Dispatcher:', status.components.dispatcher);
    return status;
  }
}

// Expose as HTTP endpoint (Express platform)
@Controller('health')
export class HealthController {
  constructor(private readonly healthCheckService: HealthCheckService) {}

  @Get()
  async getHealth(@Res() res: Response) {
    const health = await this.healthCheckService.check();
    const status = health.healthy ? 200 : 503;
    return res.status(status).json(health);
  }
}
Health Status Schema:
{
healthy: boolean; // Overall health
timestamp: Date;
components: {
database: 'HEALTHY' | 'UNHEALTHY';
queue: 'HEALTHY' | 'UNHEALTHY';
poller: 'RUNNING' | 'STOPPED';
dispatcher: 'RUNNING' | 'STOPPED';
};
}
Graceful Shutdown
Ensures in-flight events complete before process termination:
import { GracefulShutdownManager } from 'nest-scheduler-engine';
const shutdownManager = new GracefulShutdownManager();
// Register shutdown handlers (SIGTERM, SIGINT)
shutdownManager.registerHandlers();
// Add cleanup callbacks
shutdownManager.addCallback(async () => {
await pollingEngine.stop();
await dispatcher.stop();
console.log('Scheduler stopped gracefully');
});
// Trigger shutdown programmatically
await shutdownManager.shutdown(5000); // 5s timeout
Process:
- Signal received (SIGTERM/SIGINT)
- Stop accepting new work
- Wait for in-flight events to complete
- Execute cleanup callbacks
- Exit process
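The "stop accepting work, then wait with a timeout" steps can be sketched as a small tracker. This is a hypothetical illustration (InFlightTracker, run and drain are not package APIs); with the package you would use GracefulShutdownManager as shown above.

```typescript
// Hypothetical in-flight tracker illustrating the shutdown steps.
class InFlightTracker {
  private accepting = true;
  private readonly inFlight = new Set<Promise<unknown>>();

  // Runs work while accepting; refuses new work once shutdown has begun.
  run<T>(work: () => Promise<T>): Promise<T> {
    if (!this.accepting) {
      return Promise.reject(new Error("Shutting down: not accepting new work"));
    }
    const p = work();
    this.inFlight.add(p);
    const cleanup = () => { this.inFlight.delete(p); };
    p.then(cleanup, cleanup);
    return p;
  }

  // Stops accepting work, then waits for in-flight work or the timeout, whichever
  // comes first. Resolves true if everything finished in time.
  async drain(timeoutMs: number): Promise<boolean> {
    this.accepting = false;
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<boolean>((resolve) => {
      timer = setTimeout(() => resolve(false), timeoutMs);
    });
    const settled = Promise.all(
      Array.from(this.inFlight, (p) => p.then(() => undefined, () => undefined)),
    ).then(() => true);
    try {
      return await Promise.race([settled, timeout]);
    } finally {
      if (timer !== undefined) clearTimeout(timer); // don't keep the process alive
    }
  }
}
```

After drain resolves, cleanup callbacks run and the process exits; a false result means some work was still running when the timeout expired.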
Timeout Protection
Prevent handlers from blocking indefinitely:
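import { Injectable } from '@nestjs/common';
import { EventHandler, IEventHandler, withTimeout } from 'nest-scheduler-engine';
import { ExternalService } from './external.service'; // hypothetical service used below

@Injectable()
@EventHandler('LONG_RUNNING_TASK')
export class TaskHandler implements IEventHandler {
  readonly eventType = 'LONG_RUNNING_TASK';

  constructor(private readonly externalService: ExternalService) {}

  async handle(payload: Record<string, any>) {
    // Rejects with 'Task execution timeout' if processing takes longer than 10 seconds
    const result = await withTimeout(
      this.externalService.process(payload),
      10000,
      'Task execution timeout',
    );
    return { success: true, result };
  }
}
Contextual Logging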
Track events throughout their lifecycle:
import { ContextualLogger } from 'nest-scheduler-engine';
const logger = new ContextualLogger('MyHandler');
// Logs include correlation ID automatically
logger.log('Processing event', {
eventId: '123',
eventType: 'EMAIL_SEND',
});
// Performance tracking
const tracker = logger.trackPerformance('email-send');
await sendEmail();
tracker.end(); // Logs duration automatically
Log Format:
{
"timestamp": "2025-01-15T10:30:00.000Z",
"level": "info",
"context": "MyHandler",
"correlationId": "abc-123-def",
"message": "Processing event",
"eventId": "123",
"eventType": "EMAIL_SEND"
}
Deployment Modes
The package supports three deployment modes:
Combined Mode (Default)
Polls + dispatches + consumes in one process.
config: {
workerMode: true, // default
}
Use case: Simple deployments, low to medium load
Poller Only
Only polls and dispatches to queue. Separate workers consume.
config: {
pollerOnly: true,
}
Use case: Separate concerns, scale workers independently
Worker Only
Only consumes from queue. Separate poller dispatches.
config: {
workerMode: true,
pollerOnly: false,
}
Use case: Horizontal scaling, high throughput
See PRODUCTION.md for detailed deployment guide, scaling strategies, and production best practices.
Testing
Testing Your Handlers
The package provides mock adapters for testing:
import { Test } from '@nestjs/testing';
import { SchedulerModule } from 'nest-scheduler-engine';
import { MockDatabaseAdapter, MockQueueAdapter } from 'nest-scheduler-engine/test';
import { EmailHandler } from './email.handler'; // your handler under test (hypothetical path)
describe('EmailHandler', () => {
let handler: EmailHandler;
let mockDb: MockDatabaseAdapter;
let mockQueue: MockQueueAdapter;
beforeEach(async () => {
mockDb = new MockDatabaseAdapter();
mockQueue = new MockQueueAdapter();
const module = await Test.createTestingModule({
imports: [
SchedulerModule.forRoot({
database: { useValue: mockDb },
queue: { useValue: mockQueue },
config: {
pollingIntervalMs: 1000,
workerMode: false, // Don't start polling in tests
},
}),
],
providers: [EmailHandler],
}).compile();
handler = module.get(EmailHandler);
});
it('should send email successfully', async () => {
const payload = { to: 'user@example.com', body: 'Hello' };
const result = await handler.handle(payload, {
eventId: '123',
eventType: 'EMAIL_SEND',
attemptNumber: 1,
correlationId: 'test-123',
});
expect(result.success).toBe(true);
});
});
Running Package Tests
# Run all tests
npm test
# Run with coverage
npm run test:cov
# Run unit tests only
npm run test:unit
# Watch mode
npm run test:watch
Coverage Requirements
The package maintains test coverage:
- Statements: 40%+
- Branches: 40%+
- Functions: 40%+
- Lines: 40%+
Test Structure
test/
├── setup.ts # Jest setup
├── mocks/
│ └── adapters.mock.ts # Mock implementations
├── fixtures/
│ └── test-data.fixtures.ts # Test data factories
├── unit/
│ ├── errors.spec.ts # Error class tests
│ ├── circuit-breaker.spec.ts
│ └── retry-helper.spec.ts
├── integration/
│ ├── event-manager.spec.ts
│ └── dispatcher.spec.ts
└── e2e/
    └── full-lifecycle.spec.ts
RRULE Examples
// Every day at 10:00 AM
rrule: 'FREQ=DAILY;BYHOUR=10;BYMINUTE=0'
// Every Monday and Friday at 9:00 AM
rrule: 'FREQ=WEEKLY;BYDAY=MO,FR;BYHOUR=9;BYMINUTE=0'
// Last day of every month at 11:59 PM
rrule: 'FREQ=MONTHLY;BYMONTHDAY=-1;BYHOUR=23;BYMINUTE=59'
// Every 2 hours
rrule: 'FREQ=HOURLY;INTERVAL=2'
See RFC 5545 for full RRULE specification.
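To show how a rule string maps to concrete run times, the helpers below parse an RRULE into its parts and compute the next occurrence for the simple daily form used in the first example. They are hypothetical and evaluate in UTC only; real schedules need a full RFC 5545 implementation with time zones and DST handling, which the engine provides itself.

```typescript
// Hypothetical helpers for illustration only.
function parseRRule(rule: string): Record<string, string> {
  const parts: Record<string, string> = {};
  for (const segment of rule.split(";")) {
    const eq = segment.indexOf("=");
    if (eq <= 0) throw new Error(`Malformed RRULE part: "${segment}"`);
    parts[segment.slice(0, eq).toUpperCase()] = segment.slice(eq + 1);
  }
  if (!parts["FREQ"]) throw new Error("RRULE must contain FREQ");
  return parts;
}

// Next occurrence strictly after `after` for FREQ=DAILY;BYHOUR=<h>;BYMINUTE=<m>, in UTC.
function nextDailyOccurrence(rule: string, after: Date): Date {
  const p = parseRRule(rule);
  const supported = ["FREQ", "BYHOUR", "BYMINUTE"];
  const hour = Number(p["BYHOUR"]);
  const minute = Number(p["BYMINUTE"]);
  if (
    p["FREQ"] !== "DAILY" ||
    Object.keys(p).some((k) => supported.indexOf(k) < 0) ||
    !Number.isInteger(hour) ||
    !Number.isInteger(minute)
  ) {
    throw new Error("Sketch only handles FREQ=DAILY with a single BYHOUR and BYMINUTE");
  }
  const candidate = new Date(
    Date.UTC(after.getUTCFullYear(), after.getUTCMonth(), after.getUTCDate(), hour, minute),
  );
  if (candidate.getTime() <= after.getTime()) {
    // Today's slot has passed: move to the same time tomorrow (handles month rollover).
    candidate.setUTCDate(candidate.getUTCDate() + 1);
  }
  return candidate;
}
```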
Database Schema
The shipped migrations (which you run against your own database) create the following tables:
- event_types - Event type definitions with retry policies
- event_instances - Scheduled event instances
- event_executions - Execution history for audit and debugging
- dead_letters - Failed events that exhausted retries
All tables support an optional prefix via the tablePrefix config option.
Persistence note:
- All scheduler state (event types, event instances, execution history, dead letters) is persisted to your database via the IDatabaseAdapter you provide. The package does not create or manage database connections itself; consumers must supply an adapter implementation (see src/interfaces/database-adapter.interface.ts) that performs query and transaction operations against their chosen database client. This ensures scheduled jobs survive process restarts and can be safely scaled across multiple instances: locks, idempotency keys, and database transactions provide distributed safety for scheduling operations.
Be sure to run the shipped migrations and set tablePrefix if you want to namespace tables in a shared schema.
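If your migration tool does not consume the exported array directly, a minimal runner over getMigrations output could look like this. It is a sketch under assumptions: this README only states that each entry has { version, up, down }, so the numeric version and SQL-string up used here are hypothetical, and applyPendingMigrations is not a package API. A production runner would also persist applied versions in a table and wrap each step in a transaction.

```typescript
// Assumed shape (hypothetical): numeric version, SQL strings for up/down.
interface Migration {
  version: number;
  up: string;
  down: string;
}

// Executor abstraction so the runner works with any client (pg, Knex, ...).
type ExecSql = (sql: string) => Promise<void>;

// Applies migrations whose version is not yet recorded, in ascending version order,
// and returns the versions it applied.
async function applyPendingMigrations(
  migrations: Migration[],
  appliedVersions: Set<number>,
  exec: ExecSql,
): Promise<number[]> {
  const pending = migrations
    .filter((m) => !appliedVersions.has(m.version)) // filter copies, so sort won't mutate input
    .sort((a, b) => a.version - b.version);
  const applied: number[] = [];
  for (const m of pending) {
    await exec(m.up); // stop on the first failure: later migrations may depend on this one
    appliedVersions.add(m.version);
    applied.push(m.version);
  }
  return applied;
}
```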
What's New in v2.0.0
Version 2.0.0 brings production-grade features and comprehensive testing:
✨ New Features
- Circuit Breaker Pattern - Protect downstream systems from cascading failures
- Health Check System - Component-level health monitoring
- Graceful Shutdown - Ensure in-flight events complete before exit
- Enhanced Retry Logic - Exponential backoff with jitter and capping
- Timeout Protection - Prevent handlers from blocking indefinitely
- Contextual Logging - Correlation IDs and performance tracking
- Custom Error Hierarchy - 8 specialized error classes for better debugging
🧪 Testing & Quality
- Comprehensive Test Suite - 35+ unit tests with 96%+ coverage of critical utilities
- Mock Adapters - Easy testing with provided mocks
- Test Fixtures - Factory functions for common test data
- Coverage Reporting - Integrated with Jest
📚 Documentation
- PRODUCTION.md - Complete production deployment guide
- EDGE_CASE_HANDLING.md - Comprehensive documentation of all edge cases handled by the scheduler
- CHANGELOG.md - Detailed migration guide from v1.x
- Enhanced README - Production features and best practices
See EDGE_CASE_HANDLING.md for detailed information about how the scheduler handles edge cases, including:
- Time-related issues (DST, leap years, timezone)
- RRULE edge cases (infinite loops, zero occurrences)
- Concurrency and distributed system challenges
- Security and data integrity protections
- State transition validation
🔧 Breaking Changes
See CHANGELOG.md for migration guide from v1.x to v2.0.0.
Architecture
┌─────────────────────────────────────────────────────────────┐
│ NestJS Application │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Handler │ │ Handler │ │ Handler │ │
│ │ @EventHandler│ │ @EventHandler│ │ @EventHandler│ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ └───────────────────┴────────────────────┘ │
│ │ │
│ ┌────────▼────────┐ │
│ │ HandlerRegistry│ │
│ └────────┬────────┘ │
│ │ │
│ ┌───────────────────┴───────────────────┐ │
│ │ │ │
│ ┌──────▼──────┐ ┌───────▼──────┐ │
│ │PollingEngine│◄──────────────────────┤ Dispatcher │ │
│ │(Database) │ │ (Queue) │ │
│ └──────┬──────┘ └───────┬──────┘ │
│ │ │ │
└─────────┼──────────────────────────────────────┼────────────┘
│ │
│ │
┌──────▼──────┐ ┌──────▼──────┐
│ PostgreSQL │ │MessageQueue │
│ (Adapter) │ │ (Adapter) │
└─────────────┘ └─────────────┘
Flow:
- PollingEngine polls database for due events
- Events are locked to prevent duplicate processing
- Events are dispatched to message queue
- Dispatcher consumes queue and invokes registered handlers
- Results are recorded in execution history
- Failed events are retried with backoff
- Exhausted events move to dead-letter queue
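Steps 1 and 2 of the flow (poll due events, lock them) are commonly done in PostgreSQL with a single claim query using FOR UPDATE SKIP LOCKED, so concurrent pollers skip rows another instance has already locked instead of blocking on them. The builder below is a hypothetical sketch of that general technique, not the package's actual query: status, next_run_at, locked_until and priority are names used elsewhere in this README, while the event_type_id join column and the 'SCHEDULED' status value are assumptions. Reclaiming rows whose locked_until has expired is left out.

```typescript
// Hypothetical claim-query builder; the shipped migrations define the real schema.
function buildClaimQuery(tablePrefix: string, batchSize: number, lockDurationMs: number) {
  // Identifiers cannot be bound as parameters, so restrict the prefix to safe characters.
  if (!/^[A-Za-z0-9_]*$/.test(tablePrefix)) {
    throw new Error("tablePrefix may contain only letters, digits and underscores");
  }
  const instances = `${tablePrefix}event_instances`;
  const types = `${tablePrefix}event_types`;
  const sql = [
    `UPDATE ${instances}`,
    `   SET status = 'EXECUTING',`,
    `       locked_until = NOW() + ($2::int * INTERVAL '1 millisecond')`,
    ` WHERE id IN (`,
    `   SELECT i.id`,
    `     FROM ${instances} i`,
    `     JOIN ${types} t ON t.id = i.event_type_id`,
    `    WHERE i.status = 'SCHEDULED' AND i.next_run_at <= NOW()`,
    `    ORDER BY t.priority DESC, i.next_run_at ASC`,
    `    LIMIT $1`,
    `    FOR UPDATE OF i SKIP LOCKED`, // rows locked by another poller are skipped, not awaited
    ` )`,
    `RETURNING *`,
  ].join("\n");
  return { sql, params: [batchSize, lockDurationMs] };
}
```

Using NOW() inside the query also keeps the due-time comparison on the database clock, consistent with the clock-skew answer in the FAQ.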
Performance Characteristics
- Polling Overhead: ~10-50ms per poll cycle (depends on batch size)
- Lock Contention: Database-level row locks, scales to 100+ instances
- Throughput: 1000+ events/second with proper queue and DB tuning
- Latency: 1-5 seconds typical (polling interval + queue latency)
For time-critical events (< 1s latency), consider reducing pollingIntervalMs to 1000ms or using a push-based system.
FAQ
How does it handle clock skew in distributed systems?
The scheduler uses database timestamps (NOW()) for all time comparisons, ensuring consistency across all instances regardless of application server clock skew.
What happens if a handler takes longer than lockDurationMs?
The event will be unlocked and potentially picked up by another instance. Set lockDurationMs to your P99 handler execution time + buffer. Use the withTimeout utility to enforce handler timeouts.
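The interaction with lockDurationMs is easier to see with the timeout technique spelled out. The sketch below is a hypothetical re-implementation (withTimeoutSketch is not the package's withTimeout): it races the work against a timer. Losing the race rejects the caller's promise but does not cancel the underlying work, so pick a timeout comfortably below lockDurationMs, and make handlers idempotent in case the work still completes.

```typescript
// Hypothetical sketch of the timeout technique; use the package's withTimeout in handlers.
function withTimeoutSketch<T>(work: Promise<T>, timeoutMs: number, message: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => reject(new Error(message)), timeoutMs);
  });
  const clear = () => {
    if (timer !== undefined) clearTimeout(timer);
  };
  // Whichever settles first wins; the timer is cleared either way.
  // Losing the race does NOT cancel `work`; it keeps running in the background.
  return Promise.race([work, timeout]).then(
    (value) => { clear(); return value; },
    (err) => { clear(); throw err; },
  );
}
```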
Can I use this without a message queue?
Yes! The package supports in-memory queue adapters for development and testing. However, message queues are recommended for production to ensure durability and horizontal scaling.
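For local development, an in-memory queue only needs the publish/subscribe/ack/nack methods shown in the SQS adapter. The class below is a hypothetical sketch: the message fields (id, body) stand in for the package's QueueMessage, whose real shape may differ, and it does not implement the package's IQueueAdapter interface directly. Messages live in process memory and are lost on restart, which is why a durable queue is recommended for production.

```typescript
// Local stand-in for the package's QueueMessage; fields are assumptions.
interface QueueMessageLike {
  id: string;
  body: unknown;
}

type MessageHandler = (msg: QueueMessageLike) => Promise<void>;

// Development/test-only queue (hypothetical): not durable, single process only.
class InMemoryQueueAdapter {
  private handler: MessageHandler | null = null;
  private readonly pending: QueueMessageLike[] = [];
  private readonly unacked = new Map<string, QueueMessageLike>();

  async publish(message: QueueMessageLike): Promise<void> {
    this.pending.push(message);
    await this.deliver();
  }

  async subscribe(handler: MessageHandler): Promise<void> {
    this.handler = handler;
    await this.deliver();
  }

  async ack(messageId: string): Promise<void> {
    this.unacked.delete(messageId);
  }

  async nack(messageId: string): Promise<void> {
    // Re-queue the message; it is redelivered on the next delivery pass.
    const msg = this.unacked.get(messageId);
    if (msg) {
      this.unacked.delete(messageId);
      this.pending.push(msg);
    }
  }

  // Hands a snapshot of queued messages to the handler one at a time; taking a
  // snapshot prevents a handler that always nacks from looping forever.
  private async deliver(): Promise<void> {
    const handler = this.handler;
    if (!handler) return;
    const batch = this.pending.splice(0, this.pending.length);
    for (const msg of batch) {
      this.unacked.set(msg.id, msg);
      await handler(msg);
    }
  }
}
```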
How do I pause all events of a specific type?
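// deleteEventType takes the event type's id, so look it up by name first
const eventType = await schedulerService.getEventTypeByName('EVENT_TYPE_NAME');
if (eventType) {
  await schedulerService.deleteEventType(eventType.id);
}
// All instances of this type will stop being scheduled.
// Note: this removes the type; to pause individual instances, use pauseEvent(id).
How do I view failed events?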
const deadLetters = await schedulerService.listDeadLetters();
deadLetters.forEach(dl => {
console.log(`Event ${dl.eventInstanceId} failed: ${dl.lastError}`);
});
Can I manually trigger an event execution?
The package focuses on time-based scheduling. For manual/immediate execution, consider using a separate job queue (BullMQ, etc.) or invoke your handler directly.
Troubleshooting
Events not executing
- Check health status: healthCheckService.check()
- Verify the poller is running: config.workerMode should be true
- Check for stuck locks (assuming tablePrefix 'scheduler_'):
  SELECT * FROM scheduler_event_instances WHERE status = 'EXECUTING' AND locked_until < NOW();
- Review logs for errors
High dead-letter rate
- Check handler implementation for bugs
- Verify external dependencies are available
- Review retry policy (may be too aggressive)
- Check handler timeout settings
Poor performance
- Increase batchSize for higher throughput
- Add database indexes (included in migrations)
- Scale workers horizontally
- Optimize handler execution time
- Consider database connection pooling
See PRODUCTION.md for detailed troubleshooting guide.
Roadmap
- [ ] Support for MySQL/MariaDB
- [ ] Built-in observability with OpenTelemetry
- [ ] Event scheduling UI/Dashboard
- [ ] Distributed tracing integration
- [ ] Advanced scheduling (cron, relative schedules)
- [ ] Event dependencies (wait for parent event)
License
MIT
Contributing
Contributions are welcome! Please see CONTRIBUTING.md for details.
Support
For issues and feature requests, please use the GitHub issue tracker.
