pp-event-bus
v1.11.0
Distributed event bus library supporting Redis/DragonflyDB
🇬🇧 Modern library for distributed event handling with Redis Streams. Supports event versioning, metadata, validation, and fanout pattern (1:N).
🇵🇱 Nowoczesna biblioteka do rozproszonej obsługi zdarzeń z Redis Streams. Wspiera wersjonowanie, metadata, walidację i wzorzec rozgłaszania (1:N).
Architecture / Architektura
High-Level Pattern / Wzorzec Wysokopoziomowy
┌─────────────┐   Redis     ┌─────────────┐       ┌─────────────┐
│    Event    │   Stream    │  EventBus   │  ┌──> │  Handler 1  │
│   (Facts)   │ ────────>   │  (Streams)  │  ├──> │  Handler 2  │
└─────────────┘   (1:N)     └─────────────┘  └──> │  Handler N  │
                                                  └─────────────┘
                                                  Consumer Groups
Internal Components / Komponenty Wewnętrzne
┌─────────────────────────────────────────────────────┐
│                      EventBus                       │
│            (Orchestration + Public API)             │
└──────────┬──────────────────────────────────────────┘
           │
           ├──► StreamManager (Stream key management + cache)
           ├──► EventPublisher (Publish events to Redis XADD)
           ├──► ConsumerGroupManager (Create consumer groups)
           ├──► ConsumerOrchestrator (Consumer lifecycle)
           │      └──► StreamConsumer (XREADGROUP loop)
           └──► EventLogger (Optional JSONL persistence)
🇬🇧 Modular design following Single Responsibility Principle (SRP)
🇵🇱 Modułowa architektura zgodna z zasadą pojedynczej odpowiedzialności (SRP)
Each component has one responsibility:
- StreamManager: Generate and cache stream keys
- EventPublisher: Serialize and publish events to Redis
- ConsumerGroupManager: Create and manage Redis consumer groups
- ConsumerOrchestrator: Register and orchestrate consumers
- StreamConsumer: Run XREADGROUP loop and process messages
- EventLogger: Persist events to JSONL files (optional)
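As an illustration of the smallest of these components, the sketch below shows one way a stream-key cache could work. `StreamKeyCache` is a hypothetical stand-in, not the library's `StreamManager` source; the `event-stream:` prefix is the key format this README documents for `getStreamKey`.

```typescript
// Hypothetical stand-in for a stream-key manager; not the library's source.
class StreamKeyCache {
  private readonly cache = new Map<string, string>();

  constructor(private readonly prefix: string = 'event-stream') {}

  // Build the key once per event name, then serve it from the Map.
  getStreamKey(eventName: string): string {
    let key = this.cache.get(eventName);
    if (key === undefined) {
      key = `${this.prefix}:${eventName}`;
      this.cache.set(eventName, key);
    }
    return key;
  }

  get size(): number {
    return this.cache.size;
  }
}

const keys = new StreamKeyCache();
console.log(keys.getStreamKey('OrderCreatedEvent')); // event-stream:OrderCreatedEvent
console.log(keys.getStreamKey('OrderCreatedEvent')); // same key, served from the cache
```

Keeping key construction in one place means the publisher and the consumers cannot drift apart on the stream name.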
Key Concepts / Kluczowe Koncepty
EventBus (Redis Streams)
🇬🇧 Events represent facts that happened (1:N fanout pattern)
🇵🇱 Zdarzenia reprezentują fakty które się wydarzyły (wzorzec rozgłaszania 1:N)
Features / Funkcjonalności:
- One event → Multiple handlers - Fanout pattern for broadcasting events
- Consumer groups - Reliable message delivery with load balancing
- Real-time streaming - Redis Streams for high-performance event distribution
- Modular architecture - Components with clear separation of responsibilities (SRP)
- Event versioning - Semantic versioning support for backward compatibility
- Metadata support - Attach contextual information to events
- Built-in validation - Optional validation hook in Event class
📚 Detailed Architecture Documentation - Complete technical documentation of EventBus internal architecture
For 1:1 command patterns, see pp-command-bus
Installation / Instalacja
npm i --save pp-event-bus
📦 Migrating from v1.x? See CHANGELOG.md for migration details.
Quick Start / Szybki Start
import { Event, EventBus, EventBusConfig } from 'pp-event-bus';
// Define event with validation / Zdefiniuj zdarzenie z walidacją
class OrderCreatedEvent extends Event {
  constructor(
    public readonly orderId: string,
    public readonly customerId: string,
    public readonly totalAmount: number
  ) {
    super('1.0.0'); // Optional version
    this.validate();
  }
  protected validate(): void {
    if (this.totalAmount <= 0) {
      throw new Error('Amount must be greater than 0');
    }
  }
}
// Setup event bus / Skonfiguruj magistralę zdarzeń
const config = new EventBusConfig({
  redisUrl: 'redis://localhost:6379',
  logger: console
});
const eventBus = new EventBus(config);
// Subscribe multiple handlers (fanout) / Subskrybuj wiele handlerów (rozgłaszanie)
// NOTE: Handlers receive plain objects (not class instances)
// Email service
await eventBus.subscribe(OrderCreatedEvent, async (event) => {
  console.log(`📧 Sending email for order ${event.__id}`);
  console.log(`Event timestamp: ${new Date(event.__time).toISOString()}`);
}, { groupName: 'email-service' });
// Inventory service
await eventBus.subscribe(OrderCreatedEvent, async (event) => {
  console.log(`📦 Updating inventory for order ${event.__payload.orderId}`);
  // Access metadata if needed
  const userId = event.__metadata['userId'];
}, { groupName: 'inventory-service' });
// Analytics service
await eventBus.subscribe(OrderCreatedEvent, async (event) => {
  console.log(`📊 Recording analytics (v${event.__version})`);
}, { groupName: 'analytics-service' });
// Emit event with metadata / Emituj zdarzenie z metadata
const event = new OrderCreatedEvent('ORDER-123', 'CUST-123', 299.99);
event.setMetadata('userId', 'USER-456');
event.setMetadata('source', 'web-app');
await eventBus.emit(event); // All handlers receive it / Wszyscy otrzymają
Complete Example / Kompletny Przykład
import { Event, EventBus, EventBusConfig } from 'pp-event-bus';
// Define event with versioning and validation / Zdefiniuj zdarzenie z wersjonowaniem i walidacją
class OrderCreatedEvent extends Event {
  constructor(
    public readonly orderId: string,
    public readonly customerId: string,
    public readonly totalAmount: number,
    public readonly currency: string = 'PLN'
  ) {
    super('2.0.0'); // Semantic versioning
    this.validate();
  }
  protected validate(): void {
    if (!this.orderId?.trim()) throw new Error('OrderId required');
    if (!this.customerId?.trim()) throw new Error('CustomerId required');
    if (this.totalAmount <= 0) throw new Error('Amount must be > 0');
  }
}
async function main() {
  const config = new EventBusConfig({
    redisUrl: 'redis://localhost:6379',
    streamMaxLen: 10000,
    logger: console,
  });
  const eventBus = new EventBus(config);
  // Email service handler / Handler serwisu email
  await eventBus.subscribe(OrderCreatedEvent, async (event) => {
    console.log(`📧 [${event.__id}] Sending confirmation for order ${event.__payload.orderId}`);
    console.log(` Timestamp: ${new Date(event.__time).toISOString()}`);
    console.log(` Version: ${event.__version}`);
    // Access metadata
    const source = event.__metadata['source'];
    console.log(` Source: ${source || 'unknown'}`);
  }, { groupName: 'email-service' });
  // Inventory service handler / Handler serwisu magazynowego
  await eventBus.subscribe(OrderCreatedEvent, async (event) => {
    console.log(`📦 Inventory: Reserving items for ${event.__payload.orderId}`);
    console.log(` Amount: ${event.__payload.totalAmount} ${event.__payload.currency}`);
  }, { groupName: 'inventory-service' });
  // Analytics service handler / Handler serwisu analitycznego
  await eventBus.subscribe(OrderCreatedEvent, async (event) => {
    console.log(`📊 Analytics: Recording order (event v${event.__version})`);
    // Store all metadata
    const metadata = { ...event.__metadata };
    console.log(` Metadata:`, metadata);
  }, { groupName: 'analytics-service' });
  // Create and emit event with metadata / Stwórz i emituj zdarzenie z metadata
  const event = new OrderCreatedEvent('ORDER-123', 'CUST-456', 299.99, 'PLN');
  event.setMetadata('source', 'web-app');
  event.setMetadata('userId', 'USER-789');
  event.setMetadata('correlationId', crypto.randomUUID());
  await eventBus.emit(event);
  console.log('✅ Event emitted to all subscribers');
}
main().catch(console.error);
Environment Variables / Zmienne Środowiskowe
| Variable | Default | Description |
|----------|---------|-------------|
| REDIS_URL | redis://localhost:6379 | Redis/DragonflyDB connection URL |
| EVENT_BUS_STREAM_MAX_LEN | 10000 | Max stream length |
| EVENT_BUS_CONSUMER_TIMEOUT | 5000 | Consumer read timeout (ms) |
| EVENT_BUS_LOG | "" | Path to event log directory (optional) |
| LOG_LEVEL | info | Log level: debug, info, warn, error |
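The table above can be read as a set of defaults with optional overrides. The sketch below shows one way to resolve those variables into a settings object. `BusSettings`, `intFromEnv`, and `settingsFromEnv` are hypothetical helper names, and how the library itself reads these variables is not shown in this README; the field names `streamMaxLen` and `streamConsumerTimeout` mirror the `EventBusConfig` options used in the Configuration section.

```typescript
// Hypothetical settings shape; field names mirror the EventBusConfig options in this README.
interface BusSettings {
  redisUrl: string;
  streamMaxLen: number;
  streamConsumerTimeout: number;
  eventLogPath: string;
  logLevel: string;
}

type Env = Record<string, string | undefined>;

// Parse a positive integer, falling back to the documented default on bad input.
function intFromEnv(value: string | undefined, fallback: number): number {
  const parsed = Number.parseInt(value ?? '', 10);
  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
}

function settingsFromEnv(env: Env): BusSettings {
  return {
    redisUrl: env['REDIS_URL'] ?? 'redis://localhost:6379',
    streamMaxLen: intFromEnv(env['EVENT_BUS_STREAM_MAX_LEN'], 10000),
    streamConsumerTimeout: intFromEnv(env['EVENT_BUS_CONSUMER_TIMEOUT'], 5000),
    eventLogPath: env['EVENT_BUS_LOG'] ?? '',
    logLevel: env['LOG_LEVEL'] ?? 'info',
  };
}

// Only one variable is set here; every other field takes its default.
const settings = settingsFromEnv({ EVENT_BUS_STREAM_MAX_LEN: '500' });
console.log(settings.streamMaxLen); // 500
console.log(settings.redisUrl); // redis://localhost:6379
```

In an application the argument would typically be `process.env`; taking it as a parameter keeps the function easy to test.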
Running Examples / Uruchamianie Przykładów
# Build / Zbuduj
npm run build
# Run Redis / Uruchom Redis
docker compose up -d
# Example 1: Order Service Consumer / Przykład 1: Konsument Serwisu Zamówień
npm run order-service
# Example 2: Shop Simulator Producer / Przykład 2: Symulator Sklepu (Producer)
npm run shop-simulator
# Example 3: Temperature Monitor / Przykład 3: Monitor Temperatury
npm run temperature-monitor
When to Use EventBus? / Kiedy Używać EventBus?
Use EventBus when / Użyj EventBus gdy:
- Multiple services need to react to the same event
- You're broadcasting facts that happened
- You need fanout capability (1:N pattern)
- Services should remain loosely coupled
- Examples: OrderCreated, UserRegistered, PaymentProcessed, TemperatureAlert
Note: For 1:1 command patterns, see pp-command-bus
Configuration / Konfiguracja
import { EventBus, EventBusConfig } from 'pp-event-bus';
// EventBus configuration
const config = new EventBusConfig({
  redisUrl: 'redis://localhost:6379',
  streamMaxLen: 10000,
  streamConsumerTimeout: 5000,
  logger: console
});
const eventBus = new EventBus(config);
EventBus Components / Komponenty EventBus
🇬🇧 EventBus follows the Single Responsibility Principle - each component has one clear responsibility.
🇵🇱 EventBus przestrzega zasady pojedynczej odpowiedzialności - każdy komponent ma jedną jasną odpowiedzialność.
StreamManager
Responsibility: Stream key management and caching
getStreamKey(eventName: string): string
// Generates: "event-stream:{EventClassName}"
// Uses Map<> cache for O(1) lookups
EventPublisher
Responsibility: Event serialization and publication to Redis
publish(event: Event, streamKey: string): Promise<string>
// 1. Serialize event to JSON
// 2. Redis XADD with MAXLEN trimming
// 3. Returns messageId (e.g., "1735894800000-0")
ConsumerGroupManager
Responsibility: Create and manage Redis consumer groups
createGroup(streamKey: string, groupName: string): Promise<void>
// 1. Check if group exists (XINFO GROUPS)
// 2. Create group if needed (XGROUP CREATE)
// 3. Handle BUSYGROUP race condition (group already exists)
ConsumerOrchestrator
Responsibility: Consumer lifecycle management
registerConsumer(...): Promise<void>
// 1. Create StreamConsumer instance
// 2. Start consumption loop
// 3. Track active consumers in Map<>
// 4. Auto-cleanup inactive consumers (5min threshold)
StreamConsumer
Responsibility: Message consumption and processing
consume(): Promise<void>
// Infinite loop while running:
// 1. XREADGROUP (block 5s, read 10 messages)
// 2. Deserialize each message
// 3. Call handler(event)
// 4. XACK on success (auto-retry on failure)
EventLogger
Responsibility: Optional event persistence to JSONL files
logEvent(event: Event): Promise<void>
// Append to: eventbus-stream_YYYY-MM-DD.jsonl
// Format: one JSON per line (JSONL)
Event Class Structure / Struktura Klasy Event
🇬🇧 The Event class is an abstract base class that encapsulates both infrastructure and business data.
🇵🇱 Klasa Event jest abstrakcyjną klasą bazową, która enkapsuluje zarówno dane infrastrukturalne, jak i biznesowe.
Infrastructure Fields / Pola Infrastrukturalne
All events contain these read-only infrastructure fields (prefixed with __):
abstract class Event<TPayload = Record<string, unknown>> {
  public readonly __id: string // UUID v4 - unique event ID
  public readonly __name: string // Event class name (e.g., "OrderCreatedEvent")
  public readonly __time: number // Unix timestamp in milliseconds
  public readonly __version: string // Semantic version (e.g., "1.0.0")
  public readonly __metadata: Record<...> // Additional contextual data
  public readonly __payload: TPayload // Business data (type-safe)
}
🎯 Design principle: Infrastructure fields (__*) are kept separate from business data (__payload).
Getters for Convenient Access
const event = new OrderCreatedEvent({ orderId: 'ORDER-123', amount: 299.99 });
// Access without __ prefix
console.log(event.id); // Same as event.__id
console.log(event.name); // Same as event.__name
console.log(event.timestamp); // Same as event.__time
console.log(event.timestampDate); // Converts __time to Date object
console.log(event.version); // Same as event.__version
console.log(event.payload); // Same as event.__payload
Type Safety with Generics
interface OrderPayload {
  orderId: string;
  amount: number;
}
class OrderCreatedEvent extends Event<OrderPayload> {
  constructor(payload: OrderPayload) {
    super(payload);
  }
}
const event = new OrderCreatedEvent({ orderId: 'ORDER-123', amount: 299.99 });
event.payload.orderId; // ✅ Type-safe access
event.payload.amount; // ✅ TypeScript knows the structure
Event Class Features / Funkcjonalności Klasy Event
🇬🇧 Version 1.10.2+ introduces an enhanced Event class with getters, a metadata API, versioning, and validation support.
🇵🇱 Wersja 1.10.2+ wprowadza ulepszoną klasę Event z getterami, API metadata, wersjonowaniem i wsparciem walidacji.
Built-in Properties / Wbudowane Właściwości
class OrderCreatedEvent extends Event {
  constructor(
    public readonly orderId: string,
    public readonly amount: number
  ) {
    super('1.0.0'); // Optional version parameter
  }
}
const event = new OrderCreatedEvent('ORDER-123', 299.99);
// Getters / Gettery
console.log(event.id); // UUID v4: "550e8400-e29b-41d4-a716-446655440000"
console.log(event.name); // Class name: "OrderCreatedEvent"
console.log(event.timestamp); // Unix timestamp: 1735894800000
console.log(event.timestampDate); // Date object: 2025-01-03T10:00:00.000Z
console.log(event.version); // "1.0.0"
Metadata API
🇬🇧 Metadata allows attaching additional contextual information to events.
🇵🇱 Metadata umożliwia dołączanie dodatkowych informacji kontekstowych do zdarzeń.
Available Methods / Dostępne Metody
// Set metadata / Ustaw metadata
setMetadata(key: string, value: unknown): void
// Store any additional data (correlation ID, user ID, source, etc.)
// Get metadata with type safety / Pobierz metadata z typowaniem
getMetadata<T = unknown>(key: string): T | undefined
// Returns the value or undefined if key doesn't exist
// Generic type T for type-safe access
// Check existence / Sprawdź istnienie
hasMetadata(key: string): boolean
// Returns true if key exists in metadata
// Remove metadata / Usuń metadata
removeMetadata(key: string): void
// Deletes the key from metadata
// Get all metadata / Pobierz wszystkie metadata
getAllMetadata(): Record<string, unknown>
// Returns a copy of all metadata (safe from external modifications)
Usage Example / Przykład Użycia
const event = new OrderCreatedEvent({ orderId: 'ORDER-123', amount: 299.99 });
// Attach tracking information / Dołącz informacje śledzenia
event.setMetadata('userId', 'USER-456');
event.setMetadata('source', 'web-app');
event.setMetadata('correlationId', crypto.randomUUID());
event.setMetadata('ipAddress', '192.168.1.100');
// Type-safe retrieval / Bezpieczne pobieranie z typowaniem
const userId = event.getMetadata<string>('userId'); // string | undefined
const source = event.getMetadata<string>('source'); // string | undefined
// Conditional access / Warunkowy dostęp
if (event.hasMetadata('correlationId')) {
  const correlationId = event.getMetadata<string>('correlationId');
  console.log('Correlation ID:', correlationId);
}
// Get all for logging / Pobierz wszystkie dla logowania
const allMetadata = event.getAllMetadata();
console.log('Event metadata:', allMetadata);
// { userId: 'USER-456', source: 'web-app', correlationId: '...', ipAddress: '192.168.1.100' }
// Remove sensitive data before publishing / Usuń wrażliwe dane przed publikacją
event.removeMetadata('ipAddress');
Common use cases / Typowe przypadki użycia:
- Correlation ID: Track related events across services
- User ID: Identify which user triggered the event
- Source: Track event origin (web-app, mobile-app, admin-panel)
- Tenant ID: Multi-tenancy support
- Transaction ID: Link events to database transactions
- Debug flags: Enable verbose logging for specific events
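To make the correlation ID use case concrete, the sketch below carries tracing metadata from a received event into the metadata of a follow-up event. It is a self-contained illustration: `PlainEventLike` is a local stand-in for the plain event shape described in this README, and `inheritTracing` and the `causationId` key are hypothetical names, not part of the library.

```typescript
// Local stand-in for the plain event shape described in this README.
interface PlainEventLike<T> {
  __id: string;
  __name: string;
  __payload: T;
  __metadata: Record<string, unknown>;
}

// Hypothetical helper: build metadata for a follow-up event so that both
// events share one correlation ID. Falls back to the parent's __id when the
// parent carries no correlationId of its own.
function inheritTracing(
  parent: PlainEventLike<unknown>,
  extra: Record<string, unknown> = {},
): Record<string, unknown> {
  const correlationId = parent.__metadata['correlationId'] ?? parent.__id;
  return { ...extra, correlationId, causationId: parent.__id };
}

const receivedOrder: PlainEventLike<{ orderId: string }> = {
  __id: 'evt-1',
  __name: 'OrderCreatedEvent',
  __payload: { orderId: 'ORDER-123' },
  __metadata: { correlationId: 'corr-42', userId: 'USER-456' },
};

const followUpMetadata = inheritTracing(receivedOrder, { source: 'inventory-service' });
console.log(followUpMetadata);
// { source: 'inventory-service', correlationId: 'corr-42', causationId: 'evt-1' }
```

Each entry of the returned object could then be applied to the new event with `setMetadata` before it is emitted.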
Versioning Support / Wsparcie Wersjonowania
// Different versions of the same event
class OrderCreatedEvent extends Event {
  constructor(
    public readonly orderId: string,
    public readonly amount: number,
    public readonly currency?: string // Added in v2.0.0
  ) {
    super(currency ? '2.0.0' : '1.0.0');
  }
}
const eventV1 = new OrderCreatedEvent('ORDER-123', 299.99);
console.log(eventV1.version); // "1.0.0"
const eventV2 = new OrderCreatedEvent('ORDER-124', 199.99, 'PLN');
console.log(eventV2.version); // "2.0.0"
Validation Hook / Hook Walidacyjny
class OrderCreatedEvent extends Event {
  constructor(
    public readonly orderId: string,
    public readonly amount: number
  ) {
    super();
    this.validate(); // Call after super() and field initialization
  }
  protected validate(): void {
    if (!this.orderId || this.orderId.trim() === '') {
      throw new Error('OrderId cannot be empty');
    }
    if (this.amount <= 0) {
      throw new Error('Amount must be greater than 0');
    }
  }
}
// Throws error on invalid data
try {
  const invalidEvent = new OrderCreatedEvent('', -100);
} catch (error) {
  // `error` is `unknown` under strict TypeScript, so narrow it before reading `.message`
  console.error(error instanceof Error ? error.message : error); // "OrderId cannot be empty"
}
Serialization / Serializacja
const event = new OrderCreatedEvent('ORDER-123', 299.99);
event.setMetadata('userId', 'USER-456');
// Serialize to JSON string / Serializuj do JSON string
const jsonString = event.toJSONString();
// '{"orderId":"ORDER-123","amount":299.99,"__name":"OrderCreatedEvent",...}'
// Convert to plain object / Konwertuj do zwykłego obiektu
const plainObject = event.toPlainObject();
// { orderId: 'ORDER-123', amount: 299.99, __name: 'OrderCreatedEvent', ... }
Data Flow / Przepływ Danych
Emit Flow (Producer → Redis)
User Code
  │
  └─► eventBus.emit(event)
        │
        ├─► StreamManager.getStreamKey('OrderCreatedEvent')
        │     └─► Returns: "event-stream:OrderCreatedEvent"
        │
        └─► EventPublisher.publish(event, streamKey)
              │
              ├─► Serialize: { data, eventName, eventId, timestamp }
              │
              └─► Redis XADD event-stream:OrderCreatedEvent
                    MAXLEN ~ 10000 *
                    data {json} eventName {name} ...
                    └─► Returns messageId: "1735894800000-0"
Subscribe Flow (Consumer Registration)
User Code
  │
  └─► eventBus.subscribe(OrderCreatedEvent, handler, { groupName: 'email-service' })
        │
        ├─► StreamManager.getStreamKey('OrderCreatedEvent')
        │     └─► "event-stream:OrderCreatedEvent"
        │
        ├─► ConsumerGroupManager.createGroup(streamKey, 'email-service')
        │     └─► XGROUP CREATE event-stream:OrderCreatedEvent
        │           email-service $ MKSTREAM
        │
        └─► ConsumerOrchestrator.registerConsumer(...)
              └─► new StreamConsumer(...) → consumer.start()
Consume Flow (Message Processing)
StreamConsumer Loop (while running)
  │
  ├─► XREADGROUP GROUP email-service consumer-uuid
  │     COUNT 10 BLOCK 5000
  │     STREAMS event-stream:OrderCreatedEvent >
  │
  ├─► For each message:
  │     ├─► JSON.parse(data) → event (plain object)
  │     ├─► await handler(event)
  │     └─► Success? → XACK (acknowledge)
  │         Error? → NO ACK (auto-retry later)
  │
  └─► Loop continues...
API Reference / Dokumentacja API
EventBus
const eventBus = new EventBus(config);
// Subscribe with consumer group / Subskrybuj z grupą konsumentów
await eventBus.subscribe(EventClass, async (event) => {
  // Handle event / Obsłuż zdarzenie
  // event is a plain object with: __id, __name, __time, __version, __payload, __metadata
  console.log(event.__payload.orderId);
}, {
  groupName: 'service-name', // Required / Wymagane
  consumerName: 'instance-1' // Optional / Opcjonalne
});
// Emit event / Emituj zdarzenie
await eventBus.emit(event);
// Close / Zamknij
await eventBus.close();
PlainEvent Type
🇬🇧 Important: Handlers receive plain objects, not class instances (due to JSON serialization through Redis).
🇵🇱 Ważne: Handlery otrzymują zwykłe obiekty, a nie instancje klas (z powodu serializacji JSON przez Redis).
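The effect described in this note can be reproduced without Redis, because it follows from how JSON works. The sketch below uses a hypothetical `DemoEvent` class (not the library's `Event`) to show that a JSON round trip keeps own data fields but drops the class prototype, and with it any getters.

```typescript
// Hypothetical minimal event class, used only to show the effect of JSON.
class DemoEvent {
  public readonly __time: number = 1735894800000;

  constructor(public readonly __payload: { orderId: string }) {}

  // Lives on the prototype, so JSON.stringify never sees it.
  get timestampDate(): Date {
    return new Date(this.__time);
  }
}

const original = new DemoEvent({ orderId: 'ORDER-123' });
const roundTripped = JSON.parse(JSON.stringify(original)) as Record<string, unknown>;

console.log(original instanceof DemoEvent); // true
console.log(roundTripped instanceof DemoEvent); // false
console.log('timestampDate' in roundTripped); // false
console.log(roundTripped['__time']); // 1735894800000
```

This is why handler code reads the `__`-prefixed fields directly and converts `__time` with `new Date(...)` itself.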
type PlainEvent<T = Record<string, unknown>> = {
  __id: string; // Event UUID
  __name: string; // Event class name
  __time: number; // Unix timestamp (milliseconds)
  __version: string; // Semantic version
  __payload: T; // Business data (type-safe)
  __metadata: Record<string, unknown>; // Additional context
};
// Usage in handlers / Użycie w handlerach
await eventBus.subscribe(OrderCreatedEvent, async (event: PlainEvent<OrderPayload>) => {
  console.log(event.__id); // Direct access
  console.log(event.__payload.orderId); // Type-safe payload access
  console.log(new Date(event.__time)); // Convert timestamp to Date
  console.log(event.__metadata['userId']); // Metadata access
}, { groupName: 'email-service' }); // groupName is required
Design Patterns / Wzorce Projektowe
🇬🇧 pp-event-bus is built on proven software design patterns.
🇵🇱 pp-event-bus jest zbudowany na sprawdzonych wzorcach projektowych.
Single Responsibility Principle (SRP)
Each component has exactly one responsibility:
- StreamManager → Stream keys only
- EventPublisher → Publishing only
- ConsumerGroupManager → Consumer groups only
- ConsumerOrchestrator → Consumer lifecycle only
- StreamConsumer → Message consumption only
Benefits: Easier testing, maintenance, and reasoning about code.
Domain-Driven Design (DDD)
Event as Domain Aggregate:
- Events represent immutable facts that happened in the domain
- Infrastructure (__* fields) separated from business logic (__payload)
- Ubiquitous language: OrderCreatedEvent, PaymentProcessedEvent
Value Objects:
- Events are immutable (all fields readonly)
- Event identity through __id (UUID)
Observer Pattern (Pub/Sub)
Fanout Pattern (1:N):
// One event → Multiple independent handlers
// Subscribe first, each handler in its own consumer group (groupName is required)
await eventBus.subscribe(OrderCreatedEvent, emailHandler, { groupName: 'email-service' }); // Handler 1
await eventBus.subscribe(OrderCreatedEvent, inventoryHandler, { groupName: 'inventory-service' }); // Handler 2
await eventBus.subscribe(OrderCreatedEvent, analyticsHandler, { groupName: 'analytics-service' }); // Handler N
await eventBus.emit(new OrderCreatedEvent(...)); // 1 publisher
Benefits: Loose coupling, scalability, independent service evolution.
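The difference between fanout across groups and load balancing inside a group can be modelled in memory. The sketch below is a toy model, not the library's implementation: `InMemoryFanout` is a hypothetical class, and round-robin selection stands in for Redis, which in practice hands each message to whichever consumer in the group reads next.

```typescript
type Handler = (message: string) => void;

// Toy model: every group sees every message (fanout), while the consumers
// inside one group take turns (a stand-in for consumer-group load balancing).
class InMemoryFanout {
  private readonly groups = new Map<string, { consumers: Handler[]; next: number }>();

  subscribe(groupName: string, handler: Handler): void {
    const group = this.groups.get(groupName) ?? { consumers: [], next: 0 };
    group.consumers.push(handler);
    this.groups.set(groupName, group);
  }

  emit(message: string): void {
    this.groups.forEach((group) => {
      const handler = group.consumers[group.next % group.consumers.length];
      group.next += 1;
      handler?.(message);
    });
  }
}

const delivered: string[] = [];
const demoBus = new InMemoryFanout();
demoBus.subscribe('email-service', (m) => delivered.push(`email:${m}`));
demoBus.subscribe('inventory-service', (m) => delivered.push(`inventory-1:${m}`));
demoBus.subscribe('inventory-service', (m) => delivered.push(`inventory-2:${m}`));

demoBus.emit('ORDER-1');
demoBus.emit('ORDER-2');
console.log(delivered);
// [ 'email:ORDER-1', 'inventory-1:ORDER-1', 'email:ORDER-2', 'inventory-2:ORDER-2' ]
```

Both groups receive both orders, but within `inventory-service` each order is handled by only one of the two consumers.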
Repository Pattern
StreamManager acts as a repository for stream keys:
private streamKeyCache: Map<string, string> = new Map()
getStreamKey(eventName: string): string {
  // Cache lookup (O(1))
  // Abstract Redis stream access
}
Factory Pattern
ConsumerOrchestrator creates StreamConsumer instances:
registerConsumer(...) {
  const consumer = new StreamConsumer(...); // Factory
  consumer.start();
  this.consumers.set(consumerName, consumer);
}
Template Method Pattern
Event class provides a validation hook:
abstract class Event {
  protected validate(): void {
    // Template method - override in subclasses
  }
}
class OrderCreatedEvent extends Event {
  protected validate(): void {
    // Custom validation logic
  }
}
Dependency Injection
All components receive dependencies through constructor:
constructor(
  private readonly redis: Redis,
  private readonly logger: ILogger,
  private readonly streamManager: StreamManager
) {}
Benefits: Testability (mock dependencies), loose coupling, flexibility.
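The testability benefit can be shown with a small sketch in which a publisher-like class receives its collaborators through the constructor and is exercised with in-memory test doubles. Everything here is hypothetical: `StreamClient` and `Logger` are narrow interfaces invented for the example (the real Redis client and `ILogger` are wider), and `DemoPublisher` is not the library's `EventPublisher`. The `XADD` argument order follows the emit flow shown earlier in this README.

```typescript
// Hypothetical narrow interfaces; the real Redis client and ILogger are wider.
interface StreamClient {
  xadd(key: string, ...args: string[]): Promise<string>;
}
interface Logger {
  info(message: string): void;
}

class DemoPublisher {
  constructor(
    private readonly redis: StreamClient,
    private readonly logger: Logger,
    private readonly maxLen: number,
  ) {}

  // XADD <key> MAXLEN ~ <n> * data <json>
  async publish(streamKey: string, json: string): Promise<string> {
    const id = await this.redis.xadd(
      streamKey, 'MAXLEN', '~', String(this.maxLen), '*', 'data', json,
    );
    this.logger.info(`published ${id} to ${streamKey}`);
    return id;
  }
}

// Test doubles: record calls instead of talking to Redis.
const xaddCalls: string[][] = [];
const fakeRedis: StreamClient = {
  xadd: async (key, ...args) => {
    xaddCalls.push([key, ...args]);
    return '1735894800000-0';
  },
};
const logLines: string[] = [];
const fakeLogger: Logger = { info: (message) => { logLines.push(message); } };

const publisher = new DemoPublisher(fakeRedis, fakeLogger, 10000);
const pendingId = publisher.publish('event-stream:OrderCreatedEvent', '{"orderId":"ORDER-123"}');
pendingId.then((id) => console.log(id, logLines));
```

Because the class depends only on the two small interfaces, the test needs neither a Redis server nor a mocking library.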
License / Licencja
MIT
