mq-client.js
v1.1.0
A lightweight Message Queue client library for Node.js with MQTT-like functionality and request-response patterns.
MQ Client.js
Installation
npm install mq-client.js
Quick Start
import { MQ } from 'mq-client.js';
// Connect to MQ server
await MQ.connect('mq://root:root@localhost:40000');
// Subscribe to topics
await MQ.subscribe('sensors/temperature', (topic, data) => {
  console.log(`Received from ${topic}:`, data.toString());
});
// Publish messages
await MQ.publish('sensors/temperature', '25°C');
// Request-response pattern
const response = await MQ.request('device/info', 'get_status');
console.log('Response:', response.toString());
Connection URL Format
The library uses a custom URL format for connection:
mq://username:password@host:port
Example:
await MQ.connect('mq://root:root@localhost:40000');
await MQ.connect('mq://admin:[email protected]:40000');
API Reference
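Before calling `connect`, it can be useful to check that a string follows the `mq://username:password@host:port` format described under Connection URL Format. The following is a minimal sketch using the standard WHATWG `URL` class built into Node.js. `parseMqUrl` is a hypothetical helper, not part of mq-client.js, and decoding percent-encoded credentials is an assumption about how special characters would be written in the URL.

```javascript
// Sketch only: parseMqUrl is a hypothetical helper, not part of mq-client.js.
function parseMqUrl(url) {
  // The URL constructor throws a TypeError on malformed input.
  const parsed = new URL(url);
  if (parsed.protocol !== 'mq:') {
    throw new Error(`Unsupported scheme: ${parsed.protocol}`);
  }
  if (!parsed.hostname || !parsed.port) {
    throw new Error('Connection URL must include both host and port');
  }
  return {
    // URL keeps credentials percent-encoded, so decode them here
    // (assumption: special characters are percent-encoded by the caller).
    username: decodeURIComponent(parsed.username),
    password: decodeURIComponent(parsed.password),
    host: parsed.hostname,
    port: Number(parsed.port),
  };
}
```

With this sketch, `parseMqUrl('mq://root:root@localhost:40000')` yields the username `root`, the password `root`, the host `localhost`, and the port `40000` as a number, while a URL with another scheme or without a port is rejected.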
MQClient Class
Core Methods
connect(url: string): Promise<void>
Connects to the message queue server.
await MQ.connect('mq://root:root@localhost:40000');
subscribe(topic: string, callback: MessageCallback): Promise<void>
Subscribes to a topic and receives messages.
await MQ.subscribe('sensors/#', (topic, data) => {
  console.log(`Message on ${topic}:`, data.toString());
});
publish(topic: string, data: Buffer | string): Promise<void>
Publishes a message to a topic.
await MQ.publish('sensors/temperature', '25°C');
await MQ.publish('commands/restart', Buffer.from([0x01]));
request(topic: string, data: Buffer | string, timeout?: number): Promise<Buffer>
Sends a request and waits for a response.
try {
  const response = await MQ.request('device/info', 'get_status', 5000);
  console.log('Device info:', response.toString());
} catch (error) {
  console.error('Request failed:', error);
}
service(topic: string, callback: RequestCallback): Promise<void>
Registers a service to handle requests.
await MQ.service('math/add', (data, reply) => {
  const numbers = JSON.parse(data.toString());
  const result = numbers.a + numbers.b;
  reply(null, Buffer.from(JSON.stringify({ result })));
});
MQTT Compatibility Methods
mqttSubscribe(topic: string, cb: MessageCallback): Promise<void>
MQTT-style subscription.
mqttPublish(topic: string, data: Buffer | string): Promise<void>
MQTT-style publishing.
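The `sensors/#` filter in the `subscribe` example suggests MQTT-style wildcards. The sketch below shows how such filters are matched under the MQTT convention, where `+` stands for exactly one topic level and `#` for all remaining levels. `topicMatches` is a hypothetical helper for local reasoning and testing, and it is an assumption that the MQ server applies exactly these rules. MQTT's special handling of topics starting with `$` is left out.

```javascript
// Hypothetical helper illustrating MQTT-style wildcard matching.
// Assumption: the server follows the MQTT rules for '+' and '#'.
function topicMatches(filter, topic) {
  const filterLevels = filter.split('/');
  const topicLevels = topic.split('/');
  for (let i = 0; i < filterLevels.length; i++) {
    const level = filterLevels[i];
    if (level === '#') {
      // Multi-level wildcard: only valid as the last level, and it
      // matches everything that remains (including nothing at all).
      return i === filterLevels.length - 1;
    }
    // The filter is longer than the topic, so it cannot match.
    if (i >= topicLevels.length) return false;
    // '+' matches any single level; otherwise levels must be equal.
    if (level !== '+' && level !== topicLevels[i]) return false;
  }
  // Without '#', both sides must have the same number of levels.
  return filterLevels.length === topicLevels.length;
}
```

Under these rules `sensors/#` matches `sensors/temperature` and `sensors/room1/temperature`, while `sensors/+` matches `sensors/temperature` but not `sensors/room1/temperature`.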
Utility Methods
getInfo(timeout?: number): Promise<Buffer>
Gets server information.
const info = await MQ.getInfo();
console.log('Server info:', info.toString());
getLog(data: Buffer | string, timeout?: number): Promise<Buffer>
Retrieves logs from server.
getData(data: Buffer | string, timeout?: number): Promise<Buffer>
Gets data from server.
getAlarm(data: Buffer | string, timeout?: number): Promise<Buffer>
Retrieves alarm information.
getGEO(data: Buffer | string, timeout?: number): Promise<Buffer>
Gets geographical data.
sendMail(data: Buffer | string, timeout?: number): Promise<Buffer>
Sends email through server.
set(topic: string, data: Buffer | string): Promise<Buffer>
Sets data for a topic.
get(topic: string): Promise<Buffer>
Gets data from a topic.
ping(): Promise<void>
Sends ping to server.
pong(): Promise<void>
Sends pong response.
close(): void
Closes the connection.
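Because `close()` has to be called explicitly, a small wrapper can make sure the connection is released even when an operation fails. The following is a sketch under stated assumptions: `withConnection` is a hypothetical helper, and `client` is any object exposing the `connect(url)` and `close()` methods documented above (for example the `MQ` export).

```javascript
// Hypothetical helper, not part of mq-client.js.
// `client` only needs connect(url): Promise<void> and close(): void.
async function withConnection(client, url, task) {
  await client.connect(url);
  try {
    // Run the caller's work and pass its result through.
    return await task(client);
  } finally {
    // Runs on success and on failure; close() is documented as synchronous.
    client.close();
  }
}
```

A call such as `withConnection(MQ, 'mq://root:root@localhost:40000', (mq) => mq.getInfo())` would then resolve to the server info Buffer and close the connection afterwards. If `connect` itself rejects, `close()` is not called, since no connection was established.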
Message Class
Represents a message in the system.
import { Message } from 'mq-client.js';
const message = new Message('PUBLISH', 'sensors/temp', 'app1', 'req123', Buffer.from('data'));
// Convert to bytes for transmission
const bytes = message.toBytes();
// Recreate from received bytes
const receivedMessage = new Message().fromBytes(bytes);
Static constructors:
Message.newMessage(command, topic, topicApp, requestId, data)
Message.newSimpleMessage(command, data)
RequestData Class
Wrapper for request response data.
import { RequestData } from 'mq-client.js';
const response = new RequestData(Buffer.from('success'), null);
Types
MessageCallback
type MessageCallback = (topic: string, data: Buffer) => void;
RequestCallback
type RequestCallback = (data: Buffer, reply: (err: Error | null, data?: Buffer) => void) => void;
Examples
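JSON Service Adapter (sketch)
Service handlers that exchange JSON repeat the same parse, compute, and reply steps. The adapter below wraps a plain function into a callback with the `RequestCallback` shape, so that parsing errors and handler errors are both reported through `reply`. `jsonService` is a hypothetical helper, not part of mq-client.js, and it assumes the library ignores the promise returned by an async callback.

```javascript
// Hypothetical adapter: turns a function over parsed JSON into a callback
// shaped like RequestCallback: (data: Buffer, reply: (err, data?) => void).
function jsonService(handler) {
  return async (data, reply) => {
    let payload;
    try {
      const input = JSON.parse(data.toString());
      // The handler may be synchronous or return a promise.
      const output = await handler(input);
      payload = Buffer.from(JSON.stringify(output));
    } catch (error) {
      // Invalid JSON or a failing handler is sent back as an Error.
      reply(error instanceof Error ? error : new Error(String(error)));
      return;
    }
    // reply is called outside the try block so it is never invoked twice.
    reply(null, payload);
  };
}
```

It could be registered as `await MQ.service('calculator/multiply', jsonService(({ a, b }) => ({ result: a * b })));`, which keeps the handler free of Buffer handling.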
Basic Pub/Sub Pattern
import { MQ } from 'mq-client.js';
// Connect to server
await MQ.connect('mq://root:root@localhost:40000');
// Publisher
// Subscriber: registered first, in case the server does not retain
// messages for subscribers that join after publication
await MQ.subscribe('news/updates', (topic, data) => {
  const news = JSON.parse(data.toString());
  console.log('Received news:', news.title);
});
// Publisher
await MQ.publish('news/updates', JSON.stringify({
  title: 'Latest News',
  content: 'Breaking news content...'
}));
Request/Response Service
import { MQ } from 'mq-client.js';
await MQ.connect('mq://root:root@localhost:40000');
// Service provider (server side)
await MQ.service('calculator/multiply', (data, reply) => {
  try {
    const { a, b } = JSON.parse(data.toString());
    const result = a * b;
    reply(null, Buffer.from(JSON.stringify({ result })));
  } catch (error) {
    reply(new Error('Invalid input'));
  }
});
// Client (requestor)
const numbers = { a: 5, b: 7 };
const response = await MQ.request('calculator/multiply', JSON.stringify(numbers));
const { result } = JSON.parse(response.toString());
console.log(`5 * 7 = ${result}`); // 5 * 7 = 35
WebSocket Room Emission
await MQ.connect('mq://root:root@localhost:40000');
// Emit to specific room
await MQ.ws_emit('room1', 'chat/message', JSON.stringify({
  user: 'john',
  message: 'Hello everyone!'
}));
Error Handling
import { MQ } from 'mq-client.js';
try {
  await MQ.connect('mq://invalid:credentials@localhost:40000');
} catch (error) {
  console.error('Connection failed:', error.message);
}
try {
  const response = await MQ.request('unknown/service', 'data', 3000);
  console.log('Response:', response.toString());
} catch (error) {
  // With no service registered on the topic, the 3000 ms timeout is the likely cause
  console.error('Request failed:', error.message);
}
// Service error handling
await MQ.service('api/users', (data, reply) => {
  try {
    // Process data (processUserData stands in for your application logic)
    const result = processUserData(data);
    reply(null, Buffer.from(JSON.stringify(result)));
  } catch (error) {
    // Send error back to client
    reply(error);
  }
});
Using Buffer Data
import { MQ } from 'mq-client.js';
await MQ.connect('mq://root:root@localhost:40000');
// Send binary data
const binaryData = Buffer.from([0x01, 0x02, 0x03, 0x04]);
await MQ.publish('binary/data', binaryData);
await MQ.subscribe('binary/data', (topic, data) => {
  console.log('Received binary data:', data); // Buffer object
  console.log('As hex:', data.toString('hex')); // As hexadecimal string
});
Features
- ✅ MQTT-like pub/sub messaging
- ✅ Request-response pattern
- ✅ Service registration and discovery
- ✅ Binary data support (Buffer)
- ✅ Authentication support
- ✅ WebSocket room functionality
- ✅ Built-in utility methods for common operations
- ✅ TypeScript definitions included
License
MIT
