@polinater/redis-event-bus

v4.0.0

Redis-backed event bus with strict type safety and automatic retries

Redis Event Bus

What’s New in v4.0.0 – The BullMQ Update

v4 migrates the underlying transport from Redis Pub/Sub to BullMQ, bringing:

  • Durable storage – jobs survive restarts & deployments.
  • Automatic retries with exponential back-off.
  • Stalled-job detection & dead-letter inspection via Bull Board.
  • Horizontal-scaling friendliness (work-queue model).
  • No breaking TypeScript API changes – keep using emit, on, off, and all Prisma generics.
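
To make "exponential back-off" concrete, the sketch below computes a doubling delay schedule. The function name `backoffDelayMs`, the 1000 ms base delay, and the 30 s cap are illustrative assumptions, not values taken from this package or from BullMQ's configuration.

```typescript
// Hypothetical helper: delay before retry number `attempt` (1-based).
// Doubles the base delay on each attempt and caps the result.
function backoffDelayMs(attempt: number, baseMs = 1000, capMs = 30_000): number {
  if (!Number.isInteger(attempt) || attempt < 1) {
    throw new RangeError('attempt must be a positive integer');
  }
  return Math.min(baseMs * 2 ** (attempt - 1), capMs);
}

// Schedule for the first six retries: 1s, 2s, 4s, 8s, 16s, then capped at 30s.
const schedule = [1, 2, 3, 4, 5, 6].map((n) => backoffDelayMs(n));
console.log(schedule); // [1000, 2000, 4000, 8000, 16000, 30000]
```

The cap keeps a long outage from pushing the next retry arbitrarily far into the future.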

30-second migration

  1. Install the new dependency

    npm install bullmq --save
  2. Run a long-lived worker process (outside Vercel/Netlify functions).

    // scripts/event-worker.ts
    import { initializeSubscribers } from '../src/initializeSubscribers';
    import { createEventBus } from '@polinater/redis-event-bus';
    
    await createEventBus();
    await initializeSubscribers();
    console.log('Event-bus worker ready');
    process.stdin.resume();
  3. Deploy that script on Render, Fly.io, Docker, k8s, etc. Your Next.js app can stay on Vercel.

No other code changes required.


Installation

npm install @polinater/redis-event-bus bullmq

Prerequisites

  • Node.js 16 or higher
  • Redis server (local or cloud)
  • Prisma Client (optional but recommended for type safety)
  • Next.js 13+ (for app router examples)

Peer Dependencies

The package works with Prisma v5 and v6:

npm install @prisma/client  # ^5.0.0 || ^6.0.0

Quick Start

// lib/eventBus.ts
import { PrismaClient } from '@prisma/client';
import { createEventBusFromClient, EventType } from '@polinater/redis-event-bus';

const prisma = new PrismaClient();
export const eventBus = createEventBusFromClient(prisma);

// Emit an event
await eventBus.emit(EventType.BusinessCreated, {
  metadata: {
    eventId: crypto.randomUUID(),
    timestamp: new Date().toISOString(),
    version: '1.0'
  },
  data: {
    // Same payload shape as the rest of this guide: businessId plus the record under data
    businessId: 'biz_123',
    data: await prisma.business.findUniqueOrThrow({ where: { id: 'biz_123' } })
  }
});

Setup Guide

Step 1: Environment Configuration

Create or update your .env.local file:

# Required: Redis connection URL
REDIS_EVENTS_URL=redis://localhost:6379

# Optional: Redis configuration
REDIS_PASSWORD=your_password
REDIS_DB=0
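
One way to fail fast on a missing or malformed setting is to read these variables through a small validator. This is a sketch only: `readRedisEnv` and its return shape are hypothetical and not part of the package; only the three variable names come from the file above.

```typescript
interface RedisEnv {
  url: string;
  password: string | undefined;
  db: number;
}

// Hypothetical helper: validate the variables from .env.local before use.
// Pass process.env in real code; a plain object keeps the function testable.
function readRedisEnv(env: Record<string, string | undefined>): RedisEnv {
  const url = env.REDIS_EVENTS_URL;
  if (!url) {
    throw new Error('REDIS_EVENTS_URL is required');
  }
  const db = Number.parseInt(env.REDIS_DB ?? '0', 10);
  if (!Number.isInteger(db) || db < 0) {
    throw new Error(`REDIS_DB must be a non-negative integer, got "${env.REDIS_DB}"`);
  }
  // Treat an empty password the same as an unset one.
  return { url, password: env.REDIS_PASSWORD || undefined, db };
}

const parsed = readRedisEnv({ REDIS_EVENTS_URL: 'redis://localhost:6379', REDIS_DB: '2' });
console.log(parsed.db); // 2
```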

Step 2: Initialize Event Bus

Create a centralized event bus instance:

// lib/eventBus.ts
import { PrismaClient } from '@prisma/client';
import { 
  createEventBusFromClient, 
  type ExtractPrismaTypes 
} from '@polinater/redis-event-bus';

// Initialize Prisma
const prisma = new PrismaClient();

// Create type-safe event bus
export const eventBus = createEventBusFromClient(prisma);

// Extract your Prisma types for reuse
export type MyPrismaTypes = ExtractPrismaTypes<typeof prisma>;

// Export prisma for use in other files
export { prisma };

Step 3: Set Up Event Subscribers

Create a centralized subscribers file:

// lib/subscribers.ts
import { EventType } from '@polinater/redis-event-bus';
import { eventBus } from './eventBus';
import { sendWelcomeEmail } from './email';
import { updateAnalytics } from './analytics';

export async function initializeSubscribers() {
  // Business Events
  await eventBus.on(EventType.BusinessCreated, async (event) => {
    const business = event.data.data;
    
    // Send welcome email
    if (business.email) {
      await sendWelcomeEmail(business.email, business.name);
    }
    
    // Update analytics
    await updateAnalytics('business_created', {
      businessId: business.id,
      timestamp: event.metadata.timestamp
    });
  });

  await eventBus.on(EventType.BusinessUpdated, async (event) => {
    console.log('Business updated:', event.data.businessId);
    console.log('Changes:', event.data.changes);
    
    // Sync with external services
    await syncWithCRM(event.data.data);
  });

  await eventBus.on(EventType.BusinessDeleted, async (event) => {
    // Clean up related data
    await cleanupBusinessData(event.data.businessId);
  });

  console.log('Event subscribers initialized');
}

async function syncWithCRM(business: any) {
  // Your CRM sync logic
}

async function cleanupBusinessData(businessId: string) {
  // Your cleanup logic
}

Step 4: Initialize in Next.js

For Next.js App Router, use the instrumentation file:

// instrumentation.ts (in your project root)
export async function register() {
  if (process.env.NEXT_RUNTIME === 'nodejs') {
    // Only run in Node.js runtime (server-side)
    const { initializeSubscribers } = await import('./lib/subscribers');
    await initializeSubscribers();
    console.log('Redis Event Bus initialized');
  }
}

On Next.js 13 and 14, enable instrumentation in next.config.js (Next.js 15 and later load instrumentation.ts without this flag):

// next.config.js
/** @type {import('next').NextConfig} */
const nextConfig = {
  experimental: {
    instrumentationHook: true,
  },
}

module.exports = nextConfig;

For Next.js Pages Router, initialize in a startup script or API route:

// pages/api/init.ts
import type { NextApiRequest, NextApiResponse } from 'next';
import { initializeSubscribers } from '../../lib/subscribers';

let initialized = false;

export default async function handler(req: NextApiRequest, res: NextApiResponse) {
  if (!initialized) {
    await initializeSubscribers();
    initialized = true;
  }
  
  res.status(200).json({ message: 'Event bus initialized' });
}
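
The boolean flag above is set only after `initializeSubscribers()` resolves, so two requests arriving at the same time could both start initialization. A possible guard is to cache the promise instead. `once` below is a hypothetical helper, not an export of this package.

```typescript
// Hypothetical helper: run an async initializer at most once and share
// the same promise with every caller, including concurrent ones.
function once<T>(init: () => Promise<T>): () => Promise<T> {
  let pending: Promise<T> | undefined;
  return () => {
    if (!pending) {
      pending = init().catch((error) => {
        pending = undefined; // allow a later call to retry after a failure
        throw error;
      });
    }
    return pending;
  };
}

// Stand-in for initializeSubscribers(); counts how often it actually runs.
let runs = 0;
const initOnce = once(async () => {
  runs += 1;
});

const first = initOnce();
const second = initOnce();
console.log(first === second, runs); // true 1
```

In the API route, `await initOnce()` at the top of the handler would take the place of the `initialized` check.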

Step 5: Using in API Routes

// app/api/businesses/route.ts
import { NextRequest, NextResponse } from 'next/server';
import { eventBus, prisma } from '@/lib/eventBus';
import { EventType } from '@polinater/redis-event-bus';

export async function POST(request: NextRequest) {
  try {
    const data = await request.json();
    
    // Create business in database
    const business = await prisma.business.create({ 
      data: {
        name: data.name,
        email: data.email,
        // ... other fields
      }
    });
    
    // Emit event (this will trigger all subscribers)
    await eventBus.emit(EventType.BusinessCreated, {
      metadata: {
        eventId: crypto.randomUUID(),
        timestamp: new Date().toISOString(),
        version: '1.0',
        correlationId: data.correlationId // Optional
      },
      data: {
        businessId: business.id,
        data: business
      }
    });
    
    return NextResponse.json(business);
  } catch (error) {
    console.error('Error creating business:', error);
    return NextResponse.json(
      { error: 'Failed to create business' }, 
      { status: 500 }
    );
  }
}

export async function PUT(request: NextRequest) {
  try {
    const { searchParams } = new URL(request.url);
    const id = searchParams.get('id');
    const data = await request.json();
    
    if (!id) {
      return NextResponse.json({ error: 'ID required' }, { status: 400 });
    }
    
    // Get original business for change comparison
    const originalBusiness = await prisma.business.findUnique({
      where: { id }
    });
    
    if (!originalBusiness) {
      return NextResponse.json({ error: 'Business not found' }, { status: 404 });
    }
    
    // Update business
    const updatedBusiness = await prisma.business.update({
      where: { id },
      data
    });
    
    // Calculate changes
    const changes: Record<string, any> = {};
    Object.keys(data).forEach(key => {
      if (originalBusiness[key as keyof typeof originalBusiness] !== data[key]) {
        changes[key] = {
          from: originalBusiness[key as keyof typeof originalBusiness],
          to: data[key]
        };
      }
    });
    
    // Emit update event
    await eventBus.emit(EventType.BusinessUpdated, {
      metadata: {
        eventId: crypto.randomUUID(),
        timestamp: new Date().toISOString(),
        version: '1.0'
      },
      data: {
        businessId: id,
        data: updatedBusiness,
        changes
      }
    });
    
    return NextResponse.json(updatedBusiness);
  } catch (error) {
    console.error('Error updating business:', error);
    return NextResponse.json(
      { error: 'Failed to update business' }, 
      { status: 500 }
    );
  }
}
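
The `!==` comparison in the PUT handler treats two `Date` objects holding the same instant as different, because it compares object identity. A sketch of a change calculator that compares dates by timestamp follows. `diffFields`, `sameValue`, and `FieldChange` are hypothetical names, and nested objects or arrays are still compared by reference.

```typescript
interface FieldChange {
  from: unknown;
  to: unknown;
}

// Compare two values, treating Date objects as equal when they
// represent the same instant.
function sameValue(a: unknown, b: unknown): boolean {
  if (a instanceof Date && b instanceof Date) {
    return a.getTime() === b.getTime();
  }
  return a === b;
}

// Hypothetical helper: list the fields in `updates` whose value differs
// from the corresponding field in `original`.
function diffFields(
  original: Record<string, unknown>,
  updates: Record<string, unknown>,
): Record<string, FieldChange> {
  const changes: Record<string, FieldChange> = {};
  for (const [key, to] of Object.entries(updates)) {
    const from = original[key];
    if (!sameValue(from, to)) {
      changes[key] = { from, to };
    }
  }
  return changes;
}

const changes = diffFields(
  { name: 'Acme', status: 'pending', createdAt: new Date(0) },
  { name: 'Acme', status: 'active', createdAt: new Date(0) },
);
console.log(Object.keys(changes)); // ['status']
```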

Usage Patterns

Pattern 1: Simple Event Emission

import { eventBus } from '@/lib/eventBus';
import { EventType } from '@polinater/redis-event-bus';

// Basic event emission
await eventBus.emit(EventType.BusinessCreated, {
  metadata: {
    eventId: crypto.randomUUID(),
    timestamp: new Date().toISOString(),
    version: '1.0'
  },
  data: {
    businessId: business.id,
    data: business
  }
});

Pattern 2: Event Chaining

// In your subscriber
await eventBus.on(EventType.BusinessCreated, async (event) => {
  const business = event.data.data;
  
  // Process business
  await processNewBusiness(business);
  
  // Emit follow-up event
  await eventBus.emit(EventType.BusinessBrandingCreated, {
    metadata: {
      eventId: crypto.randomUUID(),
      timestamp: new Date().toISOString(),
      version: '1.0',
      correlationId: event.metadata.eventId // Link events
    },
    data: {
      businessId: business.id,
      data: await createDefaultBranding(business.id)
    }
  });
});
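
Pattern 2 links the follow-up event to its parent through `correlationId`. When a chain grows beyond two events, it can help to carry the first event's ID through the whole chain. The sketch below assumes that convention (the package does not prescribe one) and a hypothetical `childMetadata` helper; the `Metadata` interface only mirrors the fields used in the examples above.

```typescript
import { randomUUID } from 'node:crypto';

// Mirrors the metadata fields used in the examples above.
interface Metadata {
  eventId: string;
  timestamp: string;
  version: string;
  correlationId?: string;
}

// Hypothetical helper: metadata for a follow-up event that keeps the
// correlation ID of the chain's first event.
function childMetadata(parent: Metadata): Metadata {
  return {
    eventId: randomUUID(),
    timestamp: new Date().toISOString(),
    version: parent.version,
    correlationId: parent.correlationId ?? parent.eventId,
  };
}

const root: Metadata = {
  eventId: randomUUID(),
  timestamp: new Date().toISOString(),
  version: '1.0',
};
const secondEvent = childMetadata(root);
const thirdEvent = childMetadata(secondEvent);
console.log(thirdEvent.correlationId === root.eventId); // true
```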

Pattern 3: Conditional Event Handling

await eventBus.on(EventType.BusinessUpdated, async (event) => {
  const { data: business, changes } = event.data;
  
  // Only process if email changed
  if (changes.email) {
    await updateEmailInCRM(business.id, changes.email.to);
  }
  
  // Only process if status changed to active
  if (changes.status?.to === 'active') {
    await activateBusinessServices(business.id);
  }
});

Pattern 4: Error Handling in Subscribers

await eventBus.on(EventType.BusinessCreated, async (event) => {
  try {
    await sendWelcomeEmail(event.data.data.email, event.data.data.name);
  } catch (error) {
    console.error('Failed to send welcome email:', error);
    
    // Emit error event for monitoring
    await eventBus.emit(EventType.EventCreated, {
      metadata: {
        eventId: crypto.randomUUID(),
        timestamp: new Date().toISOString(),
        version: '1.0'
      },
      data: {
        id: crypto.randomUUID(),
        businessId: event.data.businessId,
        data: {
          id: crypto.randomUUID(),
          businessId: event.data.businessId,
          eventType: 'email_send_failed',
          createdAt: new Date(),
          updatedAt: new Date()
        }
      }
    });
    
    // Don't rethrow - let other subscribers continue
  }
});

Configuration

Basic Configuration

The event bus automatically uses environment variables:

REDIS_EVENTS_URL=redis://localhost:6379

Advanced Configuration

For more control, use the configuration API:

import { configureRedisEventBus } from '@polinater/redis-event-bus';

// Configure before creating event bus instances
configureRedisEventBus({
  url: process.env.REDIS_EVENTS_URL || 'redis://localhost:6379',
  enableLogging: process.env.NODE_ENV !== 'production',
  options: {
    // Redis connection options
    maxRetriesPerRequest: 5,
    connectTimeout: 15000,
    lazyConnect: true,
    
    // Custom retry strategy
    retryStrategy: (times) => {
      const delay = Math.min(times * 50, 2000);
      return delay;
    },
    
    // TLS configuration for cloud Redis
    tls: process.env.REDIS_TLS === 'true' ? {} : undefined,
    
    // Password authentication
    password: process.env.REDIS_PASSWORD,
    
    // Database selection
    db: parseInt(process.env.REDIS_DB || '0', 10),
  }
});
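
The `retryStrategy` above grows the reconnect delay by 50 ms per attempt and caps it at 2 seconds. The snippet below extracts the same arithmetic into a standalone function (the name `reconnectDelayMs` is only for illustration) to show the resulting schedule.

```typescript
// Same arithmetic as the retryStrategy above: 50 ms per attempt, capped at 2000 ms.
function reconnectDelayMs(times: number): number {
  return Math.min(times * 50, 2000);
}

console.log(reconnectDelayMs(1));   // 50
console.log(reconnectDelayMs(10));  // 500
console.log(reconnectDelayMs(40));  // 2000 (cap reached)
console.log(reconnectDelayMs(100)); // 2000
```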

Cloud Redis Providers

Redis Cloud

configureRedisEventBus({
  url: 'rediss://username:password@your-endpoint:port',
  options: {
    tls: {
      rejectUnauthorized: true
    }
  }
});

AWS ElastiCache

configureRedisEventBus({
  url: 'redis://your-cluster.cache.amazonaws.com:6379',
  options: {
    family: 4, // IPv4
    keepAlive: true,
    maxRetriesPerRequest: 3
  }
});

Upstash Redis

configureRedisEventBus({
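  // Use the Redis protocol endpoint (rediss://...), not the HTTPS REST URL
  url: process.env.UPSTASH_REDIS_URL, // assumed variable name for the rediss:// URL
  options: {
    password: process.env.UPSTASH_REDIS_PASSWORD, // assumed variable name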
    tls: {}
  }
});
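
All three providers are addressed by URL, and the scheme matters: `rediss://` requests TLS while `redis://` does not. A small guard can reject anything else, such as an `https://` endpoint, before it reaches the client. This sketch assumes the underlying client speaks the Redis wire protocol; `assertRedisUrl` and `usesTls` are hypothetical helpers, and the host names are placeholders.

```typescript
// Hypothetical guard: accept only redis:// and rediss:// URLs.
function assertRedisUrl(raw: string): URL {
  const url = new URL(raw); // throws TypeError on malformed input
  if (url.protocol !== 'redis:' && url.protocol !== 'rediss:') {
    throw new Error(`Expected a redis:// or rediss:// URL, got "${url.protocol}//"`);
  }
  return url;
}

// True when the URL asks for TLS (the rediss:// scheme).
function usesTls(raw: string): boolean {
  return assertRedisUrl(raw).protocol === 'rediss:';
}

console.log(usesTls('rediss://default:secret@example.upstash.io:6379')); // true
console.log(usesTls('redis://localhost:6379')); // false
```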

Event Types

The package includes predefined event types for common business operations:

Business Events

  • EventType.BusinessCreated
  • EventType.BusinessUpdated
  • EventType.BusinessDeleted

Business Component Events

  • EventType.BusinessBrandingCreated/Updated/Deleted
  • EventType.BusinessBuyerInfoCreated/Updated/Deleted
  • EventType.BusinessSupplierInfoCreated/Updated/Deleted
  • EventType.BusinessFinanceCreated/Updated/Deleted
  • EventType.BusinessPartnerCreated/Updated/Deleted

Item Management Events

  • EventType.BuyerItemAliasCreated/Updated/Deleted
  • EventType.CustomItemCostCreated/Updated/Deleted
  • EventType.CustomItemTaxCreated/Updated/Deleted

System Events

  • EventType.EventCreated/Updated/Deleted
  • EventType.IdempotencyKeyCreated/Updated/Deleted

Event Metadata

All events include standardized metadata:

interface EventMetadata {
  eventId: string;          // Unique identifier for this event
  timestamp: string;        // ISO timestamp of when event occurred
  version: string;          // Event schema version
  membershipId?: string;    // Optional: User/membership context
  integrationId?: string;   // Optional: External integration context
  correlationId?: string;   // Optional: Link related events
}
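
TypeScript checks this interface only at compile time. For payloads that arrive from outside the type system (for example, parsed JSON), a runtime guard can be useful. The sketch below redeclares the interface so that it is self-contained; `isEventMetadata` is a hypothetical helper, not a package export.

```typescript
interface EventMetadata {
  eventId: string;
  timestamp: string;
  version: string;
  membershipId?: string;
  integrationId?: string;
  correlationId?: string;
}

const REQUIRED = ['eventId', 'timestamp', 'version'] as const;
const OPTIONAL = ['membershipId', 'integrationId', 'correlationId'] as const;

// Hypothetical runtime check: does an unknown value have the metadata shape?
function isEventMetadata(value: unknown): value is EventMetadata {
  if (typeof value !== 'object' || value === null) return false;
  const record = value as Record<string, unknown>;
  const requiredOk = REQUIRED.every((key) => typeof record[key] === 'string');
  const optionalOk = OPTIONAL.every(
    (key) => record[key] === undefined || typeof record[key] === 'string',
  );
  // The timestamp must also parse as a date.
  return requiredOk && optionalOk && !Number.isNaN(Date.parse(record.timestamp as string));
}

console.log(
  isEventMetadata({ eventId: 'e1', timestamp: '2024-01-01T00:00:00.000Z', version: '1.0' }),
); // true
console.log(isEventMetadata({ eventId: 'e1', timestamp: 'yesterday', version: '1.0' })); // false
```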

Error Handling

Connection Errors

import { eventBus } from '@/lib/eventBus';

// Check connection health
const isHealthy = await eventBus.isHealthy();
if (!isHealthy) {
  console.error('Redis connection is not healthy');
  // Handle gracefully - maybe use fallback or queue
}

// Get connection status
const isConnected = eventBus.getConnectionStatus();

Emission Errors

The event bus automatically retries failed emissions:

try {
  await eventBus.emit(EventType.BusinessCreated, payload);
} catch (error) {
  if (error instanceof Error && error.name === 'PublishError') {
    console.error('Failed to publish event after retries:', error.message);
    // Handle failed emission - maybe store for later retry
  }
}

Subscription Errors

try {
  await eventBus.on(EventType.BusinessCreated, handler);
} catch (error) {
  if (error instanceof Error && error.name === 'SubscriptionError') {
    console.error('Failed to subscribe to event:', error.message);
    // Handle subscription failure
  }
}
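
In TypeScript projects where caught values are typed as `unknown`, reading `error.name` requires narrowing first. A sketch of a reusable guard follows. `hasErrorName` is hypothetical; `PublishError` and `SubscriptionError` are the names used in the examples above, and the local `PublishError` class exists only so the snippet runs on its own.

```typescript
// Hypothetical guard: narrow an unknown caught value to an Error with a given name.
function hasErrorName(error: unknown, name: string): error is Error {
  return error instanceof Error && error.name === name;
}

// Stand-in error class so the example runs without the package.
class PublishError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'PublishError';
  }
}

try {
  throw new PublishError('queue unavailable');
} catch (error) {
  if (hasErrorName(error, 'PublishError')) {
    console.error('Failed to publish event:', error.message);
  } else {
    throw error; // not ours to handle
  }
}
```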

Graceful Shutdown

// In your shutdown handler (e.g., process.on('SIGTERM'))
async function gracefulShutdown() {
  try {
    await eventBus.disconnect();
    console.log('Event bus disconnected gracefully');
  } catch (error) {
    console.error('Error during shutdown:', error);
  }
}

process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);

Performance & Scaling

Connection Pooling

The event bus uses Redis connection pooling automatically:

configureRedisEventBus({
  options: {
    maxRetriesPerRequest: 3,
    enableOfflineQueue: true,
    enableAutoPipelining: true, // Automatic command batching
    // Eviction policy (maxmemory-policy) is a Redis server setting, not a client option
  }
});

Event Batching

For high-throughput scenarios, batch events:

import type { Business } from '@prisma/client';

async function batchEmitBusinessEvents(businesses: Business[]) {
  const promises = businesses.map(business => 
    eventBus.emit(EventType.BusinessCreated, {
      metadata: {
        eventId: crypto.randomUUID(),
        timestamp: new Date().toISOString(),
        version: '1.0'
      },
      data: {
        businessId: business.id,
        data: business
      }
    })
  );
  
  // Emit all events concurrently
  await Promise.allSettled(promises);
}
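
`Promise.allSettled` never rejects, so failed emissions pass silently unless the results are inspected, and mapping over a very large array starts every emit at once. The sketch below shows two small helpers for those concerns. `chunk` and `countSettled` are hypothetical names, and a suitable batch size depends on your Redis setup.

```typescript
// Split an array into consecutive batches of at most `size` items.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Count outcomes from Promise.allSettled and collect rejection reasons.
function countSettled(results: readonly PromiseSettledResult<unknown>[]) {
  const reasons: unknown[] = [];
  for (const result of results) {
    if (result.status === 'rejected') reasons.push(result.reason);
  }
  return { fulfilled: results.length - reasons.length, rejected: reasons.length, reasons };
}

console.log(chunk([1, 2, 3, 4, 5], 2)); // [[1, 2], [3, 4], [5]]
console.log(
  countSettled([
    { status: 'fulfilled', value: undefined },
    { status: 'rejected', reason: new Error('boom') },
  ]).rejected,
); // 1
```

In `batchEmitBusinessEvents`, one could loop over `chunk(businesses, 100)`, await `Promise.allSettled` for each batch, and log the `reasons` returned by `countSettled`.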

Memory Management

Monitor and manage event handlers:

// Remove specific handler
const handler = async (event: any) => { /* ... */ };
await eventBus.on(EventType.BusinessCreated, handler);

// Later, remove it
await eventBus.off(EventType.BusinessCreated, handler);

// Remove all handlers for an event type
await eventBus.off(EventType.BusinessCreated);

Scaling Considerations

  1. Multiple Instances: Each app instance creates its own Redis connections
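  2. Load Balancing: Since v4 the transport is a BullMQ work queue, so multiple worker processes can share the load
  3. Persistence: Since v4 jobs are stored in Redis and survive app restarts and deployments; durability across a Redis restart still depends on your Redis persistence settings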
  4. Monitoring: Monitor Redis memory usage and connection counts

Troubleshooting

Common Issues

1. Events Not Being Received

Problem: Subscribers not receiving events

Solutions:

// Ensure subscribers are initialized before emitting
await initializeSubscribers();
await new Promise(resolve => setTimeout(resolve, 1000)); // Brief delay
await eventBus.emit(EventType.BusinessCreated, payload);

// Check Redis connection
const isHealthy = await eventBus.isHealthy();
console.log('Redis healthy:', isHealthy);

2. Type Errors with Prisma

Problem: TypeScript errors with Prisma types

Solutions:

// Ensure Prisma client is generated
// Run: npx prisma generate

// Use type assertion if needed
import type { Business } from '@prisma/client';
const business = event.data.data as Business;

// Or regenerate types
import type { ExtractPrismaTypes } from '@polinater/redis-event-bus';
import { PrismaClient } from '@prisma/client';
type MyTypes = ExtractPrismaTypes<PrismaClient>;

3. Redis Connection Issues

Problem: Cannot connect to Redis

Solutions:

# Check Redis is running
redis-cli ping

# Check environment variables
echo $REDIS_EVENTS_URL

# Test connection manually
redis-cli -u $REDIS_EVENTS_URL ping

4. Memory Leaks

Problem: Memory usage growing over time

Solutions:

// Clean up event handlers
await eventBus.off(EventType.BusinessCreated);

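// Implement connection cleanup on shutdown signals
// ('exit' listeners must be synchronous, so an awaited disconnect would never finish there)
for (const signal of ['SIGTERM', 'SIGINT'] as const) {
  process.once(signal, async () => {
    await eventBus.disconnect();
    process.exit(0);
  });
}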

Debug Mode

Enable detailed logging:

configureRedisEventBus({
  enableLogging: true,
  options: {
    showFriendlyErrorStack: true
  }
});

Health Checks

Implement health checks for monitoring:

// app/api/health/redis/route.ts
import { eventBus } from '@/lib/eventBus';

export async function GET() {
  try {
    const isHealthy = await eventBus.isHealthy();
    
    if (isHealthy) {
      return Response.json({ status: 'healthy', redis: 'connected' });
    } else {
      return Response.json(
        { status: 'unhealthy', redis: 'disconnected' }, 
        { status: 503 }
      );
    }
  } catch (error) {
    return Response.json(
      { status: 'error', message: error instanceof Error ? error.message : String(error) }, 
      { status: 500 }
    );
  }
}

API Reference

Factory Functions

// Create with default types
const eventBus = createEventBus();

// Create with Prisma types
const eventBus = createEventBusWithPrisma<MyPrismaTypes>();

// Create from Prisma client (recommended)
const eventBus = createEventBusFromClient(prismaClient);

Event Bus Methods

// Subscribe to events
await eventBus.on(eventType, handler);

// Unsubscribe from events (omit handler to remove all handlers for the event type)
await eventBus.off(eventType, handler);

// Emit events
await eventBus.emit(eventType, payload);

// Health checks
const healthy = await eventBus.isHealthy();
const connected = eventBus.getConnectionStatus();

// Cleanup
await eventBus.disconnect();

Configuration

// Configure Redis connection
configureRedisEventBus(config);

// Configuration interface
interface RedisEventBusConfig {
  url?: string;
  options?: RedisOptions;
  enableLogging?: boolean;
}

Type Utilities

// Extract types from Prisma client
type MyTypes = ExtractPrismaTypes<typeof prismaClient>;

// Infer types from instance
type MyTypes = InferPrismaTypes<typeof prismaClient>;

Support

For issues, questions, or contributions, please visit the GitHub repository.

License

MIT License - see LICENSE file for details.