@oldzy/conduit

v1.0.8

Conduit

Transport-agnostic request/response pipeline framework with mediator pattern, dependency injection, and middleware support.

Features

  • 🎯 Mediator Pattern - Clean request/response handling with type-safe handlers
  • 💉 Dependency Injection - Built-in DI container with three lifetime scopes (Singleton, Scoped, Transient)
  • 🔧 Pipeline Behaviors - Middleware support for cross-cutting concerns (logging, validation, error handling)
  • 🚀 Transport-Agnostic - Adapter pattern for any transport (HTTP, IPC, WebSocket, etc.)
  • 📡 Streaming Support - Built-in support for both simple and streaming responses
  • 📦 TypeScript First - Full type safety with generics
  • Lightweight - No runtime dependencies other than uuid and reflect-metadata

Installation

npm install @oldzy/conduit

Quick Start

import { ApplicationBuilder, BaseRequest, BaseResponse, SimpleHandler } from '@oldzy/conduit';

// 1. Define your request
class GetUserRequest extends BaseRequest {
    constructor(public userId: string) {
        super('GET_USER');
    }
}

// 2. Define your response
class GetUserResponse extends BaseResponse {
    constructor(public userName: string, public email: string) {
        super('GET_USER_RESPONSE');
    }
}

// 3. Create a handler
class GetUserHandler extends SimpleHandler<GetUserRequest, GetUserResponse> {
    protected async handleSimple(request: GetUserRequest): Promise<GetUserResponse> {
        // Your business logic here
        return new GetUserResponse(`User ${request.userId}`, 'user@example.com');
    }
}

// 4. Build and run the application
const app = await new ApplicationBuilder()
    .registerHandler('GET_USER', GetUserHandler)
    .build();

await app.run();

// 5. Send requests
const request = new GetUserRequest('123');
const response = await app.send<GetUserResponse>(request);

console.log(response.userName); // "User 123"

Core Concepts

Requests and Responses

All requests must extend BaseRequest and all responses must extend BaseResponse.

// Request with auto-generated UUID and timestamp
class CreateOrderRequest extends BaseRequest {
    constructor(
        public productId: string,
        public quantity: number
    ) {
        super('CREATE_ORDER'); // Request type identifier
    }
}

// Response with success/error handling
class CreateOrderResponse extends BaseResponse {
    constructor(public orderId: string, public total: number) {
        super('CREATE_ORDER_RESPONSE');
    }
}

BaseRequest includes:

  • uuid: Auto-generated unique identifier
  • timestamp: Request creation time
  • type: Request type identifier

BaseResponse includes:

  • success: Boolean indicating success/failure
  • errors: Optional array of error messages
  • timestamp: Response creation time
  • addError(error): Helper to add errors
  • setErrors(errors): Helper to set multiple errors
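
To make the two field lists above concrete, here is a minimal stand-alone sketch of what such base classes could look like. It does not import or reproduce the package's real classes: `SketchRequest`, `SketchResponse`, and the counter-based id are hypothetical stand-ins. It also assumes that timestamps are `Date` objects and that adding an error flips `success` to `false`, neither of which the README states.

```typescript
// Stand-alone sketch: these classes are NOT imported from @oldzy/conduit.
let nextSketchId = 0;

abstract class SketchRequest {
    // Stand-in for the auto-generated UUID (the real package depends on `uuid`).
    readonly uuid: string = `sketch-${++nextSketchId}`;
    // Assumption: the creation time is stored as a Date.
    readonly timestamp: Date = new Date();

    constructor(readonly type: string) {}
}

class SketchResponse {
    success = true;
    errors?: string[];
    readonly timestamp: Date = new Date();

    constructor(readonly type: string) {}

    // Assumption: recording an error also marks the response as failed.
    addError(error: string): this {
        (this.errors ??= []).push(error);
        this.success = false;
        return this;
    }

    // Assumption: setErrors replaces the list; an empty list means success.
    setErrors(errors: string[]): this {
        this.errors = [...errors];
        this.success = errors.length === 0;
        return this;
    }
}

class PingRequest extends SketchRequest {
    constructor() {
        super('PING');
    }
}

const ping = new PingRequest();
const pong = new SketchResponse('PING_RESPONSE');
pong.addError('Service unavailable');

console.log(ping.type, pong.success, pong.errors);
```

Under these assumptions a handler can report a failure by returning a response with errors attached rather than by throwing.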

Handlers

Conduit provides two types of handlers:

Simple Handlers

For single-response requests:

class GetUserHandler extends SimpleHandler<GetUserRequest, GetUserResponse> {
    protected async handleSimple(request: GetUserRequest): Promise<GetUserResponse> {
        const user = await database.getUser(request.userId);
        return new GetUserResponse(user.name, user.email);
    }
}

Streaming Handlers

For streaming/chunked responses:

class StreamLogsRequest extends BaseRequest {
    constructor(public filename: string) {
        super('STREAM_LOGS');
    }
}

class LogChunkResponse extends BaseResponse {
    constructor(public line: string) {
        super('LOG_CHUNK');
    }
}

class StreamLogsHandler extends StreamingHandler<StreamLogsRequest, LogChunkResponse> {
    protected async handleStream(request: StreamLogsRequest): Promise<AsyncIterable<LogChunkResponse>> {
        return {
            async *[Symbol.asyncIterator]() {
                const lines = await readFileLines(request.filename);
                for (const line of lines) {
                    yield new LogChunkResponse(line);
                }
            }
        };
    }
}

Sending Requests

Use the appropriate method based on your handler type:

// For simple handlers - returns a single response
const response = await app.send<GetUserResponse>(request);

// For streaming handlers - returns an async iterable
const stream = await app.stream<LogChunkResponse>(request);
for await (const chunk of stream) {
    console.log(chunk.line);
}

Important: Using send() with a streaming handler or stream() with a simple handler will throw an error.
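
The mismatch rule above can be pictured as a guard that runs before dispatch. The following is a stand-alone sketch, not the package's implementation: `handlerKinds`, `assertDispatchable`, and the error messages are all hypothetical names chosen for illustration.

```typescript
// Stand-alone sketch; none of these names come from @oldzy/conduit.
type HandlerKind = 'simple' | 'streaming';
type DispatchMethod = 'send' | 'stream';

// Hypothetical registry: request type -> kind of handler registered for it.
const handlerKinds = new Map<string, HandlerKind>([
    ['GET_USER', 'simple'],
    ['STREAM_LOGS', 'streaming'],
]);

function assertDispatchable(requestType: string, method: DispatchMethod): void {
    const kind = handlerKinds.get(requestType);
    if (kind === undefined) {
        throw new Error(`No handler registered for ${requestType}`);
    }
    // send() pairs with simple handlers, stream() with streaming handlers.
    const expected: HandlerKind = method === 'send' ? 'simple' : 'streaming';
    if (kind !== expected) {
        throw new Error(`${method}() cannot be used with a ${kind} handler (${requestType})`);
    }
}

assertDispatchable('GET_USER', 'send');      // matching pair, no error
assertDispatchable('STREAM_LOGS', 'stream'); // matching pair, no error

try {
    assertDispatchable('STREAM_LOGS', 'send');
} catch (error) {
    // prints "send() cannot be used with a streaming handler (STREAM_LOGS)"
    console.log(error instanceof Error ? error.message : error);
}
```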

Dependency Injection

Conduit includes a full-featured DI container with three lifetime scopes:

Service Lifetimes

  • Singleton: Created once and shared across the entire application lifetime
  • Scoped: Created once per request (each send() or stream() call gets a new instance)
  • Transient: Created every time the service is requested

Important: Conduit automatically creates a new scope for each request, making Scoped services perfect for:

  • Database transactions (commit/rollback per request)
  • Request-specific caching
  • Request context and metadata
  • Per-request logging with correlation IDs

class UserRepository {
    async getUser(id: string) {
        return { id, name: 'John Doe' };
    }
}

class Logger {
    log(message: string) {
        console.log(`[${new Date().toISOString()}] ${message}`);
    }
}

const app = await new ApplicationBuilder()
    // Register services
    .registerSingleton('UserRepository', UserRepository)
    .registerTransient('Logger', Logger)
    
    // Register handler with dependency injection
    .registerFactory('GET_USER', (provider) => {
        const repo = provider.getRequiredService<UserRepository>('UserRepository');
        const logger = provider.getRequiredService<Logger>('Logger');
        return new GetUserHandler(repo, logger);
    })
    .build();
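
To illustrate how the three lifetimes differ, here is a deliberately small stand-alone container. It is a sketch under stated assumptions, not Conduit's container: `SketchContainer`, `register`, and `createScope` are hypothetical names, and one scope stands in for one `send()` or `stream()` call.

```typescript
// Stand-alone sketch; this container is hypothetical and far simpler than a real one.
type Lifetime = 'singleton' | 'scoped' | 'transient';
type Registration = { lifetime: Lifetime; create: () => object };

class SketchContainer {
    private registrations = new Map<string, Registration>();
    private singletons = new Map<string, object>();

    register(token: string, lifetime: Lifetime, create: () => object): this {
        this.registrations.set(token, { lifetime, create });
        return this;
    }

    // One scope per request: scoped instances are cached in the scope, not the container.
    createScope(): { resolve: (token: string) => object } {
        const scopedInstances = new Map<string, object>();
        const resolve = (token: string): object => {
            const registration = this.registrations.get(token);
            if (!registration) throw new Error(`Service not registered: ${token}`);
            // Transient: always build a new instance.
            if (registration.lifetime === 'transient') return registration.create();
            // Singleton and scoped differ only in which cache they use.
            const cache = registration.lifetime === 'singleton' ? this.singletons : scopedInstances;
            let instance = cache.get(token);
            if (instance === undefined) {
                instance = registration.create();
                cache.set(token, instance);
            }
            return instance;
        };
        return { resolve };
    }
}

const container = new SketchContainer()
    .register('Config', 'singleton', () => ({}))
    .register('Transaction', 'scoped', () => ({}))
    .register('Logger', 'transient', () => ({}));

const requestA = container.createScope();
const requestB = container.createScope();

console.log(requestA.resolve('Config') === requestB.resolve('Config'));           // true
console.log(requestA.resolve('Transaction') === requestA.resolve('Transaction')); // true
console.log(requestA.resolve('Transaction') === requestB.resolve('Transaction')); // false
console.log(requestA.resolve('Logger') === requestA.resolve('Logger'));           // false
```

The only difference between singleton and scoped in this sketch is where the instance is cached, which is why a scoped service behaves like a singleton within a single request.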

Scoped Services Example

Scoped services are perfect for per-request state. Each send() or stream() call creates a new scope:

// Database transaction that lives for the duration of a request
class DatabaseTransaction {
    private isCommitted = false;
    private isRolledBack = false;

    async execute(query: string) {
        if (this.isCommitted || this.isRolledBack) {
            throw new Error('Transaction already completed');
        }
        // Execute query within transaction
    }

    async commit() {
        this.isCommitted = true;
        // Commit transaction
    }

    async rollback() {
        this.isRolledBack = true;
        // Rollback transaction
    }
}

// Request cache that's cleared after each request
class RequestCache {
    private cache = new Map<string, any>();

    get<T>(key: string): T | undefined {
        return this.cache.get(key);
    }

    set<T>(key: string, value: T): void {
        this.cache.set(key, value);
    }
}

// Handler using scoped services
class CreateOrderHandler extends SimpleHandler<CreateOrderRequest, CreateOrderResponse> {
    constructor(
        private transaction: DatabaseTransaction,
        private cache: RequestCache
    ) {
        super();
    }

    protected async handleSimple(request: CreateOrderRequest): Promise<CreateOrderResponse> {
        try {
            // Check cache first
            const cached = this.cache.get('product:' + request.productId);
            
            // Use transaction
            await this.transaction.execute('INSERT INTO orders...');
            await this.transaction.execute('UPDATE inventory...');
            
            // Commit transaction
            await this.transaction.commit();
            
            return new CreateOrderResponse('order-123', 0); // 0 is a placeholder total
        } catch (error) {
            // Rollback on error
            await this.transaction.rollback();
            throw error;
        }
    }
}

const app = await new ApplicationBuilder()
    // Scoped services - new instance per request
    .registerScoped('DatabaseTransaction', DatabaseTransaction)
    .registerScoped('RequestCache', RequestCache)
    
    // Handler factory
    .registerFactory('CREATE_ORDER', (provider) => {
        const transaction = provider.getRequiredService<DatabaseTransaction>('DatabaseTransaction');
        const cache = provider.getRequiredService<RequestCache>('RequestCache');
        return new CreateOrderHandler(transaction, cache);
    })
    .build();

await app.run();

// Each request gets its own transaction and cache
const order1 = await app.send(new CreateOrderRequest('product-1', 1));
const order2 = await app.send(new CreateOrderRequest('product-2', 1));
// order1 and order2 had separate DatabaseTransaction instances

Using @Inject Decorator

import { Inject } from '@oldzy/conduit/di';

class GetUserHandler extends SimpleHandler<GetUserRequest, GetUserResponse> {
    constructor(
        @Inject('UserRepository') private repository: UserRepository,
        @Inject('Logger') private logger: Logger
    ) {
        super();
    }
    
    protected async handleSimple(request: GetUserRequest): Promise<GetUserResponse> {
        this.logger.log(`Fetching user ${request.userId}`);
        const user = await this.repository.getUser(request.userId);
        return new GetUserResponse(user.name, user.email);
    }
}

Pipeline Behaviors (Middleware)

Behaviors allow you to add cross-cutting concerns like logging, validation, and error handling:

class LoggingBehavior implements IPipelineBehavior {
    async handle(context: PipelineContext, next: PipelineDelegate): HandlerResult<BaseResponse> {
        console.log(`Handling: ${context.request.type}`);
        const startTime = Date.now();
        
        const response = await next();
        
        const duration = Date.now() - startTime;
        console.log(`Completed in ${duration}ms`);
        
        return response;
    }
}

class ErrorHandlingBehavior implements IPipelineBehavior {
    async handle(context: PipelineContext, next: PipelineDelegate): HandlerResult<BaseResponse> {
        try {
            return await next();
        } catch (error) {
            const response = new BaseResponse('ERROR_RESPONSE');
            response.addError(error instanceof Error ? error.message : 'Unknown error');
            return response;
        }
    }
}

const app = await new ApplicationBuilder()
    .registerHandler('GET_USER', GetUserHandler)
    .registerBehavior(new LoggingBehavior())
    .registerBehavior(new ErrorHandlingBehavior())
    .build();

Execution Order: Behaviors are executed in the order they are registered:

LoggingBehavior (before) → ErrorHandlingBehavior (before) → Handler → ErrorHandlingBehavior (after) → LoggingBehavior (after)
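
The nesting shown above follows from wrapping the handler in behaviors from the last registered to the first. The sketch below reproduces that ordering with plain functions. It is kept synchronous for brevity, whereas the behaviors in this README are async, and `Behavior`, `tracing`, and `buildPipeline` are hypothetical names rather than Conduit APIs.

```typescript
// Stand-alone sketch, kept synchronous for brevity; names are hypothetical.
type Next = () => string;
type Behavior = (next: Next) => string;

const executionTrace: string[] = [];

// Builds a behavior that records when it runs before and after the rest of the pipeline.
const tracing = (label: string): Behavior => (next) => {
    executionTrace.push(`${label} (before)`);
    const result = next();
    executionTrace.push(`${label} (after)`);
    return result;
};

// Wrap the handler starting from the last behavior, so the first registered
// behavior ends up as the outermost layer.
function buildPipeline(behaviors: Behavior[], handler: Next): Next {
    return behaviors.reduceRight<Next>(
        (next, behavior) => () => behavior(next),
        handler,
    );
}

const pipeline = buildPipeline(
    [tracing('LoggingBehavior'), tracing('ErrorHandlingBehavior')],
    () => {
        executionTrace.push('Handler');
        return 'response';
    },
);

pipeline();
console.log(executionTrace.join(' → '));
```

One consequence of this layering: a behavior can only catch exceptions thrown by layers inside it, so an error-handling behavior registered after a logging behavior will not see errors raised by the logging behavior itself.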

Transport Adapters

Adapters connect your application to specific transports (HTTP, IPC, WebSocket, etc.):

interface IApplicationAdapter {
    readonly name: string;
    configure(services: ServiceCollection): void | Promise<void>;
    initialize?(app: Application): void | Promise<void>;
    dispose?(): void | Promise<void>;
}

Example: HTTP Adapter with Express

import express from 'express';
import type { Server } from 'http';

class ExpressAdapter implements IApplicationAdapter {
    readonly name = 'express-adapter';
    private server?: Server;
    
    configure(services: ServiceCollection): void {
        // Register transport-specific services
    }
    
    async initialize(app: Application): Promise<void> {
        const expressApp = express();
        
        expressApp.post('/api/:requestType', async (req, res) => {
            const { requestType } = req.params;
            
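            // Create request from HTTP body (createRequest is a helper you implement yourself)
            const request = this.createRequest(requestType, req.body);
            
            try {
                const response = await app.send(request);
                res.json(response);
            } catch (error) {
                // `error` is `unknown` in strict TypeScript, so narrow before reading `message`
                const message = error instanceof Error ? error.message : 'Unknown error';
                res.status(500).json({ error: message });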
            }
        });
        
        this.server = expressApp.listen(3000);
    }
    
    async dispose(): Promise<void> {
        if (this.server) {
            this.server.close();
        }
    }
}

const app = await new ApplicationBuilder()
    .registerHandler('GET_USER', GetUserHandler)
    .use(new ExpressAdapter())
    .build();

await app.run();

Example: Electron IPC Adapter with Streaming

import { ipcMain } from 'electron';

class ElectronAdapter implements IApplicationAdapter {
    readonly name = 'electron-adapter';
    private requestControllers = new Map<string, AbortController>();
    
    configure(services: ServiceCollection): void {}
    
    async initialize(app: Application): Promise<void> {
        // Handle simple requests
        ipcMain.handle('conduit:send', async (event, requestData) => {
            const request = this.createRequest(requestData);
            return await app.send(request);
        });
        
        // Handle streaming requests
        ipcMain.handle('conduit:stream', async (event, requestData) => {
            const request = this.createRequest(requestData);
            const requestId = request.uuid;
            
            const abortController = new AbortController();
            this.requestControllers.set(requestId, abortController);
            
            try {
                const stream = await app.stream(request);
                
                for await (const response of stream) {
                    if (abortController.signal.aborted) break;
                    event.sender.send(`conduit:stream:${requestId}`, response);
                }
                
                event.sender.send(`conduit:stream:${requestId}:complete`);
            } catch (error) {
                const message = error instanceof Error ? error.message : 'Unknown error';
                event.sender.send(`conduit:stream:${requestId}:error`, message);
            } finally {
                this.requestControllers.delete(requestId);
            }
        });
        
        // Handle cancellation
        ipcMain.on('conduit:cancel', (event, requestId) => {
            const controller = this.requestControllers.get(requestId);
            if (controller) {
                controller.abort();
                this.requestControllers.delete(requestId);
            }
        });
    }
    
    dispose(): void {
        ipcMain.removeHandler('conduit:send');
        ipcMain.removeHandler('conduit:stream');
        ipcMain.removeAllListeners('conduit:cancel');
    }
}

Application Builder API

The ApplicationBuilder provides a fluent API for configuring your application:

const app = await new ApplicationBuilder()
    // Register handlers
    .registerHandler('REQUEST_TYPE', HandlerClass, ServiceLifetime.Transient)
    .registerHandlers(handlersMap)
    
    // Register services
    .registerSingleton('ServiceName', ServiceClass)
    .registerScoped('ServiceName', ServiceClass)
    .registerTransient('ServiceName', ServiceClass)
    .registerFactory('ServiceName', (provider) => new Service(...), ServiceLifetime.Singleton)
    
    // Register behaviors (middleware)
    .registerBehavior(new BehaviorInstance())
    .registerBehavior(BehaviorClass) // Will be instantiated by DI
    
    // Register adapters
    .use(new HttpAdapter())
    .use(new WebSocketAdapter())
    
    .build();

Application Lifecycle

const app = await builder.build();

// Start the application (calls adapter.initialize())
await app.run();

// Application is now ready to handle requests
const response = await app.send(request);

// Stop the application (calls adapter.dispose())
await app.stop();
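
The lifecycle above can be pictured as a loop over the registered adapters. This is a stand-alone, synchronous sketch rather than Conduit's `Application` class: `SketchAdapter` and `SketchApplication` are hypothetical, and both the reverse-order disposal and the guard against calling `run()` twice are assumptions the README does not confirm.

```typescript
// Stand-alone sketch; SketchAdapter mirrors only the optional hooks of the adapter interface.
interface SketchAdapter {
    readonly name: string;
    initialize?(): void;
    dispose?(): void;
}

class SketchApplication {
    private running = false;

    constructor(private readonly adapters: SketchAdapter[]) {}

    run(): void {
        if (this.running) return; // Assumption: run() is idempotent.
        for (const adapter of this.adapters) adapter.initialize?.();
        this.running = true;
    }

    stop(): void {
        if (!this.running) return;
        // Assumption: adapters are disposed in reverse order of initialization.
        for (const adapter of [...this.adapters].reverse()) adapter.dispose?.();
        this.running = false;
    }
}

const lifecycleLog: string[] = [];

// Creates an adapter that records its lifecycle calls.
const loggingAdapter = (adapterName: string): SketchAdapter => ({
    name: adapterName,
    initialize: () => { lifecycleLog.push(`${adapterName}:initialize`); },
    dispose: () => { lifecycleLog.push(`${adapterName}:dispose`); },
});

const sketchApp = new SketchApplication([loggingAdapter('http'), loggingAdapter('ws')]);
sketchApp.run();
sketchApp.stop();

console.log(lifecycleLog); // [ 'http:initialize', 'ws:initialize', 'ws:dispose', 'http:dispose' ]
```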

Advanced Examples

Complete Application with All Features

import { 
    ApplicationBuilder, 
    BaseRequest, 
    BaseResponse, 
    SimpleHandler,
    StreamingHandler,
    IPipelineBehavior,
    PipelineContext,
    PipelineDelegate,
    HandlerResult
} from '@oldzy/conduit';

// Services
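class DatabaseService {
    // Accepts optional bind parameters and resolves with the matching row
    async query(sql: string, params: unknown[] = []): Promise<any> { /* ... */ }
}

class CacheService {
    async get(key: string): Promise<any> { /* ... */ }
    async set(key: string, value: any): Promise<void> { /* ... */ }
}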

// Request/Response
class GetProductRequest extends BaseRequest {
    constructor(public productId: string) {
        super('GET_PRODUCT');
    }
}

class ProductResponse extends BaseResponse {
    constructor(public id: string, public name: string, public price: number) {
        super('PRODUCT_RESPONSE');
    }
}

// Handler with dependencies
class GetProductHandler extends SimpleHandler<GetProductRequest, ProductResponse> {
    constructor(
        private db: DatabaseService,
        private cache: CacheService
    ) {
        super();
    }
    
    protected async handleSimple(request: GetProductRequest): Promise<ProductResponse> {
        // Check cache first
        const cached = await this.cache.get(`product:${request.productId}`);
        if (cached) return cached;
        
        // Query database
        const product = await this.db.query(`SELECT * FROM products WHERE id = ?`, [request.productId]);
        const response = new ProductResponse(product.id, product.name, product.price);
        
        // Cache result
        await this.cache.set(`product:${request.productId}`, response);
        
        return response;
    }
}

// Caching behavior
class CachingBehavior implements IPipelineBehavior {
    constructor(private cache: CacheService) {}
    
    async handle(context: PipelineContext, next: PipelineDelegate): HandlerResult<BaseResponse> {
        const cacheKey = `${context.request.type}:${JSON.stringify(context.request)}`;
        
        const cached = await this.cache.get(cacheKey);
        if (cached) return cached;
        
        const response = await next();
        
        if (response instanceof BaseResponse && response.success) {
            await this.cache.set(cacheKey, response);
        }
        
        return response;
    }
}

// Validation behavior
class ValidationBehavior implements IPipelineBehavior {
    async handle(context: PipelineContext, next: PipelineDelegate): HandlerResult<BaseResponse> {
        // Add your validation logic
        if (!context.request.type) {
            const response = new BaseResponse('VALIDATION_ERROR');
            response.addError('Request type is required');
            return response;
        }
        
        return await next();
    }
}

// Build the application
const app = await new ApplicationBuilder()
    // Register services
    .registerSingleton('DatabaseService', DatabaseService)
    .registerSingleton('CacheService', CacheService)
    
    // Register handler with factory
    .registerFactory('GET_PRODUCT', (provider) => {
        const db = provider.getRequiredService<DatabaseService>('DatabaseService');
        const cache = provider.getRequiredService<CacheService>('CacheService');
        return new GetProductHandler(db, cache);
    })
    
    // Register behaviors (order matters!)
    .registerFactory('ValidationBehavior', (provider) => new ValidationBehavior())
    .registerFactory('CachingBehavior', (provider) => {
        const cache = provider.getRequiredService<CacheService>('CacheService');
        return new CachingBehavior(cache);
    })
    
    .build();

await app.run();

// Use the application
const request = new GetProductRequest('123');
const response = await app.send<ProductResponse>(request);

console.log(response.name, response.price);

Testing

Conduit is designed to be testable. You can easily mock services and test handlers in isolation:

import { describe, it, expect } from 'vitest';

describe('GetUserHandler', () => {
    it('should return user data', async () => {
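        const mockRepo = {
            getUser: async (id: string) => ({ id, name: 'Test User', email: 'test@example.com' })
        };
        // The handler logs through its injected Logger, so stub that too
        const mockLogger = { log: () => {} };
        
        const handler = new GetUserHandler(mockRepo as any, mockLogger as any);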
        const request = new GetUserRequest('123');
        
        const response = await handler.handle(request);
        
        expect(response.userName).toBe('Test User');
        expect(response.success).toBe(true);
    });
});

API Reference

Core Classes

  • ApplicationBuilder - Fluent API for building applications
  • Application - Main application class
  • Mediator - Core mediator implementation
  • ServiceProvider - DI container
  • ServiceCollection - Service registration

Base Classes

  • BaseRequest - Base class for all requests
  • BaseResponse - Base class for all responses
  • SimpleHandler - Base class for simple handlers
  • StreamingHandler - Base class for streaming handlers

Interfaces

  • IRequestHandler - Handler interface
  • IPipelineBehavior - Behavior/middleware interface
  • IApplicationAdapter - Transport adapter interface

Types

  • ServiceLifetime - Enum: Singleton, Scoped, Transient
  • HandlerResult - Union: Promise<TResponse> | Promise<AsyncIterable<TResponse>>
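
Because `HandlerResult` can resolve to either a single response or an async iterable, code that sits between the transport and the handler, such as a behavior or an adapter, may need to tell the two apart at runtime. One way to do that is a type guard like the one sketched here. `isAsyncIterable` is a hypothetical helper written for this example, not something the package is known to export.

```typescript
// Stand-alone sketch; isAsyncIterable is a hypothetical helper, not part of @oldzy/conduit.
function isAsyncIterable<T>(value: unknown): value is AsyncIterable<T> {
    return (
        typeof value === 'object' &&
        value !== null &&
        typeof (value as Record<PropertyKey, unknown>)[Symbol.asyncIterator] === 'function'
    );
}

// An async generator produces an object that implements the async iteration protocol.
async function* countTo(limit: number): AsyncGenerator<number> {
    for (let i = 1; i <= limit; i++) yield i;
}

console.log(isAsyncIterable(countTo(3)));          // true
console.log(isAsyncIterable({ userName: 'Ada' })); // false
```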

Best Practices

  1. Use specific request/response types - Create a dedicated class for each use case instead of reusing generic types
  2. Keep handlers focused - Each handler should do one thing well
  3. Use behaviors for cross-cutting concerns - Don't repeat logging, validation, etc. in every handler
  4. Register services by interface - Use string/symbol identifiers for better testability
  5. Handle errors in behaviors - Use an error handling behavior instead of try-catch in every handler
  6. Use dependency injection - Inject dependencies instead of creating them inside handlers
  7. Separate transport concerns - Keep business logic in handlers, transport logic in adapters
  8. Choose the right service lifetime:
    • Singleton for stateless services, repositories, configuration
    • Scoped for database transactions, request caching, request context
    • Transient for lightweight, stateful operations
  9. Leverage scoped services - Use them for per-request resources that need cleanup (transactions, connections, caches)

License

MIT