@oldzy/conduit
v1.0.8
Published
Transport-agnostic request/response pipeline framework with mediator pattern, dependency injection, and middleware support
Maintainers
Readme
Conduit
Transport-agnostic request/response pipeline framework with mediator pattern, dependency injection, and middleware support.
Features
- 🎯 Mediator Pattern - Clean request/response handling with type-safe handlers
- 💉 Dependency Injection - Built-in DI container with three lifetime scopes (Singleton, Scoped, Transient)
- 🔧 Pipeline Behaviors - Middleware support for cross-cutting concerns (logging, validation, error handling)
- 🚀 Transport-Agnostic - Adapter pattern for any transport (HTTP, IPC, WebSocket, etc.)
- 📡 Streaming Support - Built-in support for both simple and streaming responses
- 📦 TypeScript First - Full type safety with generics
- ⚡ Lightweight - Zero dependencies (except uuid and reflect-metadata)
Installation
npm install @oldzy/conduitQuick Start
import { ApplicationBuilder, BaseRequest, BaseResponse, SimpleHandler } from '@oldzy/conduit';
// 1. Define your request
class GetUserRequest extends BaseRequest {
constructor(public userId: string) {
super('GET_USER');
}
}
// 2. Define your response
class GetUserResponse extends BaseResponse {
constructor(public userName: string, public email: string) {
super('GET_USER_RESPONSE');
}
}
// 3. Create a handler
class GetUserHandler extends SimpleHandler<GetUserRequest, GetUserResponse> {
protected async handleSimple(request: GetUserRequest): Promise<GetUserResponse> {
// Your business logic here
return new GetUserResponse(`User ${request.userId}`, '[email protected]');
}
}
// 4. Build and run the application
const app = await new ApplicationBuilder()
.registerHandler('GET_USER', GetUserHandler)
.build();
await app.run();
// 5. Send requests
const request = new GetUserRequest('123');
const response = await app.send<GetUserResponse>(request);
console.log(response.userName); // "User 123"Core Concepts
Requests and Responses
All requests must extend BaseRequest and all responses must extend BaseResponse.
// Request with auto-generated UUID and timestamp
class CreateOrderRequest extends BaseRequest {
constructor(
public productId: string,
public quantity: number
) {
super('CREATE_ORDER'); // Request type identifier
}
}
// Response with success/error handling
class CreateOrderResponse extends BaseResponse {
constructor(public orderId: string, public total: number) {
super('CREATE_ORDER_RESPONSE');
}
}BaseRequest includes:
uuid: Auto-generated unique identifiertimestamp: Request creation timetype: Request type identifier
BaseResponse includes:
success: Boolean indicating success/failureerrors: Optional array of error messagestimestamp: Response creation timeaddError(error): Helper to add errorssetErrors(errors): Helper to set multiple errors
Handlers
Conduit provides two types of handlers:
Simple Handlers
For single-response requests:
class GetUserHandler extends SimpleHandler<GetUserRequest, GetUserResponse> {
protected async handleSimple(request: GetUserRequest): Promise<GetUserResponse> {
const user = await database.getUser(request.userId);
return new GetUserResponse(user.name, user.email);
}
}Streaming Handlers
For streaming/chunked responses:
class StreamLogsRequest extends BaseRequest {
constructor(public filename: string) {
super('STREAM_LOGS');
}
}
class LogChunkResponse extends BaseResponse {
constructor(public line: string) {
super('LOG_CHUNK');
}
}
class StreamLogsHandler extends StreamingHandler<StreamLogsRequest, LogChunkResponse> {
protected async handleStream(request: StreamLogsRequest): Promise<AsyncIterable<LogChunkResponse>> {
return {
async *[Symbol.asyncIterator]() {
const lines = await readFileLines(request.filename);
for (const line of lines) {
yield new LogChunkResponse(line);
}
}
};
}
}Sending Requests
Use the appropriate method based on your handler type:
// For simple handlers - returns a single response
const response = await app.send<GetUserResponse>(request);
// For streaming handlers - returns an async iterable
const stream = await app.stream<LogChunkResponse>(request);
for await (const chunk of stream) {
console.log(chunk.line);
}Important: Using send() with a streaming handler or stream() with a simple handler will throw an error.
Dependency Injection
Conduit includes a full-featured DI container with three lifetime scopes:
Service Lifetimes
- Singleton: Created once and shared across the entire application lifetime
- Scoped: Created once per request (each
send()orstream()call gets a new instance) - Transient: Created every time the service is requested
Important: Conduit automatically creates a new scope for each request, making Scoped services perfect for:
- Database transactions (commit/rollback per request)
- Request-specific caching
- Request context and metadata
- Per-request logging with correlation IDs
class UserRepository {
async getUser(id: string) {
return { id, name: 'John Doe' };
}
}
class Logger {
log(message: string) {
console.log(`[${new Date().toISOString()}] ${message}`);
}
}
const app = await new ApplicationBuilder()
// Register services
.registerSingleton('UserRepository', UserRepository)
.registerTransient('Logger', Logger)
// Register handler with dependency injection
.registerFactory('GET_USER', (provider) => {
const repo = provider.getRequiredService<UserRepository>('UserRepository');
const logger = provider.getRequiredService<Logger>('Logger');
return new GetUserHandler(repo, logger);
})
.build();Scoped Services Example
Scoped services are perfect for per-request state. Each send() or stream() call creates a new scope:
// Database transaction that lives for the duration of a request
class DatabaseTransaction {
private isCommitted = false;
private isRolledBack = false;
async execute(query: string) {
if (this.isCommitted || this.isRolledBack) {
throw new Error('Transaction already completed');
}
// Execute query within transaction
}
async commit() {
this.isCommitted = true;
// Commit transaction
}
async rollback() {
this.isRolledBack = true;
// Rollback transaction
}
}
// Request cache that's cleared after each request
class RequestCache {
private cache = new Map<string, any>();
get<T>(key: string): T | undefined {
return this.cache.get(key);
}
set<T>(key: string, value: T): void {
this.cache.set(key, value);
}
}
// Handler using scoped services
class CreateOrderHandler extends SimpleHandler<CreateOrderRequest, CreateOrderResponse> {
constructor(
private transaction: DatabaseTransaction,
private cache: RequestCache
) {
super();
}
protected async handleSimple(request: CreateOrderRequest): Promise<CreateOrderResponse> {
try {
// Check cache first
const cached = this.cache.get('product:' + request.productId);
// Use transaction
await this.transaction.execute('INSERT INTO orders...');
await this.transaction.execute('UPDATE inventory...');
// Commit transaction
await this.transaction.commit();
return new CreateOrderResponse('order-123');
} catch (error) {
// Rollback on error
await this.transaction.rollback();
throw error;
}
}
}
const app = await new ApplicationBuilder()
// Scoped services - new instance per request
.registerScoped('DatabaseTransaction', DatabaseTransaction)
.registerScoped('RequestCache', RequestCache)
// Handler factory
.registerFactory('CREATE_ORDER', (provider) => {
const transaction = provider.getRequiredService<DatabaseTransaction>('DatabaseTransaction');
const cache = provider.getRequiredService<RequestCache>('RequestCache');
return new CreateOrderHandler(transaction, cache);
})
.build();
await app.run();
// Each request gets its own transaction and cache
const order1 = await app.send(new CreateOrderRequest('product-1'));
const order2 = await app.send(new CreateOrderRequest('product-2'));
// order1 and order2 had separate DatabaseTransaction instancesUsing @Inject Decorator
import { Inject } from '@oldzy/conduit/di';
class GetUserHandler extends SimpleHandler<GetUserRequest, GetUserResponse> {
constructor(
@Inject('UserRepository') private repository: UserRepository,
@Inject('Logger') private logger: Logger
) {
super();
}
protected async handleSimple(request: GetUserRequest): Promise<GetUserResponse> {
this.logger.log(`Fetching user ${request.userId}`);
const user = await this.repository.getUser(request.userId);
return new GetUserResponse(user.name, user.email);
}
}Pipeline Behaviors (Middleware)
Behaviors allow you to add cross-cutting concerns like logging, validation, and error handling:
class LoggingBehavior implements IPipelineBehavior {
async handle(context: PipelineContext, next: PipelineDelegate): HandlerResult<BaseResponse> {
console.log(`Handling: ${context.request.type}`);
const startTime = Date.now();
const response = await next();
const duration = Date.now() - startTime;
console.log(`Completed in ${duration}ms`);
return response;
}
}
class ErrorHandlingBehavior implements IPipelineBehavior {
async handle(context: PipelineContext, next: PipelineDelegate): HandlerResult<BaseResponse> {
try {
return await next();
} catch (error) {
const response = new BaseResponse('ERROR_RESPONSE');
response.addError(error instanceof Error ? error.message : 'Unknown error');
return response;
}
}
}
const app = await new ApplicationBuilder()
.registerHandler('GET_USER', GetUserHandler)
.registerBehavior(new LoggingBehavior())
.registerBehavior(new ErrorHandlingBehavior())
.build();Execution Order: Behaviors are executed in the order they are registered:
LoggingBehavior (before) → ErrorHandlingBehavior (before) → Handler → ErrorHandlingBehavior (after) → LoggingBehavior (after)Transport Adapters
Adapters connect your application to specific transports (HTTP, IPC, WebSocket, etc.):
interface IApplicationAdapter {
readonly name: string;
configure(services: ServiceCollection): void | Promise<void>;
initialize?(app: Application): void | Promise<void>;
dispose?(): void | Promise<void>;
}Example: HTTP Adapter with Express
import express from 'express';
class ExpressAdapter implements IApplicationAdapter {
readonly name = 'express-adapter';
private server?: Server;
configure(services: ServiceCollection): void {
// Register transport-specific services
}
async initialize(app: Application): Promise<void> {
const expressApp = express();
expressApp.post('/api/:requestType', async (req, res) => {
const { requestType } = req.params;
// Create request from HTTP body
const request = this.createRequest(requestType, req.body);
try {
const response = await app.send(request);
res.json(response);
} catch (error) {
res.status(500).json({ error: error.message });
}
});
this.server = expressApp.listen(3000);
}
async dispose(): Promise<void> {
if (this.server) {
this.server.close();
}
}
}
const app = await new ApplicationBuilder()
.registerHandler('GET_USER', GetUserHandler)
.use(new ExpressAdapter())
.build();
await app.run();Example: Electron IPC Adapter with Streaming
import { ipcMain } from 'electron';
class ElectronAdapter implements IApplicationAdapter {
readonly name = 'electron-adapter';
private requestControllers = new Map<string, AbortController>();
configure(services: ServiceCollection): void {}
async initialize(app: Application): Promise<void> {
// Handle simple requests
ipcMain.handle('conduit:send', async (event, requestData) => {
const request = this.createRequest(requestData);
return await app.send(request);
});
// Handle streaming requests
ipcMain.handle('conduit:stream', async (event, requestData) => {
const request = this.createRequest(requestData);
const requestId = request.uuid;
const abortController = new AbortController();
this.requestControllers.set(requestId, abortController);
try {
const stream = await app.stream(request);
for await (const response of stream) {
if (abortController.signal.aborted) break;
event.sender.send(`conduit:stream:${requestId}`, response);
}
event.sender.send(`conduit:stream:${requestId}:complete`);
} catch (error) {
event.sender.send(`conduit:stream:${requestId}:error`, error.message);
} finally {
this.requestControllers.delete(requestId);
}
});
// Handle cancellation
ipcMain.on('conduit:cancel', (event, requestId) => {
const controller = this.requestControllers.get(requestId);
if (controller) {
controller.abort();
this.requestControllers.delete(requestId);
}
});
}
dispose(): void {
ipcMain.removeHandler('conduit:send');
ipcMain.removeHandler('conduit:stream');
ipcMain.removeAllListeners('conduit:cancel');
}
}Application Builder API
The ApplicationBuilder provides a fluent API for configuring your application:
const app = await new ApplicationBuilder()
// Register handlers
.registerHandler('REQUEST_TYPE', HandlerClass, ServiceLifetime.Transient)
.registerHandlers(handlersMap)
// Register services
.registerSingleton('ServiceName', ServiceClass)
.registerScoped('ServiceName', ServiceClass)
.registerTransient('ServiceName', ServiceClass)
.registerFactory('ServiceName', (provider) => new Service(...), ServiceLifetime.Singleton)
// Register behaviors (middleware)
.registerBehavior(new BehaviorInstance())
.registerBehavior(BehaviorClass) // Will be instantiated by DI
// Register adapters
.use(new HttpAdapter())
.use(new WebSocketAdapter())
.build();Application Lifecycle
const app = await builder.build();
// Start the application (calls adapter.initialize())
await app.run();
// Application is now ready to handle requests
const response = await app.send(request);
// Stop the application (calls adapter.dispose())
await app.stop();Advanced Examples
Complete Application with All Features
import {
ApplicationBuilder,
BaseRequest,
BaseResponse,
SimpleHandler,
StreamingHandler,
IPipelineBehavior,
PipelineContext,
PipelineDelegate,
HandlerResult
} from '@oldzy/conduit';
// Services
class DatabaseService {
async query(sql: string) { /* ... */ }
}
class CacheService {
async get(key: string) { /* ... */ }
async set(key: string, value: any) { /* ... */ }
}
// Request/Response
class GetProductRequest extends BaseRequest {
constructor(public productId: string) {
super('GET_PRODUCT');
}
}
class ProductResponse extends BaseResponse {
constructor(public id: string, public name: string, public price: number) {
super('PRODUCT_RESPONSE');
}
}
// Handler with dependencies
class GetProductHandler extends SimpleHandler<GetProductRequest, ProductResponse> {
constructor(
private db: DatabaseService,
private cache: CacheService
) {
super();
}
protected async handleSimple(request: GetProductRequest): Promise<ProductResponse> {
// Check cache first
const cached = await this.cache.get(`product:${request.productId}`);
if (cached) return cached;
// Query database
const product = await this.db.query(`SELECT * FROM products WHERE id = ?`, [request.productId]);
const response = new ProductResponse(product.id, product.name, product.price);
// Cache result
await this.cache.set(`product:${request.productId}`, response);
return response;
}
}
// Caching behavior
class CachingBehavior implements IPipelineBehavior {
constructor(private cache: CacheService) {}
async handle(context: PipelineContext, next: PipelineDelegate): HandlerResult<BaseResponse> {
const cacheKey = `${context.request.type}:${JSON.stringify(context.request)}`;
const cached = await this.cache.get(cacheKey);
if (cached) return cached;
const response = await next();
if (response instanceof BaseResponse && response.success) {
await this.cache.set(cacheKey, response);
}
return response;
}
}
// Validation behavior
class ValidationBehavior implements IPipelineBehavior {
async handle(context: PipelineContext, next: PipelineDelegate): HandlerResult<BaseResponse> {
// Add your validation logic
if (!context.request.type) {
const response = new BaseResponse('VALIDATION_ERROR');
response.addError('Request type is required');
return response;
}
return await next();
}
}
// Build the application
const app = await new ApplicationBuilder()
// Register services
.registerSingleton('DatabaseService', DatabaseService)
.registerSingleton('CacheService', CacheService)
// Register handler with factory
.registerFactory('GET_PRODUCT', (provider) => {
const db = provider.getRequiredService<DatabaseService>('DatabaseService');
const cache = provider.getRequiredService<CacheService>('CacheService');
return new GetProductHandler(db, cache);
})
// Register behaviors (order matters!)
.registerFactory('ValidationBehavior', (provider) => new ValidationBehavior())
.registerFactory('CachingBehavior', (provider) => {
const cache = provider.getRequiredService<CacheService>('CacheService');
return new CachingBehavior(cache);
})
.build();
await app.run();
// Use the application
const request = new GetProductRequest('123');
const response = await app.send<ProductResponse>(request);
console.log(response.name, response.price);Testing
Conduit is designed to be testable. You can easily mock services and test handlers in isolation:
import { describe, it, expect } from 'vitest';
describe('GetUserHandler', () => {
it('should return user data', async () => {
const mockRepo = {
getUser: async (id: string) => ({ id, name: 'Test User' })
};
const handler = new GetUserHandler(mockRepo as any);
const request = new GetUserRequest('123');
const response = await handler.handle(request);
expect(response.userName).toBe('Test User');
expect(response.success).toBe(true);
});
});API Reference
Core Classes
- ApplicationBuilder - Fluent API for building applications
- Application - Main application class
- Mediator - Core mediator implementation
- ServiceProvider - DI container
- ServiceCollection - Service registration
Base Classes
- BaseRequest - Base class for all requests
- BaseResponse - Base class for all responses
- SimpleHandler - Base class for simple handlers
- StreamingHandler - Base class for streaming handlers
Interfaces
- IRequestHandler - Handler interface
- IPipelineBehavior - Behavior/middleware interface
- IApplicationAdapter - Transport adapter interface
Types
- ServiceLifetime - Enum: Singleton, Scoped, Transient
- HandlerResult - Union:
Promise<TResponse> | Promise<AsyncIterable<TResponse>>
Best Practices
- Use specific request/response types - Don't use generic types, create specific classes for each use case
- Keep handlers focused - Each handler should do one thing well
- Use behaviors for cross-cutting concerns - Don't repeat logging, validation, etc. in every handler
- Register services by interface - Use string/symbol identifiers for better testability
- Handle errors in behaviors - Use an error handling behavior instead of try-catch in every handler
- Use dependency injection - Don't create dependencies inside handlers, inject them
- Separate transport concerns - Keep business logic in handlers, transport logic in adapters
- Choose the right service lifetime:
- Singleton for stateless services, repositories, configuration
- Scoped for database transactions, request caching, request context
- Transient for lightweight, stateful operations
- Leverage scoped services - Use them for per-request resources that need cleanup (transactions, connections, caches)
License
MIT
