npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

monkeycymbal

v1.5.0

Published

Production ready, MongoDB-based queue for Node.

Downloads

35

Readme


Features

  • [x] Delayed jobs
  • [x] Retries
  • [x] Dead queue
  • [x] Priority
  • [x] Concurrency
  • [x] Pause/resume processing
  • [x] Optional Topic based publishing (publish into multiple queues)
  • [x] Low CPU usage
  • [x] Able to process around 1000 messages per second (tested on Macbook Pro 13-inch, 2017, concurrency set to 150)

Install

npm install monkeycymbal --save

or

yarn add monkeycymbal

Requirements: Monkey Cymbal requires MongoDB


Quick Guide

Basic Usage

import { Queue } from 'monkeycymbal';

// Initialize queue
const queue = new Queue('mongodb://localhost/myDb', 'videoTranscoding');

// subscribe to the queue
queue.subscribe((msg) => {
  // process the message
  // ...
  
  // we can return a result that will be saved in the message
  
  return 'transcoded';
});

// Add a message to the queue
const [msgId] = await queue.add({ video: 'http://example.com/video1.mov' });

Pause / Resume

A queue can be paused and resumed globally (pass true to pause processing for just this worker):

await queue.pause()

// queue is paused now

await queue.resume()

// queue is resumed now

Events

A queue emits also some useful events, for example...

queue.on('added', msgId => {
  // A message has been added to the queue
})

queue.on('active', msg => {
  // The message is being processed
});

queue.on('completed', (msg, result) => {
  // The message has been processed succesfully
})

queue.on('error', (msg, error) => {
  // An error occurred while processing the message.
  // If maxRetries is set, it will be re-processed after a visibility timeout
})

queue.on('dead', msg => {
  // The message is failed permanently.
  // If a dead queue is configured, the message will be copied there.
})

For more information on events, including the full list of events that are fired, check out the Events reference

Documentation

Queue

new Queue(connectionUrlOrMongoClient, queueName, options)

This is the Queue constructor. It creates a new Queue that is persisted in MongoDB.

| Name | Type | Description | | ----------------------------------------- | ------------------------------------------------- | ------------------------------------------------------- | | connectionUrlOrMongoClient required | MongoClient | string | MongoClient instance or MongoDB Connection Url | | queueName required | string | The name of the queue | | options | SubscriptionOptions | |

SubscriptionOptions

| Arguments | Type | Default | Description | | ----------------------------------------- | --------- | --------------------- | ------------------------------------------------------- | | visibility | number (seconds) | 10 | After a message is received to prevent other consumers from processing the message again, Monkeycymbal sets a visibility timeout, a period of time during which Monkeycymbal prevents other consumers from receiving and processing the message.| | delay | number (seconds) | | if you set delay to be 10, then every message will only be available for retrieval 10s after being added. | | maxRetries | number | 5 | Maximum number of attempts to retry processing a message. If deadQueue is set, the message will be moved to the dead queue. Otherwise it will be acked. | | expireAfterSeconds | number (seconds) | | The processed messages will be removed from the collection after the specified number of seconds. | | concurrency | number | 1 | The max number of messages that will be processed in parallel. | | pollInterval | number (seconds) | 10 | The amount of time the subscriber waits before checking for new messages. | | deadQueue | string or Queue instance | | Messages that have been retried over maxRetries will be pushed to this queue for later inspection. |

Queue.subscribe

queue.subscribe(handler);

Defines a processing function for the jobs in a given Queue and start processing. The handler function receive msg as argument.

Queue.add

queue.add(msg, AddMessageOptions);

Adds a message to the queue.

AddMessageOptions

| Arguments | Type | Default | Description | | ----------------------------------------- | --------- | --------------------- | ------------------------------------------------------- | | priority | number | 1 | Optional priority value. It ranges from -Infinity to +Infinity|

Queue.pause

queue.pause();

Pause a queue. A paused queue will not process new jobs until resumed.

Queue.resume

queue.resume();

Resume a queue after being paused.

Queue.ping

queue.ping(msg.ack);

Ping a message to keep it's visibility open for long-running tasks.

Queue.totalCount

queue.totalCount();

Returns the total number of records in the collection.

Queue.waitingCount

queue.waitingCount();

Returns the total number of messages that are waiting to be processed.

Queue.inFlightCount

queue.inFlightCount();

Returns the total number of messages that are currently being processed.

Channel

new Channel(connectionUrlOrMongoClient, topic, options)

This is the Channel constructor. It creates a new Channel.

| Name | Type | Description | | ----------------------------------------- | ------------------------------------------------- | ------------------------------------------------------- | | connectionUrlOrMongoClient required | MongoClient | string | MongoClient instance or MongoDB Connection Url | | topic required | topic | The name of the channel. | | options | SubscriptionOptions | |

Channel.publish

channel.publish(msg, PublishOptions);

Publish messages to the queues subscribed to the topic.

PublishOptions

| Arguments | Type | Default | Description | | ----------------------------------------- | --------- | --------------------- | ------------------------------------------------------- | | priority | number | 1 | Optional priority value. It ranges from -Infinity to +Infinity|

Channel.subscribe

channel.subscribe(handler, queueName, SubscriptionOptions): Queue;

Convenience method that returns an instance of a queue bound to the channel.