npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

pp-command-bus

v1.4.0

Published

Distributed Command Bus library supporting RPC and job queuing with BullMQ for Redis/DragonflyDB

Readme

PP Command Bus

Distributed Command Bus library supporting RPC and job queuing with BullMQ for Redis/DragonflyDB.

Opis

pp-command-bus to zaawansowana biblioteka do obsługi rozproszonych komend zgodna ze wzorcem CQRS (Command Query Responsibility Segregation). Zapewnia wysoką wydajność, automatyczną optymalizację i zaawansowane funkcje produkcyjne.

Kluczowe Cechy

  • Fire-and-forget commands - wysyłanie komend bez oczekiwania na wynik
  • RPC (Remote Procedure Call) - synchroniczne wywołania przez Redis Pub/Sub z oczekiwaniem na odpowiedź
  • Automatyczna kompresja - gzip dla payloadów RPC >1KB (konfigurowalne)
  • Auto-optymalizacja - dynamiczne dostosowywanie concurrency na podstawie CPU/RAM
  • Process isolation - izolacja odpowiedzi RPC między procesami Node.js
  • Job queuing - kolejkowanie zadań z retry, backoff i delayed execution
  • BullMQ integration - wydajna kolejka zadań na Redis/DragonflyDB
  • TypeScript - pełne wsparcie typów i strict mode
  • Memory leak protection - zaawansowana diagnostyka i cleanup
  • Command logging - opcjonalne logowanie komend do plików JSONL
  • RPC Job Cancellation - automatyczne usuwanie niepodjętych jobów RPC przy timeout
  • Modularna architektura - komponenty zgodne z zasadami SOLID, DDD i SRP

Architektura Systemu

Diagram Komponentów

┌─────────────────────────────────────────────────────────────────────┐
│                           CommandBus                                 │
│  ┌────────────────┐  ┌──────────────┐  ┌─────────────────────────┐ │
│  │ QueueManager   │  │ RpcCoordinator│  │ WorkerOrchestrator      │ │
│  │ - Cache kolejek│  │ - Pub/Sub     │  │ - Dynamiczne concurrency│ │
│  │ - BullMQ Queue │  │ - Process ID  │  │ - Worker benchmark      │ │
│  └────────┬───────┘  └──────┬───────┘  └────────┬────────────────┘ │
│           │                  │                    │                  │
│           │                  │                    │                  │
│  ┌────────▼──────────────────▼────────────────────▼───────────────┐ │
│  │            JobProcessor (Command Handler Execution)             │ │
│  │  - Kompresja/dekompresja payloadów                             │ │
│  │  - Wykonywanie handlerów                                        │ │
│  │  - Wysyłanie odpowiedzi RPC przez Pub/Sub                      │ │
│  └──────────────────────────┬──────────────────────────────────────┘ │
└─────────────────────────────┼────────────────────────────────────────┘
                              │
                    ┌─────────▼──────────┐
                    │  PayloadCompression │
                    │  Service            │
                    │  - gzip compression │
                    │  - threshold: 1KB   │
                    └─────────────────────┘

Główne Serwisy

1. CommandBus (główna klasa orkiestrująca)

  • Odpowiedzialność: Główny punkt wejścia dla użytkownika, orkiestracja wszystkich komponentów
  • Metody publiczne:
    • dispatch(command) - wysyłanie fire-and-forget
    • call(command, timeout) - synchroniczne RPC
    • handle(commandClass, handler) - rejestracja handlerów
    • close() - graceful shutdown
  • Zarządzanie połączeniami: 3 dedykowane połączenia Redis (Queue, Worker, RPC)

2. RpcCoordinator (zarządzanie RPC)

  • Odpowiedzialność: Zarządzanie cyklem życia wywołań RPC przez Redis Pub/Sub
  • Kluczowe funkcje:
    • Shared Subscriber - jeden subscriber dla wszystkich RPC calls (pattern matching)
    • Process Isolation - UUID procesu Node.js dla izolacji odpowiedzi między procesami
    • Timeout Management - automatyczne cleanup po timeout
    • Job Cancellation - usuwanie niepodjętych jobów przy timeout (optymalizacja zasobów)
    • Multiplexing - routing odpowiedzi do odpowiednich promises
  • Kanały: rpc:response:{processId}:{correlationId}

3. WorkerOrchestrator (orkiestracja workerów)

  • Odpowiedzialność: Zarządzanie workerami BullMQ i dynamiczna optymalizacja
  • Kluczowe funkcje:
    • WorkerBenchmark - automatyczny benchmark przy rejestracji handlera
    • WorkerMetricsCollector - event-driven metrics collection
    • Dynamic Concurrency - dostosowywanie concurrency +/-20% co 30s
    • Event Handlers - obsługa zdarzeń: active, completed, failed, stalled
  • Limity concurrency: min 10, max 2000

4. JobProcessor (wykonywanie handlerów)

  • Odpowiedzialność: Wykonywanie handlerów komend i obsługa odpowiedzi RPC
  • Flow przetwarzania:
    1. Dekompresja payloadu (jeśli skompresowany)
    2. Sprawdzenie flagi cancellation (pomijanie anulowanych RPC)
    3. Rekonstrukcja obiektów Date
    4. Opcjonalne logowanie komendy
    5. Wykonanie handlera
    6. Wysłanie odpowiedzi RPC przez Pub/Sub (jeśli RPC)
  • Kompresja odpowiedzi: automatyczna kompresja przez PayloadCompressionService
  • RPC Cancellation: pomijanie jobów oznaczonych jako anulowane (timeout)

