npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@prsm/queue

v3.4.0

Published

Redis-backed distributed task queue with grouped concurrency, retries, and rate limiting

Downloads

1,775

Readme

Redis-backed distributed task queue with grouped concurrency, retries, and rate limiting.

Installation

npm install @prsm/queue

Quick Start

import Queue from '@prsm/queue'

const queue = new Queue({
  concurrency: 2,
  maxRetries: 3
})

queue.process(async (payload) => {
  return await doWork(payload)
})

queue.on('complete', ({ task, result }) => {
  console.log('Done:', task.uuid, result)
})

queue.on('failed', ({ task, error }) => {
  console.log('Failed after retries:', task.uuid, error.message)
})

await queue.ready()
await queue.push({ userId: 123, action: 'sync' })

Options

const queue = new Queue({
  concurrency: 2,           // max concurrent tasks per instance
  globalConcurrency: 10,    // max concurrent tasks across all instances (Redis-backed)
  delay: '100ms',           // pause between tasks (string or ms)
  timeout: '30s',           // max task duration
  maxRetries: 3,            // attempts before failing

  groups: {
    concurrency: 1,         // max concurrent tasks per group
    delay: '50ms',
    timeout: '10s',
    maxRetries: 3
  },

  redisOptions: {
    host: 'localhost',
    port: 6379
  }
})

Concurrency

Three independent limits compose together. A task must pass all applicable gates before processing.

concurrency - per-instance limit. Controls how many tasks this server can process simultaneously. This is the number of worker loops created for the main queue, and also caps total active tasks (including grouped) on this instance via an in-memory semaphore. Default: 1.

globalConcurrency - cross-instance limit. Controls how many tasks can run across all servers sharing the same Redis. Uses a Redis-backed semaphore with automatic lease expiry for crash safety. If an instance crashes, its slots are reclaimed after 60 seconds. Default: 0 (disabled).

groups.concurrency - per-group limit. Controls how many tasks can run concurrently within a single group. Default: 1.

Examples

Protect local resources (CPU/memory bound):

const queue = new Queue({
  concurrency: 5,
  groups: { concurrency: 1 }
})

100 groups each with 1 task - only 5 run at a time on this server.

Protect an external API (shared rate across servers):

const queue = new Queue({
  concurrency: 10,
  globalConcurrency: 20,
  groups: { concurrency: 2 }
})

3 servers, each can handle 10 concurrent tasks, but only 20 total across all servers. Each group (tenant) gets up to 2 concurrent slots.

Process Handler

queue.process(async (payload, task) => {
  console.log('Task:', task.uuid, 'Attempt:', task.attempts)
  return await someWork(payload)
})

Throw an error to trigger retry. After maxRetries, the task fails permanently.

Grouped Queues

Isolated concurrency per key - perfect for per-tenant throttling. Pass { group } as the second argument to push or pushAndWait.

const queue = new Queue({
  concurrency: 5,
  groups: { concurrency: 1, delay: '50ms' }
})

queue.process(async (payload) => {
  return await callExternalAPI(payload)
})

await queue.ready()

await queue.push({ action: 'sync' }, { group: 'tenant-123' })
await queue.push({ action: 'sync' }, { group: 'tenant-456' })

Each tenant processes independently. One slow tenant won't block others. Total concurrent tasks across all tenants is capped by concurrency. When the group is conditional, just omit the option - no branching needed.

Groups are fully distributed - any instance can push to any group, and any instance with available concurrency will automatically discover and process tasks for that group. New groups are announced via Redis pub/sub, and existing groups are discovered at startup.

Push and Wait

Push a task and wait for its result. Works across instances - instance A can push a task that instance B processes, and the result comes back to instance A via Redis pub/sub.

const result = await queue.pushAndWait({ prompt: 'summarize this' })

With timeout and groups:

const result = await queue.pushAndWait(
  { prompt: 'summarize this' },
  { group: 'tenant-123', timeout: '30s' }
)

Throws if the task fails (after retries are exhausted) or if the timeout is reached. Retries are transparent - if the handler fails twice then succeeds on the third attempt, pushAndWait resolves with the successful result.

Events

queue.on('new', ({ task }) => {})
queue.on('complete', ({ task, result }) => {})
queue.on('retry', ({ task, error, attempt }) => {})
queue.on('failed', ({ task, error }) => {})
queue.on('drain', () => {})

Task Object

{
  uuid: string,
  payload: any,
  createdAt: number,
  group?: string,     // present when pushed with { group }
  attempts: number
}

Throttling Example

Throttle LLM calls to external providers per tenant:

const queue = new Queue({
  concurrency: 20,
  groups: { concurrency: 2, delay: '50ms' },
  maxRetries: 3
})

queue.process(async ({ prompt }) => {
  return await llm.complete(prompt)
})

app.post('/api/generate', async (req, res) => {
  const { tenantId, prompt } = req.body
  const taskId = await queue.push({ prompt }, { group: tenantId })
  res.json({ queued: true, taskId })
})

Each tenant gets up to 2 concurrent LLM calls with a 50ms pause between them. Total concurrent calls across all tenants is capped at 20, protecting your server and API budget from any single tenant overwhelming the system.

Fan-out with Groups

Use groups to fan out a single event to multiple independent handlers. Each group processes and retries independently.

const queue = new Queue({
  concurrency: 10,
  groups: { concurrency: 1 },
})

const handlers = {
  "email": {
    "user:created": (data) => sendWelcomeEmail(data.email),
    "user:deleted": (data) => sendGoodbyeEmail(data.email),
  },
  "workspace": {
    "user:created": (data) => createDefaultWorkspace(data.userId),
  },
  "slack": {
    "user:created": (data) => notifySlack(`new user: ${data.email}`),
  },
}

queue.process(async ({ event, data }, task) => {
  await handlers[task.group]?.[event]?.(data)
})

await queue.ready()

// emit to all groups
await Promise.all(
  Object.keys(handlers).map((group) =>
    queue.push({ event: "user:created", data: { userId: "u1", email: "[email protected]" } }, { group })
  )
)

The handlers object is both the registry and the routing logic. Adding a new subscriber is adding a key.

WebSocket Integration

Queue events are local-only - only the server that processes a task emits complete/failed. Use @prsm/realtime to push results to connected clients in real time.

import Queue from '@prsm/queue'
import { RealtimeServer } from '@prsm/realtime'

const realtime = new RealtimeServer({ redis: { host: 'localhost', port: 6379 } })
const queue = new Queue({ concurrency: 5, groups: { concurrency: 1 } })

queue.process(async (payload) => {
  return await generateReport(payload)
})

queue.on('complete', ({ task, result }) => {
  realtime.sendTo(task.payload.connectionId, 'job:complete', result)
})

queue.on('failed', ({ task, error }) => {
  realtime.sendTo(task.payload.connectionId, 'job:failed', { error: error.message })
})

realtime.exposeCommand('generate-report', async (ctx) => {
  const taskId = await queue.push({
    connectionId: ctx.connection.id,
    ...ctx.payload,
  }, { group: ctx.connection.id })
  return { queued: true, taskId }
})

await queue.ready()
await realtime.listen(8080)

Both queue and realtime use the same Redis instance. No key conflicts (queue:* vs rt:*).

Horizontal Scaling

Multiple servers can push to the same queue. Redis coordinates via atomic operations - no duplicate processing. Use globalConcurrency to enforce a hard limit across all instances.

Cleanup

await queue.close()

License

MIT