@teemill/node-red-amqp
v1.0.3
Published
Node-RED nodes for AMQP (RabbitMQ) with full exchange, queue, binding and acknowledgement support
Readme
@teemill/node-red-amqp
Node-RED nodes for AMQP 0-9-1 (RabbitMQ) messaging with full support for exchanges, queues, bindings, and manual acknowledgements.
Nodes
amqp broker
Shared configuration node that manages the connection to an AMQP broker. All other nodes reference a broker config.
| Property | Type | Default | Description |
|---|---|---|---|
| Host | string | localhost | Broker hostname or IP |
| Port | number | 5672 | Broker port (5671 for TLS) |
| Virtual Host | string | / | AMQP virtual host |
| Username | string | | Broker credentials |
| Password | string | | Broker credentials |
| Heartbeat | number | 30 | Heartbeat interval in seconds |
| Prefetch | number | 0 | Channel-level prefetch count (0 = unlimited) |
| Client ID | string | | Optional client identifier sent to the broker |
| Use TLS | boolean | false | Enable TLS/SSL for the connection |
The broker node handles automatic reconnection with a 5-second backoff when the connection is lost.
amqp in
Consumes messages from a queue. Supports two modes:
- Queue — consume directly from a named queue.
- Exchange → Queue — assert an exchange, bind a queue with a routing key, then consume.
Configuration
| Property | Type | Default | Description |
|---|---|---|---|
| Mode | queue | exchange | queue | Consumption mode |
| Queue | string | | Queue name |
| Durable | boolean | true | Queue survives broker restart |
| Exclusive | boolean | true | Queue is used by only this connection |
| Auto-delete | boolean | true | Queue is deleted when the last consumer disconnects |
| Auto-acknowledge | boolean | true | Automatically acknowledge messages on receipt |
| Message TTL | number | 0 | Per-queue message TTL in milliseconds (0 = disabled) |
| Dead Letter Exchange | string | | Exchange to route rejected/expired messages to |
| Dead Letter Routing Key | string | | Routing key for dead-lettered messages |
| Exchange | string | | Exchange name (exchange mode only) |
| Exchange Type | topic | direct | fanout | headers | topic | Exchange type (exchange mode only) |
| Exchange Durable | boolean | true | Exchange survives broker restart |
| Exchange Auto-delete | boolean | false | Exchange is deleted when the last binding is removed |
| Binding Key | string | # | Routing pattern for exchange → queue binding |
Output msg Properties
| Property | Type | Description |
|---|---|---|
| payload | object | string | Buffer | Message body, auto-parsed from JSON when possible |
| routingKey | string | The routing key the message was published with |
| exchange | string | The exchange the message came from |
| headers | object | AMQP message headers |
| amqpMessage | object | Raw amqplib message (for use with the amqp ack node) |
When Auto-acknowledge is disabled, pass
msg.amqpMessageto an amqp ack node to acknowledge, nack, or reject messages manually.
amqp out
Publishes messages to a queue or exchange. Supports two modes:
- Queue — send directly to a named queue via the default exchange.
- Exchange — publish to a named exchange with an optional routing key.
Configuration
| Property | Type | Default | Description |
|---|---|---|---|
| Mode | queue | exchange | queue | Publishing mode |
| Queue | string | | Target queue name (queue mode) |
| Durable | boolean | true | Queue survives broker restart |
| Exclusive | boolean | true | Queue is used by only this connection |
| Auto-delete | boolean | true | Queue is deleted when the last consumer disconnects |
| Message TTL | number | 0 | Per-queue message TTL in milliseconds (0 = disabled) |
| Dead Letter Exchange | string | | Exchange to route rejected/expired messages to |
| Dead Letter Routing Key | string | | Routing key for dead-lettered messages |
| Exchange | string | | Exchange name (exchange mode) |
| Exchange Type | topic | direct | fanout | headers | topic | Exchange type |
| Exchange Durable | boolean | true | Exchange survives broker restart |
| Exchange Auto-delete | boolean | false | Exchange is deleted when the last binding is removed |
| Routing Key | string | | Routing key for published messages |
Input msg Properties
| Property | Type | Description |
|---|---|---|
| payload | object | string | Buffer | Message body to send |
| routingKey | string | Override the configured routing key (exchange mode) |
| queue | string | Override the configured queue name (queue mode) |
| headers | object | AMQP message headers |
| correlationId | string | Correlation ID for RPC patterns |
| replyTo | string | Reply-to queue for RPC patterns |
| persistent | boolean | Mark message as persistent (default: true) |
| expiration | string | number | Per-message TTL in milliseconds |
The contentType header is set automatically based on the payload type:
| Payload Type | Content Type |
|---|---|
| Buffer | application/octet-stream |
| string | text/plain |
| object | application/json |
amqp ack
Acknowledges, negative-acknowledges, or rejects an AMQP message. Used with amqp in when Auto-acknowledge is disabled.
Configuration
| Property | Type | Default | Description |
|---|---|---|---|
| Action | ack | nack | reject | ack | Default acknowledgement action |
| All Up To | boolean | false | Ack/nack all outstanding messages up to and including this one |
| Requeue | boolean | false | Requeue the message instead of discarding or dead-lettering |
Input msg Properties
| Property | Type | Description |
|---|---|---|
| amqpMessage | object | Required. The raw AMQP message from the amqp in node |
| amqpAction | string | Override action: ack, nack, or reject |
| amqpAllUpTo | boolean | Override the "all up to" setting |
| amqpRequeue | boolean | Override the requeue setting |
Development
Prerequisites
Setup
cd node-red-amqp
npm installBuild
The project is written in TypeScript. Source files live in src/ and compile to dist/.
npm run buildThis copies the .html node definitions into dist/ and compiles TypeScript to JavaScript.
Test
Tests use vitest and cover all nodes and shared utilities.
npm test # single run
npm run test:watch # watch modeProject Structure
node-red-amqp/
├── src/
│ ├── amqp-broker.ts # Broker connection config node
│ ├── amqp-broker.html
│ ├── amqp-in.ts # Consumer node
│ ├── amqp-in.html
│ ├── amqp-out.ts # Publisher node
│ ├── amqp-out.html
│ ├── amqp-acknowledge.ts # Ack/nack/reject node
│ ├── amqp-acknowledge.html
│ └── amqp-utils.ts # Shared utility functions
├── test/
│ ├── helpers.ts # Mock factories for tests
│ ├── amqp-broker.test.ts
│ ├── amqp-in.test.ts
│ ├── amqp-out.test.ts
│ ├── amqp-acknowledge.test.ts
│ └── amqp-utils.test.ts
├── dist/ # Compiled output (git-ignored)
├── tsconfig.json
├── vitest.config.ts
└── package.json