npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@minnzen/sqliteq

v0.2.0

Published

Durable message queue backed by SQLite. Inspired by goqite.

Readme

sqliteq

Durable message queue over SQLite, in TypeScript.

Inspired by goqite. Zero dependencies.

Install

npm install @minnzen/sqliteq better-sqlite3

sqliteq has zero runtime dependencies. You provide the SQLite binding — better-sqlite3 is recommended. Bun users can use the built-in bun:sqlite instead.

Usage

import Database from 'better-sqlite3'
import { Queue } from '@minnzen/sqliteq'

const db = new Database('app.db')
const q = new Queue(db, 'emails')

q.send({ to: '[email protected]', subject: 'Hello' })

const msg = q.receive()
if (msg) {
  console.log(msg.body) // { to: '[email protected]', subject: 'Hello' }
  q.delete(msg.id, msg.received)
}

The table and indexes are created automatically on first use. WAL mode is enabled by default.

Features

  • Single sqliteq table holds all queues -- no migrations per queue
  • Atomic receive via UPDATE ... WHERE id = (SELECT ...) -- no advisory locks
  • Visibility timeout with automatic redelivery on failure
  • Priority queues (higher priority messages are received first)
  • Delayed messages
  • Batch send in a single transaction
  • Batch receive
  • Dead letter detection (messages exceeding max receive count)
  • Queue stats (ready / delayed / in-flight / dead)
  • Dead letter requeue and purge operations
  • Typed messages with generics (Queue<T>)
  • Bring your own SQLite driver -- tested with better-sqlite3 and bun:sqlite

API

new Queue<T>(db, name, options?)

Create or connect to a named queue. Schema is created if it does not exist.

| Option | Type | Default | Description | |---|---|---|---| | timeout | number | 5000 | Visibility timeout in ms | | maxReceive | number | 3 | Max receives before dead-lettering | | maxBodyBytes | number | 1048576 | Max body size in bytes after JSON serialization |

The db parameter accepts any object matching the Database interface (a subset of better-sqlite3 and bun:sqlite):

interface Database {
  prepare(sql: string): Statement
  exec(sql: string): void
  transaction<R>(fn: (...args: unknown[]) => R): (...args: unknown[]) => R
}

queue.send(body: T, options?): string

Send a message. Returns the message ID.

| Option | Type | Default | Description | |---|---|---|---| | delay | number | 0 | Delivery delay in ms | | priority | number | 0 | Higher values are received first |

queue.sendBatch(messages): string[]

Send multiple messages in one transaction. Each entry is { body: T, options?: SendOptions }. Returns an array of IDs.

queue.receive(): Message<T> | null

Claim the next available message. Returns null when the queue is empty or all messages are in-flight. The message becomes invisible to other consumers for timeout ms.

interface Message<T> {
  id: string
  body: T
  received: number  // receive count (1 on first delivery)
}

queue.receiveBatch(limit): Message<T>[]

Atomically claim up to limit available messages. Returns fewer than requested when the queue runs dry. Claimed messages follow the same visibility timeout and fencing semantics as receive().

queue.extend(id, received, delay): boolean

Extend a message's visibility timeout by delay ms. Returns false if the message was already redelivered to another consumer (stale handle).

queue.delete(id, received): boolean

Acknowledge and remove a message. Returns false on stale handle (safe no-op).

queue.size(): number

Total messages in the queue (all states).

queue.stats(): QueueStats

Get queue counts grouped by state:

interface QueueStats {
  total: number
  ready: number
  delayed: number
  inFlight: number
  dead: number
}

queue.purge(): number

Delete all messages. Returns the count removed.

queue.deadLetters(): Message<T>[]

Get messages that exceeded maxReceive. These will never be delivered again and should be inspected or moved.

queue.requeueDeadLetters(options?): string[]

Requeue all current dead letters as fresh messages and return their new IDs. Requeued messages preserve body and priority, optionally apply a new delay, and always get new IDs so stale handles from previous deliveries stay invalid.

| Option | Type | Default | Description | |---|---|---|---| | delay | number | 0 | Delay in ms before requeued messages become visible |

queue.purgeDeadLetters(): number

Delete all current dead letters. Returns the count removed.

new Processor<T>(queue, options)

Long-running consumer that polls, processes, and auto-deletes messages.

import { Queue, Processor } from '@minnzen/sqliteq'

const q = new Queue(db, 'jobs')
const p = new Processor(q, {
  handler(msg) {
    console.log('processing', msg.body)
    // return normally = auto-delete
    // throw = leave for retry after timeout
  },
  pollInterval: 200,
  concurrency: 4,
})

p.start()
// later:
await p.stop() // waits for in-flight handlers to finish

| Option | Type | Default | Description | |---|---|---|---| | handler | (msg) => void \| Promise<void> | required | Message handler | | pollInterval | number | 100 | Poll interval in ms | | concurrency | number | 1 | Max simultaneous handlers | | extendInterval | number | queue timeout | Auto-extend period; 0 to disable | | onError | (error, context) => void | console.error | Error callback |

The processor auto-extends visibility at 4/5 of the extend interval so long-running handlers don't lose their message. Note: auto-extend relies on setInterval, so handlers must yield the event loop (use async/await). CPU-bound synchronous handlers will block the timer and may cause unintended redelivery.

Benchmarks

On a MacBook Pro M-series, with better-sqlite3, file-backed database:

| Operation | Throughput | Latency | |---|---|---| | send + receive + delete | ~20,000 ops/sec | 49 us/op | | send only | ~31,000 ops/sec | -- | | receive + delete (100K row table, 10 queues) | ~18,000 ops/sec | 55 us/op | | sendBatch (100 msgs/transaction) | ~120,000 ops/sec | -- |

Run npx tsx bench/run.ts to reproduce.

Design

Messages live in a single sqliteq table shared across all queues. Each message has a timeout timestamp; receiving a message atomically bumps that timestamp forward, making the message invisible. If the consumer crashes or fails to delete the message, it becomes visible again after the timeout expires. After maxReceive deliveries, the message is dead-lettered.

This is the same model as SQS and goqite, adapted for synchronous SQLite.

Delivery guarantee is at-least-once. Handlers must be idempotent because a message can be delivered more than once if the previous consumer times out.

License

MIT