seneca-amqp-transport

v2.2.0

Official AMQP transport plugin for Seneca

This plugin allows seneca listeners and clients to communicate over AMQP.

Important: Starting from 2.2.0, this plugin requires the --harmony flag in order to run on Node.js versions older than the LTS release (currently 6.x.x).

Important: If you are upgrading to 2.1.0 (or later) from an older version, please read and follow instructions on this wiki guide to avoid some potential issues.

Install

npm install --save seneca-amqp-transport

This transport supports AMQP 0-9-1, which is what amqplib currently supports. For an AMQP 1.0 compliant transport, take a look at seneca-servicebus-transport.

Usage

The following snippets showcase the most basic usage examples.

Listener

require('seneca')()
  .use('seneca-amqp-transport')
  .add('cmd:log,level:*', function(req, done) {
    console[req.level](req.message);
    return done(null, { ok: true, when: Date.now() });
  })
  .listen({
    type: 'amqp',
    pin: 'cmd:log,level:*',
    url: process.env.AMQP_URL
  });

How it works

A listener always creates one and only one queue. The queue name can be provided via the name parameter, but it will be auto-generated from the pin (or pins) if not.

Be careful with name clashes when specifying a name for a listener. Declaring the same queue name from more than one listener will probably lead to unexpected behavior on the AMQP broker. Unless you know what you are doing, it is recommended that you leave name generation to the plugin.

In the example above, the following things are declared:

  • A topic exchange named seneca.topic.
  • A queue named seneca.cmd:log.level:any.
  • A binding between the queue and the exchange using the routing key cmd.log.level.* (named after the pin).

Queue names are prefixed with a configurable word (seneca., by default). It can be disabled or modified during plugin declaration (read below).
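
The naming convention described above can be sketched with two small helpers. These are hypothetical functions written for illustration, not the plugin's internal API. They only reproduce the queue name and routing key from the listener example; how the plugin treats other pin shapes is an assumption here.

```javascript
// Hypothetical helpers illustrating the naming convention; not part of the plugin's API.

// 'cmd:log,level:*' -> 'cmd.log.level.*'
function routingKeyFromPin(pin) {
  return pin
    .split(',')
    .map(function(pair) { return pair.trim().split(':').join('.'); })
    .join('.');
}

// 'cmd:log,level:*' -> 'seneca.cmd:log.level:any'
// Assumption: a '*' wildcard is spelled 'any' in the queue name, as in the example above.
function queueNameFromPin(pin, prefix) {
  var base = pin
    .split(',')
    .map(function(pair) { return pair.trim().replace(/\*/g, 'any'); })
    .join('.');
  // The prefix defaults to 'seneca.'; pass '' to disable it.
  return (prefix === undefined ? 'seneca.' : prefix) + base;
}

console.log(routingKeyFromPin('cmd:log,level:*'));
console.log(queueNameFromPin('cmd:log,level:*'));
```

Passing an empty string as the second argument mimics disabling the prefix.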

If your intention is to create multiple queues, just declare multiple listeners. Each queue will be bound to an exchange (seneca.topic, by default) using routing keys derived from the pin (or pins).

If your intention is to declare multiple consumers on a single queue, run multiple listeners with the same set of pins. Or just spawn many instances of a single microservice.

Client

var client = require('seneca')()
  .use('seneca-amqp-transport')
  .client({
    type: 'amqp',
    pin: 'cmd:log,level:log',
    url: process.env.AMQP_URL
  });

setInterval(function() {
  client.act('cmd:log,level:log', {
    message: 'Hello World'
  }, (err, res) => {
    if (err) {
      // Handle error in some way
      throw err;
    }
    // Print out the response
    console.log(res);
  });
}, 2000);

How it works

A client creates an exclusive, randomly named response queue (something similar to seneca.act.x42jK0l) and starts consuming from it, much like a listener would. On every act, the client publishes the message to the seneca.topic exchange using a routing key built from the pin that matches the act pattern. In the simple example above, the pattern is cmd:log,level:log, which equals the only declared pin, so the routing key cmd.log.level.log is inferred. An AMQP replyTo header is set to the name of the random queue, following the usual RPC pattern.

Manual queue naming on a client (using the name parameter as seen in the listener configuration) is not supported. Client queues are deleted once the client disconnects and re-created each time.

As you can see, pins play an important role in routing messages on the broker, so in order for a listener to receive messages from a client, their pins must match.
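
To see why the pins must match, it helps to recall how a topic exchange compares a routing key with a binding key: in AMQP 0-9-1, * stands for exactly one dot-separated word and # for zero or more. The function below is a minimal sketch of that rule. topicMatches is a hypothetical name, and the real matching happens inside the broker, not in this plugin.

```javascript
// Hypothetical sketch of AMQP topic matching; the broker does this, not the plugin.
function topicMatches(bindingKey, routingKey) {
  var pattern = bindingKey.split('.');
  var words = routingKey.split('.');

  function match(i, j) {
    if (i === pattern.length) {
      return j === words.length;
    }
    if (pattern[i] === '#') {
      // '#' may absorb zero or more words.
      for (var k = j; k <= words.length; k++) {
        if (match(i + 1, k)) {
          return true;
        }
      }
      return false;
    }
    if (j === words.length) {
      return false;
    }
    // '*' matches exactly one word; anything else must match literally.
    if (pattern[i] === '*' || pattern[i] === words[j]) {
      return match(i + 1, j + 1);
    }
    return false;
  }

  return match(0, 0);
}

// The listener binding from the first example against the client's routing key.
console.log(topicMatches('cmd.log.level.*', 'cmd.log.level.log')); // true
// A routing key built from a different pin is not delivered to that queue.
console.log(topicMatches('cmd.log.level.*', 'cmd.save.level.log')); // false
```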

In the example, the following things are declared:

  • A topic exchange named seneca.topic.
  • An exclusive queue with a random alphanumeric name (like seneca.act.x42jK0l).

Clients do not declare the queue of their listener counterpart. So, if the message does not reach its destination or is discarded by the broker, the seneca instance will fail with a TIMEOUT error on the client side.

Options

The JSON object in defaults.json describes the available options for this transport. These are applicable to both clients and listeners.

To override these settings, pass them to the plugin's .use declaration:

require('seneca')()
  .use('seneca-amqp-transport', {
    amqp: {
      client: {
        queues: {
          options: {
            durable: false
          }
        }
      }
    }
  });

Transport options

AMQP-related options may be specified either in the connection URI or by passing additional parameters to the seneca#client() or seneca#listen() functions.

This,

require('seneca')()
  .use('seneca-amqp-transport')
  .client({
    type: 'amqp',
    url: 'amqp://guest:guest@rabbitmq.host:5672/seneca?locale=es_AR'
  });

will result in the same connection URI as:

require('seneca')()
  .use('seneca-amqp-transport')
  .client({
    type: 'amqp',
    hostname: 'rabbitmq.host',
    port: 5672,
    vhost: 'seneca',
    locale: 'es_AR',
    username: 'guest',
    password: 'guest'
  });
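
As a rough illustration of how the individual parameters map onto a connection URI, the sketch below assembles one by hand. buildAmqpUri is a hypothetical helper, not a function exported by the plugin or by amqplib, and treating every remaining key as a query parameter is an assumption made for this example.

```javascript
// Hypothetical helper; shows how the separate options line up with the URI parts.
function buildAmqpUri(opts) {
  var auth = opts.username
    ? encodeURIComponent(opts.username) + ':' + encodeURIComponent(opts.password || '') + '@'
    : '';
  var port = opts.port ? ':' + opts.port : '';
  var vhost = opts.vhost ? '/' + encodeURIComponent(opts.vhost) : '';

  // Assumption: any other key (such as locale) becomes a query parameter.
  var known = ['hostname', 'port', 'vhost', 'username', 'password'];
  var query = Object.keys(opts)
    .filter(function(key) { return known.indexOf(key) === -1; })
    .map(function(key) {
      return encodeURIComponent(key) + '=' + encodeURIComponent(opts[key]);
    })
    .join('&');

  return 'amqp://' + auth + opts.hostname + port + vhost + (query ? '?' + query : '');
}

console.log(buildAmqpUri({
  hostname: 'rabbitmq.host',
  port: 5672,
  vhost: 'seneca',
  locale: 'es_AR',
  username: 'guest',
  password: 'guest'
}));
// amqp://guest:guest@rabbitmq.host:5672/seneca?locale=es_AR
```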

You may also pass in additional options for the channel#publish and channel#consume methods of amqplib under publish and consume, respectively.

require('seneca')()
  .use('seneca-amqp-transport')
  .client({
    type: 'amqp',
    hostname: 'rabbitmq.host',
    publish: {
      persistent: true
    }
  });

Read the official amqplib docs for a list of available options for publish and consume.

Socket options

Additionally, you may pass in options to the amqp.connect method of amqplib as documented in its API reference, using the socketOptions parameter.

// Example of using a TLS/SSL connection. Note that the server must be
// configured to accept SSL connections; see http://www.rabbitmq.com/ssl.html.

var fs = require('fs');

var opts = {
  cert: fs.readFileSync('../etc/client/cert.pem'),
  key: fs.readFileSync('../etc/client/key.pem'),
  // cert and key or
  // pfx: fs.readFileSync('../etc/client/keycert.p12'),
  passphrase: 'MySecretPassword',
  ca: [fs.readFileSync('../etc/testca/cacert.pem')]
};

require('seneca')()
  .use('seneca-amqp-transport')
  .client({
    type: 'amqp',
    url: 'amqp://guest:guest@rabbitmq.host:5672/seneca?locale=es_AR',
    socketOptions: opts
  });

The snippet above is based on amqplib/examples/ssl.js.

Run the examples

There are simple examples under the /examples directory. To run them, install the latest seneca (if you didn't install devDependencies) and execute:

# Install seneca
npm i seneca

# Start listener.js
cd examples
AMQP_URL='amqp://guest:guest@localhost:5672' node listener.js
{"kind":"notice","notice":"seneca started","level":"info","when":1476216405556}

# Start client.js
cd examples
AMQP_URL='amqp://guest:guest@localhost:5672' node client.js
{"kind":"notice","notice":"seneca started","level":"info","when":1476216473818}
{ id: 93,
  message: 'Hello World!',
  from: { pid: 4150, file: 'examples/listener.js' },
  now: 1476306009801 }
# ...

If you don't export the AMQP_URL environment variable, the default value of amqp://localhost will be used.
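
The fallback can be pictured as follows. This is a sketch of the behavior described above, not code copied from the examples directory, and resolveAmqpUrl is a hypothetical name.

```javascript
// Hypothetical helper: use AMQP_URL when it is set, otherwise fall back to the default.
function resolveAmqpUrl(env) {
  return env.AMQP_URL || 'amqp://localhost';
}

console.log(resolveAmqpUrl(process.env));
```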

Roadmap

  • [x] Mocha unit tests.
  • [x] Functional tests (#74).
  • [x] Set up Travis CI.
  • [x] Support for message TTL and dead-lettering (#59).
  • [ ] Better support for work queues.
  • [ ] Better support for fanout exchanges.
  • [ ] Improve logging using seneca.log.
  • [ ] Don't depend on pins for routing (#58).
  • [x] Internal: remove classes in favor of factory functions (https://github.com/senecajs/seneca-amqp-transport/pull/73).

Contributing

This module follows the general Senecajs.org contribution guidelines and encourages open participation. If you feel you can help in any way, or discover any issues, feel free to create an Issue or a Pull Request. For more information on contribution please see our Contributing guidelines.

License

Licensed under the MIT license.