npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

atributo

v1.1.2

Published

Allocate jobs across a variable number of instances

Downloads

85

Readme

Node.js module for managing allocation of job IDs across a variable number of instance IDs.

A typical use would be to allocate work across a number of worker processes. Job allocations are stored in a SQLite or PostgreSQL database which any worker (instance) can access.

You can add a new instance ID and jobs will start to be allocated to it. You can mark an instance ID as unavailable and no further jobs will be allocated to it. You can then wait until an instance ID has no jobs allocated to it before removing it.

This is useful if your jobs are sticky, i.e. a job must remain attached to the instance which started it. A job ID remains allocated to an instance ID even if a new instance ID is added which would otherwise have been allocated the job ID. You have to deallocate the job ID explicitly to get it reassigned.

API documentation is available here.

Example

const { Atributo } = require('atributo'),
      async = require('async'),
      assert = require('assert');

// Open the database file
new Atributo({ db_filename: 'atributo.sqlite3' }).on('ready', function () {
    async.waterfall([

        // Make instances available
        cb => this.available('instance0', cb),
        cb => this.available('instance1', cb),

        // List instances
        cb => this.instances(cb),
        (instances, cb) => {
            instances.sort((x, y) => x.id > y.id ? 1 : x.id < y.id ? -1 : 0);
            assert.deepStrictEqual(instances, [
                { id: 'instance0', available: true },
                { id: 'instance1', available: true }
            ]);
            cb();
        },

        // Allocate jobs
        cb => this.allocate('job0', cb),
        (instance_id, persisted, cb) => {
            assert(persisted); // 
            assert.strictEqual(instance_id, 'instance1');
            cb();
        },
        cb => this.allocate('job1', cb),
        (instance_id, persisted, cb) => {
            assert(persisted); // 
            assert.strictEqual(instance_id, 'instance0');
            cb();
        },

        // List jobs for each instance
        cb => this.jobs('instance0', cb),
        (jobs, cb) => {
            assert.deepStrictEqual(jobs, ['job1']);
            cb();
        },
        cb => this.jobs('instance1', cb),
        (jobs, cb) => {
            assert.deepStrictEqual(jobs, ['job0']);
            cb();
        },

        // Check if instance has jobs
        cb => this.has_jobs('instance0', cb),
        (has_jobs, cb) => {
            assert(has_jobs);
            cb();
        },

        // Get instance for job
        cb => this.instance('job1', cb),
        (instance_id, cb) => {
            assert.strictEqual(instance_id, 'instance0');
            cb();
        },

        // Make instance unavailable but don't remove it
        cb => this.unavailable('instance0', false, cb),

        // Check instance is unavailable
        cb => this.instances(cb),
        (instances, cb) => {
            instances.sort((x, y) => x.id > y.id ? 1 : x.id < y.id ? -1 : 0);
            assert.deepStrictEqual(instances, [
                { id: 'instance0', available: false },
                { id: 'instance1', available: true }
            ]);
            cb();
        },

        // Check existing allocation to unavailable instance
        cb => this.allocate('job1', cb),
        (instance_id, persisted, cb) => {
            assert(!persisted); // 
            assert.strictEqual(instance_id, 'instance0');
            cb();
        },

        // Deallocate existing allocation
        cb => this.deallocate('job1', cb), // 

        // Re-allocate job
        cb => this.allocate('job1', cb),
        (instance_id, persisted, cb) => {
            assert(persisted); // 
            assert.strictEqual(instance_id, 'instance1');
            cb();
        },

        // Remove instance and its allocated jobs
        cb => this.unavailable('instance0', true, cb),

        // Check instance has been removed
        cb => this.instances(cb),
        (instances, cb) => {
            assert.deepStrictEqual(instances, [
                { id: 'instance1', available: true }
            ]);
            cb();
        },

        // Close database
        cb => this.close(cb)

    ], assert.ifError);
});
  • This is a new allocation persisted to the database in this call.

  • This is an allocation which already existed in the database before the instance was made unavailable.

  • The allocation is removed from the database.

Allocator

The default algorithm for allocating a job to an instance is to hash the job ID, treat the resulting digest as a 32 bit integer and use that as an index into the list of available instances.

You can change the default algorithm by overriding the _allocate method.

Here’s an example which knows the ID of the instance on which it’s running and only persists an allocation to the database if it’s for that instance.

Since _allocate is only called when the allocation doesn’t already exist in the database, if you call allocate for each job on every instance, this example can start a job on its instance when first allocated.

