mqtt-client-tss
v1.1.0
Published
Generic MQTT Client supporting MQTT 3.1, 3.1.1, and 5.0 with TCP, TLS, WebSocket, and WSS transports
Maintainers
Readme
mqtt-client-tss
A modular, enterprise-grade MQTT client library written in TypeScript. Designed for reliability and performance, this library supports MQTT 3.1, 3.1.1, and 5.0 protocols with TCP, TLS/SSL, WebSocket, and Secure WebSocket transports.
Table of Contents
- Features
- Installation
- Quick Start
- Connection Options
- API Reference
- Transport Examples
- Protocol Versions
- MQTT 5.0 Features
- Advanced Features
- Modular Architecture
- TypeScript Support
- Building from Source
- License
Features
Protocol Support
- MQTT 3.1, 3.1.1, and 5.0
- Full QoS 0, 1, and 2 support with proper acknowledgment handling
- Will Messages (Last Will and Testament)
- Retained messages
- Clean session / Clean start
Transport Options
- TCP (plain socket)
- TLS/SSL (encrypted socket with certificate support)
- WebSocket (WS)
- Secure WebSocket (WSS)
Reliability
- Automatic reconnection with configurable backoff
- Keep-alive ping/pong handling
- Message queuing for offline publishing
- Persistent message store for QoS 1/2 delivery guarantees
Performance
- Rate limiting with token bucket algorithm
- Connection pooling for load balancing
- Metrics collection for monitoring
- Efficient packet parsing with buffering
Developer Experience
- Written in TypeScript with full type definitions
- Modular architecture for selective imports
- Configurable logging with multiple levels
- Event-driven API
Installation
npm install mqtt-client-tssDependencies
The library requires the ws package for WebSocket support:
npm install wsFor TypeScript projects, install the type definitions:
npm install --save-dev @types/ws @types/node typescriptQuick Start
import { createMqttClient } from 'mqtt-client-tss';
const client = createMqttClient({
host: 'broker.hivemq.com',
port: 1883,
transport: 'tcp',
clientId: 'my-app-001',
});
client.on('connect', () => {
console.log('Connected to broker');
});
client.on('message', (topic, payload, message) => {
console.log(`${topic}: ${payload.toString()}`);
});
client.on('error', (error) => {
console.error('Connection error:', error.message);
});
async function main() {
await client.connect();
await client.subscribe('sensors/#', { qos: 1 });
await client.publish('sensors/temperature', '23.5', { qos: 1 });
}
main();Connection Options
interface MqttClientOptions {
// Required
host: string;
port: number;
// Transport (default: 'tcp')
transport?: 'tcp' | 'tls' | 'ws' | 'wss';
// Protocol version (default: 4, which is MQTT 3.1.1)
protocolVersion?: 3 | 4 | 5;
// Client identification
clientId?: string;
username?: string;
password?: string;
// Session management
cleanSession?: boolean; // MQTT 3.x
cleanStart?: boolean; // MQTT 5.0
sessionExpiryInterval?: number; // MQTT 5.0, in seconds
// Timing
keepAlive?: number; // Seconds (default: 60)
connectTimeout?: number; // Milliseconds (default: 30000)
// Reconnection
reconnect?: boolean; // Enable auto-reconnect (default: true)
reconnectInterval?: number; // Milliseconds (default: 1000)
maxReconnectAttempts?: number; // (default: 10)
// TLS options
rejectUnauthorized?: boolean;
ca?: string | Buffer | Array<string | Buffer>;
cert?: string | Buffer;
key?: string | Buffer;
passphrase?: string;
tlsOptions?: tls.ConnectionOptions;
// WebSocket options
wsPath?: string; // (default: '/mqtt')
wsHeaders?: Record<string, string>;
wsProtocols?: string | string[];
// Will message
will?: {
topic: string;
payload: string | Buffer;
qos?: 0 | 1 | 2;
retain?: boolean;
properties?: MqttProperties;
};
// MQTT 5.0 connection properties
properties?: MqttProperties;
// Advanced options
queueSize?: number; // Offline queue size (default: 1000)
rateLimit?: number; // Messages per second
enableMetrics?: boolean; // Enable metrics collection (default: true)
enableLogging?: boolean; // Enable logging (default: false)
logLevel?: 'trace' | 'debug' | 'info' | 'warn' | 'error' | 'silent';
}API Reference
Creating a Client
import { createMqttClient, MqttClient } from 'mqtt-client-tss';
// Using factory function
const client = createMqttClient({
host: 'localhost',
port: 1883,
});
// Using constructor
const client = new MqttClient({
host: 'localhost',
port: 1883,
});Connecting
await client.connect();The connect method returns a Promise that resolves when the connection is established and the CONNACK packet is received. It rejects if the connection fails or times out.
Publishing Messages
// QoS 0 - Fire and forget
await client.publish('topic/test', 'Hello World');
// QoS 1 - At least once delivery
await client.publish('topic/important', 'Important message', { qos: 1 });
// QoS 2 - Exactly once delivery
await client.publish('topic/critical', 'Critical data', { qos: 2 });
// With retain flag
await client.publish('device/status', 'online', { qos: 1, retain: true });
// With MQTT 5.0 properties
await client.publish('data/sensor', JSON.stringify({ temp: 25.5 }), {
qos: 1,
properties: {
messageExpiryInterval: 3600,
contentType: 'application/json',
userProperties: [
{ key: 'device-id', value: 'sensor-001' }
]
}
});Subscribing to Topics
// Single topic
await client.subscribe('sensors/temperature');
// Single topic with QoS
await client.subscribe('sensors/+/temperature', { qos: 1 });
// Multiple topics with same options
await client.subscribe(['sensors/#', 'alerts/#'], { qos: 1 });
// Multiple topics with different options
const grants = await client.subscribe({
'sensors/temperature': { qos: 0 },
'sensors/humidity': { qos: 1 },
'alerts/critical': { qos: 2 }
});
// Check granted QoS levels
grants.forEach(grant => {
console.log(`${grant.topic}: QoS ${grant.qos}`);
});MQTT 5.0 subscription options:
await client.subscribe('commands/#', {
qos: 1,
nl: true, // No Local - do not receive own messages
rap: true, // Retain As Published - preserve retain flag
rh: 1 // Retain Handling: 0=always, 1=if new sub, 2=never
});Unsubscribing
// Single topic
await client.unsubscribe('sensors/temperature');
// Multiple topics
await client.unsubscribe(['sensors/#', 'alerts/#']);Disconnecting
// Normal disconnect
await client.disconnect();
// MQTT 5.0 disconnect with reason code
import { ReasonCode } from 'mqtt-client-tss';
await client.disconnect(ReasonCode.NormalDisconnection, {
reasonString: 'Client shutting down'
});
// Force close without sending DISCONNECT packet
client.end(true);Events
// Connection established
client.on('connect', ({ sessionPresent, returnCode, properties }) => {
console.log('Connected, session present:', sessionPresent);
});
// Connection closed
client.on('close', () => {
console.log('Disconnected');
});
// Reconnecting
client.on('reconnect', ({ attempt, delay }) => {
console.log(`Reconnect attempt ${attempt} in ${delay}ms`);
});
// Reconnection failed
client.on('reconnect_failed', () => {
console.log('Max reconnection attempts reached');
});
// Message received
client.on('message', (topic, payload, message) => {
console.log(`Topic: ${topic}`);
console.log(`Payload: ${payload.toString()}`);
console.log(`QoS: ${message.qos}`);
console.log(`Retain: ${message.retain}`);
if (message.properties) {
console.log(`Properties:`, message.properties);
}
});
// Error occurred
client.on('error', (error) => {
console.error('Error:', error.message);
});
// Ping response (keep-alive)
client.on('pingresp', () => {
console.log('Ping response received');
});
// MQTT 5.0 disconnect from broker
client.on('disconnect', ({ reasonCode, properties }) => {
console.log('Broker disconnected us:', reasonCode);
});Transport Examples
TCP
const client = createMqttClient({
host: 'broker.hivemq.com',
port: 1883,
transport: 'tcp',
});TLS/SSL
import * as fs from 'fs';
const client = createMqttClient({
host: 'broker.example.com',
port: 8883,
transport: 'tls',
rejectUnauthorized: true,
ca: fs.readFileSync('ca.crt'),
cert: fs.readFileSync('client.crt'),
key: fs.readFileSync('client.key'),
passphrase: 'key-password',
});WebSocket
const client = createMqttClient({
host: 'broker.hivemq.com',
port: 8000,
transport: 'ws',
wsPath: '/mqtt',
});Secure WebSocket
const client = createMqttClient({
host: 'broker.hivemq.com',
port: 8884,
transport: 'wss',
wsPath: '/mqtt',
wsHeaders: {
'Authorization': 'Bearer your-token'
},
});Protocol Versions
MQTT 3.1
const client = createMqttClient({
host: 'broker.example.com',
port: 1883,
protocolVersion: 3,
cleanSession: true,
});MQTT 3.1.1 (Default)
const client = createMqttClient({
host: 'broker.example.com',
port: 1883,
protocolVersion: 4,
cleanSession: true,
});MQTT 5.0
const client = createMqttClient({
host: 'broker.example.com',
port: 1883,
protocolVersion: 5,
cleanStart: true,
sessionExpiryInterval: 3600,
properties: {
receiveMaximum: 100,
topicAliasMaximum: 10,
requestResponseInformation: 1,
},
});MQTT 5.0 Features
Connection Properties
| Property | Type | Description | |----------|------|-------------| | sessionExpiryInterval | number | Session expiry in seconds | | receiveMaximum | number | Max inflight QoS 1/2 messages | | maximumPacketSize | number | Max packet size in bytes | | topicAliasMaximum | number | Max topic alias value | | requestResponseInformation | 0 or 1 | Request response info from broker | | requestProblemInformation | 0 or 1 | Request problem info on failures | | userProperties | Array | Custom key-value pairs | | authenticationMethod | string | Authentication method name | | authenticationData | Buffer | Authentication data |
Publish Properties
| Property | Type | Description | |----------|------|-------------| | payloadFormatIndicator | 0 or 1 | 0=bytes, 1=UTF-8 string | | messageExpiryInterval | number | Message expiry in seconds | | topicAlias | number | Topic alias to use | | responseTopic | string | Response topic for request/response | | correlationData | Buffer | Correlation data for request/response | | contentType | string | MIME content type | | userProperties | Array | Custom key-value pairs |
Reason Codes
import { ReasonCode } from 'mqtt-client-tss';
ReasonCode.Success // 0x00
ReasonCode.NormalDisconnection // 0x00
ReasonCode.UnspecifiedError // 0x80
ReasonCode.MalformedPacket // 0x81
ReasonCode.ProtocolError // 0x82
ReasonCode.NotAuthorized // 0x87
ReasonCode.ServerUnavailable // 0x88
ReasonCode.TopicFilterInvalid // 0x8F
ReasonCode.TopicNameInvalid // 0x90
ReasonCode.QuotaExceeded // 0x97Advanced Features
Metrics Collection
The library includes built-in metrics collection for monitoring performance:
import { createMqttClient, createMetricsCollector } from 'mqtt-client-tss';
const client = createMqttClient({
host: 'localhost',
port: 1883,
enableMetrics: true,
});
// Get metrics after some activity
const stats = client.getMetrics();
console.log(stats);
// Output:
// {
// messages: { published: 150, received: 42, publishRate: '2.50/s', receiveRate: '0.70/s' },
// bytes: { published: '15.23 KB', received: '4.12 KB', publishRate: '253.83 B/s', receiveRate: '68.67 B/s' },
// latency: { average: '12.34ms', p50: '10.00ms', p95: '25.00ms', p99: '45.00ms' },
// connection: { reconnections: 0, errors: 0, uptime: '1h 23m 45s', connectedAt: '2024-01-15T10:30:00.000Z' }
// }You can also use the MetricsCollector independently:
import { createMetricsCollector } from 'mqtt-client-tss';
const metrics = createMetricsCollector();
metrics.recordPublish(256);
metrics.recordReceive(128);
metrics.recordLatency(15);
console.log(metrics.getAverageLatency());
console.log(metrics.getLatencyPercentile(95));
console.log(metrics.getPublishRate());Rate Limiting
Prevent overwhelming the broker with a token bucket rate limiter:
import { createMqttClient } from 'mqtt-client-tss';
// Limit to 100 messages per second
const client = createMqttClient({
host: 'localhost',
port: 1883,
rateLimit: 100,
});
// Messages exceeding the rate will be queued automaticallyUsing the rate limiter independently:
import { createRateLimiter, createSlidingWindowRateLimiter } from 'mqtt-client-tss';
// Token bucket rate limiter
const limiter = createRateLimiter({
tokensPerSecond: 100,
bucketSize: 150, // Allow bursts up to 150
});
async function sendMessage() {
await limiter.acquire();
// Send message
}
// Check without blocking
if (limiter.tryAcquire()) {
// Token available, send immediately
}
// Sliding window rate limiter (more accurate for short bursts)
const slidingLimiter = createSlidingWindowRateLimiter({
windowMs: 1000, // 1 second window
maxRequests: 100, // Max 100 requests per window
});Message Queue
Messages published while disconnected are queued and sent upon reconnection:
import { createMqttClient } from 'mqtt-client-tss';
const client = createMqttClient({
host: 'localhost',
port: 1883,
reconnect: true,
queueSize: 1000, // Queue up to 1000 messages
});
// If disconnected, this message is queued
await client.publish('status', 'online', { qos: 1 });
// Message will be sent automatically when reconnectedUsing the message queue independently:
import { createMessageQueue } from 'mqtt-client-tss';
const queue = createMessageQueue({
maxSize: 1000,
maxAge: 60000, // Messages expire after 1 minute
maxRetries: 3,
priorityLevels: 3, // Higher QoS = higher priority
});
queue.enqueue('topic', Buffer.from('payload'), { qos: 1 });
const message = queue.dequeue();
if (message) {
// Process message
}
// Get queue statistics
const stats = queue.getStats();Connection Pool
Manage multiple connections for load balancing and failover:
import { createConnectionPool, createMqttClient } from 'mqtt-client-tss';
const pool = createConnectionPool({
minConnections: 2,
maxConnections: 10,
loadBalancing: 'round-robin', // or 'least-connections', 'weighted', 'random'
healthCheckInterval: 30000,
});
// Add connections
pool.addConnection(
{ host: 'broker1.example.com', port: 1883 },
() => createMqttClient({ host: 'broker1.example.com', port: 1883 }).connect(),
{ weight: 2, priority: 1 }
);
pool.addConnection(
{ host: 'broker2.example.com', port: 1883 },
() => createMqttClient({ host: 'broker2.example.com', port: 1883 }).connect(),
{ weight: 1, priority: 0 }
);
// Start health checks
pool.startHealthChecks();
// Acquire a connection from the pool
const { id, connection } = await pool.acquire();
await connection.publish('topic', 'message');
pool.release(id);
// Get pool statistics
const stats = pool.getStats();
// Clean up
await pool.close();Topic Matcher
MQTT wildcard topic matching utility:
import { TopicMatcher, createTopicMatcher } from 'mqtt-client-tss';
// Static method for simple matching
TopicMatcher.matches('sensors/+/temperature', 'sensors/living-room/temperature'); // true
TopicMatcher.matches('sensors/#', 'sensors/a/b/c'); // true
TopicMatcher.matches('sensors/+', 'sensors/a/b'); // false
// Instance for subscription management
const matcher = createTopicMatcher();
matcher.addSubscription('sensors/#', 'subscriber-1');
matcher.addSubscription('alerts/+/critical', 'subscriber-2');
const patterns = matcher.getMatchingPatterns('sensors/temperature');
// ['sensors/#']
const subscribers = matcher.getSubscriberIds('alerts/fire/critical');
// ['subscriber-2']
// Validation
TopicMatcher.isValidTopic('sensors/temperature'); // true
TopicMatcher.isValidTopic('sensors/+/temperature'); // false (wildcards not allowed in topics)
TopicMatcher.isValidFilter('sensors/+/temperature'); // true
TopicMatcher.isValidFilter('sensors/#/more'); // false (# must be last)
// Shared subscriptions (MQTT 5.0)
TopicMatcher.isSharedSubscription('$share/group1/sensors/#'); // true
const parsed = TopicMatcher.parseSharedSubscription('$share/group1/sensors/#');
// { groupId: 'group1', topic: 'sensors/#' }Retry Manager
Intelligent retry logic with exponential backoff:
import { createRetryManager, RetryError } from 'mqtt-client-tss';
const retry = createRetryManager({
maxRetries: 5,
initialDelay: 1000,
maxDelay: 30000,
multiplier: 2,
jitter: true,
jitterFactor: 0.2,
onRetry: (attempt, error, delay) => {
console.log(`Retry ${attempt} after ${delay}ms: ${error.message}`);
}
});
// Execute with automatic retry
try {
const result = await retry.execute(async () => {
// Your operation that might fail
return await fetchData();
});
} catch (error) {
if (error instanceof RetryError) {
console.log(`Failed after ${error.attempts} attempts`);
console.log(`Last error: ${error.lastError.message}`);
}
}
// Wrap a function for reuse
const fetchWithRetry = retry.wrap(fetchData);
const result = await fetchWithRetry();
// Get the retry schedule
const schedule = retry.getSchedule();
// [1000, 2000, 4000, 8000, 16000]
const maxDelay = retry.getMaxTotalDelay();
// 31000Message Store
Persistence layer for QoS 1 and QoS 2 messages:
import { MemoryMessageStore, FileMessageStore, createMessageStore } from 'mqtt-client-tss';
// In-memory store (messages lost on restart)
const memoryStore = new MemoryMessageStore();
// File-based store (persistent across restarts)
const fileStore = new FileMessageStore('./mqtt-messages.json');
// Factory function
const store = createMessageStore({ type: 'file', filePath: './messages.json' });
// Store a message
await store.put({
packetId: 1,
topic: 'test',
payload: Buffer.from('hello'),
qos: 1,
retain: false,
timestamp: Date.now(),
retries: 0,
state: 'pending'
});
// Retrieve a message
const message = await store.get(1);
// Get all pending messages (for resending on reconnect)
const pending = await store.getAll();
// Delete after acknowledgment
await store.delete(1);Logger
Flexible logging with multiple levels:
import { createLogger, silentLogger } from 'mqtt-client-tss';
const logger = createLogger({
level: 'debug',
prefix: 'MyApp',
timestamps: true,
colors: true,
});
logger.trace('Very detailed info');
logger.debug('Debugging info');
logger.info('General info');
logger.warn('Warning message');
logger.error('Error occurred');
// Create child logger with sub-prefix
const subLogger = logger.child('Connection');
subLogger.info('Connected'); // Output: [MyApp:Connection] Connected
// Silent logger for testing
const client = createMqttClient({
host: 'localhost',
port: 1883,
enableLogging: false, // Uses silent logger
});Modular Architecture
The library is organized into independent modules that can be imported separately:
mqtt-client-tss/
src/
index.ts # Main entry point, exports everything
types.ts # TypeScript type definitions
client/
mqtt-client.ts # Main MQTT client implementation
index.ts
protocol/
packet-builder.ts # Constructs MQTT packets
packet-parser.ts # Parses incoming MQTT packets
property-encoder.ts # Encodes MQTT 5.0 properties
property-decoder.ts # Decodes MQTT 5.0 properties
index.ts
utils/
logger.ts # Logging utility
topic-matcher.ts # Wildcard topic matching
message-store.ts # Message persistence
metrics.ts # Performance metrics
rate-limiter.ts # Rate limiting
message-queue.ts # Offline message queue
retry-manager.ts # Retry with backoff
connection-pool.ts # Connection pooling
index.tsSelective Imports
// Import only what you need
import { MqttClient, createMqttClient } from 'mqtt-client-tss';
import { TopicMatcher } from 'mqtt-client-tss';
import { createRateLimiter } from 'mqtt-client-tss';
// Or import from specific modules
import { PacketBuilder, PacketParser } from 'mqtt-client-tss/dist/protocol';
import { Logger, MetricsCollector } from 'mqtt-client-tss/dist/utils';TypeScript Support
The library is written in TypeScript and provides complete type definitions:
import {
MqttClient,
createMqttClient,
MqttClientOptions,
MqttMessage,
MqttProperties,
PublishOptions,
SubscribeOptions,
SubscriptionGrant,
QoS,
TransportType,
MqttProtocolVersion,
ReasonCode,
ConnectionState,
PacketType,
} from 'mqtt-client-tss';
const options: MqttClientOptions = {
host: 'localhost',
port: 1883,
transport: 'tcp' as TransportType,
protocolVersion: 5 as MqttProtocolVersion,
};
const client: MqttClient = createMqttClient(options);
client.on('message', (topic: string, payload: Buffer, message: MqttMessage) => {
const qos: QoS = message.qos;
const properties: MqttProperties | undefined = message.properties;
});
const publishOpts: PublishOptions = {
qos: 1,
retain: false,
properties: {
messageExpiryInterval: 3600,
},
};
const grants: SubscriptionGrant[] = await client.subscribe('test/#', { qos: 1 });Building from Source
# Clone the repository
git clone <repository-url>
cd mqtt-client-tss
# Install dependencies
npm install
# Build TypeScript
npm run build
# Clean and rebuild
npm run clean && npm run buildThe compiled JavaScript files will be in the dist directory, along with TypeScript declaration files (.d.ts) and source maps.
File Structure After Build
dist/
index.js # Main entry point
index.d.ts # Type declarations
types.js
types.d.ts
client/
mqtt-client.js
mqtt-client.d.ts
index.js
index.d.ts
protocol/
packet-builder.js
packet-builder.d.ts
packet-parser.js
packet-parser.d.ts
property-encoder.js
property-encoder.d.ts
property-decoder.js
property-decoder.d.ts
index.js
index.d.ts
utils/
logger.js
logger.d.ts
topic-matcher.js
topic-matcher.d.ts
message-store.js
message-store.d.ts
metrics.js
metrics.d.ts
rate-limiter.js
rate-limiter.d.ts
message-queue.js
message-queue.d.ts
retry-manager.js
retry-manager.d.ts
connection-pool.js
connection-pool.d.ts
index.js
index.d.tsLicense
MIT License
Contributing
Contributions are welcome. Please feel free to submit issues and pull requests.
