@fapoli/pg-scheduler

v1.0.3

A PostgreSQL-backed distributed task scheduler for Node.js. Inspired by db-scheduler.

Features

  • Distributed-safe via FOR UPDATE SKIP LOCKED
  • One-time and recurring tasks
  • Heartbeat-based dead execution recovery
  • Configurable failure handlers with exponential backoff, fixed delay, or max retries
  • Graceful shutdown
  • No external dependencies beyond pg

Installation

npm install @fapoli/pg-scheduler

pg is a peer dependency — install it if you haven't already:

npm install pg

Database setup

Run this migration against your PostgreSQL database:

CREATE TABLE IF NOT EXISTS scheduled_tasks (
  task_name            TEXT        NOT NULL,
  task_instance        TEXT        NOT NULL,
  task_data            JSONB,
  execution_time       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  picked               BOOLEAN     NOT NULL DEFAULT FALSE,
  picked_by            TEXT,
  last_success         TIMESTAMPTZ,
  last_failure         TIMESTAMPTZ,
  consecutive_failures INT         NOT NULL DEFAULT 0,
  last_heartbeat       TIMESTAMPTZ,
  version              BIGINT      NOT NULL DEFAULT 1,
  priority             INT,
  PRIMARY KEY (task_name, task_instance)
);

CREATE INDEX IF NOT EXISTS idx_scheduled_tasks_picker
  ON scheduled_tasks (priority DESC NULLS LAST, execution_time ASC)
  WHERE picked = FALSE;
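
The partial index above matches the shape of a query that claims due tasks with FOR UPDATE SKIP LOCKED. The following is a minimal sketch of such a query, not the SQL the library actually runs: `buildClaimQuery` is a hypothetical helper, and the choice to bump `version` and set `last_heartbeat` at claim time is an assumption. It returns a `{ text, values }` object of the kind `pool.query` accepts.

```javascript
// Hypothetical helper for illustration; not part of @fapoli/pg-scheduler.
function buildClaimQuery(workerId, batchSize, tableName = 'scheduled_tasks') {
  // Table names cannot be bound as parameters, so accept plain identifiers only.
  if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(tableName)) {
    throw new Error(`Invalid table name: ${tableName}`);
  }
  const text = `
    UPDATE ${tableName} AS t
       SET picked = TRUE,
           picked_by = $1,
           last_heartbeat = NOW(),
           version = t.version + 1
      FROM (
        SELECT task_name, task_instance
          FROM ${tableName}
         WHERE picked = FALSE
           AND execution_time <= NOW()
         ORDER BY priority DESC NULLS LAST, execution_time ASC
         LIMIT $2
           FOR UPDATE SKIP LOCKED
      ) AS due
     WHERE t.task_name = due.task_name
       AND t.task_instance = due.task_instance
    RETURNING t.task_name, t.task_instance, t.task_data`;
  return { text, values: [workerId, batchSize] };
}

// Usage sketch: const { rows } = await pool.query(buildClaimQuery('my-worker-1', 10));
```

Rows locked by another worker's transaction are skipped rather than waited on, so several workers can poll the same table without claiming the same task twice.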

Usage

Register tasks and start the worker

import { Pool } from 'pg';
import { oneTime, recurring, scheduleTask, startWorker } from '@fapoli/pg-scheduler';

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

// Register a one-time task
oneTime({
  name: 'send-welcome-email',
  run: async (data) => {
    await sendEmail(data.userId);
  },
});

// Register a recurring task
recurring({
  name: 'cleanup-expired-sessions',
  intervalMs: 60 * 60 * 1000, // every hour
  run: async () => {
    await deleteExpiredSessions();
  },
});

// Schedule initial executions (ON CONFLICT DO NOTHING — safe to call on every boot)
await scheduleTask({ pool, taskName: 'send-welcome-email', taskInstance: 'user-123', taskData: { userId: 'user-123' } });
await scheduleTask({ pool, taskName: 'cleanup-expired-sessions', taskInstance: 'default', taskData: null });

// Start the worker
const worker = startWorker({ pool });

// Graceful shutdown
process.on('SIGTERM', async () => {
  await worker.shutdown();
  process.exit(0);
});

One-time tasks

One-time tasks are deleted after successful execution.

oneTime({
  name: 'my-task',
  run: async (data) => { /* ... */ },
});

Recurring tasks

Recurring tasks reschedule themselves after each successful execution based on intervalMs.

recurring({
  name: 'my-recurring-task',
  intervalMs: 5 * 60 * 1000, // every 5 minutes
  run: async (data) => { /* ... */ },
});
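
The difference between the two task kinds after a successful run can be sketched as follows. This is an illustration only: `onSuccess` and the `kind` field are hypothetical names, and measuring the interval from the completion time (rather than from the previously scheduled time) is an assumption the README does not confirm.

```javascript
// Hypothetical illustration; not the library's internal code.
function onSuccess(task, finishedAt) {
  if (task.kind === 'recurring') {
    // Assumption: the next run is intervalMs after the run finished.
    return {
      action: 'reschedule',
      executionTime: new Date(finishedAt.getTime() + task.intervalMs),
    };
  }
  // One-time tasks are removed from the table once they succeed.
  return { action: 'delete' };
}

const finished = new Date('2024-01-01T00:00:00Z');
const next = onSuccess({ kind: 'recurring', intervalMs: 5 * 60 * 1000 }, finished);
console.log(next.executionTime.toISOString()); // 2024-01-01T00:05:00.000Z
```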

Scheduling tasks

import { scheduleTask, rescheduleTask } from '@fapoli/pg-scheduler';

// Schedule a task; omit executionTime to run it as soon as possible
await scheduleTask({
  pool,
  taskName: 'my-task',
  taskInstance: 'unique-instance-id',
  taskData: { foo: 'bar' }, // pass null if no data
  executionTime: new Date(Date.now() + 5000), // optional, defaults to now
  tableName: 'scheduled_tasks',              // optional, defaults to 'scheduled_tasks'
});

// Reschedule an existing task (throws if not found or currently running)
await rescheduleTask({
  pool,
  taskName: 'my-task',
  taskInstance: 'unique-instance-id',
  taskData: { foo: 'baz' },
  executionTime: new Date(Date.now() + 10000),
  tableName: 'scheduled_tasks',              // optional, defaults to 'scheduled_tasks'
});
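
The usage example notes that scheduling relies on ON CONFLICT DO NOTHING, which is why repeated calls for the same (task_name, task_instance) pair are harmless. A minimal sketch of an insert with that behavior, assuming the schema from the migration above; `buildScheduleQuery` is a hypothetical helper and not the library's actual statement:

```javascript
// Hypothetical helper for illustration; not part of @fapoli/pg-scheduler.
function buildScheduleQuery({
  taskName,
  taskInstance,
  taskData = null,
  executionTime = null,
  tableName = 'scheduled_tasks',
}) {
  // Table names cannot be bound as parameters, so accept plain identifiers only.
  if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(tableName)) {
    throw new Error(`Invalid table name: ${tableName}`);
  }
  const text = `
    INSERT INTO ${tableName} (task_name, task_instance, task_data, execution_time)
    VALUES ($1, $2, $3::jsonb, COALESCE($4::timestamptz, NOW()))
    ON CONFLICT (task_name, task_instance) DO NOTHING`;
  const values = [
    taskName,
    taskInstance,
    taskData === null ? null : JSON.stringify(taskData), // JSONB column
    executionTime, // null falls back to NOW() in SQL
  ];
  return { text, values };
}

// Usage sketch: await pool.query(buildScheduleQuery({ taskName: 'my-task', taskInstance: 'a' }));
```

Because the primary key is (task_name, task_instance), a second insert for the same pair is silently skipped and the existing row, including its execution time, is left untouched.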

Failure handlers

import { exponentialBackoff, fixedDelay, maxRetries, recurring } from '@fapoli/pg-scheduler';

recurring({
  name: 'my-task',
  intervalMs: 60000,
  run: async () => { /* ... */ },
  failureHandler: exponentialBackoff(1000, 5), // 1s initial delay, max 5 retries
});

// Other built-ins
fixedDelay(5000, 3)   // retry every 5s, max 3 times
maxRetries(3)          // retry immediately, max 3 times
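
To make the retry schedule concrete, here is a sketch of how an exponential backoff delay could be derived from the `consecutive_failures` column. The README does not state the growth factor or what happens once retries are exhausted, so the doubling and the `null` return value are assumptions, and `exponentialBackoffDelayMs` is a hypothetical name.

```javascript
// Hypothetical illustration; the library's own formula may differ.
function exponentialBackoffDelayMs(initialDelayMs, maxRetries, consecutiveFailures) {
  if (!Number.isInteger(consecutiveFailures) || consecutiveFailures < 1) {
    throw new RangeError('consecutiveFailures must be a positive integer');
  }
  // Assumption: null signals "give up" once the retry budget is spent.
  if (consecutiveFailures > maxRetries) return null;
  // Assumption: the delay doubles with every consecutive failure.
  return initialDelayMs * 2 ** (consecutiveFailures - 1);
}

// With exponentialBackoff(1000, 5) this sketch yields 1s, 2s, 4s, 8s, 16s, then null.
for (let failures = 1; failures <= 6; failures += 1) {
  console.log(failures, exponentialBackoffDelayMs(1000, 5, failures));
}
```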

Worker options

startWorker({
  pool,
  tableName: 'scheduled_tasks',     // default
  workerId: 'my-worker-1',          // default: hostname-pid
  pollingIntervalMs: 10000,         // default: 10s
  batchSize: 10,                    // default: 10
  heartbeatTimeoutMs: 300000,       // default: 5 minutes
  heartbeatIntervalMs: 100000,      // default: heartbeatTimeoutMs / 3
  logger: console,                  // default: console — inject your own
});
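
The two heartbeat options work together: a running task refreshes `last_heartbeat` every `heartbeatIntervalMs`, and an execution whose heartbeat is older than `heartbeatTimeoutMs` can be treated as dead and recovered. A rough sketch of that check, assuming rows shaped like the migration's columns; both function names are hypothetical, and treating a picked row with no heartbeat as dead is an assumption.

```javascript
// Hypothetical illustration; not the library's internal code.
function defaultHeartbeatIntervalMs(heartbeatTimeoutMs) {
  // Documented default: one third of the timeout.
  return Math.floor(heartbeatTimeoutMs / 3);
}

function isExecutionDead(row, now, heartbeatTimeoutMs) {
  if (!row.picked) return false; // not running, nothing to recover
  if (!row.last_heartbeat) return true; // assumption: picked without a heartbeat counts as dead
  return now.getTime() - row.last_heartbeat.getTime() > heartbeatTimeoutMs;
}

const now = new Date('2024-01-01T00:10:00Z');
const stale = { picked: true, last_heartbeat: new Date('2024-01-01T00:04:00Z') };
console.log(isExecutionDead(stale, now, 300000)); // true: last heartbeat was 6 minutes ago
```

With the default ratio, a healthy worker has roughly three chances to record a heartbeat before its execution is considered dead.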

Custom logger

startWorker({
  pool,
  logger: {
    info: (msg) => myLogger.info(msg),
    warn: (msg) => myLogger.warn(msg),
    error: (msg, err) => myLogger.error(msg, err),
  },
});
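
Since the logger is just an object with `info`, `warn`, and `error` methods, it can be wrapped before being passed in. A small sketch that tags every message with a prefix; `prefixedLogger` is a hypothetical helper, not something the package exports.

```javascript
// Hypothetical helper for illustration; not part of @fapoli/pg-scheduler.
function prefixedLogger(base, prefix) {
  return {
    info: (msg) => base.info(`${prefix} ${msg}`),
    warn: (msg) => base.warn(`${prefix} ${msg}`),
    error: (msg, err) => base.error(`${prefix} ${msg}`, err),
  };
}

// Usage sketch: startWorker({ pool, logger: prefixedLogger(console, '[scheduler]') });
```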

License

MIT