const { Atributo } = require('atributo'),
      async = require('async'),
      assert = require('assert');

class ExampleAtributo extends Atributo
{
    available(instance_id, cb) {
        // Remember out instance ID
        this._instance_id = instance_id;
        super.available(instance_id, cb);
    }

    allocate(job_id, cb) {
        super.allocate(job_id, (err, instance_id, persisted) => {
            if (persisted) {
                // first allocation on our instance so start job
            }
            cb(err, instance_id, persisted);
        });
    }

    _allocate(job_id, instance_ids, cb) {
        super._allocate(job_id, instance_ids, (err, instance_id, persist) => {
            if (instance_id !== this._instance_id) {
                // Don't persist if not our instance
                persist = false;
            }
            cb(err, instance_id, persist);
        });
    }
}

async.times(2, (i, cb) => {
    new ExampleAtributo({
        db_filename: 'atributo.sqlite3',
        instance_id: `instance${i}`
    }).on('ready', function () {
        cb(null, this);
    });
}, (err, [ao0, ao1]) => {
    assert.ifError(err);
    async.waterfall([

        // Make instances available
        cb => ao0.available('instance0', cb),
        cb => ao1.available('instance1', cb),

        // List instances on both Atributos
        cb => ao0.instances(cb),
        (instances, cb) => {
            instances.sort((x, y) => x.id > y.id ? 1 : x.id < y.id ? -1 : 0);
            assert.deepStrictEqual(instances, [
                { id: 'instance0', available: true },
                { id: 'instance1', available: true }
            ]);
            cb();
        },
        cb => ao1.instances(cb),
        (instances, cb) => {
            instances.sort((x, y) => x.id > y.id ? 1 : x.id < y.id ? -1 : 0);
            assert.deepStrictEqual(instances, [
                { id: 'instance0', available: true },
                { id: 'instance1', available: true }
            ]);
            cb();
        },

        // Job allocated on instance0 to instance1 should not be persisted
        cb => ao0.allocate('job0', cb),
        (instance_id, persisted, cb) => {
            assert(!persisted);
            assert.strictEqual(instance_id, 'instance1');
            cb();
        },
        cb => ao1.jobs('instance1', cb),
        (jobs, cb) => {
            assert.deepStrictEqual(jobs, []);
            cb();
        },

        // Job allocated on instance1 to instance1 should be persisted
        cb => ao1.allocate('job0', cb),
        (instance_id, allocated, cb) => {
            assert(persisted);
            assert.strictEqual(instance_id, 'instance1');
            cb();
        },
        cb => ao1.jobs('instance1', cb),
        (jobs, cb) => {
            assert.deepStrictEqual(jobs, ['job0']);
            cb();
        },

        // Job allocated on instance1 to instance0 should not be persisted
        cb => ao1.allocate('job1', cb),
        (instance_id, persisted, cb) => {
            assert(!persisted);
            assert.strictEqual(instance_id, 'instance0');
            cb();
        },
        cb => ao1.jobs('instance0', cb),
        (jobs, cb) => {
            assert.deepStrictEqual(jobs, []);
            cb();
        },

        // Job allocated on instance0 to instance0 should be persisted
        cb => ao0.allocate('job1', cb),
        (instance_id, persisted, cb) => {
            assert(persisted);
            assert.strictEqual(instance_id, 'instance0');
            cb();
        },
        cb => ao1.jobs('instance0', cb),
        (jobs, cb) => {
            assert.deepStrictEqual(jobs, ['job1']);
            cb();
        },

        // Jobs should only be persisted once
        cb => ao1.allocate('job0', cb),
        (instance_id, persisted, cb) => {
            assert(!persisted);
            assert.strictEqual(instance_id, 'instance1');
            cb();
        },
        cb => ao0.allocate('job1', cb),
        (instance_id, persisted, cb) => {
            assert(!persisted);
            assert.strictEqual(instance_id, 'instance0');
            cb();
        },

        // Close database
        cb => ao0.close(cb),
        cb => ao1.close(cb)

    ], assert.ifError);
});

Installation

npm install atributo

SQLite

In the top-level directory you’ll find a file called atributo.empty.sqlite3. This contains an empty copy of the database atributo needs to store instance availablity and job allocations.

You should use a copy of this file in your application and pass its location as db_filename when constructing Atributo objects.

PostgreSQL

Pass pg as db_type and the node-postgres configuration as db when constructing Atributo objects.

Licence

MIT

Test

grunt test

Lint

grunt lint

Coverage

grunt coverage

c8 results are available here.

Coveralls page is here.