nest-scheduler-engine

A production-grade, event-type-driven scheduling engine for NestJS applications. This package provides a flexible and scalable solution for scheduling and executing recurring and one-time events with built-in fault tolerance, retry logic, dead-letter handling, and RRULE support (RFC 5545).

Version 3.1.0 - Adds declarative Cron decorator and production features (priority/concurrency). See CHANGELOG.md for details and migration guide.

Features

Core Features

  • Zero Infrastructure Ownership - Package never creates DB connections, queues, or caches. All are injected via adapters.
  • Adapter Pattern - Works with any PostgreSQL client (pg, TypeORM, Knex, Prisma) and queue system (SQS, BullMQ, RabbitMQ).
  • Ships Database Migrations - Includes migration files that consumers run against their own database.
  • Data-Driven Scheduling - All scheduling decisions from DB, no hardcoded schedules.
  • RRULE Support - RFC 5545 compliant recurrence rules for complex recurring schedules.
  • Framework-Agnostic Core - Core logic is framework-agnostic with a NestJS wrapper module.
  • Horizontal Scalability - Multiple instances can run concurrently with database-level locking.
  • Persistent Storage - All scheduler state (event types, event instances, execution history, dead letters, and cron metadata) is persisted in the consumer database via the IDatabaseAdapter you provide. This ensures scheduled jobs survive process restarts and multiple instances coordinate safely using database locks and idempotency keys.

Production Features (v3.0.0+)

  • Fault Tolerance - Circuit breaker pattern prevents cascading failures and protects downstream systems.

  • Advanced Retry - Exponential backoff with jitter and capped delays for optimal retry behavior.

  • Health Checks - Component-level health monitoring for database, queue, poller, and dispatcher.

  • Graceful Shutdown - Ensures in-flight events complete before process termination.

  • Comprehensive Error Handling - Custom error hierarchy for better error tracking and debugging.

  • Contextual Logging - Correlation IDs and performance tracking throughout event lifecycle.

  • Timeout Protection - Configurable timeouts prevent handlers from blocking indefinitely.

  • Dead-Letter Queue - Failed events are preserved for analysis and manual retry.

  • Observability Hooks - Lifecycle hooks for monitoring, alerting, and audit logging.

  • Edge Case Handling - Comprehensive validation for dates, RRULE, payloads, state transitions, and more. See EDGE_CASE_HANDLING.md.

  • Persistent DB-backed State - Scheduler persists event types, instances, execution history, and dead letters via the provided IDatabaseAdapter. Persistent state enables recovery after restarts, safe horizontal scaling, and idempotent scheduling operations (use shipped migrations and tablePrefix to namespace tables).

Installation

npm install nest-scheduler-engine

# Peer dependencies (consumer must install)
npm install @nestjs/core @nestjs/common rxjs

Quick Start

1. Run Migrations

First, export and run the migrations against your database:

import { getMigrations } from 'nest-scheduler-engine';

const migrations = getMigrations({ tablePrefix: 'scheduler_' });

// Use your migration tool (Knex, TypeORM, Flyway, etc.)
// migrations is an array of { version, up, down }

Or use the CLI helper:

npx scheduler-engine migrations:export --output ./migrations/
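
How the exported migrations get applied depends on your tooling. As a minimal sketch, assuming each migration's up and down are SQL strings and version is a sortable number (the text above only states the { version, up, down } shape), a hand-rolled runner could look like the following. Migration, SqlExecutor, and applyMigrations are hypothetical names for illustration, not part of the package.

```typescript
// Hypothetical shape: the README only says each entry has { version, up, down }.
interface Migration {
  version: number;
  up: string;   // assumed to be SQL text
  down: string; // assumed to be SQL text
}

// Hypothetical executor: anything that can run a SQL string against your database.
type SqlExecutor = (sql: string) => Promise<void>;

// Applies migrations that are not yet recorded as applied, in ascending
// version order, and returns the versions that were run.
async function applyMigrations(
  migrations: Migration[],
  alreadyApplied: Set<number>,
  exec: SqlExecutor,
): Promise<number[]> {
  const pending = migrations
    .filter((m) => !alreadyApplied.has(m.version))
    .sort((a, b) => a.version - b.version);

  const applied: number[] = [];
  for (const migration of pending) {
    await exec(migration.up);
    applied.push(migration.version);
  }
  return applied;
}
```

In practice a migration tool (Knex, TypeORM, Flyway) would also record applied versions and wrap each step in a transaction; the sketch leaves both out.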

2. Implement Adapters

Create adapter implementations for your infrastructure:

// adapters/pg-database.adapter.ts
import { Injectable } from '@nestjs/common';
import { Pool } from 'pg';
import { IDatabaseAdapter, ITransactionClient } from 'nest-scheduler-engine';

@Injectable()
export class PgDatabaseAdapter implements IDatabaseAdapter {
  constructor(private readonly pool: Pool) {}

  async query<T = any>(sql: string, params?: any[]): Promise<T[]> {
    const result = await this.pool.query(sql, params);
    return result.rows;
  }

  async transaction<T>(fn: (trx: ITransactionClient) => Promise<T>): Promise<T> {
    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');
      const trx = {
        query: async <T = any>(sql: string, params?: any[]) => {
          const result = await client.query(sql, params);
          return result.rows;
        },
      };
      const result = await fn(trx);
      await client.query('COMMIT');
      return result;
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  async ping(): Promise<boolean> {
    try {
      await this.pool.query('SELECT 1');
      return true;
    } catch {
      return false;
    }
  }
}

// adapters/sqs-queue.adapter.ts
import { Injectable } from '@nestjs/common';
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import { IQueueAdapter, QueueMessage } from 'nest-scheduler-engine';

@Injectable()
export class SqsQueueAdapter implements IQueueAdapter {
  constructor(
    private readonly sqs: SQSClient,
    private readonly queueUrl: string,
  ) {}

  async publish(message: QueueMessage): Promise<void> {
    await this.sqs.send(new SendMessageCommand({
      QueueUrl: this.queueUrl,
      MessageBody: JSON.stringify(message),
    }));
  }

  async subscribe(handler: (msg: QueueMessage) => Promise<void>): Promise<void> {
    // Implement SQS polling and handler invocation
    // This is typically done in a separate worker process
  }

  async ack(messageId: string): Promise<void> {
    // Delete message from SQS
  }

  async nack(messageId: string): Promise<void> {
    // Change message visibility or move to DLQ
  }
}

3. Register the Module

// app.module.ts
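import { Module } from '@nestjs/common';
import { SchedulerModule } from 'nest-scheduler-engine';
import { PgDatabaseAdapter } from './adapters/pg-database.adapter';
import { SqsQueueAdapter } from './adapters/sqs-queue.adapter';
import { Pool } from 'pg';
import { SQSClient } from '@aws-sdk/client-sqs';
// ReminderHandler is defined in step 4; import DataSyncHandler and ReportHandler from your own files the same way
import { ReminderHandler } from './handlers/reminder.handler';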

@Module({
  imports: [
    SchedulerModule.forRoot({
      // Required adapters
      database: {
        useFactory: (pool: Pool) => new PgDatabaseAdapter(pool),
        inject: [Pool],
      },
      queue: {
        useFactory: (sqs: SQSClient) => new SqsQueueAdapter(sqs, process.env.QUEUE_URL),
        inject: [SQSClient],
      },

      // Optional configuration
      config: {
        pollingIntervalMs: 5000,
        lockDurationMs: 30000,
        batchSize: 50,
        concurrency: 10,
        deadLetterEnabled: true,
        tablePrefix: 'scheduler_',
      },

      // Optional hooks
      hooks: {
        onEventScheduled: async (event) => {
          console.log('Event scheduled:', event.id);
        },
        onEventFailed: async (event, error, retryCount) => {
          console.error('Event failed:', event.id, error);
        },
        onEventDeadLettered: async (event) => {
          // Send alert to oncall
        },
      },
    }),
  ],
  providers: [
    // Your event handlers
    ReminderHandler,
    DataSyncHandler,
    ReportHandler,
  ],
})
export class AppModule {}

4. Create Event Handlers

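// handlers/reminder.handler.ts
import { Injectable } from '@nestjs/common';
import { EventHandler, IEventHandler, ExecutionContext, HandlerResult } from 'nest-scheduler-engine';
// NotificationService is your own service exposing send(userId, message); the path below is a placeholder
import { NotificationService } from '../notifications/notification.service';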

@Injectable()
@EventHandler('REMINDER_NOTIFICATION')
export class ReminderHandler implements IEventHandler {
  readonly eventType = 'REMINDER_NOTIFICATION';

  constructor(
    private readonly notificationService: NotificationService,
  ) {}

  async handle(payload: Record<string, any>, ctx: ExecutionContext): Promise<HandlerResult> {
    try {
      await this.notificationService.send(payload.userId, payload.message);
      return { success: true };
    } catch (error) {
      return {
        success: false,
        error: error instanceof Error ? error.message : String(error),
      };
    }
  }
}

5. Schedule Events

// some.service.ts
import { Injectable } from '@nestjs/common';
import { SchedulerService, ScheduleType } from 'nest-scheduler-engine';

@Injectable()
export class ReminderSchedulingService {
  constructor(private readonly scheduler: SchedulerService) {}

  async scheduleReminder(userId: string, message: string, sendAt: Date) {
    // First, create the event type (usually done once on app startup)
    await this.scheduler.createEventType({
      name: 'REMINDER_NOTIFICATION',
      description: 'Send reminder notifications to users',
      retryPolicy: {
        maxRetries: 3,
        delayMs: 5000,
        backoff: 'exponential',
      },
    });

    // Schedule a one-time event
    const event = await this.scheduler.scheduleEvent({
      eventTypeName: 'REMINDER_NOTIFICATION',
      payload: { userId, message },
      scheduleType: ScheduleType.ONE_TIME,
      scheduledAt: sendAt,
    });

    return event;
  }

  async scheduleRecurringReport() {
    // Schedule a recurring event using RRULE
    const event = await this.scheduler.scheduleEvent({
      eventTypeName: 'WEEKLY_REPORT',
      payload: { reportType: 'sales' },
      scheduleType: ScheduleType.RECURRING,
      rrule: 'FREQ=WEEKLY;BYDAY=MO;BYHOUR=9;BYMINUTE=0', // Every Monday at 9:00 AM
    });

    return event;
  }
}

Declarative Cron Jobs (new)

You can declare cron-style scheduled jobs using the @Cron method decorator. The decorator converts a standard 5-field cron expression into an RRULE and schedules it through the engine.

Example:

import { Injectable } from '@nestjs/common';
import { Cron } from 'nest-scheduler-engine/decorators/cron.decorator';

@Injectable()
export class JobsService {
  @Cron('0 9 * * *', { name: 'DAILY_SUMMARY', timezone: 'America/Los_Angeles' })
  async handleDailySummary() {
    // business logic executed by the scheduler engine
  }
}

Notes:

  • The decorator converts the cron expression to an RRULE (RFC 5545) using an internal converter and schedules a recurring event.
  • Each decorated method is registered as an event handler and mapped to a unique event type.
  • Scheduling is idempotent: by default a cron job uses an idempotency key cron:<eventTypeName> to avoid duplicate scheduled instances on bootstrap.
  • The discovery currently runs at module init and will create the event type and schedule the recurring event if it does not already exist.
  • Advanced features (persistent cron metadata table, timezone persistence, missed-execution recovery and versioning) are planned and will be enabled progressively; basic decorator + idempotent scheduling is available now.
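
To make the cron-to-RRULE idea concrete, here is a minimal sketch of such a conversion for a deliberately small subset of cron syntax. It is not the package's internal converter, whose behavior is not documented here; cronToRRuleSketch and parseField are hypothetical names, and timezone handling is ignored.

```typescript
// Cron day-of-week numbers (0 = Sunday) mapped to RFC 5545 weekday codes.
const WEEKDAYS = ['SU', 'MO', 'TU', 'WE', 'TH', 'FR', 'SA'];

// Accepts only plain numbers within range; ranges, steps, and lists are rejected.
function parseField(value: string, max: number, label: string): number {
  if (!/^\d+$/.test(value) || Number(value) > max) {
    throw new Error(`Unsupported ${label} field: ${value}`);
  }
  return Number(value);
}

// Converts a restricted 5-field cron expression to an RRULE string.
// Supported: numeric minute and hour, '*' for day-of-month and month,
// and either '*' or a comma-separated list of 0-6 for day-of-week.
function cronToRRuleSketch(expression: string): string {
  const fields = expression.trim().split(/\s+/);
  if (fields.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${fields.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = fields;

  const byMinute = parseField(minute, 59, 'minute');
  const byHour = parseField(hour, 23, 'hour');
  if (dayOfMonth !== '*' || month !== '*') {
    throw new Error('Day-of-month and month restrictions are not handled in this sketch');
  }

  const timePart = `BYHOUR=${byHour};BYMINUTE=${byMinute}`;
  if (dayOfWeek === '*') {
    return `FREQ=DAILY;${timePart}`;
  }

  const days = dayOfWeek.split(',').map((d) => {
    if (!/^[0-6]$/.test(d)) {
      throw new Error(`Unsupported day-of-week value: ${d}`);
    }
    return WEEKDAYS[Number(d)];
  });
  return `FREQ=WEEKLY;BYDAY=${days.join(',')};${timePart}`;
}

// cronToRRuleSketch('0 9 * * *')     -> 'FREQ=DAILY;BYHOUR=9;BYMINUTE=0'
// cronToRRuleSketch('30 18 * * 1,5') -> 'FREQ=WEEKLY;BYDAY=MO,FR;BYHOUR=18;BYMINUTE=30'
```

Expressions outside the subset (steps such as */5, ranges, month or day-of-month values) throw instead of being silently approximated.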

API Reference

SchedulerService

Main service for interacting with the scheduler.

Event Type Management

  • createEventType(input: CreateEventTypeInput): Promise<EventType>
  • getEventType(id: string): Promise<EventType | null>
  • getEventTypeByName(name: string): Promise<EventType | null>
  • listEventTypes(): Promise<EventType[]>
  • deleteEventType(id: string): Promise<void>

Event Instance Management

  • scheduleEvent(input: ScheduleEventInput): Promise<EventInstance>
  • getEvent(id: string): Promise<EventInstance | null>
  • listEvents(filters?: EventFilters): Promise<PaginatedResult<EventInstance>>
  • pauseEvent(id: string): Promise<EventInstance>
  • resumeEvent(id: string): Promise<EventInstance>
  • cancelEvent(id: string): Promise<void>

Execution History

  • getExecutions(eventId: string): Promise<EventExecution[]>

Dead Letters

  • listDeadLetters(): Promise<DeadLetter[]>
  • retryDeadLetter(id: string): Promise<EventInstance>

Bulk Scheduling (v2.1.0+)

  • scheduleEvents(inputs: ScheduleEventInput[], options?: BulkScheduleOptions): Promise<EventInstance[]>

Middleware

  • use(middleware: MiddlewareFunction | IMiddleware): void

High-ROI Features (v2.1.0+)

Priority + Concurrency Control

Control execution priority and limit concurrent executions per event type.

Create Event Types with Priority:

// High priority events execute first
const urgentEventType = await scheduler.createEventType({
  name: 'URGENT_NOTIFICATION',
  description: 'Critical user notifications',
  priority: 10,          // Higher = higher priority (default: 0)
  maxConcurrency: 5,     // Max 5 concurrent executions (default: null = unlimited)
});

// Low priority background jobs
const backgroundEventType = await scheduler.createEventType({
  name: 'BACKGROUND_SYNC',
  description: 'Background data sync',
  priority: 1,           // Lower priority
  maxConcurrency: 2,     // Limit to 2 concurrent executions
});

How it works:

  • Polling engine orders events by priority DESC, next_run_at ASC
  • Higher priority events are picked first
  • maxConcurrency prevents resource exhaustion:
    • null or undefined = unlimited (default)
    • > 0 = maximum concurrent executions
    • = 0 = event type disabled
  • Prevents starvation of low-priority events through fair scheduling
  • Distributed-safe: Uses database queries to track concurrency
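
The selection rules above can be illustrated in memory. The sketch below is not the package's polling query (that runs in the database); it only mirrors the documented ordering (priority DESC, next_run_at ASC) and the three maxConcurrency cases. DueEvent, ConcurrencyLimits, and selectRunnable are hypothetical names.

```typescript
interface DueEvent {
  id: string;
  eventTypeName: string;
  priority: number; // taken from the event type; higher runs first
  nextRunAt: Date;
}

// maxConcurrency per event type: null/undefined = unlimited, 0 = disabled, > 0 = cap.
type ConcurrencyLimits = Record<string, number | null | undefined>;

// Picks up to batchSize events, highest priority first and oldest first within
// a priority, skipping event types that are disabled or already at their cap.
function selectRunnable(
  due: DueEvent[],
  limits: ConcurrencyLimits,
  running: Record<string, number>,
  batchSize: number,
): DueEvent[] {
  const ordered = [...due].sort(
    (a, b) => b.priority - a.priority || a.nextRunAt.getTime() - b.nextRunAt.getTime(),
  );

  const inFlight: Record<string, number> = { ...running };
  const picked: DueEvent[] = [];

  for (const event of ordered) {
    if (picked.length >= batchSize) break;

    const limit = limits[event.eventTypeName];
    const current = inFlight[event.eventTypeName] ?? 0;

    // A limit of 0 always fails this check, which models "event type disabled".
    if (limit !== null && limit !== undefined && current >= limit) continue;

    inFlight[event.eventTypeName] = current + 1;
    picked.push(event);
  }
  return picked;
}
```

Counting in-flight executions in process memory only works for a single instance; the documented approach tracks concurrency through database queries so that multiple instances agree.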

Use Cases:

  • Critical user notifications (high priority)
  • Background cleanup jobs (low priority)
  • Rate-limited API calls (maxConcurrency = 1)
  • Resource-intensive tasks (maxConcurrency = 3)

Bulk Scheduling API

Schedule thousands of events efficiently in a single operation.

// Prepare batch of events
const events = users.map(user => ({
  eventTypeName: 'WELCOME_EMAIL',
  payload: { userId: user.id, email: user.email },
  scheduleType: ScheduleType.ONE_TIME,
  scheduledAt: new Date(Date.now() + 3600000), // 1 hour from now
  idempotencyKey: `welcome-${user.id}`,        // Prevent duplicates
}));

// FAIL_ALL mode: Transaction-safe, rolls back if any event fails
const scheduled = await scheduler.scheduleEvents(events, { mode: 'FAIL_ALL' });

// PARTIAL_SUCCESS mode (alternative): Continues on errors
const partiallyScheduled = await scheduler.scheduleEvents(events, { mode: 'PARTIAL_SUCCESS' });

Features:

  • Transaction Support: FAIL_ALL mode uses single transaction
  • Partial Success: PARTIAL_SUCCESS mode processes valid events
  • Validation: All inputs validated before processing
  • Idempotency: Detects duplicate idempotency keys in batch
  • Performance: Optimized for 1000+ events

Options:

  • mode: 'FAIL_ALL' (default): All-or-nothing transaction
  • mode: 'PARTIAL_SUCCESS': Returns successful events, logs failures
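
The difference between the two modes can be sketched without a database. The following is an illustration only: the validation rules, the decision to reject (rather than merge) duplicate idempotency keys, and the names BulkInput, BulkOutcome, and planBulkSchedule are all assumptions, and the real scheduleEvents returns EventInstance[] rather than this outcome object.

```typescript
type BulkMode = 'FAIL_ALL' | 'PARTIAL_SUCCESS';

interface BulkInput {
  eventTypeName: string;
  scheduledAt: Date;
  idempotencyKey?: string;
}

interface BulkOutcome {
  accepted: BulkInput[];
  rejected: { index: number; reason: string }[];
}

// Validates every input first, then either rejects the whole batch (FAIL_ALL)
// or returns the valid subset along with the reasons for each rejection.
function planBulkSchedule(inputs: BulkInput[], mode: BulkMode = 'FAIL_ALL'): BulkOutcome {
  const seenKeys = new Set<string>();
  const accepted: BulkInput[] = [];
  const rejected: { index: number; reason: string }[] = [];

  inputs.forEach((input, index) => {
    let reason: string | null = null;
    if (!input.eventTypeName) {
      reason = 'eventTypeName is required';
    } else if (Number.isNaN(input.scheduledAt.getTime())) {
      reason = 'scheduledAt is not a valid date';
    } else if (input.idempotencyKey !== undefined && seenKeys.has(input.idempotencyKey)) {
      reason = `duplicate idempotencyKey "${input.idempotencyKey}" in batch`;
    }

    if (reason !== null) {
      rejected.push({ index, reason });
      return;
    }
    if (input.idempotencyKey !== undefined) seenKeys.add(input.idempotencyKey);
    accepted.push(input);
  });

  if (mode === 'FAIL_ALL' && rejected.length > 0) {
    throw new Error(
      `Batch rejected: ${rejected.length} invalid input(s); first: ${rejected[0].reason}`,
    );
  }
  return { accepted, rejected };
}
```

In FAIL_ALL mode the caller sees either every event scheduled or an error, which is what makes a single transaction the natural fit; PARTIAL_SUCCESS trades that guarantee for progress on the valid inputs.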

Use Cases:

  • Onboarding campaigns (bulk welcome emails)
  • Batch reporting jobs
  • Mass reminders/notifications
  • Data migration scheduling

Middleware System

Inject custom logic into the event execution pipeline.

Register Middleware:

// Logging middleware
scheduler.use(async (ctx, next) => {
  console.log(`[START] ${ctx.eventType} - ${ctx.eventId}`);
  const start = Date.now();
  
  await next(); // Call next middleware or handler
  
  const duration = Date.now() - start;
  console.log(`[END] ${ctx.eventType} - ${duration}ms`);
});

// Tracing middleware (e.g., OpenTelemetry)
scheduler.use(async (ctx, next) => {
  const span = tracer.startSpan(ctx.eventType, {
    attributes: {
      'event.id': ctx.eventId,
      'event.type': ctx.eventType,
      'event.attempt': ctx.attemptNumber,
      'event.correlation_id': ctx.correlationId,
    },
  });
  
  try {
    await next();
    span.setStatus({ code: SpanStatusCode.OK });
  } catch (error) {
    span.recordException(error);
    span.setStatus({ code: SpanStatusCode.ERROR });
    throw error;
  } finally {
    span.end();
  }
});

// Authentication/authorization middleware
scheduler.use(async (ctx, next) => {
  if (ctx.payload.requiresAuth) {
    await validateAuthToken(ctx.payload.token);
  }
  await next();
});

Middleware Context:

interface MiddlewareContext {
  eventId: string;           // Unique event instance ID
  eventType: string;         // Event type name
  payload: Record<string, any>;  // Event payload
  attemptNumber: number;     // Current execution attempt
  correlationId: string;     // Correlation ID for tracing
  scheduledAt: Date;         // Original scheduled time
  startedAt: Date;           // Execution start time
}

Class-based Middleware:

class MetricsMiddleware implements IMiddleware {
  async use(ctx: MiddlewareContext, next: NextFunction) {
    metrics.increment('events.started', { type: ctx.eventType });
    
    try {
      await next();
      metrics.increment('events.succeeded', { type: ctx.eventType });
    } catch (error) {
      metrics.increment('events.failed', { type: ctx.eventType });
      throw error;
    }
  }
}

scheduler.use(new MetricsMiddleware());

Features:

  • Execution Order: Middleware executes in registration order
  • Pre/Post Processing: Code before next() runs first, code after runs last
  • Error Propagation: Errors bubble up through middleware chain
  • Async Support: Full async/await support
  • No Breaking Changes: Existing handlers work unchanged
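
The ordering rules above follow from how a middleware chain is usually composed. As a rough sketch of that general technique (not the package's actual dispatcher; composeSketch, Middleware, and Next are hypothetical names), each middleware receives a next function that invokes the remainder of the chain:

```typescript
type Next = () => Promise<void>;
type Middleware<C> = (ctx: C, next: Next) => Promise<void>;

// Builds a single function that runs middleware in registration order and then
// the handler. Code after `await next()` runs in reverse order as calls return,
// and an error thrown anywhere propagates back through every caller.
function composeSketch<C>(
  middleware: Middleware<C>[],
  handler: (ctx: C) => Promise<void>,
): (ctx: C) => Promise<void> {
  return async (ctx: C): Promise<void> => {
    const dispatch = async (index: number): Promise<void> => {
      if (index === middleware.length) {
        await handler(ctx);
        return;
      }
      await middleware[index](ctx, () => dispatch(index + 1));
    };
    await dispatch(0);
  };
}
```

With two middleware functions a and b registered in that order, the observable sequence is: a before, b before, handler, b after, a after.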

Use Cases:

  • Distributed tracing (OpenTelemetry, Jaeger)
  • Performance monitoring (execution times)
  • Payload validation/transformation
  • Authentication/authorization
  • Custom logging and audit trails
  • Analytics and metrics collection

Production Features

Error Handling

The package provides a comprehensive error hierarchy for better error tracking:

import {
  SchedulerError,          // Base error class
  EventNotFoundError,      // Event not found in database
  HandlerNotFoundError,    // No handler registered for event type
  EventTypeNotFoundError,  // Event type doesn't exist
  InvalidEventStateError,  // Invalid state transition
  LockAcquisitionError,    // Failed to acquire event lock
  RetryExhaustedError,     // Max retries reached
} from 'nest-scheduler-engine';

// All errors extend SchedulerError with context
try {
  await schedulerService.getEvent('invalid-id');
} catch (error) {
  if (error instanceof EventNotFoundError) {
    console.log('Event not found:', error.eventId);
    console.log('Stack trace:', error.stack);
  }
}

Fault Tolerance

Circuit Breaker

Protects downstream systems from cascading failures:

import { CircuitBreaker } from 'nest-scheduler-engine';

const breaker = new CircuitBreaker({
  failureThreshold: 5,     // Open after 5 failures
  resetTimeout: 60000,     // Try again after 60s
});

const result = await breaker.execute(async () => {
  return await externalApiCall();
});

States:

  • CLOSED: Normal operation
  • OPEN: Too many failures, reject immediately
  • HALF_OPEN: Testing if service recovered
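
The three states form a small state machine. The sketch below shows one common way to implement it, assuming a consecutive-failure counter and a single trial call in HALF_OPEN; the package's CircuitBreaker may differ in both respects. SketchCircuitBreaker and its injectable clock are hypothetical.

```typescript
type BreakerState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

class SketchCircuitBreaker {
  private state: BreakerState = 'CLOSED';
  private failures = 0;
  private openedAt = 0;

  constructor(
    private readonly failureThreshold: number,
    private readonly resetTimeoutMs: number,
    private readonly now: () => number = () => Date.now(), // injectable for tests
  ) {}

  getState(): BreakerState {
    return this.state;
  }

  async execute<T>(operation: () => Promise<T>): Promise<T> {
    if (this.state === 'OPEN') {
      if (this.now() - this.openedAt < this.resetTimeoutMs) {
        // Still inside the reset window: reject without calling the operation.
        throw new Error('Circuit is open');
      }
      this.state = 'HALF_OPEN'; // reset window elapsed: allow a trial call
    }

    try {
      const result = await operation();
      this.state = 'CLOSED';
      this.failures = 0;
      return result;
    } catch (error) {
      this.failures += 1;
      // A failed trial call, or too many consecutive failures, opens the circuit.
      if (this.state === 'HALF_OPEN' || this.failures >= this.failureThreshold) {
        this.state = 'OPEN';
        this.openedAt = this.now();
      }
      throw error;
    }
  }
}
```

While the circuit is OPEN the wrapped operation is never invoked, which is what protects the downstream system.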

Advanced Retry Logic

import { retry, calculateRetryDelay } from 'nest-scheduler-engine';

// Retry with exponential backoff + jitter
const result = await retry(
  async () => await unstableOperation(),
  3,                    // Max 3 retries
  1000,                 // Base delay 1s
  'exponential',        // Backoff strategy
);

// Calculate retry delays
const delay = calculateRetryDelay(
  1000,                 // Base delay 1s
  2,                    // Retry count
  'exponential',        // Strategy
  true,                 // Add jitter
);
// Returns: ~4000ms ± 25% (exponential: 1s * 2^2 = 4s)

Features:

  • Exponential or linear backoff
  • Jitter (±25%) to prevent thundering herd
  • Capped at 5 minutes maximum delay
  • Automatic retryable error detection
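
The delay arithmetic can be sketched as follows, using the figures listed above (±25% jitter, 5-minute cap). This is an illustration, not the package's calculateRetryDelay: the linear formula, the order of capping and jitter, and the injectable random source are assumptions, and sketchRetryDelay is a hypothetical name.

```typescript
type Backoff = 'exponential' | 'linear';

const MAX_DELAY_MS = 5 * 60 * 1000; // 5-minute cap
const JITTER_RATIO = 0.25;          // ±25%

function sketchRetryDelay(
  baseDelayMs: number,
  retryCount: number,
  strategy: Backoff,
  jitter: boolean,
  random: () => number = Math.random, // injectable so results can be reproduced
): number {
  const raw =
    strategy === 'exponential'
      ? baseDelayMs * 2 ** retryCount     // 1s, 2s, 4s, 8s, ...
      : baseDelayMs * (retryCount + 1);   // assumed linear form: 1s, 2s, 3s, ...

  const capped = Math.min(raw, MAX_DELAY_MS);
  if (!jitter) return capped;

  // Scale by a factor in [0.75, 1.25) so simultaneous failures do not retry in lockstep.
  const factor = 1 + (random() * 2 - 1) * JITTER_RATIO;
  return Math.min(Math.round(capped * factor), MAX_DELAY_MS);
}

// sketchRetryDelay(1000, 2, 'exponential', false)  -> 4000
// sketchRetryDelay(1000, 30, 'exponential', false) -> 300000 (capped)
```

With jitter enabled and a base of 1s at retry count 2, results fall between 3000ms and 5000ms, matching the "~4000ms ± 25%" figure in the example above.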

Health Checks

Monitor component health for observability:

import { Injectable } from '@nestjs/common';
import { HealthCheckService } from 'nest-scheduler-engine';

@Injectable()
export class AppHealthService {
  constructor(private healthCheck: HealthCheckService) {}

  async check() {
    const status = await this.healthCheck.check();
    
    console.log('Overall healthy:', status.healthy);
    console.log('Database:', status.components.database);
    console.log('Queue:', status.components.queue);
    console.log('Poller:', status.components.poller);
    console.log('Dispatcher:', status.components.dispatcher);
    
    return status;
  }
}

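// health.controller.ts: expose as HTTP endpoint (assumes the default Express platform)
import { Controller, Get, Res } from '@nestjs/common';
import { Response } from 'express';
import { HealthCheckService } from 'nest-scheduler-engine';

@Controller('health')
export class HealthController {
  constructor(private readonly healthCheckService: HealthCheckService) {}

  @Get()
  async getHealth(@Res() res: Response) {
    const health = await this.healthCheckService.check();
    const status = health.healthy ? 200 : 503;
    return res.status(status).json(health);
  }
}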

Health Status Schema:

{
  healthy: boolean;           // Overall health
  timestamp: Date;
  components: {
    database: 'HEALTHY' | 'UNHEALTHY';
    queue: 'HEALTHY' | 'UNHEALTHY';
    poller: 'RUNNING' | 'STOPPED';
    dispatcher: 'RUNNING' | 'STOPPED';
  };
}

Graceful Shutdown

Ensures in-flight events complete before process termination:

import { GracefulShutdownManager } from 'nest-scheduler-engine';

const shutdownManager = new GracefulShutdownManager();

// Register shutdown handlers (SIGTERM, SIGINT)
shutdownManager.registerHandlers();

// Add cleanup callbacks
shutdownManager.addCallback(async () => {
  await pollingEngine.stop();
  await dispatcher.stop();
  console.log('Scheduler stopped gracefully');
});

// Trigger shutdown programmatically
await shutdownManager.shutdown(5000); // 5s timeout

Process:

  1. Signal received (SIGTERM/SIGINT)
  2. Stop accepting new work
  3. Wait for in-flight events to complete
  4. Execute cleanup callbacks
  5. Exit process

Timeout Protection

Prevent handlers from blocking indefinitely:

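import { Injectable } from '@nestjs/common';
import { EventHandler, IEventHandler, withTimeout } from 'nest-scheduler-engine';

@Injectable()
@EventHandler('LONG_RUNNING_TASK')
export class TaskHandler implements IEventHandler {
  readonly eventType = 'LONG_RUNNING_TASK';

  // ExternalService stands in for your own injected dependency
  constructor(private readonly externalService: ExternalService) {}

  async handle(payload: any) {
    // Auto-timeout after 10 seconds
    const result = await withTimeout(
      this.externalService.process(payload),
      10000,
      'Task execution timeout',
    );

    return { success: true, result };
  }
}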
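
A timeout wrapper of this kind is commonly built on Promise.race. The sketch below shows that general technique under the assumption that the wrapper rejects with the supplied message on timeout; it is not the package's withTimeout, and withTimeoutSketch is a hypothetical name.

```typescript
// Resolves or rejects with the wrapped promise if it settles in time;
// otherwise rejects with an Error carrying the given message.
function withTimeoutSketch<T>(promise: Promise<T>, ms: number, message: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;

  const timeout = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => reject(new Error(message)), ms);
  });

  // Whichever settles first wins; the timer is cleared either way so it
  // does not keep the process alive after the race is decided.
  return Promise.race([promise, timeout]).finally(() => {
    if (timer !== undefined) clearTimeout(timer);
  });
}
```

A wrapper like this stops waiting but does not cancel the underlying work: the wrapped operation keeps running unless it supports cancellation itself (for example through an AbortSignal).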

Contextual Logging

Track events throughout their lifecycle:

import { ContextualLogger } from 'nest-scheduler-engine';

const logger = new ContextualLogger('MyHandler');

// Logs include correlation ID automatically
logger.log('Processing event', {
  eventId: '123',
  eventType: 'EMAIL_SEND',
});

// Performance tracking
const tracker = logger.trackPerformance('email-send');
await sendEmail();
tracker.end(); // Logs duration automatically

Log Format:

{
  "timestamp": "2025-01-15T10:30:00.000Z",
  "level": "info",
  "context": "MyHandler",
  "correlationId": "abc-123-def",
  "message": "Processing event",
  "eventId": "123",
  "eventType": "EMAIL_SEND"
}

Deployment Modes

The package supports three deployment modes:

Combined Mode (Default)

Polls + dispatches + consumes in one process.

config: {
  workerMode: true, // default
}

Use case: Simple deployments, low to medium load

Poller Only

Only polls and dispatches to queue. Separate workers consume.

config: {
  pollerOnly: true,
}

Use case: Separate concerns, scale workers independently

Worker Only

Only consumes from queue. Separate poller dispatches.

config: {
  workerMode: true,
  pollerOnly: false,
}

Use case: Horizontal scaling, high throughput

See PRODUCTION.md for detailed deployment guide, scaling strategies, and production best practices.

Testing

Testing Your Handlers

The package provides mock adapters for testing:

import { Test } from '@nestjs/testing';
import { SchedulerModule } from 'nest-scheduler-engine';
import { MockDatabaseAdapter, MockQueueAdapter } from 'nest-scheduler-engine/test';

describe('EmailHandler', () => {
  let handler: EmailHandler;
  let mockDb: MockDatabaseAdapter;
  let mockQueue: MockQueueAdapter;

  beforeEach(async () => {
    mockDb = new MockDatabaseAdapter();
    mockQueue = new MockQueueAdapter();

    const module = await Test.createTestingModule({
      imports: [
        SchedulerModule.forRoot({
          database: { useValue: mockDb },
          queue: { useValue: mockQueue },
          config: {
            pollingIntervalMs: 1000,
            workerMode: false, // Don't start polling in tests
          },
        }),
      ],
      providers: [EmailHandler],
    }).compile();

    handler = module.get(EmailHandler);
  });

  it('should send email successfully', async () => {
    const payload = { to: 'user@example.com', body: 'Hello' };
    const result = await handler.handle(payload, {
      eventId: '123',
      eventType: 'EMAIL_SEND',
      attemptNumber: 1,
      correlationId: 'test-123',
    });

    expect(result.success).toBe(true);
  });
});

Running Package Tests

# Run all tests
npm test

# Run with coverage
npm run test:cov

# Run unit tests only
npm run test:unit

# Watch mode
npm run test:watch

Coverage Requirements

The package maintains test coverage:

  • Statements: 40%+
  • Branches: 40%+
  • Functions: 40%+
  • Lines: 40%+

Test Structure

test/
├── setup.ts                    # Jest setup
├── mocks/
│   └── adapters.mock.ts       # Mock implementations
├── fixtures/
│   └── test-data.fixtures.ts  # Test data factories
├── unit/
│   ├── errors.spec.ts         # Error class tests
│   ├── circuit-breaker.spec.ts
│   └── retry-helper.spec.ts
├── integration/
│   ├── event-manager.spec.ts
│   └── dispatcher.spec.ts
└── e2e/
    └── full-lifecycle.spec.ts

RRULE Examples

// Every day at 10:00 AM
rrule: 'FREQ=DAILY;BYHOUR=10;BYMINUTE=0'

// Every Monday and Friday at 9:00 AM
rrule: 'FREQ=WEEKLY;BYDAY=MO,FR;BYHOUR=9;BYMINUTE=0'

// Last day of every month at 11:59 PM
rrule: 'FREQ=MONTHLY;BYMONTHDAY=-1;BYHOUR=23;BYMINUTE=59'

// Every 2 hours
rrule: 'FREQ=HOURLY;INTERVAL=2'

See RFC 5545 for full RRULE specification.

Database Schema

The shipped migrations create the following tables in your database:

  • event_types - Event type definitions with retry policies
  • event_instances - Scheduled event instances
  • event_executions - Execution history for audit and debugging
  • dead_letters - Failed events that exhausted retries

All tables support an optional prefix via tablePrefix config option.

Persistence note:

  • All scheduler state (event types, event instances, execution history, dead letters) is persisted to your database via the IDatabaseAdapter you provide. The package does not create or manage database connections itself — consumers must supply an adapter implementation (see src/interfaces/database-adapter.interface.ts) that performs query and transaction operations against their chosen database client. This ensures scheduled jobs survive process restarts and can be safely scaled across multiple instances: locks, idempotency keys, and database transactions provide distributed-safety for scheduling operations.

Be sure to run the shipped migrations and set tablePrefix if you want to namespace tables in a shared schema.

What's New in v2.0.0

Version 2.0.0 brings production-grade features and comprehensive testing:

✨ New Features

  • Circuit Breaker Pattern - Protect downstream systems from cascading failures
  • Health Check System - Component-level health monitoring
  • Graceful Shutdown - Ensure in-flight events complete before exit
  • Enhanced Retry Logic - Exponential backoff with jitter and capping
  • Timeout Protection - Prevent handlers from blocking indefinitely
  • Contextual Logging - Correlation IDs and performance tracking
  • Custom Error Hierarchy - 8 specialized error classes for better debugging

🧪 Testing & Quality

  • Comprehensive Test Suite - 35+ unit tests with 96%+ coverage of critical utilities
  • Mock Adapters - Easy testing with provided mocks
  • Test Fixtures - Factory functions for common test data
  • Coverage Reporting - Integrated with Jest

📚 Documentation

  • PRODUCTION.md - Complete production deployment guide
  • EDGE_CASE_HANDLING.md - Comprehensive documentation of all edge cases handled by the scheduler
  • CHANGELOG.md - Detailed migration guide from v1.x
  • Enhanced README - Production features and best practices

EDGE_CASE_HANDLING.md describes how the scheduler handles edge cases, including:

  • Time-related issues (DST, leap years, timezone)
  • RRULE edge cases (infinite loops, zero occurrences)
  • Concurrency and distributed system challenges
  • Security and data integrity protections
  • State transition validation

🔧 Breaking Changes

See CHANGELOG.md for migration guide from v1.x to v2.0.0.

Architecture

┌─────────────────────────────────────────────────────────────┐
│                     NestJS Application                      │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐  │
│  │   Handler    │    │   Handler    │    │   Handler    │  │
│  │ @EventHandler│    │ @EventHandler│    │ @EventHandler│  │
│  └──────┬───────┘    └──────┬───────┘    └──────┬───────┘  │
│         │                   │                    │          │
│         └───────────────────┴────────────────────┘          │
│                             │                               │
│                    ┌────────▼────────┐                      │
│                    │  HandlerRegistry│                      │
│                    └────────┬────────┘                      │
│                             │                               │
│         ┌───────────────────┴───────────────────┐           │
│         │                                       │           │
│  ┌──────▼──────┐                       ┌───────▼──────┐    │
│  │PollingEngine│◄──────────────────────┤  Dispatcher  │    │
│  │(Database)   │                       │   (Queue)    │    │
│  └──────┬──────┘                       └───────┬──────┘    │
│         │                                      │            │
└─────────┼──────────────────────────────────────┼────────────┘
          │                                      │
          │                                      │
   ┌──────▼──────┐                        ┌──────▼──────┐
   │  PostgreSQL │                        │MessageQueue │
   │  (Adapter)  │                        │  (Adapter)  │
   └─────────────┘                        └─────────────┘

Flow:

  1. PollingEngine polls database for due events
  2. Events are locked to prevent duplicate processing
  3. Events are dispatched to message queue
  4. Dispatcher consumes queue and invokes registered handlers
  5. Results are recorded in execution history
  6. Failed events are retried with backoff
  7. Exhausted events move to dead-letter queue

Performance Characteristics

  • Polling Overhead: ~10-50ms per poll cycle (depends on batch size)
  • Lock Contention: Database-level row locks, scales to 100+ instances
  • Throughput: 1000+ events/second with proper queue and DB tuning
  • Latency: 1-5 seconds typical (polling interval + queue latency)

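For time-critical events, consider reducing pollingIntervalMs (for example to 1000ms). Because latency is roughly the polling interval plus queue latency, sub-second requirements are better served by a push-based system.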

FAQ

How does it handle clock skew in distributed systems?

The scheduler uses database timestamps (NOW()) for all time comparisons, ensuring consistency across all instances regardless of application server clock skew.

What happens if a handler takes longer than lockDurationMs?

The event will be unlocked and potentially picked up by another instance. Set lockDurationMs to your P99 handler execution time + buffer. Use the withTimeout utility to enforce handler timeouts.
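
The "P99 + buffer" rule can be computed from recorded handler durations. A minimal sketch, assuming you already collect execution times in milliseconds and using the nearest-rank percentile definition; percentile and suggestLockDurationMs are hypothetical helpers, not package exports.

```typescript
// Nearest-rank percentile: the smallest sample such that at least p% of
// all samples are less than or equal to it.
function percentile(samplesMs: number[], p: number): number {
  if (samplesMs.length === 0) {
    throw new Error('At least one sample is required');
  }
  const sorted = [...samplesMs].sort((a, b) => a - b);
  const rank = Math.ceil((p * sorted.length) / 100);
  return sorted[Math.max(rank, 1) - 1];
}

// Lock duration = P99 handler execution time + a safety buffer.
function suggestLockDurationMs(samplesMs: number[], bufferMs: number): number {
  return percentile(samplesMs, 99) + bufferMs;
}

// Example: 100 samples of 10ms, 20ms, ..., 1000ms with a 5s buffer.
const durations = Array.from({ length: 100 }, (_, i) => (i + 1) * 10);
const lockDurationMs = suggestLockDurationMs(durations, 5000); // 990 + 5000 = 5990
```

The result would then be passed as lockDurationMs in the module config, with handler timeouts set below it so a handler cannot outlive its lock.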

Can I use this without a message queue?

Yes! The package supports in-memory queue adapters for development and testing. However, message queues are recommended for production to ensure durability and horizontal scaling.
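
For illustration, an in-memory queue with the same four methods as the SqsQueueAdapter above (publish, subscribe, ack, nack) could look like the sketch below. The message shape ({ id, payload }) is an assumption because QueueMessage is not spelled out here, and InMemoryQueueSketch is a hypothetical class rather than the adapter the package ships.

```typescript
// Assumed message shape for this sketch only.
interface SketchQueueMessage {
  id: string;
  payload: unknown;
}

type MessageHandler = (msg: SketchQueueMessage) => Promise<void>;

class InMemoryQueueSketch {
  private handler: MessageHandler | null = null;
  private readonly pending = new Map<string, SketchQueueMessage>();

  // Stores the message and delivers it immediately if a subscriber exists.
  async publish(message: SketchQueueMessage): Promise<void> {
    this.pending.set(message.id, message);
    if (this.handler) await this.handler(message);
  }

  // Registers the subscriber and delivers anything published before it attached.
  async subscribe(handler: MessageHandler): Promise<void> {
    this.handler = handler;
    for (const message of [...this.pending.values()]) {
      await handler(message);
    }
  }

  // Acknowledged messages are dropped.
  async ack(messageId: string): Promise<void> {
    this.pending.delete(messageId);
  }

  // Rejected messages stay pending; this sketch does not schedule redelivery.
  async nack(messageId: string): Promise<void> {
    if (!this.pending.has(messageId)) {
      throw new Error(`Unknown message: ${messageId}`);
    }
  }

  size(): number {
    return this.pending.size;
  }
}
```

Everything lives in process memory, so messages are lost on restart and are not shared between instances, which is why a real queue is recommended for production.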

How do I pause all events of a specific type?

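// deleteEventType expects the event type ID, so look it up by name first
const eventType = await schedulerService.getEventTypeByName('EVENT_TYPE_NAME');
if (eventType) {
  await schedulerService.deleteEventType(eventType.id);
}
// All instances of this type will stop being scheduled. Note that this removes
// the event type; to pause a single instance reversibly, use pauseEvent(id).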

How do I view failed events?

const deadLetters = await schedulerService.listDeadLetters();
deadLetters.forEach(dl => {
  console.log(`Event ${dl.eventInstanceId} failed: ${dl.lastError}`);
});

Can I manually trigger an event execution?

The package focuses on time-based scheduling. For manual/immediate execution, consider using a separate job queue (BullMQ, etc.) or invoke your handler directly.

Troubleshooting

Events not executing

  1. Check health status: healthCheckService.check()
  2. Verify poller is running: config.workerMode should be true
  3. Check for stuck locks:
    SELECT * FROM scheduler_event_instances 
    WHERE status = 'EXECUTING' AND locked_until < NOW();
  4. Review logs for errors

High dead-letter rate

  1. Check handler implementation for bugs
  2. Verify external dependencies are available
  3. Review retry policy (may be too aggressive)
  4. Check handler timeout settings

Poor performance

  1. Increase batchSize for higher throughput
  2. Add database indexes (included in migrations)
  3. Scale workers horizontally
  4. Optimize handler execution time
  5. Consider database connection pooling

See PRODUCTION.md for detailed troubleshooting guide.

Roadmap

  • [ ] Support for MySQL/MariaDB
  • [ ] Built-in observability with OpenTelemetry
  • [ ] Event scheduling UI/Dashboard
  • [ ] Distributed tracing integration
  • [ ] Advanced scheduling (cron, relative schedules)
  • [ ] Event dependencies (wait for parent event)

License

MIT

Contributing

Contributions are welcome! Please see CONTRIBUTING.md for details.

Support

For issues and feature requests, please use the GitHub issue tracker.