rabbitmq-stream-js-client

v0.3.1

RabbitMQ stream client for JS/TS applications

RabbitMQ client for the stream protocol for Node.js

Overview

A client for the RabbitMQ stream protocol, written in (and ready for) TypeScript.

Installing via NPM

npm install rabbitmq-stream-js-client

Getting started

A quick example to get started:

const rabbit = require("rabbitmq-stream-js-client")

async function main() {
  const client = await rabbit.connect({
    hostname: "localhost",
    port: 5552,
    username: "rabbit",
    password: "rabbit",
    vhost: "/",
  })

  await client.close()
}

main()
  .then(() => console.log("done!"))
  .catch((res) => console.log("ERROR ", res))

Usage


Connect

const client = await connect({
  hostname: "localhost",
  port: 5552,
  username: "rabbit",
  password: "rabbit",
  vhost: "/",
})

// ...

await client.close()

Connect through TLS/SSL

const client = await connect({
  hostname: "localhost",
  port: 5552,
  username: "rabbit",
  password: "rabbit",
  vhost: "/",
  ssl: {
    key: "<client_private_key>",
    cert: "<client_certificate>",
    ca: "<ca>", // Optional
  },
})

// ...

await client.close()
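
In practice the key and certificate usually live in PEM files on disk. The following is a minimal sketch, assuming the `ssl` fields accept PEM strings as the placeholders above suggest. The `loadSslOptions` helper, the `SslOptions` interface and the file paths are hypothetical and not part of the library.

```typescript
import { readFileSync } from "node:fs"

// Shape mirrors the `ssl` object in the example above (assumed to take PEM strings).
interface SslOptions {
  key: string
  cert: string
  ca?: string
}

type ReadFile = (path: string) => string

// Hypothetical helper (not part of the library): reads PEM files into an ssl options object.
// The reader is injectable so the helper can be exercised without touching the disk.
function loadSslOptions(
  paths: { key: string; cert: string; ca?: string },
  read: ReadFile = (path) => readFileSync(path, "utf8")
): SslOptions {
  const options: SslOptions = { key: read(paths.key), cert: read(paths.cert) }
  if (paths.ca !== undefined) options.ca = read(paths.ca) // the CA is optional
  return options
}
```

The result could then be passed as `ssl: loadSslOptions({ key: "./certs/client_key.pem", cert: "./certs/client_cert.pem" })`, where both paths are placeholders.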

Basic Publish

const client = await connect({
  hostname: "localhost",
  port: 5552,
  username: "rabbit",
  password: "rabbit",
  vhost: "/",
})

const publisher = await client.declarePublisher({
  stream: "stream-name",
  publisherRef: "my-publisher",
})

await publisher.send(Buffer.from("my message content"))

// ...

await client.close()
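
Since `send` takes a `Buffer`, structured payloads have to be serialized first. A minimal sketch of a JSON round trip follows; `encodeJson`, `decodeJson` and the `OrderCreated` type are hypothetical helpers for illustration, not part of the library.

```typescript
// Hypothetical payload type, used only for illustration.
interface OrderCreated {
  orderId: string
  amount: number
}

// Serializes any JSON-compatible value into a Buffer suitable for publisher.send().
function encodeJson<T>(payload: T): Buffer {
  return Buffer.from(JSON.stringify(payload), "utf-8")
}

// Parses a Buffer (for example message.content on the consumer side) back into a value.
// The cast is unchecked: validate the shape if the producer is not trusted.
function decodeJson<T>(content: Buffer): T {
  return JSON.parse(content.toString("utf-8")) as T
}

const encoded = encodeJson<OrderCreated>({ orderId: "A-1", amount: 42 })
const decoded = decodeJson<OrderCreated>(encoded)
console.log(decoded.orderId, decoded.amount)
```

With these helpers the publish call above would become `await publisher.send(encodeJson(payload))`.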

Sub Batch Entry Publishing

const client = await connect({
  hostname: "localhost",
  port: 5552,
  username: "rabbit",
  password: "rabbit",
  vhost: "/",
})

const publisher = await client.declarePublisher({
  stream: "stream-name",
  publisherRef: "my-publisher",
})

const messages = [
  { content: Buffer.from("my message content 1") },
  { content: Buffer.from("my message content 2") },
  { content: Buffer.from("my message content 3") },
  { content: Buffer.from("my message content 4") },
]

await publisher.sendSubEntries(messages)
/*
  It is also possible to specify a compression when sending sub entries of messages:
  e.g:  await publisher.sendSubEntries(messages, CompressionType.Gzip)

  The current values for the compression types are CompressionType.None or CompressionType.Gzip
*/

await client.close()
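
When there are many messages to send, it may be convenient to split them into several sub-entry batches. The sketch below shows one way to do that; the `chunk` helper and the batch size are assumptions for illustration, not library features or documented limits.

```typescript
// Hypothetical helper: splits a list into consecutive batches of at most `size` items.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError("size must be a positive integer")
  }
  const batches: T[][] = []
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size))
  }
  return batches
}

const batches = chunk(["m1", "m2", "m3", "m4", "m5"], 2)
console.log(batches) // [["m1", "m2"], ["m3", "m4"], ["m5"]]
```

Each batch could then be sent in turn, for example `for (const batch of chunk(messages, 100)) await publisher.sendSubEntries(batch)`, where 100 is an arbitrary value.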

Basic Consuming

const client = await connect({
  hostname: "localhost",
  port: 5552,
  username: "rabbit",
  password: "rabbit",
  vhost: "/",
})

const consumerOptions = { stream: "stream-name", offset: Offset.next() }
/*
  When creating a consumer the offset and the stream name are mandatory parameters.
  The offset parameter can be created from one of the following functions:
    - Offset.first()        ---> Start reading from the first available offset.
    - Offset.next()         ---> Start reading from the next offset to be written.
    - Offset.last()         ---> Start reading from the last chunk of messages in the stream.
    - Offset.offset(x)      ---> Start reading from the specified offset. The parameter has to be a bigint.
    - Offset.timestamp(t)   ---> Start reading from the messages stored after the timestamp t.

*/

const consumer = await client.declareConsumer(consumerOptions, (message: Message) => {
  console.log(message.content) // it's a Buffer
})

// declareConsumer works even with sub batch entry publishing and compression

// ...

await client.close()
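
The offset options listed above can be illustrated with a toy in-memory model of a stream. This is only a sketch of the semantics described in the comment, not the broker's or the client's implementation. The `StreamChunk` and `OffsetSpec` types and the `resolveStart` function are hypothetical, and the timestamp case assumes chunk granularity.

```typescript
// Toy model: a stream is an ordered list of chunks, each holding consecutive offsets.
interface StreamChunk {
  firstOffset: bigint
  messageCount: number
  timestamp: number // epoch milliseconds at which the chunk was written
}

type OffsetSpec =
  | { type: "first" }
  | { type: "next" }
  | { type: "last" }
  | { type: "offset"; value: bigint }
  | { type: "timestamp"; value: number }

// Returns the offset at which a consumer would start reading in this toy model.
function resolveStart(chunks: readonly StreamChunk[], spec: OffsetSpec): bigint {
  const first = chunks[0]
  const last = chunks[chunks.length - 1]
  if (first === undefined || last === undefined) return BigInt(0) // empty stream
  const next = last.firstOffset + BigInt(last.messageCount) // offset of the next write
  switch (spec.type) {
    case "first":
      return first.firstOffset
    case "next":
      return next
    case "last":
      return last.firstOffset // start of the last chunk
    case "offset":
      return spec.value
    case "timestamp": {
      // Assumption: start at the first chunk written at or after the timestamp.
      const hit = chunks.find((c) => c.timestamp >= spec.value)
      return hit === undefined ? next : hit.firstOffset
    }
  }
}
```

Note that in this model `last` starts at the beginning of the last chunk, so a consumer may receive several already-stored messages, while `next` only sees messages written afterwards.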

Single Active Consumer

A consumer can be created as single active. Among the consumers that share the same reference on a stream, only one at a time will receive messages.

const consumerOptions = {
  stream: "stream-name",
  offset: Offset.next(),
  singleActive: true,
  consumerRef: "my-consumer-ref",
} // see docs for various offset types

const consumer = await client.declareConsumer(consumerOptions, (message: Message) => {
  console.log(message.content) // it's a Buffer
})
// ...
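
The behavior can be pictured with a toy in-memory model. It is a sketch only: it assumes that consumers sharing a reference form a group, that the first to register is the active one, and that the next in line takes over when the active one leaves. The `SingleActiveGroup` class is hypothetical and is not how the broker or the library implements the feature.

```typescript
// Toy bookkeeping for consumers that share one consumerRef on one stream.
class SingleActiveGroup {
  private readonly members: string[] = []

  // Registers a consumer; returns true if it became the active one.
  join(consumerId: string): boolean {
    this.members.push(consumerId)
    return this.members.length === 1
  }

  // Removes a consumer; if it was active, the next one in line becomes active.
  leave(consumerId: string): void {
    const index = this.members.indexOf(consumerId)
    if (index !== -1) this.members.splice(index, 1)
  }

  // The consumer currently allowed to receive messages, if any.
  get active(): string | undefined {
    return this.members[0]
  }
}

const group = new SingleActiveGroup()
group.join("consumer-1") // becomes active
group.join("consumer-2") // stands by
group.leave("consumer-1") // consumer-2 takes over in this model
console.log(group.active)
```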

Clustering

Every time a new producer or a new consumer is created, a new connection is opened. For a producer, the connection is made to the leader node. For more on running the tests in a cluster, follow the readme in the /cluster folder.

Load Balancer

When a load balancer is in use, the client first connects to the AddressResolver and then reaches a node through it. The address resolver should hand out a leader node for a producer and a replica node for a consumer; if the node reached is not of the expected kind, the client closes the connection and retries.

const client = await connect({
  hostname: "node0",
  port: 5562,
  username: "rabbit",
  password: "rabbit",
  vhost: "/",
  addressResolver: { enabled: true },
})

const streamName = "my-stream"
await rabbit.createStream(streamName)

await wait(200) // wait for replicas to be created

// ...

await client.close()
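
The connect-and-retry behavior described above can be sketched as a small loop. This is an illustration under stated assumptions, not the library's code: `connectToRole`, `NodeInfo`, the `ConnectViaResolver` callback and the `maxAttempts` limit are all hypothetical names introduced here.

```typescript
type Role = "leader" | "replica"

interface NodeInfo {
  host: string
  role: Role
}

// Stand-in for "open a connection through the load balancer and see which node answered".
type ConnectViaResolver = () => Promise<NodeInfo>

// Keeps connecting through the resolver until a node with the wanted role answers.
async function connectToRole(
  connectViaResolver: ConnectViaResolver,
  wanted: Role,
  maxAttempts = 10
): Promise<NodeInfo> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const node = await connectViaResolver()
    if (node.role === wanted) return node // right kind of node: keep this connection
    // Wrong kind of node: a real client would close the connection here before retrying.
  }
  throw new Error(`no ${wanted} node reached after ${maxAttempts} attempts`)
}
```

A producer would ask for `"leader"` and a consumer for `"replica"`. Bounding the number of attempts is a design choice of this sketch so that a misconfigured balancer cannot cause an endless loop.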

Super Stream

A super stream can be created directly through the client only if you are using the latest (3.13.0-rc) management version. Batch publishing and compression are not yet supported for super streams; that feature is coming soon.

const client = await rabbit.connect({
  hostname: "localhost",
  port: 5552,
  username: rabbitUser,
  password: rabbitPassword,
  vhost: "/",
  heartbeat: 0,
})
await client.createSuperStream({ streamName: "super-stream-example" })
await sleep(200) // Waiting for partitions to be created

const routingKeyExtractor = (content, msgOptions) => msgOptions.messageProperties.messageId
const publisher = await client.declareSuperStreamPublisher({ superStream: "super-stream-example" }, routingKeyExtractor)

await publisher.send(Buffer.from("Test message 1"), { messageProperties: { messageId: "1" } })
await publisher.send(Buffer.from("Test message 2"), { messageProperties: { messageId: "2" } })
await publisher.send(Buffer.from("Test message 3"), { messageProperties: { messageId: "3" } })

await client.declareSuperStreamConsumer({ superStream: "super-stream-example" }, (message) => {
  console.log(`Received message ${message.content.toString()}`)
})

await sleep(2000)

await client.close()
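
The role of the routing key can be illustrated by hashing it to pick a partition. The sketch below uses an FNV-1a style hash over UTF-16 code units purely as a stand-in; it is not claimed to be the hash or routing strategy that the client or the broker uses. The `pickPartition` function and the partition names are hypothetical.

```typescript
// FNV-1a style 32-bit hash over UTF-16 code units (stand-in, not the library's algorithm).
function fnv1a(text: string): number {
  let hash = 0x811c9dc5
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i)
    hash = Math.imul(hash, 0x01000193) >>> 0 // 32-bit multiply, kept unsigned
  }
  return hash
}

// Maps a routing key to one of the partitions; the same key always maps to the same partition.
function pickPartition(routingKey: string, partitions: readonly string[]): string {
  const partition = partitions[fnv1a(routingKey) % partitions.length]
  if (partition === undefined) throw new Error("no partitions available")
  return partition
}

// Assumed partition names, for illustration only.
const partitions = ["super-stream-example-0", "super-stream-example-1", "super-stream-example-2"]
for (const messageId of ["1", "2", "3"]) {
  console.log(messageId, "->", pickPartition(messageId, partitions))
}
```

The property that matters is that messages with the same routing key always land in the same partition, which preserves their relative order.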

Filtering

Messages can be tagged with a filter value while publishing, and consumers can filter on that value both on the broker side and on the client side.

const client = await connect({
  hostname: "localhost",
  port: 5552,
  username: "rabbit",
  password: "rabbit",
  vhost: "/",
})

const publisher = await client.declarePublisher(
  { stream: streamName, publisherRef: `my-publisher-${randomUUID()}` },
  (msg) => msg.applicationProperties!["test"].toString() // Tags the message
)
const message1 = "test1"
const message2 = "test2"
const message3 = "test3"
const applicationProperties1 = { test: "A" }
const applicationProperties2 = { test: "B" }

await publisher.send(Buffer.from(message1), { applicationProperties: applicationProperties1 })
await publisher.send(Buffer.from(message2), { applicationProperties: applicationProperties1 })
await publisher.send(Buffer.from(message3), { applicationProperties: applicationProperties2 })

const filteredMsg: string[] = [] // collects the content of the messages that pass the filter

await client.declareConsumer(
  {
    stream: streamName,
    offset: Offset.first(),
    // Filter option for the consumer
    filter: {
      values: ["A", "B"],
      postFilterFunc: (msg) => msg.applicationProperties!["test"] === "A",
      matchUnfiltered: true,
    },
  },
  (msg) => filteredMsg.push(msg.content.toString("utf-8"))
)

await sleep(2000)

await client.close()
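
The reason for having both a broker-side and a client-side filter can be shown with a toy model. It assumes that the broker filters at chunk granularity, so a delivered chunk may still contain messages with other filter values, and that the client-side function then drops those. The `Msg` type and both functions are hypothetical and only model the idea; they are not the library's API.

```typescript
interface Msg {
  content: string
  filterValue?: string // undefined models a message published without a filter value
}

// Stage 1 (broker side, modelled): deliver every chunk containing at least one wanted value,
// or an untagged message when matchUnfiltered is true. Whole chunks are delivered.
function brokerSideFilter(chunks: Msg[][], values: string[], matchUnfiltered: boolean): Msg[] {
  const wanted = new Set(values)
  const delivered = chunks.filter((chunk) =>
    chunk.some((m) => (m.filterValue === undefined ? matchUnfiltered : wanted.has(m.filterValue)))
  )
  return ([] as Msg[]).concat(...delivered)
}

// Stage 2 (client side): drop the messages that slipped through with their chunk.
function clientSideFilter(messages: Msg[], postFilter: (m: Msg) => boolean): Msg[] {
  return messages.filter(postFilter)
}

const chunks: Msg[][] = [
  [{ content: "test1", filterValue: "A" }, { content: "test2", filterValue: "B" }],
  [{ content: "test3", filterValue: "C" }],
]
const delivered = brokerSideFilter(chunks, ["A"], false)
const kept = clientSideFilter(delivered, (m) => m.filterValue === "A")
console.log(delivered.length, kept.map((m) => m.content))
```

In this model the first chunk is delivered whole, so "test2" reaches the client even though only "A" was requested; the post filter is what removes it.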

Running Examples

The /example folder contains a project with some examples of how to use the library. To run it, follow these steps:

Move to the example folder and install the dependencies:

cd example
npm i

Run docker-compose to launch a RabbitMQ instance with streams already enabled:

docker-compose up -d

Add this line to your hosts file (/etc/hosts on Linux) so that rabbitmq resolves correctly:

127.0.0.1       rabbitmq

Then launch the examples:

npm start

Build from source

Build:

npm run build

Test:

docker-compose up -d
npm run test

Check everything:

npm run check

MISC

https://github.com/rabbitmq/rabbitmq-server/blob/master/deps/rabbitmq_stream/docs/PROTOCOL.adoc