@ts-wire/messaging
v0.1.0
In-process message consumer for ts-wire.
npm install @ts-wire/messaging

Setup
import { TsMessaging } from '@ts-wire/messaging';
import { OrderConsumer } from './consumers/order.consumer';
import { components } from './components';
const messaging = new TsMessaging();
messaging.start({
  consumers: [OrderConsumer],
  components,
});

Stop all consumers cleanly:
await messaging.stop();

@Consumer(topic)
Declares a consumer class for the given topic.
import { Consumer, OnMessage, ConsumerMessage } from '@ts-wire/messaging';
@Consumer('order.created')
export class OrderConsumer {
  @OnMessage()
  async handle(message: ConsumerMessage, { orderService }: Components) {
    await orderService.process(message.payload);
    message.ack();
  }
}

@OnMessage()
Marks the method that handles incoming messages. One per consumer class.
Handler signature:
(message: ConsumerMessage, components: Components) => void | Promise<void>

ConsumerMessage
interface ConsumerMessage {
  topic: string;
  payload: unknown;
  ack: () => void;
  nack: (requeue?: boolean) => void;
}

@ConsumerWith(components)
Injects additional components at class or method level. When both levels define the same key, the method-level value overrides the class-level one.
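The override rule can be pictured as a shallow merge in which later sources win. The sketch below assumes that behavior and does not use the library: `resolveComponents`, `ComponentMap`, and the sample values are hypothetical names for illustration, not part of the @ts-wire/messaging API.

```typescript
type ComponentMap = Record<string, unknown>;

// Hypothetical helper (an assumption, not the library's code):
// later sources win when the same key appears more than once.
function resolveComponents(
  base: ComponentMap,        // components passed at startup
  classLevel: ComponentMap,  // added on the consumer class
  methodLevel: ComponentMap, // added on the handler method
): ComponentMap {
  return { ...base, ...classLevel, ...methodLevel };
}

const resolved = resolveComponents(
  { logger: 'app-logger' },
  { db: 'primary-db', email: 'class-mailer' },
  { email: 'method-mailer' },
);
// The method-level email replaces the class-level one;
// logger and db pass through unchanged.
```

In decorator form, the two levels look like this: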
@Consumer('user.import')
@ConsumerWith({ db: components.db })
export class UserImportConsumer {
  @OnMessage()
  @ConsumerWith({ email: components.email })
  async handle(message: ConsumerMessage, { db, email }: any) {
    const user = await db.insert(message.payload);
    await email.send(user.email, 'Welcome');
    message.ack();
  }
}

Publishing messages
messaging.publish() dispatches a message in-process. It is useful for testing and internal triggers, but it is not a replacement for a real message broker.
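To make "in-process" concrete, here is a minimal sketch of an in-memory dispatcher. It is not the library's implementation: `InProcessBus`, `subscribe`, and `demo` are hypothetical names, and the sketch only illustrates why this style of dispatch offers no persistence and no delivery across processes.

```typescript
type Handler = (payload: unknown) => void | Promise<void>;

// Hypothetical in-memory bus, for illustration only.
class InProcessBus {
  private handlers = new Map<string, Handler[]>();

  subscribe(topic: string, handler: Handler): void {
    const list = this.handlers.get(topic) ?? [];
    list.push(handler);
    this.handlers.set(topic, list);
  }

  // Calls every handler registered for the topic in this process and
  // resolves with the number of handlers that ran. Nothing is stored:
  // a message published with no subscribers is simply dropped.
  async publish(topic: string, payload: unknown): Promise<number> {
    const list = this.handlers.get(topic) ?? [];
    await Promise.all(list.map((handler) => handler(payload)));
    return list.length;
  }
}

// Small usage example: one subscriber, one matching and one unmatched publish.
async function demo(): Promise<{ received: unknown[]; delivered: number; dropped: number }> {
  const bus = new InProcessBus();
  const received: unknown[] = [];
  bus.subscribe('order.created', (payload) => {
    received.push(payload);
  });
  const delivered = await bus.publish('order.created', { id: 42 });
  const dropped = await bus.publish('order.cancelled', { id: 42 });
  return { received, delivered, dropped };
}
```

With the library, publishing is a single call: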
await messaging.publish('order.created', { id: 42, total: 99.90 });

Same service, multiple entry points
// consumers/order.consumer.ts — queue trigger
@Consumer('order.created')
export class OrderConsumer {
  @OnMessage()
  async handle(msg: ConsumerMessage, { orderService }: Components) {
    await orderService.process(msg.payload);
    msg.ack();
  }
}
// controllers/order.controller.ts — HTTP trigger
@Controller('/orders')
export class OrderController {
  @Post('/')
  async create(req: Request, res: Response, { orderService }: Components) {
    const order = await orderService.process(req.body);
    res.status(201).json(order);
  }
}

Both entry points call the same orderService. Services know nothing about channels.
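Because the service has no knowledge of its callers, the handler logic can be exercised without a broker or an HTTP server. The sketch below is self-contained and does not import @ts-wire/messaging: the ConsumerMessage shape is copied from the interface above, while `OrderService`, `handleOrderCreated`, `createOrder`, and `fakeMessage` are hypothetical stand-ins. Calling `nack(true)` on failure is an assumption about how a handler might request redelivery, not documented behavior.

```typescript
// Shape copied from the ConsumerMessage interface documented above.
interface ConsumerMessage {
  topic: string;
  payload: unknown;
  ack: () => void;
  nack: (requeue?: boolean) => void;
}

// Hypothetical service: it knows nothing about queues or HTTP.
class OrderService {
  async process(input: unknown): Promise<{ id: number; status: string }> {
    const order = input as { id?: number } | null;
    if (typeof order?.id !== 'number') {
      throw new Error('order id is required');
    }
    return { id: order.id, status: 'processed' };
  }
}

type Components = { orderService: OrderService };

// Stand-in for the queue entry point (the body of an @OnMessage() handler).
async function handleOrderCreated(
  msg: ConsumerMessage,
  { orderService }: Components,
): Promise<void> {
  try {
    await orderService.process(msg.payload);
    msg.ack();
  } catch {
    // Assumption: nack(true) asks for the message to be requeued.
    msg.nack(true);
  }
}

// Stand-in for the HTTP entry point (the body of a controller method).
async function createOrder(body: unknown, { orderService }: Components) {
  return orderService.process(body);
}

// Hypothetical test helper: a message that records ack/nack calls.
function fakeMessage(topic: string, payload: unknown) {
  const calls: string[] = [];
  const message: ConsumerMessage = {
    topic,
    payload,
    ack: () => { calls.push('ack'); },
    nack: (requeue) => { calls.push(`nack:${requeue ?? false}`); },
  };
  return { message, calls };
}
```

Passing `fakeMessage('order.created', { id: 42 })` to `handleOrderCreated` records a single `ack`, while a payload without an `id` records `nack:true`. Neither call needs a running consumer.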
