npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@penkov/tasks_queue

v1.9.0

Published

A lightweight PostgreSQL-backed task queue system with scheduling, retries, backoff strategies, and priority handling. Designed for efficiency and observability in modern Node.js applications.

Downloads

1,008

Readme

@penkov/tasks_queue

@penkov/tasks_queue is a PostgreSQL-backed task queue for Node.js applications.

It supports:

  • one-time and delayed tasks
  • retries with constant, linear, or exponential backoff
  • priority-based fetching
  • periodic scheduling with fixed rate, fixed delay, and cron
  • parent-child workflows
  • stalled task detection with heartbeat support
  • queue management APIs for operational tooling

The library is designed to work especially well with NestJS, but its runtime model is framework-agnostic.

Installation

npm install @penkov/tasks_queue

Install peer dependencies if they are not already present in your project:

npm install pg tslib application-metrics log4js scats cron-parser

For NestJS integration:

npm install @nestjs/common @nestjs/core @nestjs/swagger

Apply the schema from migration.sql to your PostgreSQL database before starting workers.

Table of Contents

Core Concepts

Task lifecycle

Tasks move through these states:

  • pending: waiting to be fetched by a worker
  • in_progress: currently owned by a worker attempt
  • blocked: waiting for a child task to finish
  • finished: completed successfully
  • error: failed terminally or timed out terminally

Payload vs result

The library distinguishes between two different kinds of data:

  • payload: task input and persisted runtime state
  • result: final output produced by the worker

Use:

  • context.setPayload(...) to checkpoint workflow state
  • context.submitResult(...) to persist final output

This distinction matters for parent-child workflows: parent tasks read child output from TaskStateSnapshot.result, not from child payload.

Failure behavior matters here:

  • successful completion persists submitted result
  • terminal failure may also persist the submitted result
  • retryable failure clears submitted result before the next attempt starts

This keeps retried attempts clean while still allowing terminal failures to retain partial output for inspection.

Queues and pools

  • A queue is a logical task type such as send-email or generate-preview.
  • A pool is a worker group with its own concurrency and polling interval.
  • Each queue is registered in exactly one pool.

This lets you isolate slow or expensive workloads from the default traffic.

Timeouts and retries

Each task may define:

  • timeout: maximum allowed time for a single processing attempt
  • retries: maximum attempts
  • backoff: base delay before retry
  • backoffType: constant, linear, or exponential

If a task fails and still has attempts left, it is re-queued as pending. If not, it ends in error.

Usage

This section shows the runtime contract once your application has access to TasksPoolsService.

The typical flow is:

  1. register a worker for a queue
  2. start the pools service
  3. schedule tasks into that queue

Create a worker

import { Injectable, OnApplicationBootstrap } from "@nestjs/common";
import {
  TaskContext,
  TasksPoolsService,
  TasksWorker,
} from "@penkov/tasks_queue";

@Injectable()
export class GeneratePreviewTaskWorker
  extends TasksWorker
  implements OnApplicationBootstrap
{
  static readonly QUEUE_NAME = "generate-preview";

  constructor(private readonly tasks: TasksPoolsService) {
    super();
  }

  async onApplicationBootstrap() {
    this.tasks.registerWorker(
      GeneratePreviewTaskWorker.QUEUE_NAME,
      this,
      "preview",
    );
  }

  override async process(payload: any, context: TaskContext): Promise<void> {
    const imageId = Number(payload["imageId"]);

    await context.ping();

    const previewUrl = `https://cdn.example.com/previews/${imageId}.jpg`;
    context.submitResult({ previewUrl });
  }
}

Schedule a task

await this.tasks.schedule({
  queue: GeneratePreviewTaskWorker.QUEUE_NAME,
  payload: { imageId: 42 },
});

Worker hooks

Override these hooks when you need lifecycle callbacks around processing:

  • starting(taskId, payload)
  • completed(taskId, payload)
  • failed(taskId, payload, finalStatus, error)

