@inharness-ai/sqlite-queue
v0.1.0
Published
Embeddable TypeScript job queue backed by a single SQLite file (WAL, better-sqlite3) with at-least-once delivery via leases.
Downloads
121
Readme
sqlite-queue
Embeddable TypeScript job queue backed by a single SQLite file (WAL, better-sqlite3) with at-least-once delivery via leases.
No Redis, no separate broker — queue, worker, scheduler, and observability all live in a single Node.js process (>= 20), persisted to one .db file.
Installation
npm install sqlite-queueRequires Node.js >= 20. Runtime dependencies: better-sqlite3 (native module — compiled at install time) and cron-parser.
Quick start
import { Queue } from 'sqlite-queue';
const queue = new Queue({ databasePath: './jobs.db' });
const worker = queue.createWorker({ queues: ['default'], concurrency: 4 });
worker.define<{ to: string }>('sendEmail', async (payload, ctx) => {
console.log(`sending mail to ${payload.to} (attempt ${ctx.attempt})`);
});
worker.start();
queue.enqueue('sendEmail', { to: '[email protected]' });
// Subscribe to events
queue.on('job:succeeded', ({ job }) => {
console.log(`job ${job.id} OK`);
});
// Shutdown
await worker.stop({ drain: true, timeoutMs: 30_000 });
queue.close();The ./jobs.db file is created and migrated automatically on first open.
API
new Queue(options)
| Option | Default | Description |
| --- | --- | --- |
| databasePath | required | Path to the SQLite file (:memory: works too). |
| tablePrefix | 'sq_' | Table-name prefix (recorded on first open — cannot be changed afterwards). |
| busyTimeoutMs | 5000 | SQLite PRAGMA busy_timeout. |
| logger | console | { debug, info, warn, error } object. |
| subscriberErrorPolicy | 'log' | 'log' or 'throw' — what to do when an event listener throws. |
| maxListeners | Infinity | Per-event listener limit. |
Jobs
queue.enqueue<T>(kind: string, payload: T, opts?: EnqueueOptions): Job
queue.getJob(id: number): Job | null
queue.listJobs(filter: JobFilter, page?: { offset?, limit? }): Job[]
queue.listJobRuns(jobId: number): JobRun[]
queue.retry(id: number, opts?: { resetAttempts?: boolean }): Job
queue.cancel(id: number): Job
queue.purge(filter: JobFilter): { deleted: number }EnqueueOptions:
| Field | Default | Description |
| --- | --- | --- |
| queue | 'default' | Queue name (auto-created on first use). |
| maxAttempts | 5 | After this many attempts the job is moved to dead. |
| backoff | exp. 1s → 60s, full jitter | { type: 'exponential', baseMs, maxMs, jitter: 'full' \| 'none' }. |
| timeoutMs | null | Hard handler timeout (abort + retry). |
| priority | 0 | Higher priority is picked first within the same queue. |
| idempotencyKey | null | Unique key — duplicates in pending/running/succeeded return the existing job; in a terminal state they throw IdempotencyConflictError. |
| runAt | now | Timestamp in ms — deferred execution. |
Job statuses: pending, running, succeeded, failed, dead, cancelled.
Worker
const worker = queue.createWorker({
queues: ['default'], // queues this worker serves
concurrency: 4, // parallel jobs per worker
leaseDurationMs: 30_000, // how long a lease is held
heartbeatIntervalMs: 10_000,
pollIntervalMs: 1_000,
recoverySweepIntervalMs: 30_000,
});
worker.define<TPayload, TResult>(kind, async (payload, ctx) => {
// ctx.signal: AbortSignal (timeout, drain, or cancel)
// ctx.job, ctx.attempt
}, /* hooks? */ { beforeRun, afterRun, onError });
worker.start();
await worker.stop({ drain: true, timeoutMs: 30_000 });Register handlers before calling start(). The worker recovers abandoned leases in the background (sweep), so a crashed process never blocks jobs indefinitely.
Schedules (cron / interval)
queue.schedule('nightly-report', 'generateReport', { type: 'daily' }, {
cron: '0 2 * * *',
tz: 'Europe/Warsaw',
catchUpPolicy: 'fire-once', // 'skip' | 'fire-once' | 'fire-all'
});
queue.schedule('heartbeat', 'ping', {}, { everyMs: 60_000 });
queue.startScheduler({ tickIntervalMs: 5000 });
// ...
await queue.stopScheduler();
queue.pauseSchedule('nightly-report');
queue.resumeSchedule('nightly-report');
queue.unschedule('nightly-report');
queue.listSchedules({ enabled: true });Queue operations
queue.pauseQueue('emails');
queue.resumeQueue('emails');
queue.setQueueConcurrency('emails', 8); // or null = unlimited
queue.getQueue('emails', { withStats: true });
queue.listQueues({ withStats: true });Events (typed event emitter)
const off = queue.on('job:failed', ({ job, run, willRetry }) => { /* ... */ });
off(); // unsubscribeAvailable events (see EventName in the type definitions): job:enqueued, job:running, job:succeeded, job:failed, job:retrying, job:dead, job:cancelled, job:lease-stolen, worker:started, worker:stopped, worker:heartbeat, worker:lease-recovered, worker:claim-failed, worker:handler-aborted, schedule:fired, schedule:skipped, schedule:disabled, schedule:recomputed, schedule:tick-failed, queue:auto-created, queue:paused, queue:resumed, queue:concurrency-changed.
Stats
queue.stats({ staleHeartbeatThresholdMs: 30_000 });
// {
// jobs: { pending, running, succeeded, failed, dead, cancelled },
// byQueue: { [queue]: { ... } },
// workers: { total, running, staleHeartbeat },
// }Guarantees and execution model
- At-least-once — a handler may be invoked again after a crash, timeout, or lease steal. Write idempotent handlers (or use
idempotencyKeyat enqueue time). - Single SQLite file + WAL — multiple processes can read and write the same file, but workers are expected to run on the same host (SQLite over a network filesystem is unsafe).
- Lease + heartbeat — a worker takes a job under a valid lease; if the worker dies, another one picks it up after the lease expires.
- Exponential backoff with full jitter — defaults to 1s → 60s.
- Schema migrations — applied automatically inside
BEGIN IMMEDIATEon open.
Errors
All errors extend SqliteQueueError and carry a stable code. Common ones: ConfigValidationError, IdempotencyConflictError, JobNotFoundError, InvalidStateTransitionError, WorkerAlreadyStartedError, WorkerStoppedError, InvalidCronExpressionError, ScheduleConflictError, ScheduleNotFoundError, QueueClosedError, DatabaseLockedError, SchemaVersionMismatchError.
Development
npm install
npm test # vitest
npm run typecheck # tsc --noEmit
npm run build # tsup → dist/esm + dist/cjs + .d.tsLicense
MIT.
