@klsakura/rocketmq-native-sdk
v0.1.2
Published
High-performance Node.js client SDK for RocketMQ with Native Addon support (Node.js 16+)
Downloads
4
Maintainers
Readme
RocketMQ Native SDK
High-performance Node.js client for Apache RocketMQ with native addon support.
Installation
npm install @klsakura/rocketmq-native-sdkRequirements
- Node.js 16.0.0 or higher
- Supported platforms: macOS (ARM64/x64), Linux (x64), Windows (x64)
Quick Start
CommonJS
const { MQClient, MessageProperties } = require('@klsakura/rocketmq-native-sdk');
// Create client
const config = {
endpoint: 'your-rocketmq-endpoint',
accessKeyId: 'your-access-key-id',
accessKeySecret: 'your-access-key-secret',
instanceId: 'your-instance-id'
};
const client = new MQClient(config);
// Create producer
const producer = await client.getProducer('your-instance-id', 'your-topic');
// Send message
const result = await producer.publishMessage('Hello RocketMQ!', 'demo-tag');
console.log('Message sent:', result.messageId);
// Create consumer
const consumer = await client.getConsumer('your-instance-id', 'your-topic', 'your-group');
// Set message handler
consumer.onMessage(async (message) => {
console.log('Received:', message.body);
await consumer.ackMessage(message.receiptHandle);
});
// Start consuming
consumer.startReceiving();ES Module
import { MQClient, MessageProperties } from '@klsakura/rocketmq-native-sdk';
const config = {
endpoint: 'your-rocketmq-endpoint',
accessKeyId: 'your-access-key-id',
accessKeySecret: 'your-access-key-secret',
instanceId: 'your-instance-id'
};
const client = new MQClient(config);
const producer = await client.getProducer('instance-id', 'topic');
// Send message with properties
const properties = new MessageProperties()
.putProperty('source', 'demo')
.messageKey('unique-key-001');
await producer.publishMessage(
{ message: 'Hello from ES Module!' },
'demo-tag',
properties
);Producer Examples
1. Normal Message (普通消息)
const producer = await client.getProducer('instance-id', 'your-topic');
// Simple message
await producer.publishMessage('Hello World');
// Message with tag
await producer.publishMessage('Hello World', 'order-tag');
// Message with properties
const properties = new MessageProperties()
.putProperty('userId', '12345')
.putProperty('source', 'web')
.messageKey('order-001');
const result = await producer.publishMessage({
orderId: 'ORDER-001',
amount: 99.99
}, 'order-tag', properties);
console.log('Normal message sent:', result.messageId);2. Ordered Message (顺序消息)
// Ordered messages with same sharding key will be delivered in order
const shardingKey = 'user-12345'; // Messages with same key are ordered
await producer.publishOrderedMessage(
{ step: 1, action: 'create_order' },
'order-step',
properties,
shardingKey
);
await producer.publishOrderedMessage(
{ step: 2, action: 'pay_order' },
'order-step',
properties,
shardingKey
);
await producer.publishOrderedMessage(
{ step: 3, action: 'ship_order' },
'order-step',
properties,
shardingKey
);
console.log('Ordered messages sent');3. Delay Message (延迟消息)
// Method 1: Using delayTimeLevel (1-18, each level represents different delay time)
const delayOptions1 = {
delayTimeLevel: 3 // Level 3 = 10 seconds delay
};
await producer.publishDelayMessage(
'This message will be delivered after 10 seconds',
'delay-tag',
null,
delayOptions1
);
// Method 2: Using specific timestamp
const delayOptions2 = {
startDeliverTime: Date.now() + 60000 // Deliver after 1 minute
};
await producer.publishDelayMessage(
{ reminder: 'Meeting in 1 minute' },
'reminder-tag',
properties,
delayOptions2
);
// Method 3: Using MessageProperties for delay
const delayProperties = new MessageProperties()
.putProperty('businessType', 'reminder')
.startDeliverTime(Date.now() + 300000); // 5 minutes delay
await producer.publishDelayMessage(
'Delayed reminder message',
'reminder-tag',
delayProperties
);
console.log('Delay messages sent');4. Advanced Message Properties
const properties = new MessageProperties()
.putProperty('userId', '12345') // Custom property
.putProperty('orderType', 'premium') // Business property
.messageKey('unique-business-key-001') // Message key for deduplication
.shardingKey('user-12345') // Sharding key for ordered messages
.startDeliverTime(Date.now() + 30000) // Delay 30 seconds
.transCheckImmunityTime(60); // Transaction check immunity time
const result = await producer.publishMessage(
{
orderId: 'ORDER-001',
userId: '12345',
amount: 199.99,
items: ['item1', 'item2']
},
'premium-order',
properties
);API Reference
MQClient
const client = new MQClient(config);Config object:
endpoint- RocketMQ endpoint URLaccessKeyId- Access key IDaccessKeySecret- Access key secretinstanceId- Instance ID
Methods:
getProducer(instanceId, topic)- Create producergetConsumer(instanceId, topic, groupId, tagExpression)- Create consumerhealthCheck()- Health check
Producer
const producer = await client.getProducer(instanceId, topic);Methods:
publishMessage(body, tag?, properties?)- Send normal messagepublishOrderedMessage(body, tag?, properties?, shardingKey?)- Send ordered messagepublishDelayMessage(body, tag?, properties?, options?)- Send delay messageshutdown()- Close producer
DelayOptions:
delayTimeLevel- Delay level (1-18), each level represents different delay timestartDeliverTime- Specific timestamp for message delivery
Consumer
const consumer = await client.getConsumer(instanceId, topic, groupId);Methods:
onMessage(handler)- Set message handlerstartReceiving(tagExpression?)- Start consumingackMessage(receiptHandle)- Acknowledge messageshutdown()- Close consumer
MessageProperties
const properties = new MessageProperties()
.putProperty('key', 'value') // Custom properties
.messageKey('unique-key') // Message key
.shardingKey('partition-key') // Sharding key for ordering
.startDeliverTime(Date.now() + 60000) // Delay delivery
.transCheckImmunityTime(60); // Transaction immunity timeMethods:
putProperty(key, value)- Add custom propertymessageKey(key)- Set message key for deduplicationshardingKey(key)- Set sharding key for ordered messagesstartDeliverTime(timestamp)- Set delay delivery timetransCheckImmunityTime(seconds)- Set transaction check immunity time
License
MIT
