nextrans-mq
v3.0.1
Published
Message queue service for nextrans app service
Readme
nextrans-mq
RabbitMQ utility package for Nextrans services.
Installation
npm install nextrans-mqThis package uses:
amqp-connection-manageramqplib
Make sure both are available in your project (they are defined as peer dependencies).
Exports
import { MQConnection, MQService, MQPayload } from 'nextrans-mq';Basic Usage
Create your own service class by extending MQService, then expose public methods that call the protected send and consume methods.
import { MQConnection, MQService } from 'nextrans-mq';
type ExampleMessage = {
id: string;
text: string;
};
class ExampleMQService extends MQService {
constructor(connection: MQConnection, private readonly channelName: string) {
super(connection);
}
async publish(data: ExampleMessage) {
return this.send({
channelName: this.channelName,
from: 'example-service',
data,
options: {
timeout: 500,
},
});
}
async subscribe(callback: (msg: ExampleMessage) => Promise<void> | void) {
return this.consume(
{
channelName: this.channelName,
concurrency: 1,
queuesPerSecond: 1,
},
callback,
);
}
}
const connection = new MQConnection({
host: process.env.RABBITMQ_HOST || 'amqp://localhost:5672',
options: {
heartbeatIntervalInSeconds: 15,
reconnectTimeInSeconds: 5,
},
});
const mq = new ExampleMQService(connection, 'example-channel');
await mq.publish({ id: '1', text: 'Hello World' });
await mq.subscribe(async (payload) => {
console.log('Received payload:', payload);
});Message Format
When sending plain data, the library wraps it into MQPayload<T> internally:
type MQPayload<T> = {
from: string;
messageId: string;
timestamp: number;
data: T;
};You can also pass a full MQPayload<T> into send.
API Reference
MQConnection
new MQConnection({
host: 'amqp://localhost:5672',
options: {
heartbeatIntervalInSeconds?: number;
reconnectTimeInSeconds?: number;
connectionOptions?: AmqpConnectionManagerOptions['connectionOptions'];
},
});MQService (base class)
protected send<T>(params)protected consume<T>(params, callback)
Use through a subclass with public wrapper methods.
Consume Options
channelName: stringconcurrency: numberqueuesPerSecond: numberthrowOnError?: boolean
Notes
- Queue is asserted with
durable: true. - Message processing uses
ackafter successful callback execution. sendretries once when a timeout occurs.
