@polinater/redis-event-bus
v4.0.0
Published
Redis-backed event bus with strict type safety and automatic retries
Maintainers
Readme
Redis Event Bus
What’s New in v4.0.0 – The BullMQ Update
v4 migrates the underlying transport from Redis Pub/Sub to BullMQ, bringing:
- Durable storage – jobs survive restarts & deployments.
- Automatic retries with exponential back-off.
- Stalled-job detection & dead-letter inspection via Bull Board.
- Horizontal-scaling friendliness (work-queue model).
- No breaking TypeScript API changes – keep using emit, on, off, and all Prisma generics.
30-second migration
1. Install the new dependency:
npm install bullmq --save
2. Run a long-lived worker process (outside Vercel/Netlify functions):
// scripts/event-worker.ts
import { initializeSubscribers } from '../src/initializeSubscribers';
import { createEventBus } from '@polinater/redis-event-bus';
await createEventBus();
await initializeSubscribers();
console.log('Event-bus worker ready');
process.stdin.resume();
3. Deploy that script on Render, Fly.io, Docker, k8s, etc. Your Next.js app can stay on Vercel.
No other code changes required.
Installation
npm install @polinater/redis-event-bus bullmq
Prerequisites
- Node.js 16 or higher
- Redis server (local or cloud)
- Prisma Client (optional but recommended for type safety)
- Next.js 13+ (for app router examples)
Peer Dependencies
The package works with Prisma v5 and v6:
npm install @prisma/client # ^5.0.0 || ^6.0.0
Quick Start
// lib/eventBus.ts
import { PrismaClient } from '@prisma/client';
import { createEventBusFromClient, EventType } from '@polinater/redis-event-bus';
const prisma = new PrismaClient();
export const eventBus = createEventBusFromClient(prisma);
// Emit an event
await eventBus.emit(EventType.BusinessCreated, {
metadata: {
eventId: crypto.randomUUID(),
timestamp: new Date().toISOString(),
version: '1.0'
},
data: {
businessId: 'biz_123',
data: await prisma.business.findUnique({ where: { id: 'biz_123' } })
}
});
Setup Guide
Step 1: Environment Configuration
Create or update your .env.local file:
# Required: Redis connection URL
REDIS_EVENTS_URL=redis://localhost:6379
# Optional: Redis configuration
REDIS_PASSWORD=your_password
REDIS_DB=0
Step 2: Initialize Event Bus
Create a centralized event bus instance:
// lib/eventBus.ts
import { PrismaClient } from '@prisma/client';
import {
createEventBusFromClient,
type ExtractPrismaTypes
} from '@polinater/redis-event-bus';
// Initialize Prisma
const prisma = new PrismaClient();
// Create type-safe event bus
export const eventBus = createEventBusFromClient(prisma);
// Extract your Prisma types for reuse
export type MyPrismaTypes = ExtractPrismaTypes<typeof prisma>;
// Export prisma for use in other files
export { prisma };
Step 3: Set Up Event Subscribers
Create a centralized subscribers file:
// lib/subscribers.ts
import { EventType } from '@polinater/redis-event-bus';
import { eventBus } from './eventBus';
import { sendWelcomeEmail } from './email';
import { updateAnalytics } from './analytics';
export async function initializeSubscribers() {
// Business Events
await eventBus.on(EventType.BusinessCreated, async (event) => {
const business = event.data.data;
// Send welcome email
if (business.email) {
await sendWelcomeEmail(business.email, business.name);
}
// Update analytics
await updateAnalytics('business_created', {
businessId: business.id,
timestamp: event.metadata.timestamp
});
});
await eventBus.on(EventType.BusinessUpdated, async (event) => {
console.log('Business updated:', event.data.businessId);
console.log('Changes:', event.data.changes);
// Sync with external services
await syncWithCRM(event.data.data);
});
await eventBus.on(EventType.BusinessDeleted, async (event) => {
// Clean up related data
await cleanupBusinessData(event.data.businessId);
});
console.log('Event subscribers initialized');
}
async function syncWithCRM(business: any) {
// Your CRM sync logic
}
async function cleanupBusinessData(businessId: string) {
// Your cleanup logic
}
Step 4: Initialize in Next.js
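For Next.js App Router, use the instrumentation file (this only suits a long-running Node.js server; on serverless platforms such as Vercel, run the subscribers in the dedicated worker process described in the v4 migration steps instead):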
// instrumentation.ts (in your project root)
export async function register() {
if (process.env.NEXT_RUNTIME === 'nodejs') {
// Only run in Node.js runtime (server-side)
const { initializeSubscribers } = await import('./lib/subscribers');
await initializeSubscribers();
console.log('Redis Event Bus initialized');
}
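}
Make sure to enable instrumentation in next.config.js (needed on Next.js 13 and 14; from Next.js 15 onward instrumentation is stable and the experimental flag can be dropped):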
// next.config.js
/** @type {import('next').NextConfig} */
const nextConfig = {
experimental: {
instrumentationHook: true,
},
}
module.exports = nextConfig;
For Next.js Pages Router, initialize in a startup script or API route:
// pages/api/init.ts
import { initializeSubscribers } from '../../lib/subscribers';
let initialized = false;
export default async function handler(req: any, res: any) {
if (!initialized) {
await initializeSubscribers();
initialized = true;
}
res.status(200).json({ message: 'Event bus initialized' });
}
Step 5: Using in API Routes
// app/api/businesses/route.ts
import { NextRequest, NextResponse } from 'next/server';
import { eventBus, prisma } from '@/lib/eventBus';
import { EventType } from '@polinater/redis-event-bus';
export async function POST(request: NextRequest) {
try {
const data = await request.json();
// Create business in database
const business = await prisma.business.create({
data: {
name: data.name,
email: data.email,
// ... other fields
}
});
// Emit event (this will trigger all subscribers)
await eventBus.emit(EventType.BusinessCreated, {
metadata: {
eventId: crypto.randomUUID(),
timestamp: new Date().toISOString(),
version: '1.0',
correlationId: data.correlationId // Optional
},
data: {
businessId: business.id,
data: business
}
});
return NextResponse.json(business);
} catch (error) {
console.error('Error creating business:', error);
return NextResponse.json(
{ error: 'Failed to create business' },
{ status: 500 }
);
}
}
export async function PUT(request: NextRequest) {
try {
const { searchParams } = new URL(request.url);
const id = searchParams.get('id');
const data = await request.json();
if (!id) {
return NextResponse.json({ error: 'ID required' }, { status: 400 });
}
// Get original business for change comparison
const originalBusiness = await prisma.business.findUnique({
where: { id }
});
if (!originalBusiness) {
return NextResponse.json({ error: 'Business not found' }, { status: 404 });
}
// Update business
const updatedBusiness = await prisma.business.update({
where: { id },
data
});
// Calculate changes
const changes: Record<string, any> = {};
Object.keys(data).forEach(key => {
if (originalBusiness[key as keyof typeof originalBusiness] !== data[key]) {
changes[key] = {
from: originalBusiness[key as keyof typeof originalBusiness],
to: data[key]
};
}
});
// Emit update event
await eventBus.emit(EventType.BusinessUpdated, {
metadata: {
eventId: crypto.randomUUID(),
timestamp: new Date().toISOString(),
version: '1.0'
},
data: {
businessId: id,
data: updatedBusiness,
changes
}
});
return NextResponse.json(updatedBusiness);
} catch (error) {
console.error('Error updating business:', error);
return NextResponse.json(
{ error: 'Failed to update business' },
{ status: 500 }
);
}
}
Usage Patterns
Pattern 1: Simple Event Emission
import { eventBus } from '@/lib/eventBus';
import { EventType } from '@polinater/redis-event-bus';
// Basic event emission
await eventBus.emit(EventType.BusinessCreated, {
metadata: {
eventId: crypto.randomUUID(),
timestamp: new Date().toISOString(),
version: '1.0'
},
data: {
businessId: business.id,
data: business
}
});
Pattern 2: Event Chaining
// In your subscriber
await eventBus.on(EventType.BusinessCreated, async (event) => {
const business = event.data.data;
// Process business
await processNewBusiness(business);
// Emit follow-up event
await eventBus.emit(EventType.BusinessBrandingCreated, {
metadata: {
eventId: crypto.randomUUID(),
timestamp: new Date().toISOString(),
version: '1.0',
correlationId: event.metadata.eventId // Link events
},
data: {
businessId: business.id,
data: await createDefaultBranding(business.id)
}
});
});
Pattern 3: Conditional Event Handling
await eventBus.on(EventType.BusinessUpdated, async (event) => {
const { data: business, changes } = event.data;
// Only process if email changed
if (changes.email) {
await updateEmailInCRM(business.id, changes.email.to);
}
// Only process if status changed to active
if (changes.status?.to === 'active') {
await activateBusinessServices(business.id);
}
});
Pattern 4: Error Handling in Subscribers
await eventBus.on(EventType.BusinessCreated, async (event) => {
try {
await sendWelcomeEmail(event.data.data.email);
} catch (error) {
console.error('Failed to send welcome email:', error);
// Emit error event for monitoring
await eventBus.emit(EventType.EventCreated, {
metadata: {
eventId: crypto.randomUUID(),
timestamp: new Date().toISOString(),
version: '1.0'
},
data: {
id: crypto.randomUUID(),
businessId: event.data.businessId,
data: {
id: crypto.randomUUID(),
businessId: event.data.businessId,
eventType: 'email_send_failed',
createdAt: new Date(),
updatedAt: new Date()
}
}
});
// Don't rethrow - let other subscribers continue
}
});
Configuration
Basic Configuration
The event bus automatically uses environment variables:
REDIS_EVENTS_URL=redis://localhost:6379
Advanced Configuration
For more control, use the configuration API:
import { configureRedisEventBus } from '@polinater/redis-event-bus';
// Configure before creating event bus instances
configureRedisEventBus({
url: process.env.REDIS_EVENTS_URL || 'redis://localhost:6379',
enableLogging: process.env.NODE_ENV !== 'production',
options: {
// Redis connection options
maxRetriesPerRequest: 5,
connectTimeout: 15000,
lazyConnect: true,
// Custom retry strategy
retryStrategy: (times) => {
const delay = Math.min(times * 50, 2000);
return delay;
},
// TLS configuration for cloud Redis
tls: process.env.REDIS_TLS === 'true' ? {} : undefined,
// Password authentication
password: process.env.REDIS_PASSWORD,
// Database selection
db: parseInt(process.env.REDIS_DB || '0'),
}
});
Cloud Redis Providers
Redis Cloud
configureRedisEventBus({
url: 'rediss://username:password@your-endpoint:port',
options: {
tls: {
rejectUnauthorized: true
}
}
});
AWS ElastiCache
configureRedisEventBus({
url: 'redis://your-cluster.cache.amazonaws.com:6379',
options: {
family: 4, // IPv4
keepAlive: true,
maxRetriesPerRequest: 3
}
});
Upstash Redis
configureRedisEventBus({
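// Use the TCP endpoint (rediss://default:<password>@<endpoint>:<port>), not the HTTPS REST URL:
// BullMQ talks to Redis over the Redis protocol and cannot use the Upstash REST API.
url: process.env.UPSTASH_REDIS_URL,
options: {
// Redis password from the Upstash console (not the REST token)
password: process.env.UPSTASH_REDIS_PASSWORD,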
tls: {}
}
});
Event Types
The package includes predefined event types for common business operations:
Business Events
- EventType.BusinessCreated
- EventType.BusinessUpdated
- EventType.BusinessDeleted
Business Component Events
- EventType.BusinessBrandingCreated/Updated/Deleted
- EventType.BusinessBuyerInfoCreated/Updated/Deleted
- EventType.BusinessSupplierInfoCreated/Updated/Deleted
- EventType.BusinessFinanceCreated/Updated/Deleted
- EventType.BusinessPartnerCreated/Updated/Deleted
Item Management Events
- EventType.BuyerItemAliasCreated/Updated/Deleted
- EventType.CustomItemCostCreated/Updated/Deleted
- EventType.CustomItemTaxCreated/Updated/Deleted
System Events
- EventType.EventCreated/Updated/Deleted
- EventType.IdempotencyKeyCreated/Updated/Deleted
Event Metadata
All events include standardized metadata:
interface EventMetadata {
eventId: string; // Unique identifier for this event
timestamp: string; // ISO timestamp of when event occurred
version: string; // Event schema version
membershipId?: string; // Optional: User/membership context
integrationId?: string; // Optional: External integration context
correlationId?: string; // Optional: Link related events
}
Error Handling
Connection Errors
import { eventBus } from '@/lib/eventBus';
// Check connection health
const isHealthy = await eventBus.isHealthy();
if (!isHealthy) {
console.error('Redis connection is not healthy');
// Handle gracefully - maybe use fallback or queue
}
// Get connection status
const isConnected = eventBus.getConnectionStatus();
Emission Errors
The event bus automatically retries failed emissions:
try {
await eventBus.emit(EventType.BusinessCreated, payload);
} catch (error) {
if (error.name === 'PublishError') {
console.error('Failed to publish event after retries:', error.message);
// Handle failed emission - maybe store for later retry
}
}
Subscription Errors
try {
await eventBus.on(EventType.BusinessCreated, handler);
} catch (error) {
if (error.name === 'SubscriptionError') {
console.error('Failed to subscribe to event:', error.message);
// Handle subscription failure
}
}
Graceful Shutdown
// In your shutdown handler (e.g., process.on('SIGTERM'))
async function gracefulShutdown() {
try {
await eventBus.disconnect();
console.log('Event bus disconnected gracefully');
} catch (error) {
console.error('Error during shutdown:', error);
}
}
process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);
Performance & Scaling
Connection Pooling
The event bus uses Redis connection pooling automatically:
configureRedisEventBus({
options: {
maxRetriesPerRequest: 3,
enableOfflineQueue: true,
enableAutoPipelining: true, // Automatic command batching
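// Note: the eviction policy is a Redis server setting (maxmemory-policy), not a client connection option; BullMQ requires the server to use 'noeviction'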
}
});
Event Batching
For high-throughput scenarios, batch events:
async function batchEmitBusinessEvents(businesses: Business[]) {
const promises = businesses.map(business =>
eventBus.emit(EventType.BusinessCreated, {
metadata: {
eventId: crypto.randomUUID(),
timestamp: new Date().toISOString(),
version: '1.0'
},
data: {
businessId: business.id,
data: business
}
})
);
// Emit all events concurrently
await Promise.allSettled(promises);
}
Memory Management
Monitor and manage event handlers:
// Remove specific handler
const handler = async (event: any) => { /* ... */ };
await eventBus.on(EventType.BusinessCreated, handler);
// Later, remove it
await eventBus.off(EventType.BusinessCreated, handler);
// Remove all handlers for an event type
await eventBus.off(EventType.BusinessCreated);
Scaling Considerations
- Multiple Instances: Each app instance creates its own Redis connections
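- Load Balancing: Since v4, events are delivered through BullMQ queues (work-queue model), so processing can be spread across multiple worker instances
- Persistence: Since v4, events are stored as durable BullMQ jobs that survive application restarts and deployments; enable Redis persistence (AOF/RDB) if they must also survive a Redis restart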
- Monitoring: Monitor Redis memory usage and connection counts
Troubleshooting
Common Issues
1. Events Not Being Received
Problem: Subscribers are not receiving events.
Solutions:
// Ensure subscribers are initialized before emitting
await initializeSubscribers();
await new Promise(resolve => setTimeout(resolve, 1000)); // Brief delay
await eventBus.emit(EventType.BusinessCreated, payload);
// Check Redis connection
const isHealthy = await eventBus.isHealthy();
console.log('Redis healthy:', isHealthy);
2. Type Errors with Prisma
Problem: TypeScript reports errors with Prisma types.
Solutions:
// Ensure Prisma client is generated
// Run: npx prisma generate
// Use type assertion if needed
import type { Business } from '@prisma/client';
const business = event.data.data as Business;
// Or regenerate types
import type { ExtractPrismaTypes } from '@polinater/redis-event-bus';
import { PrismaClient } from '@prisma/client';
type MyTypes = ExtractPrismaTypes<PrismaClient>;
3. Redis Connection Issues
Problem: The application cannot connect to Redis.
Solutions:
# Check Redis is running
redis-cli ping
# Check environment variables
echo $REDIS_EVENTS_URL
# Test connection manually
redis-cli -u $REDIS_EVENTS_URL ping
4. Memory Leaks
Problem: Memory usage grows over time.
Solutions:
// Clean up event handlers
await eventBus.off(EventType.BusinessCreated);
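// Implement connection cleanup.
// 'exit' listeners must be synchronous, so await the disconnect in a signal handler instead
process.on('SIGTERM', async () => {
await eventBus.disconnect();
process.exit(0);
});
Debug Mode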
Enable detailed logging:
configureRedisEventBus({
enableLogging: true,
options: {
showFriendlyErrorStack: true,
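// Note: the eviction policy is a Redis server setting (maxmemory-policy), not a client connection option; BullMQ requires the server to use 'noeviction'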
}
});
Health Checks
Implement health checks for monitoring:
// app/api/health/redis/route.ts
import { eventBus } from '@/lib/eventBus';
export async function GET() {
try {
const isHealthy = await eventBus.isHealthy();
if (isHealthy) {
return Response.json({ status: 'healthy', redis: 'connected' });
} else {
return Response.json(
{ status: 'unhealthy', redis: 'disconnected' },
{ status: 503 }
);
}
} catch (error) {
return Response.json(
{ status: 'error', message: error.message },
{ status: 500 }
);
}
}
API Reference
Factory Functions
// Create with default types
const eventBus = createEventBus();
// Create with Prisma types
const eventBus = createEventBusWithPrisma<MyPrismaTypes>();
// Create from Prisma client (recommended)
const eventBus = createEventBusFromClient(prismaClient);
Event Bus Methods
// Subscribe to events
await eventBus.on(eventType, handler);
// Unsubscribe from events
await eventBus.off(eventType, handler?);
// Emit events
await eventBus.emit(eventType, payload);
// Health checks
const healthy = await eventBus.isHealthy();
const connected = eventBus.getConnectionStatus();
// Cleanup
await eventBus.disconnect();
Configuration
// Configure Redis connection
configureRedisEventBus(config);
// Configuration interface
interface RedisEventBusConfig {
url?: string;
options?: RedisOptions;
enableLogging?: boolean;
}
Type Utilities
// Extract types from Prisma client
type MyTypes = ExtractPrismaTypes<typeof prismaClient>;
// Infer types from instance
type MyTypes = InferPrismaTypes<typeof prismaClient>;
Support
For issues, questions, or contributions, please visit the GitHub repository.
License
MIT License - see LICENSE file for details.
