@sailboat-computer/event-bus
v1.1.56
Published
Standardized event bus for sailboat computer v3 with resilience features and offline capabilities
Maintainers
Readme
Sailboat Computer Event Bus
A standardized event bus implementation for the Sailboat Computer v3 system with resilience features and offline capabilities.
Features
- Standardized event publishing and subscription across all services
- Support for multiple transport mechanisms (Redis, in-memory)
- Offline buffering with priority-based retention
- Automatic reconnection with exponential backoff
- Comprehensive metrics and monitoring
- Type-safe event handling with TypeScript
- Integration with resilience patterns (circuit breaker, bulkhead, etc.)
- Schema validation for events with JSON Schema
- Dead letter queue for failed events
- Monitoring and alerting integration
Recent Updates (June 2025)
- API Improvements:
- Added
unsubscribe(eventType)method to unsubscribe from all handlers for a specific event type - Renamed
close()toshutdown()for consistency (breaking change) - Added
isHealthy()method to check event bus health status
- Added
- Enhanced Type Safety: Improved TypeScript type definitions and enum usage for better compile-time checking
- Schema Validation Improvements: Fixed schema overwriting in the SchemaRegistry and improved error handling
- Dead Letter Queue Enhancements: Better error handling for non-existent events and improved null/undefined handling
- Test Suite Stability: Fixed all TypeScript errors in the test suite and improved test reliability
Installation
npm install @sailboat-computer/event-busUsage
Basic Usage
import { createEventBus, EventPriority, EventCategory } from '@sailboat-computer/event-bus';
// Create and initialize event bus
const eventBus = createEventBus({
adapter: {
type: 'redis',
config: {
url: 'redis://localhost:6379',
consumerGroup: 'my-service-group',
consumerName: 'my-service-consumer',
maxBatchSize: 100,
pollInterval: 1000,
reconnectOptions: {
baseDelay: 1000,
maxDelay: 30000,
maxRetries: 10
}
}
},
offlineBuffer: {
maxSize: 10000,
priorityRetention: true
},
metrics: {
enabled: true,
detailedTimings: true
}
});
await eventBus.initialize();
// Subscribe to events
const subscription = await eventBus.subscribe<UserCreatedEvent>('user.created.v1', async (event) => {
console.log(`User created: ${event.data.username}`);
});
// Publish an event
await eventBus.publish<UserCreatedEvent>(
'user.created.v1',
{ id: '123', username: 'sailor' },
{
priority: EventPriority.NORMAL,
category: EventCategory.USER_ACTION
}
);
// Get metrics
const metrics = eventBus.getMetrics();
console.log(`Published events: ${metrics.publishedEvents}`);
// Clean up
await subscription.unsubscribe();
await eventBus.shutdown();Service Integration
// In a service startup file
import { createEventBus } from '@sailboat-computer/event-bus';
import { config } from './config';
export async function initializeEventBus() {
const eventBus = createEventBus(config.eventBus);
await eventBus.initialize();
// Register global error handler
process.on('unhandledRejection', async (reason) => {
await eventBus.publish(
'system.error.v1',
{ reason: String(reason) },
{ priority: EventPriority.HIGH, category: EventCategory.SYSTEM }
);
});
return eventBus;
}
// In a service shutdown file
export async function shutdownEventBus(eventBus) {
await eventBus.shutdown();
}Configuration
The event bus can be configured with the following options:
interface EventBusConfig {
adapter: {
type: 'redis' | 'memory';
config: RedisAdapterConfig | MemoryAdapterConfig;
};
offlineBuffer: {
maxSize: number;
priorityRetention: boolean;
};
metrics: {
enabled: boolean;
detailedTimings: boolean;
};
}Redis Adapter Configuration
interface RedisAdapterConfig {
url: string;
consumerGroup: string;
consumerName: string;
maxBatchSize: number;
pollInterval: number;
reconnectOptions: {
baseDelay: number;
maxDelay: number;
maxRetries: number;
};
}Memory Adapter Configuration
interface MemoryAdapterConfig {
eventTtl?: number;
simulateLatency?: boolean;
latencyRange?: [number, number];
}API Reference
EventBus Interface
interface EventBus {
initialize(config: EventBusConfig): Promise<void>;
shutdown(): Promise<void>;
publish<T>(eventType: string, data: T, options?: PublishOptions): Promise<string>;
subscribe<T>(eventType: string, handler: EventHandler<T>): Promise<Subscription>;
unsubscribe(eventType: string): Promise<void>;
isHealthy(): Promise<boolean>;
getMetrics(): EventBusMetrics;
}Event Handler
type EventHandler<T> = (event: EventEnvelope<T>) => void | Promise<void>;Subscription
interface Subscription {
id: string;
eventType: string;
unsubscribe(): Promise<void>;
}License
MIT
