npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@qualithm/mqtt-wire

v0.1.4

Published

Server-side MQTT protocol codec and connection state machine for JavaScript and TypeScript runtimes.

Downloads

665

Readme

MQTT Wire

CI codecov npm

Server-side MQTT protocol codec and connection state machine for JavaScript and TypeScript runtimes. Accepts connections from MQTT clients, parses bytes into typed packets, encodes packets into bytes, and manages per-connection protocol state.

Features

  • MqttWire — Connection state machine with lifecycle hooks for CONNECT, PUBLISH, SUBSCRIBE, etc.
  • Protocol Support — MQTT 3.1.1 and 5.0 with full packet type coverage
  • Codec Utilities — BinaryReader, BinaryWriter, StreamFramer for low-level protocol handling
  • QoS Tracking — QoSFlowTracker and PacketIdAllocator for reliable message delivery
  • Topic Aliases — TopicAliasManager for MQTT 5.0 topic alias negotiation
  • Multi-runtime — Works with Bun, Node.js 20+, and Deno

Installation

bun add @qualithm/mqtt-wire
# or
npm install @qualithm/mqtt-wire

Quick Start

import * as net from "node:net"
import { MqttWire, PacketType } from "@qualithm/mqtt-wire"

const server = net.createServer((socket) => {
  const wire = new MqttWire({
    onSend: (data) => socket.write(data),

    onConnect: (connect) => {
      console.log(`Client connected: ${connect.clientId}`)
      return {
        type: PacketType.CONNACK,
        sessionPresent: false,
        reasonCode: 0x00
      }
    },

    onPublish: (packet) => {
      const payload = new TextDecoder().decode(packet.payload)
      console.log(`[${packet.topic}] ${payload}`)
    },

    onSubscribe: (packet) => ({
      type: PacketType.SUBACK,
      packetId: packet.packetId,
      reasonCodes: packet.subscriptions.map((s) => s.options.qos)
    }),

    onDisconnect: () => console.log("Client disconnected")
  })

  socket.on("data", (chunk) => wire.receive(chunk))
  socket.on("close", () => wire.close())
})

server.listen(1883, () => console.log("MQTT server on port 1883"))

Usage

Low-Level Codec

import {
  BinaryReader,
  BinaryWriter,
  decodeVariableByteInteger,
  encodeVariableByteIntegerToArray
} from "@qualithm/mqtt-wire"

// Encode a variable byte integer
const encoded = encodeVariableByteIntegerToArray(16384)
console.log(encoded) // Uint8Array [0x80, 0x80, 0x01]

// Decode it back
const decoded = decodeVariableByteInteger(encoded, 0)
if (decoded.ok) {
  console.log(decoded.value.value) // 16384
}

// Build a packet manually
const writer = new BinaryWriter()
writer
  .writeUint8(0x10) // CONNECT packet type
  .writeVariableByteInteger(12) // Remaining length
  .writeMqttString("MQTT") // Protocol name
  .writeUint8(5) // Protocol version

const packet = writer.toUint8Array()

Error Handling

MqttWire uses lifecycle hooks for error reporting — receive() does not throw protocol errors.

import { MqttWire, ProtocolError, StateError, type DecodeResult } from "@qualithm/mqtt-wire"

// Protocol errors from receive() are reported via the onError hook
const wire = new MqttWire({
  onSend: (data) => socket.write(data),
  onConnect: (connect) => ({
    /* ... */
  }),

  onError: (error) => {
    // error is a ProtocolError with an MQTT reason code
    console.error(`protocol error: ${error.message}`, {
      reasonCode: error.reasonCode
    })
    socket.destroy()
  }
})

// receive() handles protocol errors internally; guard against unexpected failures
socket.on("data", (chunk) => {
  wire.receive(chunk).catch((err) => {
    console.error("unexpected receive error", err)
    socket.destroy()
  })
})

// StateError is thrown by outbound methods when called in the wrong state
try {
  await wire.publish("topic", payload)
} catch (error) {
  if (error instanceof StateError) {
    console.error(`state error: ${error.message}`, { state: error.state })
  }
}

// Codec functions return Result types (no exceptions)
const result: DecodeResult<number> = decodeVariableByteInteger(data, 0)
if (result.ok) {
  console.log(result.value)
} else {
  console.error(`[${result.error.code}] ${result.error.message}`)
}

API Reference

Full API documentation is generated with TypeDoc:

bun run docs
# Output in docs/

Examples

See the examples/ directory for runnable examples:

| Example | Description | | ------------------------------------------------- | ------------------------- | | node-tcp.ts | Node.js TCP server | | bun-tcp.ts | Bun TCP server | | deno-tcp.ts | Deno TCP server | | websocket.ts | WebSocket server | | basic-usage.ts | Low-level codec utilities | | error-handling.ts | Result type patterns |

bun run examples/node-tcp.ts

Development

Prerequisites

  • Bun (recommended), Node.js 20+, or Deno

Setup

bun install

Building

bun run build

Testing

bun run test              # unit tests
bun run test:coverage     # with coverage report

Linting & Formatting

bun run lint
bun run format
bun run typecheck

Benchmarks

bun run bench

Publishing

The package is automatically published to NPM when CI passes on main. Update the version in package.json before merging to trigger a new release.

Licence

Apache-2.0