azure-functions-rabbitmq-extension
v1.1.1
Published
A RabbitMQ extension for Azure Functions that allows you to easily consume messages from RabbitMQ queues
Maintainers
Readme
Azure Functions RabbitMQ Extension
A RabbitMQ extension for Azure Functions that allows you to easily consume messages from RabbitMQ queues.
Installation
npm install azure-functions-rabbitmq-extensionUsage
Basic Consumer
import "azure-functions-rabbitmq-extension";
import { app, InvocationContext } from "@azure/functions";
app.rabbitmq("BasicConsumer", {
queueName: process.env.RabbitMQQueueName ?? "myQueue",
connectionStringSetting: "RabbitMQConnectionString",
handler: async (message: any, context: InvocationContext) => {
context.log(`Received message: ${JSON.stringify(message)}`);
// Your processing logic here
await processMessage(message);
context.log("Message processed successfully");
}
});
async function processMessage(message: any): Promise<void> {
// Simulate some work
await new Promise(resolve => setTimeout(resolve, 100));
console.log("Processing:", message);
}Advanced Consumer with Error Handling
import "azure-functions-rabbitmq-extension";
import { app, InvocationContext } from "@azure/functions";
import { ErrorAction } from "azure-functions-rabbitmq-extension/dist/types";
app.rabbitmq("AdvancedConsumer", {
queueName: "orders",
connectionStringSetting: "RABBITMQ_CONNECTION_STRING",
durable: true,
prefetch: 2,
maxRetries: 5,
deadLetterQueue: "orders.dlq",
handler: async (order: Order, context: InvocationContext, metadata) => {
context.log(`Processing order: ${order.id}`);
// Your order processing logic
await validateOrder(order);
await processPayment(order);
await updateInventory(order);
},
onError: async (error, message, context, metadata) => {
context.error(`Error processing message: ${error.message}`);
// Decide what to do based on the error
if (error.message.includes('validation')) {
// Invalid message, send to DLQ
return ErrorAction.NACK_DLQ;
} else {
// Transient error, retry
return ErrorAction.NACK_REQUEUE;
}
}
});Configuration Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| queueName | string | - | Name of the queue to consume from |
| connectionStringSetting | string | - | Name of environment variable containing connection string |
| handler | MessageHandler | - | Function to handle incoming messages |
| durable | boolean | true | Queue durability (survives broker restart) |
| prefetch | number | 1 | Number of messages to prefetch |
| autoAck | boolean | false | Auto-acknowledge messages |
| pollingInterval | number | 1000 | Polling interval for health checks |
| maxRetries | number | 3 | Max retry attempts before sending to DLQ |
| retryDelay | number | 5000 | Delay between retries |
| deadLetterQueue | string | {queueName}.dlq | Dead letter queue name |
| deadLetterExchange | string | '' | Dead letter exchange name |
| messageTTL | number | 0 | Message TTL in milliseconds |
| maxPriority | number | 0 | Maximum message priority |
| logging | boolean | true | Enable logging |
| onError | ErrorHandler | - | Custom error handler |
| onConnected | () => void | - | Connection established callback |
| onDisconnected | () => void | - | Connection lost callback |
Environment Variables
Make sure to set the RabbitMQ connection string in your environment:
RABBITMQ_CONNECTION_STRING=amqp://username:password@host:portExamples
Check out the examples directory for more detailed usage examples:
Contributing
See CONTRIBUTING.md for details on how to contribute to this project.
License
This project is licensed under the MIT License - see the LICENSE file for details.