5. QueueManager (zarządzanie kolejkami)

  • Odpowiedzialność: Cache kolejek BullMQ dla optymalizacji pamięci
  • Funkcje:
    • getOrCreateQueue(commandName) - lazy loading kolejek
    • closeAllQueues() - graceful shutdown wszystkich kolejek
  • Naming: {CommandName} jako nazwa kolejki

6. PayloadCompressionService (kompresja)

  • Odpowiedzialność: Automatyczna kompresja/dekompresja payloadów gzip
  • Threshold: 1024 bajty (1KB) domyślnie, konfigurowalne przez ENV
  • Metody:
    • compressCommand(command) - dodaje flagę __compressed
    • decompressCommand(command) - dekompresja i usunięcie flagi
    • compress(data) - generyczna kompresja do base64
    • decompress(data, compressed) - generyczna dekompresja
  • Współdzielony serwis: jedna instancja dla całego CommandBus

7. RpcJobCancellationService (anulowanie jobów RPC)

  • Odpowiedzialność: Zarządzanie anulowaniem niepodjętych jobów RPC przy timeout
  • Kluczowe funkcje:
    • markAsCancelled(correlationId) - oznacza job jako anulowany w Redis
    • isCancelled(correlationId) - sprawdza flagę cancellation
    • clearCancellation(correlationId) - usuwa flagę po przetworzeniu/usunięciu
    • tryRemoveJob(jobId, queueName, callback) - próba usunięcia joba z kolejki
  • TTL kluczy Redis: 24 godziny (automatyczne wygaśnięcie)
  • Graceful degradation: błędy Redis nie blokują przetwarzania (zwraca false)
  • Klucze Redis: rpc:cancelled:{correlationId}

8. CommandLogger (opcjonalne logowanie)

  • Odpowiedzialność: Persystencja komend do plików JSONL
  • Format: {timestamp}.jsonl - rotacja co godzinę
  • Zawartość: pełny payload komendy z metadanymi
  • Aktywacja: przez ENV COMMAND_BUS_LOG=./command-logs

9. AutoConfigOptimizer (auto-optymalizacja)

  • Odpowiedzialność: Obliczanie optymalnego concurrency na podstawie zasobów systemowych
  • Heurystyka:
    • I/O-heavy workload (Redis/BullMQ)
    • concurrency = CPU cores * 2 + (availableMemory / 512MB)
    • Zakładane 512MB RAM per worker
  • Aktywacja: domyślnie włączone (ENV COMMAND_BUS_AUTO_OPTIMIZE=false wyłącza)

10. RedisConnectionFactory (fabryka połączeń Redis)

  • Odpowiedzialność: Tworzenie połączeń Redis z wbudowaną obsługą błędów i eventów
  • Zgodność z SRP: CommandBus nie zarządza bezpośrednio połączeniami
  • Kluczowe funkcje:
    • create(options, name) - tworzy połączenie z pełną obsługą eventów
    • createForWorker(options, name) - dodaje maxRetriesPerRequest: null dla BullMQ
    • Obsługa eventów: error, close, reconnecting, connect, ready
    • Formatowanie błędów AggregateError (Node.js dual-stack IPv4/IPv6)
  • Event Handlers:
    • on('error') - logowanie błędów połączenia (zapobiega Unhandled error event)
    • on('close') - informacja o zamknięciu połączenia
    • on('reconnecting') - informacja o ponownym łączeniu
    • on('connect') / on('ready') - potwierdzenie nawiązania połączenia

Flow Przepływu Danych

Flow 1: dispatch() - Fire-and-forget

User Code
   │
   ├─→ 1. commandBus.dispatch(command)
   │
   ├─→ 2. PayloadCompressionService.compressCommand(command)
   │      └─→ Jeśli payload >1KB → gzip → base64 → __compressed: true
   │
   ├─→ 3. QueueManager.getOrCreateQueue(commandName)
   │      └─→ Cache hit/miss → zwraca Queue
   │
   ├─→ 4. queue.add(commandName, compressedCommand, options)
   │      └─→ BullMQ dodaje job do Redis
   │
   └─→ 5. Promise<void> resolved (nie czekamy na wynik)

Worker Side (asynchronicznie)
   │
   ├─→ 6. Worker pobiera job z kolejki
   │
   ├─→ 7. JobProcessor.process(job)
   │      ├─→ Dekompresja (jeśli __compressed)
   │      ├─→ Rekonstrukcja Date
   │      ├─→ Opcjonalne logowanie (CommandLogger)
   │      └─→ Wykonanie handlera
   │
   └─→ 8. Worker kończy job (success/fail)

Flow 2: call() - RPC przez Redis Pub/Sub

