# pglite-queue
Zero-infrastructure background job queue for Node.js, powered by PGlite (embedded Postgres via WASM).
BullMQ-like DX. No Redis. No external database. Just npm install and go.
```bash
npm install pglite-queue @electric-sql/pglite
```

## Why?
Every existing job queue (BullMQ, bee-queue, agenda) requires you to run an external database. For many projects, that's unnecessary complexity. pglite-queue embeds a full Postgres instance directly in your Node.js process — zero infrastructure, zero Docker, zero config.
| Feature | pglite-queue | BullMQ | Agenda |
|---|---|---|---|
| External DB required | No | Redis | MongoDB |
| Setup time | 0 | Minutes | Minutes |
| Retry with backoff | Yes | Yes | Yes |
| Cron jobs | Yes | Yes | Yes |
| Priority queues | Yes | Yes | No |
| Concurrency control | Yes | Yes | Yes |
| Progress tracking | Yes | Yes | No |
| TypeScript-first | Yes | Yes | No |
| Bundle size | ~22 KB | ~150 KB | ~80 KB |
## Quick Start

```ts
import { Queue } from 'pglite-queue'

const queue = new Queue()

// Define a handler
queue.define('send-email', async (job) => {
  console.log(`Sending email to ${job.data.to}`)
  // your logic here
  return { sent: true }
})

// Start processing
await queue.start()

// Add a job
await queue.add('send-email', { to: '[email protected]', subject: 'Hello' })
```

## Features
### Retry with Exponential Backoff

```ts
await queue.add('flaky-api-call', { url: '...' }, {
  retry: 5, // retry up to 5 times (6 total attempts)
  backoff: {
    type: 'exponential', // or 'fixed'
    baseDelay: 1000,     // 1s, 2s, 4s, 8s, 16s...
    maxDelay: 300000,    // cap at 5 minutes
  },
})
```

### Priority Queues
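For intuition, the delay schedule above works out as follows. This is an illustrative sketch of the arithmetic; `backoffDelay` is a hypothetical helper, not part of the pglite-queue API:

```ts
// Illustrative only: how an exponential/fixed backoff schedule is typically computed.
// `backoffDelay` is a hypothetical helper, not part of the pglite-queue API.
function backoffDelay(
  attempt: number, // 1-based retry attempt
  baseDelay: number,
  maxDelay: number,
  type: 'exponential' | 'fixed' = 'exponential'
): number {
  const raw = type === 'fixed' ? baseDelay : baseDelay * 2 ** (attempt - 1)
  return Math.min(raw, maxDelay)
}

// With baseDelay 1000 and maxDelay 300000:
// attempt 1 → 1000 ms, attempt 2 → 2000 ms, attempt 5 → 16000 ms,
// attempt 10 → 512000 ms raw, capped to 300000 ms
```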
```ts
// Lower number = higher priority
await queue.add('critical', data, { priority: 1 })
await queue.add('normal', data, { priority: 5 })
await queue.add('low', data, { priority: 10 })
```

### Delayed Jobs
```ts
await queue.add('reminder', data, { delay: '30m' }) // run in 30 minutes
await queue.add('cleanup', data, { delay: '2h' })   // run in 2 hours
await queue.add('precise', data, { delay: 5000 })   // run in 5000 ms
```

Supported units: `s` (seconds), `m` (minutes), `h` (hours), `d` (days).
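One plausible way such duration strings map to milliseconds — a sketch only; `parseDelay` is a hypothetical helper, not an exported function:

```ts
// Sketch of duration-string parsing; `parseDelay` is a hypothetical helper,
// not part of the pglite-queue API.
function parseDelay(delay: number | string): number {
  if (typeof delay === 'number') return delay // already milliseconds
  const match = /^(\d+)([smhd])$/.exec(delay)
  if (!match) throw new Error(`Invalid delay: ${delay}`)
  const unitMs = { s: 1_000, m: 60_000, h: 3_600_000, d: 86_400_000 } as const
  return Number(match[1]) * unitMs[match[2] as keyof typeof unitMs]
}
```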
### Cron / Recurring Jobs
```ts
// Every day at 9:00 UTC
await queue.every('0 9 * * *', 'daily-report', { type: 'summary' })

// Every 15 minutes
await queue.every('*/15 * * * *', 'health-check', {})

// Weekdays at 6pm
await queue.every('0 18 * * 1-5', 'eod-sync', {})
```

Standard 5-field cron format: `minute hour day-of-month month day-of-week`.
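As a sketch of how a single field of such an expression is evaluated — supporting `*`, `*/n`, ranges, plain numbers, and comma lists — the matcher below is illustrative only, not pglite-queue's implementation:

```ts
// Illustrative matcher for one cron field ('*', '*/n', 'a-b', 'n', comma lists).
// Hypothetical helper, not pglite-queue's actual parser.
function fieldMatches(field: string, value: number): boolean {
  return field.split(',').some((part) => {
    if (part === '*') return true
    const step = /^\*\/(\d+)$/.exec(part)
    if (step) return value % Number(step[1]) === 0
    const range = /^(\d+)-(\d+)$/.exec(part)
    if (range) return value >= Number(range[1]) && value <= Number(range[2])
    return Number(part) === value
  })
}

// '*/15' matches minutes 0, 15, 30, 45; '1-5' matches Monday-Friday as day-of-week.
```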
### Job Progress Tracking

```ts
queue.define('video-encode', async (job) => {
  for (let i = 0; i <= 100; i += 10) {
    await doWork()
    await job.progress(i)
  }
})

queue.on('progress', (job, pct) => {
  console.log(`Job ${job.id}: ${pct}%`)
})
```

### Concurrency Control
```ts
const queue = new Queue({
  concurrency: 5, // process up to 5 jobs in parallel
})
```

### Events
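A concurrency cap like this is typically enforced with a counting semaphore. The sketch below shows the general technique; the `Semaphore` class is a hypothetical illustration, not pglite-queue's internals:

```ts
// General technique for capping parallel async work at `limit` — illustrative,
// not pglite-queue's actual internals.
class Semaphore {
  private active = 0
  private waiters: Array<() => void> = []

  constructor(private limit: number) {}

  async acquire(): Promise<void> {
    if (this.active < this.limit) {
      this.active++
      return
    }
    // Wait for a slot; release() hands it over directly, so no increment here.
    await new Promise<void>((resolve) => this.waiters.push(resolve))
  }

  release(): void {
    const next = this.waiters.shift()
    if (next) next() // hand the slot to the next waiter
    else this.active--
  }
}
```

A worker loop would call `acquire()` before running a job's handler and `release()` in a `finally` block afterward, so at most `limit` handlers run at once.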
```ts
queue.on('active', (job) => console.log(`Started: ${job.id}`))
queue.on('completed', (job) => console.log(`Done: ${job.id}`))
queue.on('failed', (job, err) => console.log(`Failed: ${job.id} - ${err.message}`))
queue.on('retrying', (job, attempt) => console.log(`Retry #${attempt}: ${job.id}`))
queue.on('progress', (job, pct) => console.log(`${job.id}: ${pct}%`))
queue.on('drained', () => console.log('All jobs processed'))
queue.on('error', (err) => console.error('Queue error:', err))
```

### Persistent Storage
```ts
// In-memory (default) — data lost on restart
const queue = new Queue()

// Filesystem — survives restarts
const queue = new Queue({ dataDir: './my-queue-data' })

// Bring your own PGlite instance
import { PGlite } from '@electric-sql/pglite'
const db = new PGlite('./shared-db')
const queue = new Queue({ db })
```

### Graceful Shutdown
```ts
// Waits for active jobs to finish before stopping
await queue.stop()

// Or let the queue handle SIGINT/SIGTERM automatically
const queue = new Queue({ handleSignals: true })
```

### Job Management
```ts
// Get a specific job
const job = await queue.getJob(42)

// Query jobs
const failed = await queue.getJobs({ status: 'failed' })
const recent = await queue.getJobs({ task: 'send-email', limit: 10 })

// Remove a job
await queue.removeJob(42)

// Clean up old jobs
await queue.clean('completed') // remove all completed
await queue.clean('failed')    // remove all failed
await queue.clean()            // remove both

// Get counts
const counts = await queue.counts()
// { pending: 5, active: 2, completed: 100, failed: 3 }
```

## API Reference
### `new Queue(options?)`
| Option | Type | Default | Description |
|---|---|---|---|
| dataDir | string | 'memory://' | PGlite data directory. Use a path for persistence. |
| db | PGlite | - | Existing PGlite instance to use |
| concurrency | number | 1 | Max parallel job processing |
| pollInterval | number | 5000 | Fallback polling interval (ms) |
| shutdownTimeout | number | 30000 | Max time to wait for jobs during shutdown (ms) |
| handleSignals | boolean | false | Auto-handle SIGINT/SIGTERM |
### `queue.define(task, handler, options?)`
Register a handler for a task name.
### `queue.add(task, data, options?)`
Add a job. Returns a Job object.
| Option | Type | Default | Description |
|---|---|---|---|
| retry | number | 0 | Number of retries (total attempts = retry + 1) |
| delay | number \| string | - | Delay before execution |
| priority | number | 0 | Lower = higher priority |
### `queue.every(cronExpr, task, data?, options?)`
Register a recurring cron job.
### `queue.start()` / `queue.stop()`
Start/stop the worker.
## How It Works
- Uses PGlite (Postgres compiled to WASM) as an in-process database
- Jobs are stored in a Postgres table with proper indexes
- LISTEN/NOTIFY triggers instant job pickup on insert (no polling delay)
- FOR UPDATE SKIP LOCKED ensures safe concurrent processing
- Fallback polling catches delayed jobs and edge cases
- On crash recovery, stalled `active` jobs are automatically reset to `pending`
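A design like the one described above implies a claim query along these lines. The SQL below is an illustrative sketch against an assumed `jobs` table — the table and column names are assumptions, not the library's actual schema or statement:

```ts
// Sketch of a SKIP LOCKED claim query; table/column names are assumptions,
// not pglite-queue's actual schema.
const claimJobSql = `
  UPDATE jobs
  SET status = 'active', started_at = now()
  WHERE id = (
    SELECT id
    FROM jobs
    WHERE status = 'pending'
      AND run_at <= now()
    ORDER BY priority ASC, run_at ASC
    FOR UPDATE SKIP LOCKED
    LIMIT 1
  )
  RETURNING *;
`
```

Because `SKIP LOCKED` lets each worker pass over rows another transaction has already locked, concurrent workers never claim the same job twice.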
## License
MIT
