assemble-worker

v1.0.0

NodeJS / PostgreSQL / RabbitMQ worker library

Note

In order for integration tests to pass, pg-amqp-bridge-node must be running.

Introduction

This is a Node.js/PostgreSQL/RabbitMQ worker heavily influenced by @benjie's graphile/worker. The goal is to have a similar API to graphile/worker with a higher throughput enabled by the introduction of RabbitMQ.

Why Graphile Worker?

I like Graphile Worker because I can queue jobs via SQL triggers, which means:

  • I get to interact with external APIs asynchronously when using something like Postgraphile
  • My job queuing runs within PostgreSQL transactions, which means that if I queue a job in a transaction that is rolled back, the job is not run

However, Graphile Worker runs at about 700 jobs/sec, which is not enough for some use cases. So, just like Graphile Worker, we want to:

  • Store job state in PostgreSQL
  • Have PostgreSQL manage failure behavior, with exponential backoff retry
  • Queue jobs via a PostgreSQL function
  • Define jobs as asynchronous NodeJS functions
  • Have it all work seamlessly™

But we want higher throughput!
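
To make "exponential backoff retry" concrete, here is a minimal sketch of how a retry delay can grow with each failed attempt. The helper name retryDelayMs, the one second base, and the one hour cap are illustrative assumptions, not values taken from assemble-worker or Graphile Worker, where the delay is managed inside PostgreSQL.

```javascript
// Hypothetical helper: delay before retry number `attempt` (1-based).
// baseMs and maxMs are illustrative defaults, not assemble-worker settings.
function retryDelayMs(attempt, baseMs = 1000, maxMs = 60 * 60 * 1000) {
  if (!Number.isInteger(attempt) || attempt < 1) {
    throw new RangeError('attempt must be a positive integer');
  }
  // Double the delay on every failed attempt, capped at maxMs.
  return Math.min(baseMs * 2 ** (attempt - 1), maxMs);
}

// Delays for the first five attempts, in milliseconds:
// 1000, 2000, 4000, 8000, 16000
const delays = [1, 2, 3, 4, 5].map((n) => retryDelayMs(n));
console.log(delays);
```

The cap keeps a job that fails many times from being pushed arbitrarily far into the future.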

Why RabbitMQ?

The bottleneck of Graphile Worker is its use of SELECT ... FOR UPDATE SKIP LOCKED, a native PostgreSQL feature that ensures two simultaneous queries selecting from the same table will not select each other's locked rows. This is necessary to prevent two concurrently running workers, both fetching jobs, from selecting the same job and running it twice.
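
For readers unfamiliar with the pattern, the following is a rough sketch of a job claim built on FOR UPDATE SKIP LOCKED. The jobs table, its status and payload columns, and the claimJob helper are all hypothetical; this is not Graphile Worker's actual query. The client argument is assumed to be anything with a node-postgres style query(text) method.

```javascript
// Hypothetical table and column names; not Graphile Worker's real schema.
const CLAIM_JOB_SQL = `
  UPDATE jobs
  SET status = 'running'
  WHERE id = (
    SELECT id
    FROM jobs
    WHERE status = 'queued'
    ORDER BY id
    LIMIT 1
    FOR UPDATE SKIP LOCKED
  )
  RETURNING id, payload
`;

// `client` is assumed to expose query(text) resolving to { rows: [...] },
// as a node-postgres Client or Pool does.
async function claimJob(client) {
  const result = await client.query(CLAIM_JOB_SQL);
  // No row means the queue is empty or every queued job is already locked.
  return result.rows.length > 0 ? result.rows[0] : null;
}
```

Every worker has to run a query like this against the same table to find work, which is the contention point described above.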

Although Graphile Worker uses LISTEN/NOTIFY to tell workers about new jobs, which keeps the latency between queueing a job and starting it low, PostgreSQL NOTIFY sends a message to all clients listening on a channel. That makes it unsuitable as a method for delivering jobs to be run. Instead, it's only suitable for notifying workers that they should query for new jobs.

Fortunately, RabbitMQ uses something called Round Robin dispatch, which ensures that each message is only sent to one consumer at a time – the message may be redelivered to other consumers if that consumer fails to acknowledge it, but it will never be delivered to two consumers at the same time.
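
The difference between the two delivery models can be illustrated with a small in-memory simulation. This is a simplification that assumes nothing about RabbitMQ's internals: it ignores acknowledgements, redelivery, and prefetch, and the function names are made up for this example.

```javascript
// NOTIFY-style broadcast: every listener receives every message.
function broadcast(messages, listenerCount) {
  const received = Array.from({ length: listenerCount }, () => []);
  for (const message of messages) {
    for (const inbox of received) inbox.push(message);
  }
  return received;
}

// Round-robin dispatch: each message goes to exactly one consumer, in turn.
function roundRobin(messages, consumerCount) {
  const received = Array.from({ length: consumerCount }, () => []);
  messages.forEach((message, index) => {
    received[index % consumerCount].push(message);
  });
  return received;
}

const jobs = ['job-1', 'job-2', 'job-3', 'job-4'];

// Both listeners see all four jobs, so each job would run twice.
console.log(broadcast(jobs, 2));

// Consumer 0 gets job-1 and job-3, consumer 1 gets job-2 and job-4.
console.log(roundRobin(jobs, 2));
```

With broadcast, the workers still need a locking query to decide who runs each job; with round-robin delivery, the broker has already made that decision.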

By using SubZero’s high throughput pg-amqp-bridge, we’re able to send messages to RabbitMQ from Postgres at a higher rate, and scale our NodeJS workers around them to achieve a much higher throughput!

This library can achieve around 1400 jobs/sec with a concurrency of 1, and around 1800 jobs/sec with a greater concurrency on a single machine. I will update with more benchmarks as I have more data with more workers!

Usage

yarn add assemble-worker

import { run } from 'assemble-worker';

async function main() {
  const worker = await run({
    databaseConnectionString: 'postgres://localhost:5432/my_db',
    // optionally include a pgPool parameter to re-use connections - will
    // override databaseConnectionString if present

    amqpConnectionString: 'amqp://localhost',

    // When jobs aren't queued to be immediately run, PostgreSQL must
    // be 'poked' in order to check if there are jobs that should be run
    // and send them to RabbitMQ
    pokeInterval: 10000, // the default

    taskList: {
      'simple-task': {
        // Concurrency is the number of consumers to spawn on this node instance
        // for this task
        // There is currently no way to limit the rate of task execution
        // across workers – that would require implementing token buckets
        // or maybe using Redis
        concurrency: 1,

        // If your function throws an error, it will be retried
        // Any return value is ignored
        task: async function(payload) {
          // do some work
        }
      }
    }
  });

  // In addition to adding jobs via SQL, you can do
  await worker.addJob({
    queueName: 'simple-task',
    payload: { someValue: 4 }
  });

  // Now your worker is running and can be stopped via:
  await worker.stop();
}

main().catch(console.error);

Via SQL, queue a job by running:

SELECT assemble_worker.add_job('simple-task', '{"someValue": 4}'::json);

Or, if inside of a PostgreSQL function:

PERFORM assemble_worker.add_job('simple-task', '{"someValue": 4}'::json);
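
As an illustration of queueing from a trigger, the sketch below installs a trigger function that calls assemble_worker.add_job whenever a row is inserted. Only add_job and its two arguments come from the examples above; the users table, its id column, the 'send-welcome-email' task name, and the installTrigger helper are hypothetical. The client argument is assumed to be anything with a node-postgres style query(text) method.

```javascript
// Hypothetical schema: a `users` table with an `id` column, and a task named
// 'send-welcome-email' registered in taskList. Only assemble_worker.add_job
// is taken from the README.
const CREATE_TRIGGER_SQL = `
  CREATE OR REPLACE FUNCTION queue_welcome_email() RETURNS trigger AS $$
  BEGIN
    PERFORM assemble_worker.add_job(
      'send-welcome-email',
      json_build_object('userId', NEW.id)
    );
    RETURN NEW;
  END;
  $$ LANGUAGE plpgsql;

  CREATE TRIGGER users_queue_welcome_email
    AFTER INSERT ON users
    FOR EACH ROW
    EXECUTE PROCEDURE queue_welcome_email();
`;

// Runs the statements above once. CREATE TRIGGER is not idempotent here,
// so this is meant for a one-off migration, not for every startup.
async function installTrigger(client) {
  await client.query(CREATE_TRIGGER_SQL);
}
```

Because the job is queued inside the same transaction as the INSERT, rolling back the insert also discards the job.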

Installation

Assemble Worker runs and manages its own migrations on the database given by databaseConnectionString, and manages its own queue assertions in RabbitMQ for the queues whose keys appear in taskList.

Follow the instructions on pg-amqp-bridge's GitHub for deploying it. I'll post a sample Kubernetes configuration here when I get a chance.

Releases

Each release gets its own commit on master that includes the version bump and changelog updates. The version bump, changelog updates, commit, and tag are generated by standard-version:

yarn release

Other helpful options are:

# Preview the changes
yarn release --dry-run

# Specify the version manually
yarn release --release-as 1.5.0
# or the semver version type to bump
yarn release --release-as minor

# Specify an alpha release
yarn release --prerelease
# or the pre-release type
yarn release --prerelease alpha