User Code
   │
   ├─→ 1. commandBus.call(command, timeout)
   │
   ├─→ 2. RpcCoordinator.registerCall(correlationId, commandName, timeout)
   │      ├─→ Oczekiwanie na gotowość shared subscriber (5s timeout)
   │      ├─→ Utworzenie Promise<T> dla odpowiedzi
   │      ├─→ Zapisanie pending call w Map
   │      └─→ Zwrócenie responsePromise (bez blokowania)
   │
   ├─→ 3. PayloadCompressionService.compressCommand(command)
   │      └─→ Jeśli payload >1KB → gzip → __compressed: true
   │
   ├─→ 4. RpcCoordinator.prepareRpcCommand(compressedCommand)
   │      └─→ Dodaje __rpcMetadata: { correlationId, responseChannel, timestamp }
   │
   ├─→ 5. QueueManager.getOrCreateQueue(commandName)
   │      └─→ Cache hit/miss → zwraca Queue
   │
   ├─→ 6. queue.add(commandName, commandWithMetadata, options)
   │      └─→ BullMQ dodaje job do Redis
   │
   └─→ 7. await responsePromise (czeka na odpowiedź z Worker)

Worker Side
   │
   ├─→ 8. Worker pobiera job z kolejki
   │
   ├─→ 9. JobProcessor.process(job)
   │      ├─→ Dekompresja payloadu (jeśli __compressed)
   │      ├─→ Rekonstrukcja Date
   │      ├─→ Wykonanie handlera → result
   │      │
   │      └─→ 10. JobProcessor.sendRpcResponse(rpcMetadata, result, null)
   │             ├─→ PayloadCompressionService.compress({ correlationId, result, error })
   │             ├─→ Wrapper: { data, compressed }
   │             └─→ redis.publish(responseChannel, JSON.stringify(wrapper))
   │
Shared Subscriber (RpcCoordinator)
   │
   ├─→ 11. pmessage event → handleRpcMessage(channel, message)
   │      ├─→ Ekstraktuj correlationId z channel
   │      ├─→ Weryfikuj processInstanceId (process isolation)
   │      ├─→ Znajdź pending call w Map
   │      ├─→ PayloadCompressionService.decompress(wrapper.data, wrapper.compressed)
   │      ├─→ resolve(result) lub reject(error)
   │      └─→ Cleanup: clearTimeout, delete z Map
   │
   └─→ 12. User Code otrzymuje wynik → Promise<T> resolved

Instalacja

npm install pp-command-bus

Wymagania

  • Node.js >= 14
  • Redis >= 5 lub DragonflyDB
  • TypeScript >= 4.5 (opcjonalnie)

Szybki start

1. Konfiguracja

import { CommandBus, CommandBusConfig, Command } from 'pp-command-bus';

// Konfiguracja CommandBus
const config = new CommandBusConfig({
  redisUrl: 'redis://localhost:6379',
  logger: console, // ILogger interface
  logLevel: 'log', // debug | log | warn | error
  concurrency: 5,
  maxAttempts: 1,
});

// Utwórz instancję CommandBus
const commandBus = new CommandBus(config);

2. Definiowanie komend

class CreateUserCommand extends Command {
  constructor(
    public readonly email: string,
    public readonly name: string,
  ) {
    super();
  }
}

3. Rejestracja handlerów

commandBus.handle(CreateUserCommand, async (command) => {
  console.log(`Creating user: ${command.name} (${command.email})`);

  // Twoja logika biznesowa
  const user = await createUser(command.email, command.name);

  // Zwróć wynik (opcjonalnie)
  return { userId: user.id };
});

4. Wysyłanie komend (Fire-and-forget)

const command = new CreateUserCommand('[email protected]', 'Jan Kowalski');

// Wyślij komendę bez oczekiwania na wynik
await commandBus.dispatch(command);

5. RPC - wywołania synchroniczne

// Wywołaj komendę i poczekaj na wynik
const result = await commandBus.call<{ userId: string }>(command, 5000); // timeout 5s

console.log(`User created with ID: ${result.userId}`);

Zmienne Środowiskowe

Biblioteka wspiera konfigurację poprzez zmienne środowiskowe z prefiksem COMMAND_BUS_ (z fallbackiem do starszych nazw EVENT_BUS_*):

Kompletna Lista Zmiennych

| Zmienna | Typ | Wartość Domyślna | Opis | |---------|-----|------------------|------| | REDIS_URL | string | redis://localhost:6379 | URL połączenia Redis/DragonflyDB (wspiera username, password, db) | | REDIS_RETRY_DELAY | number | 5000 | Opóźnienie między próbami reconnect do Redis w milisekundach | | REDIS_MAX_RETRIES | number | 0 | Maksymalna liczba prób reconnect (0 = nieskończoność) | | LOG_LEVEL | enum | log | Poziom logowania: debug, log, warn, error | | COMMAND_BUS_CONCURRENCY | number | 1 (lub auto) | Liczba równoległych workerów do przetwarzania komend | | COMMAND_BUS_MAX_ATTEMPTS | number | 1 | Maksymalna liczba prób przetworzenia zadania | | COMMAND_BUS_BACKOFF_DELAY | number | 2000 | Opóźnienie między próbami w milisekundach | | COMMAND_BUS_QUEUE_MODE | enum | fifo | Tryb przetwarzania kolejki: fifo (First In First Out) lub lifo (Last In First Out) | | COMMAND_BUS_LOG | string | (puste) | Ścieżka do katalogu logów komend (JSONL format, rotacja co godzinę) | | COMMAND_BUS_AUTO_OPTIMIZE | boolean | true | Włącz auto-optymalizację concurrency na podstawie CPU/RAM | | COMMAND_BUS_COMPRESSION_THRESHOLD | number | 1024 | Próg kompresji gzip dla payloadów RPC w bajtach (1KB domyślnie) |

Fallback do Starszych Nazw

