npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

rmqtt-js

v0.1.4

Published

High-performance MQTT server for Node.js built with Rust and Neon

Readme

rmqtt-js

npm version CI npm downloads license


📚 Table of contents


✨ Highlights

[!IMPORTANT] Designed for teams that demand predictable latency, graceful restarts, and deep observability hooks.


📦 Install

npm install rmqtt-js

[!NOTE] Ships with prebuilt binaries for Linux, macOS, and Windows; falls back to source builds when needed.

This package ships prebuilt binaries for common platforms (linux-x64/arm64, darwin-x64/arm64, win32-x64). If a prebuild for your platform isn’t available, install will build from source:

  • Node.js 18+ recommended (engines allow >=14)
  • Rust toolchain is required when building from source
    • macOS: Xcode Command Line Tools (xcode-select --install)
    • Linux: build-essential, pkg-config
    • Windows: Visual Studio Build Tools (MSVC), Rustup default toolchain

⚡️ Quick start

[!TIP] Combine rmqtt-js with your favorite MQTT client or cloud infrastructure for instant telemetry pipelines.

TypeScript

import { MqttServer, QoS } from "rmqtt-js";

const server = new MqttServer();

// Optional: simple demo auth (password must be "demo")
server.setHooks({
  onClientAuthenticate: ({ username, password }) => ({
    allow: Boolean(username) && password === "demo",
    superuser: false,
  }),
});

await server.start(MqttServer.createBasicConfig(1883));

// Publish a message
await server.publish("sensor/temperature", "23.7", { qos: QoS.AtLeastOnce });

JavaScript (CommonJS)

const { MqttServer, QoS } = require("rmqtt-js");

(async () => {
  const server = new MqttServer();
  await server.start(MqttServer.createBasicConfig(1883));
  await server.publish("hello/world", Buffer.from("hi"), { qos: QoS.AtMostOnce });
})();

🪝 Powerful hooks

Listen to broker events in real time — only invoked if you register them.

Hooks overview

