npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@imj_media/fluffy-amqp

v1.0.0

Published

Librería TypeScript para mensajería orientada a eventos sobre RabbitMQ (AMQP): fachada FluffyEventBus, adaptador con reconexión, publicación/suscripción, reintentos, DLQ opcional, router por tópicos en proceso, sobres JSON validados con Zod e idempotencia

Readme

@imj_media/fluffy-amqp

Bus de eventos AMQP en TypeScript para RabbitMQ: reconexión automática, DLQ opcional, reintentos, sobres validados con Zod, enrutamiento in-process e idempotencia en subpaths (memory / redis / db).

| Recurso | Enlace | | -------------------------------------- | -------------------------------------------------------------------------------------------- | | Repositorio / incidencias / página | Bitbucket — desarrollo_imj/fluffy-amqp | | npm | npm install @imj_media/fluffy-amqp |


Índice

  1. Visión general
  2. Arquitectura
  3. Modelo de sobre de evento
  4. Publicación
  5. Consumo
  6. Idempotencia
  7. Serialización
  8. Concurrencia
  9. Registro (logging)
  10. Demo
  11. Exports del paquete
  12. Requisitos e instalación
  13. Buenas prácticas
  14. Desarrollo del paquete
  15. Documentación complementaria
  16. Seguridad, contribución y licencia

Visión general

@imj_media/fluffy-amqp sirve para publicar y consumir eventos de dominio sobre RabbitMQ con una API pequeña y tipada:

  • El núcleo se apoya en MessageBroker (conectar, publicar, suscribir, cerrar), sin acoplarse a tipos de RabbitMQ: puedes probar con mocks o cambiar de transporte más adelante.
  • FluffyAdapter implementa MessageBroker con amqplib y amqp-connection-manager (reconexión automática). Usa un canal de confirmación para publicar y otro para consumir; si pasas dlq: { exchange, queue, routingKey? }, el setup del canal de consumo declara el exchange de dead-letter y la cola DLQ.
  • FluffyEventBus es una fachada sobre un broker inyectado: start / stop (alias connect / close), restart() (parar → breve pausa → arrancar), publish, subscribe. Un idempotencyStore opcional en el bus se fusiona en subscribe si la suscripción no define otro almacén.
  • createFluffyEventBusFromConfig(broker, config) construye un FluffyEventBus a partir de FluffyEventBusConfig (logging, idempotencia por defecto y bloques informativos de reintento/concurrencia/RabbitMQ en la configuración).

Dependencias de ejecución relevantes: amqplib, amqp-connection-manager, zod, pino (para la factoría opcional createLogger). Si usas createLogger({ pretty: true }), instala pino-pretty en tu aplicación (no va empaquetada como dependencia dura de la librería).


Arquitectura

flowchart LR
  subgraph app [Tu aplicación]
    EB[FluffyEventBus]
  end
  subgraph contract [Contrato]
    MB[MessageBroker]
  end
  subgraph rabbit [Pila RabbitMQ]
    RA[FluffyAdapter]
    AMQP[(RabbitMQ)]
  end
  EB --> MB
  RA --> MB
  RA --> AMQP
  • Publicación: FluffyEventBus.publishFluffyAdapter.publish → publicación en canal (sobre JSON por defecto).
  • Consumo: FluffyEventBus.subscribe → assert de exchange/cola/enlace → consume → deserializar + idempotencia opcional → handler o EventRouter.dispatch → ACK / NACK → DLQ si está configurada.

Modelo de sobre de evento

  • createEventEnvelope(eventType, data, { source, correlationId?, context?, eventVersion?, eventId?, occurredAt?, metadata? }) construye un EventEnvelope: en meta van eventId, occurredAt, correlationId (UUID por defecto vía node:crypto), eventType, source, etc.
  • validateEventEnvelope(unknown) valida con Zod (esquemas compatibles con Zod 4): meta, context opcional, data.
  • Correlación: usa correlationId para enlazar flujos entre servicios. eventId es el identificador de negocio del mensaje y la clave habitual de idempotencia; PublishResult.messageId convencionalmente coincide con meta.eventId.

Publicación