Dla kompatybilności wstecznej obsługiwane są również prefiksy EVENT_BUS_*:

  • EVENT_BUS_CONCURRENCYCOMMAND_BUS_CONCURRENCY
  • EVENT_BUS_MAX_ATTEMPTSCOMMAND_BUS_MAX_ATTEMPTS
  • EVENT_BUS_BACKOFF_DELAYCOMMAND_BUS_BACKOFF_DELAY
  • EVENT_BUS_QUEUE_MODECOMMAND_BUS_QUEUE_MODE
  • EVENT_BUS_LOGCOMMAND_BUS_LOG

Przykład Konfiguracji .env

# Połączenie Redis
REDIS_URL=redis://username:password@localhost:6379/0

# Strategia reconnect Redis (stałe opóźnienie)
REDIS_RETRY_DELAY=5000                     # 5 sekund między próbami (domyślnie)
REDIS_MAX_RETRIES=0                        # 0 = nieskończoność (domyślnie)

# Poziom logowania
LOG_LEVEL=log                              # debug | log | warn | error

# Auto-optymalizacja (domyślnie włączona)
COMMAND_BUS_AUTO_OPTIMIZE=true

# Concurrency (opcjonalnie - auto-optymalizacja ustawi optymalną wartość)
# COMMAND_BUS_CONCURRENCY=10

# Retry i backoff
COMMAND_BUS_MAX_ATTEMPTS=1                 # Maksymalna liczba prób
COMMAND_BUS_BACKOFF_DELAY=3000             # Opóźnienie między próbami (3s)

# Tryb kolejki
COMMAND_BUS_QUEUE_MODE=fifo                # fifo lub lifo

# Kompresja payloadów (próg w bajtach)
COMMAND_BUS_COMPRESSION_THRESHOLD=2048     # 2KB (domyślnie 1KB)

# Logowanie komend do plików
COMMAND_BUS_LOG=./command-logs             # Ścieżka do katalogu logów

Priorytety Konfiguracji

  1. Parametry konstruktora - najwyższy priorytet
  2. Zmienne środowiskowe - średni priorytet
  3. Wartości domyślne - najniższy priorytet
// Przykład: parametry konstruktora nadpisują ENV
const config = new CommandBusConfig({
  redisUrl: 'redis://localhost:6379',
  concurrency: 20, // Nadpisuje COMMAND_BUS_CONCURRENCY
  autoOptimize: false, // Wyłącza auto-optymalizację
});

Konfiguracja zaawansowana

Auto-optymalizacja Concurrency

Auto-optymalizacja automatycznie oblicza optymalną wartość concurrency na podstawie zasobów systemowych:

// Auto-optymalizacja włączona (domyślnie)
const config = new CommandBusConfig({
  redisUrl: 'redis://localhost:6379',
  autoOptimize: true, // Domyślnie true
  // concurrency zostanie obliczone jako: CPU cores * 2 + (availableMemory / 512MB)
});

// Wyłączenie auto-optymalizacji
const config2 = new CommandBusConfig({
  redisUrl: 'redis://localhost:6379',
  autoOptimize: false,
  concurrency: 10, // Ręczna wartość
});

Algorytm auto-optymalizacji:

concurrency = (CPU cores * 2) + Math.floor(availableMemory / 512MB)

Przykład:
- 8 CPU cores
- 16GB RAM dostępne
- concurrency = (8 * 2) + Math.floor(16384 / 512) = 16 + 32 = 48

Kompresja Payloadów

Automatyczna kompresja gzip dla payloadów RPC większych niż threshold:

const config = new CommandBusConfig({
  redisUrl: 'redis://localhost:6379',
  compressionThreshold: 2048, // 2KB (domyślnie 1KB)
});

// Przykład: payload 3KB zostanie automatycznie skompresowany
const largeCommand = new ProcessReportCommand(largeData); // 3KB
const result = await commandBus.call(largeCommand); // Automatyczna kompresja/dekompresja

Korzyści kompresji:

  • Redukcja transferu danych przez Redis
  • Szybsze przesyłanie dużych payloadów
  • Niższe zużycie pamięci Redis
  • Transparent dla użytkownika (automatyczna dekompresja)

Command Logging

Logowanie komend do plików JSONL (rotacja co godzinę):

const config = new CommandBusConfig({
  redisUrl: 'redis://localhost:6379',
  commandLog: './command-logs', // Ścieżka do katalogu logów
});

// Struktura plików:
// ./command-logs/2025-01-27T10.jsonl
// ./command-logs/2025-01-27T11.jsonl

Format JSONL (JSON Lines):

{"__name":"CreateUserCommand","__id":"uuid","__time":1706347200000,"email":"[email protected]","name":"Jan"}
{"__name":"ProcessOrderCommand","__id":"uuid","__time":1706347201000,"orderId":"12345"}

Opcje Redis

const config = new CommandBusConfig({
  redisUrl: 'redis://username:password@localhost:6379/0',
  // Parsuje się do:
  // - host: localhost
  // - port: 6379
  // - username: username (opcjonalnie)
  // - password: password (opcjonalnie)
  // - db: 0 (opcjonalnie)
});

Concurrency i Retry

const config = new CommandBusConfig({
  redisUrl: 'redis://localhost:6379',
  concurrency: 10, // Liczba równoległych workerów (lub auto)
  maxAttempts: 5, // Maksymalna liczba prób przetworzenia zadania
  backoffDelay: 3000, // Opóźnienie między próbami (3s)
});

