© 2024 – Pkg Stats / Ryan Hefner

stream-fusion

v1.0.2

Fuse multiple streams together based on some field and a transformation function

Downloads

7

Readme

Stream-Fusion

Relationally join streams based on some computed, comparable (but not necessarily equal) key (e.g. a time stamp). For instance, this is useful for sensor fusion: combining multiple streams of timestamped sensor data. This project was developed for Clearpath Robotics.

Terrible documentation incoming...

Contract and usage

var fuser = new StreamFusion([stream*], [options])

where each stream is an object of the form

{
  stream: /* a Node.js-compatible stream */
  key: /* how to compare items from this stream with items from other streams: a property name or a function returning an integer or string. See http://lodashjs.org/#iteratee */
  check: /* [default=false] whether the fusion stream should (try to) emit data whenever this stream emits data */
  bufferLeft: /* [default=options.buffer] how many items to the left of the closest item to include in the window */
  bufferRight: /* [default=options.buffer] how many items to the right of the closest item to include in the window */
  // Note: if bufferLeft and bufferRight are unset, options.buffer is used for both, so the closest value will be in the middle of the window
  maxRechecks: /* [default=options.buffer * 2] the number of times to try fitting any piece of data into the window (see options.buffer) */
  bufferLength: /* [default=options.bufferLength] how much historical data to keep. If the streams are in sync you should definitely set this to a low value */
}
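A stream's `key` can be a property name (as in `key: "timestamp"` in the examples below) or a function. Here is a minimal sketch of normalizing a key into a function. It assumes only lodash's property-path and function shorthands matter. `toKeyFn` is a hypothetical name, not part of stream-fusion's API.

```javascript
// Hypothetical helper (not part of stream-fusion): turn a descriptor's `key`
// into a function, mimicking lodash's property-path and function shorthands.
function toKeyFn(key) {
  if (typeof key === "function") return key;
  if (typeof key === "string") {
    const path = key.split(".");
    // Walk the property path, returning undefined if any step is missing
    return (item) => path.reduce((obj, prop) => (obj == null ? undefined : obj[prop]), item);
  }
  throw new TypeError("key must be a function or a property path string");
}

// Keys only need to be comparable with < and >, not equal across streams
const byTimestamp = toKeyFn("timestamp");
const bySecs = toKeyFn("header.stamp.secs");
console.log(byTimestamp({ timestamp: 42 }));             // 42
console.log(bySecs({ header: { stamp: { secs: 7 } } })); // 7
```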

And options is of the form

{
  objectMode: /* [default=true] whether the stream should be in objectMode */
  buffer: /* [default=2] how many items to the left and right of the closest value to include in each window (i.e. if buffer is 1 then transform will be called with `[valueBelowCheck, valueAboveCheck]`). Note: values will be buffered internally until there is enough data to the left and right of the window. Implementation note: the value in the middle will always be `<=` the value being checked */
  bufferLength: /* [default=1000] default history length for each stream; large values are useful for post processing */
  maxRechecks: /* see above */
}
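The sketch below makes the defaults concrete by resolving a stream descriptor against the global options. `resolveDefaults` is hypothetical. The precedence for `maxRechecks` (descriptor, then `options.maxRechecks`, then `buffer * 2`) is an assumption drawn from the comments above, not confirmed library behavior.

```javascript
// Hypothetical sketch: fill in per-stream settings from the global options,
// using the documented defaults. The maxRechecks precedence is an assumption.
function resolveDefaults(streamDef, options) {
  const opts = Object.assign({ objectMode: true, buffer: 2, bufferLength: 1000 }, options);
  const pick = (value, fallback) => (value !== undefined ? value : fallback);

  const bufferLeft = pick(streamDef.bufferLeft, opts.buffer);
  const bufferRight = pick(streamDef.bufferRight, opts.buffer);
  return {
    stream: streamDef.stream,
    key: streamDef.key,
    check: streamDef.check === true, // [default=false]
    bufferLeft,
    bufferRight,
    maxRechecks: pick(streamDef.maxRechecks, pick(opts.maxRechecks, opts.buffer * 2)),
    bufferLength: pick(streamDef.bufferLength, opts.bufferLength),
    // Number of items handed to transform for this stream
    windowSize: bufferLeft + bufferRight + 1,
  };
}

// Buffer settings like the GPS descriptor in the example below
const gps = resolveDefaults(
  { bufferLeft: 5, bufferRight: 2, bufferLength: 10 },
  { bufferLength: 50, buffer: 2 }
);
console.log(gps.windowSize, gps.bufferLength, gps.maxRechecks); // 8 10 4
```

The window size is what the transform receives per non-checked stream. Wide asymmetric buffers therefore mean more context, but also more waiting before a window can be filled.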

You can create a transform to decide how the fused stream should emit data. In the case that stream 1 is being checked, the transform will be called with [[stream0Window], [valueBeingChecked], [stream2Window]]. The window for a given stream is an array of bufferLeft + bufferRight + 1 values: bufferLeft items to the left and bufferRight items to the right of the closest value in that stream. Call this.push inside the transform to publish data.

fuser.transform = function(streams) {
  var stream0Window = streams[0],    // window of values from stream 0
      stream1Value = streams[1][0];  // stream 1 is being checked, so its entry is [valueBeingChecked]

  this.push({"cat": stream0Window[0].id, "time": stream1Value.time});
  // Call push as many times as you want; each call publishes one item to the fused stream
};

By default (in the case that stream 1 is the stream being checked) it will be called with [[closestValueFromStream0], [valueBeingChecked], [closestValueFromStream2]]


The fused stream is simply a Node.js stream and should be used as such.

Algorithm

I spent a good amount of time developing an efficient algorithm for this problem while still offering a powerful and flexible API. Under the hood, it leverages many features of the Node stream API along with circular lists:

  • Create a circular list of length bufferLength for each provided stream
  • Start listening for data and end events on each stream
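A fixed-length circular list keeps only the newest `bufferLength` items per stream. Below is a minimal sketch of such a buffer. The `CircularBuffer` class is illustrative, not the library's internal type.

```javascript
// A fixed-capacity circular buffer: once full, each push overwrites the oldest item.
class CircularBuffer {
  constructor(capacity) {
    this.items = new Array(capacity);
    this.capacity = capacity;
    this.start = 0; // physical index of the oldest item
    this.length = 0;
  }

  push(item) {
    const end = (this.start + this.length) % this.capacity;
    this.items[end] = item;
    if (this.length < this.capacity) {
      this.length += 1;
    } else {
      // Buffer was full: the slot just written held the oldest item
      this.start = (this.start + 1) % this.capacity;
    }
  }

  // Logical index 0 is the oldest retained item
  get(i) {
    return this.items[(this.start + i) % this.capacity];
  }

  toArray() {
    const out = [];
    for (let i = 0; i < this.length; i++) out.push(this.get(i));
    return out;
  }
}

const buf = new CircularBuffer(3);
[1, 2, 3, 4, 5].forEach((x) => buf.push(x));
console.log(buf.toArray()); // [ 3, 4, 5 ]
```

Because the streams emit in key order, the logical view (oldest to newest) stays sorted. That property is what later allows a binary search.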

New data on a non-checked stream

  • Append the item to the stream's circular buffer
  • Check whether any checked streams have data buffered that depends on new messages from this stream. For example, if the buffer is 2 and the key is a timestamp, we may have to wait for 2 to 3 new messages (depending on whether messages arrive on the same millisecond) before there is enough data to give a window of 2 items to the right (and 2 to the left) of the closest value
    • If there is data buffered that depends on this stream, proceed to the checked stream algorithm
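The re-check step can be pictured as a small queue. This sketch assumes that `maxRechecks` caps how often a waiting item is retried before it is dropped. `PendingChecks` and its `retry` callback are hypothetical names for illustration.

```javascript
// Hypothetical sketch: checked items that could not yet get a full window wait
// here and are retried whenever a non-checked stream receives new data.
class PendingChecks {
  constructor(maxRechecks) {
    this.maxRechecks = maxRechecks;
    this.queue = []; // entries of { item, attempts }
  }

  enqueue(item) {
    this.queue.push({ item, attempts: 0 });
  }

  // tryCheck(item) returns true once the item could be fused (all windows full).
  // Returns the items dropped because they hit maxRechecks without being fused.
  retry(tryCheck) {
    const dropped = [];
    const remaining = [];
    for (const entry of this.queue) {
      if (tryCheck(entry.item)) continue; // fused: leave the queue
      entry.attempts += 1;
      if (entry.attempts >= this.maxRechecks) dropped.push(entry.item);
      else remaining.push(entry);
    }
    this.queue = remaining;
    return dropped;
  }
}

// Items are plain keys here; "ready" stands in for "enough data to the right"
const pending = new PendingChecks(2);
pending.enqueue(10);
pending.enqueue(20);
const latestKey = 12;
const ready = (key) => latestKey >= key;
console.log(pending.retry(ready)); // [] (10 is fused, 20 keeps waiting)
console.log(pending.retry(ready)); // [ 20 ] (20 is dropped after 2 attempts)
```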

New data on a checked stream

  • For each stream (besides the one being checked)
    • Do a circular binary search to find the index of the comparatively closest value in the stream's (sorted) circular buffer. Note: smaller bufferLengths yield fewer iterations in the search
    • If there are enough items (bufferLeft to the left, bufferRight to the right) around that index
      • Take a circular slice of those items around the index
      • Continue looping through the streams
    • Otherwise, enqueue the current data from this stream to be checked later (see above) and abort
  • If enough data has been seen from each stream, call the transform with the data in each window
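The lookup for each non-checked stream can be sketched as below. It assumes each buffer exposes `length` and a logical `get(i)` (index 0 = oldest). Searching over logical indices is what makes the binary search "circular": the physical wrap-around is hidden inside `get`. The helper names and the `{ ring, keyFn, left, right }` shape are assumptions, not the library's internals.

```javascript
// Hypothetical sketch of the per-stream window lookup. Each ring exposes
// `length` and `get(i)` with logical index 0 = oldest, items sorted by key.

// Binary search for the last index whose key is <= target (matching the note
// that the middle value is always <= the value being checked); -1 if none.
function closestIndex(ring, keyFn, target) {
  let lo = 0;
  let hi = ring.length - 1;
  let found = -1;
  while (lo <= hi) {
    const mid = (lo + hi) >> 1;
    if (keyFn(ring.get(mid)) <= target) {
      found = mid;
      lo = mid + 1;
    } else {
      hi = mid - 1;
    }
  }
  return found;
}

// Slice `left` items before and `right` items after idx, or null when there is
// not enough data yet (the caller would enqueue the item for a later re-check).
function windowAround(ring, idx, left, right) {
  if (idx < 0 || idx - left < 0 || idx + right >= ring.length) return null;
  const out = [];
  for (let i = idx - left; i <= idx + right; i++) out.push(ring.get(i));
  return out;
}

// Build the transform argument: [value] for the checked stream, a window for
// every other stream, or null if any window is incomplete.
function buildTransformArgs(streams, checkedIndex, value) {
  const target = streams[checkedIndex].keyFn(value);
  const args = [];
  for (let s = 0; s < streams.length; s++) {
    if (s === checkedIndex) {
      args.push([value]);
      continue;
    }
    const { ring, keyFn, left, right } = streams[s];
    const win = windowAround(ring, closestIndex(ring, keyFn, target), left, right);
    if (win === null) return null;
    args.push(win);
  }
  return args;
}

// Array-backed stand-in for a circular buffer (a real one would wrap inside get)
const asRing = (arr) => ({ length: arr.length, get: (i) => arr[i] });
const streams = [
  { ring: asRing([{ t: 1 }, { t: 3 }, { t: 5 }, { t: 7 }, { t: 9 }]), keyFn: (x) => x.t, left: 1, right: 1 },
  { ring: asRing([]), keyFn: (x) => x.t }, // the checked stream
];
console.log(JSON.stringify(buildTransformArgs(streams, 1, { t: 6 })));
// [[{"t":3},{"t":5},{"t":7}],[{"t":6}]]
```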

Example (see tests for more examples)

Using roslibjs and node-serialport we can create fusion windows based on timestamps (in different time formats).

var Ros = require("roslibjs/src/core/Ros");
var ros = new Ros("connect to rosbridge");
var serialport = require("serialport");
var SerialPort = serialport.SerialPort;

var through = require("through2");

var accelerometerStream = ros.Topic({
  name: "/accelerometer/data"
}).toStream();

var gpsStream = ros.Topic({
  name: "/navsat/fix"
}).toStream();

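// Connect to some serial device (e.g. a GPS or compass that emits NMEA sentences) and pipe through an NMEA transform
var compassStream = new SerialPort("/dev/tty-usbserial1", {
  parser: serialport.parsers.raw
})
.pipe(require("nmea").createDefaultTransformer())
.pipe(through.obj(function(chunk, enc, callback) {
  // Stamp each parsed sentence with its arrival time (ms since the epoch)
  chunk.timestamp = Date.now();
  callback(null, chunk); // pass the chunk along; a bare callback() would drop it
}));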

var Fusion = require("stream-fusion");

// Convert a ROS header stamp (secs + nsecs) to milliseconds
function headerToStamp(item) {
  return item.header.stamp.secs * 10e2 + item.header.stamp.nsecs / 10e5;
}

var fused = new Fusion(
    // inherit options
    {stream: accelerometerStream, key: headerToStamp, bufferLength: 50},
    // explicitly set buffer sizes
    {stream: gpsStream, key: headerToStamp, bufferLeft: 5, bufferRight: 2, bufferLength: 10},
    {stream: compassStream, key: "timestamp", check: true},
    // Options
    {bufferLength: 50, buffer: 2}
);

fused.transform = function(streamData) {
  var accelerometerWindow = streamData[0];
  var gpsWindow = streamData[1];
  var compassValue = streamData[2][0];

  this.push( /* computed value in window */ );
  // can push as many times as desired but only one item per push
};

// Pipe the live data out (fusionTopic is assumed to be a ROS topic created elsewhere for the fused output)
fused.pipe(fusionTopic.toStream());

Post processing stream fusion example

For post processing, it's important to set a large bufferLength and a high maxRechecks value to avoid losing data. Otherwise the API remains the same :)

var fs = require("fs");
var http = require("http");
var csv = require("csv-stream");
var Fusion = require("stream-fusion");

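// csv-stream's createStream() returns a parser that emits one object per CSV row
var marketStream = fs.createReadStream("../transactions.log").pipe(csv.createStream());

// http.get delivers its response asynchronously, so create the parser up front and pipe into it.
// EXCHANGE_RATE_URL is a placeholder for your exchange rate service.
var exchangeRateStream = csv.createStream();
http.get(EXCHANGE_RATE_URL, function(res) {
  res.pipe(exchangeRateStream);
});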

var fused = new Fusion(
  // Use a large buffer so we don't lose any transactions
  {stream: marketStream, key: "timestamp", check: true, bufferLength: 10e5, maxRechecks: 10e3, buffer: 1},
  {stream: exchangeRateStream, key: function(row) {
    // for instance; getTime() returns a comparable integer (ms since the epoch).
    // Note that JavaScript Date months are 0-based.
    return new Date(row.year, row.month, row.day, row.hour, row.minute).getTime();
  }, check: false, bufferLength: 10e4, bufferLeft: 1, bufferRight: 1}
);

fused.transform = function(window) {
  var marketItem = window[0][0];
  var exchangeData = window[1];

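  // Do an IDW (inverse distance weighting) calculation or similar to match the exchange data to the transaction's time stamp.
  // interpolate is a hypothetical helper you would supply.
  var computed = interpolate(marketItem, exchangeData);

  this.push(computed);
};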

fused.pipe(/* wherever*/ );
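The transform above leaves the interpolation step open. Inverse distance weighting (IDW), which its comment mentions, could be sketched as follows over the exchange-rate window. `idw`, its `{ time, value }` sample shape and the `power` parameter are assumptions for illustration. In the transform you would first map each exchange row to that shape, using the same key function as the descriptor for `time`.

```javascript
// Hypothetical helper: inverse distance weighting (IDW) over time.
// Each sample is { time, value }; closer samples get proportionally more weight.
function idw(samples, t, power = 1) {
  let weightedSum = 0;
  let totalWeight = 0;
  for (const { time, value } of samples) {
    const distance = Math.abs(t - time);
    if (distance === 0) return value; // exact time match: use it directly
    const weight = 1 / Math.pow(distance, power);
    weightedSum += weight * value;
    totalWeight += weight;
  }
  return weightedSum / totalWeight; // NaN if samples is empty
}

// Two rates 10 minutes apart; a transaction 2.5 minutes after the first one
const minute = 60 * 1000;
const rates = [{ time: 0, value: 1.10 }, { time: 10 * minute, value: 1.20 }];
console.log(idw(rates, 2.5 * minute)); // approximately 1.125
```

With `bufferLeft: 1, bufferRight: 1` the window holds three rates around the transaction time. A higher `power` makes the nearest rate dominate more strongly.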