Use them for logging, metrics, side effects, or alerting. The main business logic still belongs in process(...).

Important callback semantics covered by integration tests:

  • completed(...) runs only when the attempt really finishes successfully
  • failed(...) receives finalStatus = pending when the task will retry
  • failed(...) receives finalStatus = error on terminal failure
  • if process(...) returns after the timeout window has already elapsed, completed(...) is skipped and failed(...) is invoked instead

Integration with NestJS

TasksQueueModule is the main integration entry point.

It:

  • creates queue services
  • starts workers on application bootstrap
  • stops them on application shutdown
  • exports TasksPoolsService and ManageTasksQueueService

Register the module

import pg from "pg";
import { Module } from "@nestjs/common";
import {
  DEFAULT_POOL,
  TasksQueueModule,
} from "@penkov/tasks_queue";

@Module({
  imports: [
    TasksQueueModule.forRootAsync({
      inject: [pg.Pool],
      useFactory: (db: pg.Pool) => ({
        db,
        runAuxiliaryWorker: true,
        pools: [
          {
            name: DEFAULT_POOL,
            loopInterval: 60_000,
            concurrency: 2,
          },
          {
            name: "preview",
            loopInterval: 60_000,
            concurrency: 5,
          },
        ],
      }),
    }),
  ],
})
export class AppModule {}

Register workers in bootstrap-aware providers

Each worker should register itself into a queue during bootstrap:

async onApplicationBootstrap() {
  this.tasks.registerWorker(MyWorker.QUEUE_NAME, this, DEFAULT_POOL);
}

Register workers with @Worker(...) decorator

As an alternative to explicit bootstrap registration, you can declare worker handlers directly on provider methods.

import { Injectable } from "@nestjs/common";
import { TaskContext, Worker } from "@penkov/tasks_queue";

@Injectable()
export class FinanceWorkers {
  @Worker({ queue: "finance-payout" })
  async processPayout(payload: any, context: TaskContext): Promise<void> {
    await context.ping();
    context.submitResult({ payoutId: payload["payoutId"] });
  }

  @Worker({ queue: "finance-documents", pool: "documents" })
  async downloadDocument(payload: any, context: TaskContext): Promise<void> {
    context.submitResult({ documentId: payload["documentId"] });
  }
}

How it works:

  • methods with @Worker(...) are discovered automatically during module init
  • each method is wrapped into an internal TasksWorker adapter
  • registration still goes through TasksPoolsService.registerWorker(...)

Constraints:

  • one queue can be registered only once
  • pool is optional and defaults to default
  • decorated methods should follow (payload, context) => Promise<void>
  • lifecycle hooks (starting, completed, failed) are available only in class-based TasksWorker

Register periodic schedules with @ScheduledTask(...) decorator

You can declare periodic workers with periodic task provisioning directly on provider methods.

import { Injectable } from "@nestjs/common";
import { ScheduledTask, TaskContext } from "@penkov/tasks_queue";

@Injectable()
export class BillingWorkers {
  @ScheduledTask({
    name: "billing-sync-cron",
    queue: "billing-sync",
    pool: "billing",
    cron: "0 */5 * * * *",
    replaceExisting: true,
    payload: { source: "bootstrap" },
  })
  async syncBilling(payload: any, context: TaskContext): Promise<void> {
    await context.ping();
    context.submitResult({ ok: true, source: payload["source"] });
  }
}

Supported schedule forms:

  • cron: { cron: "0 * * * * *" }
  • fixed rate: { fixedRate: 60_000 }
  • fixed delay: { fixedDelay: 60_000 }

Important notes:

  • @ScheduledTask(...) registers the decorated method as the queue worker and provisions periodic rows
  • pool is optional and defaults to default
  • periodic names are deduplicated by name
  • when replaceExisting is omitted or false, name conflicts are ignored
  • when replaceExisting is true, pending periodic definitions with the same name are replaced
  • @ScheduledTask cannot be combined with @Worker on the same method

When to use multiple pools

