message-bus

v0.0.6

An implementation of a resque-based message bus.

This package implements a message bus on top of a redis-based queue (resque). It provides a convenient way to build asynchronous services in a node.js-based architecture.

Install

npm install message-bus

Use

var MessageBus = require('message-bus');

APIs

new MessageBus(cfg)

This creates a message bus instance. The configuration object is of the following format:

{
    "type": "Object",
    "fields": {
        "mysql_config": {
            "type": "Alias",
            "alias": "MYSQL_CONFIG"
        },
        "resque_configs": {
            "type": "Array",
            "element": {
                "type": "Alias",
                "alias": "COFFEE_RESQUE_CONFIG"
            }
        },
        "retry_limit": {
            "type": "Number",
            "nullable": true,
            "default": 5
        }
    }
}

Please refer to the mysql and coffee-resque packages for the format of the mysql configuration and the coffee-resque configuration included in the message-bus configuration. The configuration field "retry_limit" is used by the retry API.
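
As a rough illustration of how the schema above reads, the sketch below checks a configuration object and fills in the default for "retry_limit". The helper name normalizeConfig is hypothetical and not part of message-bus, and treating the non-nullable fields as required is an assumption.

```javascript
// Hypothetical helper, not part of message-bus: checks the shape described
// by the schema and fills in the documented default for retry_limit.
function normalizeConfig(cfg) {
    if (!cfg || typeof cfg !== 'object') {
        throw new TypeError('cfg must be an object');
    }
    // Assumption: fields without "nullable": true are required.
    if (!cfg.mysql_config || typeof cfg.mysql_config !== 'object') {
        throw new TypeError('mysql_config must be an object');
    }
    if (!Array.isArray(cfg.resque_configs)) {
        throw new TypeError('resque_configs must be an array');
    }
    // retry_limit is nullable and defaults to 5.
    var retryLimit = cfg.retry_limit == null ? 5 : cfg.retry_limit;
    if (typeof retryLimit !== 'number') {
        throw new TypeError('retry_limit must be a number');
    }
    return {
        mysql_config: cfg.mysql_config,
        resque_configs: cfg.resque_configs,
        retry_limit: retryLimit
    };
}

var normalized = normalizeConfig({
    mysql_config: { host: 'localhost', database: 'message_bus' },
    resque_configs: [{ host: 'localhost', port: 6379 }]
});
console.log(normalized.retry_limit); // 5, since retry_limit was omitted
```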

Example

var mb = new MessageBus({
    mysql_config: {
        host     : 'localhost',
        user     : 'root',
        password : '',
        database: "message_bus"
    },
    resque_configs: [
        {
            "host": "localhost",
            "port": 6379,
            "timeout": 3000
        }
    ],
    retry_limit: 10
});

A message bus relies on two types of storage: mysql and resque, where mysql is used to store the message body and resque is used to trigger the listeners registered on the message. For the mysql storage, we enforce the following schema:

create table if not exists tasks (
  id bigint(20) not null primary key auto_increment,
  version bigint(20) not null default 0,
  date_created datetime not null,
  last_updated datetime not null,
  name varchar(255) not null,
  args text not null,
  status varchar(255) not null,
  retry_times int(11) not null default 0
);

This table must exist in the database specified in the configuration object. For resque, multiple queues may be included in the configuration; they are used in a round-robin fashion to increase the availability and stability of the message bus. Specifically, if one resque fails, its message load falls back to the remaining resque(s).
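
The round-robin selection with fallback described above could look roughly like the following. This is an in-memory sketch only: createRoundRobin and the queue objects are hypothetical stand-ins for resque connections, and the package's actual selection logic may differ.

```javascript
// Hypothetical sketch: each "queue" is any object with a push() method
// that throws when the queue is unavailable.
function createRoundRobin(queues) {
    var next = 0; // index of the queue to try first on the next call
    return function enqueue(message) {
        // Try each queue at most once, starting from the next one in rotation.
        for (var attempt = 0; attempt < queues.length; attempt++) {
            var index = (next + attempt) % queues.length;
            try {
                queues[index].push(message);
                next = (index + 1) % queues.length;
                return index; // report which queue accepted the message
            } catch (err) {
                // This queue is unavailable; fall back to the following one.
            }
        }
        throw new Error('all queues are unavailable');
    };
}

var healthy = { items: [], push: function (m) { this.items.push(m); } };
var broken = { push: function () { throw new Error('connection refused'); } };
var enqueue = createRoundRobin([healthy, broken]);
var firstIndex = enqueue('a');  // accepted by the healthy queue
var secondIndex = enqueue('b'); // the broken queue's turn, falls back to the healthy one
```

Both messages end up in the healthy queue, which is the fallback behaviour the paragraph above describes.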

messageBus.fire(event, args, callback)

  • event: a string that represents an event.
  • args: the payload object of the event. It will be passed into the listener function.
  • callback: a callback function to continue with the rest of the program flow.

This API fires a consumable event to the message bus, which will be handled by a registered listener of this particular event. Note that this is NOT a pub/sub model: the event is consumable. Once it is handled by one of the listeners, it is consumed and no other listener will receive it.

Example

mb.fire('test:foobar100', {foo: 'bar', text: 'This is the payload.'}, cb);
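
To make the difference from pub/sub concrete, here is a small in-memory sketch in which each fired event is delivered to exactly one listener. createConsumableBus is a hypothetical name, no redis or mysql is involved, and rotating between listeners is an assumption made for the illustration; with the real package, which listener consumes a given event depends on the resque workers.

```javascript
// Hypothetical in-memory bus: every fired event is consumed by one listener.
function createConsumableBus() {
    var listeners = {}; // event name -> array of handlers
    var cursor = {};    // event name -> index of the handler to use next
    return {
        addListener: function (event, handler) {
            (listeners[event] = listeners[event] || []).push(handler);
        },
        fire: function (event, args) {
            var handlers = listeners[event] || [];
            if (handlers.length === 0) {
                return false; // nobody is registered for this event
            }
            var i = (cursor[event] || 0) % handlers.length;
            cursor[event] = i + 1;
            handlers[i](args); // exactly one handler consumes the event
            return true;
        }
    };
}

var bus = createConsumableBus();
var seen = { first: [], second: [] };
bus.addListener('test:foobar100', function (args) { seen.first.push(args.n); });
bus.addListener('test:foobar100', function (args) { seen.second.push(args.n); });
bus.fire('test:foobar100', { n: 1 });
bus.fire('test:foobar100', { n: 2 });
// Each payload was seen by one listener only, never by both.
```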

messageBus.addListener(event, handler_procedure, number_of_concurrency)

  • event: a string that represents an event.
  • handler_procedure: the handler procedure of the event.
  • number_of_concurrency: the number of concurrent threads that are allowed to process the event stream simultaneously. This parameter is optional and defaults to 1.

This API attaches a listener to an event. When the event is fired, this listener will be invoked if the event is consumed by it.

Example

mb.addListener('test:foobar100', function(args, cb) {
    console.log('start to process args: ', args);
    setTimeout(function(){
        console.log('finish to process args: ', args);
        cb(); // assumed: invoking cb signals that handling is complete
    }, 1000);
}, 5);
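
The effect of the concurrency parameter can be sketched with a small limiter that never runs more than a fixed number of handlers at once. createLimiter is a hypothetical helper written for this illustration; it is not how message-bus is known to implement concurrency.

```javascript
// Hypothetical limiter: at most `concurrency` handlers run at the same time.
function createLimiter(handler, concurrency) {
    var limit = concurrency || 1; // defaults to 1, like number_of_concurrency
    var running = 0;
    var pending = [];

    function start(args) {
        running++;
        var finished = false;
        handler(args, function done() {
            if (finished) { return; } // ignore duplicate completion calls
            finished = true;
            running--;
            drain(); // a slot is free, so start the next pending item
        });
    }

    function drain() {
        while (running < limit && pending.length > 0) {
            start(pending.shift());
        }
    }

    return {
        push: function (args) { pending.push(args); drain(); },
        running: function () { return running; },
        pending: function () { return pending.length; }
    };
}

// The handler stores its completion callbacks so they can be invoked by hand.
var dones = [];
var limiter = createLimiter(function (args, done) { dones.push(done); }, 2);
limiter.push('a');
limiter.push('b');
limiter.push('c'); // waits, because two handlers are already running
var before = { running: limiter.running(), pending: limiter.pending() };
dones[0]();        // 'a' completes, so 'c' is started
var after = { running: limiter.running(), pending: limiter.pending() };
```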

messageBus.stop(callback)

This API shuts down the message bus and cleans up resources. It

  • drains the currently running handlers (by allowing them to finish);
  • tears down the mysql connections;
  • tears down the resque connections.

Example

mb.stop(cb);
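
The shutdown order listed above (drain first, then tear down connections) can be sketched as follows. createStopper and its begin/end methods are hypothetical names used only for this illustration, and the teardown steps are plain functions standing in for closing the mysql and resque connections.

```javascript
// Hypothetical sketch of a graceful stop: wait for running handlers,
// then run the teardown steps in order, then invoke the callback.
function createStopper(teardownSteps) {
    var running = 0;
    var stopping = false;
    var onStopped = null;

    function finishIfIdle() {
        if (stopping && running === 0 && onStopped) {
            var cb = onStopped;
            onStopped = null; // make sure teardown runs only once
            teardownSteps.forEach(function (step) { step(); });
            cb();
        }
    }

    return {
        // Called when a handler starts; refused once stop() was requested.
        begin: function () {
            if (stopping) { return false; }
            running++;
            return true;
        },
        // Called when a handler finishes.
        end: function () { running--; finishIfIdle(); },
        stop: function (cb) { stopping = true; onStopped = cb; finishIfIdle(); }
    };
}

var log = [];
var stopper = createStopper([
    function () { log.push('mysql closed'); },
    function () { log.push('resque closed'); }
]);
stopper.begin();                                   // one handler is in flight
stopper.stop(function () { log.push('stopped'); });
var logWhileDraining = log.slice();                // nothing is torn down yet
var acceptedAfterStop = stopper.begin();           // new work is refused
stopper.end();                                     // last handler finishes
```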

messageBus.garbageCollect(callback)

This API performs garbage collection on the mysql database. It sweeps out all the messages that are either done or have failed with fatal errors. This API is usually used in a cron job to clean up the message bus mysql storage periodically.

Example

mb.garbageCollect(cb);
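
The sweep can be pictured on an in-memory copy of the tasks table. In this sketch the status values 'done', 'fatal', 'failed' and 'pending' are assumptions, since the actual values stored in the status column are not documented here, and sweepTasks is a hypothetical function.

```javascript
// Assumed status values; the real ones used by message-bus may differ.
var SWEEPABLE = ['done', 'fatal'];

// Returns the rows that survive a sweep, i.e. everything that is neither
// done nor fatally failed.
function sweepTasks(tasks) {
    return tasks.filter(function (task) {
        return SWEEPABLE.indexOf(task.status) === -1;
    });
}

var rows = [
    { id: 1, name: 'test:foobar100', status: 'done' },
    { id: 2, name: 'test:foobar100', status: 'failed' },
    { id: 3, name: 'test:foobar100', status: 'fatal' },
    { id: 4, name: 'test:foobar100', status: 'pending' }
];
var remaining = sweepTasks(rows);
console.log(remaining.map(function (t) { return t.id; })); // [ 2, 4 ]
```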

messageBus.retry(callback)

This API retries the messages that have failed with recoverable errors. It is usually used in a cron job to increase the reliability of the message bus. The number of retries before a message is declared fatal can be configured in the constructor with the field "retry_limit", which defaults to 5.

Example

mb.retry(cb);
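
The interaction between the retry_times column and "retry_limit" might be sketched like this. The function retryFailed, the refire callback and the status values 'failed', 'pending' and 'fatal' are all assumptions for the sake of illustration, not the package's documented behaviour.

```javascript
// Hypothetical retry pass over in-memory task rows.
function retryFailed(tasks, retryLimit, refire) {
    tasks.forEach(function (task) {
        if (task.status !== 'failed') {
            return; // only recoverable failures are retried
        }
        if (task.retry_times >= retryLimit) {
            task.status = 'fatal'; // limit reached, give up on this message
            return;
        }
        task.retry_times += 1;
        task.status = 'pending';
        refire(task); // hand the message back to the queue
    });
}

var tasks = [
    { id: 1, status: 'failed', retry_times: 0 },
    { id: 2, status: 'failed', retry_times: 5 },
    { id: 3, status: 'done', retry_times: 0 }
];
var refired = [];
retryFailed(tasks, 5, function (task) { refired.push(task.id); });
// Task 1 is queued again, task 2 becomes fatal, task 3 is left alone.
```

Run periodically, a pass like this bounds the number of attempts per message, and a later garbage collection can then remove the messages marked fatal.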