@qualithm/mqtt-wire
v0.1.4
MQTT Wire
Server-side MQTT protocol codec and connection state machine for JavaScript and TypeScript runtimes. Accepts connections from MQTT clients, parses bytes into typed packets, encodes packets into bytes, and manages per-connection protocol state.
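To illustrate what "parses bytes into typed packets" involves at the lowest level, here is a minimal sketch of packet framing based on the MQTT fixed header (one type/flags byte followed by a 1 to 4 byte remaining length). `SketchFramer` is a hypothetical name invented for this example; it is not the library's `StreamFramer` and makes no claim about how that class is implemented.

```typescript
// Hypothetical helper for illustration only; not part of @qualithm/mqtt-wire.
class SketchFramer {
  private buffer = new Uint8Array(0)

  // Append a chunk and return every complete packet now available.
  push(chunk: Uint8Array): Uint8Array[] {
    const merged = new Uint8Array(this.buffer.length + chunk.length)
    merged.set(this.buffer, 0)
    merged.set(chunk, this.buffer.length)
    this.buffer = merged

    const packets: Uint8Array[] = []
    for (;;) {
      const total = this.packetLength()
      if (total === null || this.buffer.length < total) break
      packets.push(this.buffer.slice(0, total))
      this.buffer = this.buffer.slice(total)
    }
    return packets
  }

  // Total packet size (fixed header plus body), or null if the header is incomplete.
  private packetLength(): number | null {
    let value = 0
    let multiplier = 1
    // The remaining length occupies at most 4 bytes, starting at index 1.
    for (let i = 1; i <= 4; i++) {
      if (i >= this.buffer.length) return null
      const byte = this.buffer[i]
      value += (byte & 0x7f) * multiplier
      if ((byte & 0x80) === 0) return 1 + i + value
      multiplier *= 128
    }
    throw new Error("malformed remaining length")
  }
}

// A PINGREQ followed by a QoS 0 PUBLISH that is split across two TCP chunks.
const framer = new SketchFramer()
const first = framer.push(new Uint8Array([0xc0, 0x00, 0x30, 0x05, 0x00]))
const second = framer.push(new Uint8Array([0x01, 0x61, 0x68, 0x69]))
console.log(first.length, second.length)
```

The first call yields only the PINGREQ because the PUBLISH body has not fully arrived; the second call completes it. A production framer would also enforce a maximum packet size before buffering.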
Features
- MqttWire — Connection state machine with lifecycle hooks for CONNECT, PUBLISH, SUBSCRIBE, etc.
- Protocol Support — MQTT 3.1.1 and 5.0 with full packet type coverage
- Codec Utilities — BinaryReader, BinaryWriter, StreamFramer for low-level protocol handling
- QoS Tracking — QoSFlowTracker and PacketIdAllocator for reliable message delivery
- Topic Aliases — TopicAliasManager for MQTT 5.0 topic alias negotiation
- Multi-runtime — Works with Bun, Node.js 20+, and Deno
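As background for the QoS tracking feature, the sketch below shows one way packet identifiers can be handed out. MQTT packet identifiers are non-zero 16-bit values that must not be reused while a flow is still in flight. `SketchPacketIds` and its methods are hypothetical names for this example and are not the API of the library's `PacketIdAllocator`.

```typescript
// Hypothetical sketch for illustration only; not part of @qualithm/mqtt-wire.
class SketchPacketIds {
  private inUse = new Set<number>()
  private next = 1

  // Returns a free identifier in 1..65535, or null when all are in flight.
  allocate(): number | null {
    if (this.inUse.size >= 65535) return null
    while (this.inUse.has(this.next)) this.advance()
    const id = this.next
    this.inUse.add(id)
    this.advance()
    return id
  }

  // Frees an identifier once its QoS flow completes (for example on PUBACK).
  release(id: number): void {
    this.inUse.delete(id)
  }

  // Moves the cursor forward, wrapping from 65535 back to 1 (0 is never valid).
  private advance(): void {
    this.next = this.next === 65535 ? 1 : this.next + 1
  }
}

const packetIds = new SketchPacketIds()
const firstId = packetIds.allocate() // 1
const secondId = packetIds.allocate() // 2
if (firstId !== null) packetIds.release(firstId)
console.log(firstId, secondId)
```

Advancing the cursor instead of always picking the lowest free value delays reuse of a just-released identifier, which can make stale acknowledgements easier to spot.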
Installation
bun add @qualithm/mqtt-wire
# or
npm install @qualithm/mqtt-wire
Quick Start
import * as net from "node:net"
import { MqttWire, PacketType } from "@qualithm/mqtt-wire"

const server = net.createServer((socket) => {
  const wire = new MqttWire({
    onSend: (data) => socket.write(data),
    onConnect: (connect) => {
      console.log(`Client connected: ${connect.clientId}`)
      return {
        type: PacketType.CONNACK,
        sessionPresent: false,
        reasonCode: 0x00
      }
    },
    onPublish: (packet) => {
      const payload = new TextDecoder().decode(packet.payload)
      console.log(`[${packet.topic}] ${payload}`)
    },
    onSubscribe: (packet) => ({
      type: PacketType.SUBACK,
      packetId: packet.packetId,
      reasonCodes: packet.subscriptions.map((s) => s.options.qos)
    }),
    onDisconnect: () => console.log("Client disconnected")
  })

  socket.on("data", (chunk) => wire.receive(chunk))
  socket.on("close", () => wire.close())
})

server.listen(1883, () => console.log("MQTT server on port 1883"))
Usage
Low-Level Codec
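The codec example in this section encodes and decodes a variable byte integer. For readers unfamiliar with the scheme, here is a standalone sketch of it as described in the MQTT specification: 7 data bits per byte, least significant group first, with the high bit set when more bytes follow. `encodeVbi` and `decodeVbi` are hypothetical names for this sketch and do not mirror the library's function signatures or return types.

```typescript
// Hypothetical standalone helpers for illustration; not the library's API.
const VBI_MAX = 268_435_455 // largest value that fits in 4 bytes

function encodeVbi(value: number): Uint8Array {
  if (!Number.isInteger(value) || value < 0 || value > VBI_MAX) {
    throw new RangeError("value out of range for a variable byte integer")
  }
  const bytes: number[] = []
  let rest = value
  do {
    let byte = rest % 128
    rest = Math.floor(rest / 128)
    if (rest > 0) byte |= 0x80 // continuation bit: more bytes follow
    bytes.push(byte)
  } while (rest > 0)
  return Uint8Array.from(bytes)
}

// Returns the decoded value and bytes consumed, or null if the input is
// truncated or uses more than 4 bytes.
function decodeVbi(
  bytes: Uint8Array,
  offset = 0
): { value: number; bytesRead: number } | null {
  let value = 0
  let multiplier = 1
  for (let i = 0; i < 4; i++) {
    if (offset + i >= bytes.length) return null
    const byte = bytes[offset + i]
    value += (byte & 0x7f) * multiplier
    if ((byte & 0x80) === 0) return { value, bytesRead: i + 1 }
    multiplier *= 128
  }
  return null
}

const vbiBytes = encodeVbi(16384) // bytes 0x80, 0x80, 0x01
console.log(vbiBytes, decodeVbi(vbiBytes))
```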
import {
  BinaryReader,
  BinaryWriter,
  decodeVariableByteInteger,
  encodeVariableByteIntegerToArray
} from "@qualithm/mqtt-wire"

// Encode a variable byte integer
const encoded = encodeVariableByteIntegerToArray(16384)
console.log(encoded) // Uint8Array [0x80, 0x80, 0x01]

// Decode it back
const decoded = decodeVariableByteInteger(encoded, 0)
if (decoded.ok) {
  console.log(decoded.value.value) // 16384
}

// Build a packet manually
// Only the start of a CONNECT packet is written here; the connect flags,
// keep-alive, and payload that the remaining length accounts for are omitted.
const writer = new BinaryWriter()
writer
  .writeUint8(0x10) // CONNECT packet type
  .writeVariableByteInteger(12) // Remaining length
  .writeMqttString("MQTT") // Protocol name
  .writeUint8(5) // Protocol version
const packet = writer.toUint8Array()
Error Handling
MqttWire reports protocol errors through lifecycle hooks: receive() does not throw them, and they are passed to the onError hook instead.
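The pattern of routing protocol errors to a hook while letting unexpected failures reject can be sketched independently of the library. Everything in this block (`SketchConnection`, `SketchProtocolError`, `SketchHooks`, the `onPacket` hook) is hypothetical and exists only to show the control flow; it is not how MqttWire is implemented. The reason code 0x81 is the MQTT 5.0 "Malformed Packet" code.

```typescript
// Hypothetical types for illustration only; not part of @qualithm/mqtt-wire.
class SketchProtocolError extends Error {
  reasonCode: number
  constructor(message: string, reasonCode: number) {
    super(message)
    this.name = "SketchProtocolError"
    this.reasonCode = reasonCode
  }
}

interface SketchHooks {
  onPacket: (packetType: number) => void
  onError: (error: SketchProtocolError) => void
}

class SketchConnection {
  private hooks: SketchHooks

  constructor(hooks: SketchHooks) {
    this.hooks = hooks
  }

  // Protocol violations go to onError; anything else still rejects.
  async receive(chunk: Uint8Array): Promise<void> {
    try {
      this.hooks.onPacket(this.parseType(chunk))
    } catch (error) {
      if (error instanceof SketchProtocolError) {
        this.hooks.onError(error)
        return
      }
      throw error
    }
  }

  // The packet type is the high nibble of the first byte; 0 is reserved.
  private parseType(chunk: Uint8Array): number {
    if (chunk.length === 0) {
      throw new SketchProtocolError("empty packet", 0x81)
    }
    const type = chunk[0] >> 4
    if (type === 0) {
      throw new SketchProtocolError("reserved packet type", 0x81)
    }
    return type
  }
}

const connection = new SketchConnection({
  onPacket: (type) => console.log("packet type", type),
  onError: (error) => console.error(error.message, error.reasonCode)
})
void connection.receive(new Uint8Array([0x00, 0x00])) // reported via onError
void connection.receive(new Uint8Array([0xc0, 0x00])) // PINGREQ, type 12
```

Keeping the two failure paths separate lets the caller close the socket with a meaningful reason code for protocol violations while still surfacing genuine bugs through the rejected promise.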
import {
  MqttWire,
  ProtocolError,
  StateError,
  decodeVariableByteInteger,
  type DecodeResult
} from "@qualithm/mqtt-wire"

// Protocol errors from receive() are reported via the onError hook
const wire = new MqttWire({
  onSend: (data) => socket.write(data),
  onConnect: (connect) => ({
    /* ... */
  }),
  onError: (error) => {
    // error is a ProtocolError with an MQTT reason code
    console.error(`protocol error: ${error.message}`, {
      reasonCode: error.reasonCode
    })
    socket.destroy()
  }
})

// receive() handles protocol errors internally; guard against unexpected failures
socket.on("data", (chunk) => {
  wire.receive(chunk).catch((err) => {
    console.error("unexpected receive error", err)
    socket.destroy()
  })
})

// StateError is thrown by outbound methods when called in the wrong state
try {
  await wire.publish("topic", payload)
} catch (error) {
  if (error instanceof StateError) {
    console.error(`state error: ${error.message}`, { state: error.state })
  }
}

// Codec functions return Result types (no exceptions)
const result: DecodeResult<number> = decodeVariableByteInteger(data, 0)
if (result.ok) {
  console.log(result.value)
} else {
  console.error(`[${result.error.code}] ${result.error.message}`)
}
API Reference
Full API documentation is generated with TypeDoc:
bun run docs
# Output in docs/
Examples
See the examples/ directory for runnable examples:
| Example | Description |
| ------------------------------------------------- | ------------------------- |
| node-tcp.ts | Node.js TCP server |
| bun-tcp.ts | Bun TCP server |
| deno-tcp.ts | Deno TCP server |
| websocket.ts | WebSocket server |
| basic-usage.ts | Low-level codec utilities |
| error-handling.ts | Result type patterns |
bun run examples/node-tcp.ts
Development
Prerequisites
Setup
bun install
Building
bun run build
Testing
bun run test # unit tests
bun run test:coverage # with coverage report
Linting & Formatting
bun run lint
bun run format
bun run typecheck
Benchmarks
bun run bench
Publishing
The package is published to npm automatically when CI passes on main. Update the version in
package.json before merging to trigger a new release.
Licence
Apache-2.0
