@majkapp/event-bus-local
v2.0.1
High-performance in-memory event bus implementation
@majkapp/event-bus
A type-safe, generic event bus library with channels, filters, and query builders. Built for high-performance event-driven applications with zero dependencies.
Features
- 🎯 Type-Safe: Full TypeScript support with generics
- 📡 Channel-Based: Organize events by channels (like topics/streams)
- 🔍 Powerful Filtering: Query builder API with chainable filters
- ⚡ High Performance: Lazy channel creation, efficient filtering
- 🛡️ Error Resilient: Isolated error handling - one listener crash doesn't affect others
- 🔧 Zero Dependencies: Pure TypeScript implementation
- 📊 Observable: Built-in diagnostics and metrics
- 🎨 Flexible: Generic design - wrap with domain-specific types
Installation
npm install @majkapp/event-bus
Quick Start
import { EventBus } from '@majkapp/event-bus';
interface User {
  id: string;
  name: string;
  email: string;
}
const bus = new EventBus();
const userChannel = bus.channel<User>('users');
userChannel.onEvent('created', (user) => {
  console.log('New user:', user.name);
});
bus.emit('users', {
  type: 'created',
  payload: { id: '1', name: 'Alice', email: '[email protected]' },
  timestamp: new Date()
});
Core Concepts
Events
Events are the fundamental unit of communication:
interface Event<TPayload, TType extends string> {
  type: TType; // Event type (e.g., 'created', 'updated')
  payload: TPayload; // The actual data
  timestamp: Date; // When it occurred
  metadata?: Record<string, any>; // Optional metadata
}
Channels
Channels organize events by category (similar to topics or streams):
const userChannel = bus.channel<User>('users');
const postChannel = bus.channel<Post>('posts');
Each channel can have its own payload type, providing strong type safety.
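To make the per-channel typing concrete, here is a minimal sketch of a get-or-create channel registry. `SketchBus`, `SketchChannel`, `SketchUser`, and `SketchPost` are hypothetical names used only for illustration; this is not the library's implementation, and its internals may differ.

```typescript
// Hypothetical names throughout: SketchBus and SketchChannel are illustrative
// only and are not exports of @majkapp/event-bus.
class SketchChannel<T> {
  private listeners: Array<(payload: T) => void> = [];

  constructor(public readonly id: string) {}

  on(listener: (payload: T) => void): void {
    this.listeners.push(listener);
  }

  emit(payload: T): void {
    // Iterate over a copy so listeners added mid-dispatch wait for the next emit.
    this.listeners.slice().forEach((listener) => listener(payload));
  }
}

class SketchBus {
  // Stored with an erased payload type; the caller's type argument is applied on access.
  private channels = new Map<string, SketchChannel<any>>();

  channel<T>(id: string): SketchChannel<T> {
    let existing = this.channels.get(id);
    if (existing === undefined) {
      // Lazy creation: allocate the channel the first time it is requested.
      existing = new SketchChannel<T>(id);
      this.channels.set(id, existing);
    }
    return existing as SketchChannel<T>;
  }

  channelCount(): number {
    return this.channels.size;
  }
}

interface SketchUser { id: string; name: string }
interface SketchPost { id: string; title: string }

const sketchBus = new SketchBus();
const sketchUsers = sketchBus.channel<SketchUser>('users');
const sketchPosts = sketchBus.channel<SketchPost>('posts');

const seenNames: string[] = [];
sketchUsers.on((user) => seenNames.push(user.name));
sketchUsers.emit({ id: '1', name: 'Alice' });
sketchPosts.emit({ id: 'p1', title: 'Hello' });
// sketchPosts.emit({ id: '1', name: 'Alice' }); // would not compile: payload is not a SketchPost
```

In this sketch, requesting the same channel ID twice returns the same object, so a handle can be shared or re-fetched as needed.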
Subscriptions
Subscribe to events on a channel:
// Listen to specific event type
userChannel.onEvent('created', (user) => {
  console.log('User created:', user);
});
// Listen to multiple event types
userChannel.onEvent(['created', 'updated'], (user) => {
  console.log('User changed:', user);
});
// Full event listener (includes metadata)
userChannel.subscribe((event) => {
  console.log('Event:', event.type, event.payload);
});
Advanced Usage
Query Builder
Build complex filters with a fluent API:
userChannel
  .query()
  .whereType(['created', 'updated'])
  // Assumes the User payload also carries a numeric `age` field
  .where(event => event.payload.age >= 18)
  .whereMetadata('source', 'api')
  .subscribe((event) => {
    console.log('Adult user from API:', event.payload.name);
  });
Multiple Channels
const bus = new EventBus();
const userChannel = bus.channel<User>('users');
const postChannel = bus.channel<Post>('posts');
userChannel.onEvent('created', (user) => {
  console.log('User:', user.name);
});
postChannel.onEvent('created', (post) => {
  console.log('Post:', post.title);
});
Global Monitoring
Subscribe to all events across all channels:
bus.subscribeAll((event) => {
  console.log('Global:', event.type, event.payload);
});
Priority and Options
Control execution order and behavior:
// High priority listener (executes first)
channel.subscribe(listener, filter, { priority: 10 });
// One-time listener (auto-unsubscribes after first event)
channel.subscribe(listener, filter, { once: true });
Error Handling
Customize error handling:
import { ErrorHandler } from '@majkapp/event-bus';
ErrorHandler.setGlobalErrorHandler((error, context) => {
  console.error(`Error in ${context.channelId}:${context.eventType}`, error);
  // Send to error tracking service, etc.
});
Diagnostics
Get insights into your event bus:
const diagnostics = bus.getDiagnostics();
console.log(diagnostics);
// {
//   channelCount: 2,
//   channels: ['users', 'posts'],
//   totalListeners: 5,
//   totalEventsEmitted: 150,
//   channelDiagnostics: {
//     users: { listenerCount: 3, eventsEmitted: 100, ... },
//     posts: { listenerCount: 2, eventsEmitted: 50, ... }
//   }
// }
Wrapping with Domain-Specific Types
The generic design allows you to wrap the event bus with domain-specific types:
import { EventBus, Event, QueryableEventChannel } from '@majkapp/event-bus';
type RepositoryEventType = 'created' | 'updated' | 'deleted';
interface RepositoryEvent<T> extends Event<T, RepositoryEventType> {
  entityType: string;
}
class MainProcessEventBus {
  private bus = new EventBus<any, RepositoryEventType>();
  // Typed channel accessors
  messages() {
    return this.bus.channel<Message>('message');
  }
  agents() {
    return this.bus.channel<Agent>('agent');
  }
  // Generic channel access
  channel<T>(entityType: string) {
    return this.bus.channel<T>(entityType);
  }
  // Domain-specific emit
  emit<T>(entityType: string, event: Omit<RepositoryEvent<T>, 'entityType'>) {
    this.bus.emit(entityType, { ...event, entityType } as any);
  }
}
// Usage
const eventBus = new MainProcessEventBus();
eventBus.agents().onEvent('created', (agent) => {
  console.log('Agent created:', agent.name);
});
eventBus.messages()
  .query()
  .whereType('created')
  .where(e => e.payload.role === 'user')
  .subscribe((event) => {
    console.log('User message:', event.payload.content);
  });
API Reference
EventBus
Constructor
new EventBus<TPayload, TType>(options?: EventBusOptions)
Options:
- lazyChannelCreation: Create channels on first access (default: true)
Methods
- channel<T>(channelId: string): QueryableEventChannel<T>
- createChannel<T>(channelId: string): QueryableEventChannel<T>
- hasChannel(channelId: string): boolean
- deleteChannel(channelId: string): boolean
- emit<T>(channelId: string, event: Event<T>): void
- subscribe(listener, filter?, options?): Subscription
- subscribeAll(listener, options?): Subscription
- clear(): void
- getDiagnostics(): EventBusDiagnostics
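As a rough illustration of how `emit`, `subscribeAll`, and `getDiagnostics` could relate, the sketch below routes each event to its channel's listeners and then to bus-wide listeners, while keeping simple counters. `MiniBus`, `MiniEvent`, and `MiniListener` are hypothetical names. The dispatch order, and counting global listeners in `totalListeners`, are assumptions and not documented behavior of the library.

```typescript
// Illustrative only: MiniBus is a hypothetical stand-in, not the library's EventBus.
interface MiniEvent<T> {
  type: string;
  payload: T;
  timestamp: Date;
}

type MiniListener = (evt: MiniEvent<unknown>) => void;

class MiniBus {
  private channelListeners = new Map<string, MiniListener[]>();
  private globalListeners: MiniListener[] = [];
  private emitted = 0;

  subscribe(channelId: string, listener: MiniListener): void {
    const list = this.channelListeners.get(channelId) ?? [];
    list.push(listener);
    this.channelListeners.set(channelId, list);
  }

  subscribeAll(listener: MiniListener): void {
    this.globalListeners.push(listener);
  }

  emit(channelId: string, evt: MiniEvent<unknown>): void {
    this.emitted += 1;
    // Channel listeners first, then bus-wide listeners (this ordering is an assumption).
    (this.channelListeners.get(channelId) ?? []).forEach((l) => l(evt));
    this.globalListeners.forEach((l) => l(evt));
  }

  getDiagnostics() {
    let totalListeners = this.globalListeners.length;
    this.channelListeners.forEach((list) => { totalListeners += list.length; });
    return {
      channelCount: this.channelListeners.size,
      totalListeners,
      totalEventsEmitted: this.emitted,
    };
  }
}

const miniBus = new MiniBus();
const miniLog: string[] = [];
miniBus.subscribe('users', (evt) => miniLog.push('users:' + evt.type));
miniBus.subscribeAll((evt) => miniLog.push('all:' + evt.type));

miniBus.emit('users', { type: 'created', payload: { id: '1' }, timestamp: new Date() });
miniBus.emit('posts', { type: 'created', payload: { id: 'p1' }, timestamp: new Date() });
// miniLog is now ['users:created', 'all:created', 'all:created']
```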
EventChannel
Methods
- subscribe(listener, filter?, options?): Subscription
- emit(event): void
- clear(): void
- getListenerCount(): number
- getDiagnostics(): ChannelDiagnostics
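The `priority` and `once` options accepted by `subscribe` can be pictured with the sketch below. It is a simplified model and not the library's code: `PriorityChannel`, `SketchSubscribeOptions`, and `PriorityEntry` are hypothetical names, the `filter` argument is left out, and the default priority of 0 is an assumption.

```typescript
// Hypothetical sketch; PriorityChannel is not part of @majkapp/event-bus.
interface SketchSubscribeOptions {
  priority?: number; // higher runs first; the default of 0 is an assumption
  once?: boolean;    // remove the listener after its first delivery
}

interface PriorityEntry<T> {
  listener: (payload: T) => void;
  priority: number;
  once: boolean;
}

class PriorityChannel<T> {
  private entries: PriorityEntry<T>[] = [];

  subscribe(listener: (payload: T) => void, options: SketchSubscribeOptions = {}): void {
    this.entries.push({
      listener,
      priority: options.priority ?? 0,
      once: options.once ?? false,
    });
    // Array.prototype.sort is stable (ES2019+), so equal priorities keep insertion order.
    this.entries.sort((a, b) => b.priority - a.priority);
  }

  emit(payload: T): void {
    // Snapshot first so removing one-shot listeners does not disturb iteration.
    const snapshot = this.entries.slice();
    this.entries = this.entries.filter((entry) => !entry.once);
    snapshot.forEach((entry) => entry.listener(payload));
  }

  getListenerCount(): number {
    return this.entries.length;
  }
}

const priorityChannel = new PriorityChannel<string>();
const callOrder: string[] = [];
priorityChannel.subscribe(() => callOrder.push('normal'));
priorityChannel.subscribe(() => callOrder.push('high'), { priority: 10 });
priorityChannel.subscribe(() => callOrder.push('once'), { once: true });

priorityChannel.emit('first');  // runs high, normal, once
priorityChannel.emit('second'); // runs high, normal
```

Sorting on every `subscribe` keeps `emit` cheap, which suits a bus where events are emitted far more often than listeners are registered.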
QueryableEventChannel (extends EventChannel)
Methods
- query(): QueryBuilder
- onEvent(type, listener, options?): Subscription
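One way to think about `onEvent` is as a convenience layer over `subscribe` that filters by event type and passes only the payload to the listener. The sketch below models that idea under stated assumptions; `TypedChannel` and `TypedEvent` are hypothetical names, and the library may implement this differently.

```typescript
// Hypothetical sketch: TypedChannel illustrates the idea and is not the library's class.
interface TypedEvent<T> {
  type: string;
  payload: T;
  timestamp: Date;
}

class TypedChannel<T> {
  private listeners: Array<(evt: TypedEvent<T>) => void> = [];

  // Full-event subscription: the listener sees type, payload, and timestamp.
  subscribe(listener: (evt: TypedEvent<T>) => void): void {
    this.listeners.push(listener);
  }

  // Convenience form: filter by one or more types and hand over only the payload.
  onEvent(type: string | string[], listener: (payload: T) => void): void {
    const wanted = Array.isArray(type) ? type : [type];
    this.subscribe((evt) => {
      if (wanted.indexOf(evt.type) !== -1) {
        listener(evt.payload);
      }
    });
  }

  emit(evt: TypedEvent<T>): void {
    this.listeners.forEach((listener) => listener(evt));
  }
}

const typedChannel = new TypedChannel<{ name: string }>();
const changedNames: string[] = [];
typedChannel.onEvent(['created', 'updated'], (user) => changedNames.push(user.name));

typedChannel.emit({ type: 'created', payload: { name: 'Alice' }, timestamp: new Date() });
typedChannel.emit({ type: 'deleted', payload: { name: 'Bob' }, timestamp: new Date() });
// changedNames is ['Alice']; the 'deleted' event was filtered out
```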
QueryBuilder
Methods
- whereType(type): QueryBuilder
- where(predicate): QueryBuilder
- whereMetadata(key, value): QueryBuilder
- subscribe(listener, options?): Subscription
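The chainable methods above can be modeled as a list of predicates that must all pass. The following is a minimal sketch of that pattern, not the library's `QueryBuilder`: `SketchQuery`, `QueryEvent`, and `QueryPredicate` are hypothetical names, and the sketch exposes a `matches` method in place of `subscribe` so the filtering logic can be seen on its own.

```typescript
// Hypothetical sketch of a chainable filter builder; SketchQuery is not a library export.
interface QueryEvent<T> {
  type: string;
  payload: T;
  metadata?: Record<string, unknown>;
}

type QueryPredicate<T> = (evt: QueryEvent<T>) => boolean;

class SketchQuery<T> {
  private predicates: QueryPredicate<T>[] = [];

  whereType(type: string | string[]): this {
    const wanted = Array.isArray(type) ? type : [type];
    return this.where((evt) => wanted.indexOf(evt.type) !== -1);
  }

  whereMetadata(key: string, value: unknown): this {
    return this.where((evt) => evt.metadata !== undefined && evt.metadata[key] === value);
  }

  where(predicate: QueryPredicate<T>): this {
    this.predicates.push(predicate);
    return this; // returning `this` is what makes the calls chainable
  }

  // Array.prototype.every stops at the first failing predicate (early exit).
  matches(evt: QueryEvent<T>): boolean {
    return this.predicates.every((predicate) => predicate(evt));
  }
}

const adultFromApi = new SketchQuery<{ name: string; age: number }>()
  .whereType(['created', 'updated'])
  .where((evt) => evt.payload.age >= 18)
  .whereMetadata('source', 'api');

const matchA = adultFromApi.matches({
  type: 'created', payload: { name: 'Alice', age: 30 }, metadata: { source: 'api' },
});
const matchB = adultFromApi.matches({
  type: 'created', payload: { name: 'Bob', age: 12 }, metadata: { source: 'api' },
});
// matchA is true, matchB is false
```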
Subscription
Properties
- id: string - Unique subscription identifier
- active: boolean - Whether subscription is still active
Methods
- unsubscribe(): void - Remove the subscription
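A subscription handle with these members might be built along the following lines. This is a sketch under assumptions: `HandleChannel` and `SketchSubscription` are hypothetical names, the `sub-N` ID format is invented for illustration, and treating a repeated `unsubscribe` call as a no-op is an assumption that the source does not confirm.

```typescript
// Hypothetical sketch of a subscription handle; names are illustrative.
interface SketchSubscription {
  readonly id: string;
  readonly active: boolean;
  unsubscribe(): void;
}

class HandleChannel<T> {
  private listeners = new Map<string, (payload: T) => void>();
  private nextId = 1;

  subscribe(listener: (payload: T) => void): SketchSubscription {
    const id = 'sub-' + this.nextId++;
    const listeners = this.listeners;
    listeners.set(id, listener);
    return {
      id,
      // `active` is derived from the registry, so it flips to false after unsubscribe.
      get active() {
        return listeners.has(id);
      },
      // Deleting a missing key is a no-op, so calling unsubscribe twice is safe.
      unsubscribe() {
        listeners.delete(id);
      },
    };
  }

  emit(payload: T): void {
    this.listeners.forEach((listener) => listener(payload));
  }

  getListenerCount(): number {
    return this.listeners.size;
  }
}

const handleChannel = new HandleChannel<number>();
let received = 0;
const handle = handleChannel.subscribe((n) => { received += n; });

handleChannel.emit(1);   // delivered
handle.unsubscribe();
handle.unsubscribe();    // second call is harmless
handleChannel.emit(10);  // not delivered
// received is 1 and handle.active is false
```

Removing the listener from the registry on `unsubscribe` is also what lets the closure be garbage collected, which is the kind of cleanup a long-lived bus depends on.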
Performance
- Lazy Channel Creation: Channels are created only when first accessed
- Efficient Filtering: Early exit optimization for filters
- Memory Safe: Proper cleanup on unsubscribe
- Error Isolated: One failing listener doesn't affect others
- Priority Support: Control execution order for performance tuning
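The error isolation mentioned above can be sketched as a dispatch loop that wraps each listener call in its own `try`/`catch`. `dispatchIsolated`, `SafeListener`, and `SketchErrorHandler` are hypothetical names for illustration; the library's actual dispatch code and error-handler signature may differ.

```typescript
// Hypothetical sketch of isolated dispatch; not the library's implementation.
type SafeListener<T> = (payload: T) => void;
type SketchErrorHandler = (error: unknown, listenerIndex: number) => void;

function dispatchIsolated<T>(
  listeners: SafeListener<T>[],
  payload: T,
  onError: SketchErrorHandler,
): number {
  let delivered = 0;
  listeners.forEach((listener, index) => {
    try {
      listener(payload);
      delivered += 1;
    } catch (error) {
      // Report and continue so later listeners still receive the event.
      onError(error, index);
    }
  });
  return delivered;
}

const isolationLog: string[] = [];
const caughtErrors: string[] = [];
const deliveredCount = dispatchIsolated<string>(
  [
    (msg) => { isolationLog.push('first:' + msg); },
    () => { throw new Error('listener crashed'); },
    (msg) => { isolationLog.push('third:' + msg); },
  ],
  'ping',
  (error, index) => {
    caughtErrors.push(index + ':' + (error instanceof Error ? error.message : String(error)));
  },
);
// deliveredCount is 2; isolationLog is ['first:ping', 'third:ping'];
// caughtErrors is ['1:listener crashed']
```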
Benchmarks
- 1000 events → 1 listener: < 10ms
- 1 event → 100 listeners: < 100ms
- Supports millions of events per second in production workloads
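Figures like these depend on hardware and workload, so it can help to measure on your own machine. The harness below is a hedged sketch of the two benchmark shapes listed above. It times a plain array of listeners and not the library itself, and `timeDispatch` is a hypothetical helper; to measure the bus, the inner loop would be swapped for real `emit` calls.

```typescript
// Hypothetical timing harness; it measures a plain listener array, not the library.
function timeDispatch(
  eventCount: number,
  listenerCount: number,
): { calls: number; elapsedMs: number } {
  let calls = 0;
  const listeners: Array<(n: number) => void> = [];
  for (let i = 0; i < listenerCount; i++) {
    listeners.push(() => { calls += 1; });
  }

  const startedAt = Date.now();
  for (let e = 0; e < eventCount; e++) {
    listeners.forEach((listener) => listener(e));
  }
  // Date.now() has millisecond resolution, so very short runs may report 0 ms.
  const elapsedMs = Date.now() - startedAt;
  return { calls, elapsedMs };
}

const oneListener = timeDispatch(1000, 1);   // 1000 events -> 1 listener
const manyListeners = timeDispatch(1, 100);  // 1 event -> 100 listeners
console.log(oneListener.calls, 'calls in', oneListener.elapsedMs, 'ms');
console.log(manyListeners.calls, 'calls in', manyListeners.elapsedMs, 'ms');
```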
Testing
# Run tests
npm test
# Watch mode
npm run test:watch
# Coverage
npm run test:coverage
Building
# Build TypeScript
npm run build
# Watch mode
npm run build:watch
# Clean
npm run clean
Examples
See the examples directory for more usage patterns:
- 01-basic-usage.ts - Getting started
- 02-filtering.ts - Query builder and filters
- 03-multi-channel.ts - Multiple channels and global monitoring
- 04-advanced-patterns.ts - Priority, error handling, diagnostics
- 05-majk-wrapper.ts - Domain-specific wrapper pattern
Use Cases
- Repository Events: Broadcast entity lifecycle events (create, update, delete)
- Plugin Systems: Allow plugins to react to application events
- State Synchronization: Keep multiple components in sync
- Audit Logging: Monitor all system events
- Task Orchestration: Coordinate async workflows
- Real-time Updates: Push updates to multiple subscribers
- Microservices: Internal event bus for service communication
Design Philosophy
- Generic First: No assumptions about your domain
- Type Safety: Leverage TypeScript's type system
- Zero Dependencies: No external packages needed
- Error Tolerance: Isolated failure handling
- Performance: Optimized for high throughput
- Simplicity: Clean, intuitive API
- Extensibility: Easy to wrap and customize
Contributing
Contributions welcome! Please ensure:
- All tests pass: npm test
- Code is formatted: TypeScript strict mode
- Coverage remains above 80%
License
MIT
Credits
Built by the Majk team for high-performance event-driven applications.
