mubsub-es
v2.1.0
Published
Pub/sub for Node.js and MongoDB, updated for mongodb 4.x, 5.x, 6.x, 7.x, 8.x and fix some deprecated usages by emman sun
Maintainers
Readme
mubsub
Mubsub is a pub/sub implementation for Node.js and MongoDB. It prefers Mongo's capped collections and tailable cursors when available, and can fallback to polling on normal collections for Mongo-compatible backends that do not support capped/tailable behavior (for example, AWS DocumentDB). It supports mongodb driver 6.x and mongodb 7.x now.
Example
var mubsub = require('mubsub-es');
var client = mubsub('mongodb://localhost:27017/mubsub_example');
var channel = client.channel('test');
client.on('error', console.error);
channel.on('error', console.error);
channel.subscribe('bar', function (message) {
console.log(message.foo); // => 'bar'
});
channel.subscribe('baz', function (message) {
console.log(message); // => 'baz'
});
channel.publish('bar', { foo: 'bar' });
channel.publish('baz', 'baz');
Usage
Create a client
You can pass a Db instance or a URI string. For more information about the URI format visit http://mongodb.github.io/node-mongodb-native/driver-articles/mongoclient.html
var mubsub = require('mubsub-es');
// Using a URI
var client = mubsub('mongodb://localhost:27017/mubsub_example', [options]);
// Passing a MongoDB driver `Db` instance directly.
var client = mubsub(new Db(...));Channels
A channel maps one-to-one with a collection. In auto mode (default), Mubsub first tries a capped collection + tailable cursor. If capped/tailable is unavailable, Mubsub falls back to polling a normal collection.
WARNING: You should not create lots of channels because Mubsub will poll from the cursor position.
var channel = client.channel('foo', { size: 100000, max: 500 });Options:
sizemax size of the collection in bytes, default is 5mbmaxmax amount of documents in the collectionmodetransport mode:auto|capped|polling, default isautoretryIntervaltime in ms to wait if no docs are found in capped mode, default is 200ms. This options will be used to set maxAwaitTimeMS now. Reference Tailable Cursor Option tailableRetryInterval IgnoredpollIntervaltime in ms between polling cycles in polling mode, default is 1000mspollTtlSecondsoptional retention TTL (seconds) for polling collections. If set to a positive value, old documents are automatically expired using a TTL indexrecreaterecreate the tailable cursor when an error occurs, default is true
Mode notes:
auto: try capped+tailable first, fallback to polling if unsupportedcapped: force capped+tailable and emit error if unavailablepolling: force polling transport on a normal collection
WARNING: Don't remove collections with running publishers. It's possible for mongod to recreate the collection on the next insert (before Mubsub has the chance to do so). If this happens the collection will be recreated as a normal, uncapped collection.
Subscribe
var subscription = channel.subscribe([event], callback);Subscriptions register a callback to be called whenever a document matching the specified event is inserted (published) into the collection (channel). You can omit the event to match all inserted documents. To later unsubscribe a particular callback, call unsubscribe on the returned subscription object:
subscription.unsubscribe();Publish
channel.publish(event, obj, [callback]);Publishing a document simply inserts the document into the channel's capped collection. A callback is optional.
In polling mode, documents are inserted into a normal collection. If pollTtlSeconds is configured, Mubsub creates a TTL index and old documents are cleaned up automatically.
WARNING: If you publish events concurrently, when mubsub re-listen the collection, the subscriber will receive some outdated events due to latest can't get the record with the max _id.
const cursor = collection
.find(latest ? { _id: latest._id } : {}, { timeout: false })
.hint({ $natural: -1 })
.limit(1)
const cursor = collection.find(
{ _id: { $gt: latest._id } },
{
tailable: true,
awaitData: true,
timeout: false,
maxAwaitTimeMS: self.options.retryInterval
}).hint({ $natural: 1 }) Listen to events
The following events will be emitted:
// The given event was published
channel.on('myevent', console.log);
// Any event was published
channel.on('message', console.log);
// Document was inserted
channel.on('document', console.log);
// Mubsub is ready to receive new documents
channel.on('ready', console.log);
// Connection error
client.on('error', console.log);
// Channel error
channel.on('error', console.log);Close
client.close();Closes the MongoDB connection.
Install
npm install mubsub-esTests
make testYou can optionally specify the MongoDB URI to be used for tests:
MONGODB_URI=mongodb://localhost:27017/mubsub_tests make testProjects using mubsub
- simpleio Simple long polling based communication.
