@ido_kawaz/amqp-client
v6.0.0
AMQP client library for Kawaz Plus services
TypeScript AMQP client for RabbitMQ publishers and consumers.
Installation
npm install @ido_kawaz/amqp-client
Quick Start
import {
  AmqpClient,
  Consumer,
  createConsumerBinding,
  type AmqpConfig,
  createAmqpConfig,
  AmqpRetriableError,
} from '@ido_kawaz/amqp-client';

type OrderCreatedPayload = {
  orderId: string;
  total: number;
};

const binding = createConsumerBinding(
  'orders.created.queue',
  'orders.exchange',
  'orders.created',
);

// Type guard that runs on the parsed JSON before the handler is called.
const isOrderCreatedPayload = (payload: object): payload is OrderCreatedPayload => {
  const candidate = payload as Partial<OrderCreatedPayload>;
  return typeof candidate.orderId === 'string' && typeof candidate.total === 'number';
};

const config: AmqpConfig = {
  amqpConnectionString: 'amqp://guest:guest@localhost:5672',
};

async function bootstrap() {
  const consumer = new Consumer<OrderCreatedPayload>(
    binding,
    isOrderCreatedPayload,
    async (payload) => {
      if (payload.total <= 0) {
        throw new AmqpRetriableError(new Error('Amount not ready yet'), 5);
      }
      console.log('received order:', payload.orderId);
    }
  );

  const client = new AmqpClient(config, [consumer]);
  await client.start();

  client.publish('orders.exchange', 'orders.created', {
    orderId: '123',
    total: 45.5,
  });

  // Close the channel and connection on shutdown.
  process.on('SIGTERM', async () => {
    await client.stop();
  });
}
bootstrap().catch(console.error);
Configuration
AmqpConfig
- amqpConnectionString: Full RabbitMQ connection URL (for example amqp://guest:guest@localhost:5672)
createAmqpConfig(): AmqpConfig
- Validates AMQP_CONNECTION_STRING from process.env using Joi
- Requires URI scheme amqp or amqps
- Throws a Joi validation error for an invalid or missing env value
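The checks listed for createAmqpConfig() can be approximated without the library. The sketch below is a stand-in, not the package's code: it uses a plain regular expression rather than Joi, and readConnectionString and Env are hypothetical names. The environment is passed in as a parameter so the function can be exercised without touching process.env.

```typescript
// Hypothetical stand-in for the validation described above; not the library's code.
type Env = Record<string, string | undefined>;

function readConnectionString(env: Env): string {
  const value = env["AMQP_CONNECTION_STRING"];
  // Reject a missing or blank value.
  if (value === undefined || value.trim() === "") {
    throw new Error("AMQP_CONNECTION_STRING is missing");
  }
  // Accept only the amqp:// and amqps:// schemes.
  if (!/^amqps?:\/\/\S+$/.test(value)) {
    throw new Error("AMQP_CONNECTION_STRING must use the amqp or amqps scheme");
  }
  return value;
}
```

Failing at startup, rather than on the first publish, keeps a misconfigured environment from surfacing as a connection error later.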
API
AmqpClient
- new AmqpClient(config: AmqpConfig, consumers: Consumer[])
- start(): Promise<void>
  - Connects to RabbitMQ and starts all consumer registrations.
- publish<T>(exchange: string, topic: string, message: T): void
  - Serializes payload to JSON and publishes it.
  - Throws AmqpUninitializedError if start() has not been called.
  - Throws AmqpPublisherError if publish returns false.
- stop(): Promise<void>
  - Closes channel and connection (if initialized).
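The publish() contract described above (serialize to JSON, fail if the client has not been started, fail if the underlying publish returns false) can be illustrated with a small stand-in. Everything here is hypothetical: ChannelLike, SketchPublisher, and the two error classes are not the library's types, and the stand-in channel takes a string where a real AMQP channel would take a buffer.

```typescript
// Stand-in types for illustration only; not the library's implementation.
interface ChannelLike {
  publish(exchange: string, routingKey: string, content: string): boolean;
}

class UninitializedError extends Error {}
class PublisherError extends Error {}

class SketchPublisher {
  private channel?: ChannelLike;

  // In this sketch, "starting" just means receiving a channel.
  start(channel: ChannelLike): void {
    this.channel = channel;
  }

  publish<T>(exchange: string, topic: string, message: T): void {
    if (!this.channel) {
      throw new UninitializedError("start() has not been called");
    }
    // Serialize the payload to JSON before handing it to the channel.
    const accepted = this.channel.publish(exchange, topic, JSON.stringify(message));
    if (!accepted) {
      throw new PublisherError(`publish to ${exchange}/${topic} returned false`);
    }
  }
}
```

Because publish() is synchronous and throws, callers that publish from request handlers would typically wrap the call in try/catch rather than awaiting a promise.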
Consumer
- new Consumer<Payload>(binding, validatePayload, handlePayload)
  - Generic Payload type is required; binding type is inferred from the binding parameter.
- binding: ConsumerBinding contains queue, exchange, topic.
- validatePayload(payload): payload is Payload validates parsed JSON before handling.
- handlePayload(payload) runs only for valid payloads.
- acks on success.
- nacks invalid payloads without requeue.
- nacks with requeue for AmqpRetriableError while x-delivery-count < retryLimit.
- nacks without requeue for all other errors.
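The ack/nack rules for Consumer can be read as a small decision function. The sketch below restates them under assumptions: Outcome, Settlement, and settle are hypothetical names, the delivery count is passed in as a plain number rather than read from the x-delivery-count header, and the retry limit is assumed to be the number carried by the retriable error.

```typescript
// Hypothetical names throughout; this mirrors the listed rules, not the library's source.
type Outcome =
  | { kind: "success" }
  | { kind: "invalid-payload" }
  | { kind: "retriable-error"; retryLimit: number }
  | { kind: "other-error" };

type Settlement = "ack" | "nack-requeue" | "nack-drop";

function settle(outcome: Outcome, deliveryCount: number): Settlement {
  // Handler completed without throwing.
  if (outcome.kind === "success") {
    return "ack";
  }
  // Requeue only while the delivery count is still below the retry limit.
  if (outcome.kind === "retriable-error" && deliveryCount < outcome.retryLimit) {
    return "nack-requeue";
  }
  // Invalid payloads, exhausted retries, and all other errors are not requeued.
  return "nack-drop";
}
```

For example, with a retry limit of 5, a retriable failure at delivery count 4 is requeued, while the same failure at delivery count 5 is dropped.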
createConsumerBinding
- createConsumerBinding(queue, exchange, topic): ConsumerBinding
  - Helper for creating typed queue/exchange/topic bindings.
Errors
- AmqpError
- AmqpConnectionError
- AmqpUninitializedError
- AmqpPublisherError
- AmqpConsumerError
- AmqpRetriableError
- AmqpFatalError
Development
- npm run build: clean and compile TypeScript
- npm run build:watch: compile TypeScript in watch mode
- npm run clean: remove build output
- npm test: run unit tests
Publishing
- npm run package
  - Cleans workspace deeply
  - Reinstalls dependencies
  - Builds library
  - Publishes with public access
