pp-command-bus
v1.4.0
Published
Distributed Command Bus library supporting RPC and job queuing with BullMQ for Redis/DragonflyDB
Maintainers
Readme
PP Command Bus
Distributed Command Bus library supporting RPC and job queuing with BullMQ for Redis/DragonflyDB.
Opis
pp-command-bus to zaawansowana biblioteka do obsługi rozproszonych komend zgodna ze wzorcem CQRS (Command Query Responsibility Segregation). Zapewnia wysoką wydajność, automatyczną optymalizację i zaawansowane funkcje produkcyjne.
Kluczowe Cechy
- ✅ Fire-and-forget commands - wysyłanie komend bez oczekiwania na wynik
- ✅ RPC (Remote Procedure Call) - synchroniczne wywołania przez Redis Pub/Sub z oczekiwaniem na odpowiedź
- ✅ Automatyczna kompresja - gzip dla payloadów RPC >1KB (konfigurowalne)
- ✅ Auto-optymalizacja - dynamiczne dostosowywanie concurrency na podstawie CPU/RAM
- ✅ Process isolation - izolacja odpowiedzi RPC między procesami Node.js
- ✅ Job queuing - kolejkowanie zadań z retry, backoff i delayed execution
- ✅ BullMQ integration - wydajna kolejka zadań na Redis/DragonflyDB
- ✅ TypeScript - pełne wsparcie typów i strict mode
- ✅ Memory leak protection - zaawansowana diagnostyka i cleanup
- ✅ Command logging - opcjonalne logowanie komend do plików JSONL
- ✅ RPC Job Cancellation - automatyczne usuwanie niepodjętych jobów RPC przy timeout
- ✅ Modularna architektura - komponenty zgodne z zasadami SOLID, DDD i SRP
Architektura Systemu
Diagram Komponentów
┌─────────────────────────────────────────────────────────────────────┐
│ CommandBus │
│ ┌────────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │ QueueManager │ │ RpcCoordinator│ │ WorkerOrchestrator │ │
│ │ - Cache kolejek│ │ - Pub/Sub │ │ - Dynamiczne concurrency│ │
│ │ - BullMQ Queue │ │ - Process ID │ │ - Worker benchmark │ │
│ └────────┬───────┘ └──────┬───────┘ └────────┬────────────────┘ │
│ │ │ │ │
│ │ │ │ │
│ ┌────────▼──────────────────▼────────────────────▼───────────────┐ │
│ │ JobProcessor (Command Handler Execution) │ │
│ │ - Kompresja/dekompresja payloadów │ │
│ │ - Wykonywanie handlerów │ │
│ │ - Wysyłanie odpowiedzi RPC przez Pub/Sub │ │
│ └──────────────────────────┬──────────────────────────────────────┘ │
└─────────────────────────────┼────────────────────────────────────────┘
│
┌─────────▼──────────┐
│ PayloadCompression │
│ Service │
│ - gzip compression │
│ - threshold: 1KB │
└─────────────────────┘Główne Serwisy
1. CommandBus (główna klasa orkiestrująca)
- Odpowiedzialność: Główny punkt wejścia dla użytkownika, orkiestracja wszystkich komponentów
- Metody publiczne:
dispatch(command)- wysyłanie fire-and-forgetcall(command, timeout)- synchroniczne RPChandle(commandClass, handler)- rejestracja handlerówclose()- graceful shutdown
- Zarządzanie połączeniami: 3 dedykowane połączenia Redis (Queue, Worker, RPC)
2. RpcCoordinator (zarządzanie RPC)
- Odpowiedzialność: Zarządzanie cyklem życia wywołań RPC przez Redis Pub/Sub
- Kluczowe funkcje:
- Shared Subscriber - jeden subscriber dla wszystkich RPC calls (pattern matching)
- Process Isolation - UUID procesu Node.js dla izolacji odpowiedzi między procesami
- Timeout Management - automatyczne cleanup po timeout
- Job Cancellation - usuwanie niepodjętych jobów przy timeout (optymalizacja zasobów)
- Multiplexing - routing odpowiedzi do odpowiednich promises
- Kanały:
rpc:response:{processId}:{correlationId}
3. WorkerOrchestrator (orkiestracja workerów)
- Odpowiedzialność: Zarządzanie workerami BullMQ i dynamiczna optymalizacja
- Kluczowe funkcje:
- WorkerBenchmark - automatyczny benchmark przy rejestracji handlera
- WorkerMetricsCollector - event-driven metrics collection
- Dynamic Concurrency - dostosowywanie concurrency +/-20% co 30s
- Event Handlers - obsługa zdarzeń: active, completed, failed, stalled
- Limity concurrency: min 10, max 2000
4. JobProcessor (wykonywanie handlerów)
- Odpowiedzialność: Wykonywanie handlerów komend i obsługa odpowiedzi RPC
- Flow przetwarzania:
- Dekompresja payloadu (jeśli skompresowany)
- Sprawdzenie flagi cancellation (pomijanie anulowanych RPC)
- Rekonstrukcja obiektów Date
- Opcjonalne logowanie komendy
- Wykonanie handlera
- Wysłanie odpowiedzi RPC przez Pub/Sub (jeśli RPC)
- Kompresja odpowiedzi: automatyczna kompresja przez PayloadCompressionService
- RPC Cancellation: pomijanie jobów oznaczonych jako anulowane (timeout)
5. QueueManager (zarządzanie kolejkami)
- Odpowiedzialność: Cache kolejek BullMQ dla optymalizacji pamięci
- Funkcje:
getOrCreateQueue(commandName)- lazy loading kolejekcloseAllQueues()- graceful shutdown wszystkich kolejek
- Naming:
{CommandName}jako nazwa kolejki
6. PayloadCompressionService (kompresja)
- Odpowiedzialność: Automatyczna kompresja/dekompresja payloadów gzip
- Threshold: 1024 bajty (1KB) domyślnie, konfigurowalne przez ENV
- Metody:
compressCommand(command)- dodaje flagę__compresseddecompressCommand(command)- dekompresja i usunięcie flagicompress(data)- generyczna kompresja do base64decompress(data, compressed)- generyczna dekompresja
- Współdzielony serwis: jedna instancja dla całego CommandBus
7. RpcJobCancellationService (anulowanie jobów RPC)
- Odpowiedzialność: Zarządzanie anulowaniem niepodjętych jobów RPC przy timeout
- Kluczowe funkcje:
- markAsCancelled(correlationId) - oznacza job jako anulowany w Redis
- isCancelled(correlationId) - sprawdza flagę cancellation
- clearCancellation(correlationId) - usuwa flagę po przetworzeniu/usunięciu
- tryRemoveJob(jobId, queueName, callback) - próba usunięcia joba z kolejki
- TTL kluczy Redis: 24 godziny (automatyczne wygaśnięcie)
- Graceful degradation: błędy Redis nie blokują przetwarzania (zwraca false)
- Klucze Redis:
rpc:cancelled:{correlationId}
8. CommandLogger (opcjonalne logowanie)
- Odpowiedzialność: Persystencja komend do plików JSONL
- Format:
{timestamp}.jsonl- rotacja co godzinę - Zawartość: pełny payload komendy z metadanymi
- Aktywacja: przez ENV
COMMAND_BUS_LOG=./command-logs
9. AutoConfigOptimizer (auto-optymalizacja)
- Odpowiedzialność: Obliczanie optymalnego concurrency na podstawie zasobów systemowych
- Heurystyka:
- I/O-heavy workload (Redis/BullMQ)
concurrency = CPU cores * 2 + (availableMemory / 512MB)- Zakładane 512MB RAM per worker
- Aktywacja: domyślnie włączone (ENV
COMMAND_BUS_AUTO_OPTIMIZE=falsewyłącza)
10. RedisConnectionFactory (fabryka połączeń Redis)
- Odpowiedzialność: Tworzenie połączeń Redis z wbudowaną obsługą błędów i eventów
- Zgodność z SRP: CommandBus nie zarządza bezpośrednio połączeniami
- Kluczowe funkcje:
create(options, name)- tworzy połączenie z pełną obsługą eventówcreateForWorker(options, name)- dodajemaxRetriesPerRequest: nulldla BullMQ- Obsługa eventów:
error,close,reconnecting,connect,ready - Formatowanie błędów
AggregateError(Node.js dual-stack IPv4/IPv6)
- Event Handlers:
on('error')- logowanie błędów połączenia (zapobiegaUnhandled error event)on('close')- informacja o zamknięciu połączeniaon('reconnecting')- informacja o ponownym łączeniuon('connect')/on('ready')- potwierdzenie nawiązania połączenia
Flow Przepływu Danych
Flow 1: dispatch() - Fire-and-forget
User Code
│
├─→ 1. commandBus.dispatch(command)
│
├─→ 2. PayloadCompressionService.compressCommand(command)
│ └─→ Jeśli payload >1KB → gzip → base64 → __compressed: true
│
├─→ 3. QueueManager.getOrCreateQueue(commandName)
│ └─→ Cache hit/miss → zwraca Queue
│
├─→ 4. queue.add(commandName, compressedCommand, options)
│ └─→ BullMQ dodaje job do Redis
│
└─→ 5. Promise<void> resolved (nie czekamy na wynik)
Worker Side (asynchronicznie)
│
├─→ 6. Worker pobiera job z kolejki
│
├─→ 7. JobProcessor.process(job)
│ ├─→ Dekompresja (jeśli __compressed)
│ ├─→ Rekonstrukcja Date
│ ├─→ Opcjonalne logowanie (CommandLogger)
│ └─→ Wykonanie handlera
│
└─→ 8. Worker kończy job (success/fail)Flow 2: call() - RPC przez Redis Pub/Sub
User Code
│
├─→ 1. commandBus.call(command, timeout)
│
├─→ 2. RpcCoordinator.registerCall(correlationId, commandName, timeout)
│ ├─→ Oczekiwanie na gotowość shared subscriber (5s timeout)
│ ├─→ Utworzenie Promise<T> dla odpowiedzi
│ ├─→ Zapisanie pending call w Map
│ └─→ Zwrócenie responsePromise (bez blokowania)
│
├─→ 3. PayloadCompressionService.compressCommand(command)
│ └─→ Jeśli payload >1KB → gzip → __compressed: true
│
├─→ 4. RpcCoordinator.prepareRpcCommand(compressedCommand)
│ └─→ Dodaje __rpcMetadata: { correlationId, responseChannel, timestamp }
│
├─→ 5. QueueManager.getOrCreateQueue(commandName)
│ └─→ Cache hit/miss → zwraca Queue
│
├─→ 6. queue.add(commandName, commandWithMetadata, options)
│ └─→ BullMQ dodaje job do Redis
│
└─→ 7. await responsePromise (czeka na odpowiedź z Worker)
Worker Side
│
├─→ 8. Worker pobiera job z kolejki
│
├─→ 9. JobProcessor.process(job)
│ ├─→ Dekompresja payloadu (jeśli __compressed)
│ ├─→ Rekonstrukcja Date
│ ├─→ Wykonanie handlera → result
│ │
│ └─→ 10. JobProcessor.sendRpcResponse(rpcMetadata, result, null)
│ ├─→ PayloadCompressionService.compress({ correlationId, result, error })
│ ├─→ Wrapper: { data, compressed }
│ └─→ redis.publish(responseChannel, JSON.stringify(wrapper))
│
Shared Subscriber (RpcCoordinator)
│
├─→ 11. pmessage event → handleRpcMessage(channel, message)
│ ├─→ Ekstraktuj correlationId z channel
│ ├─→ Weryfikuj processInstanceId (process isolation)
│ ├─→ Znajdź pending call w Map
│ ├─→ PayloadCompressionService.decompress(wrapper.data, wrapper.compressed)
│ ├─→ resolve(result) lub reject(error)
│ └─→ Cleanup: clearTimeout, delete z Map
│
└─→ 12. User Code otrzymuje wynik → Promise<T> resolvedInstalacja
npm install pp-command-busWymagania
- Node.js >= 14
- Redis >= 5 lub DragonflyDB
- TypeScript >= 4.5 (opcjonalnie)
Szybki start
1. Konfiguracja
import { CommandBus, CommandBusConfig, Command } from 'pp-command-bus';
// Konfiguracja CommandBus
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
logger: console, // ILogger interface
logLevel: 'log', // debug | log | warn | error
concurrency: 5,
maxAttempts: 1,
});
// Utwórz instancję CommandBus
const commandBus = new CommandBus(config);2. Definiowanie komend
class CreateUserCommand extends Command {
constructor(
public readonly email: string,
public readonly name: string,
) {
super();
}
}3. Rejestracja handlerów
commandBus.handle(CreateUserCommand, async (command) => {
console.log(`Creating user: ${command.name} (${command.email})`);
// Twoja logika biznesowa
const user = await createUser(command.email, command.name);
// Zwróć wynik (opcjonalnie)
return { userId: user.id };
});4. Wysyłanie komend (Fire-and-forget)
const command = new CreateUserCommand('[email protected]', 'Jan Kowalski');
// Wyślij komendę bez oczekiwania na wynik
await commandBus.dispatch(command);5. RPC - wywołania synchroniczne
// Wywołaj komendę i poczekaj na wynik
const result = await commandBus.call<{ userId: string }>(command, 5000); // timeout 5s
console.log(`User created with ID: ${result.userId}`);Zmienne Środowiskowe
Biblioteka wspiera konfigurację poprzez zmienne środowiskowe z prefiksem COMMAND_BUS_ (z fallbackiem do starszych nazw EVENT_BUS_*):
Kompletna Lista Zmiennych
| Zmienna | Typ | Wartość Domyślna | Opis |
|---------|-----|------------------|------|
| REDIS_URL | string | redis://localhost:6379 | URL połączenia Redis/DragonflyDB (wspiera username, password, db) |
| REDIS_RETRY_DELAY | number | 5000 | Opóźnienie między próbami reconnect do Redis w milisekundach |
| REDIS_MAX_RETRIES | number | 0 | Maksymalna liczba prób reconnect (0 = nieskończoność) |
| LOG_LEVEL | enum | log | Poziom logowania: debug, log, warn, error |
| COMMAND_BUS_CONCURRENCY | number | 1 (lub auto) | Liczba równoległych workerów do przetwarzania komend |
| COMMAND_BUS_MAX_ATTEMPTS | number | 1 | Maksymalna liczba prób przetworzenia zadania |
| COMMAND_BUS_BACKOFF_DELAY | number | 2000 | Opóźnienie między próbami w milisekundach |
| COMMAND_BUS_QUEUE_MODE | enum | fifo | Tryb przetwarzania kolejki: fifo (First In First Out) lub lifo (Last In First Out) |
| COMMAND_BUS_LOG | string | (puste) | Ścieżka do katalogu logów komend (JSONL format, rotacja co godzinę) |
| COMMAND_BUS_AUTO_OPTIMIZE | boolean | true | Włącz auto-optymalizację concurrency na podstawie CPU/RAM |
| COMMAND_BUS_COMPRESSION_THRESHOLD | number | 1024 | Próg kompresji gzip dla payloadów RPC w bajtach (1KB domyślnie) |
Fallback do Starszych Nazw
Dla kompatybilności wstecznej obsługiwane są również prefiksy EVENT_BUS_*:
EVENT_BUS_CONCURRENCY→COMMAND_BUS_CONCURRENCYEVENT_BUS_MAX_ATTEMPTS→COMMAND_BUS_MAX_ATTEMPTSEVENT_BUS_BACKOFF_DELAY→COMMAND_BUS_BACKOFF_DELAYEVENT_BUS_QUEUE_MODE→COMMAND_BUS_QUEUE_MODEEVENT_BUS_LOG→COMMAND_BUS_LOG
Przykład Konfiguracji .env
# Połączenie Redis
REDIS_URL=redis://username:password@localhost:6379/0
# Strategia reconnect Redis (stałe opóźnienie)
REDIS_RETRY_DELAY=5000 # 5 sekund między próbami (domyślnie)
REDIS_MAX_RETRIES=0 # 0 = nieskończoność (domyślnie)
# Poziom logowania
LOG_LEVEL=log # debug | log | warn | error
# Auto-optymalizacja (domyślnie włączona)
COMMAND_BUS_AUTO_OPTIMIZE=true
# Concurrency (opcjonalnie - auto-optymalizacja ustawi optymalną wartość)
# COMMAND_BUS_CONCURRENCY=10
# Retry i backoff
COMMAND_BUS_MAX_ATTEMPTS=1 # Maksymalna liczba prób
COMMAND_BUS_BACKOFF_DELAY=3000 # Opóźnienie między próbami (3s)
# Tryb kolejki
COMMAND_BUS_QUEUE_MODE=fifo # fifo lub lifo
# Kompresja payloadów (próg w bajtach)
COMMAND_BUS_COMPRESSION_THRESHOLD=2048 # 2KB (domyślnie 1KB)
# Logowanie komend do plików
COMMAND_BUS_LOG=./command-logs # Ścieżka do katalogu logówPriorytety Konfiguracji
- Parametry konstruktora - najwyższy priorytet
- Zmienne środowiskowe - średni priorytet
- Wartości domyślne - najniższy priorytet
// Przykład: parametry konstruktora nadpisują ENV
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
concurrency: 20, // Nadpisuje COMMAND_BUS_CONCURRENCY
autoOptimize: false, // Wyłącza auto-optymalizację
});Konfiguracja zaawansowana
Auto-optymalizacja Concurrency
Auto-optymalizacja automatycznie oblicza optymalną wartość concurrency na podstawie zasobów systemowych:
// Auto-optymalizacja włączona (domyślnie)
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
autoOptimize: true, // Domyślnie true
// concurrency zostanie obliczone jako: CPU cores * 2 + (availableMemory / 512MB)
});
// Wyłączenie auto-optymalizacji
const config2 = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
autoOptimize: false,
concurrency: 10, // Ręczna wartość
});Algorytm auto-optymalizacji:
concurrency = (CPU cores * 2) + Math.floor(availableMemory / 512MB)
Przykład:
- 8 CPU cores
- 16GB RAM dostępne
- concurrency = (8 * 2) + Math.floor(16384 / 512) = 16 + 32 = 48Kompresja Payloadów
Automatyczna kompresja gzip dla payloadów RPC większych niż threshold:
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
compressionThreshold: 2048, // 2KB (domyślnie 1KB)
});
// Przykład: payload 3KB zostanie automatycznie skompresowany
const largeCommand = new ProcessReportCommand(largeData); // 3KB
const result = await commandBus.call(largeCommand); // Automatyczna kompresja/dekompresjaKorzyści kompresji:
- Redukcja transferu danych przez Redis
- Szybsze przesyłanie dużych payloadów
- Niższe zużycie pamięci Redis
- Transparent dla użytkownika (automatyczna dekompresja)
Command Logging
Logowanie komend do plików JSONL (rotacja co godzinę):
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
commandLog: './command-logs', // Ścieżka do katalogu logów
});
// Struktura plików:
// ./command-logs/2025-01-27T10.jsonl
// ./command-logs/2025-01-27T11.jsonlFormat JSONL (JSON Lines):
{"__name":"CreateUserCommand","__id":"uuid","__time":1706347200000,"email":"[email protected]","name":"Jan"}
{"__name":"ProcessOrderCommand","__id":"uuid","__time":1706347201000,"orderId":"12345"}Opcje Redis
const config = new CommandBusConfig({
redisUrl: 'redis://username:password@localhost:6379/0',
// Parsuje się do:
// - host: localhost
// - port: 6379
// - username: username (opcjonalnie)
// - password: password (opcjonalnie)
// - db: 0 (opcjonalnie)
});Concurrency i Retry
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
concurrency: 10, // Liczba równoległych workerów (lub auto)
maxAttempts: 5, // Maksymalna liczba prób przetworzenia zadania
backoffDelay: 3000, // Opóźnienie między próbami (3s)
});Tryb kolejki
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
queueMode: 'fifo', // 'fifo' (First In First Out) lub 'lifo' (Last In First Out)
});Custom Logger
Logger jest automatycznie opakowywany przez wewnętrzny wrapper, który filtruje logi według logLevel:
// Prosty logger - używa console
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
logger: console,
logLevel: 'log', // debug | log | warn | error
});
// Własny logger - musi implementować metody: log, error, warn, debug
class MyLogger {
log(message: string, ...args: unknown[]): void {
// Twoja implementacja
}
error(message: string, ...args: unknown[]): void {
// Twoja implementacja
}
warn(message: string, ...args: unknown[]): void {
// Twoja implementacja
}
debug(message: string, ...args: unknown[]): void {
// Twoja implementacja
}
}
const config2 = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
logger: new MyLogger(),
logLevel: 'debug', // Wszystkie poziomy
});API Reference
CommandBus
dispatch(command: Command): Promise<void>
Wysyła komendę do kolejki bez oczekiwania na wynik (fire-and-forget).
Flow:
- Kompresja payloadu (jeśli >threshold)
- Pobranie/utworzenie kolejki z cache
- Dodanie job do BullMQ
- Natychmiastowy return
await commandBus.dispatch(new CreateUserCommand('[email protected]', 'Jan Kowalski'));call<T>(command: Command, timeout?: number): Promise<T>
Wywołuje komendę synchronicznie i czeka na odpowiedź przez Redis Pub/Sub (RPC). Domyślny timeout: 30000ms (30s).
Flow:
- Rejestracja pending call z timeoutem
- Kompresja payloadu (jeśli >threshold)
- Przygotowanie metadanych RPC (correlationId, responseChannel)
- Dodanie job do BullMQ
- Oczekiwanie na odpowiedź przez shared subscriber
- Dekompresja odpowiedzi
- Zwrócenie wyniku lub błędu
const result = await commandBus.call<{ userId: string }>(command, 5000);handle<T>(commandClass, handler): void
Rejestruje handler dla określonej klasy komendy. Tylko jeden handler per typ komendy.
Automatyczne akcje:
- Rejestracja handlera w Map
- Utworzenie workera BullMQ
- Uruchomienie benchmarku dla optymalnego concurrency
- Utworzenie metrics collector
- Setup event handlers
commandBus.handle(CreateUserCommand, async (command) => {
const user = await createUser(command.email, command.name);
return { userId: user.id };
});close(): Promise<void>
Zamyka wszystkie połączenia i workery z graceful shutdown.
Cleanup:
- Zamknięcie wszystkich workerów BullMQ
- Zamknięcie wszystkich kolejek z cache
- Zamknięcie RpcCoordinator (reject pending calls)
- Zamknięcie 3 połączeń Redis (Queue, Worker, RPC)
await commandBus.close();Command
Klasa bazowa dla wszystkich komend. Każda komenda dziedziczy po Command:
class MyCommand extends Command {
constructor(public readonly data: string) {
super();
}
}Właściwości automatyczne:
__id- unikalny UUID (randomUUID)__name- nazwa klasy komendy (constructor.name)__time- timestamp utworzenia (Date.now())
Metody statyczne:
reconstructDates(obj)- rekonstrukcja obiektów Date z serializowanych danych
CommandBusConfig
Konfiguracja CommandBus z opcjami:
interface CommandBusConfigOptions {
redisUrl?: string; // URL Redis (domyślnie: 'redis://localhost:6379' lub REDIS_URL)
logger?: ILogger; // Logger (domyślnie console)
logLevel?: 'debug' | 'log' | 'warn' | 'error'; // Poziom logowania (domyślnie 'log')
concurrency?: number; // Liczba workerów (domyślnie 1 lub auto)
maxAttempts?: number; // Maksymalna liczba prób (domyślnie 1)
backoffDelay?: number; // Opóźnienie między próbami w ms (domyślnie 2000)
queueMode?: 'fifo' | 'lifo'; // Tryb kolejki (domyślnie 'fifo')
commandLog?: string; // Ścieżka do katalogu logów komend (opcjonalnie)
autoOptimize?: boolean; // Auto-optymalizacja concurrency (domyślnie true)
compressionThreshold?: number; // Próg kompresji w bajtach (domyślnie 1024)
}Przykłady Użycia
Podstawowy przykład z RPC
import { CommandBus, CommandBusConfig, Command } from 'pp-command-bus';
// Definicja komendy
class CalculateCommand extends Command {
constructor(
public readonly a: number,
public readonly b: number,
public readonly operation: 'add' | 'multiply',
) {
super();
}
}
// Konfiguracja
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
});
const commandBus = new CommandBus(config);
// Rejestracja handlera
commandBus.handle(CalculateCommand, async (command) => {
console.log(`Calculating: ${command.a} ${command.operation} ${command.b}`);
switch (command.operation) {
case 'add':
return command.a + command.b;
case 'multiply':
return command.a * command.b;
}
});
// Fire-and-forget
await commandBus.dispatch(new CalculateCommand(5, 3, 'add'));
// RPC - czekamy na wynik
const result = await commandBus.call<number>(
new CalculateCommand(5, 3, 'multiply'),
5000 // timeout 5s
);
console.log(`Result: ${result}`); // Result: 15Równoległe wywołania RPC
// Wiele równoległych RPC calls
const [result1, result2, result3] = await Promise.all([
commandBus.call<number>(new CalculateCommand(10, 5, 'add')),
commandBus.call<UserInfo>(new GetUserInfoCommand('user-1')),
commandBus.call<ValidationResult>(new ValidateUserCommand('[email protected]', 30)),
]);
console.log('All results:', { result1, result2, result3 });Obsługa błędów
commandBus.handle(CreateUserCommand, async (command) => {
try {
// Walidacja
if (!command.email.includes('@')) {
throw new Error('Invalid email format');
}
const user = await createUser(command.email, command.name);
return { userId: user.id };
} catch (error) {
// Loguj błąd
console.error('Failed to create user:', error);
// Rzuć błąd - BullMQ spróbuje ponownie (maxAttempts)
throw error;
}
});
// Obsługa błędów w RPC
try {
const result = await commandBus.call(new CreateUserCommand('invalid-email', 'Jan'));
} catch (error) {
console.error('RPC failed:', error.message);
}Graceful Shutdown
process.on('SIGTERM', async () => {
console.log('Shutting down CommandBus...');
await commandBus.close();
process.exit(0);
});
process.on('SIGINT', async () => {
console.log('Shutting down CommandBus...');
await commandBus.close();
process.exit(0);
});Zaawansowane Funkcje
1. Dynamiczne Concurrency
WorkerOrchestrator automatycznie dostosowuje concurrency na podstawie metryk:
- Benchmark przy starcie - optymalny concurrency dla każdego workera
- Event-driven metrics - zbieranie metryk z workerów
- Dynamiczne dostosowanie - +/-20% co 30s (cooldown)
- Limity - min 10, max 2000
// Automatyczne - benchmark ustali optymalną wartość
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
// concurrency zostanie ustalone przez benchmark (np. 15-20)
});2. Process Isolation w RPC
Każdy proces Node.js ma unikalny UUID - odpowiedzi RPC są izolowane między procesami:
Process A (UUID: abc-123):
- Kanał: rpc:response:abc-123:*
- Otrzymuje tylko swoje odpowiedzi
Process B (UUID: def-456):
- Kanał: rpc:response:def-456:*
- Otrzymuje tylko swoje odpowiedzi3. Shared Subscriber Pattern
Jeden shared subscriber dla wszystkich RPC calls zamiast N subskrybentów:
Tradycyjne podejście (N subskrybentów):
1000 RPC calls → 1000 Redis subscriptions → duże obciążenieShared Subscriber (1 subskrybent):
1000 RPC calls → 1 Redis pattern subscription → multiplexing w pamięci4. Automatyczna Kompresja
PayloadCompressionService automatycznie kompresuje duże payloady:
// Mała komenda (<1KB) - brak kompresji
const smallCommand = new CreateUserCommand('[email protected]', 'Jan');
await commandBus.call(smallCommand); // Bez kompresji
// Duża komenda (>1KB) - automatyczna kompresja
const largeCommand = new ProcessReportCommand(largeData); // 5KB
await commandBus.call(largeCommand); // Automatyczna kompresja gzip → base64Flagi kompresji:
__compressed: true- payload został skompresowany- Automatyczna dekompresja w JobProcessor
- Transparent dla użytkownika
5. Command Logging
Persystencja wszystkich komend do plików JSONL:
const config = new CommandBusConfig({
commandLog: './command-logs',
});
// Każda komenda jest logowana do pliku:
// ./command-logs/2025-01-27T10.jsonlUse cases:
- Auditing i compliance
- Replay komend
- Debugging produkcyjnych problemów
- Analiza przepływu komend
6. RPC Job Cancellation
Automatyczne usuwanie niepodjętych jobów RPC przy timeout:
RPC Timeout Flow:
User Code RpcCoordinator Redis JobProcessor
│ │ │ │
│── call(cmd) ─────▶│ │ │
│ │── registerCall ──▶│ │
│ │── queue.add ─────▶│ │
│ │ │ │
│ [timeout] │ │ │
│ │ │ │
│ │── markCancelled ─▶│ SET rpc:cancelled:xxx
│ │── tryRemoveJob ──▶│ (próba usunięcia)
│◀── Error ─────────│ │ │
│ │ │ │
│ │ │ [job picked up] │
│ │ │──────────────────▶│
│ │ │ │── isCancelled?
│ │ │◀──────────────────│ → true
│ │ │ │── SKIP handler
│ │ │ │── clearCancellationKluczowe cechy:
- Dwufazowe anulowanie: Flaga Redis + próba usunięcia joba z kolejki
- Graceful degradation: Błędy Redis nie blokują przetwarzania
- TTL 24h: Automatyczne wygaśnięcie kluczy cancellation
- Aktywne czyszczenie: Klucze usuwane po przetworzeniu lub usunięciu joba
- Kompatybilność wsteczna: Funkcjonalność jest opcjonalna
Korzyści:
- Oszczędność zasobów - nieprzetworzone joby nie obciążają workerów
- Brak efektów ubocznych - handler nie wykonuje się dla timeout'owanych RPC
- Lepsza diagnostyka - logi pokazują pominięte joby
Best Practices
1. Używaj TypeScript
Biblioteka jest napisana w TypeScript z strict mode. Wykorzystaj typy dla lepszego DX:
interface UserCreatedResult {
userId: string;
createdAt: Date;
}
const result = await commandBus.call<UserCreatedResult>(command);2. Idempotentne handlery
Handlery powinny być idempotentne (wielokrotne wykonanie = ten sam rezultat):
commandBus.handle(CreateUserCommand, async (command) => {
// Sprawdź czy użytkownik już istnieje
const existing = await findUserByEmail(command.email);
if (existing) {
return { userId: existing.id }; // Zwróć istniejącego
}
// Utwórz nowego
const user = await createUser(command.email, command.name);
return { userId: user.id };
});3. Monitorowanie RPC timeoutów
// Ustaw timeout dostosowany do czasu przetwarzania komendy
const shortCommand = new QuickCommand();
const result1 = await commandBus.call(shortCommand, 1000); // 1s dla szybkich operacji
const longRunningCommand = new ProcessReportCommand();
const result2 = await commandBus.call(longRunningCommand, 60000); // 60s dla długich operacji4. Walidacja w handlerach
commandBus.handle(CreateUserCommand, async (command) => {
// Walidacja na początku
if (!command.email || !command.email.includes('@')) {
throw new Error('Invalid email');
}
if (!command.name || command.name.length < 2) {
throw new Error('Invalid name');
}
// Logika biznesowa
return await createUser(command.email, command.name);
});5. Logowanie kontekstu
commandBus.handle(CreateUserCommand, async (command) => {
console.log('Processing CreateUserCommand', {
commandId: command.__id,
timestamp: command.__time,
email: command.email,
});
// Logika biznesowa
const user = await createUser(command.email, command.name);
console.log('User created successfully', {
commandId: command.__id,
userId: user.id,
});
return { userId: user.id };
});Testing
Unit testy
import { CommandBus, CommandBusConfig, Command } from 'pp-command-bus';
describe('User Commands', () => {
let commandBus: CommandBus;
beforeAll(async () => {
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
logger: console,
logLevel: 'error', // Tylko błędy w testach
});
commandBus = new CommandBus(config);
// Zarejestruj handler
commandBus.handle(CreateUserCommand, async (command) => {
return { userId: 'test-user-id' };
});
});
afterAll(async () => {
await commandBus.close();
});
it('should create user', async () => {
const command = new CreateUserCommand('[email protected]', 'Test User');
const result = await commandBus.call<{ userId: string }>(command, 5000);
expect(result.userId).toBeDefined();
expect(result.userId).toBe('test-user-id');
});
it('should handle multiple commands in parallel', async () => {
const commands = [
new CreateUserCommand('[email protected]', 'User 1'),
new CreateUserCommand('[email protected]', 'User 2'),
new CreateUserCommand('[email protected]', 'User 3'),
];
const results = await Promise.all(
commands.map((cmd) => commandBus.call<{ userId: string }>(cmd, 5000))
);
expect(results).toHaveLength(3);
results.forEach((result) => {
expect(result.userId).toBeDefined();
});
});
});Troubleshooting
Connection refused (Redis)
Upewnij się że Redis działa:
redis-cli ping
# Powinno zwrócić: PONGTimeout na RPC call
Zwiększ timeout lub sprawdź czy handler został zarejestrowany:
// Zwiększ timeout
const result = await commandBus.call(command, 60000); // 60s
// Sprawdź czy handler został zarejestrowany PRZED wywołaniem
commandBus.handle(MyCommand, async (command) => {
// Handler implementation
return { result: 'success' };
});
// Teraz możesz wywołać komendę
const result = await commandBus.call(new MyCommand());Handler nie został wywołany
Upewnij się że handler został zarejestrowany przed wysłaniem komendy:
// ❌ ŹLE - handler po dispatch
await commandBus.dispatch(new MyCommand());
commandBus.handle(MyCommand, async (cmd) => { ... }); // Za późno!
// ✅ DOBRZE - handler przed dispatch
commandBus.handle(MyCommand, async (cmd) => { ... });
await commandBus.dispatch(new MyCommand()); // Teraz OKWysokie zużycie pamięci
- Wyłącz command logging jeśli nie jest potrzebne
- Zmniejsz concurrency jeśli workery używają dużo pamięci
- Zwiększ compressionThreshold jeśli duże payloady powodują problemy
const config = new CommandBusConfig({
commandLog: undefined, // Wyłącz logging
concurrency: 5, // Zmniejsz concurrency
compressionThreshold: 512, // Kompresuj już od 512B
});Worker stalled
Worker został zatrzymany (prawdopodobnie crashed). BullMQ automatycznie przeniesie job do innego workera.
Przyczyny:
- Out of memory
- Uncaught exception w handlerze
- Timeout w handlerze
Rozwiązanie:
- Dodaj try/catch w handlerze
- Zwiększ pamięć dla procesu
- Zmniejsz concurrency
Struktura Projektu
src/
├── command-bus/ # Główna logika CommandBus
│ ├── config/ # Konfiguracja CommandBus
│ │ ├── command-bus-config.ts
│ │ └── auto-config-optimizer.ts
│ ├── job/ # Przetwarzanie jobów i opcje
│ │ ├── job-processor.ts
│ │ └── job-options-builder.ts
│ ├── logging/ # Command logging do plików
│ │ └── command-logger.ts
│ ├── queue/ # Queue management i cache
│ │ └── queue-manager.ts
│ ├── rpc/ # RPC coordination
│ │ ├── rpc-coordinator.ts
│ │ ├── payload-compression.service.ts
│ │ └── rpc-job-cancellation.service.ts # Anulowanie jobów RPC przy timeout
│ ├── worker/ # Worker orchestration
│ │ ├── worker-orchestrator.ts
│ │ ├── worker-benchmark.ts
│ │ └── worker-metrics-collector.ts
│ ├── types/ # Typy TypeScript
│ │ └── index.ts
│ ├── command.ts # Klasa bazowa Command
│ └── index.ts # CommandBus główna klasa
├── shared/ # Wspólne komponenty
│ ├── config/ # Base config z Redis
│ │ └── base-config.ts
│ ├── logging/ # Logger wrapper z poziomami
│ │ ├── logger.ts
│ │ └── log-level.ts
│ ├── redis/ # Fabryka połączeń Redis (SRP)
│ │ ├── redis-connection-factory.ts # Tworzenie połączeń z event handlers
│ │ ├── redis-error-formatter.ts # Formatowanie błędów (AggregateError)
│ │ └── index.ts # Eksporty modułu
│ └── types.ts # Współdzielone typy
├── examples/ # Przykłady użycia
│ ├── rpc.demo.ts # Demo RPC calls
│ ├── rpc-throughput.demo.ts # Demo wydajności RPC
│ ├── rpc-compression.demo.ts # Demo kompresji
│ └── rpc-resilience.demo.ts # Demo reconnect/failover (5 min test)
└── index.ts # Główny export pakietuMigracja z pp-event-bus 1.x
Jeśli używałeś Command Bus z pakietu pp-event-bus w wersji 1.x:
Przed:
import { CommandBus, Command, CommandBusConfig } from 'pp-event-bus';Po:
npm install pp-command-busimport { CommandBus, Command, CommandBusConfig } from 'pp-command-bus';Pełna zgodność API - jedyna zmiana to źródło importu.
Wersjonowanie i Releases
Projekt używa Semantic Versioning oraz automatycznych release'ów dzięki semantic-release.
Konwencja commitów
Używamy Conventional Commits:
# Nowa funkcjonalność (minor release)
feat: dodano wsparcie dla delayed commands
# Poprawka błędu (patch release)
fix: naprawiono memory leak w RPC
# Breaking change (major release)
feat!: zmieniono API CommandBusConfig
# Inne typy (patch release)
docs: zaktualizowano dokumentację API
perf: zoptymalizowano przetwarzanie komend
refactor: uproszczono kod RPC handler
test: dodano testy dla command loggingTypy commitów
feat:- nowa funkcjonalność (→ minor release)fix:- poprawka błędu (→ patch release)perf:- optymalizacja wydajności (→ patch release)docs:- zmiany w dokumentacji (→ patch release)style:- formatowanie kodu (→ patch release)refactor:- refaktoryzacja (→ patch release)test:- dodanie/poprawka testów (→ patch release)build:- zmiany w buildzie/zależnościach (→ patch release)ci:- zmiany w CI/CD (→ patch release)chore:- inne zmiany (bez release)!lubBREAKING CHANGE:- breaking change (→ major release)
Dokumentacja
Architektura
Szczegółowa dokumentacja architektury systemu, wzorców projektowych i zasad SOLID:
- ARCHITECTURE.md - Kompletny opis architektury, diagramy komponentów, przepływy danych
Komponenty
- QueueManager - zarządzanie kolejkami BullMQ i cache kolejek
- WorkerOrchestrator - orkiestracja workerów, dynamiczne concurrency, benchmark
- JobProcessor - przetwarzanie jobów i wykonanie handlerów
- RpcCoordinator - zarządzanie wywołaniami RPC przez Redis Pub/Sub
- RpcJobCancellationService - anulowanie niepodjętych jobów RPC przy timeout
- JobOptionsBuilder - konfiguracja opcji dla jobów BullMQ
- CommandLogger - persystencja komend do plików JSONL
- PayloadCompressionService - automatyczna kompresja gzip
- AutoConfigOptimizer - optymalizacja concurrency na podstawie zasobów
Licencja
MIT
Autorzy
- Mariusz Lejkowski [email protected]
