@synadiaorbit/messagepipeline

v1.0.0

messagepipeline - pipeline middleware for NATS subscriptions

MessagePipeline

The MessagePipeline utility allows you to compose a set of one or more transformations that you can easily reuse across message handlers. If you are thinking middleware for NATS, you are on the right track.

While NATS already provides a message-based vocabulary to implement transformations, code that you are on-boarding to NATS may already rely on a series of middleware transformations applied to input messages. If that is the case, this utility will probably be very useful to you.

You can use a MessagePipeline to validate, reformat, and transform messages. For example, you could check the schema of an input, generate a different but equivalent input, or annotate the message with additional information.
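As a rough illustration of the validate and annotate cases, here is a minimal sketch. It does not use the library: `SketchMsg`, `checkSchema`, and `annotate` are hypothetical names, and the message shape is a simplified stand-in rather than the real NATS `Msg` interface.

```typescript
// Stand-in message shape (an assumption, not the NATS Msg interface).
interface SketchMsg {
  data: string; // payload as text, for brevity
  headers: Map<string, string>; // simplified headers
}

// Validation: throw unless the payload is a JSON object with a string `id`.
// Invalid JSON also throws, via JSON.parse.
function checkSchema(m: SketchMsg): SketchMsg {
  const parsed: unknown = JSON.parse(m.data);
  if (
    typeof parsed !== "object" || parsed === null ||
    typeof (parsed as { id?: unknown }).id !== "string"
  ) {
    throw new Error("payload must be an object with a string id");
  }
  return m;
}

// Annotation: return a copy of the message carrying an extra header.
// The input message and its headers are left untouched.
function annotate(m: SketchMsg): SketchMsg {
  const headers = new Map(m.headers);
  headers.set("Validated", "true");
  return { ...m, headers };
}
```

Each function takes a message and returns a message, so steps like these can be chained one after another.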

Installing

The library requires an ESM-compatible runtime (like a browser).

JSR, the open-source package registry, hosts the package. See messagepipeline.

deno add @synadia-io/messagepipeline

Pipeline Functions

The base building block of a pipeline is a PipelineFn, a function that takes a Msg and returns either a Msg or a Promise<Msg>:

export type PipelineFn = (msg: Msg) => Promise<Msg> | Msg;

Here's an example:

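import { MutableMsg } from "./mod";
// Msg is the message type exported by the NATS core library
import type { Msg } from "jsr:@nats-io/nats-core";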

function reverse(m: Msg): Msg {
  const mm = MutableMsg.fromMsg(m);
  mm.data = new TextEncoder().encode(m.string().split("").reverse().join(""));
  return mm;
}

The example above is a function that takes an input message and creates a mutable message from it. Because the mutable message is built from the source message, its properties (subject, reply subject, headers, and data) are all initialized to match the source. Additional transformations can then be applied; in this case, the message text is simply reversed.
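Since a PipelineFn may also return a Promise<Msg>, an asynchronous step fits the same shape. The sketch below is only illustrative: `SketchMsg`, `SketchPipelineFn`, `lookupRegion`, and `tagRegion` are hypothetical names, and the message type is a local stand-in rather than the real `Msg`.

```typescript
// Local stand-in for the parts of a message used here (an assumption,
// not the real Msg interface from the NATS client).
interface SketchMsg {
  subject: string;
  data: Uint8Array;
}

// Same shape as PipelineFn, but over the stand-in type.
type SketchPipelineFn = (msg: SketchMsg) => Promise<SketchMsg> | SketchMsg;

// Hypothetical asynchronous lookup, standing in for a cache or database call.
function lookupRegion(subject: string): Promise<string> {
  return Promise.resolve(subject.startsWith("eu.") ? "europe" : "unknown");
}

// An async step: it awaits the lookup and returns a new message whose
// payload is prefixed with the region. The input message is not modified.
const tagRegion: SketchPipelineFn = async (m) => {
  const region = await lookupRegion(m.subject);
  const text = new TextDecoder().decode(m.data);
  return { ...m, data: new TextEncoder().encode(`${region}:${text}`) };
};
```

A caller that awaits the result does not need to know whether a given step was synchronous or asynchronous.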

MutableMsg

Messages in the JavaScript clients are immutable. For a pipeline, you'll need a way of crafting a message, and that is where MutableMsg comes in. It looks like a standard message, but you are able to set values on the available properties.

Note that if you use MutableMsg.fromMsg() with a message that originated from a subscription, you'll effectively clone the message. If you use the constructor, you are responsible for initializing all the fields, including a special one called publisher that enables respond() functionality. This is effectively a reference to the NatsConnection.

Pipelines

A pipeline is simply a collection of PipelineFn functions executed in order. The Pipelines interface defines a pipeline:

export interface Pipelines {
  transform(m: Msg): Promise<Msg> | Msg;
}

If the pipeline fails (one of its functions throws), the Promise rejects.

try {
  const r = await pipeline.transform(m);
  // do something with the transform
} catch (err) {
  // do something with the error
}

As you can see, using a Pipeline is very straightforward. It allows you to compose repetitive code into a flow that could lead to a simpler handler.
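To make the in-order execution and the failure behaviour concrete, here is a minimal sketch of how such a composition could work. This is not the library's implementation: `SketchMsg`, `SketchFn`, `runInOrder`, `nonEmpty`, and `upper` are hypothetical names used only for illustration.

```typescript
// Stand-in message shape (an assumption, not the NATS Msg interface).
interface SketchMsg {
  subject: string;
  data: string;
}

type SketchFn = (m: SketchMsg) => Promise<SketchMsg> | SketchMsg;

// Runs each function in order, feeding the output of one step into the next.
// A throw (or rejected promise) in any step rejects the returned promise,
// and the remaining steps are skipped.
async function runInOrder(
  m: SketchMsg,
  ...fns: SketchFn[]
): Promise<SketchMsg> {
  let current = m;
  for (const fn of fns) {
    current = await fn(current);
  }
  return current;
}

// Two small steps: a guard that throws on empty payloads, and a transform.
const nonEmpty: SketchFn = (m) => {
  if (m.data.length === 0) throw new Error("message is empty");
  return m;
};
const upper: SketchFn = (m) => ({ ...m, data: m.data.toUpperCase() });
```

With these definitions, awaiting `runInOrder({ subject: "hello", data: "hi" }, nonEmpty, upper)` yields a message whose data is "HI", while an empty payload rejects with "message is empty" before `upper` runs.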

Full Example

Here's the full example:

import { MutableMsg, Pipeline } from "jsr:@synadia-io/messagepipeline";
import { connect, Empty, headers } from "jsr:@nats-io/[email protected]";
import type { Msg } from "jsr:@nats-io/[email protected]";

function valid(m: Msg): Msg {
  if (m.data.length > 0) {
    return MutableMsg.fromMsg(m);
  } else {
    // you could respond here, but the code base needs to be certain
    // of that behaviour, as there's nothing preventing another
    // respond elsewhere.
    const h = headers();
    h.set("Error", "message is empty");
    m.respond(Empty, { headers: h });
    // the throw will be caught by the pipeline, which can then
    // choose to ignore the message
    throw new Error("message is empty");
  }
}

function reverse(m: Msg): Msg {
  try {
    const mm = MutableMsg.fromMsg(m);
    mm.data = new TextEncoder().encode(m.string().split("").reverse().join(""));
    return mm;
  } catch (err) {
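    const h = headers();
    // err is typed as unknown in strict TypeScript, so narrow it first
    h.set("Error", err instanceof Error ? err.message : String(err));
    m.respond(Empty, { headers: h });
    // the throw will be caught by the pipeline, which can then
    // choose to ignore the message
    throw err;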
  }
}

const nc = await connect({ servers: ["demo.nats.io"] });
const iter = nc.subscribe("hello");
(async () => {
  const pipeline = new Pipeline(valid, reverse);
  for await (const m of iter) {
    try {
      const r = await pipeline.transform(m);
      nc.respondMessage(r);
    } catch (_) {
      m.respond("error");
    }
  }
})();
await nc.flush();

const nc2 = await connect({ servers: ["demo.nats.io"] });
let i = 0;
setInterval(() => {
  nc2.request("hello", `hello ${++i}`)
    .then((r) => {
      console.log(r.string());
    });
}, 1000);