abstract-message-queue

v1.0.0

AbstractMessageQueue

The AbstractMessageQueue attempts to define a common interface for interacting with message queues. By depending on this interface instead of any particular queue implementation, application code can remain flexible for different environments with different requirements.

I have tried to keep the interface to a minimum so that it is easy to implement. Any extra features can be added at an implementor's discretion, or by wrapping a queue instance in another library.

Inspired by abstract-blob-store.

In this package

This package contains:

- A class for helping to create your implementation of a MessageQueue
- A function for testing your implementation against the spec

You do not have to use either of these in order to be compliant - they are just (hopefully) useful tools.

Using a MessageQueue

The MessageQueue is designed to be used in a for await loop. Each loop must process its messages in series, but you may have multiple loops running in parallel.

for await (const message of queueInstance) {
	const success = processMessage(message);

	// On successful processing, remove message from queue
	if(success) {
		queueInstance.delete();
	}
}
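
As a rough sketch of the same loop with an asynchronous handler and error handling: the `consume` helper, the `handler` callback and the `StubQueue` class below are all hypothetical names made up for this example and are not part of the package. The stub also ends its iterator when it runs out of messages, which is an assumption made so the example terminates.

```javascript
// Drain a queue, deleting each message whose handler succeeds.
// `queue` is any object that follows the spec; `handler` is a hypothetical callback.
async function consume(queue, handler) {
	let processed = 0;
	for await (const message of queue) {
		try {
			await handler(message);
			queue.delete(); // success: flag the message for permanent removal
			processed += 1;
		} catch (error) {
			// Failure: delete() is not called, so the queue takes the message back
			console.error('processing failed, message left on queue:', error.message);
		}
	}
	return processed;
}

// Tiny stand-in queue so the sketch runs on its own (not part of the package).
class StubQueue {
	constructor(messages) {
		this.pending = [...messages];
		this.retryLater = []; // messages handed back after a failed attempt
		this.deleteCalled = false;
	}

	delete() {
		this.deleteCalled = true;
	}

	send(message) {
		this.pending.push(message);
	}

	async *[Symbol.asyncIterator]() {
		// Assumption: stop iterating once there is nothing left to deliver
		while (this.pending.length > 0) {
			this.deleteCalled = false;
			const message = this.pending.shift();
			try {
				yield message;
			} finally {
				if (!this.deleteCalled) this.retryLater.push(message);
			}
		}
	}
}
```

Catching the error inside the loop body keeps one bad message from ending the whole loop; the message is still returned to the queue because `delete` was never called for it.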

Abstract Spec

A message queue instance only needs to have three methods:

- `[Symbol.asyncIterator]`
- `delete`
- `send`

[Symbol.asyncIterator]

A method that returns an instance of an async iterator. The iterator must have a next method, and should have return and throw methods.

The easiest way to do this is with an async generator, but it does not have to be one. When run, the iterator must do the following:

  1. Fetch and remove the next message from the queue (this removal may be temporary and time-based)
  2. Yield the message
  3. If the delete method has been called, permanently remove the message from the queue
  4. Otherwise, return the message to the queue (the message does not have to be immediately available again)

Steps 3 and 4 should happen regardless of whether an error is thrown at the yield stage.

An illustration of how to achieve this with an async generator:

class MyQueue {
	delete(){
		this.deleteCalled = true;
	}

	async * [Symbol.asyncIterator](){
		while(true) {
			this.deleteCalled = false;
			const { id, message } = await somehowGetNextMessage();

			try {
				yield message;
			} finally {
				if(this.deleteCalled) {
					deleteMessage(id);
				} else {
					retryMessage(id);
				}
			}
		}
	}
}

This method may also take other actions such as opening and closing connections to external resources.
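
Since the iterator does not have to come from an async generator, here is a minimal sketch of a hand-written one with `next`, `return` and `throw` methods. The `ManualQueue` class, its in-memory `pending` array and the `settle` helper are assumptions made for illustration; the iterator also reports `done` when the array is empty, which the spec above does not require.

```javascript
// Hypothetical in-memory queue with a hand-written async iterator.
class ManualQueue {
	constructor(messages = []) {
		this.pending = [...messages]; // messages waiting to be delivered
		this.deleteCalled = false;
	}

	delete() {
		this.deleteCalled = true;
	}

	[Symbol.asyncIterator]() {
		const queue = this;
		let current = null; // the message handed out by the last next() call

		// Steps 3 and 4: finish with the message that was last handed out.
		// A deleted message was already shifted off `pending`, so only the
		// "return to queue" case needs any work here.
		const settle = () => {
			if (current === null) return;
			if (!queue.deleteCalled) queue.pending.push(current.message);
			current = null;
		};

		return {
			async next() {
				settle();
				if (queue.pending.length === 0) {
					return { done: true, value: undefined };
				}
				queue.deleteCalled = false;
				current = { message: queue.pending.shift() };
				return { done: false, value: current.message };
			},
			// Called by for await when the loop body breaks, returns or throws
			async return(value) {
				settle();
				return { done: true, value };
			},
			// Called when an error is pushed into the iterator directly,
			// for example through yield* delegation
			async throw(error) {
				settle();
				throw error;
			}
		};
	}
}
```

Note that a `for await` loop calls `return`, not `throw`, when its body throws, so both methods settle the outstanding message.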

delete

This is a simple, synchronous method that sets a flag so that the current message is deleted from the queue when the iterator is resumed or finalized.

send

When called with a message as the first argument, add that message to the queue.

Implementing a MessageQueue

You can use this package to create your message queue class by extending the provided MessageQueue class:

import MessageQueue from 'abstract-message-queue';

class MyQueue extends MessageQueue {
	constructor(){
		// Call the superconstructor with config methods:
		super({
			// The next method fetches the next message
			async next(){
				return {
					id: 'optional - some value used to identify a message internally',
					message: 'the message - may be of any type, including objects'
				}
			},
			async delete(){
				// Remove the message from the queue entirely
			},
			async retry(){
				// Re-add the message to the queue to be tried again at a later date
			}
		})
	}

	send(message){
		// Add the new message to the queue
	}

	async * [Symbol.asyncIterator](){
		// You may optionally extend the iterator method to open and close connections
		this.connection = openConnection();

		try {
			yield* super[Symbol.asyncIterator]();
		} finally {
			this.connection.close();
		}
	}
}
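
To make the roles of `next`, `delete` and `retry` more concrete, here is a sketch of an in-memory implementation. Because the package itself is not loaded here, `SketchBaseQueue` is a stand-in written for this example that mimics the documented contract (including passing the item ID as the first argument to `delete` and `retry`); it is not the package's source, and in real use you would extend the imported `MessageQueue` instead. Treating an `undefined` result from `next` as "queue is empty" is also an assumption.

```javascript
// Stand-in for the package's base class. NOT the package's source: it only
// mimics the documented contract so that this example runs on its own.
class SketchBaseQueue {
	constructor(options) {
		this._options = options;
		this._deleteCalled = false;
	}

	delete() {
		this._deleteCalled = true;
	}

	async *[Symbol.asyncIterator]() {
		while (true) {
			this._deleteCalled = false;
			const item = await this._options.next();
			if (item === undefined) return; // assumption: undefined means "empty"
			try {
				yield item.message;
			} finally {
				if (this._deleteCalled) await this._options.delete(item.id);
				else await this._options.retry(item.id);
			}
		}
	}
}

// Hypothetical in-memory queue built on the stand-in base class.
class InMemoryQueue extends SketchBaseQueue {
	constructor() {
		const ready = [];           // [{ id, message }] waiting for delivery
		const inFlight = new Map(); // id -> message, handed out but not settled
		super({
			async next() {
				const item = ready.shift();
				if (item === undefined) return undefined;
				inFlight.set(item.id, item.message);
				return item;
			},
			async delete(id) {
				inFlight.delete(id); // remove the message entirely
			},
			async retry(id) {
				// Put the message back at the end of the queue
				ready.push({ id, message: inFlight.get(id) });
				inFlight.delete(id);
			}
		});
		this.ready = ready;
		this.inFlight = inFlight;
		this.nextId = 1;
	}

	send(message) {
		this.ready.push({ id: this.nextId++, message });
	}
}
```

Tracking handed-out messages by ID in `inFlight` is one possible design; it lets `delete` and `retry` act on the right message without the base class knowing how messages are stored.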

Dependencies

None

abstract-message-queue

MessageQueue

See: default

module.exports ⏏

Kind: Exported class

new module.exports(options)

Construct an instance of the message queue

| Param | Type | Description |
| --- | --- | --- |
| options | Object | Protected methods for the queue |
| options.next | function | Function that gets the next message from the queue. Its return value must be an object with a "message" property and an optional "id" |
| options.retry | function | Function that re-queues the last item returned by next. The item ID will be passed as the first argument. |
| options.delete | function | Function that deletes the last item returned by next. The item ID will be passed as the first argument. |

module.exports.delete()

Deletes the last message from the queue

Kind: instance method of module.exports

module.exports.default

Utility class to help implement the AbstractMessageQueue

Kind: static property of module.exports