@onlineapps/conn-infra-mq
v1.1.70
A promise-based, broker-agnostic client for sending and receiving messages via RabbitMQ
@onlineapps/conn-infra-mq
Message queue connector with a layered architecture for workflow orchestration, fork-join, and retry patterns. Built on top of RabbitMQ with a clean separation of concerns. Only asynchronous workflow patterns are supported; synchronous RPC patterns are not supported and do not align with the project's architecture philosophy.
🚀 Features
- Layered Architecture: Clean separation into specialized layers (WorkflowRouter, QueueManager, ForkJoinHandler, RetryHandler)
- Workflow Orchestration: Decentralized workflow routing without central orchestrator
- Fork-Join Pattern: Parallel processing with result aggregation and built-in join strategies
- Asynchronous First: All communication is asynchronous (fire-and-forget), no synchronous blocking patterns
- Automatic Retry: Exponential backoff, dead letter queue management, configurable retry policies
- Queue Management: TTL, DLQ, auto-delete, temporary queues, exchange bindings
- Promise-based API: All operations return promises for clean async/await usage
- Built-in Serialization: JSON with custom error handling
- Config Validation: Strict schema validation via Ajv
- Extensible Transport: Clear separation between core logic and transport layer
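
The fork-join feature listed above can be illustrated without a broker. The following is a minimal sketch in plain JavaScript, not the package's ForkJoinHandler API (which is documented in docs/api.md); the names forkJoin and joinAll are hypothetical and exist only for this example. It starts all branches in parallel and aggregates the results with a join strategy that separates successes from failures.

```javascript
'use strict';

// Hypothetical join strategy: keep successful values, collect failure messages.
function joinAll(results) {
  return {
    values: results.filter((r) => r.ok).map((r) => r.value),
    errors: results.filter((r) => !r.ok).map((r) => r.error.message),
  };
}

// Fork: start every branch at once. Each branch reports success or failure
// instead of rejecting, so one failing branch does not abort the join.
async function forkJoin(input, branches, join = joinAll) {
  const settled = await Promise.all(
    branches.map((branch) =>
      Promise.resolve()
        .then(() => branch(input))
        .then(
          (value) => ({ ok: true, value }),
          (error) => ({ ok: false, error })
        )
    )
  );
  return join(settled);
}

// Usage: three parallel branches, one of which fails.
const demo = forkJoin(3, [
  async (n) => n * 2,
  async (n) => n + 10,
  async () => {
    throw new Error('branch failed');
  },
]);

demo.then((result) => console.log(result));
```

Passing a different function as the third argument swaps the join strategy, for example one that rejects as soon as any branch has failed.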
📦 Installation
npm install @onlineapps/conn-infra-mq
# or
yarn add @onlineapps/conn-infra-mq
Requires Node.js ≥12. For RabbitMQ usage, ensure an accessible AMQP server.
🏗️ Architecture
ConnectorMQClient (main orchestrator - for business services only)
├── BaseClient (core AMQP operations)
├── WorkflowRouter (workflow orchestration)
├── QueueManager (queue lifecycle management)
├── ForkJoinHandler (parallel processing)
└── RetryHandler (error recovery & DLQ)
WorkflowRouter - How It Works
Purpose: Handles workflow routing between services in a decentralized architecture.
Key Methods:
- publishWorkflowInit(workflow, options) - Publishes workflow to workflow.init queue (entry point)
- publishToServiceWorkflow(serviceName, message, options) - Routes to specific service's workflow queue
- publishWorkflowCompleted(result, options) - Publishes completed workflow to workflow.completed queue
- consumeWorkflowInit(handler, options) - Consumes from workflow.init (competing consumers pattern)
- consumeServiceWorkflow(serviceName, handler, options) - Consumes from service-specific workflow queue
How It Works:
- Gateway publishes to workflow.init via publishWorkflowInit()
- Business services (competing consumers) consume from workflow.init via consumeWorkflowInit()
- Services route to next service via publishToServiceWorkflow()
- Final service publishes completion via publishWorkflowCompleted()
Note: WorkflowRouter is part of the conn-infra-mq connector (for business services). Infrastructure services (gateway) should use the underlying MQ client library directly, not the connector.
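
The routing flow described above can be traced with an in-memory stand-in for the broker. This is only a sketch: it does not call the connector, the per-service queue names (for example resize.workflow), the message shape (steps, cursor), and the resize and notify services are all assumptions made for illustration. Only workflow.init and workflow.completed come from the description above.

```javascript
'use strict';

// In-memory stand-in for the broker: queue name -> array of pending messages.
const queues = new Map();
const publish = (queue, message) => {
  if (!queues.has(queue)) queues.set(queue, []);
  queues.get(queue).push(message);
};
const take = (queue) => (queues.get(queue) || []).shift();

// Hypothetical services; each handles its own step of the workflow.
const services = {
  resize: (msg) => ({ ...msg, resized: true }),
  notify: (msg) => ({ ...msg, notified: true }),
};

// 1. The gateway publishes the workflow to the entry queue.
publish('workflow.init', { id: 'wf-1', steps: ['resize', 'notify'], cursor: 0 });

// 2. A competing consumer picks it up and routes it to the first service.
const init = take('workflow.init');
publish(`${init.steps[0]}.workflow`, init);

// 3. Each service processes its step, then routes onward or completes.
for (const name of init.steps) {
  const msg = services[name](take(`${name}.workflow`));
  const next = msg.steps[msg.cursor + 1];
  if (next) {
    publish(`${next}.workflow`, { ...msg, cursor: msg.cursor + 1 });
  } else {
    publish('workflow.completed', msg);
  }
}

// 4. The final result is available on the completion queue.
const completed = take('workflow.completed');
console.log(completed);
```

No central orchestrator appears in the sketch: each step decides where the message goes next from the data carried in the message itself.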
🔧 Quick Start
'use strict';
const ConnectorMQClient = require('@onlineapps/conn-infra-mq');
(async () => {
  // 1. Create client with configuration
  const client = new ConnectorMQClient({
    host: 'amqp://localhost:5672',
    serviceName: 'my-service',
    queue: 'default-queue',
    durable: true,
    prefetch: 5, // Default prefetch count for consumers
    noAck: false, // Default auto-acknowledge = false
    retryPolicy: { // Optional reconnection policy (not enforced in v1.0.0)
      retries: 5,
      initialDelayMs: 1000,
      maxDelayMs: 30000,
      factor: 2
    }
  });
  // 2. Register a global error handler
  client.onError(err => {
    console.error('[ConnectorMQClient] Error:', err);
  });
  // 3. Connect to RabbitMQ
  try {
    await client.connect();
    console.log('Connected to broker');
  } catch (err) {
    console.error('Connection failed:', err);
    process.exit(1);
  }
  // 4. Publish a sample message
  const samplePayload = { taskId: 'abc123', action: 'processData', timestamp: Date.now() };
  try {
    await client.publish('job_queue', samplePayload, {
      persistent: true,
      headers: { origin: 'quickStart' }
    });
    console.log('Message published:', samplePayload);
  } catch (err) {
    console.error('Publish error:', err);
  }
  // 5. Consume messages
  try {
    await client.consume(
      'job_queue',
      async (msg) => {
        const data = JSON.parse(msg.content.toString('utf8'));
        console.log('Received:', data);
        // Process message...
        await client.ack(msg);
      },
      { prefetch: 5, noAck: false }
    );
    console.log('Consuming from "job_queue"...');
  } catch (err) {
    console.error('Consume error:', err);
  }
  // 6. Graceful shutdown on SIGINT
  process.on('SIGINT', async () => {
    console.log('Shutting down...');
    try {
      await client.disconnect();
      console.log('Disconnected, exiting.');
      process.exit(0);
    } catch (discErr) {
      console.error('Error during disconnect:', discErr);
      process.exit(1);
    }
  });
})();
📄 Configuration
Configuration can be provided to the ConnectorMQClient constructor or as overrides to connect(). Below is a summary of supported fields (see docs/api.md for full details):
| Field | Type | Description | Default |
| ------------- | --------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------ | -------------------------------------------------------------------- |
| type | string | Transport type: 'rabbitmq' | 'rabbitmq' |
| host | string | Connection URI or hostname. For RabbitMQ: e.g. 'amqp://user:pass@localhost:5672'. | Required |
| queue | string | Default queue name for publish/consume if not overridden per call. | '' |
| exchange | string | Default exchange name. Empty string uses the default direct exchange. | '' |
| durable | boolean | Declare queues/exchanges as durable. | true |
| prefetch | integer | Default prefetch count for consumers. | 1 |
| noAck | boolean | Default auto-acknowledge setting for consumers. If true, messages will be auto-acked. | false |
| logger | object | Custom logger with methods: info(), warn(), error(), debug(). If omitted, console is used. | null |
| retryPolicy | object | Reconnection policy with properties: retries (number), initialDelayMs (ms), maxDelayMs (ms), factor (multiplier). Not enforced in v1.0.0. | { retries: 5, initialDelayMs: 1000, maxDelayMs: 30000, factor: 2 } |
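
To make the retryPolicy fields concrete, here is a minimal sketch of how an exponential backoff schedule could be derived from them. The formula (initialDelayMs multiplied by factor for each attempt, capped at maxDelayMs) is an assumption about how such a policy is typically interpreted, and backoffDelays is a hypothetical helper, not part of this package.

```javascript
'use strict';

// Hypothetical helper: returns the wait time before each retry attempt.
// Assumes delay = initialDelayMs * factor^attempt, capped at maxDelayMs.
function backoffDelays({ retries, initialDelayMs, maxDelayMs, factor }) {
  const delays = [];
  for (let attempt = 0; attempt < retries; attempt += 1) {
    const delay = initialDelayMs * Math.pow(factor, attempt);
    delays.push(Math.min(delay, maxDelayMs));
  }
  return delays;
}

// The default policy from the table above.
const defaults = { retries: 5, initialDelayMs: 1000, maxDelayMs: 30000, factor: 2 };

console.log(backoffDelays(defaults)); // [ 1000, 2000, 4000, 8000, 16000 ]
```

With the default values the cap is never reached; raising retries to 7 would yield 30000 for the last two attempts under the same assumption.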
🛠️ API Reference
For full class and method documentation, including parameter descriptions, return values, and error details, see docs/api.md.
✅ Testing
npm test # All tests
npm run test:unit # Unit tests only
npm run test:component # Component tests
npm run test:integration # Integration tests
Test Coverage Status
- Overall Coverage: 24.52% (improving after refactoring)
- Passing Tests: 75/104 (72%)
- Test Suites: 10/14 passing
- Well Tested: Config, Transports, Error handling (100%)
- Needs Testing: New layers (1-5%)
See Test Report for detailed coverage analysis.
🎨 Coding Standards
- Linting: ESLint (eslint:recommended + Prettier).
- Formatting: Prettier. Check with npm run prettier:check, fix with npm run prettier:fix.
- Testing: Jest, aiming for ≥90% coverage.
🎯 Refactoring Benefits
✅ What Was Achieved
- Clean Layered Architecture - Separated into specialized layers
- Removed Technical Debt - Replaced MQWrapper with cleaner design
- Improved Extensibility - Easy to add new patterns
- Better Developer Experience - Cleaner API and documentation
📊 Quality Improvements
- Separation of Concerns - Each layer has single responsibility
- Modular Design - Use individual layers independently
- Testability - Each layer can be tested in isolation
- Maintainability - Easier to understand and modify
- Backwards Compatibility - MQWrapper alias maintained
🤝 Contributing
Contributions welcome! Please see CONTRIBUTING.md for guidelines:
- Fork the repo.
- Create a feature branch: git checkout -b feature/your-feature.
- Run tests locally and ensure linting passes.
- Commit your changes and push to your branch.
- Open a Pull Request against main.
📜 License
This project is licensed under the MIT License. See LICENSE for details.
