amqpea
v0.4.1
Easy Peasy AMQP
AMQPea
AMQP made easy

"Happy Pea" Copyright FancyFerret on DeviantArt
Goals
- Would rather explode than miss an error
- API supports common use-cases cleanly
- API still allows more complex use-cases
- API clearly separates protocol from higher-level actions
- AMQP 0.9.1
- Work nicely with RabbitMQ clusters
- Be fast enough
- Easy to debug
- Well tested...
Quick Start
Install
npm install --save amqpea

All-in-one example:
var amqpea = require('amqpea');

// Fail loudly: any error ends the program.
function die(err) {
    throw err;
}

var uri = 'amqp://guest:guest@localhost:5672/%2F';
var amqp = amqpea(uri, { timeout: 2000 });
amqp.on('error', die);
amqp.on('ready', function() {
    amqp.declareExchange({
        name: 'x'
    }, whenExchangeReady);
});

function whenExchangeReady(err) {
    if (err) return die(err);
    // Declare an exclusive queue bound to exchange 'x' on key 'route'.
    amqp.declareQueue({
        name: 'q',
        exclusive: true,
        binding: {
            exchange: 'x',
            keys: ['route']
        }
    }, whenQueueReady);
}

function whenQueueReady(err) {
    if (err) return die(err);
    beginPublishing();
    // Consume from 'q' with a prefetch of 1.
    var consumer = amqp.createQueueConsumerChannel('q', 1);
    // The strings are truthy stand-ins for the boolean ack and exclusive flags.
    consumer.consume('ack', 'exclusive', function(msg) {
        var body = msg.fromJSON();
        console.log("Received: %j", body);
        msg.ack();
    });
}

function beginPublishing() {
    var i = 0;
    // 'confirm' is a truthy stand-in for the confirm flag.
    var publisher = amqp.createPublishChannel('confirm');
    // Publish one message per second.
    setInterval(function() {
        publisher.publish('x', 'route', { num: ++i }, function(err) {
            if (err) return die(err);
            console.log("Published message %d", i);
        });
    }, 1000);
}

Examples
More examples can be found in the examples folder.
API Docs
Most of these options correspond directly to an AMQP protocol concept. For more information, see the AMQP 0.9.1 reference.
amqpea(urisOrUri, options) => AMQPConnection
Establish a new AMQPConnection instance.
- urisOrUri {string or array(string)} Pass one or more AMQP URIs; the first one that works will be connected to. An AMQP URI looks like amqp://login:password@hostname:port/vhost. Note that if the vhost begins with a /, this needs to be URL encoded, so /default becomes a URL path of /%2Fdefault.
- options {object} Various options to control the client's behaviour
  - timeout {number} Number of milliseconds to wait before considering the connection timed out
  - debug {boolean} Set to true to log a bunch of debugging messages to STDERR. Can also be enabled by setting the environment variable NODE_DEBUG_AMQP to a non-empty value.
  - heartbeat {boolean or number} Control the AMQP protocol heartbeat: false for no heartbeats, true to do what the server says, or a number of seconds to override.
  - client {object} Send some strings to the server to help identify the client. Allowed keys are product, version, platform, copyright and information. Product, version and platform default to something useful.
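The vhost encoding rule above is easy to get wrong by hand. The following is a minimal sketch of building URIs programmatically; buildAmqpUri is a hypothetical helper written for this illustration and is not part of amqpea.

```javascript
// Hypothetical helper (not part of amqpea): builds an AMQP URI from its
// parts, percent-encoding the vhost so a leading "/" becomes "%2F".
function buildAmqpUri(parts) {
    var auth = encodeURIComponent(parts.login) + ':' +
        encodeURIComponent(parts.password);
    return 'amqp://' + auth + '@' + parts.hostname + ':' + parts.port +
        '/' + encodeURIComponent(parts.vhost);
}

var uris = [
    buildAmqpUri({ login: 'guest', password: 'guest', hostname: 'localhost', port: 5672, vhost: '/' }),
    buildAmqpUri({ login: 'guest', password: 'guest', hostname: 'localhost', port: 5672, vhost: '/default' })
];

console.log(uris[0]); // amqp://guest:guest@localhost:5672/%2F
console.log(uris[1]); // amqp://guest:guest@localhost:5672/%2Fdefault
```

An array built this way could then be passed as the urisOrUri argument.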
AMQPConnection
Instances represent a connected AMQP client. Use the main amqpea export to create an instance.
Event: error(err)
Fired when the server has an error.
- err {Error} The exception that occurred
The connection object will not be usable after an error has been emitted. By default node.js will exit your program if you don't listen for this event.
Event: close(hadError)
Fired when the server connection has been closed.
- hadError {boolean} True when server is closing due to error
Event: ready()
Fired when the server connection is ready to use.
Event: connection-error(uri, err)
Fired for every failed server connection.
- uri {string} The URI that failed to connect
- err {Error} The exception that occurred
When attempting to connect to multiple servers, this is the only way to see why servers are failing. If none of the servers can be connected to, the error event will be fired with the same err as the last connection-error.
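One way to keep the per-server failure reasons around until the final error event is to collect them as they arrive. This is only a sketch: trackConnectionFailures is a hypothetical helper, and a plain EventEmitter stands in for an AMQPConnection so the code runs without a broker.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical helper (not part of amqpea): records every failed URI so
// the reasons are still available when the final 'error' event fires.
function trackConnectionFailures(amqp) {
    var failures = [];
    amqp.on('connection-error', function(uri, err) {
        failures.push({ uri: uri, message: err.message });
    });
    return failures;
}

// Stand-in for an AMQPConnection; a real one would come from amqpea(uris).
var fakeAmqp = new EventEmitter();
var failures = trackConnectionFailures(fakeAmqp);

fakeAmqp.on('error', function(err) {
    console.error('All servers failed: %s', err.message);
    failures.forEach(function(f) {
        console.error('  %s: %s', f.uri, f.message);
    });
});

// Simulate two failed servers followed by the final error.
fakeAmqp.emit('connection-error', 'amqp://host-a', new Error('ECONNREFUSED'));
fakeAmqp.emit('connection-error', 'amqp://host-b', new Error('timed out'));
fakeAmqp.emit('error', new Error('timed out'));
```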
amqp.declareExchange(options, callback(err))
Declare an exchange on the server.
- options {object} Various options
  - name {string} name of the exchange
  - type {string} type of the exchange, default: topic
  - passive {boolean} only re-use existing exchange, default: false
  - durable {boolean} persist exchange across broker restarts, default: false
  - autoDelete {boolean} delete exchange when queues are finished using it, default: false
  - internal {boolean} disallow publishing directly to the exchange, default: false
- callback(err) {function} Called when exchange declaration is confirmed
  - err {Error} non-null when an error occurred
To publish to an exchange, use createPublishChannel.
amqp.declareQueue(options, callback(err))
Declare a queue on the server.
- options {object} Various options
  - name {string} name of the queue, leave blank to let the server generate a unique name
  - passive {boolean} only re-use existing queue, default: false
  - durable {boolean} persist queue across broker restarts, default: false
  - exclusive {boolean} only allow use by this connection, default: false
  - autoDelete {boolean} delete queue when all consumers are finished, default: false
  - binding {object} optional, configure the queue's bindings
    - exchange {string} name of the exchange to bind to
    - keys {array(string)} which routing keys to bind
- callback(err, queue) {function} Called when queue declaration is confirmed
  - err {Error} non-null when an error occurred
  - queue {object} Contains name as a {string}.
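The two declarations are often chained: declare the exchange, then a queue bound to it, then read the queue name from the callback. Below is a rough sketch of that flow. declareBoundQueue is a hypothetical helper, the stub object only imitates the two documented methods so the code runs without a broker, and it assumes that omitting name counts as leaving it blank.

```javascript
// Hypothetical helper (not part of amqpea): declares an exchange, then an
// exclusive queue bound to it, and passes the queue name to `done`.
function declareBoundQueue(amqp, exchangeName, keys, done) {
    amqp.declareExchange({ name: exchangeName, type: 'topic' }, function(err) {
        if (err) return done(err);
        amqp.declareQueue({
            // name omitted (assumption: same as blank, server picks a name)
            exclusive: true,
            binding: { exchange: exchangeName, keys: keys }
        }, function(err, queue) {
            if (err) return done(err);
            done(null, queue.name);
        });
    });
}

// Stub standing in for an AMQPConnection; it calls back synchronously.
var calls = [];
var stubAmqp = {
    declareExchange: function(options, callback) {
        calls.push(['exchange', options.name]);
        callback(null);
    },
    declareQueue: function(options, callback) {
        calls.push(['queue', options.binding.exchange]);
        callback(null, { name: 'generated-name' });
    }
};

var declaredName = null;
declareBoundQueue(stubAmqp, 'x', ['route'], function(err, name) {
    if (err) throw err;
    declaredName = name;
});
```

With a real connection the callbacks fire asynchronously, so anything that needs the queue name belongs inside the final callback.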
amqp.createPublishChannel(confirm) => AMQPPublishChannel
TODO: write this
amqp.createQueueConsumerChannel(name, prefetch) => AMQPQueueConsumerChannel
TODO: write this
amqp.close()
TODO: write this
AMQPPublishChannel
TODO: write this
channel.publish(exchange, key, body, callback)
TODO: write this
channel.close()
TODO: write this
AMQPQueueConsumerChannel
Represents a channel to be used for consuming messages.
channel.tag
The consumer's tag.
Event: 'error'
Can be emitted if an ack or reject fails.
TODO: move these errors into the actions' callbacks.
channel.consume(ack, exclusive, handler(msg))
Begin consuming the queue.
- ack {boolean} Enable message acknowledgement
- exclusive {boolean} Request that only this consumer can use the queue
- handler(msg) {function} Called on every message with an AMQPMessage object
AMQPMessage
msg.delivery
{object} Delivery information, likely to change in future versions.
msg.properties
{object} Message properties, likely to change in future versions.
msg.content
{Buffer} The raw message content.
msg.fromJSON()
Decode a JSON message into an object, may throw an Error.
msg.ack()
Acknowledge the message with the server.
msg.reject()
Reject the message with the server.
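Because msg.fromJSON() may throw, a consume handler probably wants to guard the decode step. The sketch below acks messages that decode cleanly and rejects the rest. makeJsonHandler and fakeMessage are hypothetical names used only for illustration, and the code assumes the message exposes a reject() method alongside ack().

```javascript
// Hypothetical helper (not part of amqpea): wraps a body callback in a
// handler that rejects undecodable messages and acks the rest.
function makeJsonHandler(onBody) {
    return function(msg) {
        var body;
        try {
            body = msg.fromJSON();
        } catch (err) {
            msg.reject(); // assumption: reject() exists on the message
            return;
        }
        onBody(body);
        msg.ack();
    };
}

// Stand-in for an AMQPMessage so the sketch runs without a broker.
function fakeMessage(text) {
    return {
        content: Buffer.from(text),
        outcome: null,
        fromJSON: function() { return JSON.parse(this.content.toString()); },
        ack: function() { this.outcome = 'ack'; },
        reject: function() { this.outcome = 'reject'; }
    };
}

var received = [];
var handler = makeJsonHandler(function(body) { received.push(body); });

var good = fakeMessage('{"num":1}');
var bad = fakeMessage('not json');
handler(good);
handler(bad);

console.log(good.outcome, bad.outcome); // ack reject
```

A handler built this way could be passed as the third argument to channel.consume.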
Running the Tests
To run the tests you will need a local AMQP server. The testsuite talks to the broker directly over AMQP as well as via the HTTP admin API.
There are environment variables to set which tell the test runner how to connect.
- AMQP_USERNAME defaults to "guest"
- AMQP_PASSWORD defaults to "guest"
- AMQP_HOSTNAME defaults to "localhost"
- AMQP_PORT defaults to 5672
- AMQP_VHOST defaults to "/"
- AMQP_ADMIN_PORT defaults to 15672
- AMQP_ADMIN_PROTO defaults to "http"
- AMQP_ADMIN_SSL_INSECURE defaults to false
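To make the defaults concrete, here is a sketch of how such variables could be read into a settings object. It is not the test runner's actual code: readTestConfig and its property names are hypothetical, and treating any non-empty AMQP_ADMIN_SSL_INSECURE value as true is an assumption.

```javascript
// Hypothetical helper (not the real test runner): applies the documented
// defaults to whatever is set in the given environment object.
function readTestConfig(env) {
    return {
        username: env.AMQP_USERNAME || 'guest',
        password: env.AMQP_PASSWORD || 'guest',
        hostname: env.AMQP_HOSTNAME || 'localhost',
        port: Number(env.AMQP_PORT || 5672),
        vhost: env.AMQP_VHOST || '/',
        adminPort: Number(env.AMQP_ADMIN_PORT || 15672),
        adminProto: env.AMQP_ADMIN_PROTO || 'http',
        // Assumption: any non-empty value (even "false") enables this.
        adminSslInsecure: Boolean(env.AMQP_ADMIN_SSL_INSECURE)
    };
}

var defaults = readTestConfig({});
var custom = readTestConfig({ AMQP_HOSTNAME: 'rabbit.local', AMQP_PORT: '5673' });

console.log(defaults.hostname, defaults.port); // localhost 5672
console.log(custom.hostname, custom.port);     // rabbit.local 5673
```

In practice the function would be called with process.env.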
With the appropriate variables set, use npm to run the testsuite.
npm test