rxmsg

v4.1.2

![RxJS Message Diagram](docs/images/diagram.png?123)


RxMsg

A powerful and simple universal messaging abstraction

This library makes it easy to send messages across a distributed network in a transparent way, via various brokers, using RxJS streams.

RxMsg uses a versatile middleware pattern to create messaging endpoints that are extremely flexible.

Sending a message

const { createProducer } = require('rxmsg');
const { createAmqpConnector } = require('rxmsg/amqp');
const { amqpConfig } = require('./amqpConfig');

const middleware = createAmqpConnector(amqpConfig).sender();
const producer = createProducer(middleware);

// RxJS observer
producer.next({ body: 'Hello World!', to: 'hello' });

Receiving a message

const { createConsumer } = require('rxmsg');
const { createAmqpConnector } = require('rxmsg/amqp');
const { amqpConfig } = require('./amqpConfig');

const middleware = createAmqpConnector(amqpConfig).receiver({ noAck: true, queue: 'hello' });
const consumer = createConsumer(middleware);

// RxJS observable
consumer.subscribe(msg => {
  console.log(`Received: "${msg.body}"`);
});

Configure your broker

module.exports.amqpConfig = {
  declarations: {
    // List the queues, exchanges etc. you want to use here.
    queues: [
      {
        durable: false,
        name: 'hello'
      }
    ]
  },
  uri: 'amqp://user:[email protected]/user'
};
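
If you would rather not hard-code credentials, one option is to build the same config shape from an environment variable. The sketch below is only an illustration: buildAmqpConfig and the AMQP_URI variable name are hypothetical and not part of rxmsg, and the local fallback URI is an assumption.

```javascript
// Hypothetical helper (not part of rxmsg): returns a config object in the
// same shape as the amqpConfig example above.
function buildAmqpConfig(uri, queueNames) {
  return {
    declarations: {
      // One non-durable queue declaration per name.
      queues: queueNames.map(name => ({ durable: false, name }))
    },
    uri
  };
}

// AMQP_URI is an assumed variable name; the fallback points at a local broker.
const amqpConfig = buildAmqpConfig(
  process.env.AMQP_URI || 'amqp://localhost',
  ['hello']
);
```

The resulting object can be exported from amqpConfig.js in place of the literal shown above.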

Using Middleware

The endpoint creators each accept a list of middleware as arguments. When the producer sends a message it passes top down through the list of middleware.

Producer middleware

Messages enter at the top of the list and flow to the bottom. For a producer, they originate from a producer.next(msg) call.

const producer = createProducer(
  transformMessageSomehow,      // Step 1 - Do some transformation
  broadCastsMessagesSomewhere   // Step 2 - The last middleware must do the broadcasting
);

Consumer middleware

Again, messages come into the system top to bottom. Here this would be from an external broker via the top middleware.

const consumer = createConsumer(
  receivesMessagesFromSomewhere, // Step 1 - The first middleware must emit the message.
  logOrTransformMessage,         // Step 2 - Perhaps send the message to a logger.
  doSomeMoreTransformation       // Step 3 - Run another transform on the message before subscription.
);
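
The top-to-bottom ordering described above can be pictured as function composition: each middleware receives the output of the one before it. The following is a minimal sketch of that idea, not rxmsg's actual implementation. composeMiddleware, trim and shout are hypothetical names, and plain arrays stand in for RxJS streams so the example runs without dependencies.

```javascript
// Hypothetical helper (not rxmsg's implementation): applies middleware in
// list order, feeding each one's output into the next.
function composeMiddleware(...middleware) {
  return stream => middleware.reduce((current, fn) => fn(current), stream);
}

// Arrays stand in for streams here; real middleware would take and return
// an RxJS Observable instead.
const trim = stream => stream.map(msg => ({ ...msg, body: msg.body.trim() }));
const shout = stream => stream.map(msg => ({ ...msg, body: msg.body.toUpperCase() }));

const pipeline = composeMiddleware(trim, shout); // trim runs first, then shout
const result = pipeline([{ body: '  hello ', to: 'hello' }]);
// result[0].body is 'HELLO'
```

Because each step only sees the previous step's output, reordering the arguments changes the order in which the transformations apply.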

Creating your own Middleware

Middleware is simple: each one is just a function that decorates an RxJS stream. Here is the signature:

type Middleware<T> = (stream: Observable<T>) => Observable<T>;

Here is an example:

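const { tap } = require('rxjs/operators');

// Logs every message body and passes the message through unchanged.
function logger(stream) {
  return stream.pipe(
    tap(msg => console.log(`Stream logged: ${msg.body}`))
  );
}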

You can use a middleware by passing it as one of the arguments to the createProducer() or createConsumer() functions.

const consumer = createConsumer(amqpReceiver, logger);

Manipulating messages

Because consumer is simply an RxJS observable, you can filter it, throttle it, or apply any other operator to it:

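const { filter } = require('rxjs/operators');

// Only deliver messages whose body mentions "world".
const sub = consumer
  .pipe(filter(msg => msg.body.toLowerCase().includes('world')))
  .subscribe(msg => {
    console.log(`Received: ${msg.body}`);
  });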

Installation

You can install from the npm registry using either yarn or npm:

yarn add rxmsg
npm install rxmsg --save

Getting Started Examples

You can check out the getting started examples here:

  1. RabbitMQ
  2. Kafka (coming soon)
  3. Node Processes (coming soon)
  4. Web Workers (coming soon)
  5. Socket.io (coming soon)

RabbitMQ Examples as tests

For usage and examples, please look at the basic tests here:

  1. Hello World
  2. Work Queues
  3. PubSub
  4. Routing
  5. Topics

Usage with TypeScript

Messages

Generic message objects look like this:

// Generic message
export interface IMessage {
  body: any;
  to: any;
  correlationId?: string;
  replyTo?: string;
}

You can send a message by passing it to the next() method of a producer.

producer.next({
  body: 'Hi there!',
  to: 'some-queue'
});
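
When messages come from code you do not control, a runtime check against the IMessage shape can be useful. The helper below is a hypothetical sketch, not something rxmsg provides. It only checks what the interface above states: body and to are present, and the optional fields are strings when set.

```javascript
// Hypothetical runtime check (not part of rxmsg) mirroring the IMessage
// interface: body and to are required, correlationId and replyTo are
// optional strings.
function isMessage(value) {
  return (
    value !== null &&
    typeof value === 'object' &&
    'body' in value &&
    'to' in value &&
    (value.correlationId === undefined || typeof value.correlationId === 'string') &&
    (value.replyTo === undefined || typeof value.replyTo === 'string')
  );
}

const candidate = { body: 'Hi there!', to: 'some-queue' };
const ok = isMessage(candidate); // true for this object
```

A check like this could sit in front of producer.next() so that malformed objects are rejected before they reach any middleware.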

Project Principles:

  • Declarative over imperative.
  • Functions over classes.
  • Simplicity over complexity.
  • Immutable over mutable.
  • Flexible and composable over fixed hierarchy.
  • Pure over impure.
  • Minimalistic sensible defaults over boilerplate.
  • Idiomatic APIs over reinventing the wheel.

Environments

  • Basic framework should work in all V8 environments. eg.
  • Middleware is environment-specific. For example, rxmsg/amqp requires Node.js, while rxmsg/socketio-browser (coming soon) requires a browser environment (window, document, etc.).
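
Since middleware is tied to its environment, application code may want to decide at runtime which connector to load. The following is a rough sketch of one common detection approach. detectEnvironment is a hypothetical helper and not part of rxmsg.

```javascript
// Hypothetical helper (not part of rxmsg): a common heuristic for telling a
// browser from Node.js at runtime.
function detectEnvironment() {
  if (typeof window !== 'undefined' && typeof document !== 'undefined') {
    return 'browser';
  }
  if (typeof process !== 'undefined' && process.versions && process.versions.node) {
    return 'node';
  }
  return 'unknown';
}

const environment = detectEnvironment();
```

With a result of 'node', an application could safely require rxmsg/amqp. Other results would call for a different connector.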

Broker Support

Currently we support the following brokers:

  • [x] AMQP / RabbitMQ
  • [ ] Kafka
  • [ ] Node Processes
  • [ ] Web Workers
  • [ ] Socket.io

Is there a message broker you would like to see on this list? Want to get a specific integration sooner?

Create an issue or talk to me about sponsoring this project.

Architectural Roadmap

  • [ ] Refactor to lerna

RxJS References

Docs

Videos

NOTE: Using version 6

rxmsg uses RxJS v6.0, so operators must be applied through pipe():

import { filter } from 'rxjs/operators';

// ...

consumer.pipe(filter(forUserEvents(userId))).subscribe(
  msg => {
    dealWithMessage(msg.body);
  },
  () => {}
);
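
The snippet above leaves forUserEvents undefined. One way such a predicate factory might look is sketched below. It assumes, purely for illustration, that each message body is an object with a userId property. Adapt the check to your own message shape. A plain array is used for the demonstration so the sketch runs without RxJS.

```javascript
// Hypothetical predicate factory: assumes msg.body is an object carrying a
// userId property (an assumption, not something rxmsg defines).
function forUserEvents(userId) {
  return msg => Boolean(msg.body) && msg.body.userId === userId;
}

// Demonstrated on a plain array; the same predicate can be passed to the
// filter() operator from rxjs/operators.
const messages = [
  { to: 'events', body: { userId: 'alice', type: 'login' } },
  { to: 'events', body: { userId: 'bob', type: 'login' } },
  { to: 'events', body: { userId: 'alice', type: 'logout' } }
];
const aliceEvents = messages.filter(forUserEvents('alice'));
// aliceEvents holds the two messages whose body.userId is 'alice'
```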

Other References

  • https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html
  • https://www.rabbitmq.com/tutorials/tutorial-two-javascript.html
  • https://www.rabbitmq.com/tutorials/tutorial-three-javascript.html
  • https://www.rabbitmq.com/tutorials/tutorial-four-javascript.html
  • https://www.rabbitmq.com/tutorials/tutorial-five-javascript.html
  • https://aws.amazon.com/blogs/compute/building-scalable-applications-and-microservices-adding-messaging-to-your-toolbox/