@imj_media/fluffy-amqp
v1.0.0
Published
Librería TypeScript para mensajería orientada a eventos sobre RabbitMQ (AMQP): fachada FluffyEventBus, adaptador con reconexión, publicación/suscripción, reintentos, DLQ opcional, router por tópicos en proceso, sobres JSON validados con Zod e idempotencia
Readme
@imj_media/fluffy-amqp
Bus de eventos AMQP en TypeScript para RabbitMQ: reconexión automática, DLQ opcional, reintentos, sobres validados con Zod, enrutamiento in-process e idempotencia en subpaths (memory / redis / db).
| Recurso | Enlace |
| -------------------------------------- | -------------------------------------------------------------------------------------------- |
| Repositorio / incidencias / página | Bitbucket — desarrollo_imj/fluffy-amqp |
| npm | npm install @imj_media/fluffy-amqp |
Índice
- Visión general
- Arquitectura
- Modelo de sobre de evento
- Publicación
- Consumo
- Idempotencia
- Serialización
- Concurrencia
- Registro (logging)
- Demo
- Exports del paquete
- Requisitos e instalación
- Buenas prácticas
- Desarrollo del paquete
- Documentación complementaria
- Seguridad, contribución y licencia
Visión general
@imj_media/fluffy-amqp sirve para publicar y consumir eventos de dominio sobre RabbitMQ con una API pequeña y tipada:
- El núcleo se apoya en
MessageBroker(conectar, publicar, suscribir, cerrar), sin acoplarse a tipos de RabbitMQ: puedes probar con mocks o cambiar de transporte más adelante. FluffyAdapterimplementaMessageBrokerconamqplibyamqp-connection-manager(reconexión automática). Usa un canal de confirmación para publicar y otro para consumir; si pasasdlq: { exchange, queue, routingKey? }, elsetupdel canal de consumo declara el exchange de dead-letter y la cola DLQ.FluffyEventBuses una fachada sobre un broker inyectado:start/stop(aliasconnect/close),restart()(parar → breve pausa → arrancar),publish,subscribe. UnidempotencyStoreopcional en el bus se fusiona ensubscribesi la suscripción no define otro almacén.createFluffyEventBusFromConfig(broker, config)construye unFluffyEventBusa partir deFluffyEventBusConfig(logging, idempotencia por defecto y bloques informativos de reintento/concurrencia/RabbitMQ en la configuración).
Dependencias de ejecución relevantes: amqplib, amqp-connection-manager, zod, pino (para la factoría opcional createLogger). Si usas createLogger({ pretty: true }), instala pino-pretty en tu aplicación (no va empaquetada como dependencia dura de la librería).
Arquitectura
flowchart LR
subgraph app [Tu aplicación]
EB[FluffyEventBus]
end
subgraph contract [Contrato]
MB[MessageBroker]
end
subgraph rabbit [Pila RabbitMQ]
RA[FluffyAdapter]
AMQP[(RabbitMQ)]
end
EB --> MB
RA --> MB
RA --> AMQP- Publicación:
FluffyEventBus.publish→FluffyAdapter.publish→ publicación en canal (sobre JSON por defecto). - Consumo:
FluffyEventBus.subscribe→ assert de exchange/cola/enlace →consume→ deserializar + idempotencia opcional →handleroEventRouter.dispatch→ ACK / NACK → DLQ si está configurada.
Modelo de sobre de evento
createEventEnvelope(eventType, data, { source, correlationId?, context?, eventVersion?, eventId?, occurredAt?, metadata? })construye unEventEnvelope: enmetavaneventId,occurredAt,correlationId(UUID por defecto víanode:crypto),eventType,source, etc.validateEventEnvelope(unknown)valida con Zod (esquemas compatibles con Zod 4):meta,contextopcional,data.- Correlación: usa
correlationIdpara enlazar flujos entre servicios.eventIdes el identificador de negocio del mensaje y la clave habitual de idempotencia;PublishResult.messageIdconvencionalmente coincide conmeta.eventId.
Publicación
PublishParams: exchange, routingKey, message (EventEnvelope), headers opcionales, serializer opcional solo para esa llamada.
El publicador no declara el exchange; asegúrate de que exista (a menudo lo crea el primer consumidor que ejecuta assertExchange, u operaciones).
Consumo
SubscribeParams incluye:
| Ámbito | Notas |
| --------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| Enrutamiento | exchange, routingKey (binding; patrones topic como orders.#), queueName opcional, exchangeType (topic por defecto; también direct / fanout). |
| Handler vs router | handler o router (no ambos como responsabilidades separadas de primer nivel; el router envuelve el despacho). |
| Resiliencia | Atajo maxRetries o retryStrategy: maxAttempts, backoffMs(attempt), shouldRetry(error, attempt). |
| DLQ | dlq por suscripción; hereda la DLQ global del adaptador si se omite. |
| Rendimiento | prefetch (QoS AMQP), maxConcurrency (semáforo en proceso). |
| Otros | idempotencyStore, serializer. |
createEventRouter(logger) — registro con register(pattern, handler). Orden de resolución: coincidencia exacta de routing key → mejor patrón topic (*, #) → patrón igual a eventType.
Errores:
RetryableError— transitorio; la política de reintentos puede volver a intentar.NonRetryableError— sin reintentos; el consumidor hace NACK sin reencolar → DLQ si está configurada.Errorgenérico — suele reintentarse según la política hastamaxAttempts, luego NACK → DLQ.- Utilidades:
isRetryableError,isNonRetryableError.
Idempotencia
Implementa IdempotencyStore: hasProcessed(eventId), markProcessed(eventId).
Subpaths tree-shakeables:
| Import | Uso |
| ------------------------------------------- | ----------------------------------------------------------------------------------------------------- |
| @imj_media/fluffy-amqp/idempotency/memory | MemoryIdempotencyStore — un solo proceso. |
| @imj_media/fluffy-amqp/idempotency/redis | RedisIdempotencyStore — cliente con get/set, prefijo de clave y TTL opcionales. |
| @imj_media/fluffy-amqp/idempotency/db | DatabaseIdempotencyStore — SQL/ORM mediante callbacks hasProcessed / markProcessed. |
Serialización
Por defecto: JSON UTF-8 + validación Zod al deserializar — JsonEventSerializer / createJsonEventSerializer. La interfaz EventSerializer permite otros formatos en cable (p. ej. Avro/Protobuf) manteniendo EventEnvelope como contrato de negocio.
Concurrencia
createConcurrencyLimiter(maxConcurrency) devuelve un ConcurrencyController (semáforo FIFO). Combínalo con prefetch AMQP: el prefetch limita mensajes sin ACK en el canal; maxConcurrency limita ejecuciones paralelas del manejador en el proceso.
Registro (logging)
Contrato Logger: (message: string, meta?: Record<string, unknown>). createLogger({ name?, level?, pretty? }) envuelve Pino; para pretty: true, añade pino-pretty en tu app.
Demo
Los fragmentos siguientes son una mini demo de la librería. Cada proceso tiene su propio FluffyAdapter + FluffyEventBus y usa createLogger de @imj_media/fluffy-amqp (Pino adaptado al contrato Logger).
Ejecución (desde la raíz de tu proyecto): npm install, define RABBITMQ_URL con export o un .env local que crees tú (no viene en el repo), luego npm run consumer y en otra terminal npm run publisher. Para resiliencia: npm run consumer:resilience y npm run publisher:resilience (variable DEMO_SCENARIO=ok | fail_twice | non_retryable | fail_always). No ejecutes consumer.ts y consumer-resilience.ts a la vez si quieres evitar competencia en bindings compartidos; la demo de resiliencia usa cola y routing distintos.
config.ts
/**
* Nombres AMQP compartidos entre publicador y consumidor de la demo.
*/
export const EXCHANGE_ORDERS = 'orders.demo';
/** Cola durable de la demo (un consumidor de ejemplo). */
export const QUEUE_NOTIFICATIONS = 'orders-demo-notifications';
/** Prefijo lógico de routing keys de la demo. */
export const ROUTING_ORDERS_ALL = 'order.#';
/** Demo: reintentos + DLQ (parar el `consumer` normal antes de usar esta cola). */
export const QUEUE_RESILIENCE = 'orders-demo-resilience';
export const ROUTING_RESILIENCE = 'order.resilience.#';
/** Exchange DLX + cola DLQ (routing key fija hacia la cola). */
export const DLX_EXCHANGE = 'orders.demo.dlx';
export const DLQ_QUEUE = 'orders-demo-dlq';
export const DLQ_ROUTING_KEY = 'dead';load-env.ts
/**
* Si existe un `.env` local junto al ejemplo (no versionado), lo carga aunque el cwd sea otro.
*/
import { config } from 'dotenv';
import { dirname, join } from 'node:path';
import { fileURLToPath } from 'node:url';
const root = join(dirname(fileURLToPath(import.meta.url)), '..');
config({ path: join(root, '.env') });Publicador — publisher.ts
/**
* Publica uno o más eventos de pedido al exchange topic `orders.demo`.
*/
import './load-env.js';
import {
FluffyEventBus,
FluffyAdapter,
createEventEnvelope,
createLogger,
} from '@imj_media/fluffy-amqp';
import { EXCHANGE_ORDERS } from './config.js';
const url = process.env.RABBITMQ_URL ?? 'amqp://guest:[email protected]:5672';
async function main(): Promise<void> {
const logger = createLogger({
name: 'order-publisher-demo',
level: 'info',
pretty: true,
});
const adapter = new FluffyAdapter({ url, logger });
const bus = new FluffyEventBus({ broker: adapter, logger });
await bus.start();
const orderId = `demo-${Date.now()}`;
const envelope = createEventEnvelope(
'order.created',
{
orderId,
customerId: 'cust-001',
total: 199.99,
currency: 'MXN',
},
{ source: 'order-notifications-demo' }
);
const result = await bus.publish({
exchange: EXCHANGE_ORDERS,
routingKey: 'order.created',
message: envelope,
});
logger.info('Publicación completada', {
success: result.success,
messageId: result.messageId,
});
await bus.stop();
}
main().catch((err) => {
console.error(err);
process.exitCode = 1;
});Consumidor — consumer.ts
/**
* Consumidor con router in-process + idempotencia en memoria (un solo proceso).
*/
import './load-env.js';
import {
FluffyEventBus,
FluffyAdapter,
createEventRouter,
createLogger,
} from '@imj_media/fluffy-amqp';
import { MemoryIdempotencyStore } from '@imj_media/fluffy-amqp/idempotency/memory';
import { EXCHANGE_ORDERS, QUEUE_NOTIFICATIONS, ROUTING_ORDERS_ALL } from './config.js';
const url = process.env.RABBITMQ_URL ?? 'amqp://guest:[email protected]:5672';
async function main(): Promise<void> {
const logger = createLogger({
name: 'order-consumer-demo',
level: 'info',
pretty: true,
});
const adapter = new FluffyAdapter({ url, logger });
const bus = new FluffyEventBus({
broker: adapter,
logger,
idempotencyStore: new MemoryIdempotencyStore(),
});
const router = createEventRouter(logger);
router.register('order.created', async (event) => {
//const orderId = (event.data as { orderId?: string }).orderId;
const order = event.data;
logger.info('Pedido creado recibido', { order });
});
router.register('order.cancelled', async (event) => {
const orderId = (event.data as { orderId?: string }).orderId;
logger.warn('Pedido cancelado recibido', { orderId });
});
await bus.start();
await bus.subscribe({
exchange: EXCHANGE_ORDERS,
routingKey: ROUTING_ORDERS_ALL,
queueName: QUEUE_NOTIFICATIONS,
router,
prefetch: 10,
maxRetries: 2,
});
logger.info('Consumidor listo; Ctrl+C para salir');
const shutdown = async () => {
logger.info('Cerrando bus…');
await bus.stop();
process.exit(0);
};
process.on('SIGINT', () => void shutdown());
process.on('SIGTERM', () => void shutdown());
}
main().catch((err) => {
console.error(err);
process.exitCode = 1;
});Resiliencia: errores, DLQ, backoff y reintentos
Publicador — publisher-resilience.ts
/**
* Publica un evento de prueba para `consumer-resilience.ts`.
* Escenario: env `DEMO_SCENARIO` (default `fail_always`).
*/
import './load-env.js';
import {
FluffyEventBus,
FluffyAdapter,
createEventEnvelope,
createLogger,
} from '@imj_media/fluffy-amqp';
import { EXCHANGE_ORDERS } from './config.js';
const url = process.env.RABBITMQ_URL ?? 'amqp://guest:[email protected]:5672';
const scenario = process.env.DEMO_SCENARIO ?? 'fail_always';
async function main(): Promise<void> {
const logger = createLogger({
name: 'publisher-resilience-demo',
level: 'info',
pretty: true,
});
const adapter = new FluffyAdapter({ url, logger });
const bus = new FluffyEventBus({ broker: adapter, logger });
await bus.start();
const envelope = createEventEnvelope(
'order.resilience',
{
scenario,
hint: 'Ver consumer-resilience y cola DLQ en la UI de RabbitMQ',
},
{ source: 'order-notifications-demo' }
);
await bus.publish({
exchange: EXCHANGE_ORDERS,
routingKey: 'order.resilience.test',
message: envelope,
});
logger.info('Evento publicado', {
routingKey: 'order.resilience.test',
scenario,
eventId: envelope.meta.eventId,
});
await bus.stop();
}
main().catch((err) => {
console.error(err);
process.exitCode = 1;
});Consumidor — consumer-resilience.ts
/**
* Consumidor de demostración: reintentos con backoff, errores clasificados y DLQ.
*
* No ejecutar junto con `consumer.ts` si ambos comparten binding y quieres evitar competencia;
* para esta demo usa solo este proceso + `publisher:resilience`.
*/
import './load-env.js';
import {
FluffyEventBus,
NonRetryableError,
FluffyAdapter,
createLogger,
} from '@imj_media/fluffy-amqp';
import {
DLQ_QUEUE,
DLQ_ROUTING_KEY,
DLX_EXCHANGE,
EXCHANGE_ORDERS,
QUEUE_RESILIENCE,
ROUTING_RESILIENCE,
} from './config.js';
const url = process.env.RABBITMQ_URL ?? 'amqp://guest:[email protected]:5672';
/** Intentos por `eventId` dentro de un mismo ciclo de reintentos (mismo mensaje). */
const transientAttempts = new Map<string, number>();
async function main(): Promise<void> {
const logger = createLogger({
name: 'order-resilience-demo',
level: 'info',
pretty: true,
});
const adapter = new FluffyAdapter({
url,
logger,
dlq: {
exchange: DLX_EXCHANGE,
queue: DLQ_QUEUE,
routingKey: DLQ_ROUTING_KEY,
},
});
const bus = new FluffyEventBus({ broker: adapter, logger });
await bus.start();
await bus.subscribe({
exchange: EXCHANGE_ORDERS,
routingKey: ROUTING_RESILIENCE,
queueName: QUEUE_RESILIENCE,
retryStrategy: {
maxAttempts: 4,
/** Backoff corto para ver los reintentos en logs sin esperar minutos. */
backoffMs: (attempt) => 400 + attempt * 150,
},
handler: async (event) => {
const scenario = String((event.data as { scenario?: string }).scenario ?? 'fail_always');
const eventId = event.meta.eventId;
logger.info('Mensaje recibido', { eventId, scenario });
switch (scenario) {
case 'ok':
logger.info('Escenario ok: sin error, ACK inmediato');
return;
case 'non_retryable':
throw new NonRetryableError(
'Error de negocio simulado: no reintentar (va directo a DLQ)'
);
case 'fail_twice': {
const n = (transientAttempts.get(eventId) ?? 0) + 1;
transientAttempts.set(eventId, n);
if (n < 3) {
throw new Error(`Fallo transitorio simulado (intento ${n}/3 dentro de withRetry)`);
}
transientAttempts.delete(eventId);
logger.info('Tercer intento OK tras fallos simulados');
return;
}
case 'fail_always':
default:
throw new Error('Fallo en cada intento: se agotan reintentos y el mensaje va a la DLQ');
}
},
});
logger.info('Consumidor resiliencia listo', {
cola: QUEUE_RESILIENCE,
dlq: DLQ_QUEUE,
dlx: DLX_EXCHANGE,
escenarios: 'publica con DEMO_SCENARIO=ok | fail_twice | non_retryable | fail_always',
});
const shutdown = async () => {
logger.info('Cerrando bus…');
await bus.stop();
process.exit(0);
};
process.on('SIGINT', () => void shutdown());
process.on('SIGTERM', () => void shutdown());
}
main().catch((err) => {
console.error(err);
process.exitCode = 1;
});Exports del paquete
| Ruta de import | Contenido |
| ------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| @imj_media/fluffy-amqp | FluffyEventBus, FluffyAdapter, createFluffyEventBusFromConfig, createEventEnvelope, validateEventEnvelope, createEventRouter, createConcurrencyLimiter, JsonEventSerializer, createLogger, tipos, RetryableError / NonRetryableError, guards, etc. |
| @imj_media/fluffy-amqp/idempotency/memory | MemoryIdempotencyStore |
| @imj_media/fluffy-amqp/idempotency/redis | RedisIdempotencyStore |
| @imj_media/fluffy-amqp/idempotency/db | DatabaseIdempotencyStore |
Requisitos e instalación
| Requisito | Versión / nota |
| ------------ | -------------------------------------------------------- |
| Node.js | >= 18 (recomendado LTS 20+) |
| RabbitMQ | 3.x, AMQP 0-9-1; URLs amqp:// o amqps:// |
npm install @imj_media/fluffy-amqpTras clonar el repositorio para desarrollo local:
npm install
npm run buildMás detalle: docs/COMPATIBILIDAD.md.
Buenas prácticas
- Usa nombres estables de exchange y cola; documéntalos para operaciones.
- Prefiere
amqps://en producción si el broker soporta TLS. - Mantén secretos en variables de entorno / gestores de secretos; no subas
RABBITMQ_URLcon credenciales. - Usa idempotencia cuando haya varias réplicas de consumidor o reintentos que dupliquen entregas.
- Topic vs fanout: la semántica de enrutado cambia; alinea bindings con tu topología.
- Sigue actualizaciones en CHANGELOG.md y prueba en staging antes de producción.
Desarrollo del paquete
| Script | Propósito |
| --------------------------------------- | ----------------------------------------------------------------------------------------------- |
| npm run build | Empaquetado con tsup (CJS + ESM + .d.ts) |
| npm run build:verify | Comprueba artefactos y carga dist |
| npm run test:unit | Pruebas unitarias (sin broker) |
| npm run test:integration | Pruebas AMQP — define RABBITMQ_URL; REQUIRE_RABBITMQ=1 para fallar si no hay broker |
| npm run lint / npm run format:check | ESLint / Prettier |
Para pruebas de integración AMQP necesitas un RabbitMQ alcanzable (variable RABBITMQ_URL, por defecto suele ser amqp://127.0.0.1:5672).
Documentación complementaria
| Documento | Descripción |
| ------------------------------------------------------------------------- | --------------------------------------- |
| docs/README.md | Índice de guías |
| GUIA-CONSUMIDORES.md | Comportamiento orientado a consumidores |
| EJEMPLO-SNIPPETS.mdx | Snippets listos para copiar |
| GUIA-ARCHIVOS-Y-EJEMPLOS.mdx | Mapa de src/ |
| GUIA-MAESTRA-IMPLEMENTACION.mdx | Guía de implementación completa |
| RUNBOOK-OPERACIONES.md | Operación / UI de RabbitMQ |
| DISENO-DECISIONES.md | Decisiones de diseño |
La API pública queda documentada en JSDoc en src/ y en las guías de docs/; los tipos .d.ts publicados en dist/ completan la referencia para consumidores TypeScript.
Seguridad, contribución y licencia
- SECURITY.md — divulgación responsable
- CONTRIBUTING.md — cómo contribuir
- CODE_OF_CONDUCT.md
Licencia: ISC.
Incidencias: Bitbucket — issues (también en package.json → bugs).
Autor: Alejandro Loera (ver package.json).
