npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@goatlab/tasks-adapter-bullmq

v0.10.3

Published

BullMQ Tasks adapter for Goatlab tasks

Readme

@goatlab/tasks-adapter-bullmq

BullMQ adapter implementing the TaskConnector interface from @goatlab/tasks-core. The production queue backend for @goatlab/delphi-core. Supports single-job enqueue, bulk enqueue (addBulk), worker listen loops with per-queue concurrency, graceful shutdown, and multi-tenant key prefixing.

What it is

A thin, production-hardened BullMQ wrapper that:

  • Implements TaskConnector so the same calling code works with the GCP Tasks and Hatchet adapters
  • Exposes BullMQ-native features (getQueue, addJob, addBulk, getJob, getWorkers, getJobCounts) for callers that want raw access (e.g., IngestBuffer in @goatlab/delphi-core)
  • Shares one ioredis connection across all queues and workers to avoid connection storms
  • Prefixes all Redis keys with a tenant prefix when _tenantId is set

Install

pnpm add @goatlab/tasks-adapter-bullmq @goatlab/tasks-core bullmq ioredis

Requires Redis 6+.

Quick start

import { BullMQConnector } from '@goatlab/tasks-adapter-bullmq'

const connector = new BullMQConnector({
  connection: {
    host: process.env.REDIS_HOST,
    port: parseInt(process.env.REDIS_PORT!, 10),
    maxRetriesPerRequest: null, // required by BullMQ workers
  },
})

// Enqueue a single job (TaskConnector interface)
await connector.queue({
  uniqueTaskName: 'send-email-user123',
  taskName: 'email',
  postUrl: '/noop',
  taskBody: { to: '[email protected]', subject: 'Hi' },
  handle: async () => {},
})

// Bulk enqueue (BullMQ-native, fastest path)
const queue = connector.getQueue('email')
await queue.addBulk([
  { name: 'email', data: { to: '[email protected]' }, opts: { jobId: 'unique-1' } },
  { name: 'email', data: { to: '[email protected]' }, opts: { jobId: 'unique-2' } },
])

// Start workers
const handle = await connector.listen({
  tasks: [
    { taskName: 'email',  handle: async (data) => { /* send */ }, concurrency: 20 },
    { taskName: 'render', handle: async (data) => { /* render */ }, concurrency: 5 },
  ],
})

// Graceful shutdown
await handle.stop()
await connector.close()

Key behaviour

  • Job IDs: pass your own jobId for deduplication. BullMQ will reject duplicates silently — perfect for idempotent producers.
  • onAfterQueue hook: optional callback fired after a job is successfully enqueued; used by @goatlab/delphi-core to log dispatches or update auxiliary state.
  • Default job options: removeOnComplete: true and removeOnFail: 100 by default — avoids Redis memory growth. Override per-queue or per-job via opts.
  • Per-queue concurrency: listen({ tasks: [{ taskName, concurrency }] }) creates one BullMQ Worker per task name. Concurrency is applied per worker instance.
  • Shared connection: a single ioredis connection is shared across all queues and workers, exposed via getSharedConnection() for advanced use.
  • Tenant prefixing: set connector.setTenantId('acme-corp') and all subsequent Redis keys become acme-corp:bull:<queue>:<state> — allowing safe multi-tenant use on shared Redis.

Why bulk matters

addBulk(N jobs) executes one Lua script against Redis for N jobs — typically ~0.5–2ms. Calling queue.add() N times is ~N Redis roundtrips. For high-throughput ingestion paths this is a 10–50× difference. @goatlab/delphi-core's IngestBuffer exploits this: HTTP handlers push triggers into an in-memory accumulator, which flushes via addBulk every 50ms (or 200 entries).

Keep bulk sizes ≤1000 — BullMQ's Lua script latency starts to dominate above that. See BullMQ issue #1670.

Worker semantics

  • listen() returns a handle with .stop() that drains in-flight jobs before resolving
  • Job handlers run inside BullMQ's own try/catch; throwing = job moves to failed (with retry if configured)
  • Job payloads are JSON-serialized to Redis; keep them small (<100KB) and put heavy data in Postgres if needed
  • prefetchCount defaults to 1; set to ~2× concurrency for small jobs to hide Redis BRPOP latency

Testing

pnpm test   # requires Docker (Redis testcontainer)

Key exports

| Export | Purpose | |---|---| | BullMQConnector | The main class | | BullMQConnector.prototype.queue | Single-job enqueue (TaskConnector interface) | | BullMQConnector.prototype.addJob | Single-job enqueue with custom BullMQ options | | BullMQConnector.prototype.getQueue | Returns a raw BullMQ Queue (use for addBulk, custom ops) | | BullMQConnector.prototype.listen | Start N workers | | BullMQConnector.prototype.close | Close queues + shared Redis connection |

License

MIT