PublishParams: exchange, routingKey, message (EventEnvelope), headers opcionales, serializer opcional solo para esa llamada.

El publicador no declara el exchange; asegúrate de que exista (a menudo lo crea el primer consumidor que ejecuta assertExchange, u operaciones).


Consumo

SubscribeParams incluye:

| Ámbito | Notas | | --------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | | Enrutamiento | exchange, routingKey (binding; patrones topic como orders.#), queueName opcional, exchangeType (topic por defecto; también direct / fanout). | | Handler vs router | handler o router (no ambos como responsabilidades separadas de primer nivel; el router envuelve el despacho). | | Resiliencia | Atajo maxRetries o retryStrategy: maxAttempts, backoffMs(attempt), shouldRetry(error, attempt). | | DLQ | dlq por suscripción; hereda la DLQ global del adaptador si se omite. | | Rendimiento | prefetch (QoS AMQP), maxConcurrency (semáforo en proceso). | | Otros | idempotencyStore, serializer. |

createEventRouter(logger) — registro con register(pattern, handler). Orden de resolución: coincidencia exacta de routing keymejor patrón topic (*, #) → patrón igual a eventType.

Errores:

  • RetryableError — transitorio; la política de reintentos puede volver a intentar.
  • NonRetryableError — sin reintentos; el consumidor hace NACK sin reencolar → DLQ si está configurada.
  • Error genérico — suele reintentarse según la política hasta maxAttempts, luego NACK → DLQ.
  • Utilidades: isRetryableError, isNonRetryableError.

Idempotencia

Implementa IdempotencyStore: hasProcessed(eventId), markProcessed(eventId).

Subpaths tree-shakeables:

| Import | Uso | | ------------------------------------------- | ----------------------------------------------------------------------------------------------------- | | @imj_media/fluffy-amqp/idempotency/memory | MemoryIdempotencyStore — un solo proceso. | | @imj_media/fluffy-amqp/idempotency/redis | RedisIdempotencyStore — cliente con get/set, prefijo de clave y TTL opcionales. | | @imj_media/fluffy-amqp/idempotency/db | DatabaseIdempotencyStore — SQL/ORM mediante callbacks hasProcessed / markProcessed. |


Serialización

Por defecto: JSON UTF-8 + validación Zod al deserializar — JsonEventSerializer / createJsonEventSerializer. La interfaz EventSerializer permite otros formatos en cable (p. ej. Avro/Protobuf) manteniendo EventEnvelope como contrato de negocio.


Concurrencia

createConcurrencyLimiter(maxConcurrency) devuelve un ConcurrencyController (semáforo FIFO). Combínalo con prefetch AMQP: el prefetch limita mensajes sin ACK en el canal; maxConcurrency limita ejecuciones paralelas del manejador en el proceso.


Registro (logging)

Contrato Logger: (message: string, meta?: Record<string, unknown>). createLogger({ name?, level?, pretty? }) envuelve Pino; para pretty: true, añade pino-pretty en tu app.


Demo

Los fragmentos siguientes son una mini demo de la librería. Cada proceso tiene su propio FluffyAdapter + FluffyEventBus y usa createLogger de @imj_media/fluffy-amqp (Pino adaptado al contrato Logger).

Ejecución (desde la raíz de tu proyecto): npm install, define RABBITMQ_URL con export o un .env local que crees tú (no viene en el repo), luego npm run consumer y en otra terminal npm run publisher. Para resiliencia: npm run consumer:resilience y npm run publisher:resilience (variable DEMO_SCENARIO=ok | fail_twice | non_retryable | fail_always). No ejecutes consumer.ts y consumer-resilience.ts a la vez si quieres evitar competencia en bindings compartidos; la demo de resiliencia usa cola y routing distintos.

config.ts

/**
 * Nombres AMQP compartidos entre publicador y consumidor de la demo.
 */
export const EXCHANGE_ORDERS = 'orders.demo';
/** Cola durable de la demo (un consumidor de ejemplo). */
export const QUEUE_NOTIFICATIONS = 'orders-demo-notifications';
/** Prefijo lógico de routing keys de la demo. */
export const ROUTING_ORDERS_ALL = 'order.#';

/** Demo: reintentos + DLQ (parar el `consumer` normal antes de usar esta cola). */
export const QUEUE_RESILIENCE = 'orders-demo-resilience';
export const ROUTING_RESILIENCE = 'order.resilience.#';

/** Exchange DLX + cola DLQ (routing key fija hacia la cola). */
export const DLX_EXCHANGE = 'orders.demo.dlx';
export const DLQ_QUEUE = 'orders-demo-dlq';
export const DLQ_ROUTING_KEY = 'dead';

load-env.ts

/**
 * Si existe un `.env` local junto al ejemplo (no versionado), lo carga aunque el cwd sea otro.
 */
import { config } from 'dotenv';
import { dirname, join } from 'node:path';
import { fileURLToPath } from 'node:url';

const root = join(dirname(fileURLToPath(import.meta.url)), '..');
config({ path: join(root, '.env') });

Publicador — publisher.ts

/**
 * Publica uno o más eventos de pedido al exchange topic `orders.demo`.
 */
import './load-env.js';
import {
  FluffyEventBus,
  FluffyAdapter,
  createEventEnvelope,
  createLogger,
} from '@imj_media/fluffy-amqp';
import { EXCHANGE_ORDERS } from './config.js';

const url = process.env.RABBITMQ_URL ?? 'amqp://guest:[email protected]:5672';

async function main(): Promise<void> {
  const logger = createLogger({
    name: 'order-publisher-demo',
    level: 'info',
    pretty: true,
  });

  const adapter = new FluffyAdapter({ url, logger });
  const bus = new FluffyEventBus({ broker: adapter, logger });

  await bus.start();

  const orderId = `demo-${Date.now()}`;
  const envelope = createEventEnvelope(
    'order.created',
    {
      orderId,
      customerId: 'cust-001',
      total: 199.99,
      currency: 'MXN',
    },
    { source: 'order-notifications-demo' }
  );

  const result = await bus.publish({
    exchange: EXCHANGE_ORDERS,
    routingKey: 'order.created',
    message: envelope,
  });

  logger.info('Publicación completada', {
    success: result.success,
    messageId: result.messageId,
  });

  await bus.stop();
}

main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});

