@logistically/events

v3.3.0

A production-ready event-driven architecture library with Redis Streams, comprehensive batching, reliable consumption, and enterprise-grade features.

🚀 Features

Core Event System

  • Event System Builder: Fluent API for configuring and building event systems
  • Event Publisher: Reliable event publishing with batching, retry, and rate limiting
  • Event Consumer: Pattern-based event consumption with consumer groups and poison message handling
  • Event Router: Advanced pattern-based routing with origin prefix support
  • Transport Agnostic: Plugin-based transport system supporting Redis Streams and Memory

Redis Streams Transport

  • Consumer Groups: Reliable message consumption with automatic failover
  • Batching: Configurable message batching for high throughput
  • Partitioning: Message partitioning for parallel processing
  • Message Ordering: Strict ordering for messages that share a partition key
  • Message Replay: Historical message replay capabilities
  • Schema Management: Event schema validation and evolution
  • Dead Letter Queues: Automatic failed message handling
  • Cluster Support: Redis Cluster support for high availability

Memory Transport

  • In-Memory Processing: Fast local event processing
  • Pattern Matching: Regex-based pattern matching
  • Testing Support: Ideal for unit and integration tests

Enterprise Features

  • Origin-Based Routing: Regional isolation and namespace separation
  • Validation: Comprehensive event validation
  • Error Handling: Poison message handling and retry mechanisms
  • Monitoring: Built-in statistics and metrics
  • Type Safety: Full TypeScript support with comprehensive types

📦 Installation

npm install @logistically/events

🏗️ Quick Start

Basic Event System

import { createEventSystemBuilder } from '@logistically/events';

const eventSystem = createEventSystemBuilder()
  .service('my-service')
  .addTransportFromFactory('redis', 'redis-streams', {
    url: 'redis://localhost:6379',
    groupId: 'my-service-group'
  })
  .build();

await eventSystem.connect();

// Publish events
await eventSystem.publisher.publish('user.created', { userId: '123' });

// Subscribe to events
await eventSystem.consumer.subscribe('user.created', async (message, metadata) => {
  console.log('Received:', message.body);
});

await eventSystem.close();

Advanced Configuration

import { createEventSystemBuilder } from '@logistically/events';

const eventSystem = createEventSystemBuilder()
  .service('my-service')
  .addTransportFromFactory('redis', 'redis-streams', {
    url: 'redis://localhost:6379',
    groupId: 'my-service-group',
    batchSize: 100,
    enableDLQ: true,
    dlqStreamPrefix: 'dlq:',
    maxRetries: 3
  })
  .originPrefix('eu.de')
  .setValidationMode('warn')
  .enablePublisherBatching({
    enabled: true,
    maxSize: 1000,
    maxWaitMs: 100,
    maxConcurrentBatches: 5,
    strategy: 'size'
  })
  .enablePublisherRetry({
    maxRetries: 3,
    backoffStrategy: 'exponential',
    baseDelay: 1000,
    maxDelay: 10000
  })
  .enableConsumerPatternRouting()
  .enableConsumerGroups()
  .build();

🔧 Configuration

Event System Configuration

interface EventSystemConfig {
  service: string;                    // Required service name
  transports: Map<string, Transport>; // Transport instances
  routing?: RoutingConfig;            // Optional routing configuration
  validationMode?: 'strict' | 'warn' | 'ignore';
  originPrefix?: string;              // Regional prefix (e.g., 'eu.de')
  origins?: string[];                 // Allowed origin prefixes
  
  // Publisher configuration
  publisher?: {
    batching?: {
      enabled: boolean;
      maxSize: number;
      maxWaitMs: number;
      maxConcurrentBatches: number;
      strategy: 'time' | 'size' | 'partition';
      compression?: boolean;
    };
    retry?: {
      maxRetries: number;
      backoffStrategy: 'fixed' | 'exponential' | 'fibonacci';
      baseDelay: number;
      maxDelay: number;
    };
    rateLimiting?: {
      maxRequests: number;
      timeWindow: number;
      strategy: 'sliding-window' | 'token-bucket';
    };
    validationMode?: 'strict' | 'warn' | 'ignore';
  };
  
  // Consumer configuration
  consumer?: {
    enablePatternRouting?: boolean;
    enableConsumerGroups?: boolean;
    poisonMessageHandler?: (message: any, error: Error, metadata: any) => Promise<void>;
    validationMode?: 'strict' | 'warn' | 'ignore';
  };
}

Redis Transport Configuration

interface RedisStreamsConfig {
  // Connection settings
  url?: string;                       // Redis connection URL
  host?: string;                      // Redis host
  port?: number;                      // Redis port
  db?: number;                        // Redis database
  password?: string;                  // Redis password
  
  // Consumer group settings
  groupId?: string;                   // Consumer group name
  consumerId?: string;                // Consumer ID
  
  // Stream settings
  streamPrefix?: string;              // Stream prefix
  maxLen?: number;                    // Maximum stream length
  trimStrategy?: 'MAXLEN' | 'MINID'; // Stream trimming strategy
  
  // Consumer settings
  batchSize?: number;                 // Batch size for consumption
  blockTime?: number;                 // Block time for polling
  pollInterval?: number;              // Poll interval
  maxRetries?: number;                // Maximum retries
  retryDelay?: number;                // Retry delay
  
  // Dead letter queue settings
  enableDLQ?: boolean;                // Enable dead letter queue
  dlqStreamPrefix?: string;           // DLQ stream prefix
  maxRetriesBeforeDLQ?: number;      // Max retries before DLQ
  
  // Performance settings
  enablePipelining?: boolean;         // Enable pipelining
  pipelineSize?: number;              // Pipeline size
  
  // Enterprise features
  ordering?: OrderingConfig;          // Message ordering configuration
  partitioning?: PartitioningConfig;  // Partitioning configuration
  schema?: SchemaConfig;              // Schema management
  replay?: ReplayConfig;              // Message replay configuration
}

Memory Transport Configuration

interface MemoryTransportConfig {
  originPrefix?: string;              // Origin prefix for routing
}

📚 Usage Examples

Event Publishing

// Basic publishing
await eventSystem.publisher.publish('user.created', { userId: '123' });

// Publishing with options
await eventSystem.publisher.publish('order.completed', 
  { orderId: '456', total: 99.99 },
  { partition: 1, headers: { priority: 'high' } }
);

// Batch publishing
const events = [
  { eventType: 'user.created', body: { userId: '123' } },
  { eventType: 'user.created', body: { userId: '456' } }
];

await eventSystem.publisher.publishBatch(events);

Event Consumption

// Basic subscription
await eventSystem.consumer.subscribe('user.created', async (message, metadata) => {
  console.log('User created:', message.body);
});

// Pattern subscription
await eventSystem.consumer.subscribePattern('user.*', async (message, metadata, pattern) => {
  console.log('User event:', message.header.type, message.body, 'Pattern:', pattern);
});

// Subscription with options
await eventSystem.consumer.subscribe('order.*', async (message, metadata) => {
  console.log('Order event:', message.body);
}, {
  groupId: 'order-processors',
  partition: 1
});

Advanced Routing

import { createEventSystemBuilder, createEventRouter, createBasicRoutingConfig } from '@logistically/events';

const routingConfig = createBasicRoutingConfig(
  [
    {
      pattern: 'user.*',
      transport: 'redis',
      priority: 1,
      options: {
        topic: 'user-events',
        partition: 1,
        ordering: 'strict'
      }
    }
  ],
  'warn',
  'eu.de',
  {
    'user.*': 'user',
    'order.*': 'order'
  },
  'namespace'
);

const eventSystem = createEventSystemBuilder()
  .service('my-service')
  .routing(routingConfig)
  .addTransportFromFactory('redis', 'redis-streams', {
    url: 'redis://localhost:6379'
  })
  .build();

🔍 Pattern Matching

Supported Patterns

  • Exact: user.created - Matches exactly
  • Wildcard: user.* - Matches all user events
  • Prefix Wildcard: *.user.* - Matches user events with any prefix/suffix
  • Suffix Wildcard: user.*.completed - Matches user events ending with completed

Pattern Examples

// Valid patterns
'user.created'           // Exact match
'user.*'                // All user events
'*.user.*'              // User events with any prefix/suffix
'order.*.completed'     // Order events ending with completed
'product.inventory.*'   // Product inventory events

// Invalid patterns
'.user.*'               // Cannot start with dot
'user.*.'               // Cannot end with dot
'user..created'         // Cannot have consecutive dots
'user**created'         // Cannot have consecutive asterisks
'user*created'          // Asterisk must be standalone
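
The rules above can be sketched as a small validator and matcher. This is an illustration, not the library's implementation: `isValidPattern` and `matchesPattern` are hypothetical helpers, and the sketch assumes that `*` stands for exactly one dot-separated segment, which the library may handle differently.

```typescript
// Hypothetical helpers; not part of @logistically/events.
function isValidPattern(pattern: string): boolean {
  if (pattern.length === 0) return false;
  const segments = pattern.split('.');
  // An empty segment means a leading dot, a trailing dot, or consecutive dots.
  // A segment that contains '*' must be exactly '*' (standalone asterisk).
  return segments.every(
    (segment) => segment.length > 0 && (segment === '*' || !segment.includes('*'))
  );
}

function matchesPattern(pattern: string, eventType: string): boolean {
  if (!isValidPattern(pattern)) return false;
  const patternSegments = pattern.split('.');
  const eventSegments = eventType.split('.');
  // ASSUMPTION: '*' matches exactly one segment, so lengths must agree.
  if (patternSegments.length !== eventSegments.length) return false;
  return patternSegments.every(
    (segment, index) => segment === '*' || segment === eventSegments[index]
  );
}
```

Under these assumptions, `matchesPattern('order.*.completed', 'order.payment.completed')` is true, while `isValidPattern('user*created')` is false.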

🌍 Origin-Based Routing

Regional Isolation

const eventSystem = createEventSystemBuilder()
  .service('my-service')
  .originPrefix('eu.de')  // Origin prefix: Europe, Germany
  .addTransportFromFactory('redis', 'redis-streams', {
    url: 'redis://localhost:6379'
  })
  .build();

// Events are automatically prefixed
// 'user.created' becomes 'eu.de.user.created'
await eventSystem.publisher.publish('user.created', { userId: '123' });

// Patterns respect origin prefix
await eventSystem.consumer.subscribe('user.created', handler);
// Only matches 'eu.de.user.created', not 'us.ca.user.created'
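
The prefixing described in the comments above amounts to joining the origin and the event type with a dot. A minimal sketch, assuming that is all the prefixing does; `qualifyEventType` is a hypothetical name, not a library function:

```typescript
// Hypothetical helper illustrating origin prefixing; not library API.
function qualifyEventType(originPrefix: string | undefined, eventType: string): string {
  // Without a prefix the event type is left untouched.
  return originPrefix ? `${originPrefix}.${eventType}` : eventType;
}

// 'user.created' published from a system configured with 'eu.de'
const qualifiedExample = qualifyEventType('eu.de', 'user.created');
// qualifiedExample === 'eu.de.user.created'
```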

Multi-Region Support

// European region
const euSystem = createEventSystemBuilder()
  .service('eu-service')
  .originPrefix('eu.de')
  .addTransportFromFactory('redis', 'redis-streams', {
    url: 'redis://eu-redis:6379'
  })
  .build();

// US region
const usSystem = createEventSystemBuilder()
  .service('us-service')
  .originPrefix('us.ca')
  .addTransportFromFactory('redis', 'redis-streams', {
    url: 'redis://us-redis:6379'
  })
  .build();

Origin Filtering

const eventSystem = createEventSystemBuilder()
  .service('my-service')
  .origins(['eu.de', 'us.ca'])  // Allow multiple origins
  .addTransportFromFactory('redis', 'redis-streams', {
    url: 'redis://localhost:6379'
  })
  .build();

// Only events from eu.de or us.ca origins will be processed
await eventSystem.consumer.subscribe('user.*', handler);
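
Origin filtering can be pictured as a check on the prefix of the fully qualified event type. The sketch below is an assumption about how such a filter might work, not the library's code; `isFromAllowedOrigin` is a hypothetical helper.

```typescript
// Hypothetical helper; not part of @logistically/events.
function isFromAllowedOrigin(
  allowedOrigins: readonly string[],
  qualifiedEventType: string
): boolean {
  // Require a dot after the origin so 'eu.de' does not accept 'eu.dev.user.created'.
  return allowedOrigins.some((origin) => qualifiedEventType.startsWith(`${origin}.`));
}
```

With `['eu.de', 'us.ca']` as the allowed origins, `'eu.de.user.created'` passes the check and `'ap.jp.user.created'` does not.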

📊 Monitoring and Statistics

Consumer Statistics

const stats = await eventSystem.consumer.getStats();

console.log('Total messages:', stats.totalMessagesReceived);
console.log('Failed messages:', stats.failedMessages);
console.log('Poison messages:', stats.poisonMessages);
console.log('Average processing time:', stats.averageProcessingTime);
console.log('Last message time:', stats.lastMessageTime);

Publisher Statistics

const stats = await eventSystem.publisher.getStats();

console.log('Total published:', stats.totalMessagesSent);
console.log('Failed publishes:', stats.failedMessages);
console.log('Batch count:', stats.totalBatchesSent);
console.log('Average latency:', stats.averageLatency);

System Status

const status = await eventSystem.getStatus();

console.log('Connected:', status.connected);
console.log('Healthy:', status.healthy);
console.log('Uptime:', status.uptime);
console.log('Version:', status.version);

// Transport status
for (const [name, transportStatus] of status.transports) {
  console.log(`Transport ${name}:`, transportStatus.connected, transportStatus.healthy);
}

🧪 Testing and Development

Memory Transport for Testing

import { createEventSystemBuilder } from '@logistically/events';

const testSystem = createEventSystemBuilder()
  .service('test-service')
  .addTransportFromFactory('memory', 'memory', {
    enablePatternRouting: true
  })
  .build();

// Use in your tests: subscribe before publishing so the handler is already
// registered when the event is emitted
await testSystem.consumer.subscribe('test.*', handler);
await testSystem.publisher.publish('test.event', { data: 'test' });

Integration Testing

// Use Redis transport for integration tests
const integrationSystem = createEventSystemBuilder()
  .service('integration-test')
  .addTransportFromFactory('redis', 'redis-streams', {
    url: 'redis://localhost:6379',
    groupId: 'integration-test-group'
  })
  .build();

// Test real Redis functionality
await integrationSystem.publisher.publish('integration.test', { test: true });

Comprehensive Test Suite

The library ships with an extensive test suite:

  • Unit Tests: 700+ tests covering all components
  • Integration Tests: End-to-end testing with real transports
  • Performance Tests: Load testing and benchmarking
  • Memory Leak Detection: Automatic leak detection in tests
  • Coverage Reporting: Detailed coverage analysis

🚨 Error Handling and Resilience

Poison Message Handling

const eventSystem = createEventSystemBuilder()
  .service('my-service')
  .addTransportFromFactory('redis', 'redis-streams', {
    url: 'redis://localhost:6379'
  })
  .setPoisonMessageHandler(async (message, error, metadata) => {
    console.error('Poison message:', message, error);
    // Handle failed messages (e.g., log to monitoring system).
    // logToMonitoringSystem is an application-defined function, not part of this library.
    await logToMonitoringSystem(message, error);
  })
  .build();

Validation Errors

const eventSystem = createEventSystemBuilder()
  .service('my-service')
  .addTransportFromFactory('redis', 'redis-streams', {
    url: 'redis://localhost:6379'
  })
  .setValidationMode('strict')  // Throw on validation errors
  .build();

try {
  await eventSystem.publisher.publish('invalid.event', { invalid: 'data' });
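} catch (error) {
  // `error` is `unknown` under strict TypeScript settings, so narrow it first
  console.error('Validation failed:', error instanceof Error ? error.message : error);
}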
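
The three validation modes can be sketched as follows. Only the `'strict'` behavior (throwing) is stated above; treating `'warn'` as "log and continue" and `'ignore'` as "skip silently" is an assumption, and `enforceValidation` is a hypothetical helper.

```typescript
type ValidationMode = 'strict' | 'warn' | 'ignore';

// Hypothetical helper: decides what happens to validation errors in each mode.
// Returns true when the event may proceed; throws in 'strict' mode.
function enforceValidation(mode: ValidationMode, errors: readonly string[]): boolean {
  if (errors.length === 0 || mode === 'ignore') return true;
  if (mode === 'warn') {
    // ASSUMPTION: 'warn' logs the problems and lets the event through.
    console.warn(`Validation warnings: ${errors.join('; ')}`);
    return true;
  }
  throw new Error(`Validation failed: ${errors.join('; ')}`);
}
```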

Retry with Exponential Backoff

const eventSystem = createEventSystemBuilder()
  .service('my-service')
  .addTransportFromFactory('redis', 'redis-streams', {
    url: 'redis://localhost:6379'
  })
  .enablePublisherRetry({
    maxRetries: 5,
    backoffStrategy: 'exponential',
    baseDelay: 1000,
    maxDelay: 30000
  })
  .build();

// Transient publish failures are retried up to 5 times, with the delay
// growing exponentially from baseDelay and capped at maxDelay
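
To make the retry settings above concrete, here is a sketch of how the delay for each attempt could be derived from `baseDelay` and `maxDelay`. The formulas are common conventions, not confirmed library behavior, and `computeRetryDelay` is a hypothetical helper.

```typescript
type BackoffStrategy = 'fixed' | 'exponential' | 'fibonacci';

// Hypothetical helper: delay before retry number `attempt` (1-based).
function computeRetryDelay(
  strategy: BackoffStrategy,
  attempt: number,
  baseDelay: number,
  maxDelay: number
): number {
  let multiplier = 1; // 'fixed' always waits baseDelay
  if (strategy === 'exponential') {
    multiplier = Math.pow(2, attempt - 1); // 1, 2, 4, 8, ...
  } else if (strategy === 'fibonacci') {
    let previous = 1;
    let current = 1;
    for (let i = 2; i < attempt; i++) {
      [previous, current] = [current, previous + current];
    }
    multiplier = current; // 1, 1, 2, 3, 5, ...
  }
  // The cap keeps late attempts from waiting indefinitely long.
  return Math.min(maxDelay, baseDelay * multiplier);
}

const exponentialDelays = [1, 2, 3, 4, 5].map((attempt) =>
  computeRetryDelay('exponential', attempt, 1000, 30000)
);
// exponentialDelays: 1000, 2000, 4000, 8000, 16000
```

A sixth attempt under these assumptions would be capped at 30000 rather than growing to 32000.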

🏭 Enterprise Features

High Availability

  • Redis Cluster Support: Basic cluster support with failover capabilities
  • Consumer Groups: Reliable message consumption with automatic failover
  • Health Status: Comprehensive health status through API calls
  • Error Handling: Comprehensive error handling and retry mechanisms

Dead Letter Queues (Redis Transport)

const eventSystem = createEventSystemBuilder()
  .service('my-service')
  .addTransportFromFactory('redis', 'redis-streams', {
    url: 'redis://localhost:6379',
    enableDLQ: true,                    // Enable automatic DLQ
    dlqStreamPrefix: 'dlq:',            // DLQ stream prefix
    maxRetriesBeforeDLQ: 3              // Retry count before DLQ
  })
  .build();

// Failed messages are automatically moved to DLQ streams after max retries
// DLQ streams are named with 'dlq:' prefix (e.g., 'dlq:user-events')
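
The retry-then-DLQ flow described above can be sketched as a single decision. This is an illustration under the assumption that a message moves to the DLQ once its retry budget is used up; `nextDestination` and `DlqPolicy` are hypothetical names.

```typescript
interface DlqPolicy {
  dlqStreamPrefix: string;
  maxRetriesBeforeDLQ: number;
}

// Hypothetical helper: the stream a failed message should be sent to next.
function nextDestination(stream: string, retriesSoFar: number, policy: DlqPolicy): string {
  // Keep retrying on the original stream until the retry budget is spent.
  if (retriesSoFar < policy.maxRetriesBeforeDLQ) return stream;
  // Afterwards the message goes to the prefixed DLQ stream, e.g. 'dlq:user-events'.
  return `${policy.dlqStreamPrefix}${stream}`;
}
```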

Message Ordering

const eventSystem = createEventSystemBuilder()
  .service('my-service')
  .addTransportFromFactory('redis', 'redis-streams', {
    url: 'redis://localhost:6379',
    ordering: {
      enabled: true,
      strategy: 'strict',
      partitionKey: 'userId'
    }
  })
  .build();

// Maintain strict ordering for messages with the same partition key

Schema Management

const eventSystem = createEventSystemBuilder()
  .service('my-service')
  .addTransportFromFactory('redis', 'redis-streams', {
    url: 'redis://localhost:6379',
    schema: {
      enabled: true,
      validationMode: 'strict',
      autoEvolution: true,
      compatibilityCheck: true
    }
  })
  .build();

// Automatic schema validation and evolution

Message Replay

const eventSystem = createEventSystemBuilder()
  .service('my-service')
  .addTransportFromFactory('redis', 'redis-streams', {
    url: 'redis://localhost:6379',
    replay: {
      enabled: true,
      maxReplayMessages: 1000,
      replayTimeout: 300000,
      validateReplay: true
    }
  })
  .build();

// Historical messages can be replayed for testing or recovery

Security

  • Event Validation: Comprehensive input validation
  • Origin Isolation: Regional data isolation through origin prefixes
  • Transport Security: Security depends on transport implementation
  • No Built-in Encryption: Encryption must be implemented at transport level

Monitoring and Observability

  • Built-in Statistics: Publisher and consumer performance metrics
  • System Status: Connection status and comprehensive health information
  • Error Tracking: Failed message and poison message tracking
  • Transport Metrics: Detailed transport-level performance data

📈 Performance Tuning

Batching Configuration

const eventSystem = createEventSystemBuilder()
  .service('my-service')
  .addTransportFromFactory('redis', 'redis-streams', {
    url: 'redis://localhost:6379'
  })
  .enablePublisherBatching({
    enabled: true,
    maxSize: 1000,           // Maximum batch size
    maxWaitMs: 100,          // Maximum wait time
    maxConcurrentBatches: 5, // Parallel batch processing
    strategy: 'size'          // Batch by size, not time
  })
  .build();
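
How `maxSize` and `maxWaitMs` interact can be shown with a small batcher. This is a simplified sketch, not the library's batching code: `EventBatcher` is a hypothetical class, and the current time is passed in explicitly to keep it deterministic, whereas a real implementation would more likely flush from a timer.

```typescript
// Hypothetical size-and-time batcher; not part of @logistically/events.
class EventBatcher<T> {
  private buffer: T[] = [];
  private oldestAt = 0;
  private readonly maxSize: number;
  private readonly maxWaitMs: number;

  constructor(maxSize: number, maxWaitMs: number) {
    this.maxSize = maxSize;
    this.maxWaitMs = maxWaitMs;
  }

  // Adds an item; returns a batch when a flush condition is met, otherwise null.
  add(item: T, nowMs: number): T[] | null {
    if (this.buffer.length === 0) this.oldestAt = nowMs;
    this.buffer.push(item);
    const full = this.buffer.length >= this.maxSize;
    const waitedTooLong = nowMs - this.oldestAt >= this.maxWaitMs;
    return full || waitedTooLong ? this.flush() : null;
  }

  // Returns whatever is buffered and resets the batcher.
  flush(): T[] {
    const batch = this.buffer;
    this.buffer = [];
    return batch;
  }
}
```

A larger `maxSize` trades latency for throughput, while `maxWaitMs` bounds how long a lone event can sit in the buffer.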

Partitioning

const eventSystem = createEventSystemBuilder()
  .service('my-service')
  .addTransportFromFactory('redis', 'redis-streams', {
    url: 'redis://localhost:6379',
    partitioning: {
      enabled: true,
      strategy: 'hash',
      partitions: 8,
      partitionKey: 'userId'
    }
  })
  .build();
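
With a `'hash'` strategy, the partition is typically derived from a stable hash of the partition key value. The sketch below uses 32-bit FNV-1a as a stand-in; the hash function the library actually uses is not documented here, and `partitionFor` is a hypothetical helper.

```typescript
// 32-bit FNV-1a over UTF-16 code units (a stand-in hash, chosen for brevity).
function fnv1a(input: string): number {
  let hash = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0; // multiply by the FNV prime, keep 32 bits
  }
  return hash >>> 0;
}

// Hypothetical helper: maps a key value (e.g. a userId) to a partition index.
function partitionFor(keyValue: string, partitions: number): number {
  return fnv1a(keyValue) % partitions;
}
```

Because the hash is deterministic, every event for the same `userId` lands in the same partition, which is what lets per-key ordering coexist with parallel consumers.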

Rate Limiting

const eventSystem = createEventSystemBuilder()
  .service('my-service')
  .addTransportFromFactory('redis', 'redis-streams', {
    url: 'redis://localhost:6379'
  })
  .enablePublisherRateLimiting({
    maxRequests: 1000,
    timeWindow: 60000,
    strategy: 'sliding-window'
  })
  .build();
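
A `'sliding-window'` limiter with `maxRequests` and `timeWindow` can be sketched as a list of recent timestamps. This is a generic illustration of the technique, not the library's implementation; `SlidingWindowLimiter` is a hypothetical class and takes the current time as an argument so the behavior is easy to follow.

```typescript
// Hypothetical sliding-window limiter; not part of @logistically/events.
class SlidingWindowLimiter {
  private timestamps: number[] = [];
  private readonly maxRequests: number;
  private readonly timeWindow: number;

  constructor(maxRequests: number, timeWindow: number) {
    this.maxRequests = maxRequests;
    this.timeWindow = timeWindow;
  }

  // Returns true and records the request if it fits in the current window.
  tryAcquire(nowMs: number): boolean {
    const windowStart = nowMs - this.timeWindow;
    // Forget requests that have slid out of the window.
    this.timestamps = this.timestamps.filter((t) => t > windowStart);
    if (this.timestamps.length >= this.maxRequests) return false;
    this.timestamps.push(nowMs);
    return true;
  }
}
```

Storing one timestamp per request is simple but costs memory proportional to `maxRequests`; a token bucket keeps only a counter and a timestamp at the price of allowing short bursts.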

🔧 Advanced Configuration

Custom Transport

import { Transport, TransportCapabilities } from '@logistically/events';

class CustomTransport implements Transport {
  readonly name = 'custom';
  readonly capabilities: TransportCapabilities = {
    supportsPublishing: true,
    supportsSubscription: true,
    supportsBatching: false,
    supportsPartitioning: false,
    supportsOrdering: false,
    supportsPatternRouting: false,
    supportsConsumerGroups: false,
    supportsDeadLetterQueues: false,
    supportsMessageRetention: false,
    supportsMessageCompression: false,
    maxMessageSize: 1024,
    maxBatchSize: 1,
    maxTopics: 100,
    maxPartitions: 1,
    maxConsumerGroups: 0,
    supportsPersistence: false,
    supportsReplication: false,
    supportsFailover: false,
    supportsTransactions: false,
    supportsMetrics: true,
    supportsTracing: false,
    supportsHealthChecks: true
  };

  // Implement required methods...
}

const eventSystem = createEventSystemBuilder()
  .service('my-service')
  .addTransport('custom', new CustomTransport())
  .build();
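
Capability flags such as `supportsBatching` and `maxBatchSize` let calling code adapt to a transport. As a hedged illustration of that idea, the sketch below splits a list of events into chunks a transport could accept; `chunkForTransport` and `BatchCapabilities` are hypothetical and only reuse two field names from the example above.

```typescript
// Subset of the capability flags shown above.
interface BatchCapabilities {
  supportsBatching: boolean;
  maxBatchSize: number;
}

// Hypothetical helper: split events into chunks no larger than the transport allows.
function chunkForTransport<T>(events: readonly T[], capabilities: BatchCapabilities): T[][] {
  // Transports without batching receive one event per chunk.
  const size = capabilities.supportsBatching ? Math.max(1, capabilities.maxBatchSize) : 1;
  const chunks: T[][] = [];
  for (let i = 0; i < events.length; i += size) {
    chunks.push(events.slice(i, i + size));
  }
  return chunks;
}
```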

Transport Factory

import { createTransportFactory, RedisStreamsPlugin, MemoryTransportPlugin } from '@logistically/events';

const factory = createTransportFactory();

// Register plugins
factory.registerPlugin(new RedisStreamsPlugin());
factory.registerPlugin(new MemoryTransportPlugin());

// Create transports
const redisTransport = factory.createTransport({
  type: 'redis-streams',
  options: {
    url: 'redis://localhost:6379',
    groupId: 'my-group'
  }
});

const memoryTransport = factory.createTransport({
  type: 'memory',
  options: {
    enablePatternRouting: true
  }
});
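
The register-then-create flow above follows a common plugin-registry pattern. The miniature below shows the pattern in isolation; every name in it (`SketchTransportFactory`, `SketchPlugin`, `SketchTransport`) is hypothetical, and it makes no claim about how the library's factory is implemented.

```typescript
// Miniature plugin registry illustrating the pattern; not the library's factory.
interface SketchTransport {
  readonly kind: string;
  readonly options: Record<string, unknown>;
}

interface SketchPlugin {
  readonly type: string;
  create(options: Record<string, unknown>): SketchTransport;
}

class SketchTransportFactory {
  private readonly plugins = new Map<string, SketchPlugin>();

  // Plugins are indexed by the transport type they know how to build.
  registerPlugin(plugin: SketchPlugin): void {
    this.plugins.set(plugin.type, plugin);
  }

  // Looks up the plugin for the requested type and delegates construction to it.
  createTransport(config: { type: string; options: Record<string, unknown> }): SketchTransport {
    const plugin = this.plugins.get(config.type);
    if (!plugin) {
      throw new Error(`No plugin registered for transport type "${config.type}"`);
    }
    return plugin.create(config.options);
  }
}
```

Keeping construction behind a registry means new transports can be added without touching the code that asks for them by type name.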

📚 API Reference

For detailed API documentation, see:

🏗️ System Architecture

The library follows a modular, plugin-based architecture:

┌─────────────────────────────────────────────────────────────────┐
│                        Event System                            │
├─────────────────────────────────────────────────────────────────┤
│  EventSystemBuilder                                           │
│  ├── Configuration Management                                 │
│  ├── Transport Factory                                       │
│  ├── Validation Engine                                       │
│  └── System Assembly                                         │
├─────────────────────────────────────────────────────────────────┤
│  Core Components                                              │
│  ├── EventPublisher                                          │
│  ├── EventConsumer                                           │
│  ├── EventRouter                                             │
│  └── EventValidator                                          │
├─────────────────────────────────────────────────────────────────┤
│  Transport Layer                                              │
│  ├── Redis Streams Transport                                 │
│  ├── Memory Transport                                        │
│  └── Custom Transport Plugins                                │
└─────────────────────────────────────────────────────────────────┘

Key Design Principles

  • Transport Agnostic: Core logic independent of transport implementation
  • Plugin Architecture: Easy to add new transports and features
  • Type Safety: Full TypeScript support with comprehensive types
  • Validation First: Comprehensive validation at every level
  • Performance Optimized: Efficient batching, partitioning, and routing
  • Enterprise Ready: Production-grade features for mission-critical systems

🚀 Getting Started Examples

Simple Notification Service

import { createEventSystemBuilder } from '@logistically/events';

const notificationService = createEventSystemBuilder()
  .service('notification-service')
  .addTransportFromFactory('redis', 'redis-streams', {
    url: 'redis://localhost:6379',
    groupId: 'notification-service-group'
  })
  .build();

await notificationService.connect();

// Subscribe to notification events
await notificationService.consumer.subscribe('notification.sent', async (message, metadata) => {
  const { userId, type, content } = message.body;
  console.log(`Sending ${type} notification to user ${userId}: ${content}`);
});

// Publish notification events
await notificationService.publisher.publish('notification.sent', {
  userId: '123',
  type: 'email',
  content: 'Welcome to our service!'
});

Multi-Transport Order Service

import { createEventSystemBuilder } from '@logistically/events';

const orderService = createEventSystemBuilder()
  .service('order-service')
  .originPrefix('eu.de')
  .addTransportFromFactory('redis', 'redis-streams', {
    url: 'redis://redis-cluster:6379',
    groupId: 'order-service-group',
    enableDLQ: true,
    maxRetries: 3
  })
  .addTransportFromFactory('memory', 'memory', {
    enablePatternRouting: true
  })
  .enablePublisherBatching({
    enabled: true,
    maxSize: 1000,
    maxWaitMs: 100,
    maxConcurrentBatches: 5,
    strategy: 'size'
  })
  .enableConsumerPatternRouting()
  .enableConsumerGroups()
  .setPoisonMessageHandler(async (message, error, metadata) => {
    console.error('Poison message:', message, error);
    // logToMonitoringSystem is an application-defined function, not part of this library
    await logToMonitoringSystem(message, error);
  })
  .build();

await orderService.connect();

// Subscribe to all order events
await orderService.consumer.subscribePattern('order.*', async (message, metadata, pattern) => {
  console.log(`Order event ${pattern}:`, message.body);
});

// Publish order events
await orderService.publisher.publish('order.created', {
  orderId: 'ORD-123',
  customerId: 'CUST-456',
  total: 99.99
});

Custom Routing Configuration

import { createEventSystemBuilder, createBasicRoutingConfig } from '@logistically/events';

const routingConfig = createBasicRoutingConfig(
  [
    { pattern: 'user.*', transport: 'redis', options: { topic: 'user-events' } },
    { pattern: 'order.*', transport: 'redis', options: { topic: 'order-events' } },
    { pattern: 'payment.*', transport: 'redis', options: { topic: 'payment-events' } }
  ],
  'warn',
  'eu.de',
  {
    'user.*': 'user',
    'order.*': 'order',
    'payment.*': 'payment'
  },
  'namespace'
);

const gatewayService = createEventSystemBuilder()
  .service('gateway-service')
  .originPrefix('eu.de')
  .routing(routingConfig)
  .addTransportFromFactory('redis', 'redis-streams', {
    url: 'redis://localhost:6379'
  })
  .build();

🤝 Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests for new functionality
  5. Ensure all tests pass
  6. Submit a pull request

📄 License

MIT License - see LICENSE file for details.

🆘 Support