@minnzen/sqliteq
v0.2.0
Published
Durable message queue backed by SQLite. Inspired by goqite.
Maintainers
Readme
sqliteq
Durable message queue over SQLite, in TypeScript.
Inspired by goqite. Zero dependencies.
Install
npm install @minnzen/sqliteq better-sqlite3sqliteq has zero runtime dependencies. You provide the SQLite binding —
better-sqlite3 is recommended.
Bun users can use the built-in bun:sqlite instead.
Usage
import Database from 'better-sqlite3'
import { Queue } from '@minnzen/sqliteq'
const db = new Database('app.db')
const q = new Queue(db, 'emails')
q.send({ to: '[email protected]', subject: 'Hello' })
const msg = q.receive()
if (msg) {
console.log(msg.body) // { to: '[email protected]', subject: 'Hello' }
q.delete(msg.id, msg.received)
}The table and indexes are created automatically on first use. WAL mode is enabled by default.
Features
- Single
sqliteqtable holds all queues -- no migrations per queue - Atomic receive via
UPDATE ... WHERE id = (SELECT ...)-- no advisory locks - Visibility timeout with automatic redelivery on failure
- Priority queues (higher priority messages are received first)
- Delayed messages
- Batch send in a single transaction
- Batch receive
- Dead letter detection (messages exceeding max receive count)
- Queue stats (ready / delayed / in-flight / dead)
- Dead letter requeue and purge operations
- Typed messages with generics (
Queue<T>) - Bring your own SQLite driver -- tested with better-sqlite3 and bun:sqlite
API
new Queue<T>(db, name, options?)
Create or connect to a named queue. Schema is created if it does not exist.
| Option | Type | Default | Description |
|---|---|---|---|
| timeout | number | 5000 | Visibility timeout in ms |
| maxReceive | number | 3 | Max receives before dead-lettering |
| maxBodyBytes | number | 1048576 | Max body size in bytes after JSON serialization |
The db parameter accepts any object matching the Database interface
(a subset of better-sqlite3 and bun:sqlite):
interface Database {
prepare(sql: string): Statement
exec(sql: string): void
transaction<R>(fn: (...args: unknown[]) => R): (...args: unknown[]) => R
}queue.send(body: T, options?): string
Send a message. Returns the message ID.
| Option | Type | Default | Description |
|---|---|---|---|
| delay | number | 0 | Delivery delay in ms |
| priority | number | 0 | Higher values are received first |
queue.sendBatch(messages): string[]
Send multiple messages in one transaction. Each entry is
{ body: T, options?: SendOptions }. Returns an array of IDs.
queue.receive(): Message<T> | null
Claim the next available message. Returns null when the queue is empty or
all messages are in-flight. The message becomes invisible to other consumers
for timeout ms.
interface Message<T> {
id: string
body: T
received: number // receive count (1 on first delivery)
}queue.receiveBatch(limit): Message<T>[]
Atomically claim up to limit available messages. Returns fewer than requested
when the queue runs dry. Claimed messages follow the same visibility timeout
and fencing semantics as receive().
queue.extend(id, received, delay): boolean
Extend a message's visibility timeout by delay ms. Returns false if the
message was already redelivered to another consumer (stale handle).
queue.delete(id, received): boolean
Acknowledge and remove a message. Returns false on stale handle (safe no-op).
queue.size(): number
Total messages in the queue (all states).
queue.stats(): QueueStats
Get queue counts grouped by state:
interface QueueStats {
total: number
ready: number
delayed: number
inFlight: number
dead: number
}queue.purge(): number
Delete all messages. Returns the count removed.
queue.deadLetters(): Message<T>[]
Get messages that exceeded maxReceive. These will never be delivered again
and should be inspected or moved.
queue.requeueDeadLetters(options?): string[]
Requeue all current dead letters as fresh messages and return their new IDs. Requeued messages preserve body and priority, optionally apply a new delay, and always get new IDs so stale handles from previous deliveries stay invalid.
| Option | Type | Default | Description |
|---|---|---|---|
| delay | number | 0 | Delay in ms before requeued messages become visible |
queue.purgeDeadLetters(): number
Delete all current dead letters. Returns the count removed.
new Processor<T>(queue, options)
Long-running consumer that polls, processes, and auto-deletes messages.
import { Queue, Processor } from '@minnzen/sqliteq'
const q = new Queue(db, 'jobs')
const p = new Processor(q, {
handler(msg) {
console.log('processing', msg.body)
// return normally = auto-delete
// throw = leave for retry after timeout
},
pollInterval: 200,
concurrency: 4,
})
p.start()
// later:
await p.stop() // waits for in-flight handlers to finish| Option | Type | Default | Description |
|---|---|---|---|
| handler | (msg) => void \| Promise<void> | required | Message handler |
| pollInterval | number | 100 | Poll interval in ms |
| concurrency | number | 1 | Max simultaneous handlers |
| extendInterval | number | queue timeout | Auto-extend period; 0 to disable |
| onError | (error, context) => void | console.error | Error callback |
The processor auto-extends visibility at 4/5 of the extend interval so
long-running handlers don't lose their message. Note: auto-extend relies
on setInterval, so handlers must yield the event loop (use async/await).
CPU-bound synchronous handlers will block the timer and may cause
unintended redelivery.
Benchmarks
On a MacBook Pro M-series, with better-sqlite3, file-backed database:
| Operation | Throughput | Latency | |---|---|---| | send + receive + delete | ~20,000 ops/sec | 49 us/op | | send only | ~31,000 ops/sec | -- | | receive + delete (100K row table, 10 queues) | ~18,000 ops/sec | 55 us/op | | sendBatch (100 msgs/transaction) | ~120,000 ops/sec | -- |
Run npx tsx bench/run.ts to reproduce.
Design
Messages live in a single sqliteq table shared across all queues.
Each message has a timeout timestamp; receiving a message atomically
bumps that timestamp forward, making the message invisible. If the consumer
crashes or fails to delete the message, it becomes visible again after the
timeout expires. After maxReceive deliveries, the message is dead-lettered.
This is the same model as SQS and goqite, adapted for synchronous SQLite.
Delivery guarantee is at-least-once. Handlers must be idempotent because a message can be delivered more than once if the previous consumer times out.
License
MIT
