npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

elephantmq

v1.0.3

Published

PostgreSQL-native job queue with a familiar producer/worker API

Downloads

48

Readme

elephantmq

npm version CI License: MIT Node

A PostgreSQL-native job queue for Node.js. Producers and workers talk to the same Postgres you already run; there is no separate broker, no in-memory state, and no second datastore to back up.

import { Queue, Worker } from 'elephantmq';
import { Pool } from 'pg';

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

const queue = new Queue('emails', { connection: pool });

await queue.add('welcome', { userId: 42 });

new Worker('emails', async job => {
  await sendEmail(job.data.userId);
}, { connection: pool });

Why elephantmq

  • Transactional enqueue. await queue.inTransaction(async (q, sql) => { ... }) runs your block on one pinned connection so business writes and q.add(...) commit atomically — or roll back together. No outbox table, no two-phase commit.
  • One datastore. Backups, PITR, replication, monitoring, IAM — whatever you already do for Postgres now covers your queue.
  • Real SQL visibility. Want to know how many emails are stuck retrying? SELECT count(*) FROM emq_jobs WHERE state = 'delayed' AND queue_pk = .... No Redis-shaped abstractions in the way.
  • Familiar API. The producer/worker shape mirrors BullMQ on purpose, so existing patterns and tutorials still apply. If you're coming from BullMQ, see docs/MIGRATING_FROM_BULLMQ.md.

Install

npm install elephantmq pg

elephantmq targets:

  • Node.js 18+
  • PostgreSQL 14+

Quick start

1. Run migrations once

import { migrate } from 'elephantmq/migrate';
import { Pool } from 'pg';

const pool = new Pool({ connectionString: process.env.DATABASE_URL });
await migrate(pool, 'public');     // schema name
await pool.end();

The migration creates a small set of emq_* tables and stored functions in the schema you choose. By default each Queue/Worker will also lazily run migrate() on first connect; in production set skipMigrations: true and run the snippet above from a deploy job. See docs/OPERATIONS.md for the recommended setup.

2. Enqueue jobs

import { Queue } from 'elephantmq';
import { Pool } from 'pg';

const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const queue = new Queue('orders', { connection: pool });

// fire-and-forget
await queue.add('place', { orderId: 'ord_123' });

// delayed
await queue.add('reminder', { orderId: 'ord_123' }, { delay: 60_000 });

// priority (lower number = sooner)
await queue.add('rush', { orderId: 'ord_124' }, { priority: 1 });

// bulk, atomically
await queue.addBulk([
  { name: 'place', data: { orderId: 'ord_125' } },
  { name: 'place', data: { orderId: 'ord_126' } },
]);

3. Process jobs

import { Worker } from 'elephantmq';

const worker = new Worker('orders', async job => {
  await placeOrder(job.data.orderId);
  return { ok: true };
}, {
  connection: pool,
  concurrency: 8,
  limiter: { max: 100, duration: 1000 },   // 100 jobs/sec ceiling
});

worker.on('failed', (job, err) => {
  console.error('order failed', job?.id, err);
});

4. Transactional enqueue

The headline feature. Queue.inTransaction opens a transaction on a pinned PoolClient, hands it to your block, and commits when the block resolves (or rolls back if it throws):

await queue.inTransaction(async (q, sql) => {
  await sql.query(
    'UPDATE inventory SET reserved = reserved + 1 WHERE sku = $1',
    [sku],
  );

  await q.add('ship', { sku });
});

If the block throws, the inventory update and the job row are rolled back together. Workers never see a "ship" job for an order that didn't actually get reserved.

API surface

The default entry point exposes the producer/worker API:

| Class | Purpose | | ----- | ------- | | Queue | Enqueue jobs, manage a queue (pause, drain, obliterate). | | Worker | Pull and process jobs with concurrency and rate limiting. | | Job | Job row read/update API. | | JobScheduler | Cron / interval schedules. | | FlowProducer | Parent / child job graphs. | | QueueEvents | Subscribe to lifecycle events (waiting, active, completed, failed, ...). | | QueueEventsProducer | Emit custom events on a queue. |

Schema management lives behind a separate path (elephantmq/migrate) so apps that only enqueue jobs don't pull migration code into their bundle.

Re-exported error classes (DelayedError, RateLimitError, WaitingError, WaitingChildrenError, UnrecoverableError) let processors signal control flow back to the worker.

Operations

Trade-offs

| | | | - | - | | Throughput | Lower than Redis-backed queues for raw enqueue/claim. Most product workloads are bottlenecked on the handler, not the queue, so this rarely matters in practice. If you're sustainably pushing very high job rates, measure on your own hardware and Postgres tuning. | | Latency | Enqueue → claim is dominated by Postgres round-trip + pg_notify. Co-located DB: a couple of ms. WAN: more. | | Storage | Every job is a row. Set removeOnComplete / removeOnFail to keep the table bounded. | | Operational simplicity | One database to back up, restore, replicate, and monitor. No separate broker. |

Contributing

Bug reports, feature requests, and PRs are very welcome. Start with CONTRIBUTING.md. For security issues, follow SECURITY.md instead of opening a public issue.

License

MIT