Consumidor — consumer.ts

/**
 * Consumidor con router in-process + idempotencia en memoria (un solo proceso).
 */
import './load-env.js';
import {
  FluffyEventBus,
  FluffyAdapter,
  createEventRouter,
  createLogger,
} from '@imj_media/fluffy-amqp';
import { MemoryIdempotencyStore } from '@imj_media/fluffy-amqp/idempotency/memory';
import { EXCHANGE_ORDERS, QUEUE_NOTIFICATIONS, ROUTING_ORDERS_ALL } from './config.js';

const url = process.env.RABBITMQ_URL ?? 'amqp://guest:[email protected]:5672';

async function main(): Promise<void> {
  const logger = createLogger({
    name: 'order-consumer-demo',
    level: 'info',
    pretty: true,
  });

  const adapter = new FluffyAdapter({ url, logger });
  const bus = new FluffyEventBus({
    broker: adapter,
    logger,
    idempotencyStore: new MemoryIdempotencyStore(),
  });

  const router = createEventRouter(logger);
  router.register('order.created', async (event) => {
    //const orderId = (event.data as { orderId?: string }).orderId;
    const order = event.data;
    logger.info('Pedido creado recibido', { order });
  });
  router.register('order.cancelled', async (event) => {
    const orderId = (event.data as { orderId?: string }).orderId;
    logger.warn('Pedido cancelado recibido', { orderId });
  });

  await bus.start();

  await bus.subscribe({
    exchange: EXCHANGE_ORDERS,
    routingKey: ROUTING_ORDERS_ALL,
    queueName: QUEUE_NOTIFICATIONS,
    router,
    prefetch: 10,
    maxRetries: 2,
  });

  logger.info('Consumidor listo; Ctrl+C para salir');

  const shutdown = async () => {
    logger.info('Cerrando bus…');
    await bus.stop();
    process.exit(0);
  };
  process.on('SIGINT', () => void shutdown());
  process.on('SIGTERM', () => void shutdown());
}

main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});

Resiliencia: errores, DLQ, backoff y reintentos

