@ido_kawaz/amqp-client
v7.3.2
AMQP client library for Kawaz Plus services
TypeScript AMQP client for RabbitMQ publishers and consumers.
Installation
npm install @ido_kawaz/amqp-client

Quick Start
import {
  AmqpClient,
  Consumer,
  createConsumerBinding,
  type AmqpConfig,
  createAmqpConfig,
  AmqpRetriableError,
} from '@ido_kawaz/amqp-client';

type OrderCreatedPayload = {
  orderId: string;
  total: number;
};

// Queue, exchange, and topic this consumer listens on.
const binding = createConsumerBinding(
  'orders.created.queue',
  'orders.exchange',
  'orders.created',
);

// Type guard used by the consumer to validate incoming payloads.
const isOrderCreatedPayload = (payload: object): payload is OrderCreatedPayload => {
  const candidate = payload as Partial<OrderCreatedPayload>;
  return typeof candidate.orderId === 'string' && typeof candidate.total === 'number';
};

// Alternatively, createAmqpConfig() reads AMQP_CONNECTION_STRING from process.env.
const config: AmqpConfig = {
  amqpConnectionString: 'amqp://guest:guest@localhost:5672',
};

async function bootstrap() {
  const consumer = new Consumer<OrderCreatedPayload, typeof binding>('orders-created-consumer', binding);

  consumer
    .on('validateMessage', isOrderCreatedPayload)
    .on('handleMessage', async (payload) => {
      if (payload.total <= 0) {
        // A retriable error causes the message to be requeued until the retry limit is reached.
        throw new AmqpRetriableError(payload, 'Amount not ready yet', undefined, 5);
      }
      console.log('received order:', payload.orderId);
    });

  const client = new AmqpClient(config, [consumer]);
  await client.start('my-service');

  // publish() only works after start() has completed.
  client.publish('orders.exchange', 'orders.created', {
    orderId: '123',
    total: 45.5,
  });

  // Close the channel and connection on shutdown.
  process.on('SIGTERM', async () => {
    await client.stop();
  });
}

bootstrap().catch(console.error);

Configuration
AmqpConfig
- amqpConnectionString: Full RabbitMQ connection URL (for example, amqp://guest:guest@localhost:5672)

createAmqpConfig(): AmqpConfig
- Validates AMQP_CONNECTION_STRING from process.env using Zod
- Requires URI scheme amqp or amqps
- Throws a Zod validation error if the env value is invalid or missing
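
The validation rules above can be approximated without the library. The following is a minimal sketch, not the library's implementation: it swaps Zod for the standard URL class, and the names parseAmqpConnectionString and ALLOWED_PROTOCOLS are hypothetical.

```typescript
// Hypothetical stand-in for the Zod-based check described above.
// It uses the standard URL class instead of Zod and is not the library's code.
const ALLOWED_PROTOCOLS = ['amqp:', 'amqps:'];

function parseAmqpConnectionString(value: string | undefined): string {
  // A missing or empty value is rejected first.
  if (value === undefined || value.trim() === '') {
    throw new Error('AMQP_CONNECTION_STRING is missing');
  }

  let url: URL;
  try {
    url = new URL(value);
  } catch {
    throw new Error('AMQP_CONNECTION_STRING is not a valid URL');
  }

  // URL.protocol includes the trailing colon, e.g. "amqp:".
  if (!ALLOWED_PROTOCOLS.includes(url.protocol)) {
    throw new Error(`Unsupported scheme "${url.protocol}"; expected amqp or amqps`);
  }
  return value;
}
```

In a service this would typically be called once at startup with process.env.AMQP_CONNECTION_STRING, so that a bad value fails fast instead of surfacing later as a connection error.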
API
AmqpClient
- new AmqpClient(config: AmqpConfig, consumers?: Consumer[])
- start(serviceName: string): Promise<void>
  - Connects to RabbitMQ and starts all consumer registrations.
- registerConsumers(consumers: Consumer[]): void
  - Adds consumers after construction.
- publish<T>(exchange: string, topic: string, message: T): void
  - Serializes payload to JSON and publishes it.
  - Throws AmqpUninitializedError if start() has not been called.
  - Throws AmqpPublisherError if publish returns false.
- stop(): Promise<void>
  - Closes channel and connection (if initialized).
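
The publish contract described above (serialize to JSON, fail if not started, fail if the underlying publish returns false) can be illustrated with a small self-contained sketch. Everything here is hypothetical: PublisherSketch, ChannelLike, and the two local error classes are stand-ins, and the channel takes a string body for simplicity rather than whatever the library passes internally.

```typescript
// Local stand-ins for AmqpUninitializedError and AmqpPublisherError;
// the real classes and their constructors may differ.
class UninitializedErrorSketch extends Error {}
class PublisherErrorSketch extends Error {}

// Hypothetical minimal channel: publish returns false when the message is not accepted.
interface ChannelLike {
  publish(exchange: string, routingKey: string, content: string): boolean;
}

class PublisherSketch {
  private channel?: ChannelLike;

  // Stands in for start(): after this call a channel is available.
  start(channel: ChannelLike): void {
    this.channel = channel;
  }

  publish<T>(exchange: string, topic: string, message: T): void {
    if (!this.channel) {
      throw new UninitializedErrorSketch('start() must be called before publish()');
    }
    // The payload is serialized to JSON before it is handed to the channel.
    const body = JSON.stringify(message);
    const accepted = this.channel.publish(exchange, topic, body);
    if (!accepted) {
      throw new PublisherErrorSketch(`publish to ${exchange}/${topic} was not accepted`);
    }
  }
}
```

Because publish throws rather than returning a status, callers that need to survive a rejected publish should wrap the call in try/catch.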
Consumer
- new Consumer<Payload, Binding>(consumerName: string, binding: Binding)
  - Generic Payload type must be provided; Binding is inferred from the binding parameter.
  - Wire handlers with .on(event, handler) (chainable):
    - validateMessage: type guard (payload: object) => payload is Payload
    - handleMessage: (payload: Payload) => Promise<Output>
    - handleSuccess: (payload: Payload, response: Output) => Promise<void>
    - handleRetriableError: (error: AmqpRetriableError, payload: Payload) => Promise<void>
    - handleFatalError: (error: AmqpFatalError, payload: any) => Promise<void>
  - Acknowledgement behavior:
    - acks on success.
    - nacks invalid payloads without requeue.
    - nacks with requeue for AmqpRetriableError while x-delivery-count < retryLimit.
    - nacks without requeue for all other errors.
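
The acknowledgement rules above amount to a small decision table. The sketch below restates them as a pure function; it is an illustration only, and the names Outcome, Decision, and decideAck are hypothetical rather than part of the library. The deliveryCount argument stands for the value of the x-delivery-count header.

```typescript
// Hypothetical model of what happened while processing one message.
type Outcome =
  | { kind: 'success' }
  | { kind: 'invalidPayload' }
  | { kind: 'retriableError'; retryLimit: number }
  | { kind: 'otherError' };

type Decision = { action: 'ack' } | { action: 'nack'; requeue: boolean };

// deliveryCount stands for the x-delivery-count header of the message.
function decideAck(outcome: Outcome, deliveryCount: number): Decision {
  switch (outcome.kind) {
    case 'success':
      return { action: 'ack' };
    case 'invalidPayload':
      // Failed validation: never requeued.
      return { action: 'nack', requeue: false };
    case 'retriableError':
      // Requeue only while the delivery count is below the retry limit.
      return { action: 'nack', requeue: deliveryCount < outcome.retryLimit };
    case 'otherError':
      return { action: 'nack', requeue: false };
  }
}
```

With a retry limit of 5, as in the Quick Start, a message whose delivery count is 4 would be requeued and one whose count has reached 5 would not.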
createConsumerBinding
- createConsumerBinding(queue, exchange, topic): ConsumerBinding
  - Helper for creating typed queue/exchange/topic bindings.
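
To show what a "typed" binding can mean in practice, here is a minimal sketch of a helper that preserves the literal string types of its arguments. It is an assumption about the general technique, not the library's source: BindingSketch, createBindingSketch, and the field names queue, exchange, and topic are all hypothetical.

```typescript
// Hypothetical shape; the library's actual ConsumerBinding may use different fields.
interface BindingSketch<Q extends string, E extends string, T extends string> {
  readonly queue: Q;
  readonly exchange: E;
  readonly topic: T;
}

// Constraining each type parameter to string makes TypeScript infer the
// literal types of the arguments instead of widening them to string.
function createBindingSketch<Q extends string, E extends string, T extends string>(
  queue: Q,
  exchange: E,
  topic: T,
): BindingSketch<Q, E, T> {
  return { queue, exchange, topic };
}

const ordersBinding = createBindingSketch(
  'orders.created.queue',
  'orders.exchange',
  'orders.created',
);
// typeof ordersBinding is
// BindingSketch<'orders.created.queue', 'orders.exchange', 'orders.created'>
```

Keeping the literal types is what allows a construct like typeof binding, as used in the Quick Start, to carry the exact queue, exchange, and topic names into other generic types.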
Errors
Exported error classes:
- AmqpError (base)
- AmqpConnectionError
- AmqpPublisherError
- AmqpConsumerError
- AmqpRetriableError: has retryLimit: number; triggers requeue up to that limit
- AmqpFatalError: triggers nack without requeue
Development
- npm run build: clean and compile TypeScript
- npm run build:watch: compile TypeScript in watch mode
- npm run clean: remove build output
- npm test: run unit tests
Publishing
npm run package
- Cleans workspace deeply
- Reinstalls dependencies
- Builds library
- Publishes with public access
