@novahelm/jobs

v2026.6.1


NovaHelm jobs — BullMQ + Redis task queue with dead-letter.


@novahelm/jobs

Background job system for NovaHelm -- built on BullMQ + Redis. Provides type-safe queue definitions, 35+ built-in workers, a worker host factory, dead letter queue support, and job metrics.


Quick Start

pnpm add @novahelm/jobs

Enqueue a Job

import { enqueue } from "@novahelm/jobs";

// Type-safe: payload is validated against QueuePayloads["email"]
await enqueue.email({
  template: "welcome",
  to: "user@example.com", // placeholder address; the original was obscured by email protection
  data: { name: "Alice" },
});

await enqueue.media({
  type: "process-image",
  mediaId: "img_abc123",
});

// With options
await enqueue.billing(
  { type: "sync-role", userId: "user_123", planId: "pro" },
  { delay: 5000, priority: 1 },
);

Start the Worker Host

import { createWorkerHost } from "@novahelm/jobs";

const host = createWorkerHost({
  redis: { host: "localhost", port: 6379 },
});

host.start();

// Graceful shutdown
process.on("SIGTERM", async () => {
  await host.stop();
});
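The SIGTERM handler above can be hardened so that a second signal does not trigger a second stop. The following is a minimal sketch, not part of @novahelm/jobs: `Stoppable` and `makeShutdown` are hypothetical names, and the only assumption about the host is that `stop()` returns a promise, as the WorkerHost interface states.

```typescript
// Minimal shape needed here; the real WorkerHost has more members.
interface Stoppable {
  stop: () => Promise<void>;
}

// Returns a handler that stops the host once, even if several signals arrive.
// `exit` is injected so the handler can be exercised without ending the process.
function makeShutdown(host: Stoppable, exit: (code: number) => void) {
  let stopping = false;
  return async (): Promise<void> => {
    if (stopping) return; // ignore repeated signals
    stopping = true;
    try {
      await host.stop(); // graceful stop, as described in this README
      exit(0);
    } catch {
      exit(1); // stop failed; report a non-zero status
    }
  };
}

// Wiring in a Node.js process (hypothetical usage):
//   const shutdown = makeShutdown(host, (code) => process.exit(code));
//   process.on("SIGTERM", shutdown);
//   process.on("SIGINT", shutdown);
```

Passing `exit` in as a parameter is a design choice for testability; in production code it would usually be `process.exit`.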

Queue Definitions

All queues are typed via the QueuePayloads interface. Every key is a queue name, and its value is the job payload type:

| Queue | Payload Type | Description |
|-------|-------------|-------------|
| email | { template, to, data? } | Send transactional/template emails |
| media | { type, mediaId, options? } | Image/video processing |
| billing | { type, userId, planId? } | Stripe sync, usage reset, trial expiry |
| push | { userId, title, body?, data? } | Web push notifications |
| discord | { title, body?, type, data? } | Discord webhook notifications |
| sms | { to, body, from? } | SMS messages |
| commerce | { type, cartId?, orderId? } | Order finalization, cart abandonment |
| inventory | { type, cartId?, variantId? } | Stock release, low-stock checks |
| ai | { type, mediaId?, sourceId? } | Auto-tag, embeddings, reindex |
| analytics | { event, userId?, data? } | Event tracking |
| data-sync | { sourceId, force? } | RSS/API data source sync |
| data-pipeline | { contentType, contentId, operations } | Content processing pipeline |
| campaign-send | { campaignId } | Bulk email campaign sends |
| digest | { frequency } | Daily/weekly digest emails |
| compute-persona | { userId, force? } | User persona recomputation |
| health-probe | {} | System health checks |
| perf-ingest | { events[] } | Performance metric ingestion |
| perf-aggregate | {} | Performance data aggregation |
| platform-backup | { action, projectSlug?, scope? } | Project backups & restore |
| platform-health | {} | Platform health monitoring |
| notification-route | { type, userId, title, body? } | Route notifications to channels |
| webhook | { webhookId, url, secret, ... } | Outbound webhook delivery |
| pipeline | { pipelineId, runId, projectId } | Pipeline execution |

Plus 10+ more queues for platform operations, builds, deploys, and module features.
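To illustrate how a payload map can drive a type-safe enqueue proxy, here is a self-contained sketch. It is not the package's implementation: the two-queue `QueuePayloads` subset, the `EnqueueOptions` shape, and the in-memory `sent` array are all assumptions made so the example runs without Redis.

```typescript
// Hypothetical subset of the payload map; the real QueuePayloads has more queues.
interface QueuePayloads {
  email: { template: string; to: string; data?: Record<string, unknown> };
  sms: { to: string; body: string; from?: string };
}

// Hypothetical option shape, mirroring the delay/priority options used above.
interface EnqueueOptions {
  delay?: number;
  priority?: number;
}

// One method per queue name, each accepting only that queue's payload.
type Enqueue = {
  [K in keyof QueuePayloads]: (
    payload: QueuePayloads[K],
    options?: EnqueueOptions,
  ) => Promise<void>;
};

// In-memory stand-in for Redis so the sketch runs without a server.
const sent: {
  queue: string;
  payload: unknown;
  options: EnqueueOptions | undefined;
}[] = [];

// A Proxy turns any property access into an enqueue function for that queue.
const enqueue = new Proxy({} as Enqueue, {
  get: (_target, queue) =>
    async (payload: unknown, options?: EnqueueOptions): Promise<void> => {
      sent.push({ queue: String(queue), payload, options });
    },
});

void enqueue.email({ template: "welcome", to: "user@example.com" });
void enqueue.sms({ to: "+15550100", body: "Your code is 1234" }, { delay: 5000 });

// Compile-time errors (uncomment to see them):
// enqueue.email({ to: "user@example.com" }); // missing "template"
// enqueue.fax({ to: "+15550100" });          // unknown queue
```

The mapped type is what provides the safety: the Proxy itself accepts any property name at runtime, so the checks happen entirely at compile time.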


Worker Host

createWorkerHost() creates and manages BullMQ workers for all queues:

import { createWorkerHost, collectModuleWorkers } from "@novahelm/jobs";

const host = createWorkerHost({
  // Required: Redis connection
  redis: { host: "localhost", port: 6379 },

  // Register built-in workers (default: true)
  coreWorkers: true,

  // Add custom workers
  customWorkers: [
    {
      queue: "my-custom-queue",
      processor: async (job) => {
        console.log("Processing:", job.data);
      },
      concurrency: 3,
    },
  ],

  // Module-contributed workers
  moduleWorkers: collectModuleWorkers(),

  // Platform mode: create workers for each project prefix
  projectPrefixes: ["p:my-app:", "p:other-app:"],
});

host.start();

WorkerHost Interface

interface WorkerHost {
  workers: Worker[];              // All BullMQ Worker instances
  start: () => void;              // Start processing
  stop: () => Promise<void>;      // Graceful shutdown
  registerProjectPrefix: (prefix: string) => void; // Add project at runtime
}

Module Workers

Collect workers contributed by modules:

import { createWorkerHost, collectModuleWorkers } from "@novahelm/jobs";

const host = createWorkerHost({
  redis: { host: "localhost", port: 6379 },
  moduleWorkers: collectModuleWorkers(),
});

Custom Workers

Define a worker with the WorkerDef interface:

import type { WorkerDef } from "@novahelm/jobs";

export const myWorkerDef: WorkerDef = {
  queue: "my-queue",             // must match a key in QueuePayloads
  processor: async (job) => {
    const { data } = job;
    // Process the job...
  },
  concurrency: 2,               // default: 1
};

Job Options

Default job options applied to all enqueued jobs:

import { defaultJobOptions } from "@novahelm/jobs";

// {
//   attempts: 3,
//   backoff: { type: "exponential", delay: 60_000 },
//   removeOnComplete: { count: 1000 },
//   removeOnFail: { count: 5000 },
// }
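To make the retry timing concrete, the sketch below computes the wait before each retry. It assumes these options reach BullMQ unchanged and that no jitter is configured; BullMQ's built-in exponential strategy waits `delay * 2^(attemptsMade - 1)` milliseconds. `retryDelays` is a hypothetical helper, not an export of this package.

```typescript
// Delay before each retry under BullMQ's built-in "exponential" strategy:
// delay * 2^(attemptsMade - 1), where attemptsMade counts failed attempts so far.
function retryDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  // The final attempt has no retry after it, so there are attempts - 1 waits.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.round(baseDelayMs * 2 ** (attemptsMade - 1)));
  }
  return delays;
}

const delays = retryDelays(3, 60_000);
// delays is [60000, 120000]: wait 1 minute, then 2 minutes, then the job fails for good.
```

Under these assumptions, a job that keeps failing is marked failed roughly three minutes after its first attempt, plus processing time.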

Override per-job:

await enqueue.email(payload, {
  delay: 10_000,      // delay 10 seconds
  priority: 1,        // BullMQ: lower number = higher priority
});

Queue Access

Get the underlying BullMQ Queue instance for advanced operations:

import { getQueue, QUEUE_NAMES } from "@novahelm/jobs";

// Get a queue
const emailQueue = getQueue("email");

// Get counts
const counts = await emailQueue.getJobCounts("wait", "active", "failed");

// Drain (remove waiting jobs) or pause processing
await emailQueue.drain();
await emailQueue.pause();

// List all known queue names
console.log(QUEUE_NAMES); // ["email", "media", "billing", ...]

Job Metrics

Track job completion rates and durations:

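import Redis from "ioredis";
import { recordJobCompleted, recordJobFailed, readWorkerHealth } from "@novahelm/jobs";

const redis = new Redis(); // assumes an ioredis client on localhost:6379
await recordJobCompleted(redis, "email", 120); // queue, duration in ms
await recordJobFailed(redis, "email");

// Read aggregated health stats from Redis
const stats = await readWorkerHealth(redis);
// => WorkerHealthStats with per-queue success/fail counts, avg duration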
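The aggregation behind per-queue success/fail counts and average duration can be sketched in memory. This is an illustration only: the `QueueStats` shape and the `recordCompleted`, `recordFailed`, and `summarize` helpers are hypothetical, and the real package stores its counters in Redis with a `WorkerHealthStats` shape not shown in this README.

```typescript
// Hypothetical per-queue counters; the real WorkerHealthStats may differ.
interface QueueStats {
  completed: number;
  failed: number;
  totalDurationMs: number;
}

// In-memory stand-in for the Redis counters.
const counters: Record<string, QueueStats> = {};

function statsFor(queue: string): QueueStats {
  let entry = counters[queue];
  if (!entry) {
    entry = { completed: 0, failed: 0, totalDurationMs: 0 };
    counters[queue] = entry;
  }
  return entry;
}

function recordCompleted(queue: string, durationMs: number): void {
  const entry = statsFor(queue);
  entry.completed += 1;
  entry.totalDurationMs += durationMs;
}

function recordFailed(queue: string): void {
  statsFor(queue).failed += 1;
}

// Derive the summary values a health read might report.
function summarize(queue: string) {
  const { completed, failed, totalDurationMs } = statsFor(queue);
  const total = completed + failed;
  return {
    completed,
    failed,
    successRate: total === 0 ? 1 : completed / total,
    avgDurationMs: completed === 0 ? 0 : totalDurationMs / completed,
  };
}

recordCompleted("email", 100);
recordCompleted("email", 300);
recordFailed("email");
const emailHealth = summarize("email");
// emailHealth.avgDurationMs is 200; emailHealth.successRate is 2/3.
```

Storing a running total and a count, rather than individual durations, keeps the stored state constant in size per queue.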

Re-Authentication

For sensitive jobs that require role verification:

import { withReAuth, hasMinimumRole } from "@novahelm/jobs";
import type { ReAuthPayload, PlatformRole } from "@novahelm/jobs";

// Inside a worker processor
await withReAuth(job.data, async (verifiedUser) => {
  // verifiedUser has been re-verified against the database
  // Job fails if user no longer has required role
});
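The idea of re-checking a role at processing time can be sketched as follows. Everything here is hypothetical: the role names and their order, the payload fields, and the `roleAtLeast` and `runWithReAuth` helpers are stand-ins, since this README does not list the `PlatformRole` values or the `ReAuthPayload` shape. The database lookup is replaced by a function argument.

```typescript
// Hypothetical role names, ordered from least to most privileged.
const ROLE_ORDER = ["viewer", "editor", "admin", "owner"] as const;
type Role = (typeof ROLE_ORDER)[number];

// True when `role` ranks at or above `required` in the hierarchy.
function roleAtLeast(role: Role, required: Role): boolean {
  return ROLE_ORDER.indexOf(role) >= ROLE_ORDER.indexOf(required);
}

// Re-check the role when the job runs, not only when it was enqueued.
// `lookupRole` stands in for a database query.
function runWithReAuth<T>(
  payload: { userId: string; requiredRole: Role },
  lookupRole: (userId: string) => Role | undefined,
  fn: (verified: { userId: string; role: Role }) => T,
): T {
  const role = lookupRole(payload.userId);
  if (role === undefined || !roleAtLeast(role, payload.requiredRole)) {
    // Throwing inside a processor is what marks the job as failed.
    throw new Error(`re-auth failed for user ${payload.userId}`);
  }
  return fn({ userId: payload.userId, role });
}

const roles: Record<string, Role> = { user_1: "admin", user_2: "viewer" };
const result = runWithReAuth(
  { userId: "user_1", requiredRole: "editor" },
  (id) => roles[id],
  (user) => `ok:${user.role}`,
);
// result is "ok:admin"; the same call for user_2 would throw.
```

Checking at processing time matters because a delayed or retried job may run long after the user's role has changed.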

Built-in Workers

All 35+ built-in workers are registered automatically when coreWorkers is true (the default). A selection:

| Worker | Queue | Description |
|--------|-------|-------------|
| emailWorkerDef | email | Render React Email templates and send |
| mediaWorkerDef | media | Image resize/optimize with Sharp |
| billingWorkerDef | billing | Stripe role sync, usage tracking |
| pushWorkerDef | push | Web push via Expo Server SDK |
| discordWorkerDef | discord | Discord webhook delivery |
| commerceWorkerDef | commerce | Order finalization, cart abandonment |
| inventoryWorkerDef | inventory | Stock management |
| aiWorkerDef | ai | Auto-tagging, embedding generation |
| analyticsWorkerDef | analytics | Event processing |
| dataSyncWorkerDef | data-sync | RSS/API source synchronization |
| healthProbeWorkerDef | health-probe | System health checks |
| campaignSendWorkerDef | campaign-send | Bulk email campaigns |
| digestWorkerDef | digest | Daily/weekly digest compilation |
| platformBackupWorkerDef | platform-backup | Database/storage backups |
| webhookWorkerDef | webhook | Outbound webhook delivery |


API Reference

| Export | Description |
|--------|-------------|
| enqueue | Type-safe job enqueue proxy |
| getQueue(name, prefix?) | Get or create a BullMQ Queue |
| defaultJobOptions | Default retry/backoff/cleanup config |
| resetQueues() | Clear queue cache (testing only) |
| QUEUE_NAMES | Array of all known queue names |
| QueuePayloads | Type map of queue name to payload type |
| createWorkerHost(config) | Create and manage all workers |
| collectModuleWorkers() | Gather module-contributed workers |
| WorkerDef | Worker definition interface |
| WorkerHost | Worker host interface |
| recordJobCompleted(redis, queue, ms) | Record success metric |
| recordJobFailed(redis, queue) | Record failure metric |
| readWorkerHealth(redis) | Read aggregated metrics |
| withReAuth(payload, fn) | Re-verify user role for sensitive jobs |
| hasMinimumRole(role, required) | Role hierarchy check |