| Hook | Type | When it fires | JS return | Default if not set | Timeout/error | | ---------------------------------------------- | ------------------- | --------------------------------------------- | -------------------------------------------- | ------------------------------ | ------------- | | onClientConnect(info) | notification | Server receives CONNECT | void | — | — | | onClientAuthenticate(auth) | decision | A client connects | { allow, superuser?, reason? } | RMQTT defaults | deny | | onClientSubscribeAuthorize(session, sub) | decision | Client subscribes | { allow, qos?, reason? } | RMQTT defaults | deny | | onClientPublishAuthorize(session, packet) | decision + mutation | Client publishes | { allow, topic?, payload?, qos?, reason? } | allow (except $SYS/* denied) | deny | | onMessagePublish(session, from, msg) | notification | Any publish observed | void | — | — | | onClientSubscribe(session, sub) | notification | Client subscribes | void | — | — | | onClientUnsubscribe(session, unsub) | notification | Client unsubscribes | void | — | — | | onMessageDelivered(session, from, msg) | notification | Before delivering a message to a client | void | — | — | | onMessageAcked(session, from, msg) | notification | After client acknowledges a delivered message | void | — | — | | onMessageDropped(session, from?, msg, info?) | notification | Message could not be delivered (dropped) | void | — | — | | onSessionCreated(session) | notification | Session created | void | — | — | | onClientConnected(session) | notification | Client connected | void | — | — | | onClientConnack(info) | notification | CONNACK (success/fail) | void | — | — | | onClientDisconnected(session, info?) | notification | Client disconnected | void | — | — | | onSessionSubscribed(session, sub) | notification | Client subscribed | void | — | — | | onSessionUnsubscribed(session, unsub) | notification | Client unsubscribed | void | — | — | | onSessionTerminated(session, info?) | notification | Session terminated | void | — | — |

Auth (JS → Rust decision flow)

server.setHooks({
  onClientAuthenticate: ({ username, password, clientId, remoteAddr }) => {
    const allow = Boolean(username) && password === "demo";
    return { allow, superuser: false, reason: allow ? "ok" : "bad creds" };
  },
});

Subscribe ACLs with QoS override

server.setHooks({
  onClientSubscribeAuthorize: (session, sub) => {
    const user = session?.username ?? "";
    const allowed = sub.topicFilter.startsWith(`${user}/`) || sub.topicFilter.startsWith("server/public/");
    return allowed ? { allow: true, qos: 1 } : { allow: false, reason: "not authorized" };
  },
});

Promise-based Subscribe ACL

server.setHooks({
  onClientSubscribeAuthorize: async (session, sub) => {
    // e.g., async lookup against a policy service
    await new Promise(r => setTimeout(r, 50));
    const user = session?.username ?? "";
    const allowed = sub.topicFilter.startsWith(`${user}/`);
    // You can also override QoS (0/1/2); invalid values are ignored and a WARN is logged.
    return allowed ? { allow: true, qos: 1 } : { allow: false };
  },
});

Publish/Subscribe notifications

server.setHooks({
  onMessagePublish: (_session, from, msg) => {
    console.log(`Message ${msg.topic}`, msg.payload.toString());
  },
  onClientSubscribe: (_session, sub) => console.log("SUB", sub),
  onClientUnsubscribe: (_session, unsub) => console.log("UNSUB", unsub),
});

Message delivery notifications

server.setHooks({
  onMessageDelivered: (_session, from, message) => {
    console.log(`Delivered: ${message.topic} (from ${from.type})`);
  },
  onMessageAcked: (_session, from, message) => {
    console.log(`Acked: ${message.topic} (from ${from.type})`);
  },
  onMessageDropped: (_session, from, message, info) => {
    console.warn(`Dropped: ${message.topic} (from ${from?.type ?? 'unknown'}) reason=${info?.reason ?? 'n/a'}`);
  },
});

Lifecycle notifications

server.setHooks({
  onClientConnected: (session) => console.log("CONNECTED", session.clientId),
  // Emitted when broker sends CONNACK (success or failure). Filter non-success if desired.
  onClientConnack: (info) => console.log("CONNACK", info.clientId, info.connAck),
  onClientDisconnected: (session, info) => console.log("DISCONNECTED", session.clientId, info?.reason),
  // Subscribe/session lifecycle
  onSessionSubscribed: (session, sub) => console.log("SUBSCRIBED", session.clientId, sub.topicFilter),
});

Notes:

  • onClientConnack fires for both success and non-success outcomes. To handle errors only, check info.connAck !== "Success".
  • Typical connAck values include: "Success", "BadUsernameOrPassword", "NotAuthorized", and other broker reasons depending on protocol version.
  • The node field currently uses a single-node placeholder value of 1. If/when multi-node/clustering is introduced, this will reflect the real node id.

Publish ACL with optional mutation

server.setHooks({
  onClientPublishAuthorize: (session, packet) => {
    // Deny system topics unless explicitly allowed
    if (packet.topic.startsWith("$SYS")) {
      return { allow: false, reason: "system topic" };
    }
    // Example: rewrite topic and uppercase payload
    return {
      allow: true,
      topic: `users/${session?.username ?? "anon"}/out`,
      payload: Buffer.from(packet.payload.toString("utf8").toUpperCase()),
      qos: 0,
    };
  },
});

Hook semantics:

  • Not registered → not called (RMQTT defaults apply)
  • Auth timeouts or callback errors → deny with WARN
  • Subscribe ACL timeouts or callback errors → deny with WARN
  • Publish hook: invalid mutation fields (e.g., qos not 0/1/2 or topic with wildcards +/#) are ignored and the original values are used; a WARN is logged.

⚙️ Configuration

Quick helpers

// Single TCP listener
const basic = MqttServer.createBasicConfig(1883);

// Multi‑protocol listeners (TLS/WSS require cert+key)
const multi = MqttServer.createMultiProtocolConfig({
  tcpPort: 1883,
  wsPort: 8080,
  tlsPort: 8883,
  wssPort: 8443,
  address: "0.0.0.0",
  // tlsCert: "/path/to/cert.pem",
  // tlsKey: "/path/to/key.pem",
  allowAnonymous: true,
});

Full shape (TypeScript)

interface ListenerConfig {
  name: string;
  address: string;      // e.g. "0.0.0.0"
  port: number;         // 1–65535
  protocol: "tcp" | "tls" | "ws" | "wss";
  tlsCert?: string;     // required for tls/wss
  tlsKey?: string;      // required for tls/wss
  allowAnonymous?: boolean; // default: true
}

interface ServerConfig {
  listeners: ListenerConfig[];
  pluginsConfigDir?: string;
  pluginsDefaultStartups?: string[];
}

📡 Pub/Sub API

await server.publish(topic: string, payload: string | Buffer, options?: {
  qos?: QoS;       // 0 | 1 | 2; default 0
  retain?: boolean // default false
});

Notes:

  • Publishing requires the server to be running.
  • QoS values are validated; invalid values are rejected with an ERROR log.
  • Payloads are sent as bytes; strings are encoded as UTF‑8.

🧪 Example app

This repo ships with a comprehensive example showing auth, ACLs, and server‑side publishing.

Run it:

npm run example

What it demonstrates:

  • TCP server on port 1883
  • JavaScript authentication (password = "demo")
  • Subscribe ACL: only /* and server/public/* are allowed; granted QoS capped at 1
  • Real‑time logs for publish/subscribe/unsubscribe
  • Periodic heartbeat publishing

The full source lives in examples/simple-server.ts.

See also: examples/CHEATSHEET.md for quick mqtt.js and Python client snippets (TCP/WS/TLS/WSS).


🧾 TypeScript types

export enum QoS { AtMostOnce = 0, AtLeastOnce = 1, ExactlyOnce = 2 }

export interface SessionInfo { node: number; remoteAddr: string | null; clientId: string; username: string | null; }
export interface MessageFrom { type: "client" | "system" | "bridge" | "admin" | "lastwill" | "custom"; node: number; remoteAddr: string | null; clientId: string; username: string | null; }
export interface MessageInfo { dup: boolean; qos: QoS; retain: boolean; topic: string; payload: Buffer; createTime: number; }
export interface SubscriptionInfo { topicFilter: string; qos: QoS }
export interface UnsubscriptionInfo { topicFilter: string }

export interface MqttMessage {
  topic: string;
  payload: Buffer;
  qos: QoS;
  retain: boolean;
}

export interface AuthenticationRequest { clientId: string; username: string | null; password: string | null; protocolVersion: number; remoteAddr: string; keepAlive: number; cleanSession: boolean; }
export interface AuthenticationResult { allow: boolean; superuser?: boolean; reason?: string }
export interface SubscribeAuthorizeResult { allow: boolean; qos?: number; reason?: string }
export interface PublishAuthorizeResult { allow: boolean; topic?: string; payload?: Buffer; qos?: number; reason?: string }

export interface HookCallbacks {
  onClientAuthenticate?(auth: AuthenticationRequest): AuthenticationResult | Promise<AuthenticationResult>;
  onClientSubscribeAuthorize?(session: SessionInfo | null, sub: SubscriptionInfo): SubscribeAuthorizeResult | Promise<SubscribeAuthorizeResult>;
  onClientPublishAuthorize?(session: SessionInfo | null, packet: MqttMessage): PublishAuthorizeResult | Promise<PublishAuthorizeResult>;
  onMessagePublish?(session: SessionInfo | null, from: MessageFrom, msg: MessageInfo): void;
  onClientSubscribe?(session: SessionInfo | null, sub: SubscriptionInfo): void;
  onClientUnsubscribe?(session: SessionInfo | null, unsub: UnsubscriptionInfo): void;
  onMessageDelivered?(session: SessionInfo | null, from: MessageFrom, message: MessageInfo): void;
  onMessageAcked?(session: SessionInfo | null, from: MessageFrom, message: MessageInfo): void;
  onMessageDropped?(session: SessionInfo | null, from: MessageFrom | null, message: MessageInfo, info?: { reason?: string }): void;
  onClientConnect?(info: ConnectInfo): void;
  onClientConnack?(info: ConnackInfo): void;
  onClientConnected?(session: SessionInfo): void;
  onClientDisconnected?(session: SessionInfo, info?: { reason?: string }): void;
  onSessionCreated?(session: SessionInfo): void;
  onSessionSubscribed?(session: SessionInfo, subscription: SubscriptionInfo): void;
  onSessionUnsubscribed?(session: SessionInfo, unsubscription: UnsubscriptionInfo): void;
  onSessionTerminated?(session: SessionInfo, info?: { reason?: string }): void;
}

// Connection lifecycle payloads
export interface ConnectInfo {
  node: number;
  remoteAddr: string | null;
  clientId: string;
  username: string | null;
  keepAlive: number;
  protoVer: number;
  cleanSession?: boolean; // MQTT 3.1/3.1.1
  cleanStart?: boolean;   // MQTT 5.0
}

export interface ConnackInfo extends ConnectInfo {
  connAck: string; // e.g., "Success", "BadUsernameOrPassword", "NotAuthorized"
}

Implementation notes:

  • MessageFrom is a best-effort attribution based on RMQTT origin. When the origin is a client, future versions may populate clientId and username more precisely. If multi-node is planned, threading the real node id into SessionInfo/MessageFrom early will avoid downstream assumptions.

🏗️ Architecture (how it works)

  • Rust core runs RMQTT on a dedicated Tokio runtime thread, separate from Node’s event loop
  • Neon bridges expose a minimal, typed API to Node.js
  • JS→Rust commands use an internal channel; Rust→JS hooks use a Neon event Channel
  • Hook behavior: if a JS hook is registered, it’s authoritative; if not, RMQTT defaults apply
  • Safety valves: 5s timeouts for JS decisions (auth/subscribe ACL). Timeout or channel error → deny with WARN
  • Resource lifecycle: start/stop/close are graceful; a small shared state waits for server readiness before publishing

✅ Testing

npm test

What the suite covers:

  • Auth allow/deny, timeouts, and missing‑hook behavior
  • Subscribe ACL allow/deny with QoS override and session marshalling
  • Publish path and client round‑trips using a real MQTT client
  • Robust readiness using event‑driven port checks (no arbitrary sleeps)

🛠️ Troubleshooting

  • ERROR: Attempted to publish while server is not running → Call start() before publish()
  • ERROR: Invalid configuration → Ensure at least one listener; TLS/WSS require both tlsCert and tlsKey
  • WARN: Hook timeout/channel error → Your JS hook may be slow or threw; decisions fall back to deny
  • Native build fails during install → Ensure Rust toolchain and platform build tools are available

🔐 Security best practices

  • Always enable TLS in production (tls/wss) and use strong certificates
    • Generate or provision certs from a trusted CA; rotate regularly
    • In createMultiProtocolConfig, only enable tls/wss when both tlsCert and tlsKey are present
  • Never hard‑code secrets (passwords, tokens); pull them from env/secret managers
  • Implement onClientAuthenticate on the JS side to enforce your auth model
    • Consider rate limiting and account lockouts outside the hook as needed
  • Restrict subscription scope with onClientSubscribeAuthorize (e.g., <username>/*)
  • Prefer unique client IDs; reject anonymous if your threat model requires it
  • Log only WARN/ERROR and avoid sensitive data in logs
  • Keep Node and Rust dependencies up to date; rebuild native module on updates

🗓️ Changelog

See CHANGELOG.md for release notes.


📄 License

MIT