@novahelm/jobs
v2026.6.1
Published
NovaHelm jobs — BullMQ + Redis task queue with dead-letter.
Maintainers
Readme
@novahelm/jobs
Background job system for NovaHelm -- built on BullMQ + Redis. Provides type-safe queue definitions, 35+ built-in workers, a worker host factory, dead letter queue support, and job metrics.
Quick Start
pnpm add @novahelm/jobsEnqueue a Job
import { enqueue } from "@novahelm/jobs";
// Type-safe: payload is validated against QueuePayloads["email"]
await enqueue.email({
template: "welcome",
to: "[email protected]",
data: { name: "Alice" },
});
await enqueue.media({
type: "process-image",
mediaId: "img_abc123",
});
// With options
await enqueue.billing(
{ type: "sync-role", userId: "user_123", planId: "pro" },
{ delay: 5000, priority: 1 },
);Start the Worker Host
import { createWorkerHost } from "@novahelm/jobs";
import Redis from "ioredis";
const host = createWorkerHost({
redis: { host: "localhost", port: 6379 },
});
host.start();
// Graceful shutdown
process.on("SIGTERM", async () => {
await host.stop();
});Queue Definitions
All queues are typed via the QueuePayloads interface. Every key is a queue name, and its value is the job payload type:
| Queue | Payload Type | Description |
|-------|-------------|-------------|
| email | { template, to, data? } | Send transactional/template emails |
| media | { type, mediaId, options? } | Image/video processing |
| billing | { type, userId, planId? } | Stripe sync, usage reset, trial expiry |
| push | { userId, title, body?, data? } | Web push notifications |
| discord | { title, body?, type, data? } | Discord webhook notifications |
| sms | { to, body, from? } | SMS messages |
| commerce | { type, cartId?, orderId? } | Order finalization, cart abandonment |
| inventory | { type, cartId?, variantId? } | Stock release, low-stock checks |
| ai | { type, mediaId?, sourceId? } | Auto-tag, embeddings, reindex |
| analytics | { event, userId?, data? } | Event tracking |
| data-sync | { sourceId, force? } | RSS/API data source sync |
| data-pipeline | { contentType, contentId, operations } | Content processing pipeline |
| campaign-send | { campaignId } | Bulk email campaign sends |
| digest | { frequency } | Daily/weekly digest emails |
| compute-persona | { userId, force? } | User persona recomputation |
| health-probe | {} | System health checks |
| perf-ingest | { events[] } | Performance metric ingestion |
| perf-aggregate | {} | Performance data aggregation |
| platform-backup | { action, projectSlug?, scope? } | Project backups & restore |
| platform-health | {} | Platform health monitoring |
| notification-route | { type, userId, title, body? } | Route notifications to channels |
| webhook | { webhookId, url, secret, ... } | Outbound webhook delivery |
| pipeline | { pipelineId, runId, projectId } | Pipeline execution |
Plus 10+ more queues for platform operations, builds, deploys, and module features.
Worker Host
createWorkerHost() creates and manages BullMQ workers for all queues:
import { createWorkerHost } from "@novahelm/jobs";
const host = createWorkerHost({
// Required: Redis connection
redis: { host: "localhost", port: 6379 },
// Register built-in workers (default: true)
coreWorkers: true,
// Add custom workers
customWorkers: [
{
queue: "my-custom-queue",
processor: async (job) => {
console.log("Processing:", job.data);
},
concurrency: 3,
},
],
// Module-contributed workers
moduleWorkers: collectModuleWorkers(),
// Platform mode: create workers for each project prefix
projectPrefixes: ["p:my-app:", "p:other-app:"],
});
host.start();WorkerHost Interface
interface WorkerHost {
workers: Worker[]; // All BullMQ Worker instances
start: () => void; // Start processing
stop: () => Promise<void>; // Graceful shutdown
registerProjectPrefix: (prefix: string) => void; // Add project at runtime
}Module Workers
Collect workers contributed by modules:
import { createWorkerHost, collectModuleWorkers } from "@novahelm/jobs";
const host = createWorkerHost({
redis: { host: "localhost", port: 6379 },
moduleWorkers: collectModuleWorkers(),
});Custom Workers
Define a worker with the WorkerDef interface:
import type { WorkerDef } from "@novahelm/jobs";
export const myWorkerDef: WorkerDef = {
queue: "my-queue", // must match a key in QueuePayloads
processor: async (job) => {
const { data } = job;
// Process the job...
},
concurrency: 2, // default: 1
};Job Options
Default job options applied to all enqueued jobs:
import { defaultJobOptions } from "@novahelm/jobs";
// {
// attempts: 3,
// backoff: { type: "exponential", delay: 60_000 },
// removeOnComplete: { count: 1000 },
// removeOnFail: { count: 5000 },
// }Override per-job:
await enqueue.email(payload, {
delay: 10_000, // delay 10 seconds
priority: 1, // higher priority
});Queue Access
Get the underlying BullMQ Queue instance for advanced operations:
import { getQueue, QUEUE_NAMES } from "@novahelm/jobs";
// Get a queue
const emailQueue = getQueue("email");
// Get counts
const counts = await emailQueue.getJobCounts("wait", "active", "failed");
// Drain or pause
await emailQueue.drain();
await emailQueue.pause();
// List all known queue names
console.log(QUEUE_NAMES); // ["email", "media", "billing", ...]Job Metrics
Track job completion rates and durations:
import { recordJobCompleted, recordJobFailed, readWorkerHealth } from "@novahelm/jobs";
// Read aggregated health stats from Redis
const stats = await readWorkerHealth(redis);
// => WorkerHealthStats with per-queue success/fail counts, avg durationRe-Authentication
For sensitive jobs that require role verification:
import { withReAuth, hasMinimumRole } from "@novahelm/jobs";
import type { ReAuthPayload, PlatformRole } from "@novahelm/jobs";
// Inside a worker processor
await withReAuth(job.data, async (verifiedUser) => {
// verifiedUser has been re-verified against the database
// Job fails if user no longer has required role
});Built-in Workers
All 35+ built-in workers are registered automatically when coreWorkers: true (default):
| Worker | Queue | Description |
|--------|-------|-------------|
| emailWorkerDef | email | Render React Email templates and send |
| mediaWorkerDef | media | Image resize/optimize with Sharp |
| billingWorkerDef | billing | Stripe role sync, usage tracking |
| pushWorkerDef | push | Web push via Expo Server SDK |
| discordWorkerDef | discord | Discord webhook delivery |
| commerceWorkerDef | commerce | Order finalization, cart abandonment |
| inventoryWorkerDef | inventory | Stock management |
| aiWorkerDef | ai | Auto-tagging, embedding generation |
| analyticsWorkerDef | analytics | Event processing |
| dataSyncWorkerDef | data-sync | RSS/API source synchronization |
| healthProbeWorkerDef | health-probe | System health checks |
| campaignSendWorkerDef | campaign-send | Bulk email campaigns |
| digestWorkerDef | digest | Daily/weekly digest compilation |
| platformBackupWorkerDef | platform-backup | Database/storage backups |
| webhookWorkerDef | webhook | Outbound webhook delivery |
API Reference
| Export | Description |
|--------|-------------|
| enqueue | Type-safe job enqueue proxy |
| getQueue(name, prefix?) | Get or create a BullMQ Queue |
| defaultJobOptions | Default retry/backoff/cleanup config |
| resetQueues() | Clear queue cache (testing only) |
| QUEUE_NAMES | Array of all known queue names |
| QueuePayloads | Type map of queue name to payload type |
| createWorkerHost(config) | Create and manage all workers |
| collectModuleWorkers() | Gather module-contributed workers |
| WorkerDef | Worker definition interface |
| WorkerHost | Worker host interface |
| recordJobCompleted(redis, queue, ms) | Record success metric |
| recordJobFailed(redis, queue) | Record failure metric |
| readWorkerHealth(redis) | Read aggregated metrics |
| withReAuth(payload, fn) | Re-verify user role for sensitive jobs |
| hasMinimumRole(role, required) | Role hierarchy check |