Tryb kolejki

const config = new CommandBusConfig({
  redisUrl: 'redis://localhost:6379',
  queueMode: 'fifo', // 'fifo' (First In First Out) lub 'lifo' (Last In First Out)
});

Custom Logger

Logger jest automatycznie opakowywany przez wewnętrzny wrapper, który filtruje logi według logLevel:

// Prosty logger - używa console
const config = new CommandBusConfig({
  redisUrl: 'redis://localhost:6379',
  logger: console,
  logLevel: 'log', // debug | log | warn | error
});

// Własny logger - musi implementować metody: log, error, warn, debug
class MyLogger {
  log(message: string, ...args: unknown[]): void {
    // Twoja implementacja
  }
  error(message: string, ...args: unknown[]): void {
    // Twoja implementacja
  }
  warn(message: string, ...args: unknown[]): void {
    // Twoja implementacja
  }
  debug(message: string, ...args: unknown[]): void {
    // Twoja implementacja
  }
}

const config2 = new CommandBusConfig({
  redisUrl: 'redis://localhost:6379',
  logger: new MyLogger(),
  logLevel: 'debug', // Wszystkie poziomy
});

API Reference

CommandBus

dispatch(command: Command): Promise<void>

Wysyła komendę do kolejki bez oczekiwania na wynik (fire-and-forget).

Flow:

  1. Kompresja payloadu (jeśli >threshold)
  2. Pobranie/utworzenie kolejki z cache
  3. Dodanie job do BullMQ
  4. Natychmiastowy return
await commandBus.dispatch(new CreateUserCommand('[email protected]', 'Jan Kowalski'));

call<T>(command: Command, timeout?: number): Promise<T>

Wywołuje komendę synchronicznie i czeka na odpowiedź przez Redis Pub/Sub (RPC). Domyślny timeout: 30000ms (30s).

Flow:

  1. Rejestracja pending call z timeoutem
  2. Kompresja payloadu (jeśli >threshold)
  3. Przygotowanie metadanych RPC (correlationId, responseChannel)
  4. Dodanie job do BullMQ
  5. Oczekiwanie na odpowiedź przez shared subscriber
  6. Dekompresja odpowiedzi
  7. Zwrócenie wyniku lub błędu
const result = await commandBus.call<{ userId: string }>(command, 5000);

handle<T>(commandClass, handler): void

Rejestruje handler dla określonej klasy komendy. Tylko jeden handler per typ komendy.

Automatyczne akcje:

  1. Rejestracja handlera w Map
  2. Utworzenie workera BullMQ
  3. Uruchomienie benchmarku dla optymalnego concurrency
  4. Utworzenie metrics collector
  5. Setup event handlers
commandBus.handle(CreateUserCommand, async (command) => {
  const user = await createUser(command.email, command.name);
  return { userId: user.id };
});

close(): Promise<void>

Zamyka wszystkie połączenia i workery z graceful shutdown.

Cleanup:

  1. Zamknięcie wszystkich workerów BullMQ
  2. Zamknięcie wszystkich kolejek z cache
  3. Zamknięcie RpcCoordinator (reject pending calls)
  4. Zamknięcie 3 połączeń Redis (Queue, Worker, RPC)
await commandBus.close();

Command

Klasa bazowa dla wszystkich komend. Każda komenda dziedziczy po Command:

class MyCommand extends Command {
  constructor(public readonly data: string) {
    super();
  }
}

Właściwości automatyczne:

  • __id - unikalny UUID (randomUUID)
  • __name - nazwa klasy komendy (constructor.name)
  • __time - timestamp utworzenia (Date.now())

Metody statyczne:

  • reconstructDates(obj) - rekonstrukcja obiektów Date z serializowanych danych

CommandBusConfig

Konfiguracja CommandBus z opcjami:

interface CommandBusConfigOptions {
  redisUrl?: string; // URL Redis (domyślnie: 'redis://localhost:6379' lub REDIS_URL)
  logger?: ILogger; // Logger (domyślnie console)
  logLevel?: 'debug' | 'log' | 'warn' | 'error'; // Poziom logowania (domyślnie 'log')
  concurrency?: number; // Liczba workerów (domyślnie 1 lub auto)
  maxAttempts?: number; // Maksymalna liczba prób (domyślnie 1)
  backoffDelay?: number; // Opóźnienie między próbami w ms (domyślnie 2000)
  queueMode?: 'fifo' | 'lifo'; // Tryb kolejki (domyślnie 'fifo')
  commandLog?: string; // Ścieżka do katalogu logów komend (opcjonalnie)
  autoOptimize?: boolean; // Auto-optymalizacja concurrency (domyślnie true)
  compressionThreshold?: number; // Próg kompresji w bajtach (domyślnie 1024)
}

Przykłady Użycia

Podstawowy przykład z RPC

import { CommandBus, CommandBusConfig, Command } from 'pp-command-bus';

// Definicja komendy
class CalculateCommand extends Command {
  constructor(
    public readonly a: number,
    public readonly b: number,
    public readonly operation: 'add' | 'multiply',
  ) {
    super();
  }
}

// Konfiguracja
const config = new CommandBusConfig({
  redisUrl: 'redis://localhost:6379',
});
const commandBus = new CommandBus(config);

// Rejestracja handlera
commandBus.handle(CalculateCommand, async (command) => {
  console.log(`Calculating: ${command.a} ${command.operation} ${command.b}`);

  switch (command.operation) {
    case 'add':
      return command.a + command.b;
    case 'multiply':
      return command.a * command.b;
  }
});

