@penkov/tasks_queue
v1.9.0
A lightweight PostgreSQL-backed task queue system with scheduling, retries, backoff strategies, and priority handling. Designed for efficiency and observability in modern Node.js applications.
@penkov/tasks_queue is a PostgreSQL-backed task queue for Node.js applications.
It supports:
- one-time and delayed tasks
- retries with constant, linear, or exponential backoff
- priority-based fetching
- periodic scheduling with fixed rate, fixed delay, and cron
- parent-child workflows
- stalled task detection with heartbeat support
- queue management APIs for operational tooling
The library is designed to work especially well with NestJS, but its runtime model is framework-agnostic.
Installation
npm install @penkov/tasks_queue
Install peer dependencies if they are not already present in your project:
npm install pg tslib application-metrics log4js scats cron-parser
For NestJS integration:
npm install @nestjs/common @nestjs/core @nestjs/swagger
Apply the schema from migration.sql to your PostgreSQL database before starting workers.
Table of Contents
- Core Concepts
- Usage
- Integration with NestJS
- Scheduling Tasks
- Spawning Child Subtasks
- Stalled Tasks and Heartbeat
- Managing Tasks
- Operational Notes
- Additional Documentation
Core Concepts
Task lifecycle
Tasks move through these states:
- pending: waiting to be fetched by a worker
- in_progress: currently owned by a worker attempt
- blocked: waiting for a child task to finish
- finished: completed successfully
- error: failed terminally or timed out terminally
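To make the lifecycle concrete, here is a minimal sketch of the states as a transition table. The status names come from the list above; the table itself (ALLOWED_TRANSITIONS, canTransition) is a hypothetical illustration inferred from this README, not the library's internal model.

```typescript
// Status names are taken from the README; the allowed transitions are an assumption.
type TaskStatus = "pending" | "in_progress" | "blocked" | "finished" | "error";

const ALLOWED_TRANSITIONS: Record<TaskStatus, TaskStatus[]> = {
  pending: ["in_progress"], // fetched by a worker
  in_progress: ["finished", "error", "pending", "blocked"], // success, terminal failure, retry, child requested
  blocked: ["pending", "error"], // child reached a terminal state (assumed: parent wakes up or fails)
  finished: [],
  error: ["pending"], // assumed: restart through management tooling
};

// Returns true when the sketch considers the status change legal.
function canTransition(from: TaskStatus, to: TaskStatus): boolean {
  return ALLOWED_TRANSITIONS[from].indexOf(to) !== -1;
}
```

A table like this can be handy in tests or admin tooling that wants to reject impossible status changes early.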
Payload vs result
The library distinguishes between two different kinds of data:
- payload: task input and persisted runtime state
- result: final output produced by the worker
Use:
- context.setPayload(...) to checkpoint workflow state
- context.submitResult(...) to persist final output
This distinction matters for parent-child workflows: parent tasks read child output from TaskStateSnapshot.result, not from child payload.
Failure behavior matters here:
- successful completion persists submitted result
- terminal failure may also persist the submitted result
- retryable failure clears submitted result before the next attempt starts
This keeps retried attempts clean while still allowing terminal failures to retain partial output for inspection.
Queues and pools
- A queue is a logical task type such as send-email or generate-preview.
- A pool is a worker group with its own concurrency and polling interval.
- Each queue is registered in exactly one pool.
This lets you isolate slow or expensive workloads from the default traffic.
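The "exactly one pool per queue" rule can be pictured as a small registry. This is a sketch only: QueuePoolRegistry and its methods are hypothetical names, and the real service may track registrations differently.

```typescript
// Hypothetical in-memory registry illustrating the one-pool-per-queue rule.
class QueuePoolRegistry {
  private readonly poolByQueue = new Map<string, string>();

  // Registers a queue in a pool; a second registration of the same queue is rejected.
  register(queue: string, pool: string = "default"): void {
    const existing = this.poolByQueue.get(queue);
    if (existing !== undefined) {
      throw new Error(`Queue "${queue}" is already registered in pool "${existing}"`);
    }
    this.poolByQueue.set(queue, pool);
  }

  // Returns the pool that owns the queue, or undefined if it was never registered.
  poolOf(queue: string): string | undefined {
    return this.poolByQueue.get(queue);
  }
}
```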
Timeouts and retries
Each task may define:
- timeout: maximum allowed time for a single processing attempt
- retries: maximum attempts
- backoff: base delay before retry
- backoffType: constant, linear, or exponential
If a task fails and still has attempts left, it is re-queued as pending. If not, it ends in error.
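As a rough illustration of how the three backoff types differ, the sketch below computes a retry delay from a base backoff and a 1-based attempt number. The exact formulas are an assumption (a common convention), and retryDelayMs is a hypothetical helper, so check the library source before relying on specific delays.

```typescript
type BackoffKind = "constant" | "linear" | "exponential";

// attempt is 1-based: 1 means "the first attempt just failed".
// Formulas are assumed conventions, not confirmed library behavior.
function retryDelayMs(kind: BackoffKind, backoffMs: number, attempt: number): number {
  switch (kind) {
    case "constant":
      return backoffMs; // same delay every time
    case "linear":
      return backoffMs * attempt; // grows by one base delay per attempt
    case "exponential":
      return backoffMs * Math.pow(2, attempt - 1); // doubles on every attempt
  }
}
```

Under these assumptions, a 60-second base delay on the third failed attempt gives 60 s (constant), 180 s (linear), and 240 s (exponential).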
Usage
This section shows the runtime contract once your application has access to TasksPoolsService.
The typical flow is:
- register a worker for a queue
- start the pools service
- schedule tasks into that queue
Create a worker
import { Injectable, OnApplicationBootstrap } from "@nestjs/common";
import {
  TaskContext,
  TasksPoolsService,
  TasksWorker,
} from "@penkov/tasks_queue";
@Injectable()
export class GeneratePreviewTaskWorker
  extends TasksWorker
  implements OnApplicationBootstrap
{
  static readonly QUEUE_NAME = "generate-preview";
  constructor(private readonly tasks: TasksPoolsService) {
    super();
  }
  // Register this worker for its queue in the "preview" pool once the app has booted.
  async onApplicationBootstrap() {
    this.tasks.registerWorker(
      GeneratePreviewTaskWorker.QUEUE_NAME,
      this,
      "preview",
    );
  }
  override async process(payload: any, context: TaskContext): Promise<void> {
    const imageId = Number(payload["imageId"]);
    // Refresh liveness before doing the actual work.
    await context.ping();
    const previewUrl = `https://cdn.example.com/previews/${imageId}.jpg`;
    // Persist the final output of this task.
    context.submitResult({ previewUrl });
  }
}
Schedule a task
await this.tasks.schedule({
  queue: GeneratePreviewTaskWorker.QUEUE_NAME,
  payload: { imageId: 42 },
});
Worker hooks
Override these hooks when you need lifecycle callbacks around processing:
- starting(taskId, payload)
- completed(taskId, payload)
- failed(taskId, payload, finalStatus, error)
Use them for logging, metrics, side effects, or alerting. The main business logic still belongs in process(...).
Important callback semantics covered by integration tests:
- completed(...) runs only when the attempt really finishes successfully
- failed(...) receives finalStatus = pending when the task will retry
- failed(...) receives finalStatus = error on terminal failure
- if process(...) returns after the timeout window has already elapsed, completed(...) is skipped and failed(...) is invoked instead
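The callback rules above can be summarized as a small decision function. This is a sketch under stated assumptions: AttemptReport and pickHook are hypothetical names used for illustration, and the real runtime may derive these values differently.

```typescript
type HookFinalStatus = "finished" | "pending" | "error";

// Hypothetical summary of one processing attempt.
interface AttemptReport {
  threw: boolean; // process(...) rejected
  elapsedMs: number; // how long the attempt ran
  timeoutMs: number; // configured timeout for one attempt
  attemptsLeft: number; // attempts remaining after this one
}

// Decides which lifecycle hook fires and with which final status.
function pickHook(a: AttemptReport): { hook: "completed" | "failed"; finalStatus: HookFinalStatus } {
  const timedOut = a.elapsedMs > a.timeoutMs;
  if (!a.threw && !timedOut) {
    return { hook: "completed", finalStatus: "finished" };
  }
  // A late return counts as a failure even though process(...) did not throw.
  return { hook: "failed", finalStatus: a.attemptsLeft > 0 ? "pending" : "error" };
}
```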
Integration with NestJS
TasksQueueModule is the main integration entry point.
It:
- creates queue services
- starts workers on application bootstrap
- stops them on application shutdown
- exports TasksPoolsService and ManageTasksQueueService
Register the module
import pg from "pg";
import { Module } from "@nestjs/common";
import {
  DEFAULT_POOL,
  TasksQueueModule,
} from "@penkov/tasks_queue";
@Module({
  imports: [
    TasksQueueModule.forRootAsync({
      inject: [pg.Pool],
      useFactory: (db: pg.Pool) => ({
        db,
        runAuxiliaryWorker: true,
        pools: [
          {
            name: DEFAULT_POOL,
            loopInterval: 60_000,
            concurrency: 2,
          },
          {
            name: "preview",
            loopInterval: 60_000,
            concurrency: 5,
          },
        ],
      }),
    }),
  ],
})
export class AppModule {}
Register workers in bootstrap-aware providers
Each worker should register itself into a queue during bootstrap:
async onApplicationBootstrap() {
  this.tasks.registerWorker(MyWorker.QUEUE_NAME, this, DEFAULT_POOL);
}
Register workers with @Worker(...) decorator
As an alternative to explicit bootstrap registration, you can declare worker handlers directly on provider methods.
import { Injectable } from "@nestjs/common";
import { TaskContext, Worker } from "@penkov/tasks_queue";
@Injectable()
export class FinanceWorkers {
  @Worker({ queue: "finance-payout" })
  async processPayout(payload: any, context: TaskContext): Promise<void> {
    await context.ping();
    context.submitResult({ payoutId: payload["payoutId"] });
  }
  @Worker({ queue: "finance-documents", pool: "documents" })
  async downloadDocument(payload: any, context: TaskContext): Promise<void> {
    context.submitResult({ documentId: payload["documentId"] });
  }
}
How it works:
- methods with @Worker(...) are discovered automatically during module init
- each method is wrapped into an internal TasksWorker adapter
- registration still goes through TasksPoolsService.registerWorker(...)
Constraints:
- one queue can be registered only once
- pool is optional and defaults to default
- decorated methods should follow (payload, context) => Promise<void>
- lifecycle hooks (starting, completed, failed) are available only in class-based TasksWorker
Register periodic schedules with @ScheduledTask(...) decorator
You can declare a worker together with its periodic schedule directly on a provider method.
import { Injectable } from "@nestjs/common";
import { ScheduledTask, TaskContext } from "@penkov/tasks_queue";
@Injectable()
export class BillingWorkers {
  @ScheduledTask({
    name: "billing-sync-cron",
    queue: "billing-sync",
    pool: "billing",
    cron: "0 */5 * * * *",
    replaceExisting: true,
    payload: { source: "bootstrap" },
  })
  async syncBilling(payload: any, context: TaskContext): Promise<void> {
    await context.ping();
    context.submitResult({ ok: true, source: payload["source"] });
  }
}
Supported schedule forms:
- cron: { cron: "0 * * * * *" }
- fixed rate: { fixedRate: 60_000 }
- fixed delay: { fixedDelay: 60_000 }
Important notes:
- @ScheduledTask(...) registers the decorated method as the queue worker and provisions periodic rows
- pool is optional and defaults to default
- periodic names are deduplicated by name
- when replaceExisting is omitted or false, name conflicts are ignored
- when replaceExisting is true, pending periodic definitions with the same name are replaced
- @ScheduledTask cannot be combined with @Worker on the same method
When to use multiple pools
Create multiple pools when different workloads need different execution characteristics:
- a default pool for short tasks
- a dedicated pool for CPU-heavy or IO-heavy jobs
- a low-concurrency pool for rate-limited integrations
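A pool layout along these lines might look like the sketch below. The field names (name, loopInterval, concurrency) mirror the module configuration shown earlier; the pool names and numbers are purely illustrative assumptions, and PoolConfig is a local type declared only for this example.

```typescript
// Local type for illustration; field names follow the module configuration in this README.
interface PoolConfig {
  name: string;
  loopInterval: number; // polling interval, assumed to be in milliseconds
  concurrency: number;
}

// Hypothetical layout: values are examples, not recommendations.
const examplePools: PoolConfig[] = [
  { name: "default", loopInterval: 5_000, concurrency: 10 }, // short tasks
  { name: "media", loopInterval: 30_000, concurrency: 2 }, // CPU-heavy or IO-heavy jobs
  { name: "partner-api", loopInterval: 10_000, concurrency: 1 }, // rate-limited integration
];
```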
Scheduling Tasks
The queue supports one-time, delayed, and periodic tasks.
One-time task
await tasks.schedule({
  queue: "send-email",
  payload: { emailId: 123 },
});
Delayed task
await tasks.schedule({
  queue: "send-email",
  startAfter: new Date(Date.now() + 5 * 60_000),
  payload: { emailId: 123 },
});
Retry and timeout policy
import { BackoffType } from "@penkov/tasks_queue";
await tasks.schedule({
  queue: "send-email",
  payload: { emailId: 123 },
  timeout: 10 * 60_000,
  retries: 5,
  backoff: 60_000,
  backoffType: BackoffType.exponential,
  priority: 100,
});
Fixed-rate periodic task
Use fixed rate when the schedule should stay aligned to the configured cadence regardless of how long the previous run took.
import { MissedRunStrategy } from "@penkov/tasks_queue";
await tasks.scheduleAtFixedRate({
  name: "refresh-cache",
  queue: "refresh-cache",
  period: 15 * 60_000,
  missedRunStrategy: MissedRunStrategy.skip_missed,
  payload: {},
});
Fixed-delay periodic task
Use fixed delay when the next run should be scheduled after the current run finishes.
await tasks.scheduleAtFixedDelay({
  name: "sync-provider",
  queue: "sync-provider",
  period: 10 * 60_000,
  payload: {},
});
Cron-based periodic task
Cron expressions currently run in UTC.
await tasks.scheduleAtCron({
  name: "nightly-report",
  queue: "nightly-report",
  cronExpression: "0 0 * * *",
  payload: {},
});
Missed run strategy
Periodic tasks support two policies for downtime or missed windows:
- skip_missed: run once and continue from the next valid schedule
- catch_up: enqueue one run for each missed interval
Choose catch_up only when every missed execution is materially important.
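The difference between the two policies can be sketched for a fixed-period schedule. This is an illustration only: runsToEnqueue is a hypothetical helper, timestamps are plain millisecond numbers, and which timestamp the single skip_missed run carries is an assumption.

```typescript
type MissedRunPolicy = "skip_missed" | "catch_up";

// Returns the scheduled timestamps (ms) that would be enqueued after downtime.
function runsToEnqueue(
  lastRunMs: number,
  periodMs: number,
  nowMs: number,
  policy: MissedRunPolicy,
): number[] {
  const missed: number[] = [];
  for (let t = lastRunMs + periodMs; t <= nowMs; t += periodMs) {
    missed.push(t); // every slot that came due while nothing was running
  }
  // catch_up replays every missed slot; skip_missed collapses them into one run.
  return policy === "catch_up" ? missed : missed.slice(-1);
}
```

With a 10 ms period, a last run at 0, and the current time at 35, this sketch yields three runs for catch_up and one run for skip_missed.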
Replace existing periodic schedule by name
Use replaceExisting: true when the same periodic name should be updated instead of ignored:
await tasks.scheduleAtFixedRate({
  name: "refresh-cache",
  queue: "refresh-cache",
  period: 15 * 60_000,
  replaceExisting: true,
  payload: {},
});
Conflict behavior:
- default (replaceExisting omitted/false): on conflict (name) do nothing
- replace mode (replaceExisting=true): upsert schedule configuration for the same name
- replace mode updates only pending periodic rows; non-pending conflicts are left unchanged
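The conflict rules can be modelled in memory as follows. This is a sketch, not the SQL the library runs: PeriodicRow and upsertPeriodic are hypothetical names, and the row is reduced to a name, a period, and a status.

```typescript
// Simplified stand-in for a periodic schedule row.
interface PeriodicRow {
  name: string;
  periodMs: number;
  status: "pending" | "in_progress";
}

// Applies the documented conflict behavior to an in-memory table keyed by name.
function upsertPeriodic(
  rows: Map<string, PeriodicRow>,
  name: string,
  periodMs: number,
  replaceExisting: boolean = false,
): "inserted" | "ignored" | "replaced" {
  const existing = rows.get(name);
  if (existing === undefined) {
    rows.set(name, { name, periodMs, status: "pending" });
    return "inserted";
  }
  // Default mode ignores conflicts; replace mode only touches pending rows.
  if (!replaceExisting || existing.status !== "pending") {
    return "ignored";
  }
  rows.set(name, { name, periodMs, status: existing.status });
  return "replaced";
}
```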
Periodic task timeout semantics
Periodic tasks do not always reschedule on completion.
If a periodic attempt overruns its timeout, the runtime uses normal timeout retry semantics first:
- the attempt is treated as failed
- retry backoff controls the next startAfter
- periodic rescheduling does not win over timeout retry
Only the currently owned periodic attempt may reschedule the row. A stale older attempt cannot shift the timer after a retry has already started.
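A rough sketch of this precedence is shown below. PeriodicAttempt and nextStartAfter are hypothetical names, a constant backoff is assumed for brevity, and the fallback for a timeout with no attempts left is a guess that the text above does not cover.

```typescript
// Hypothetical view of a finished periodic attempt.
interface PeriodicAttempt {
  timedOut: boolean;
  attemptsLeft: number;
  backoffMs: number;
  periodMs: number;
  ownsRow: boolean; // false for a stale attempt superseded by a retry
}

// Returns the next startAfter (ms), or null when this attempt must not touch the row.
function nextStartAfter(a: PeriodicAttempt, nowMs: number): number | null {
  if (!a.ownsRow) {
    return null; // a stale attempt cannot shift the timer
  }
  if (a.timedOut && a.attemptsLeft > 0) {
    return nowMs + a.backoffMs; // timeout retry takes precedence over the period
  }
  return nowMs + a.periodMs; // normal periodic rescheduling (assumed fallback otherwise)
}
```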
Spawning Child Subtasks
Parent-child orchestration is a first-class feature, but the detailed model is large enough that the full guide lives in docs/multi-steps-tasks.md.
The short version:
- a parent task can request one child via context.spawnChild(...)
- the child is created only after parent process(...) returns successfully
- the parent then moves to blocked
- when the child reaches terminal finished or terminal error, the parent wakes up again
- only one active child is supported at a time
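The deferred nature of child creation can be sketched with a tiny stand-in for the task context. Everything here is hypothetical (SketchContext, ChildRequest, finish), including the choice to throw on a second request; it only illustrates that the request is recorded during process(...) and acted on afterwards.

```typescript
interface ChildRequest {
  queue: string;
  payload: unknown;
  allowFailure?: boolean;
}

// Hypothetical stand-in for the parent's task context.
class SketchContext {
  private requested: ChildRequest | undefined = undefined;

  // Records the request; nothing is created yet.
  spawnChild(req: ChildRequest): void {
    if (this.requested !== undefined) {
      throw new Error("only one child may be requested per run"); // assumed behavior
    }
    this.requested = req;
  }

  // What a runtime could do once process(...) has settled.
  finish(processSucceeded: boolean): { parentStatus: "blocked" | "finished" | "failed"; child?: ChildRequest } {
    if (!processSucceeded) {
      return { parentStatus: "failed" }; // the recorded child request is dropped
    }
    if (this.requested !== undefined) {
      return { parentStatus: "blocked", child: this.requested };
    }
    return { parentStatus: "finished" };
  }
}
```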
Simple child spawn
context.spawnChild({
  queue: "encode-video-file",
  payload: {
    videoId: 42,
    path: "/uploads/video.mp4",
  },
});
Allow parent workflow to continue after child failure
context.spawnChild({
  queue: "read-video-metadata",
  allowFailure: true,
  payload: {
    videoId: 42,
    encodedPath: "/videos/42.mp4",
  },
});
Recommended abstractions
Use:
- MultiStepTask for custom branching workflows
- ManagedWorkflowTask for full parent-side control over spawned tasks (dynamic count and order)
- SequentialTask for linear happy-path workflows
- MultiStepPayload as the required parent payload envelope
SequentialTask now also supports intermediate in-process steps that do not
spawn a child task. If a step returns without calling context.spawnChild(...),
the workflow automatically advances to the next configured step in the same
parent execution.
ManagedWorkflowTask is a policy wrapper on top of MultiStepTask for
orchestrators where parent code fully controls which child tasks are spawned
and in what order, while each run must explicitly do one of two things:
- schedule a child (context.spawnChild(...)), or
- finish with output (context.submitResult(...)).
It also tracks technical parent-run counters in workflowPayload (runCount)
so business data remains isolated in userPayload (persisted on successful
parent transitions).
Detailed guides: docs/multi-steps-tasks.md and docs/managed-workflow-task.md
Stalled Tasks and Heartbeat
Long-running workers may need to refresh liveness while staying in in_progress.
Use context.ping() when a task:
- waits on slow external systems
- performs polling or streaming
- runs for a long time inside a single process(...)
- stays healthy without changing payload
Example
override async process(payload: any, context: TaskContext): Promise<void> {
  await context.ping();
  const job = await this.backend.startExport(payload["reportId"]);
  // Poll the backend every 5 seconds and refresh liveness on each iteration.
  while (!(await this.backend.isReady(job.id))) {
    await new Promise((resolve) => setTimeout(resolve, 5000));
    await context.ping();
  }
  context.submitResult({
    reportId: payload["reportId"],
    exportId: job.id,
  });
}
Important behavior:
- stalled detection uses the latest of started and last_heartbeat
- heartbeat writes are throttled to at most once per minute
- if ping() happens after the timeout window already expired, the attempt times out
- if process(...) returns after the timeout window expired, that attempt also times out
Detailed guide: docs/heartbeat.md
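The stalled-detection and throttling rules from this section can be sketched as two small helpers. These are illustrative assumptions: AttemptTimes, isStalled, and HeartbeatThrottle are hypothetical names, timestamps are millisecond numbers, and measuring the timeout from the latest liveness signal is inferred from the notes above rather than confirmed.

```typescript
// Hypothetical view of the timing columns of one attempt.
interface AttemptTimes {
  started: number;
  lastHeartbeat: number | null;
  timeoutMs: number;
}

// An attempt is considered stalled when its latest liveness signal is older than the timeout.
function isStalled(t: AttemptTimes, nowMs: number): boolean {
  const lastAlive = Math.max(t.started, t.lastHeartbeat ?? t.started);
  return nowMs - lastAlive > t.timeoutMs;
}

// Lets through at most one heartbeat write per interval (one minute by default).
class HeartbeatThrottle {
  private lastWriteMs: number | null = null;
  private readonly minIntervalMs: number;

  constructor(minIntervalMs: number = 60_000) {
    this.minIntervalMs = minIntervalMs;
  }

  // Returns true when this ping should be persisted.
  shouldWrite(nowMs: number): boolean {
    if (this.lastWriteMs !== null && nowMs - this.lastWriteMs < this.minIntervalMs) {
      return false;
    }
    this.lastWriteMs = nowMs;
    return true;
  }
}
```

Because writes are throttled, a timeout shorter than the throttle interval could expire between two persisted heartbeats, which is why the timeout should sit comfortably above it.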
Managing Tasks
ManageTasksQueueService is the operational API for admin panels, support tooling, and maintenance jobs.
It supports:
- fetch task by id
- list tasks with filters and pagination
- count or clear failed tasks
- update pending tasks
- update pending periodic schedules
- safely delete pending, finished, or failed tasks
Typical use cases
- inspect a stuck task
- build an internal queue admin UI
- reschedule a pending task
- edit retry and timeout settings before execution
- clean up failed tasks after triage
- restart selected failed tasks
- expose queue count and wait/work latency stats
Example
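// Assumes TaskStatus is exported from the package alongside BackoffType.
import { BackoffType, TaskStatus } from "@penkov/tasks_queue";
// Fetch a single task by id.
const task = await manageTasksQueueService.findById(taskId);
// List failed tasks in one queue, 50 per page.
const page = await manageTasksQueueService.findByParameters({
  status: TaskStatus.error,
  queue: "send-email",
  offset: 0,
  limit: 50,
});
// Reschedule a pending task and adjust its retry policy before it runs.
await manageTasksQueueService.updatePendingTask(taskId, {
  startAfter: new Date(Date.now() + 60_000),
  priority: 50,
  timeout: 15 * 60_000,
  payload: { emailId: 123, force: true },
  retries: 5,
  backoff: 60_000,
  backoffType: BackoffType.linear,
});
Deletion is intentionally conservative: active tasks are not deleted, and child tasks with unfinished ancestors are protected.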
The management API also supports:
- restarting one failed task
- restarting all failed tasks in a selected queue
- queue-level counts grouped by status
- queue-level wait-time and work-time statistics
These methods are intended for admin tooling and operational dashboards.
Auxiliary Worker
When runAuxiliaryWorker is enabled, the auxiliary worker runs periodic maintenance and metrics sync in the background.
Its maintenance pass covers:
- stalled task processing
- resetting eligible failed tasks back to pending
- clearing expired finished tasks
Its metrics sync registers queue/status gauges using sanitized metric names derived from queue names.
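Queue names such as send-email contain characters that many metrics backends reject, which is why sanitizing is needed. The sketch below shows one plausible approach; sanitizeMetricName, gaugeName, and the tasks_queue_ prefix are assumptions for illustration, not the library's actual naming scheme.

```typescript
// Maps an arbitrary queue name to a metrics-friendly identifier (assumed rules).
function sanitizeMetricName(queue: string): string {
  return queue
    .toLowerCase()
    .replace(/[^a-z0-9_]+/g, "_") // collapse runs of unsupported characters
    .replace(/^_+|_+$/g, ""); // trim leading and trailing underscores
}

// Builds a hypothetical gauge name for a queue/status pair.
function gaugeName(queue: string, status: string): string {
  return `tasks_queue_${sanitizeMetricName(queue)}_${status}`;
}
```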
Operational Notes
- Apply migration.sql before starting workers.
- cronExpression currently uses UTC evaluation.
- Long-running tasks should set timeout comfortably above the heartbeat persistence interval.
- payload is runtime state, not final output.
- Retryable failure clears submitted result; terminal failure may retain it.
- Parent workflows should persist child-facing orchestration state through MultiStepPayload.
- Periodic tasks should not call spawnChild(...) directly.
- Periodic timeout uses failure/retry semantics before normal periodic rescheduling.
- A queue must be registered in a pool before tasks can be processed immediately. Unregistered queues may still accumulate pending tasks.
Additional Documentation
- docs/multi-steps-tasks.md: detailed guide for MultiStepTask, SequentialTask, child completion, failure handling, and payload shape
- docs/managed-workflow-task.md: detailed guide for ManagedWorkflowTask, full child-task control model, and orchestrator examples
- docs/heartbeat.md: detailed guide for heartbeat behavior and stalled detection
- docs/nest-worker-decorator.md: declarative NestJS worker registration using @Worker(...)
- docs/nest-scheduled-task-decorator.md: declarative periodic task provisioning via @ScheduledTask(...)
