npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

simple-amqplib-rpc

v1.0.3

Published

simple amqp rpc interface

Downloads

12

Readme

Simple amqplib RPC Build Status

Simple RPC interface for amqplib

Installation

This module is installed via npm:

$ npm install simple-amqplib-rpc

Example Usage

Client side:

const amqplib = require('amqplib');
const { request } = require('simple-amqplib-rpc');
const config = {
  url: 'amqp://guest:[email protected]:5672//',
  exchange: 'exchange',
  routingKey: 'sum'
};

const connection = await amqplib.connect(config.url);
try {
  const content = [ 1, 2, 3 ];
  const opts = { exchange: config.exchange, timeout: 5000 };
  const resp = await request(connection, config.routingKey, content, opts);
  // resp = 6
} catch (err) {
  switch (err.name) {
    case 'ChannelClosedError': // the connection was closed unexpectedly.
    case 'NoRouteError': // the specified routing key goes nowhere (server needs to bindQueue).
    case 'ResponseError': // the request returned an error.
    case 'TimeoutError': // the request took more than 5 seconds.
  }
}

Server side:

const amqplib = require('amqplib');
const { checkReplyQueue, error, reply } = require('simple-amqplib-rpc');
const config = {
  url: 'amqp://guest:[email protected]:5672//',
  exchange: 'exchange',
  queue: 'sum',
  routingKey: 'sum'
};

const connection = await amqplib.connect(config.url);
const consumeChannel = await connection.createChannel();
await consumeChannel.assertQueue(config.queue);
await consumeChannel.bindQueue(config.queue, config.exchange, config.routingKey);
const publishChannel = await connection.createChannel();
consumeChannel.consume(config.queue, async message => {
  if (!await checkReplyQueue(connection, message)) { // the consumer doesn't exist anymore
    return consumeChannel.reject(message, false); // reject and don't requeue
  }
  try {
    const numbers = JSON.parse(message.content);
    const sum = numbers.reduce((acc, n) => acc + n, 0);
    reply(consumeChannel, message, sum, publishChannel);
  } catch (err) {
    // if something went wrong, return an error to the client
    error(consumeChannel, message, err, publishChannel);
  }
});

API

checkReplyQueue(connection, message) ⇒ boolean

Check if a "reply to" queue exists or not. Will create a separate channel so that it doesn't kill an existing one if the queue check fails.

Kind: global function
Returns: boolean - whether the reply queue exists or not

| Param | Type | Description | | --- | --- | --- | | connection | amqplibConnection | amqplib connection | | message | object | incomming message |

error(channel, message, error, publisherChannel)

Reply to an rpc request with an error. Will automatically nack and not requeue the message after the error response has been sent.

Kind: global function

| Param | Type | Description | | --- | --- | --- | | channel | AmqplibChannel | the amqplib channel on which the message was received | | message | object | incomming message | | error | Error | an error object. { message, code } will be returned to the client. | | publisherChannel | AmqplibChannel | optional separate channel to publish response on |

reply(channel, message, content, publisherChannel)

Reply to an rpc request. Will automatically ack the message after the response has been sent.

Kind: global function

| Param | Type | Description | | --- | --- | --- | | channel | AmqplibChannel | on which the message was received | | message | object | incomming message | | content | * | response message | | publisherChannel | AmqplibChannel | optional separate channel to publish response on |

request(connection, key, content, opts) ⇒ *

Make an rpc request. Each request will have its own channel.

Kind: global function Returns: * - json decoded response Throws:

  • ChannelClosedError when the channel is closed
  • NoRouteError if a published message has nowhere to go
  • ResponseError if the request returned an error
  • TimeoutError after the specified timeout period

| Param | Type | Description | | --- | --- | --- | | connection | amqplibConnection | amqplib connection | | key | string | the routing key for the rpc service | | content | * | must be json serialisable | | opts | object | | | opts.exchange | string | the amqp exchange to publish to (defaults to '') | | opts.timeout | number | optional max time to wait for a response |

Note: To regenerate this section from the jsdoc run npm run docs and paste the output above.

License

The BSD License

Copyright (c) 2019, Andrew Harris

All rights reserved.

Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:

  • Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.

  • Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.

  • Neither the name of the Andrew Harris nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.