rocketmq-client-nodejs-bate
v1.2.3
RocketMQ Node.js Client
The Node.js Implementation of Apache RocketMQ Client
English | Simplified Chinese | RocketMQ Website
Overview
Before you begin, make sure the following prerequisites are met (or refer to the quick start).
- Node.js 16.19.0 is the minimum required version; Node.js 18.17.0 or later is recommended.
- Set up the namesrv, broker, and proxy.
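The version requirements above can be checked at startup. The following is a minimal sketch, assuming plain dotted numeric versions; `meetsMinimum` and the two constants are hypothetical names for illustration and are not part of the client.

```typescript
// Hypothetical helper (not part of rocketmq-client-nodejs): compares dotted
// numeric versions such as "18.17.0" against a minimum version.
function meetsMinimum(version: string, minimum: string): boolean {
  // Strip an optional leading "v" and turn each segment into a number.
  const parse = (v: string): number[] =>
    v.replace(/^v/, '').split('.').map((part) => Number.parseInt(part, 10) || 0);
  const a = parse(version);
  const b = parse(minimum);
  for (let i = 0; i < Math.max(a.length, b.length); i++) {
    const x = a[i] ?? 0;
    const y = b[i] ?? 0;
    if (x !== y) return x > y; // first differing segment decides
  }
  return true; // versions are equal
}

const MINIMUM_NODE = '16.19.0'; // minimum version stated in this README
const RECOMMENDED_NODE = '18.17.0'; // recommended version stated in this README
```

In an application, `meetsMinimum(process.versions.node, MINIMUM_NODE)` could guard startup, with a softer warning when the running version is below `RECOMMENDED_NODE`.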
Getting Started
This project uses npm for dependency management and publishing. You can find more details about npm on its website. The following npm commands are useful during development.
# Installs the project dependencies.
npm install
# Initializes the gRPC code.
npm run init
# Run the unit tests.
npm test
# Installs the RocketMQ Node.js client.
npm i rocketmq-client-nodejs
Enable trace debug logging for grpc-js:
GRPC_TRACE=all GRPC_VERBOSITY=debug npm test
Publishing Steps
To publish a package to npm, please register an account in advance, then execute the following command.
# Builds a package and publishes it to the npm repository.
npm publish
Examples
Normal Message
Producer
import { Producer } from 'rocketmq-client-nodejs';

const producer = new Producer({
  endpoints: '127.0.0.1:8081',
});
await producer.startup();

const receipt = await producer.send({
  topic: 'TopicTest',
  tag: 'nodejs-demo',
  body: Buffer.from(JSON.stringify({
    hello: 'rocketmq-client-nodejs world 😄',
    now: Date(),
  })),
});
console.log(receipt);
Delay Message with Recall
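A delayed message can only be recalled before it is delivered, so the recall window is the send time plus the `delay` value in milliseconds. The following is a minimal sketch of that client-side arithmetic; the helper names are hypothetical and not part of the client API, and the server's actual timing may differ slightly.

```typescript
// Hypothetical helpers for illustration; they are not part of the client API.

/** Timestamp (ms since epoch) at which a delayed message becomes deliverable. */
function deliveryTime(sentAtMs: number, delayMs: number): number {
  return sentAtMs + delayMs;
}

/** Milliseconds left in which a recall could still be attempted (never negative). */
function recallWindowRemaining(nowMs: number, sentAtMs: number, delayMs: number): number {
  return Math.max(0, deliveryTime(sentAtMs, delayMs) - nowMs);
}

const sentAt = 1000000; // example send timestamp in milliseconds
const delay = 10000; // 10 seconds, matching the delay used in this section
// Four seconds after sending, six seconds of the window remain.
const remaining = recallWindowRemaining(sentAt + 4000, sentAt, delay);
// Once the delay has elapsed, nothing remains.
const expired = recallWindowRemaining(sentAt + 15000, sentAt, delay);
```

A check like this only estimates the window locally; a recall attempt can still fail, which is why the recall call in this section is wrapped in `try`/`catch`.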
Send and recall a delayed message:
import { Producer } from 'rocketmq-client-nodejs';

const producer = new Producer({
  endpoints: '127.0.0.1:8081',
});
await producer.startup();

// Send a delay message (will be delivered after 10 seconds)
const receipt = await producer.send({
  topic: 'DelayTopic',
  tag: 'delay-recall',
  delay: 10000, // 10 seconds delay
  body: Buffer.from('This is a delayed message'),
});
console.log('Message sent:', {
  messageId: receipt.messageId,
  recallHandle: receipt.recallHandle, // Handle for recalling the message
});

// Recall the message before it's delivered (within 10 seconds)
try {
  const recallReceipt = await producer.recallMessage(
    'DelayTopic',
    receipt.recallHandle
  );
  console.log('Message recalled successfully:', recallReceipt.messageId);
} catch (error) {
  console.error('Failed to recall message:', error);
}

await producer.shutdown();
SimpleConsumer
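The consumer examples in this README pass `subscriptions` as a `Map` from topic name to tag expression. The following is a small sketch of building such a map from a plain object; `buildSubscriptions` is a hypothetical helper, and the fallback to `'*'` is an assumption based on the Push Consumer example, where `'*'` receives all tags.

```typescript
// Hypothetical helper: builds a topic -> tag-expression Map from a plain object.
// Topics without an explicit tag fall back to '*' (all tags).
function buildSubscriptions(topics: Record<string, string | undefined>): Map<string, string> {
  const subscriptions = new Map<string, string>();
  for (const [topic, tag] of Object.entries(topics)) {
    subscriptions.set(topic, tag ?? '*');
  }
  return subscriptions;
}

const subscriptions = buildSubscriptions({
  TopicTest: 'nodejs-demo', // only messages tagged 'nodejs-demo'
  yourTopic: undefined, // no tag given, so subscribe to all tags
});
```

The resulting map has the same shape as the `subscriptions` option shown in the consumer examples.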
import { SimpleConsumer } from 'rocketmq-client-nodejs';

const simpleConsumer = new SimpleConsumer({
  consumerGroup: 'nodejs-demo-group',
  endpoints: '127.0.0.1:8081',
  subscriptions: new Map().set('TopicTest', 'nodejs-demo'),
});
await simpleConsumer.startup();

const messages = await simpleConsumer.receive(20);
console.log('got %d messages', messages.length);
for (const message of messages) {
  console.log(message);
  console.log('body=%o', message.body.toString());
  await simpleConsumer.ack(message);
}
Push Consumer
PushConsumer actively pulls messages from the server and pushes them to the listener for processing:
import { PushConsumer, ConsumeResult, type MessageView } from 'rocketmq-client-nodejs';

// Create PushConsumer instance
const pushConsumer = new PushConsumer({
  namespace: '', // Namespace, can be empty string
  endpoints: '127.0.0.1:8081',
  consumerGroup: 'yourConsumerGroup',
  // Subscribe to topic and TAG
  subscriptions: new Map([
    ['yourTopic', '*'], // Subscribe to yourTopic, receive all TAGs
  ]),
  // Message listener - core processing logic
  messageListener: {
    async consume(messageView: MessageView): Promise<ConsumeResult> {
      console.log('Received message:', messageView.body.toString('utf-8'));
      // TODO: Process your business logic here
      return ConsumeResult.SUCCESS; // Return SUCCESS after successful processing
    },
  },
});

try {
  // Start consumer
  await pushConsumer.startup();
  console.log('PushConsumer started, waiting for messages...');
  // Keep running, waiting for messages
  await new Promise(() => {});
} catch (error) {
  console.error('Error:', error);
  await pushConsumer.shutdown();
  throw error;
}
Current Progress
Message Type
- [x] NORMAL
- [x] FIFO
- [x] DELAY
- [x] TRANSACTION
Client Type
- [x] PRODUCER
- [x] SIMPLE_CONSUMER
- [x] PUSH_CONSUMER
- [ ] PULL_CONSUMER