// Fire-and-forget
await commandBus.dispatch(new CalculateCommand(5, 3, 'add'));

// RPC - czekamy na wynik
const result = await commandBus.call<number>(
  new CalculateCommand(5, 3, 'multiply'),
  5000 // timeout 5s
);
console.log(`Result: ${result}`); // Result: 15

Równoległe wywołania RPC

// Wiele równoległych RPC calls
const [result1, result2, result3] = await Promise.all([
  commandBus.call<number>(new CalculateCommand(10, 5, 'add')),
  commandBus.call<UserInfo>(new GetUserInfoCommand('user-1')),
  commandBus.call<ValidationResult>(new ValidateUserCommand('[email protected]', 30)),
]);

console.log('All results:', { result1, result2, result3 });

Obsługa błędów

commandBus.handle(CreateUserCommand, async (command) => {
  try {
    // Walidacja
    if (!command.email.includes('@')) {
      throw new Error('Invalid email format');
    }

    const user = await createUser(command.email, command.name);
    return { userId: user.id };
  } catch (error) {
    // Loguj błąd
    console.error('Failed to create user:', error);

    // Rzuć błąd - BullMQ spróbuje ponownie (maxAttempts)
    throw error;
  }
});

// Obsługa błędów w RPC
try {
  const result = await commandBus.call(new CreateUserCommand('invalid-email', 'Jan'));
} catch (error) {
  console.error('RPC failed:', error.message);
}

Graceful Shutdown

process.on('SIGTERM', async () => {
  console.log('Shutting down CommandBus...');
  await commandBus.close();
  process.exit(0);
});

process.on('SIGINT', async () => {
  console.log('Shutting down CommandBus...');
  await commandBus.close();
  process.exit(0);
});

Zaawansowane Funkcje

1. Dynamiczne Concurrency

WorkerOrchestrator automatycznie dostosowuje concurrency na podstawie metryk:

  • Benchmark przy starcie - optymalny concurrency dla każdego workera
  • Event-driven metrics - zbieranie metryk z workerów
  • Dynamiczne dostosowanie - +/-20% co 30s (cooldown)
  • Limity - min 10, max 2000
// Automatyczne - benchmark ustali optymalną wartość
const config = new CommandBusConfig({
  redisUrl: 'redis://localhost:6379',
  // concurrency zostanie ustalone przez benchmark (np. 15-20)
});

2. Process Isolation w RPC

Każdy proces Node.js ma unikalny UUID - odpowiedzi RPC są izolowane między procesami:

Process A (UUID: abc-123):
- Kanał: rpc:response:abc-123:*
- Otrzymuje tylko swoje odpowiedzi

Process B (UUID: def-456):
- Kanał: rpc:response:def-456:*
- Otrzymuje tylko swoje odpowiedzi

3. Shared Subscriber Pattern

Jeden shared subscriber dla wszystkich RPC calls zamiast N subskrybentów:

Tradycyjne podejście (N subskrybentów):

1000 RPC calls → 1000 Redis subscriptions → duże obciążenie

Shared Subscriber (1 subskrybent):

1000 RPC calls → 1 Redis pattern subscription → multiplexing w pamięci

4. Automatyczna Kompresja

PayloadCompressionService automatycznie kompresuje duże payloady:

// Mała komenda (<1KB) - brak kompresji
const smallCommand = new CreateUserCommand('[email protected]', 'Jan');
await commandBus.call(smallCommand); // Bez kompresji

// Duża komenda (>1KB) - automatyczna kompresja
const largeCommand = new ProcessReportCommand(largeData); // 5KB
await commandBus.call(largeCommand); // Automatyczna kompresja gzip → base64

Flagi kompresji:

  • __compressed: true - payload został skompresowany
  • Automatyczna dekompresja w JobProcessor
  • Transparent dla użytkownika

5. Command Logging

Persystencja wszystkich komend do plików JSONL:

const config = new CommandBusConfig({
  commandLog: './command-logs',
});

// Każda komenda jest logowana do pliku:
// ./command-logs/2025-01-27T10.jsonl

Use cases:

  • Auditing i compliance
  • Replay komend
  • Debugging produkcyjnych problemów
  • Analiza przepływu komend

6. RPC Job Cancellation

Automatyczne usuwanie niepodjętych jobów RPC przy timeout:

RPC Timeout Flow:

User Code         RpcCoordinator          Redis           JobProcessor
    │                   │                   │                   │
    │── call(cmd) ─────▶│                   │                   │
    │                   │── registerCall ──▶│                   │
    │                   │── queue.add ─────▶│                   │
    │                   │                   │                   │
    │   [timeout]       │                   │                   │
    │                   │                   │                   │
    │                   │── markCancelled ─▶│ SET rpc:cancelled:xxx
    │                   │── tryRemoveJob ──▶│ (próba usunięcia)
    │◀── Error ─────────│                   │                   │
    │                   │                   │                   │
    │                   │                   │   [job picked up] │
    │                   │                   │──────────────────▶│
    │                   │                   │                   │── isCancelled?
    │                   │                   │◀──────────────────│   → true
    │                   │                   │                   │── SKIP handler
    │                   │                   │                   │── clearCancellation

