@thinkmill/node-worker

v1.0.0

Worker Framework

A simple, promise-based, worker framework.

Install

yarn add @thinkmill/node-worker

Constructor

| Argument | Type | Description |
|----------|------|-------------|
| label | String | A label that describes the worker (used in logs, errors, etc.) |
| payload | Function | A promise-returning function that is executed on schedule (receives an object containing the run ordinal; see below) |
| options | Object | Options controlling how the worker should behave |

payload

The payload function represents the main body of the worker, where the real work is done. It should return a promise that resolves or rejects within the timeoutMs provided to it.

The payload is invoked according to the following schedule:

  • 1000 ms after the worker.start() function is called
  • If the promise rejects or resolves with a truthy value, the worker will sleep for its configured sleepMs period before invoking the payload again
  • If the promise resolves with a falsy value, the worker will sleep for 1000 ms before invoking the payload again
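
The rules above can be summarised as a small pure function. This is only a sketch of the documented schedule, not the library's internal code; `nextDelayMs`, `SHORT_DELAY_MS` and the shape of `outcome` are hypothetical names introduced for illustration.

```javascript
// Hypothetical helper illustrating the documented schedule;
// it is not part of @thinkmill/node-worker.
const SHORT_DELAY_MS = 1000; // the 1000 ms delay quoted in the rules above

// outcome: { rejected: boolean, value: any } describes how the last payload promise settled
function nextDelayMs(outcome, sleepMs) {
  // Rejections and truthy results both fall back to the configured sleep
  if (outcome.rejected || outcome.value) return sleepMs;
  // A falsy result asks for a prompt re-run
  return SHORT_DELAY_MS;
}

console.log(nextDelayMs({ rejected: false, value: true }, 5000));  // 5000
console.log(nextDelayMs({ rejected: false, value: false }, 5000)); // 1000
console.log(nextDelayMs({ rejected: true }, 5000));                // 5000
```

In practice this means a payload can resolve with a falsy value when it knows more work is waiting, and with a truthy value when it is happy to wait for the full sleepMs.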

When invoked, the payload is passed a single argument: an object containing the following properties.

| Property | Type | Description |
|----------|------|-------------|
| label | String | The worker label that was provided on construction |
| ordinal | Number | An integer indicating how many times the payload has been executed |
| timeoutMs | Number | The number of milliseconds the worker will wait for this invocation to return |

options

The options can contain:

| Property | Type | Description |
|----------|------|-------------|
| sleepMs | Number | How long do we pause between runs? (in milliseconds) |
| timeoutMs | Number | How long do we wait (in milliseconds) for the run promise to resolve/reject? See important notes below! |

Note that timeoutMs forces the end of a cycle (and allows the next run to be scheduled) but does not (and cannot?) terminate the still-running promise. If the promise returned by the payload function has failed internally without ever resolving or rejecting, that's OK; the schedule timeout will prevent the worker from stalling forever. But if the promise is still doing work, we may end up with multiple instances of the payload executing concurrently. This is almost certainly a Bad Thing, but that's up to you.
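
To see why a timeout cannot stop work that is already underway, here is a generic sketch built on `Promise.race`. It is an illustration of the general behaviour of promises, not the library's actual implementation; `withTimeout` and `slowWork` are hypothetical names.

```javascript
// Illustration only: a generic timeout built with Promise.race.
// This is not the internals of @thinkmill/node-worker.
function withTimeout(promise, timeoutMs) {
  let timer;
  const timeout = new Promise((resolve) => {
    timer = setTimeout(() => resolve('timed-out'), timeoutMs);
  });
  // Whichever promise settles first wins; the loser is simply ignored, not cancelled
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

let workFinished = false;
const slowWork = new Promise((resolve) => {
  setTimeout(() => { workFinished = true; resolve('done'); }, 50);
});

withTimeout(slowWork, 10).then((result) => {
  console.log(result);       // 'timed-out'
  console.log(workFinished); // false: the slow work is still pending...
  return slowWork;
}).then(() => {
  console.log(workFinished); // true: ...and it ran to completion anyway
});
```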

The takeaway:

  • Make sure your worker promises always settle (resolve or reject)
  • Probably set a fairly long timeoutMs value
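
If overlapping runs would be harmful, one possible mitigation is to wrap the payload so that a new invocation is skipped while an earlier one is still running. This is a sketch under assumptions, not a feature of the library: `guardAgainstOverlap` is a hypothetical helper, and returning `true` from a skipped run is a choice made here so the worker falls back to its normal sleepMs.

```javascript
// Hypothetical wrapper; not part of @thinkmill/node-worker.
function guardAgainstOverlap(payload) {
  let running = false;
  return async (context) => {
    if (running) {
      // A previous invocation outlived its timeout; skip this run.
      // Resolving truthy means the worker sleeps for its normal sleepMs.
      return true;
    }
    running = true;
    try {
      return await payload(context);
    } finally {
      // Clear the flag whether the payload resolved or rejected
      running = false;
    }
  };
}

// Usage (assuming `payload` is your own promise-returning function):
// const worker = new Worker('my-worker', guardAgainstOverlap(payload), { sleepMs: 60 * 1000 });
```

The guard only protects a single process; workers running in several processes would still need some form of shared locking.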

Instance methods

| Method | Description |
| ------ | ----------- |
| start() | Start the worker after a short delay. |
| stop() | Stop the worker. The currently running job will be allowed to complete but the worker will not restart afterwards. |
| scheduleRunInMs(delayMs, onceOff) | Schedule a run in delayMs milliseconds. If onceOff is true this will trigger an extra run rather than rescheduling the next run. If the worker has been stopped this will have no effect. |
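
Because stop() lets the current job finish, it pairs naturally with process shutdown signals. The following is a minimal sketch, assuming only that the worker exposes the stop() method described above; `stopOnSignals` and its parameters are hypothetical names, not part of the package.

```javascript
// Hypothetical helper; only worker.stop() comes from the table above.
// `emitter` defaults to the Node.js process object so that signals are observed.
function stopOnSignals(worker, emitter = process, signals = ['SIGINT', 'SIGTERM']) {
  for (const signal of signals) {
    emitter.once(signal, () => {
      // stop() lets the in-flight run finish but prevents further runs
      worker.stop();
    });
  }
}

// Usage (assuming `myWorker` is a started worker instance):
// stopOnSignals(myWorker);
```

Accepting the emitter as a parameter is a design choice made here so the helper can be exercised with a plain EventEmitter instead of real signals.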

Usage

Examples

A simple example:

const Worker = require('@thinkmill/node-worker');

const myWorker = new Worker(
  'test-worker',
  ({ label, ordinal, timeoutMs }) => {
    return new Promise((resolve, reject) => {
      const takeMs = 4000 + (Math.random() * 1500);
      console.log(`Run #${ordinal} .. will take ${takeMs} ms`);
      setTimeout(() => {
        console.log(`Run #${ordinal} .. resolving`);
        resolve(true);
      }, takeMs);
    });
  }, {
    sleepMs: 5000,
    timeoutMs: 5 * 1000,
  }
);

myWorker.start();

A more realistic/interesting example, processing items in a queue:

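const Worker = require('@thinkmill/node-worker');
const debug = require('debug')('workers:dequeue-things');
// Assumption: a configured knex instance exported by your own module (this path is hypothetical)
const knex = require('../db/knex');
const Model = require('../models/queuedThings');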

// Manage the dequeuing of things
const payload = async ({ label, ordinal, timeoutMs }) => {
  const runForMs = timeoutMs - 1000;
  const runUntil = new Date(Date.now() + runForMs);

  debug(`Running for ${runForMs} ms (until ${runUntil.toISOString()})`);

  let processedCount = 0;
  let queueEmptied = false;
  let nextThing;

  do {
    await knex.transaction(async (trx) => {

      // Get the next thing from the queue
      nextThing = await Model.query(trx).findOne('isReady', true).whereNull('processedAt').orderBy('queuedAt');

      // The queue is empty; exit early
      if (!nextThing) {
        queueEmptied = true;
        return;
      }

      // Do whatever it is that things do
      // ..

      // Record that we've processed this thing
      await Model.query(trx).update({ processedAt: new Date() }).where({ id: nextThing.id });

      // Inc. our count
      processedCount++;
    });
  }
  // Stop as soon as the queue is empty or the time budget is spent
  while (!queueEmptied && new Date() < runUntil);

  // Output some debug info
  const summaryMsg = `DONE: ${processedCount} things processed, leaving the queue ${queueEmptied ? 'EMPTY' : 'NOT EMPTY'}`;
  debug(summaryMsg);
  // Resolve truthy (queue emptied) to wait for the normal sleepMs,
  // or falsy (items remain) to be re-invoked after the short 1000 ms delay
  return queueEmptied;
};

// Create the worker instance and start it
const worker = new Worker('dequeue-things', payload, { sleepMs: 60 * 1000 });
worker.start();

You can also request a once-off worker run from some other action. For example, if you have a worker sending emails which processes a queue every 10 minutes, you might want to trigger that worker after a successful account creation.

class AccountCreator {

  onCreateSuccess () {
    // Tell the email worker to run so the user receives their email promptly.
    const onceOff = true;
    require('../email-worker').scheduleRunInMs(200, onceOff);
  }
}

Debug

We use the debug package internally. Entries are scoped to workers:${label} (where label is the one supplied on construction). It's probably helpful to follow this pattern in your own payload functions.

Output can be enabled by supplying a scope (or a comma-separated list of scopes) in the DEBUG env var. Scopes can include wildcards. For example:

# All worker debug
DEBUG=workers:* yarn start

# Debug for a specific worker
DEBUG=workers:send-notifications yarn start

# Debug for several specific workers
DEBUG=workers:send-notifications,workers:send-emails yarn start

License

BSD Licensed. Copyright (c) Thinkmill 2018.