Publicador — publisher-resilience.ts

/**
 * Publica un evento de prueba para `consumer-resilience.ts`.
 * Escenario: env `DEMO_SCENARIO` (default `fail_always`).
 */
import './load-env.js';
import {
  FluffyEventBus,
  FluffyAdapter,
  createEventEnvelope,
  createLogger,
} from '@imj_media/fluffy-amqp';
import { EXCHANGE_ORDERS } from './config.js';

const url = process.env.RABBITMQ_URL ?? 'amqp://guest:[email protected]:5672';

const scenario = process.env.DEMO_SCENARIO ?? 'fail_always';

async function main(): Promise<void> {
  const logger = createLogger({
    name: 'publisher-resilience-demo',
    level: 'info',
    pretty: true,
  });

  const adapter = new FluffyAdapter({ url, logger });
  const bus = new FluffyEventBus({ broker: adapter, logger });

  await bus.start();

  const envelope = createEventEnvelope(
    'order.resilience',
    {
      scenario,
      hint: 'Ver consumer-resilience y cola DLQ en la UI de RabbitMQ',
    },
    { source: 'order-notifications-demo' }
  );

  await bus.publish({
    exchange: EXCHANGE_ORDERS,
    routingKey: 'order.resilience.test',
    message: envelope,
  });

  logger.info('Evento publicado', {
    routingKey: 'order.resilience.test',
    scenario,
    eventId: envelope.meta.eventId,
  });

  await bus.stop();
}

main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});

Consumidor — consumer-resilience.ts

/**
 * Consumidor de demostración: reintentos con backoff, errores clasificados y DLQ.
 *
 * No ejecutar junto con `consumer.ts` si ambos comparten binding y quieres evitar competencia;
 * para esta demo usa solo este proceso + `publisher:resilience`.
 */
import './load-env.js';
import {
  FluffyEventBus,
  NonRetryableError,
  FluffyAdapter,
  createLogger,
} from '@imj_media/fluffy-amqp';
import {
  DLQ_QUEUE,
  DLQ_ROUTING_KEY,
  DLX_EXCHANGE,
  EXCHANGE_ORDERS,
  QUEUE_RESILIENCE,
  ROUTING_RESILIENCE,
} from './config.js';

const url = process.env.RABBITMQ_URL ?? 'amqp://guest:[email protected]:5672';

/** Intentos por `eventId` dentro de un mismo ciclo de reintentos (mismo mensaje). */
const transientAttempts = new Map<string, number>();

async function main(): Promise<void> {
  const logger = createLogger({
    name: 'order-resilience-demo',
    level: 'info',
    pretty: true,
  });

  const adapter = new FluffyAdapter({
    url,
    logger,
    dlq: {
      exchange: DLX_EXCHANGE,
      queue: DLQ_QUEUE,
      routingKey: DLQ_ROUTING_KEY,
    },
  });
  const bus = new FluffyEventBus({ broker: adapter, logger });

  await bus.start();

  await bus.subscribe({
    exchange: EXCHANGE_ORDERS,
    routingKey: ROUTING_RESILIENCE,
    queueName: QUEUE_RESILIENCE,
    retryStrategy: {
      maxAttempts: 4,
      /** Backoff corto para ver los reintentos en logs sin esperar minutos. */
      backoffMs: (attempt) => 400 + attempt * 150,
    },
    handler: async (event) => {
      const scenario = String((event.data as { scenario?: string }).scenario ?? 'fail_always');
      const eventId = event.meta.eventId;

      logger.info('Mensaje recibido', { eventId, scenario });

      switch (scenario) {
        case 'ok':
          logger.info('Escenario ok: sin error, ACK inmediato');
          return;

        case 'non_retryable':
          throw new NonRetryableError(
            'Error de negocio simulado: no reintentar (va directo a DLQ)'
          );

        case 'fail_twice': {
          const n = (transientAttempts.get(eventId) ?? 0) + 1;
          transientAttempts.set(eventId, n);
          if (n < 3) {
            throw new Error(`Fallo transitorio simulado (intento ${n}/3 dentro de withRetry)`);
          }
          transientAttempts.delete(eventId);
          logger.info('Tercer intento OK tras fallos simulados');
          return;
        }

        case 'fail_always':
        default:
          throw new Error('Fallo en cada intento: se agotan reintentos y el mensaje va a la DLQ');
      }
    },
  });

  logger.info('Consumidor resiliencia listo', {
    cola: QUEUE_RESILIENCE,
    dlq: DLQ_QUEUE,
    dlx: DLX_EXCHANGE,
    escenarios: 'publica con DEMO_SCENARIO=ok | fail_twice | non_retryable | fail_always',
  });

  const shutdown = async () => {
    logger.info('Cerrando bus…');
    await bus.stop();
    process.exit(0);
  };
  process.on('SIGINT', () => void shutdown());
  process.on('SIGTERM', () => void shutdown());
}