Kluczowe cechy:

  • Dwufazowe anulowanie: Flaga Redis + próba usunięcia joba z kolejki
  • Graceful degradation: Błędy Redis nie blokują przetwarzania
  • TTL 24h: Automatyczne wygaśnięcie kluczy cancellation
  • Aktywne czyszczenie: Klucze usuwane po przetworzeniu lub usunięciu joba
  • Kompatybilność wsteczna: Funkcjonalność jest opcjonalna

Korzyści:

  • Oszczędność zasobów - nieprzetworzone joby nie obciążają workerów
  • Brak efektów ubocznych - handler nie wykonuje się dla timeout'owanych RPC
  • Lepsza diagnostyka - logi pokazują pominięte joby

Best Practices

1. Używaj TypeScript

Biblioteka jest napisana w TypeScript z strict mode. Wykorzystaj typy dla lepszego DX:

interface UserCreatedResult {
  userId: string;
  createdAt: Date;
}

const result = await commandBus.call<UserCreatedResult>(command);

2. Idempotentne handlery

Handlery powinny być idempotentne (wielokrotne wykonanie = ten sam rezultat):

commandBus.handle(CreateUserCommand, async (command) => {
  // Sprawdź czy użytkownik już istnieje
  const existing = await findUserByEmail(command.email);
  if (existing) {
    return { userId: existing.id }; // Zwróć istniejącego
  }

  // Utwórz nowego
  const user = await createUser(command.email, command.name);
  return { userId: user.id };
});

3. Monitorowanie RPC timeoutów

// Ustaw timeout dostosowany do czasu przetwarzania komendy
const shortCommand = new QuickCommand();
const result1 = await commandBus.call(shortCommand, 1000); // 1s dla szybkich operacji

const longRunningCommand = new ProcessReportCommand();
const result2 = await commandBus.call(longRunningCommand, 60000); // 60s dla długich operacji

4. Walidacja w handlerach

commandBus.handle(CreateUserCommand, async (command) => {
  // Walidacja na początku
  if (!command.email || !command.email.includes('@')) {
    throw new Error('Invalid email');
  }

  if (!command.name || command.name.length < 2) {
    throw new Error('Invalid name');
  }

  // Logika biznesowa
  return await createUser(command.email, command.name);
});

5. Logowanie kontekstu

commandBus.handle(CreateUserCommand, async (command) => {
  console.log('Processing CreateUserCommand', {
    commandId: command.__id,
    timestamp: command.__time,
    email: command.email,
  });

  // Logika biznesowa
  const user = await createUser(command.email, command.name);

  console.log('User created successfully', {
    commandId: command.__id,
    userId: user.id,
  });

  return { userId: user.id };
});

Testing

Unit testy

import { CommandBus, CommandBusConfig, Command } from 'pp-command-bus';

describe('User Commands', () => {
  let commandBus: CommandBus;

  beforeAll(async () => {
    const config = new CommandBusConfig({
      redisUrl: 'redis://localhost:6379',
      logger: console,
      logLevel: 'error', // Tylko błędy w testach
    });

    commandBus = new CommandBus(config);

    // Zarejestruj handler
    commandBus.handle(CreateUserCommand, async (command) => {
      return { userId: 'test-user-id' };
    });
  });

  afterAll(async () => {
    await commandBus.close();
  });

  it('should create user', async () => {
    const command = new CreateUserCommand('[email protected]', 'Test User');
    const result = await commandBus.call<{ userId: string }>(command, 5000);

    expect(result.userId).toBeDefined();
    expect(result.userId).toBe('test-user-id');
  });

  it('should handle multiple commands in parallel', async () => {
    const commands = [
      new CreateUserCommand('[email protected]', 'User 1'),
      new CreateUserCommand('[email protected]', 'User 2'),
      new CreateUserCommand('[email protected]', 'User 3'),
    ];

    const results = await Promise.all(
      commands.map((cmd) => commandBus.call<{ userId: string }>(cmd, 5000))
    );

    expect(results).toHaveLength(3);
    results.forEach((result) => {
      expect(result.userId).toBeDefined();
    });
  });
});

Troubleshooting

Connection refused (Redis)

Upewnij się że Redis działa:

redis-cli ping
# Powinno zwrócić: PONG

Timeout na RPC call

Zwiększ timeout lub sprawdź czy handler został zarejestrowany:

// Zwiększ timeout
const result = await commandBus.call(command, 60000); // 60s

// Sprawdź czy handler został zarejestrowany PRZED wywołaniem
commandBus.handle(MyCommand, async (command) => {
  // Handler implementation
  return { result: 'success' };
});

// Teraz możesz wywołać komendę
const result = await commandBus.call(new MyCommand());

Handler nie został wywołany

Upewnij się że handler został zarejestrowany przed wysłaniem komendy:

// ❌ ŹLE - handler po dispatch
await commandBus.dispatch(new MyCommand());
commandBus.handle(MyCommand, async (cmd) => { ... }); // Za późno!

// ✅ DOBRZE - handler przed dispatch
commandBus.handle(MyCommand, async (cmd) => { ... });
await commandBus.dispatch(new MyCommand()); // Teraz OK

Wysokie zużycie pamięci

  1. Wyłącz command logging jeśli nie jest potrzebne
  2. Zmniejsz concurrency jeśli workery używają dużo pamięci
  3. Zwiększ compressionThreshold jeśli duże payloady powodują problemy
