npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@qified/rabbitmq

v0.8.0

Published

RabbitMQ message provider for qified

Readme

@qified/rabbitmq

RabbitMQ message and task provider for Qified.

This package implements a message provider and a task provider backed by RabbitMQ. The message provider uses queues for publish/subscribe operations, and the task provider adds reliable task queue processing with retries, timeouts, and dead-letter queues.

Table of Contents

Installation

pnpm add @qified/rabbitmq

Usage with Qified

Message Provider

import { createQified } from "@qified/rabbitmq";
import type { Message } from "qified";

const qified = createQified({ uri: "amqp://localhost:5672" });

await qified.subscribe("example-topic", {
        async handler(message: Message) {
                console.log(message);
        },
});

await qified.publish("example-topic", { id: "1", data: "Hello from RabbitMQ!" });

await qified.disconnect();

Task Provider

import { RabbitMqTaskProvider } from "@qified/rabbitmq";

const taskProvider = new RabbitMqTaskProvider({ uri: "amqp://localhost:5672" });

// Enqueue a task
await taskProvider.enqueue("my-queue", {
        data: { action: "send-email", to: "[email protected]" },
});

// Dequeue and process tasks
await taskProvider.dequeue("my-queue", {
        id: "email-handler",
        handler: async (task, ctx) => {
                console.log("Processing task:", task.data);

                // Access attempt metadata
                console.log(`Attempt ${ctx.metadata.attempt} of ${ctx.metadata.maxRetries}`);

                // Extend the deadline if needed
                await ctx.extend(10_000);

                // Acknowledge the task on success
                await ctx.ack();
        },
});

// Get queue statistics
const stats = await taskProvider.getQueueStats("my-queue");
console.log(stats); // { waiting, processing, deadLetter }

// Get dead-letter tasks for inspection
const deadLetters = await taskProvider.getDeadLetterTasks("my-queue");

// Clean up
await taskProvider.disconnect();

API

RabbitMqMessageProviderOptions

Configuration options for the RabbitMQ message provider.

defaultRabbitMqUri

Default RabbitMQ connection string ("amqp://localhost:5672").

RabbitMqMessageProvider

Implements the MessageProvider interface using RabbitMQ queues.

constructor(options?: RabbitMqMessageProviderOptions)

Creates a new provider.

Options:

  • uri: RabbitMQ connection URI (defaults to "amqp://localhost:5672").

publish(topic: string, message: Message)

Publishes a message to a topic.

subscribe(topic: string, handler: TopicHandler)

Subscribes a handler to a topic.

unsubscribe(topic: string, id?: string)

Unsubscribes a handler by id or all handlers for a topic.

disconnect()

Cancels all subscriptions and closes the underlying RabbitMQ connection.

createQified(options?: RabbitMqMessageProviderOptions)

Convenience factory that returns a Qified instance configured with RabbitMqMessageProvider.

RabbitMqTaskProviderOptions

Configuration options for the RabbitMQ task provider. Extends TaskProviderOptions.

  • uri?: RabbitMQ connection URI. Defaults to "amqp://localhost:5672".
  • id?: Unique identifier for this provider instance. Defaults to "@qified/rabbitmq-task".
  • timeout?: Default timeout in milliseconds for task processing. Defaults to 30000.
  • retries?: Default maximum retry attempts before a task is moved to the dead-letter queue. Defaults to 3.
  • reconnectTimeInSeconds?: Time in seconds to wait before reconnecting after connection loss. Set to 0 to disable. Defaults to 5.

RabbitMqTaskProvider

Implements the TaskProvider interface using RabbitMQ durable queues for reliable task processing. Extends Hookified for event emission. Features include:

  • Automatic retries with configurable max attempts
  • Task timeouts with automatic rejection on expiry
  • Dead-letter queue for failed tasks
  • Automatic reconnection on connection loss

constructor(options?: RabbitMqTaskProviderOptions)

Creates a new task provider instance.

connect()

Explicitly connects to RabbitMQ. Called automatically on first enqueue or dequeue if not called manually.

enqueue(queue: string, taskData: EnqueueTask)

Enqueues a task to the specified queue. Returns a Promise<string> with the generated task ID.

Task data options:

  • data: The task payload (any serializable value).
  • id?: Custom task ID. Auto-generated if omitted.
  • timeout?: Per-task timeout override in milliseconds.
  • maxRetries?: Per-task max retry override.
  • priority?: Task priority value.

dequeue(queue: string, handler: TaskHandler)

Registers a handler to process tasks from the specified queue. The handler receives a Task and a TaskContext.

TaskContext methods:

  • ack(): Acknowledge the task (removes it from the queue).
  • reject(requeue?: boolean): Reject the task. If requeue is true (default), re-enqueues for retry. After max retries, moves to dead-letter queue.
  • extend(ms: number): Extend the processing deadline by the given milliseconds.
  • metadata: Object with { attempt, maxRetries } for the current task.

unsubscribe(queue: string, id?: string)

Removes a handler by id, or all handlers for the queue if no id is provided.

disconnect(force?: boolean)

Disconnects from RabbitMQ and cleans up all consumers, timers, and in-memory state. If force is true, skips graceful channel close.

getDeadLetterTasks(queue: string)

Returns an array of tasks that have been moved to the dead-letter queue for the given queue.

getQueueStats(queue: string)

Returns statistics for the given queue:

{ waiting: number; processing: number; deadLetter: number }

clearQueue(queue: string)

Purges all tasks from the queue and its dead-letter queue, and clears all in-memory tracking state.

Contributing

Contributions are welcome! Please read the CONTRIBUTING.md and CODE_OF_CONDUCT.md for details on our process.

License

MIT © Jared Wray. See LICENSE for details.