

data-drift

Extensively configurable and stateful data-transformation-stream builder

Have you ever built a data pipeline, only to find out that the order of your transformers needs to change at runtime? But how do you do something like that? Thankfully, data-drift comes to the rescue!

Data-drift is a highly configurable pipeline builder that lets you add different segments and reorganize them at runtime. On top of that, everything is built on object-mode streams, which means you can attach state to the objects flowing through. In short, you can have one pipeline, send different data through it, and easily tell the pieces apart thanks to that state.

As an additional bonus, data-drift makes use of monadic Results and Options, which makes for cleaner error management and better performance in error cases, since nothing has to unwind through try..catch.
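
To give you an idea of what that looks like for a caller, here is a minimal sketch; only unwrap() appears in the examples below, so the other method names (isOk, unwrapErr, isSome) are assumptions about the Result and Option API:

'use strict';

const Stream = require('stream');
const DD = require('data-drift');

const pipeline = new DD();
const trans = new Stream.Transform({ objectMode: true, });

// registerSegment returns a Result<number, VError> instead of throwing
const res = pipeline.registerSegment(DD.SegmentTypes.WORKER, trans);
if (res.isOk()) {
    console.log('segment ID:', res.unwrap());
} else {
    console.error('registration failed:', res.unwrapErr().message);
}

// getSegmentPosition returns an Option<number> instead of e.g. null
const pos = pipeline.getSegmentPosition(trans);
if (pos.isSome()) {
    console.log('current position:', pos.unwrap());
}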

You can find the complete API, as defined in code, below the examples!

Installation

Data-drift requires Node.js v6+. For fast install times, I recommend using npm v5+.

$ npm i data-drift

Simple Example

'use strict';

const Stream = require('stream');
const DD = require('data-drift');


const pipeline = new DD();

// create one source, one drain and n transformers.
// everything has to be in Object Mode, so we cannot simply use stdin and stdout.
const source = new Stream.Readable({ objectMode: true, });
const drain = new Stream.Writable({ objectMode: true, });
const trans = new Stream.Transform({ objectMode: true, });

// the source has to emit an object you want to use in your pipeline.
// here it is fed manually from the stdin listener below, so _read()
// can be a no-op (reading stdin here as well would push chunks twice)
source._read = function() {};

process.stdin.on('data', data => {
    source.push({
        state: {},
        data: data.toString(),
    });
});

// don't forget to always pass the initial object
trans._transform = function(data, _, cb) {
    data.data = `You just inputted "${data.data.replace(/\r?\n?$/, '')}"!\n`;
    cb(null, data);
};

// the drain can consume the object in any way it wants,
// for example write it to your HTTP server as response.
drain._write = function (data, _, cb) {
    process.stdout.write(data.data);
    cb(); // signal completion, otherwise the stream stalls after the first chunk
};

// when using data-drift, you have to register all pieces
// you can register new workers any time you want
// however, there can only be one source and one drain at a time!
pipeline.registerSegment(DD.SegmentTypes.SOURCE, source);
pipeline.registerSegment(DD.SegmentTypes.DRAIN, drain);
pipeline.registerSegment(DD.SegmentTypes.WORKER, trans);
// add as many transformers as you like and hot-re-order them later on :)

// then start the pipeline
pipeline.buildPipeline();
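
Running the example (filename assumed) and typing a line should echo it back through the pipeline:

$ node simple.js
hello
You just inputted "hello"!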

Usage

Create New Pipeline

'use strict';

const Stream = require('stream');
const DD = require('data-drift');


const pipeline = new DD();

// create one source, one drain and n transformers.
// everything has to be in Object Mode, so we cannot simply use stdin and stdout.
const source = new Stream.Readable({ objectMode: true, });
const drain = new Stream.Writable({ objectMode: true, });
const trans = new Stream.Transform({ objectMode: true, });
const trans2 = new Stream.Transform({ objectMode: true, });

// the source has to emit an object you want to use in your pipeline.
// it is fed manually from the stdin listener below, so _read()
// can be a no-op (reading stdin here as well would push chunks twice)
source._read = function() {};

// the source can be fed manually
process.stdin.on('data', data => {
    source.push({
        state: {},
        data: data.toString(),
    });
});

// don't forget to always pass the initial object
trans._transform = function(data, _, cb) {
    data.data = `You just inputted "${data.data.replace(/\r?\n?$/, '')}"!`;
    cb(null, data);
};

trans2._transform = function(data, _, cb) {
    data.data += ' ~ ';
    data.state.foo = 'FOO';
    cb(null, data);
};

// the drain can consume the object in any way it wants,
// for example write it to your HTTP server as response.
drain._write = function (data, _, cb) {
    process.stdout.write(`Data: "${data.data}" State: ${JSON.stringify(data.state)}\n`);
    cb();
};

// ...

Register Segments

// ...

// when using data-drift, you have to register all pieces
// you can register new workers any time you want
// however, there can only be one source and one drain at a time!
pipeline.registerSegment(DD.SegmentTypes.SOURCE, source);
pipeline.registerSegment(DD.SegmentTypes.DRAIN, drain);
const transformer1 = pipeline.registerSegment(DD.SegmentTypes.WORKER, trans).unwrap();
const transformer2 = pipeline.registerSegment(DD.SegmentTypes.WORKER, trans2).unwrap();
// add as many transformers as you like and hot-re-order them later on :)

//...
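
Because registerSegment returns a Result, a failed registration can be handled inline instead of with try..catch; a continuation sketch, where isErr() and unwrapErr() are assumed method names (only unwrap() is used above):

// ...

// a second source is rejected, since there can only be one at a time
const secondSource = new Stream.Readable({ objectMode: true, });
const res = pipeline.registerSegment(DD.SegmentTypes.SOURCE, secondSource);
if (res.isErr()) {
    console.error(res.unwrapErr().message); // ESOURCEALREADYREGISTERED
}

// ...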

Start Pipeline

// ...

// then start the pipeline
pipeline.buildPipeline();

//...
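
buildPipeline() returns false when a source or a drain is still missing (see the API below), so the result is worth checking; a continuation sketch:

// ...

if (!pipeline.buildPipeline()) {
    console.error('pipeline not started: source or drain missing');
}

// ...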

Re-Order Segment

// ...

// type something, wait 10s, type again to see the difference
setTimeout(() => {
    console.log('Swap transformers...');

    // the first position (after a source, if available) has the index 0
    pipeline.setSegmentPosition(transformer2, 0);

    // the next line is implicit, since all subsequent segments are pushed to the next position
    //pipeline.setSegmentPosition(transformer1, 1);
}, 10000);
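
Workers can also be removed from a running pipeline again, using the segment ID returned by registerSegment; a continuation sketch:

// ...

// detach the first transformer by its segment ID
pipeline.unregisterSegment(transformer1);

// ...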

API

The interface below throws "Not implemented" Exceptions as placeholders; in the actual library, all methods are fully implemented and will not throw. The Exceptions are only there to keep this API overview clear and uncluttered.

class DataDrift {
    static get SegmentTypes() {
        return {
            SOURCE: 0b001,
            WORKER: 0b010,
            DRAIN:  0b100,
        };
    };

    // note: the base class (an object-mode stream) is elided in this overview
    constructor() { super({ objectMode: true, }); this._init(); };

    /**
     * Build pipeline and make it start working.
     * You only have to call this method once to kick off the pipeline.
     *
     * @returns {boolean} true if the process was successful. If a source or drain is missing, false is returned.
     */
    buildPipeline() { throw new Error('Not implemented: DataDrift.buildPipeline'); };

    /**
     * Get position of a segment in the chain
     *
     * @param {object} segment
     * @returns {Option<number>}
     */
    getSegmentPosition(segment) { throw new Error('Not implemented: DataDrift.getSegmentPosition'); };

    /**
     * Change the position of one of the worker segments
     *
     * @param {number} segment segment ID
     * @param {number} position new position, where 0 is the first index (after a source, if available)
     */
    setSegmentPosition(segment, position) { throw new Error('Not implemented: DataDrift.setSegmentPosition'); };

    /**
     * Register a new segment.
     * Can return the following errnos in an Err:
     *   EPARAMETER: Either the type or the segment were not specified correctly.
     *   ESOURCEALREADYREGISTERED: A source has already been registered before. There can only be one source.
     *   EDRAINALREADYREGISTERED: A drain has already been registered before. There can only be one drain.
     *   ESOURCEMUSTBEREADABLESTREAM: A source must be derived from a readable stream.
     *   EDRAINMUSTBEWRITABLESTREAM: A drain must be derived from a writable stream.
     *   ECREATEITEM
     *
     * @param {SegmentType} type
     * @param {object} segment
     * @returns {Result<number,VError>} segment ID as used by the Drift
     */
    registerSegment(type, segment) { throw new Error('Not implemented: DataDrift.registerSegment'); };

    /**
     * Unregister a segment from the pipeline
     *
     * @param {number} segment segment ID
     */
    unregisterSegment(segment) { throw new Error('Not implemented: DataDrift.unregisterSegment'); };

    /**
     * Check if the pipeline has a certain segment type
     *
     * @param {SegmentType} type
     * @returns {boolean}
     */
    hasSegmentType(type) { throw new Error('Not implemented: DataDrift.hasSegmentType'); };
};