main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});

Exports del paquete

| Ruta de import | Contenido | | ------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | | @imj_media/fluffy-amqp | FluffyEventBus, FluffyAdapter, createFluffyEventBusFromConfig, createEventEnvelope, validateEventEnvelope, createEventRouter, createConcurrencyLimiter, JsonEventSerializer, createLogger, tipos, RetryableError / NonRetryableError, guards, etc. | | @imj_media/fluffy-amqp/idempotency/memory | MemoryIdempotencyStore | | @imj_media/fluffy-amqp/idempotency/redis | RedisIdempotencyStore | | @imj_media/fluffy-amqp/idempotency/db | DatabaseIdempotencyStore |


Requisitos e instalación

| Requisito | Versión / nota | | ------------ | -------------------------------------------------------- | | Node.js | >= 18 (recomendado LTS 20+) | | RabbitMQ | 3.x, AMQP 0-9-1; URLs amqp:// o amqps:// |

npm install @imj_media/fluffy-amqp

Tras clonar el repositorio para desarrollo local:

npm install
npm run build

Más detalle: docs/COMPATIBILIDAD.md.


Buenas prácticas

  • Usa nombres estables de exchange y cola; documéntalos para operaciones.
  • Prefiere amqps:// en producción si el broker soporta TLS.
  • Mantén secretos en variables de entorno / gestores de secretos; no subas RABBITMQ_URL con credenciales.
  • Usa idempotencia cuando haya varias réplicas de consumidor o reintentos que dupliquen entregas.
  • Topic vs fanout: la semántica de enrutado cambia; alinea bindings con tu topología.
  • Sigue actualizaciones en CHANGELOG.md y prueba en staging antes de producción.

Desarrollo del paquete

| Script | Propósito | | --------------------------------------- | ----------------------------------------------------------------------------------------------- | | npm run build | Empaquetado con tsup (CJS + ESM + .d.ts) | | npm run build:verify | Comprueba artefactos y carga dist | | npm run test:unit | Pruebas unitarias (sin broker) | | npm run test:integration | Pruebas AMQP — define RABBITMQ_URL; REQUIRE_RABBITMQ=1 para fallar si no hay broker | | npm run lint / npm run format:check | ESLint / Prettier |

Para pruebas de integración AMQP necesitas un RabbitMQ alcanzable (variable RABBITMQ_URL, por defecto suele ser amqp://127.0.0.1:5672).


Documentación complementaria

| Documento | Descripción | | ------------------------------------------------------------------------- | --------------------------------------- | | docs/README.md | Índice de guías | | GUIA-CONSUMIDORES.md | Comportamiento orientado a consumidores | | EJEMPLO-SNIPPETS.mdx | Snippets listos para copiar | | GUIA-ARCHIVOS-Y-EJEMPLOS.mdx | Mapa de src/ | | GUIA-MAESTRA-IMPLEMENTACION.mdx | Guía de implementación completa | | RUNBOOK-OPERACIONES.md | Operación / UI de RabbitMQ | | DISENO-DECISIONES.md | Decisiones de diseño |

La API pública queda documentada en JSDoc en src/ y en las guías de docs/; los tipos .d.ts publicados en dist/ completan la referencia para consumidores TypeScript.


Seguridad, contribución y licencia

Licencia: ISC.

Incidencias: Bitbucket — issues (también en package.jsonbugs).

Autor: Alejandro Loera (ver package.json).