@katdev/communication
v1.0.0
Transport-agnostic communication layer for NestJS microservices
@audit-hub/communication
Transport-agnostic communication layer for NestJS microservices. Write once, run on any transport (HTTP, gRPC, TCP, RabbitMQ, NATS).
Features
✅ Transport Agnostic - Switch between HTTP, gRPC, TCP, RMQ, NATS without code changes
✅ Unified API - Single interface for all communication patterns
✅ Multiple Patterns - Request/Response, Events, Message Queues
✅ Automatic Failover - Falls back to alternative transports when the primary fails
✅ Type Safe - Full TypeScript support
✅ Easy Integration - Works seamlessly with NestJS
Installation
npm install @audit-hub/communication
Quick Start
1. Setup in Your Service
import { Module } from '@nestjs/common';
import { CommunicationModule } from '@audit-hub/communication';
@Module({
imports: [
CommunicationModule.forRoot({
services: [
{
transport: 'http',
serviceName: 'user',
serviceUrl: 'http://localhost:3003',
},
{
transport: 'grpc',
serviceName: 'notification',
serviceUrl: 'localhost:5002',
metadata: {
protoPath: './notification.proto',
},
},
],
}),
],
})
export class AppModule {}
2. Use in Your Controllers/Services
import { Injectable } from '@nestjs/common';
import { CommunicationClient } from '@audit-hub/communication';
@Injectable()
export class OrderService {
constructor(private readonly commClient: CommunicationClient) {}
async createOrder(orderData: any) {
// Call user service via HTTP
const user = await this.commClient.send(
'user',
'user.getById',
{ id: orderData.userId },
);
// Send notification via gRPC
await this.commClient.emit(
'notification',
'notification.orderCreated',
{ orderId: orderData.id, userId: user.id },
);
return { ...orderData, user };
}
}
Communication Patterns
1. Request-Response (Synchronous)
// Send a request and wait for response
const result = await commClient.send(
'user', // Service name
'user.getById', // Pattern/route
{ id: '123' }, // Data
{ timeout: 3000 } // Options
);
2. Event (Fire and Forget)
// Emit an event without waiting
await commClient.emit(
'notification',
'user.created',
{ userId: '123', email: 'user@example.com' } // placeholder address
);
3. Message Queue
// Send message to a queue
await commClient.sendMessage(
'email',
'email.sendWelcome',
{ email: 'user@example.com', name: 'John' } // placeholder address
);
Transport Configurations
HTTP
{
transport: 'http',
serviceName: 'user',
serviceUrl: 'http://localhost:3003',
timeout: 5000,
}
gRPC
{
transport: 'grpc',
serviceName: 'notification',
serviceUrl: 'localhost:5002',
metadata: {
protoPath: join(__dirname, './notification.proto'),
package: 'notification',
},
}
TCP
{
transport: 'tcp',
serviceName: 'analytics',
metadata: {
host: 'localhost',
port: 4001,
},
}
RabbitMQ
{
transport: 'rmq',
serviceName: 'email',
serviceUrl: 'amqp://localhost:5672',
metadata: {
queue: 'email_queue',
},
}
NATS
{
transport: 'nats',
serviceName: 'logging',
serviceUrl: 'nats://localhost:4222',
metadata: {
queue: 'logging_queue',
},
}
Advanced Features
Failover
Automatically fall back to alternative transports:
import { CommunicationClientFactory } from '@audit-hub/communication';
const client = CommunicationClientFactory.createWithFailover(
{
transport: 'grpc',
serviceName: 'user',
serviceUrl: 'localhost:5003',
},
[
{
transport: 'http',
serviceName: 'user',
serviceUrl: 'http://localhost:3003',
},
]
);
// Tries gRPC first, then falls back to HTTP on failure
const user = await client.send('user.getById', { id: '123' });
Multiple Transports for Same Service
CommunicationModule.forRoot({
services: [
// HTTP for queries
{
transport: 'http',
serviceName: 'user-query',
serviceUrl: 'http://localhost:3003',
},
// RabbitMQ for commands
{
transport: 'rmq',
serviceName: 'user-command',
serviceUrl: 'amqp://localhost:5672',
metadata: { queue: 'user_commands' },
},
],
})
Service Discovery Integration
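import { Injectable } from '@nestjs/common';
import { CommunicationClient } from '@audit-hub/communication';
import { ServiceDiscoveryClientImpl } from '@katdev/service-discovery';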
@Injectable()
export class DynamicCommunicationService {
constructor(
private readonly commClient: CommunicationClient,
private readonly discovery: ServiceDiscoveryClientImpl,
) {}
async registerServiceDynamically() {
// Discover service URL
const userServiceUrl = await this.discovery.getServiceUrl('user', 'http');
// Register dynamically
this.commClient.registerService('user', {
transport: 'http',
serviceName: 'user',
serviceUrl: userServiceUrl,
});
}
}
Pattern Naming Conventions
We recommend these naming patterns:
HTTP
- Resource-based: user.getById, user.create, user.update
- Maps to: GET /user/getById, POST /user/create, etc.
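The mapping above can be sketched as a small helper. This is only an illustration, not the library's routing code: the name `patternToHttpRoute` is hypothetical, and the rule that actions starting with `get` map to GET while everything else maps to POST is an assumption inferred from the two examples listed.

```typescript
// Hypothetical helper for illustration; not part of @audit-hub/communication.
type HttpMethod = 'GET' | 'POST';

interface HttpRoute {
  method: HttpMethod;
  path: string;
}

function patternToHttpRoute(pattern: string): HttpRoute {
  const parts = pattern.split('.');
  const resource = parts[0];
  const action = parts[1];
  if (parts.length !== 2 || !resource || !action) {
    throw new Error(`Expected "resource.action", got "${pattern}"`);
  }
  // Assumption: read-style actions (getById, getAll, ...) use GET; everything else uses POST.
  const method: HttpMethod = action.indexOf('get') === 0 ? 'GET' : 'POST';
  return { method, path: `/${resource}/${action}` };
}

const route = patternToHttpRoute('user.getById');
console.log(`${route.method} ${route.path}`); // prints "GET /user/getById"
```

Check how your version of the package actually derives HTTP verbs before relying on a rule like this.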
gRPC
- Service.Method: UserService.GetUser, UserService.CreateUser
Message Queues
- Domain.Action: user.created, order.shipped, payment.processed
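One way to keep Domain.Action names consistent is to check them at compile time and at runtime. The sketch below is an assumption-laden example: the `DOMAINS` list, the `EventPattern` type, and `isEventPattern` are hypothetical names that the package does not provide.

```typescript
// Hypothetical naming guard; adjust DOMAINS to the domains your services use.
const DOMAINS = ['user', 'order', 'payment'] as const;
type Domain = (typeof DOMAINS)[number];

// Compile-time shape: a known domain, a dot, then any action name.
type EventPattern = `${Domain}.${string}`;

// Runtime check for strings that arrive from config or other services.
function isEventPattern(value: string): value is EventPattern {
  const parts = value.split('.');
  if (parts.length !== 2) return false;
  const domain = parts[0] ?? '';
  const action = parts[1] ?? '';
  const knownDomain = DOMAINS.some((d) => d === domain);
  // Assumption: actions are lowerCamelCase identifiers such as "created" or "shipped".
  return knownDomain && /^[a-z][A-Za-z0-9]*$/.test(action);
}

const shipped: EventPattern = 'order.shipped'; // accepted by the compiler
console.log(shipped, isEventPattern('order.shipped'), isEventPattern('Order.Shipped'));
```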
Examples
Example 1: E-commerce Order Flow
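import { Injectable } from '@nestjs/common';
import { CommunicationClient } from '@audit-hub/communication';
// CreateOrderDto and saveOrder() are application-specific and not shown here.
@Injectable()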
export class OrderService {
constructor(private readonly comm: CommunicationClient) {}
async placeOrder(orderDto: CreateOrderDto) {
// 1. Get user details (HTTP)
const user = await this.comm.send(
'user',
'user.getById',
{ id: orderDto.userId }
);
// 2. Check inventory (gRPC for performance)
const inventory = await this.comm.send(
'inventory',
'inventory.check',
{ productIds: orderDto.items.map(i => i.productId) }
);
if (!inventory.available) {
throw new Error('Items not available');
}
// 3. Process payment (HTTP)
const payment = await this.comm.send(
'payment',
'payment.charge',
{ amount: orderDto.total, userId: user.id }
);
// 4. Create order
const order = await this.saveOrder({ ...orderDto, paymentId: payment.id });
// 5. Send notifications (Events - Fire and Forget)
await this.comm.emit('notification', 'order.created', {
orderId: order.id,
userId: user.id,
email: user.email,
});
// 6. Queue analytics (Message Queue)
await this.comm.sendMessage('analytics', 'order.placed', {
orderId: order.id,
amount: orderDto.total,
timestamp: new Date(),
});
return order;
}
}
Example 2: Switch Transport at Runtime
@Injectable()
export class ConfigurableService {
constructor(private readonly comm: CommunicationClient) {}
async callUser(method: string, data: any) {
const transport = process.env.USER_SERVICE_TRANSPORT || 'http';
// Register service with configured transport
this.comm.registerService('user', {
transport: transport as any,
serviceName: 'user',
serviceUrl: this.getServiceUrl(transport),
});
return this.comm.send('user', method, data);
}
private getServiceUrl(transport: string): string {
switch (transport) {
case 'http': return 'http://localhost:3003';
case 'grpc': return 'localhost:5003';
case 'rmq': return 'amqp://localhost:5672';
default: return 'http://localhost:3003';
}
}
}
Best Practices
- Use HTTP for External APIs - Easy debugging, wide support
- Use gRPC for Internal High-Performance - Binary protocol, type-safe
- Use Message Queues for Async Operations - Decoupled, resilient
- Use Events for Notifications - Fire-and-forget, non-blocking
- Configure Timeouts - Prevent hanging requests
- Handle Errors Gracefully - Always catch and handle failures
- Use Service Discovery - Dynamic service URLs
- Monitor Communication - Track latency, errors, patterns
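The "Configure Timeouts" and "Handle Errors Gracefully" points can be combined in a small wrapper around any promise-returning call. This is a generic sketch that does not depend on the package: `withTimeout` and `callOrFallback` are hypothetical helpers, and the `call` argument stands in for something like `commClient.send(...)`.

```typescript
// Hypothetical helpers; they wrap any async call and do not use the package's API.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => {
      const error = new Error(`Operation timed out after ${ms} ms`);
      error.name = 'TimeoutError';
      reject(error);
    }, ms);
    work.then(
      (value) => { clearTimeout(timer); resolve(value); },
      (error) => { clearTimeout(timer); reject(error); },
    );
  });
}

// Runs the call with a deadline and returns a fallback value instead of throwing.
async function callOrFallback<T>(call: () => Promise<T>, fallback: T, ms: number): Promise<T> {
  try {
    return await withTimeout(call(), ms);
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    console.warn(`Call failed, using fallback: ${reason}`);
    return fallback;
  }
}
```

A fallback value suits read paths that can serve stale or default data. For writes such as payments, rethrowing is usually the safer choice.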
Troubleshooting
Error: No client registered for service
// Make sure to register the service first
commClient.registerService('user', {
transport: 'http',
serviceName: 'user',
serviceUrl: 'http://localhost:3003',
});
Error: Request timeout
// Increase timeout for slow operations
await commClient.send('user', 'user.complexQuery', data, {
timeout: 10000, // 10 seconds
});
Error: gRPC proto file not found
// Use absolute path for proto files
import { join } from 'path';
{
transport: 'grpc',
metadata: {
protoPath: join(__dirname, './proto/user.proto'),
},
}
API Reference
CommunicationClient
registerService(serviceName, options)
Register a service with specific transport configuration.
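The options object in this README's examples can be described with a type like the one below. This is a sketch pieced together from those examples, not the package's exported types: `ServiceConfig`, `ServiceMetadata`, and `missingFields` are hypothetical names, and the "required" rules only reflect fields that appear in every example for a given transport.

```typescript
// Hypothetical types inferred from the configuration examples in this README.
type Transport = 'http' | 'grpc' | 'tcp' | 'rmq' | 'nats';

interface ServiceMetadata {
  protoPath?: string; // gRPC
  package?: string;   // gRPC
  host?: string;      // TCP
  port?: number;      // TCP
  queue?: string;     // RabbitMQ, NATS
}

interface ServiceConfig {
  transport: Transport;
  serviceName: string;
  serviceUrl?: string;
  timeout?: number; // milliseconds
  metadata?: ServiceMetadata;
}

// Lists fields that the README examples always include for the chosen transport.
function missingFields(config: ServiceConfig): string[] {
  const missing: string[] = [];
  const meta: ServiceMetadata = config.metadata ?? {};
  if (config.transport === 'tcp') {
    if (!meta.host) missing.push('metadata.host');
    if (meta.port === undefined) missing.push('metadata.port');
  } else if (!config.serviceUrl) {
    missing.push('serviceUrl');
  }
  if ((config.transport === 'rmq' || config.transport === 'nats') && !meta.queue) {
    missing.push('metadata.queue');
  }
  return missing;
}

console.log(missingFields({ transport: 'tcp', serviceName: 'analytics', metadata: { host: 'localhost' } }));
```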
send<TResult, TInput>(serviceName, pattern, data, options?)
Send request and wait for response.
emit<TInput>(serviceName, pattern, data)
Emit event without waiting for response.
sendMessage<TInput>(serviceName, pattern, data)
Send message to queue.
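For unit tests, the three messaging methods above can be captured in an interface and backed by an in-memory fake. The sketch below rests on assumptions: `CommunicationClientLike`, `SendOptions`, and `FakeCommunicationClient` are hypothetical names, the `Promise` return types are inferred from the `await` calls in the examples, and the real class may differ.

```typescript
// Hypothetical test double; shapes are inferred from this README, not from the package source.
interface SendOptions {
  timeout?: number; // milliseconds, as in the timeout examples
}

interface CommunicationClientLike {
  send<TResult, TInput>(serviceName: string, pattern: string, data: TInput, options?: SendOptions): Promise<TResult>;
  emit<TInput>(serviceName: string, pattern: string, data: TInput): Promise<void>;
  sendMessage<TInput>(serviceName: string, pattern: string, data: TInput): Promise<void>;
}

interface RecordedCall {
  serviceName: string;
  pattern: string;
  data: unknown;
}

type Handler = (data: unknown) => unknown;

class FakeCommunicationClient implements CommunicationClientLike {
  readonly emitted: RecordedCall[] = [];
  readonly queued: RecordedCall[] = [];
  private readonly handlers = new Map<string, Handler>();

  // Registers a canned response for send().
  onSend(serviceName: string, pattern: string, handler: Handler): void {
    this.handlers.set(`${serviceName}:${pattern}`, handler);
  }

  async send<TResult, TInput>(serviceName: string, pattern: string, data: TInput): Promise<TResult> {
    const handler = this.handlers.get(`${serviceName}:${pattern}`);
    if (!handler) throw new Error(`No handler registered for ${serviceName}/${pattern}`);
    return handler(data) as TResult;
  }

  async emit<TInput>(serviceName: string, pattern: string, data: TInput): Promise<void> {
    this.emitted.push({ serviceName, pattern, data });
  }

  async sendMessage<TInput>(serviceName: string, pattern: string, data: TInput): Promise<void> {
    this.queued.push({ serviceName, pattern, data });
  }
}
```

A service under test can then depend on `CommunicationClientLike`, and assertions can inspect `emitted` and `queued` without starting any transport.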
CommunicationClientFactory
create(options)
Create a communication client for specific transport.
createWithFailover(primary, fallbacks)
Create a client with automatic failover.
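Conceptually, failover means trying the primary transport and then each fallback in order until one succeeds. The sketch below illustrates that idea only and is not the implementation of `createWithFailover`: `Sender` and `sendWithFailover` are hypothetical, and treating any thrown error as a reason to move on is an assumption.

```typescript
// Hypothetical illustration of the failover idea; not the package's implementation.
type Sender<TResult> = () => Promise<TResult>;

async function sendWithFailover<TResult>(
  primary: Sender<TResult>,
  fallbacks: Sender<TResult>[],
): Promise<TResult> {
  const errors: unknown[] = [];
  // Try the primary first, then each fallback in the order given.
  for (const attempt of [primary, ...fallbacks]) {
    try {
      return await attempt();
    } catch (error) {
      errors.push(error); // Assumption: every error triggers the next transport.
    }
  }
  const last = errors[errors.length - 1];
  throw new Error(`All ${errors.length} transports failed; last error: ${String(last)}`);
}
```

In practice you may want to fail over only on connection or timeout errors, so that a business error such as "user not found" is not retried on every transport.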
License
MIT
