@onlineapps/conn-infra-mq

v1.1.70

A promise-based, broker-agnostic client for sending and receiving messages via RabbitMQ

Message queue connector with a layered architecture for workflow orchestration, fork-join, and retry patterns. Built on top of RabbitMQ with a clean separation of concerns. Only asynchronous workflow patterns are supported; synchronous RPC patterns are not supported because they are not aligned with our architecture philosophy.


🚀 Features

  • Layered Architecture: Clean separation into specialized layers (WorkflowRouter, QueueManager, ForkJoinHandler, RetryHandler)
  • Workflow Orchestration: Decentralized workflow routing without central orchestrator
  • Fork-Join Pattern: Parallel processing with result aggregation and built-in join strategies
  • Asynchronous First: All communication is asynchronous (fire-and-forget), no synchronous blocking patterns
  • Automatic Retry: Exponential backoff, dead letter queue management, configurable retry policies
  • Queue Management: TTL, DLQ, auto-delete, temporary queues, exchange bindings
  • Promise-based API: All operations return promises for clean async/await usage
  • Built-in Serialization: JSON with custom error handling
  • Config Validation: Strict schema validation via Ajv
  • Extensible Transport: Clear separation between core logic and transport layer
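
The fork-join feature listed above can be pictured with plain promises. The sketch below is not the ForkJoinHandler API (see docs/api.md for that); `forkJoin` and the `mergeAll` join strategy are hypothetical names used only to illustrate fanning work out in parallel and aggregating the results.

```javascript
'use strict';

// Hypothetical join strategy: collect every branch result into one object,
// keyed by branch name.
function mergeAll(results) {
  return results.reduce((acc, { branch, value }) => {
    acc[branch] = value;
    return acc;
  }, {});
}

// Fork: start every branch in parallel. Join: wait for all, then aggregate.
async function forkJoin(input, branches, joinStrategy) {
  const pending = Object.entries(branches).map(async ([branch, task]) => ({
    branch,
    value: await task(input)
  }));
  const results = await Promise.all(pending); // rejects if any branch fails
  return joinStrategy(results);
}

// Usage: two illustrative branches working on the same payload.
forkJoin(
  { text: 'hello' },
  {
    upper: async (p) => p.text.toUpperCase(),
    length: async (p) => p.text.length
  },
  mergeAll
).then((joined) => console.log(joined));
```

With `Promise.all`, a single failing branch fails the whole join. A more tolerant strategy would likely need `Promise.allSettled` and an explicit rule for partial results.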

📦 Installation

npm install @onlineapps/conn-infra-mq
# or
yarn add @onlineapps/conn-infra-mq

Requires Node.js ≥12. For RabbitMQ usage, ensure an accessible AMQP server.


🏗️ Architecture

ConnectorMQClient (main orchestrator - for business services only)
    ├── BaseClient (core AMQP operations)
    ├── WorkflowRouter (workflow orchestration)
    ├── QueueManager (queue lifecycle management)
    ├── ForkJoinHandler (parallel processing)
    └── RetryHandler (error recovery & DLQ)

WorkflowRouter - How It Works

Purpose: Handles workflow routing between services in a decentralized architecture.

Key Methods:

  • publishWorkflowInit(workflow, options) - Publishes workflow to workflow.init queue (entry point)
  • publishToServiceWorkflow(serviceName, message, options) - Routes to specific service's workflow queue
  • publishWorkflowCompleted(result, options) - Publishes completed workflow to workflow.completed queue
  • consumeWorkflowInit(handler, options) - Consumes from workflow.init (competing consumers pattern)
  • consumeServiceWorkflow(serviceName, handler, options) - Consumes from service-specific workflow queue

How It Works:

  1. Gateway publishes to workflow.init via publishWorkflowInit()
  2. Business services (competing consumers) consume from workflow.init via consumeWorkflowInit()
  3. Services route to next service via publishToServiceWorkflow()
  4. Final service publishes completion via publishWorkflowCompleted()

Note: WorkflowRouter is part of conn-infra-mq connector (for business services). Infrastructure services (gateway) should use the underlying MQ client library directly, not the connector.
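
The four routing steps can be traced without a broker. The sketch below is an in-memory stand-in, not the connector itself: `InMemoryRouter` is hypothetical, only the method names are taken from the list above, and both the `<serviceName>.workflow` queue naming and the message shape (`id`, `steps`, `data`) are assumptions made for illustration.

```javascript
'use strict';

// In-memory stand-in for the broker: each queue is an array of messages.
class InMemoryRouter {
  constructor() {
    this.queues = new Map();
  }
  _push(queue, message) {
    if (!this.queues.has(queue)) this.queues.set(queue, []);
    this.queues.get(queue).push(message);
  }
  _drain(queue, handler) {
    const messages = this.queues.get(queue) || [];
    while (messages.length > 0) handler(messages.shift());
  }
  publishWorkflowInit(workflow) {
    this._push('workflow.init', workflow);
  }
  publishToServiceWorkflow(serviceName, message) {
    // Assumed queue naming; the real connector may name queues differently.
    this._push(`${serviceName}.workflow`, message);
  }
  publishWorkflowCompleted(result) {
    this._push('workflow.completed', result);
  }
  consumeWorkflowInit(handler) {
    this._drain('workflow.init', handler);
  }
  consumeServiceWorkflow(serviceName, handler) {
    this._drain(`${serviceName}.workflow`, handler);
  }
}

const router = new InMemoryRouter();

// 1. The gateway publishes a new workflow to workflow.init.
router.publishWorkflowInit({ id: 'wf-1', steps: ['resize', 'notify'], data: {} });

// 2. A business service picks it up and routes it to the first step.
router.consumeWorkflowInit((wf) => router.publishToServiceWorkflow(wf.steps[0], wf));

// 3. The "resize" service does its work and routes to the next service.
router.consumeServiceWorkflow('resize', (wf) =>
  router.publishToServiceWorkflow('notify', { ...wf, data: { resized: true } })
);

// 4. The final service publishes the completed workflow.
router.consumeServiceWorkflow('notify', (wf) =>
  router.publishWorkflowCompleted({ id: wf.id, data: wf.data })
);

console.log(router.queues.get('workflow.completed'));
```

No component in this flow holds the whole workflow state: each service reads the message, does its part, and decides where the message goes next, which is what "decentralized" means here.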

🔧 Quick Start

'use strict';

const ConnectorMQClient = require('@onlineapps/conn-infra-mq');

(async () => {
  // 1. Create client with configuration
  const client = new ConnectorMQClient({
    host: 'amqp://localhost:5672',
    serviceName: 'my-service',
    queue: 'default-queue',
    durable: true,
    prefetch: 5,                        // Default prefetch count for consumers
    noAck: false,                       // Default auto-acknowledge = false
    retryPolicy: {                      // Optional reconnection policy (not enforced in v1.0.0)
      retries: 5,
      initialDelayMs: 1000,
      maxDelayMs: 30000,
      factor: 2
    }
  });

  // 2. Register a global error handler
  client.onError(err => {
    console.error('[ConnectorMQClient] Error:', err);
  });

  // 3. Connect to RabbitMQ
  try {
    await client.connect();
    console.log('Connected to broker');
  } catch (err) {
    console.error('Connection failed:', err);
    process.exit(1);
  }

  // 4. Publish a sample message
  const samplePayload = { taskId: 'abc123', action: 'processData', timestamp: Date.now() };
  try {
    await client.publish('job_queue', samplePayload, {
      persistent: true,
      headers: { origin: 'quickStart' }
    });
    console.log('Message published:', samplePayload);
  } catch (err) {
    console.error('Publish error:', err);
  }

  // 5. Consume messages
  try {
    await client.consume(
      'job_queue',
      async (msg) => {
        const data = JSON.parse(msg.content.toString('utf8'));
        console.log('Received:', data);
        // Process message...
        await client.ack(msg);
      },
      { prefetch: 5, noAck: false }
    );
    console.log('Consuming from "job_queue"...');
  } catch (err) {
    console.error('Consume error:', err);
  }

  // 6. Graceful shutdown on SIGINT
  process.on('SIGINT', async () => {
    console.log('Shutting down...');
    try {
      await client.disconnect();
      console.log('Disconnected, exiting.');
      process.exit(0);
    } catch (discErr) {
      console.error('Error during disconnect:', discErr);
      process.exit(1);
    }
  });
})();
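
The consumer handler in the Quick Start calls JSON.parse directly, which throws on a malformed payload. A minimal sketch of a defensive decode step follows; `decodeMessage` is a hypothetical helper, and it relies only on `msg.content` being a Buffer, as in the Quick Start. What to do with an undecodable message (reject, dead-letter, log and ack) depends on the client API described in docs/api.md.

```javascript
'use strict';

// Hypothetical helper: decode a message body without letting bad JSON throw.
function decodeMessage(msg) {
  try {
    return { ok: true, data: JSON.parse(msg.content.toString('utf8')) };
  } catch (err) {
    return { ok: false, error: err };
  }
}

// Stand-in messages shaped like the ones in the Quick Start handler.
const good = decodeMessage({ content: Buffer.from('{"taskId":"abc123"}') });
const bad = decodeMessage({ content: Buffer.from('not json') });

console.log(good.ok, good.data.taskId); // true abc123
console.log(bad.ok, bad.error.name);    // false SyntaxError
```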

📄 Configuration

Configuration can be passed to the ConnectorMQClient constructor or as overrides to connect(). The supported fields are summarized below (see docs/api.md for full details):

| Field | Type | Description | Default |
| --- | --- | --- | --- |
| type | string | Transport type: 'rabbitmq' | 'rabbitmq' |
| host | string | Connection URI or hostname. For RabbitMQ: e.g. 'amqp://user:pass@localhost:5672'. | Required |
| queue | string | Default queue name for publish/consume if not overridden per call. | '' |
| exchange | string | Default exchange name. Empty string uses the default direct exchange. | '' |
| durable | boolean | Declare queues/exchanges as durable. | true |
| prefetch | integer | Default prefetch count for consumers. | 1 |
| noAck | boolean | Default auto-acknowledge setting for consumers. If true, messages will be auto-acked. | false |
| logger | object | Custom logger with methods: info(), warn(), error(), debug(). If omitted, console is used. | null |
| retryPolicy | object | Reconnection policy with properties: retries (number), initialDelayMs (ms), maxDelayMs (ms), factor (multiplier). Not enforced in v1.0.0. | { retries: 5, initialDelayMs: 1000, maxDelayMs: 30000, factor: 2 } |
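
To make the retryPolicy fields concrete, the sketch below computes the delay schedule they would produce under a conventional exponential backoff. The formula (delay = initialDelayMs × factor^attempt, capped at maxDelayMs) is an assumption; this README does not state how the library computes delays, and `backoffDelays` is a hypothetical helper.

```javascript
'use strict';

// Assumed formula: delay(n) = min(initialDelayMs * factor^n, maxDelayMs),
// for attempts n = 0 .. retries - 1.
function backoffDelays({ retries, initialDelayMs, maxDelayMs, factor }) {
  const delays = [];
  for (let attempt = 0; attempt < retries; attempt += 1) {
    delays.push(Math.min(initialDelayMs * Math.pow(factor, attempt), maxDelayMs));
  }
  return delays;
}

// Default policy from the table above.
console.log(
  backoffDelays({ retries: 5, initialDelayMs: 1000, maxDelayMs: 30000, factor: 2 })
);
// -> [ 1000, 2000, 4000, 8000, 16000 ]
```

With the defaults the cap is never reached. It would first apply on a sixth attempt, where 32000 ms is clamped to 30000 ms.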


🛠️ API Reference

For full class and method documentation, including parameter descriptions, return values, and error details, see docs/api.md.


✅ Testing

npm test                  # All tests
npm run test:unit         # Unit tests only
npm run test:component    # Component tests
npm run test:integration  # Integration tests

Test Coverage Status

  • Overall Coverage: 24.52% (improving after refactoring)
  • Passing Tests: 75/104 (72%)
  • Test Suites: 10/14 passing
  • Well Tested: Config, Transports, Error handling (100%)
  • Needs Testing: New layers (1-5%)

See Test Report for detailed coverage analysis.


🎨 Coding Standards

  • Linting: ESLint (eslint:recommended + Prettier).
  • Formatting: Prettier — check with npm run prettier:check, fix with npm run prettier:fix.
  • Testing: Jest, aiming for ≥90% coverage.

🎯 Refactoring Benefits

✅ What Was Achieved

  1. Clean Layered Architecture - Separated into specialized layers
  2. Removed Technical Debt - Replaced MQWrapper with cleaner design
  3. Improved Extensibility - Easy to add new patterns
  4. Better Developer Experience - Cleaner API and documentation

📊 Quality Improvements

  • Separation of Concerns - Each layer has single responsibility
  • Modular Design - Use individual layers independently
  • Testability - Each layer can be tested in isolation
  • Maintainability - Easier to understand and modify
  • Backwards Compatibility - MQWrapper alias maintained

🤝 Contributing

Contributions welcome! Please see CONTRIBUTING.md for guidelines:

  1. Fork the repo.
  2. Create a feature branch: git checkout -b feature/your-feature.
  3. Run tests locally and ensure linting passes.
  4. Commit your changes and push to your branch.
  5. Open a Pull Request against main.

📜 License

This project is licensed under the MIT License. See LICENSE for details.

📚 Documentation