npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@amqp-contract/worker

v0.20.0

Published

Worker utilities for consuming messages using amqp-contract

Downloads

11,531

Readme

@amqp-contract/worker

Type-safe AMQP worker for consuming messages using amqp-contract with Future/Result error handling.

CI npm version npm downloads TypeScript License: MIT

📖 Full documentation →

Installation

pnpm add @amqp-contract/worker

Features

  • Type-safe message consumption — Handlers are fully typed based on your contract
  • Automatic validation — Messages are validated before reaching your handlers
  • Automatic retry with exponential backoff — Built-in retry mechanism using RabbitMQ TTL+DLX pattern
  • Prefetch configuration — Control message flow with per-consumer prefetch settings
  • Batch processing — Process multiple messages at once for better throughput
  • Automatic reconnection — Built-in connection management with failover support

Usage

Basic Usage

import { TypedAmqpWorker, RetryableError } from "@amqp-contract/worker";
import type { Logger } from "@amqp-contract/core";
import { Future } from "@swan-io/boxed";
import { contract } from "./contract";

// Optional: Create a logger implementation
const logger: Logger = {
  debug: (message, context) => console.debug(message, context),
  info: (message, context) => console.info(message, context),
  warn: (message, context) => console.warn(message, context),
  error: (message, context) => console.error(message, context),
};

// Create worker from contract with handlers (automatically connects and starts consuming)
const worker = await TypedAmqpWorker.create({
  contract,
  handlers: {
    processOrder: ({ payload }) => {
      console.log("Processing order:", payload.orderId);

      // Your business logic here
      return Future.fromPromise(Promise.all([processPayment(payload), updateInventory(payload)]))
        .mapOk(() => undefined)
        .mapError((error) => new RetryableError("Order processing failed", error));
    },
  },
  urls: ["amqp://localhost"],
  logger, // Optional: logs message consumption and errors
});

// Worker is already consuming messages

// Clean up when needed
// await worker.close();

Advanced Features

For advanced features like prefetch configuration, batch processing, and automatic retry with exponential backoff, see the Worker Usage Guide.

Retry with Exponential Backoff

Retry is configured at the queue level in your contract definition. Add retry to your queue definition:

import { defineQueue, defineExchange, defineContract } from "@amqp-contract/contract";

const dlx = defineExchange("orders-dlx", "topic", { durable: true });

// Configure retry at queue level
const orderQueue = defineQueue("order-processing", {
  deadLetter: { exchange: dlx },
  retry: {
    mode: "ttl-backoff",
    maxRetries: 3, // Retry up to 3 times (default: 3)
    initialDelayMs: 1000, // Start with 1 second delay (default: 1000)
    maxDelayMs: 30000, // Max 30 seconds between retries (default: 30000)
    backoffMultiplier: 2, // Double the delay each time (default: 2)
    jitter: true, // Add randomness to prevent thundering herd (default: true)
  },
});

Then use RetryableError in your handlers:

import { TypedAmqpWorker, RetryableError } from "@amqp-contract/worker";
import { Future } from "@swan-io/boxed";

const worker = await TypedAmqpWorker.create({
  contract,
  handlers: {
    processOrder: ({ payload }) =>
      // If this fails with RetryableError, message is automatically retried
      Future.fromPromise(processPayment(payload))
        .mapOk(() => undefined)
        .mapError((error) => new RetryableError("Payment failed", error)),
  },
  urls: ["amqp://localhost"],
});

The retry mechanism uses RabbitMQ's native TTL and Dead Letter Exchange pattern, so it doesn't block the consumer during retry delays. See the Error Handling and Retry section in the guide for complete details.

Defining Handlers Externally

You can define handlers outside of the worker creation using defineHandler and defineHandlers for better code organization. See the Worker API documentation for details.

Error Handling

Worker handlers return Future<Result<void, HandlerError>> for explicit error handling:

import { RetryableError, NonRetryableError } from "@amqp-contract/worker";
import { Future, Result } from "@swan-io/boxed";

handlers: {
  processOrder: ({ payload }) => {
    // Validation errors - non-retryable
    if (payload.amount <= 0) {
      return Future.value(Result.Error(new NonRetryableError("Invalid amount")));
    }

    // Transient errors - retryable
    return Future.fromPromise(process(payload))
      .mapOk(() => undefined)
      .mapError((error) => new RetryableError("Processing failed", error));
  },
}

Error Types:

Worker defines error classes:

  • TechnicalError - Runtime failures (parsing, processing)
  • MessageValidationError - Message fails schema validation
  • RetryableError - Signals that the error is transient and should be retried
  • NonRetryableError - Signals permanent failure, message goes to DLQ

API

For complete API documentation, see the Worker API Reference.

Documentation

📖 Read the full documentation →

License

MIT