hazo_jobs
v0.9.1
Generic typed job queue persisted via hazo_connect (PG or SQLite). Submit, claim, heartbeat, complete, fail; ships an admin UI.
hazo_jobs
Generic typed job queue persisted via hazo_connect. Supports PostgreSQL (default) and SQLite, plus a PostgREST adapter for environments without a direct DB connection.
Install
npm install hazo_jobs hazo_connect
Quick start (PostgreSQL)
import { createJobsClient } from "hazo_jobs/server";
import { createHazoConnect } from "hazo_connect/server";
const adapter = await createHazoConnect({ /* pg config */ });
const jobs = createJobsClient({ connect: { adapter } });
await jobs.submit({ type: "echo", payload: { hello: "world" } });
Quick start (SQLite)
import Database from "better-sqlite3";
import { readFileSync } from "fs";
import { createJobsClient, applyDdl } from "hazo_jobs/server";
const db = new Database("./jobs.sqlite");
const adapter = {
  raw: async (sql, values = []) => {
    const stmt = db.prepare(sql);
    return /^\s*(SELECT|WITH)/i.test(sql) || /RETURNING/i.test(sql)
      ? stmt.all(...values)
      : (stmt.run(...values), []);
  },
};
await applyDdl(adapter, readFileSync(require.resolve("hazo_jobs/ddl/sqlite.sql"), "utf8"));
const jobs = createJobsClient({ connect: { adapter }, dialect: "sqlite" });
Schema
DDL ships in the package:
- hazo_jobs/dist/ddl/postgres.sql
- hazo_jobs/dist/ddl/sqlite.sql
Apply once per database. Use applyDdl(adapter, sql) from hazo_jobs/server to run the multi-statement DDL through your adapter. The table is named hazo_jobs by default; override with tablePrefix.
Upgrading an existing database
If your DB was created with an older schema, call migrateSchema before applyDdl. It adds any missing columns (using PRAGMA table_info for SQLite or ADD COLUMN IF NOT EXISTS for PostgreSQL) so new indexes can be created without errors.
import { migrateSchema, applyDdl } from "hazo_jobs/server";
import { readFileSync } from "fs";
const ddl = readFileSync(require.resolve("hazo_jobs/ddl/sqlite.sql"), "utf8");
// Run once at startup:
await migrateSchema(adapter, "sqlite"); // adds any missing columns
await applyDdl(adapter, ddl); // creates table + indexes (idempotent)
Safe to call on a fresh database: migrateSchema is a no-op when the table does not yet exist.
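The column check described above can be pictured with a small sketch. This is not the library's implementation: `planMissingColumns`, `ColumnSpec`, and the sample column list are hypothetical names chosen for illustration. It assumes the SQLite path, where the existing column names come from PRAGMA table_info, and shows how the missing ones could be turned into ALTER TABLE statements.

```typescript
// Hypothetical sketch: decide which ALTER TABLE statements a migration would need.
// Not hazo_jobs internals; the names and columns below are illustrative assumptions.
interface ColumnSpec {
  name: string;
  sqlType: string;
}

function planMissingColumns(
  table: string,
  existing: string[], // e.g. the `name` values returned by PRAGMA table_info(<table>)
  expected: ColumnSpec[],
): string[] {
  // An empty list means the table does not exist yet: nothing to migrate.
  if (existing.length === 0) return [];
  const have = new Set(existing.map((c) => c.toLowerCase()));
  return expected
    .filter((c) => !have.has(c.name.toLowerCase()))
    .map((c) => `ALTER TABLE ${table} ADD COLUMN ${c.name} ${c.sqlType}`);
}

const statements = planMissingColumns(
  "hazo_jobs",
  ["id", "type", "status"],
  [
    { name: "status", sqlType: "TEXT" },
    { name: "run_at", sqlType: "TEXT" },
  ],
);
console.log(statements);
```

Returning an empty plan for a missing table mirrors the no-op behavior on a fresh database, leaving table creation to the DDL.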
API
createJobsClient({ connect, dialect?, tablePrefix?, logger? }) returns a JobsClient with: submit, get, list, cancel, retry, delete, watch, cleanup, and worker(opts).run(handler).
Admin UI
import { JobsAdminPanel } from "hazo_jobs/ui";
import "hazo_jobs/ui/styles.css";
<JobsAdminPanel fetchFn={fetch} basePath="/api/admin/jobs" />
Pair with createRouteHandlers(jobs) from hazo_jobs/server to wire the routes the panel calls.
Dialect notes
- dialect: "pg" (default) uses FOR UPDATE SKIP LOCKED for atomic claim.
- dialect: "sqlite" uses optimistic locking (read candidate → conditional update). Adequate for single-worker setups and functional testing; for high-concurrency production use PostgreSQL.
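To make the "read candidate → conditional update" idea concrete, here is a minimal in-memory sketch. It is an assumption-laden model, not the SQLite code path itself: `Row`, `readCandidate`, and `conditionalClaim` are hypothetical names, and an array stands in for the table. It shows why only one of two workers that read the same candidate can win the claim.

```typescript
// Illustrative in-memory model of optimistic claiming.
// Field names are assumptions for the sketch, not the hazo_jobs schema.
interface Row {
  id: number;
  status: "pending" | "running";
  workerId: string | null;
}

// Step 1: read a candidate without locking it.
function readCandidate(rows: Row[]): Row | undefined {
  return rows.find((r) => r.status === "pending");
}

// Step 2: conditional update, similar in spirit to
//   UPDATE ... SET status = 'running' WHERE id = ? AND status = 'pending'
// Returns true only if the row was still pending, i.e. this worker won the claim.
function conditionalClaim(rows: Row[], id: number, workerId: string): boolean {
  const row = rows.find((r) => r.id === id);
  if (!row || row.status !== "pending") return false;
  row.status = "running";
  row.workerId = workerId;
  return true;
}

const table: Row[] = [{ id: 1, status: "pending", workerId: null }];

// Two workers read the same candidate before either one updates it.
const seenByA = readCandidate(table);
const seenByB = readCandidate(table);

const aWon = seenByA ? conditionalClaim(table, seenByA.id, "worker-a") : false;
const bWon = seenByB ? conditionalClaim(table, seenByB.id, "worker-b") : false;
console.log({ aWon, bWon });
```

The losing worker simply gets `false` and has to look for another candidate, which is one reason this approach wastes work as the number of concurrent workers grows.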
Worker pool (v0.4+)
For production workloads that need multiple workers, run a supervisor process that scales the worker pool over an HTTP control port.
// supervisor.ts
import { createWorkerSupervisor } from "hazo_jobs/server";
const sup = createWorkerSupervisor({
  workerScript: "/path/to/worker.ts",
  dataDir: "/var/lib/hazo_jobs",
  host: "127.0.0.1", // or "0.0.0.0" if admin is on another host
  port: 7777,
  token: process.env.SUPERVISOR_TOKEN, // required for non-localhost binds
});
await sup.start();
Each worker file uses createWorkerProcess:
// worker.ts
import { createWorkerProcess } from "hazo_jobs/server";
const proc = createWorkerProcess({
  adapter,
  dialect: "pg",
  types: ["echo", "sleep"],
  handlers: {
    echo: async (job) => job.payload,
    sleep: async (job) => { /* ... */ },
  },
});
await proc.run();
Wire the admin's createRouteHandlers with the supervisor URL:
const handlers = createRouteHandlers(jobs, {
  supervisorUrl: process.env.SUPERVISOR_URL,
  supervisorToken: process.env.SUPERVISOR_TOKEN,
});
The admin panel gains a "Workers" tab that lets you set the pool size at runtime.
Scheduling (v0.6+ one-shot, v0.7+ recurring)
One-shot — submit with runAt
await jobs.submit({
  type: "send-digest",
  payload: { userId: 42 },
  runAt: new Date(Date.now() + 60 * 60_000).toISOString(), // 1h from now
});
The row is inserted with status='scheduled' and run_at=<iso>. A scheduler component inside the supervisor promotes it to pending when due; workers then claim it like any other job. A past, current, or unset runAt keeps the existing immediate-pending behavior.
Enable the scheduler by passing scheduler to createWorkerSupervisor:
const sup = createWorkerSupervisor({
  workerScript: "/path/to/worker.ts",
  // ... pool options ...
  scheduler: { adapter, dialect: "pg" }, // promotes due `scheduled` jobs + fires recurring schedules
});
The "Scheduled" tab in the admin panel lists jobs soonest-due first; cancelling from that tab works on scheduled rows.
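The promotion step performed by the scheduler can be sketched over an in-memory list. This is only a model of the behavior described in this section, not the supervisor's code: `JobRow`, `promoteDue`, and the camel-cased `runAt` field are hypothetical. It shows that only `scheduled` rows whose run time has arrived become `pending`.

```typescript
// Illustrative sketch of promoting due one-shot jobs; names are assumptions.
interface JobRow {
  id: number;
  status: "scheduled" | "pending";
  runAt: string | null; // ISO timestamp
}

// Flip every due `scheduled` row to `pending` and report which ids changed.
function promoteDue(rows: JobRow[], now: Date): number[] {
  const promoted: number[] = [];
  for (const row of rows) {
    const due = row.runAt !== null && Date.parse(row.runAt) <= now.getTime();
    if (row.status === "scheduled" && due) {
      row.status = "pending";
      promoted.push(row.id);
    }
  }
  return promoted;
}

const now = new Date("2026-01-01T12:00:00Z");
const rows: JobRow[] = [
  { id: 1, status: "scheduled", runAt: "2026-01-01T11:59:00Z" }, // already due
  { id: 2, status: "scheduled", runAt: "2026-01-01T13:00:00Z" }, // one hour away
];
const promotedIds = promoteDue(rows, now);
console.log(promotedIds);
```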
Recurring — cron schedules
await jobs.schedules.create({
  name: "Daily digest",
  cron: "0 9 * * *", // 09:00 UTC daily
  type: "send-digest",
  payload: { audience: "all" },
});
The supervisor's scheduler INSERTs a child job each time the cron fires, advances next_run_at to the next future slot, and (by spec) never replays missed slots in a burst. Each spawned job carries schedule_id so the admin shows the lineage.
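The "no burst replay" rule is easier to see with numbers. The sketch below is a simplification under stated assumptions: it handles only a daily fixed-hour schedule like the "0 9 * * *" example rather than parsing cron, `nextDailySlotUtc` and `tick` are hypothetical names, and firing once for an overdue slot is an assumption about behavior this README does not spell out.

```typescript
// Sketch for a daily fixed-hour schedule only; a real scheduler would use a cron parser.
function nextDailySlotUtc(hourUtc: number, after: Date): Date {
  const slot = new Date(
    Date.UTC(after.getUTCFullYear(), after.getUTCMonth(), after.getUTCDate(), hourUtc),
  );
  // If today's slot is not strictly in the future, move to tomorrow's.
  if (slot.getTime() <= after.getTime()) {
    slot.setUTCDate(slot.getUTCDate() + 1);
  }
  return slot;
}

interface TickResult {
  fired: number;
  nextRunAt: Date;
}

function tick(nextRunAt: Date, now: Date, hourUtc: number): TickResult {
  if (nextRunAt.getTime() > now.getTime()) return { fired: 0, nextRunAt };
  // Due, possibly several slots overdue: fire once, then jump to the next future slot.
  return { fired: 1, nextRunAt: nextDailySlotUtc(hourUtc, now) };
}

// The scheduler was down for three days; the stored slot is long past.
const result = tick(new Date("2026-05-20T09:00:00Z"), new Date("2026-05-23T10:00:00Z"), 9);
console.log(result.fired, result.nextRunAt.toISOString());
```

Computing the next slot from the current time, rather than from the stale `next_run_at`, is what prevents three missed days from producing three jobs at once.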
Admin routes (all proxy through createRouteHandlers(jobs)):
- GET /api/jobs/schedules
- POST /api/jobs/schedules - body { name, cron, type, payload?, priority?, maxAttempts?, expiresInSec?, enabled? }
- GET /api/jobs/schedules/:id
- PATCH /api/jobs/schedules/:id - partial update; a new cron recomputes next_run_at
- DELETE /api/jobs/schedules/:id
- POST /api/jobs/schedules/:id/fire - Run-now (manual fire, also advances next_run_at)
The "Schedules" tab in the admin panel renders these as a table with an enable/disable toggle, side-drawer editor, and Run-now button.
Recurring schedules require a raw DB adapter; PostgREST connections throw. The supervisor's /health reports scheduler: { enabled, last_tick_at } so you can monitor liveness.
Per-job logs (v0.5+)
Every job handler runs inside a hazo_logs session keyed by job.id. The worker emits job.started / job.completed / job.failed lifecycle lines automatically; handlers can also use the injected logger:
import { createWorkerProcess } from "hazo_jobs/server";
createWorkerProcess({
  adapter, dialect: "pg", types: ["send-email"],
  handlers: {
    "send-email": async (job, log) => {
      log?.info("dispatching", { to: job.payload.to });
      await sendEmail(job.payload);
      log?.info("sent");
    },
  },
});
The admin UI's job detail view shows a three-pane layout: Input (payload, status, attempts), Execution (live-tailing log lines with level chips and search), Output (result / error / duration). Live tail closes when the detail panel is closed or the job reaches a terminal state.
Two new routes power this:
- GET /api/jobs/:id/logs?level=info&page=1 - paginated read (proxies readLogs({ reference: [id], ... })).
- GET /api/jobs/:id/logs/stream - SSE live tail (proxies createLogStreamApiHandler({ reference: [id] }) from hazo_logs/ui/server).
Retry policy (0.9+)
createWorker({
  // ...
  retryPolicy: {
    pdf_render: { strategy: 'exponential', baseMs: 1000, capMs: 60_000, jitter: true, timeoutMs: 30_000 },
    '*': { strategy: 'exponential', baseMs: 500, capMs: 30_000, jitter: true },
  },
});
On handler throw or timeout, the worker sets run_at = now() + computeBackoffMs(policy, attempts+1) and keeps status='pending'. The row is re-claimed when due. Once attempts >= max_attempts, the row moves to the terminal failed status.
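For intuition about the delays such a policy produces, here is a hedged sketch of capped exponential backoff. It is not the package's computeBackoffMs: `sketchBackoffMs` is a hypothetical stand-in, the doubling formula and the "full jitter" choice (uniform in [0, delay]) are assumptions, and the random source is injectable so the result can be checked deterministically.

```typescript
// Hypothetical backoff sketch; the real computeBackoffMs may differ.
interface RetryPolicy {
  strategy: "exponential";
  baseMs: number;
  capMs: number;
  jitter: boolean;
}

function sketchBackoffMs(
  policy: RetryPolicy,
  attempt: number, // 1-based attempt number
  random: () => number = Math.random,
): number {
  // attempt 1 -> baseMs, 2 -> 2x baseMs, 3 -> 4x baseMs, ... capped at capMs.
  const raw = policy.baseMs * 2 ** Math.max(0, attempt - 1);
  const capped = Math.min(policy.capMs, raw);
  // "Full jitter" (an assumption): pick uniformly in [0, capped).
  return policy.jitter ? Math.floor(random() * capped) : capped;
}

const policy: RetryPolicy = { strategy: "exponential", baseMs: 1000, capMs: 60_000, jitter: false };
const delays = [1, 2, 3, 7].map((n) => sketchBackoffMs(policy, n));
console.log(delays);
```

With jitter disabled the delays grow 1s, 2s, 4s and then hit the 60s cap; enabling jitter spreads retries out so that jobs that failed together do not all come due at the same instant.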
Per-attempt history (0.9+)
hazo_jobs.attempts_history (JSONB) records every attempt:
[
  { "at": "2026-05-23T10:00:00Z", "attempt": 1, "status": "failed", "worker_id": "w1", "duration_ms": 12, "error": "Error: boom" },
  { "at": "2026-05-23T10:00:05Z", "attempt": 2, "status": "completed", "worker_id": "w1", "duration_ms": 8, "error": null }
]
Session ID propagation (0.9+)
submit() automatically captures hazo_logs.getLogContext()?.sessionId. Override per-call with sessionId: or disable with sessionId: null. The worker re-enters that session before invoking the handler, so any logger.*() call inside the handler carries the same id as the request that submitted the job.
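The capture-and-re-enter flow can be modelled with Node's AsyncLocalStorage. This is a sketch of the idea only: how hazo_logs actually stores its context is not shown in this README, and `logContext`, `submitSketch`, `runSketch`, and `StoredJob` are hypothetical names.

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

// Stand-in for a logging context; the real hazo_logs mechanism is an unknown here.
const logContext = new AsyncLocalStorage<{ sessionId: string }>();

interface StoredJob {
  type: string;
  sessionId: string | null;
}

// submit(): capture the ambient session id unless the caller overrides (string)
// or disables (null) it.
function submitSketch(type: string, sessionId?: string | null): StoredJob {
  const captured =
    sessionId === undefined ? (logContext.getStore()?.sessionId ?? null) : sessionId;
  return { type, sessionId: captured };
}

// Worker side: re-enter the stored session before invoking the handler.
function runSketch<T>(job: StoredJob, handler: () => T): T {
  return job.sessionId === null
    ? handler()
    : logContext.run({ sessionId: job.sessionId }, handler);
}

// A request with session "req-123" submits a job...
const job = logContext.run({ sessionId: "req-123" }, () => submitSketch("echo"));
// ...and the handler later observes the same session id.
const seenInHandler = runSketch(job, () => logContext.getStore()?.sessionId ?? null);
console.log(job.sessionId, seenInHandler);
```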
Bulk DLQ replay (0.9+)
POST /admin/jobs/bulk-retry
{ "type": "pdf_render", "limit": 100 }
Returns { retried: N, newJobIds: [...] }. Each new row has retry_of pointing back to the original failed job.
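The replay semantics can be sketched against an in-memory list. Treat it as a model under assumptions rather than the route's implementation: `bulkRetrySketch` and `FailedJobRow` are hypothetical, ids are numeric here for simplicity, and leaving the original row in `failed` is an assumption.

```typescript
// Illustrative model of bulk replay; names and id scheme are assumptions.
interface FailedJobRow {
  id: number;
  type: string;
  status: "pending" | "failed" | "completed";
  retryOf: number | null;
}

function bulkRetrySketch(
  rows: FailedJobRow[],
  type: string,
  limit: number,
): { retried: number; newJobIds: number[] } {
  // Pick at most `limit` failed jobs of the requested type.
  const originals = rows.filter((r) => r.status === "failed" && r.type === type).slice(0, limit);
  let nextId = rows.reduce((max, r) => Math.max(max, r.id), 0) + 1;
  const newJobIds: number[] = [];
  for (const original of originals) {
    // Each replay is a new pending row that points back at the job it retries.
    rows.push({ id: nextId, type: original.type, status: "pending", retryOf: original.id });
    newJobIds.push(nextId);
    nextId += 1;
  }
  return { retried: newJobIds.length, newJobIds };
}

const jobRows: FailedJobRow[] = [
  { id: 1, type: "pdf_render", status: "failed", retryOf: null },
  { id: 2, type: "pdf_render", status: "failed", retryOf: null },
  { id: 3, type: "echo", status: "failed", retryOf: null },
  { id: 4, type: "pdf_render", status: "completed", retryOf: null },
];
const replay = bulkRetrySketch(jobRows, "pdf_render", 1);
console.log(replay);
```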
Derived registry (0.9+)
GET /admin/jobs/types returns one row per type with pending, running, failed, and completed counts plus last_submitted. There is no registry table; the result is an aggregate query over hazo_jobs.
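A rough picture of that aggregate, written over an in-memory list rather than SQL. `summarizeByType`, the `submittedAt` field, and the restriction to four statuses are assumptions made for the sketch; the output keys follow the names listed above.

```typescript
// Illustrative per-type aggregation; not the query the route actually runs.
type JobStatus = "pending" | "running" | "failed" | "completed";

interface JobRecord {
  type: string;
  status: JobStatus;
  submittedAt: string; // ISO timestamp, all in the same UTC format
}

interface TypeSummary {
  type: string;
  pending: number;
  running: number;
  failed: number;
  completed: number;
  last_submitted: string;
}

function summarizeByType(jobs: JobRecord[]): TypeSummary[] {
  const byType = new Map<string, TypeSummary>();
  for (const job of jobs) {
    let summary = byType.get(job.type);
    if (!summary) {
      summary = { type: job.type, pending: 0, running: 0, failed: 0, completed: 0, last_submitted: job.submittedAt };
      byType.set(job.type, summary);
    }
    summary[job.status] += 1;
    // Same-format ISO strings sort chronologically, so string comparison is enough.
    if (job.submittedAt > summary.last_submitted) summary.last_submitted = job.submittedAt;
  }
  return Array.from(byType.values());
}

const summaries = summarizeByType([
  { type: "echo", status: "completed", submittedAt: "2026-05-23T10:00:00Z" },
  { type: "echo", status: "failed", submittedAt: "2026-05-23T11:00:00Z" },
  { type: "pdf_render", status: "pending", submittedAt: "2026-05-23T09:00:00Z" },
]);
console.log(summaries);
```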
Admin UI — run_at countdown (0.9.1+)
The job list shows a live countdown below the PENDING badge for any job with run_at set (i.e. a job waiting out a backoff delay):
- Blue, "retries in 4m 30s": the retry is scheduled in the future
- Amber, "overdue": run_at has passed but no worker has claimed the job yet (worker offline or busy)
No click-through required; the label refreshes on each list poll.
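The label logic can be approximated in a few lines. This is a guess at the formatting rather than the panel's source: `runAtLabel` is a hypothetical helper, and rounding up to whole seconds and omitting hours are assumptions.

```typescript
// Hypothetical formatter for the run_at countdown label.
type RunAtLabel = { color: "blue" | "amber"; text: string } | null;

function runAtLabel(runAt: string | null, now: Date): RunAtLabel {
  if (runAt === null) return null; // no run_at set: no label
  const remainingMs = Date.parse(runAt) - now.getTime();
  if (remainingMs <= 0) return { color: "amber", text: "overdue" };
  const totalSec = Math.ceil(remainingMs / 1000);
  const minutes = Math.floor(totalSec / 60);
  const seconds = totalSec % 60;
  const text = minutes > 0 ? `retries in ${minutes}m ${seconds}s` : `retries in ${seconds}s`;
  return { color: "blue", text };
}

const pollTime = new Date("2026-05-23T10:00:00Z");
console.log(runAtLabel("2026-05-23T10:04:30Z", pollTime)); // blue, "retries in 4m 30s"
console.log(runAtLabel("2026-05-23T09:59:00Z", pollTime)); // amber, "overdue"
```

Because the label is recomputed from `run_at` and the current time on each poll, no extra state needs to be stored for the countdown.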