Create multiple pools when different workloads need different execution characteristics:

  • a default pool for short tasks
  • a dedicated pool for CPU-heavy or IO-heavy jobs
  • a low-concurrency pool for rate-limited integrations

Scheduling Tasks

The queue supports one-time, delayed, and periodic tasks.

One-time task

await tasks.schedule({
  queue: "send-email",
  payload: { emailId: 123 },
});

Delayed task

await tasks.schedule({
  queue: "send-email",
  startAfter: new Date(Date.now() + 5 * 60_000),
  payload: { emailId: 123 },
});

Retry and timeout policy

import { BackoffType } from "@penkov/tasks_queue";

await tasks.schedule({
  queue: "send-email",
  payload: { emailId: 123 },
  timeout: 10 * 60_000,
  retries: 5,
  backoff: 60_000,
  backoffType: BackoffType.exponential,
  priority: 100,
});

Fixed-rate periodic task

Use fixed rate when the schedule should stay aligned to the configured cadence regardless of how long the previous run took.

import { MissedRunStrategy } from "@penkov/tasks_queue";

await tasks.scheduleAtFixedRate({
  name: "refresh-cache",
  queue: "refresh-cache",
  period: 15 * 60_000,
  missedRunStrategy: MissedRunStrategy.skip_missed,
  payload: {},
});

Fixed-delay periodic task

Use fixed delay when the next run should be scheduled after the current run finishes.

await tasks.scheduleAtFixedDelay({
  name: "sync-provider",
  queue: "sync-provider",
  period: 10 * 60_000,
  payload: {},
});

Cron-based periodic task

Cron expressions currently run in UTC.

await tasks.scheduleAtCron({
  name: "nightly-report",
  queue: "nightly-report",
  cronExpression: "0 0 * * *",
  payload: {},
});

Missed run strategy

Periodic tasks support two policies for downtime or missed windows:

  • skip_missed: run once and continue from the next valid schedule
  • catch_up: enqueue one run for each missed interval

Choose catch_up only when every missed execution is materially important.

Replace existing periodic schedule by name

Use replaceExisting: true when the same periodic name should be updated instead of ignored:

await tasks.scheduleAtFixedRate({
  name: "refresh-cache",
  queue: "refresh-cache",
  period: 15 * 60_000,
  replaceExisting: true,
  payload: {},
});

Conflict behavior:

  • default (replaceExisting omitted/false): on conflict (name) do nothing
  • replace mode (replaceExisting=true): upsert schedule configuration for the same name
  • replace mode updates only pending periodic rows; non-pending conflicts are left unchanged

Periodic task timeout semantics

Periodic tasks do not always reschedule on completion.

If a periodic attempt overruns its timeout, the runtime uses normal timeout retry semantics first:

  • the attempt is treated as failed
  • retry backoff controls the next startAfter
  • periodic rescheduling does not win over timeout retry

Only the currently owned periodic attempt may reschedule the row. A stale older attempt cannot shift the timer after a retry has already started.

Spawning Child Subtasks

Parent-child orchestration is a first-class feature, but the detailed model is large enough that the full guide lives in docs/multi-steps-tasks.md.

The short version:

  • a parent task can request one child via context.spawnChild(...)
  • the child is created only after parent process(...) returns successfully
  • the parent then moves to blocked
  • when the child reaches terminal finished or terminal error, the parent wakes up again
  • only one active child is supported at a time

Simple child spawn

context.spawnChild({
  queue: "encode-video-file",
  payload: {
    videoId: 42,
    path: "/uploads/video.mp4",
  },
});

Allow parent workflow to continue after child failure

context.spawnChild({
  queue: "read-video-metadata",
  allowFailure: true,
  payload: {
    videoId: 42,
    encodedPath: "/videos/42.mp4",
  },
});

Recommended abstractions

Use:

  • MultiStepTask for custom branching workflows
  • ManagedWorkflowTask for full parent-side control over spawned tasks (dynamic count and order)
  • SequentialTask for linear happy-path workflows
  • MultiStepPayload as the required parent payload envelope

