@concorde2k/bus.mq
v1.1.26
Access the message queue to win friends and make money
Enterprise Service Bus
The enterprise bus is a consumer-agnostic bus that allows consumers of any type to pass messages down the pipe and, with the correct configuration, get a response as well. This package exposes classes and methods for interacting with the bus.
Installation
npm install @concorde2k/bus.mq
Configuration
When you load this library, you must provide a URL for both Rabbit and Kue. These can be specified in a configuration file or via the environment. The environment is the preferred method because it makes inclusion in a container more portable.
MQ_URL (env) / qUrl (file) - the URL of the message queue (Rabbit). It takes the form MQ_URL=amqp://concorde:[email protected]
JQ_URL (env) / jUrl (file) - the URL of the job queue (Kue). It looks like JQ_URL=redis://192.168.56.101:6379
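The precedence described above (environment first, configuration file second) can be sketched as follows. This is a minimal illustration, not the library's own loader: `resolveBusConfig` is a hypothetical helper, and it assumes the job queue variable is named JQ_URL, as in the example value above.

```javascript
// Hypothetical helper, not part of @concorde2k/bus.mq.
// Resolves both queue URLs, preferring environment variables over file values.
// Assumption: the job queue variable is JQ_URL, as in the example value.
function resolveBusConfig(env = process.env, fileConfig = {}) {
    const qUrl = env.MQ_URL || fileConfig.qUrl; // message queue (Rabbit)
    const jUrl = env.JQ_URL || fileConfig.jUrl; // job queue (Kue)

    const missing = [];
    if (!qUrl) missing.push("MQ_URL/qUrl");
    if (!jUrl) missing.push("JQ_URL/jUrl");
    if (missing.length > 0) {
        // Both URLs are required, so fail early with a readable message.
        throw new Error(`Missing bus configuration: ${missing.join(", ")}`);
    }
    return { qUrl, jUrl };
}
```

Failing early when either URL is absent keeps a misconfigured container from starting with half a bus.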
Technical Docs
A description of the internal API for contributors can be found on Bitbucket or locally.
Message Types
The bus handles four RabbitMQ-backed message types, plus an RPC type backed by Kue, depending on what your needs are. In line with RabbitMQ terminology, these message types are referred to as "exchanges".
Pub/Sub
A publisher will send a message to all of the workers on a given exchange. Unlike the other exchange types, this one does not discriminate who gets the message. All listeners will get the message at the same time. This pattern is described in the Rabbit docs. The sender is the [[Publisher]] and the listener is [[Subscriber]].
Sender Example
const { Publisher } = require( "@concorde2k/bus.mq" );

const publisher = new Publisher( { exchange: "Lotus" } );

// connect first, then broadcast to every subscriber on the exchange
publisher.connect().then( () => {
    publisher.write( "JimmyClark" );
}).catch((e)=>{
    console.error("An error occurred", e);
});
Listener Example
const { Subscriber } = require( "@concorde2k/bus.mq" );

const worker = new Subscriber( { exchange: "Lotus" } );

// every subscriber on the exchange receives each published message
worker.connect().then( () => {
    worker.on(Subscriber.onWorkItem, (message)=>{
        console.log(message);
    });
}).catch((e)=>{
    console.error("An error occurred", e);
});
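To make the fan-out behaviour concrete, here is a minimal in-memory sketch that uses Node's built-in `EventEmitter` as a stand-in for the exchange. No RabbitMQ is involved, and the `received` object and the `"message"` event name are illustrative assumptions. It only shows that every listener sees every message.

```javascript
const { EventEmitter } = require("node:events");

// In-memory stand-in for a fan-out exchange (illustration only).
const exchange = new EventEmitter();
const received = { first: [], second: [] };

// Two independent listeners on the same "exchange".
exchange.on("message", (message) => received.first.push(message));
exchange.on("message", (message) => received.second.push(message));

// A single publish reaches both listeners.
exchange.emit("message", "JimmyClark");
```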
Task Messages
A Task models the competing consumers pattern. Messages are routed to the listeners round-robin and it provides a simple kind of load balancing. See the Rabbit docs for more information. The sender is the [[Task]] and the listener is a [[TaskWorker]].
Sender Example
const { Task } = require( "@concorde2k/bus.mq" );

const tasks = new Task( { exchange: "Mclaren" } );

// each written message is handed to exactly one worker
tasks.connect().then( () => {
    tasks.write( "AyrtonSenna" );
}).catch((e)=>{
    console.error("An error occurred", e);
});
Listener Example
const { TaskWorker } = require( "@concorde2k/bus.mq" );

const worker = new TaskWorker( { exchange: "Mclaren" } );

// workers on the same exchange take turns receiving messages
worker.connect().then( () => {
    worker.on(TaskWorker.onWorkItem, (message)=>{
        console.log(message);
    });
}).catch((e)=>{
    console.error("An error occurred", e);
});
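The round-robin delivery described for Task messages can be pictured with a small in-memory dispatcher. This is a sketch of the competing consumers idea only: `RoundRobinDispatcher` is a hypothetical class, not something exported by this package, and the real distribution is done by RabbitMQ.

```javascript
// Hypothetical in-memory dispatcher illustrating round-robin delivery.
class RoundRobinDispatcher {
    constructor() {
        this.workers = [];
        this.next = 0; // index of the worker that receives the next message
    }

    addWorker(handler) {
        this.workers.push(handler);
    }

    write(message) {
        if (this.workers.length === 0) {
            throw new Error("No workers registered");
        }
        const handler = this.workers[this.next];
        // Advance the cursor, wrapping around to the first worker.
        this.next = (this.next + 1) % this.workers.length;
        handler(message);
    }
}
```

With two workers registered, the first, third, fifth (and so on) messages go to one worker and the rest go to the other, so each message is processed exactly once.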
Router Messages
A route allows a sender to send messages to a subset of listeners. Wildcards are used to filter who receives a message. If no wildcard is used, it becomes the same as Direct messages.
If the receiver ([[RouteWorker]]) declares a key of doc.hollywood.*, then a sender will be successful with doc.hollywood.sucks and doc.hollywood.great, but not with avengers.end.game. Under RabbitMQ topic rules a star (*) stands for exactly one word, so the bare key doc.hollywood is not matched by doc.hollywood.* either. To listen to every key that starts with doc.hollywood, including the bare key, declare doc.hollywood.# instead, because a hash (#) stands for zero or more words. The key is the use of periods (.), stars (*), and hashes (#) in the key when spooling up a listener. See the Rabbit docs for more information. The sender is [[Route]] and the listener is [[RouteWorker]].
Sender Example
const { Route } = require( "@concorde2k/bus.mq" );

// two senders on the same exchange, each with its own routing key
const route = new Route( { exchange: "Ferarri", key: "Drivers.Past" } );
const route2 = new Route( { exchange: "Ferarri", key: "Drivers.Present" } );

route.connect().then( () => {
    route.write( "JodyScheckter" );
}).catch((e)=>{
    console.error("An error occurred", e);
});

route2.connect().then( () => {
    route2.write( "SebVettel" );
}).catch((e)=>{
    console.error("An error occurred", e);
});
Listener Example
const { RouteWorker } = require( "@concorde2k/bus.mq" );

// "Drivers.*" matches any key with exactly one word after "Drivers."
const worker = new RouteWorker( { exchange: "Ferarri", key: "Drivers.*" } );

worker.connect().then( () => {
    worker.on(RouteWorker.onWorkItem, (message, key)=>{
        if (key === "Drivers.Past") {
            console.log("Past driver:", message);
        } else if (key === "Drivers.Present") {
            console.log("Present driver:", message);
        } else {
            console.warn("Unknown key");
        }
    });
}).catch((e)=>{
    console.error("An error occurred", e);
});
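When choosing a key pattern, it can help to check it against sample routing keys before deploying a listener. The sketch below is a stand-alone matcher that follows RabbitMQ topic rules as documented by RabbitMQ (a star stands for exactly one word, a hash for zero or more words). `topicMatches` is a hypothetical helper for experimenting locally. It is not part of this package, and the actual matching happens inside the broker.

```javascript
// Hypothetical helper: does a routing key satisfy a topic pattern?
// "*" matches exactly one dot-separated word, "#" matches zero or more words.
function topicMatches(pattern, key) {
    const p = pattern.split(".");
    const k = key.split(".");

    // match(i, j): can pattern words from index i consume key words from index j?
    const match = (i, j) => {
        if (i === p.length) return j === k.length;
        if (p[i] === "#") {
            // Try letting "#" absorb 0, 1, 2, ... of the remaining words.
            for (let n = j; n <= k.length; n++) {
                if (match(i + 1, n)) return true;
            }
            return false;
        }
        if (j === k.length) return false; // pattern still needs a word
        if (p[i] === "*" || p[i] === k[j]) return match(i + 1, j + 1);
        return false;
    };

    return match(0, 0);
}
```

For example, `topicMatches("Drivers.*", "Drivers.Past")` is true, while `topicMatches("Drivers.*", "Drivers")` is false. Use `Drivers.#` if the bare key should also be delivered.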
Direct Messages
A Direct exchange allows receivers to selectively receive messages based on a key. This is very similar to a Route/RouteWorker, but differs in that the key does not accept wildcards: listeners on the exchange must match the key exactly. This pattern is described in the Rabbit docs. The sender for a Direct message is [[Direct]] and the listener is [[DirectWorker]].
Sender Example
const { Direct } = require( "@concorde2k/bus.mq" );

const direct = new Direct( { exchange: "March", key: "Drivers" } );

// only listeners whose key is exactly "Drivers" receive this message
direct.connect().then( () => {
    direct.write( "JackieStewart" );
}).catch((e)=>{
    console.error("An error occurred", e);
});
Listener Example
const { DirectWorker } = require( "@concorde2k/bus.mq" );

// the key must match the sender's key exactly; wildcards are not accepted
const worker = new DirectWorker( { exchange: "March", key: "Drivers" } );

worker.connect().then( () => {
    worker.on(DirectWorker.onWorkItem, (message)=>{
        console.log(message);
    });
}).catch((e)=>{
    console.error("An error occurred", e);
});
RPC Messages
An RPC feels like a synchronous call, and it kind of is, but it is non-blocking. You make a request to the exchange and a response is delivered by Promise. Unlike the other message types, it is not driven by Rabbit but by Kue, which is a job queue, whereas Rabbit is a message queue. It is the only message type that returns a response. The sender is [[RPC]] and the listener is [[RPCWorker]].
Sender Example
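const { RPC } = require( "@concorde2k/bus.mq" );

const rpc = new RPC( {
    exchange: "UnitTestExchange5"
} );

rpc.connect().then( async () => {
    // the promise resolves with the value the worker passes to done()
    const res = await rpc.publish( "I am a squirrel" );
    console.info("answer:", res);
}).catch((e)=>{
    console.error("An error occurred", e);
});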
Listener Example
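const { RPCWorker } = require( "@concorde2k/bus.mq" );

const worker = new RPCWorker( {
    exchange: "UnitTestExchange5"
} );

worker.connect().then( () => {
    worker.listen( ( job, cts, done ) => {
        // first argument is the error (null on success), second is the response
        done( null, `${job.data} I am a prairie dog` );
    } );
}).catch((e)=>{
    console.error("An error occurred", e);
});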
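The request/response flow above pairs a Promise on the sender side with a Node-style `done(err, result)` callback on the worker side. The sketch below shows one way such a callback can settle a Promise. It is an in-memory illustration only: `callWorker` is a hypothetical function, Kue is not involved, and nothing here describes how the package implements the round trip internally.

```javascript
// Hypothetical in-memory stand-in for an RPC round trip (no Kue involved).
// The handler has the same (job, context, done) shape as the listener above.
function callWorker(handler, data) {
    return new Promise((resolve, reject) => {
        const job = { data };
        // Node-style callback: an error rejects, anything else resolves.
        const done = (err, result) => (err ? reject(err) : resolve(result));
        try {
            handler(job, {}, done);
        } catch (e) {
            // A handler that throws synchronously also rejects the caller.
            reject(e);
        }
    });
}
```

A caller can then `await callWorker(handler, "I am a squirrel")` inside a `try`/`catch`, so that a worker-side error surfaces as a rejected promise rather than a silent failure.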
[[include:./BUILD.md]]