npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

nstorm

v2.0.1

Published

Node.js real-time processing engine, based on Twitter's Storm

Downloads

24

Readme

nStorm

A simple real-time processing system inspired by Storm and SimpleStorm.

Overview

The motivation for this work is to enable the easy creation of processing blocks that can be chained together in flexibile ways, coupled with the ability to scale by adding additional servers (workers).

The high-level objectives are;

  1. Ability to create simple processing blocks that can be easily chained together (create topologies).
  2. Easily scale by adding new servers/workers, ideally automatically.
  3. Automatic replay of messages. If a process dies, then re-send message to another worker.
  4. Robust. This is achieved through using Node's cluster module and running workers in child processes that are restarted upon failure.

Dependencies

To enable simple cross-server communications and replay of messages I made use of Redis, which is used for both a message store and also a pub-sub mechanism.

Installation

npm install nstorm

Usage

Include nStorm by

var nStorm = require('nstorm');

In Twitter Storm, you can create two types of blocks;

  1. Spouts are data sources, and must contain a start function that emits data.
  2. bolts Are chainable processing blocks, that must contain a process function.

nStorm uses the same concept, except we only have one type of block and they can have a process and a start method, thereby allowing hybrid blocks.

The start method can be used to emit data, i.e. a data source, or can be used for any initialization code a block needs.

Creating a Data source

Data source blocks are fairly easy to create, using the BaseBolt base class. The only restriction is that the block must contain a start() function that returns a promise. To output any messages, call the this.emit(data).

The data you pass should be an Array or Object.

The following block demonstrates a coin toss, it randomly 'emits' a head or a tails, and emits that as an object.

/**
 * Demo spout that randomly creates 'coins', either heads or tails
 */
class CoinSpout extends BaseBolt {

    async start() {

        Logger.info("Starting...");

        for (var i=0; i<1000; i++){

            var test = getRandomInt(0,100);

            var toss = 'tails';
            if (test < 50){
                toss = 'heads';
            }

            var row = {coin: toss, count: i, time: Date.now(), deltaSpout: getDelta()}

            Logger.debug("Emitting", row);

            // Use the utility delay function to wait 3 seconds
            await this.delay(3000)

            // Send the data to any block that is consuming it
            this.emit(row);
            
        }

        return

    }


}

Creating a Process Block

Creating a processing block is also pretty straight forward, and also extends from the BaseBolt class. The only restriction is that they must contain a process function. This function is passed a message and a done function that is called when the message has been processed. The message will be whatever data was emitted by a block that this block is connected to. So for example, if this was listening to the spout we just created then the message would be the result of the coin toss.

The block can also send data using the this.emit() function, which allows you to pass data to other blocks.

class HeadsBolt extends BaseBolt {

    async process(message, done) {

        message.deltaHeads = Date.now() - message.time
        //Logger.info(`Heads >>>>> ${message.coin} - message.count = ${message.count}`);

        // Pass data along
        this.emit(message);

        // Acknowledge
        done()

    }

}

Queue Providers

The default implementation uses the awesome kue under the hood. This creates a very robust system, but with some performance trade-offs. If you have a relatively simple implenetation with small jobs, then you can use the in memory 'async' implementation (uses async.queue).

Support for featureless-job-queue is coming soon.

Setting up a toplogy

Once you have created your blocks, now you can wire them up!

Here is an example based on the bolt and spout we just created

const Logger = require('nstorm').Logger
const nStorm = require('nstorm').nStorm
const BaseBolt = require('nstorm').BaseBolt

var opts = {
    queue:'simple', 
    debug:true
	redis: {
			port: 6379,
			host: '127.0.0.1',
			prefix: 'nstorm-pubsub:'
		}
}

async function test(){

    // Spout and Bolt implementation
    var coinTossSpout = new CoinSpout();
    var headsBolt = new HeadsBolt();
    var tailsBolt = new TailsBolt();
    var resultsBolt = new ResultsBolt();

    var cloud = new nStorm(opts);

    // Setting up topology using the topology builder
    cloud.addBlock("coindTossSpout", coinTossSpout);
    cloud.addBlock("tailsBolt", tailsBolt, 10).input("coindTossSpout", {filter:{coin: "tails"}});
    cloud.addBlock("headsBolt", headsBolt, 5).input("coindTossSpout", {filter:{coin: "heads"}});
    cloud.addBlock("resultsBolt", resultsBolt, 6).input("tailsBolt").input("headsBolt");

    // Setup cluster, and run topology...
    await cloud.setupTopology()
    await cloud.reset()
    await cloud.start()

}

test().then().catch(err=>Logger.error(err))

nStorm Options

When you create a nStorm cloud using var cloud = new nStorm(<options>); you can pass in the following options;

Option | Default | Description --- | --- | --- cloudName | "stormcloud" | Specify the topology name, used if you plan to run more than one topology. queue | 'kue' | The queue engine to use, options are 'kue', 'async' and 'featureless' redis | {port: 6379, host: '127.0.0.1'} | Redis connection object debug | false | Turns on logging replayLimit | 3 | The number of times a message is replayed before its considered bad and deleted, i.e. if the same message causes an exception 3 times stop replaying the message! replayTime | 300 | Time (ms) between checking for failed messages (deprecated) reset | false | Deprecated, use nStorm.reset() instead. This gives you more control of how and when to reset.

Parallelism

By default, every time a block gets a message its processing method is called. However, you can restrict the number of instances per worker by specifying a limit, e.g.

builder.addBlock("headsBolt", headsBolt, 2).input("coinTossSpout");

Here a maximum of 2 instances of the headsBolt will run. Any messages sent to that block will be buffered (using redis) until the block is ready.

Note, for this to work you must call context.ack(message) so nStorm knows you've finished processing the message.

Chaining inputs

Inputs can be chained, e.g.;

builder.addBlock("resultsBolt", resultsBolt, 1).input("tailsBolt").input("headsBolt");

So, here a 'resultsBot' is getting inputs from both a tailsBolt and a headsBolt.

Filtering

Inputs can be filted on a specific key and value, e.g

builder.addBlock("headsBolt", headsBolt, 1).input("coinTossSpout", {coin: "heads"});

Here the 'headsBolt' will reject any messages that do not have a "coin" key which equals "heads", i.e. it only accepts coin tosses of heads!

Adding remote workers

This is in the works, but because the system is built on top of redis this should be basically already possible......

Monitoring

A web based graphical monitor is in the works, and coming soon. Feel free to bug me if you'd like to see an early version.

Suggestions

Feel free to contact me at [email protected] if you want to help or have suggestions.

Contribution

Contributions are welcome!

Please feel free to file issues and submit pull requests.

Donate

If you like this, and use it, please consider donating to help support future development.

Donate Bitcoins

License

The MIT License (MIT)

Copyright (c)

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.