SequentialTask now also supports intermediate in-process steps that do not spawn a child task. If a step returns without calling context.spawnChild(...), the workflow automatically advances to the next configured step in the same parent execution.

ManagedWorkflowTask is a policy wrapper on top of MultiStepTask for orchestrators where parent code fully controls which child tasks are spawned and in what order, while each run must explicitly do one of two things:

  • schedule a child (context.spawnChild(...)), or
  • finish with output (context.submitResult(...)).

It also tracks technical parent-run counters in workflowPayload (runCount) so business data remains isolated in userPayload (persisted on successful parent transitions).

Detailed guide:

Stalled Tasks and Heartbeat

Long-running workers may need to refresh liveness while staying in in_progress.

Use context.ping() when a task:

  • waits on slow external systems
  • performs polling or streaming
  • runs for a long time inside a single process(...)
  • stays healthy without changing payload

Example

override async process(payload: any, context: TaskContext): Promise<void> {
  await context.ping();

  const job = await this.backend.startExport(payload["reportId"]);

  while (!(await this.backend.isReady(job.id))) {
    await new Promise((resolve) => setTimeout(resolve, 5000));
    await context.ping();
  }

  context.submitResult({
    reportId: payload["reportId"],
    exportId: job.id,
  });
}

Important behavior:

  • stalled detection uses the latest of started and last_heartbeat
  • heartbeat writes are throttled to at most once per minute
  • if ping() happens after the timeout window already expired, the attempt times out
  • if process(...) returns after the timeout window expired, that attempt also times out

Detailed guide:

Managing Tasks

ManageTasksQueueService is the operational API for admin panels, support tooling, and maintenance jobs.

It supports:

  • fetch task by id
  • list tasks with filters and pagination
  • count or clear failed tasks
  • update pending tasks
  • update pending periodic schedules
  • safely delete pending, finished, or failed tasks

Typical use cases

  • inspect a stuck task
  • build an internal queue admin UI
  • reschedule a pending task
  • edit retry and timeout settings before execution
  • clean up failed tasks after triage
  • restart selected failed tasks
  • expose queue count and wait/work latency stats

Example

const task = await manageTasksQueueService.findById(taskId);

const page = await manageTasksQueueService.findByParameters({
  status: TaskStatus.error,
  queue: "send-email",
  offset: 0,
  limit: 50,
});

await manageTasksQueueService.updatePendingTask(taskId, {
  startAfter: new Date(Date.now() + 60_000),
  priority: 50,
  timeout: 15 * 60_000,
  payload: { emailId: 123, force: true },
  retries: 5,
  backoff: 60_000,
  backoffType: BackoffType.linear,
});

Deletion is intentionally conservative: active tasks are not deleted, and child tasks with unfinished ancestors are protected.

The management API also supports:

  • restarting one failed task
  • restarting all failed tasks in a selected queue
  • queue-level counts grouped by status
  • queue-level wait-time and work-time statistics

These methods are intended for admin tooling and operational dashboards.

Auxiliary Worker

When runAuxiliaryWorker is enabled, the auxiliary worker runs periodic maintenance and metrics sync in the background.

Its maintenance pass covers:

  • stalled task processing
  • resetting eligible failed tasks back to pending
  • clearing expired finished tasks

Its metrics sync registers queue/status gauges using sanitized metric names derived from queue names.

Operational Notes

  • Apply migration.sql before starting workers.
  • cronExpression currently uses UTC evaluation.
  • Long-running tasks should set timeout comfortably above the heartbeat persistence interval.
  • payload is runtime state, not final output.
  • retryable failure clears submitted result; terminal failure may retain it.
  • Parent workflows should persist child-facing orchestration state through MultiStepPayload.
  • Periodic tasks should not call spawnChild(...) directly.
  • periodic timeout uses failure/retry semantics before normal periodic rescheduling.
  • A queue must be registered in a pool before tasks can be processed immediately. Unregistered queues may still accumulate pending tasks.

Additional Documentation