const config = new CommandBusConfig({
  commandLog: undefined, // Wyłącz logging
  concurrency: 5, // Zmniejsz concurrency
  compressionThreshold: 512, // Kompresuj już od 512B
});

Worker stalled

Worker został zatrzymany (prawdopodobnie crashed). BullMQ automatycznie przeniesie job do innego workera.

Przyczyny:

  • Out of memory
  • Uncaught exception w handlerze
  • Timeout w handlerze

Rozwiązanie:

  • Dodaj try/catch w handlerze
  • Zwiększ pamięć dla procesu
  • Zmniejsz concurrency

Struktura Projektu

src/
├── command-bus/                # Główna logika CommandBus
│   ├── config/                 # Konfiguracja CommandBus
│   │   ├── command-bus-config.ts
│   │   └── auto-config-optimizer.ts
│   ├── job/                    # Przetwarzanie jobów i opcje
│   │   ├── job-processor.ts
│   │   └── job-options-builder.ts
│   ├── logging/                # Command logging do plików
│   │   └── command-logger.ts
│   ├── queue/                  # Queue management i cache
│   │   └── queue-manager.ts
│   ├── rpc/                    # RPC coordination
│   │   ├── rpc-coordinator.ts
│   │   ├── payload-compression.service.ts
│   │   └── rpc-job-cancellation.service.ts  # Anulowanie jobów RPC przy timeout
│   ├── worker/                 # Worker orchestration
│   │   ├── worker-orchestrator.ts
│   │   ├── worker-benchmark.ts
│   │   └── worker-metrics-collector.ts
│   ├── types/                  # Typy TypeScript
│   │   └── index.ts
│   ├── command.ts              # Klasa bazowa Command
│   └── index.ts                # CommandBus główna klasa
├── shared/                     # Wspólne komponenty
│   ├── config/                 # Base config z Redis
│   │   └── base-config.ts
│   ├── logging/                # Logger wrapper z poziomami
│   │   ├── logger.ts
│   │   └── log-level.ts
│   ├── redis/                  # Fabryka połączeń Redis (SRP)
│   │   ├── redis-connection-factory.ts  # Tworzenie połączeń z event handlers
│   │   ├── redis-error-formatter.ts     # Formatowanie błędów (AggregateError)
│   │   └── index.ts            # Eksporty modułu
│   └── types.ts                # Współdzielone typy
├── examples/                   # Przykłady użycia
│   ├── rpc.demo.ts             # Demo RPC calls
│   ├── rpc-throughput.demo.ts  # Demo wydajności RPC
│   ├── rpc-compression.demo.ts # Demo kompresji
│   └── rpc-resilience.demo.ts  # Demo reconnect/failover (5 min test)
└── index.ts                    # Główny export pakietu

Migracja z pp-event-bus 1.x

Jeśli używałeś Command Bus z pakietu pp-event-bus w wersji 1.x:

Przed:

import { CommandBus, Command, CommandBusConfig } from 'pp-event-bus';

Po:

npm install pp-command-bus
import { CommandBus, Command, CommandBusConfig } from 'pp-command-bus';

Pełna zgodność API - jedyna zmiana to źródło importu.

Wersjonowanie i Releases

Projekt używa Semantic Versioning oraz automatycznych release'ów dzięki semantic-release.

Konwencja commitów

Używamy Conventional Commits:

# Nowa funkcjonalność (minor release)
feat: dodano wsparcie dla delayed commands

# Poprawka błędu (patch release)
fix: naprawiono memory leak w RPC

# Breaking change (major release)
feat!: zmieniono API CommandBusConfig

# Inne typy (patch release)
docs: zaktualizowano dokumentację API
perf: zoptymalizowano przetwarzanie komend
refactor: uproszczono kod RPC handler
test: dodano testy dla command logging

Typy commitów

  • feat: - nowa funkcjonalność (→ minor release)
  • fix: - poprawka błędu (→ patch release)
  • perf: - optymalizacja wydajności (→ patch release)
  • docs: - zmiany w dokumentacji (→ patch release)
  • style: - formatowanie kodu (→ patch release)
  • refactor: - refaktoryzacja (→ patch release)
  • test: - dodanie/poprawka testów (→ patch release)
  • build: - zmiany w buildzie/zależnościach (→ patch release)
  • ci: - zmiany w CI/CD (→ patch release)
  • chore: - inne zmiany (bez release)
  • ! lub BREAKING CHANGE: - breaking change (→ major release)

Dokumentacja

Architektura

Szczegółowa dokumentacja architektury systemu, wzorców projektowych i zasad SOLID:

  • ARCHITECTURE.md - Kompletny opis architektury, diagramy komponentów, przepływy danych

Komponenty

  • QueueManager - zarządzanie kolejkami BullMQ i cache kolejek
  • WorkerOrchestrator - orkiestracja workerów, dynamiczne concurrency, benchmark
  • JobProcessor - przetwarzanie jobów i wykonanie handlerów
  • RpcCoordinator - zarządzanie wywołaniami RPC przez Redis Pub/Sub
  • RpcJobCancellationService - anulowanie niepodjętych jobów RPC przy timeout
  • JobOptionsBuilder - konfiguracja opcji dla jobów BullMQ
  • CommandLogger - persystencja komend do plików JSONL
  • PayloadCompressionService - automatyczna kompresja gzip
  • AutoConfigOptimizer - optymalizacja concurrency na podstawie zasobów

Licencja

MIT

Autorzy

Linki