@happyvertical/smrt-jobs
v0.36.0
Background job processing for SMRT objects with persistence and scheduling
Background job execution for SMRT objects. Provides persistent queue storage, retry strategies, cron-based scheduling, and a fluent JobBuilder API via the withBackgroundJobs() mixin.
Installation
pnpm add @happyvertical/smrt-jobs
Usage
Add background capabilities to a SmrtObject
import { withBackgroundJobs } from '@happyvertical/smrt-jobs';
import { Document } from './document.js';
// Mixin adds .bg() and .background() to any SmrtObject class
const BackgroundDocument = withBackgroundJobs(Document);
const doc = new BackgroundDocument({ db });
await doc.initialize();
// Quick enqueue: runs as soon as a TaskRunner picks it up
const handle = await doc.bg('generateSummary', { format: 'md' });
// Fluent builder for advanced options
const handle2 = await doc.background('generateSummary', { format: 'md' })
  .delay('5m')
  .priority('high')
  .retries(5)
  .queue('analysis')
  .timeout(600000)
  .enqueue();
// Wait for result (polling-based)
const result = await handle2.wait({ timeout: 60000, pollInterval: 100 });
Run a TaskRunner to process jobs
import { TaskRunner } from '@happyvertical/smrt-jobs';
const runner = new TaskRunner({
  concurrency: 5,
  pollInterval: 1000,
  queues: ['default', 'analysis'],
});
await runner.initialize(db);
await runner.start();
// Listen for events
runner.on('job:completed', (job, result) => { /* ... */ });
runner.on('job:failed', (job, error) => { /* ... */ });
// Graceful shutdown
process.on('SIGTERM', () => runner.stop());
Heartbeat-safe job execution
TaskRunner keeps jobs alive with a heartbeat timer. If your job blocks the
Node.js event loop for longer than the effective stale-job threshold, the runner
will recover that work as stale and mark it failed.
Common causes:
- execSync, spawnSync, or other synchronous subprocess APIs
- long CPU-bound loops in the job method itself
- large synchronous filesystem work
Prefer async subprocess APIs (spawn, execFile, streamed stdio) for long
exports/builds/uploads, or move CPU-heavy work into a separate process or
worker thread. If a job is intentionally long-running, tune
heartbeatInterval and staleJobThresholdMs together so the stale threshold
comfortably exceeds the longest gap between heartbeats.
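As a rough illustration of the advice above, the sketch below wraps Node's `spawn` in a promise so a job method can await a subprocess instead of blocking on `execSync`. The names `runCommand`, `CommandResult`, and `exportReport` are hypothetical and are not part of @happyvertical/smrt-jobs; only `spawn` from `node:child_process` is a real API here.

```typescript
import { spawn } from 'node:child_process';

// Result of a finished subprocess (hypothetical shape for this sketch).
interface CommandResult {
  code: number | null;
  stdout: string;
  stderr: string;
}

// Run a command without blocking the event loop, so timers (such as a
// runner's heartbeat) can keep firing while the child process works.
function runCommand(command: string, args: string[]): Promise<CommandResult> {
  return new Promise((resolve, reject) => {
    const child = spawn(command, args);
    let stdout = '';
    let stderr = '';
    child.stdout.setEncoding('utf8');
    child.stderr.setEncoding('utf8');
    child.stdout.on('data', (chunk: string) => { stdout += chunk; });
    child.stderr.on('data', (chunk: string) => { stderr += chunk; });
    child.on('error', reject); // e.g. the command could not be started
    child.on('close', (code) => resolve({ code, stdout, stderr }));
  });
}

// Hypothetical job method: awaits the child instead of calling execSync.
async function exportReport(): Promise<string> {
  const { code, stdout } = await runCommand(process.execPath, [
    '-e',
    'console.log("done")',
  ]);
  if (code !== 0) throw new Error(`export failed with exit code ${code}`);
  return stdout.trim();
}
```

Because the promise only resolves on the child's `close` event, the event loop stays free between output chunks, which is the property the heartbeat timer depends on.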
ScheduleRunner uses the same stale-heartbeat recovery rules when reconciling
scheduled jobs.
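To make the relationship between the two settings concrete, here is a minimal sketch of a stale-heartbeat check. The field names `heartbeatInterval` and `staleJobThresholdMs` come from this README; the comparison rule and the helper functions are assumptions for illustration and may not match how TaskRunner or ScheduleRunner evaluate staleness internally.

```typescript
// Field names mirror the README; everything else is a hypothetical illustration.
interface HeartbeatSettings {
  heartbeatInterval: number;   // ms between heartbeats
  staleJobThresholdMs: number; // ms of silence before a job is treated as stale
}

// Assumed rule: a job is stale once its last heartbeat is older than the threshold.
function isStale(lastHeartbeatAt: number, now: number, s: HeartbeatSettings): boolean {
  return now - lastHeartbeatAt > s.staleJobThresholdMs;
}

// Number of consecutive heartbeats that can be missed before the next one
// would arrive too late under the assumed rule above.
function missedBeatsTolerated(s: HeartbeatSettings): number {
  return Math.floor(s.staleJobThresholdMs / s.heartbeatInterval) - 1;
}

const settings: HeartbeatSettings = {
  heartbeatInterval: 10000,
  staleJobThresholdMs: 60000,
};
const tolerated = missedBeatsTolerated(settings);
```

A low or negative tolerance suggests the threshold sits too close to the heartbeat interval, so a single delayed timer tick could get a healthy job marked as failed.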
Schedule recurring jobs with ScheduleRunner
The ScheduleRunner polls the _smrt_agent_schedules table for due cron entries and creates SmrtJob records for the TaskRunner to execute. Wire them together via events:
import { ScheduleRunner } from '@happyvertical/smrt-jobs';
const scheduleRunner = new ScheduleRunner({ pollInterval: 30000 });
await scheduleRunner.initialize(db);
await scheduleRunner.start();
// Connect TaskRunner events to update schedule state
// (taskRunner is a started TaskRunner instance, such as `runner` above)
taskRunner.on('job:completed', (job) => {
  const scheduleId = job.args?._scheduleId;
  if (scheduleId) scheduleRunner.handleJobCompletion(scheduleId, true);
});
taskRunner.on('job:failed', (job, error) => {
  const scheduleId = job.args?._scheduleId;
  if (scheduleId) scheduleRunner.handleJobCompletion(scheduleId, false, error.message);
});
System Tables
| Table | Purpose |
|-------|---------|
| _smrt_jobs | Persistent job queue (SmrtJob records) |
| _smrt_agent_schedules | Cron schedule entries polled by ScheduleRunner |
API
Classes
| Export | Description |
|--------|------------|
| SmrtJob | Persistent job record stored in _smrt_jobs |
| SmrtJobCollection | Collection with claimReady(), listReady(), listByStatus(), stats(), cleanup() |
| JobBuilder | Fluent API: .delay(), .priority(), .retries(), .queue(), .timeout(), .enqueue() |
| JobHandle | Track, wait, cancel, or retry an enqueued job |
| JobContextLogger | Logger that auto-injects job context (jobId, attempt, queue) |
| TaskRunner | Polling-based execution engine with concurrency control and heartbeats |
| ScheduleRunner | Polls for due cron schedules and creates SmrtJob entries |
TaskRunner uses SmrtJobCollection.claimReady() so multiple workers can poll
the same queue without duplicate-claiming a pending row.
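The idea behind claiming without duplicates can be sketched in memory. This is not the library's implementation, which is not shown in this README; `JobRow` and `claimReadySketch` are hypothetical names. The point is that the "is it pending?" check and the status change happen as a single step.

```typescript
type Status = 'pending' | 'running';

// Hypothetical, simplified job row.
interface JobRow {
  id: string;
  status: Status;
  claimedBy?: string;
}

// Claim up to `limit` pending rows for one worker. The check and the status
// change happen in one synchronous step, so no other caller can observe the
// row as pending in between. In SQL the analogous move would be a conditional
// UPDATE that only matches rows still marked pending.
function claimReadySketch(rows: JobRow[], workerId: string, limit: number): JobRow[] {
  const claimed: JobRow[] = [];
  for (const row of rows) {
    if (claimed.length >= limit) break;
    if (row.status !== 'pending') continue;
    row.status = 'running';
    row.claimedBy = workerId;
    claimed.push(row);
  }
  return claimed;
}

const rows: JobRow[] = [
  { id: 'a', status: 'pending' },
  { id: 'b', status: 'pending' },
  { id: 'c', status: 'pending' },
];
const first = claimReadySketch(rows, 'worker-1', 2);
const second = claimReadySketch(rows, 'worker-2', 2);
```

In this run the second worker only receives the row the first worker left behind, so no row is handed out twice.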
Functions
| Export | Description |
|--------|------------|
| createTaskRunner(config?) | Factory for creating a configured TaskRunner |
| createScheduleRunner(config?) | Factory for creating a configured ScheduleRunner |
| withBackgroundJobs(Class) | Mixin that adds .bg() and .background() to any SmrtObject class |
| parseDelay(delay) | Parse human-readable delay strings ('5m', '1h', '30s') to milliseconds |
| priorityToNumber(priority) | Convert priority label ('critical'/'high'/'normal'/'low') to number |
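For a sense of what delay parsing involves, here is a standalone sketch covering the three formats listed in the table ('5m', '1h', '30s'). It is a hypothetical re-implementation named `parseDelaySketch`, not the exported `parseDelay`; the real function may accept more units or formats and may handle invalid input differently.

```typescript
// Milliseconds per unit, limited to the units shown in the README examples.
const UNIT_MS: Record<string, number> = { s: 1000, m: 60000, h: 3600000 };

// Convert strings such as '5m' into milliseconds. Throwing on unrecognized
// input is an assumption made for this sketch.
function parseDelaySketch(delay: string): number {
  const match = /^(\d+)([smh])$/.exec(delay.trim());
  if (!match) throw new Error(`Unrecognized delay: ${delay}`);
  return Number(match[1]) * UNIT_MS[match[2]];
}

const fiveMinutes = parseDelaySketch('5m'); // 300000
const oneHour = parseDelaySketch('1h');     // 3600000
```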
Key Types
Priority, JobStatus, JobResult, WaitOptions, BgOptions, BackgroundCapable, TaskRunnerConfig, TaskRunnerEvents, ScheduleRunnerConfig, ScheduleRunnerEvents, ScheduleInfo, JobContext, TimeoutBehavior, SmrtJobData, ListReadyOptions
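The Usage example passes `{ timeout, pollInterval }` to `handle.wait()`, which is described there as polling-based. A minimal sketch of that pattern follows; `WaitOptionsSketch` and `waitForResult` are hypothetical names, and the real `WaitOptions` type and `JobHandle.wait()` may behave differently, for example in how a timeout is reported.

```typescript
// Hypothetical stand-in for the two options used in the Usage example.
interface WaitOptionsSketch {
  timeout: number;      // give up after this many ms
  pollInterval: number; // ms between checks
}

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Call `check` repeatedly until it yields a value or the deadline passes.
async function waitForResult<T>(
  check: () => Promise<T | undefined>,
  { timeout, pollInterval }: WaitOptionsSketch,
): Promise<T> {
  const deadline = Date.now() + timeout;
  for (;;) {
    const value = await check();
    if (value !== undefined) return value;
    // Stop early if the next poll would land after the deadline.
    if (Date.now() + pollInterval > deadline) {
      throw new Error(`Timed out after ${timeout}ms`);
    }
    await sleep(pollInterval);
  }
}
```

A shorter `pollInterval` lowers the latency of seeing a result at the cost of more frequent lookups against the job store.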
Dependencies
- @happyvertical/smrt-core: ORM and code generation
- @happyvertical/smrt-config: configuration loading
- @happyvertical/smrt-types: shared type definitions
- @happyvertical/jobs: retry strategies
- @happyvertical/sql: database interface
- @happyvertical/logger: structured logging
- @happyvertical/utils: ID generation utilities
- Peer (optional): @happyvertical/smrt-svelte, svelte
