mqtt-json-rpc

v3.0.2

JSON-RPC protocol over MQTT communication

MQTT-JSON-RPC

JSON-RPC protocol over MQTT communication.

Note: This package is no longer actively developed, as it has evolved into a more sophisticated solution named MQTT+. MQTT+ encodes packets as CBOR by default, uses its own packet format (allowing sender and receiver information), uses shorter NanoIDs instead of longer UUIDs to identify sender, receiver and requests, and additionally provides bi-directional resource transfer via chunk streaming.

Installation

$ npm install mqtt mqtt-json-rpc

About

This is an addon API for the excellent MQTT.js JavaScript/TypeScript API for Remote Procedure Call (RPC) communication based on the JSON-RPC protocol. This allows a bi-directional request/response-style communication over the technically uni-directional message protocol MQTT.

Conceptually, this RPC API provides two types of communication patterns:

  • Event Emission: Event Emission is a uni-directional communication pattern. An Event is the combination of an event name and optionally zero or more arguments. You subscribe to events. When an event is emitted, either a single particular subscriber (in case of a directed event emission) or all subscribers are called and receive the arguments as extra information.

  • Service Call: Service Call is a bi-directional communication pattern. A Service is the combination of a service name and optionally zero or more arguments. You register for a service. When a service is called, a single particular registrant (in case of a directed service call) or one arbitrary registrant is called and receives the arguments as the request. The registrant then has to provide the service response.

Notice: while the provided Event Emission functionality is just a very thin wrapper around the regular MQTT message publishing API of MQTT.js, the Service Call functionality is the core and heart of this addon API.

Usage

API:

export type API = {
    "example/sample": (a1: string, a2: boolean) => void
    "example/hello":  (a1: string, a2: number)  => string
}

Server:

import MQTT         from "mqtt"
import RPC          from "mqtt-json-rpc"
import type { API } from [...]

const mqtt = MQTT.connect("wss://127.0.0.1:8883", { ... })
const rpc  = new RPC<API>(mqtt)

mqtt.on("connect", async () => {
    rpc.subscribe("example/sample", (a1, a2) => {
        console.log("example/sample: ", a1, a2)
    })
    rpc.register("example/hello", (a1, a2) => {
        console.log("example/hello: ", a1, a2)
        return `${a1}:${a2}`
    })
})

Client:

import MQTT         from "mqtt"
import RPC          from "mqtt-json-rpc"
import type { API } from [...]

const mqtt = MQTT.connect("wss://127.0.0.1:8883", { ... })
const rpc  = new RPC<API>(mqtt)

mqtt.on("connect", () => {
    rpc.emit("example/sample", "foo", true)
    rpc.call("example/hello", "world", 42).then((response) => {
        console.log("example/hello response: ", response)
        mqtt.end()
    })
})
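To make the two communication patterns concrete without a broker, here is a minimal in-memory sketch. The LocalBus class is hypothetical and not part of mqtt-json-rpc. It only mimics the shape of subscribe()/emit() and register()/call(), and shows how an API type like the one above can constrain event and service names and their parameters.

```typescript
// Hypothetical in-memory stand-in (NOT the mqtt-json-rpc implementation):
// it mimics the shape of subscribe()/emit() and register()/call() only.
type API = {
    "example/sample": (a1: string, a2: boolean) => void
    "example/hello":  (a1: string, a2: number)  => string
}

type Handler = (...params: any[]) => any

class LocalBus<T extends Record<string, Handler>> {
    private subscribers = new Map<keyof T, Handler[]>()
    private services    = new Map<keyof T, Handler>()

    // Event Emission: every subscriber is called, return values are ignored.
    subscribe<K extends keyof T> (event: K, callback: (...params: Parameters<T[K]>) => void): void {
        const list = this.subscribers.get(event) ?? []
        list.push(callback as Handler)
        this.subscribers.set(event, list)
    }
    emit<K extends keyof T> (event: K, ...params: Parameters<T[K]>): void {
        for (const callback of this.subscribers.get(event) ?? [])
            callback(...params)
    }

    // Service Call: the registrant answers, its result resolves the Promise.
    register<K extends keyof T> (service: K, callback: T[K]): void {
        this.services.set(service, callback)
    }
    async call<K extends keyof T> (service: K, ...params: Parameters<T[K]>): Promise<ReturnType<T[K]>> {
        const callback = this.services.get(service)
        if (callback === undefined)
            throw new Error(`no registrant for service "${String(service)}"`)
        return callback(...params)
    }
}

const bus = new LocalBus<API>()
const seen: string[] = []
bus.subscribe("example/sample", (a1, a2) => { seen.push(`${a1}/${a2}`) })
bus.register("example/hello", (a1, a2) => `${a1}:${a2}`)

bus.emit("example/sample", "foo", true)
const reply = bus.call("example/hello", "world", 42)   // Promise<string>
// bus.call("example/hello", "world", "42")            // rejected by the type checker
```

In this sketch a mistyped name or parameter is caught at compile time, which is the main benefit of passing the API type to the RPC class.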

Application Programming Interface

The RPC API provides the following methods:

  • Construction:

    constructor(
        mqtt: MqttClient,
        options?: {
            clientId:                  string
            codec:                     "cbor" | "json"
            timeout:                   number
            topicEventNoticeMake:      (name: string, clientId?: string) => string
            topicServiceRequestMake:   (name: string, clientId?: string) => string
            topicServiceResponseMake:  (name: string, clientId?: string) => string
            topicEventNoticeMatch:     (topic: string) => TopicMatching | null
            topicServiceRequestMatch:  (topic: string) => TopicMatching | null
            topicServiceResponseMatch: (topic: string) => TopicMatching | null
        }
    )

    The mqtt argument is the MQTT.js instance, which has to be established separately. The optional options object supports the following fields:

    • clientId: Custom client identifier (default: auto-generated UUID v1).
    • codec: Encoding format (default: cbor).
    • timeout: Communication timeout in milliseconds (default: 10000).
    • topicEventNoticeMake: Custom topic generation for event notices. (default: (name, clientId) => clientId ? `${name}/event-notice/${clientId}` : `${name}/event-notice`)
    • topicServiceRequestMake: Custom topic generation for service requests. (default: (name, clientId) => clientId ? `${name}/service-request/${clientId}` : `${name}/service-request`)
    • topicServiceResponseMake: Custom topic generation for service responses. (default: (name, clientId) => clientId ? `${name}/service-response/${clientId}` : `${name}/service-response`)
    • topicEventNoticeMatch: Custom topic matching for event notices. (default: (topic) => { const m = topic.match(/^(.+?)\/event-notice(?:\/(.+))?$/); return m ? { name: m[1], clientId: m[2] } : null })
    • topicServiceRequestMatch: Custom topic matching for service requests. (default: (topic) => { const m = topic.match(/^(.+?)\/service-request(?:\/(.+))?$/); return m ? { name: m[1], clientId: m[2] } : null })
    • topicServiceResponseMatch: Custom topic matching for service responses. (default: (topic) => { const m = topic.match(/^(.+?)\/service-response\/(.+)$/); return m ? { name: m[1], clientId: m[2] } : null })

  • Event Subscription:

    /*  (simplified TypeScript API method signature)  */
    subscribe(
        event:    string,
        options?: MQTT::IClientSubscribeOptions,
        callback: (...params: any[]) => void,
    ): Promise<Subscription>

    Subscribe to an event. The event has to be a valid MQTT topic name. The optional options argument allows setting MQTT.js subscribe() options like qos. The callback is called with the params passed to a remote emit(). The callback does not return a value.

    Internally, on the MQTT broker, the topics generated by topicEventNoticeMake() (default: ${event}/event-notice and ${event}/event-notice/${clientId}) are subscribed. Returns a Subscription object with an unsubscribe() method.

  • Service Registration:

    /*  (simplified TypeScript API method signature)  */
    register(
        service:  string,
        options?: MQTT::IClientSubscribeOptions,
        callback: (...params: any[]) => any,
    ): Promise<Registration>

    Register a service. The service has to be a valid MQTT topic name. The optional options argument allows setting MQTT.js subscribe() options like qos. The callback is called with the params passed to a remote call(). The return value of callback will resolve the Promise returned by the remote call().

    Internally, on the MQTT broker, the topics generated by topicServiceRequestMake() (default: ${service}/service-request and ${service}/service-request/${clientId}) are subscribed. Returns a Registration object with an unregister() method.

  • Event Emission:

    /*  (simplified TypeScript API method signature)  */
    emit(
        event:     string,
        clientId?: ClientId,
        options?:  MQTT::IClientPublishOptions,
        ...params: any[]
    ): void

    Emit an event to all subscribers or a specific subscriber ("fire and forget"). The optional clientId directs the event to a specific subscriber only. The optional options argument allows setting MQTT.js publish() options like qos or retain.

    The remote subscribe() callback is called with params and its return value is silently ignored.

    Internally, publishes to the MQTT topic generated by topicEventNoticeMake(event, clientId) (default: ${event}/event-notice or ${event}/event-notice/${clientId}).

  • Service Call:

    /*  (simplified TypeScript API method signature)  */
    call(
        service:   string,
        clientId?: ClientId,
        options?:  MQTT::IClientPublishOptions,
        ...params: any[]
    ): Promise<any>

    Call a service on all registrants or on a specific registrant ("request and response"). The optional clientId directs the call to a specific registrant only. The optional options argument allows setting MQTT.js publish() options like qos or retain.

    The remote register() callback is called with params and its return value resolves the returned Promise. If the remote callback throws an exception, this rejects the returned Promise.

    Internally, on the MQTT broker, the topic generated by topicServiceResponseMake(service, clientId) (default: ${service}/service-response/${clientId}) is temporarily subscribed for receiving the response.

  • Client Id Wrapping:

    clientId(
        id: string
    ): ClientId

    Wrap a client ID string for use with emit() or call() to direct the message to a specific client. Returns a ClientId object.
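The default topic functions listed under Construction can be written out as standalone TypeScript to show how a topic round-trips between the make and the match side. The function bodies are copied from the defaults stated above. The TopicMatching shape is inferred from those defaults, and the free-standing function names are for illustration only.

```typescript
// Shape inferred from the documented defaults (an assumption, not the library's exported type).
type TopicMatching = { name: string, clientId?: string }

// Default topic generation, as documented for topicServiceRequestMake.
const serviceRequestMake = (name: string, clientId?: string): string =>
    clientId ? `${name}/service-request/${clientId}` : `${name}/service-request`

// Default topic matching, as documented for topicServiceRequestMatch.
const serviceRequestMatch = (topic: string): TopicMatching | null => {
    const m = topic.match(/^(.+?)\/service-request(?:\/(.+))?$/)
    return m ? { name: m[1], clientId: m[2] } : null
}

// Default topic matching for responses: here the client id part is mandatory.
const serviceResponseMatch = (topic: string): TopicMatching | null => {
    const m = topic.match(/^(.+?)\/service-response\/(.+)$/)
    return m ? { name: m[1], clientId: m[2] } : null
}

const broadcast = serviceRequestMake("example/hello")
const directed  = serviceRequestMake("example/hello", "client-1")

console.log(broadcast)                        // example/hello/service-request
console.log(serviceRequestMatch(directed))    // { name: 'example/hello', clientId: 'client-1' }
console.log(serviceResponseMatch(broadcast))  // null
```

A custom make function should stay consistent with its match counterpart, so that every generated topic can be parsed back into the same name and client id.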

Internals

In the following, assume that an RPC instance is created with:

import MQTT from "mqtt"
import RPC  from "mqtt-json-rpc"

const mqtt = MQTT.connect("...", { ... })
const rpc  = new RPC(mqtt, { clientId: "d1acc980-0e4e-11e8-98f0-ab5030b47df4", codec: "json" })

Internally, remote services are assigned to MQTT topics. When calling a remote service named example/hello with parameters "world" and 42 via...

rpc.call("example/hello", "world", 42).then((result) => {
    ...
})

...the following JSON-RPC 2.0 request message is sent to the permanent MQTT topic example/hello/service-request (UUID d1db7aa0-0e4e-11e8-b1d9-5f0ab230c0d9 is a randomly generated one):

{
    "jsonrpc": "2.0",
    "id":      "d1acc980-0e4e-11e8-98f0-ab5030b47df4:d1db7aa0-0e4e-11e8-b1d9-5f0ab230c0d9",
    "method":  "example/hello",
    "params":  [ "world", 42 ]
}

Beforehand, this example/hello service should have been registered with...

rpc.register("example/hello", (a1, a2) => {
    return `${a1}:${a2}`
})

...and its result, in the above rpc.call() example "world:42", is then sent back as the following JSON-RPC 2.0 success response message to the temporary (client-specific) MQTT topic example/hello/service-response/d1acc980-0e4e-11e8-98f0-ab5030b47df4:

{
    "jsonrpc": "2.0",
    "id":      "d1acc980-0e4e-11e8-98f0-ab5030b47df4:d1db7aa0-0e4e-11e8-b1d9-5f0ab230c0d9",
    "result":  "world:42"
}

The JSON-RPC 2.0 id field always consists of clientId:requestId, where clientId is the UUID v1 of the RPC client instance and requestId is the UUID v1 of the particular service request. The clientId is used for sending back the JSON-RPC 2.0 response message to the requestor only. The requestId is used for correlating the response to the request only.
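The id convention described above can be illustrated with a few small helpers. This is a sketch only: makeId, parseId and makeRequest are hypothetical names, not functions exported by mqtt-json-rpc, and splitting on the first colon relies on UUIDs not containing a colon.

```typescript
// JSON-RPC 2.0 request shape as shown in the messages above.
type RpcRequest = { jsonrpc: "2.0", id: string, method: string, params: unknown[] }

// Compose the id field from the client id and the per-request id.
const makeId = (clientId: string, requestId: string): string =>
    `${clientId}:${requestId}`

// Split an id back into its two parts (UUIDs contain no ":").
const parseId = (id: string): { clientId: string, requestId: string } | null => {
    const idx = id.indexOf(":")
    if (idx < 0)
        return null
    return { clientId: id.slice(0, idx), requestId: id.slice(idx + 1) }
}

// Build a request message (hypothetical helper for illustration).
const makeRequest = (clientId: string, requestId: string, method: string, params: unknown[]): RpcRequest =>
    ({ jsonrpc: "2.0", id: makeId(clientId, requestId), method, params })

const rpcRequest = makeRequest(
    "d1acc980-0e4e-11e8-98f0-ab5030b47df4",
    "d1db7aa0-0e4e-11e8-b1d9-5f0ab230c0d9",
    "example/hello", [ "world", 42 ])

// The clientId part of the id tells the responder where to send the response.
const idParts       = parseId(rpcRequest.id)
const responseTopic = idParts ? `${rpcRequest.method}/service-response/${idParts.clientId}` : null
console.log(responseTopic)
// example/hello/service-response/d1acc980-0e4e-11e8-98f0-ab5030b47df4
```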

Broker Setup

For a real test-drive of MQTT-JSON-RPC, install the Mosquitto MQTT broker and create a mosquitto.conf file like...

[...]

password_file        mosquitto-pwd.txt
acl_file             mosquitto-acl.txt

[...]

#   additional listener
listener             1883 127.0.0.1
max_connections      -1
protocol             mqtt

[...]

...and an access control list in mosquitto-acl.txt like...

user    example
topic   readwrite example/#

...and an example user (with password example) in mosquitto-pwd.txt like:

example:$6$awYNe6oCAi+xlvo5$mWIUqyy4I0O3nJ99lP1mkRVqsDGymF8en5NChQQxf7KrVJLUp1SzrrVDe94wWWJa3JGIbOXD9wfFGZdi948e6A==

Alternatively, you can use the NPM package mosquitto for an equivalent setup.
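The ACL entry above grants access with the multi-level wildcard #, so every topic generated for an example/... event or service is covered. The following sketch shows standard MQTT topic-filter matching (+ for exactly one level, # for all remaining levels). The topicMatches helper is illustrative and not part of Mosquitto or mqtt-json-rpc, and it ignores special cases such as $SYS topics.

```typescript
// Match an MQTT topic name against a topic filter with "+" and "#" wildcards.
function topicMatches (filter: string, topic: string): boolean {
    const f = filter.split("/")
    const t = topic.split("/")
    for (let i = 0; i < f.length; i++) {
        if (f[i] === "#")
            return i === f.length - 1   // "#" must be last; it also matches the parent level
        if (i >= t.length)
            return false                // topic has fewer levels than the filter
        if (f[i] !== "+" && f[i] !== t[i])
            return false                // literal level differs
    }
    return f.length === t.length        // reject trailing unmatched topic levels
}

console.log(topicMatches("example/#", "example/hello/service-request"))                  // true
console.log(topicMatches("example/#", "example/hello/service-response/abc"))             // true
console.log(topicMatches("example/#", "other/hello/service-request"))                    // false
console.log(topicMatches("example/+/service-request", "example/hello/service-request"))  // true
```

If you change the topic layout via the custom topic options, the ACL has to be adjusted so that it still covers the generated request and response topics.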

Example

You can test-drive MQTT-JSON-RPC with a complete sample to see it in action and trace its communication (the typing of the RPC class with API is optional, but strongly suggested):

import Mosquitto from "mosquitto"
import MQTT      from "mqtt"
import RPC       from "mqtt-json-rpc"

const mosquitto = new Mosquitto()
await mosquitto.start()
await new Promise((resolve) => { setTimeout(resolve, 500) })

const mqtt = MQTT.connect("mqtt://127.0.0.1:1883", {
    username: "example",
    password: "example"
})

type API = {
    "example/sample": (a1: string, a2: number) => void
    "example/hello":  (a1: string, a2: number) => string
}

const rpc = new RPC<API>(mqtt, { codec: "json" })

mqtt.on("error",     (err)            => { console.log("ERROR", err) })
mqtt.on("offline",   ()               => { console.log("OFFLINE") })
mqtt.on("close",     ()               => { console.log("CLOSE") })
mqtt.on("reconnect", ()               => { console.log("RECONNECT") })
mqtt.on("message",   (topic, message) => { console.log("RECEIVED", topic, message.toString()) })

mqtt.on("connect", () => {
    console.log("CONNECT")
    rpc.register("example/hello", (a1, a2) => {
        console.log("example/hello: request: ", a1, a2)
        return `${a1}:${a2}`
    })
    rpc.call("example/hello", "world", 42).then(async (result) => {
        console.log("example/hello success: ", result)
        mqtt.end()
        await mosquitto.stop()
    }).catch((err) => {
        console.log("example/hello error: ", err)
    })
})

The output will be:

$ node sample.ts
CONNECT
RECEIVED example/hello/service-request {"jsonrpc":"2.0","id":"b441fe30-e8af-11f0-b361-a30e779baa27:b474f510-e8af-11f0-ace2-97e30fcf7dca","method":"example/hello","params":["world",42]}
example/hello: request:  world 42
RECEIVED example/hello/service-response/b441fe30-e8af-11f0-b361-a30e779baa27 {"jsonrpc":"2.0","id":"b441fe30-e8af-11f0-b361-a30e779baa27:b474f510-e8af-11f0-ace2-97e30fcf7dca","result":"world:42"}
example/hello success:  world:42
CLOSE
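The trace shows one request and its matching response sharing the same id. One common way to implement this kind of correlation, together with a timeout like the timeout option, is a table of pending requests keyed by request id. The sketch below is an assumption about how such bookkeeping can look, not the code of mqtt-json-rpc. PendingTable and its methods are hypothetical, and the error shape follows the JSON-RPC 2.0 error object.

```typescript
// Hypothetical bookkeeping for outstanding requests (not the library's code).
type Pending = {
    resolve: (result: unknown) => void
    reject:  (error: Error) => void
    timer:   ReturnType<typeof setTimeout>
}
type RpcOutcome = { result?: unknown, error?: { code: number, message: string } }

class PendingTable {
    private pending = new Map<string, Pending>()
    private timeout: number
    constructor (timeout: number) {
        this.timeout = timeout
    }

    // Register a request and get a Promise that settles on response or timeout.
    wait (requestId: string): Promise<unknown> {
        return new Promise((resolve, reject) => {
            const timer = setTimeout(() => {
                this.pending.delete(requestId)
                reject(new Error(`request ${requestId} timed out`))
            }, this.timeout)
            this.pending.set(requestId, { resolve, reject, timer })
        })
    }

    // Settle the matching request; returns false for unknown or late responses.
    settle (requestId: string, response: RpcOutcome): boolean {
        const entry = this.pending.get(requestId)
        if (entry === undefined)
            return false
        clearTimeout(entry.timer)
        this.pending.delete(requestId)
        if (response.error)
            entry.reject(new Error(response.error.message))
        else
            entry.resolve(response.result)
        return true
    }
}

const table   = new PendingTable(10000)   // 10000 ms, the documented default timeout
const outcome = table.wait("b474f510-e8af-11f0-ace2-97e30fcf7dca")
const matched = table.settle("b474f510-e8af-11f0-ace2-97e30fcf7dca", { result: "world:42" })
const stray   = table.settle("some-other-request-id", { result: "ignored" })
outcome.then((result) => { console.log(matched, stray, result) })
```

Clearing the timer when a response arrives keeps the process from waiting for the full timeout after the last request has been answered.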

License

Copyright (c) 2018-2025 Dr. Ralf S. Engelschall (http://engelschall.com/)

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.