@uploadista/event-broadcaster-ioredis
v0.1.2
Published
Redis event broadcaster for Uploadista
Readme
@uploadista/event-broadcaster-ioredis
IORedis-backed event broadcaster for Uploadista. Provides clustered event broadcasting with automatic failover and replica scaling.
Overview
The IORedis event broadcaster uses the IORedis library's advanced features for distributed event broadcasting. Perfect for:
- Redis Clustering: Built-in cluster support with auto-discovery
- Sentinel HA: Automatic failover with Redis Sentinel
- Replica Scaling: Distribute read load across replicas
- Large Deployments: 50+ server instances with efficient broadcasting
- Production High-Availability: Enterprise-grade resilience
Events published to a channel reach all subscribers across all servers, with automatic failover if nodes go down.
Installation
npm install @uploadista/event-broadcaster-ioredis ioredis
# or
pnpm add @uploadista/event-broadcaster-ioredis ioredisPrerequisites
- Node.js 18+
- Redis 5.0+ or Redis Cluster
- Optional: Redis Sentinel for automatic failover
- Two IORedis connections (one for pub, one for sub)
Quick Start
import { ioRedisEventBroadcaster } from "@uploadista/event-broadcaster-ioredis";
import Redis from "ioredis";
import { Effect } from "effect";
// Create IORedis clients (one for pub, one for sub)
const redis = new Redis({
host: "localhost",
port: 6379,
});
const subscriberRedis = new Redis({
host: "localhost",
port: 6379,
});
const program = Effect.gen(function* () {
// Event broadcaster is automatically available
});
Effect.runSync(
program.pipe(
Effect.provide(
ioRedisEventBroadcaster({
redis,
subscriberRedis,
})
),
// ... other layers
)
);Features
- ✅ Redis Clustering: Automatic cluster discovery and failover
- ✅ Sentinel Support: Monitored automatic master failover
- ✅ Replica Reads: Distribute read load across replicas
- ✅ Cluster Scaling: Add nodes dynamically without reconfiguration
- ✅ Connection Pooling: Efficient resource management
- ✅ Advanced Retry Logic: Robust connection recovery
API Reference
Main Exports
ioRedisEventBroadcaster(config: IoRedisEventBroadcasterConfig): Layer<EventBroadcasterService>
Creates an Effect layer providing the EventBroadcasterService backed by IORedis.
import { ioRedisEventBroadcaster } from "@uploadista/event-broadcaster-ioredis";
import Redis from "ioredis";
const redis = new Redis({ host: "localhost" });
const subscriberRedis = new Redis({ host: "localhost" });
const layer = ioRedisEventBroadcaster({
redis,
subscriberRedis,
});Configuration:
interface IoRedisEventBroadcasterConfig {
redis: Redis; // Connection for publishing
subscriberRedis: Redis; // Connection for subscribing
}createIoRedisEventBroadcaster(config: IoRedisEventBroadcasterConfig): EventBroadcaster
Factory function to create a broadcaster instance.
import { createIoRedisEventBroadcaster } from "@uploadista/event-broadcaster-ioredis";
const broadcaster = createIoRedisEventBroadcaster({
redis,
subscriberRedis,
});Available Operations
The IORedis broadcaster implements the EventBroadcaster interface:
publish(channel: string, message: string): Effect<void>
Broadcast a message to all subscribers (across cluster).
const program = Effect.gen(function* () {
yield* broadcaster.publish("uploads:complete", JSON.stringify({
uploadId: "abc123",
duration: 45000,
}));
});subscribe(channel: string, handler: (message: string) => void): Effect<void>
Subscribe to a channel (with cluster awareness).
const program = Effect.gen(function* () {
yield* broadcaster.subscribe("uploads:complete", (message: string) => {
const event = JSON.parse(message);
console.log(`Upload complete: ${event.uploadId}`);
});
});unsubscribe(channel: string): Effect<void>
Unsubscribe from a channel.
const program = Effect.gen(function* () {
yield* broadcaster.unsubscribe("uploads:complete");
});Configuration
Single Instance
import { ioRedisEventBroadcaster } from "@uploadista/event-broadcaster-ioredis";
import Redis from "ioredis";
const redis = new Redis({
host: "localhost",
port: 6379,
password: process.env.REDIS_PASSWORD,
});
const subscriberRedis = new Redis({
host: "localhost",
port: 6379,
password: process.env.REDIS_PASSWORD,
});
const layer = ioRedisEventBroadcaster({
redis,
subscriberRedis,
});Redis Cluster
With automatic discovery:
import { ioRedisEventBroadcaster } from "@uploadista/event-broadcaster-ioredis";
import Redis from "ioredis";
const redis = new Redis.Cluster(
[
{ host: "cluster-node-1", port: 6379 },
{ host: "cluster-node-2", port: 6379 },
{ host: "cluster-node-3", port: 6379 },
],
{
redisOptions: {
password: process.env.REDIS_PASSWORD,
},
}
);
const subscriberRedis = new Redis.Cluster(
[
{ host: "cluster-node-1", port: 6379 },
{ host: "cluster-node-2", port: 6379 },
{ host: "cluster-node-3", port: 6379 },
],
{
redisOptions: {
password: process.env.REDIS_PASSWORD,
},
}
);
const layer = ioRedisEventBroadcaster({
redis: redis as any,
subscriberRedis: subscriberRedis as any,
});Redis Sentinel
For automatic failover:
import { ioRedisEventBroadcaster } from "@uploadista/event-broadcaster-ioredis";
import Redis from "ioredis";
const redis = new Redis({
sentinels: [
{ host: "sentinel-1", port: 26379 },
{ host: "sentinel-2", port: 26379 },
{ host: "sentinel-3", port: 26379 },
],
name: "mymaster",
sentinelPassword: process.env.SENTINEL_PASSWORD,
password: process.env.REDIS_PASSWORD,
});
const subscriberRedis = new Redis({
sentinels: [
{ host: "sentinel-1", port: 26379 },
{ host: "sentinel-2", port: 26379 },
{ host: "sentinel-3", port: 26379 },
],
name: "mymaster",
sentinelPassword: process.env.SENTINEL_PASSWORD,
password: process.env.REDIS_PASSWORD,
});
const layer = ioRedisEventBroadcaster({
redis,
subscriberRedis,
});Environment-Based Configuration
import { ioRedisEventBroadcaster } from "@uploadista/event-broadcaster-ioredis";
import Redis from "ioredis";
const createBroadcaster = () => {
let redis: Redis;
let subscriberRedis: Redis;
if (process.env.REDIS_CLUSTER_NODES) {
// Cluster mode
const nodes = process.env.REDIS_CLUSTER_NODES.split(",").map(
(addr) => {
const [host, port] = addr.split(":");
return { host, port: parseInt(port) };
}
);
redis = new Redis.Cluster(nodes, {
redisOptions: {
password: process.env.REDIS_PASSWORD,
},
});
subscriberRedis = new Redis.Cluster(nodes, {
redisOptions: {
password: process.env.REDIS_PASSWORD,
},
});
} else {
// Single instance
redis = new Redis({
url: process.env.REDIS_URL || "redis://localhost:6379",
});
subscriberRedis = new Redis({
url: process.env.REDIS_URL || "redis://localhost:6379",
});
}
return ioRedisEventBroadcaster({
redis: redis as any,
subscriberRedis: subscriberRedis as any,
});
};
const layer = createBroadcaster();Examples
Example 1: Clustered Upload Service
Large deployment across many servers:
import { ioRedisEventBroadcaster } from "@uploadista/event-broadcaster-ioredis";
import { uploadServer } from "@uploadista/server";
import Redis from "ioredis";
import { Effect } from "effect";
// 5-node cluster
const redis = new Redis.Cluster(
[
{ host: "redis-1.cluster", port: 6379 },
{ host: "redis-2.cluster", port: 6379 },
{ host: "redis-3.cluster", port: 6379 },
{ host: "redis-4.cluster", port: 6379 },
{ host: "redis-5.cluster", port: 6379 },
],
{
redisOptions: {
password: process.env.REDIS_PASSWORD,
},
}
);
const subscriberRedis = new Redis.Cluster(
// Same cluster nodes
{
redisOptions: {
password: process.env.REDIS_PASSWORD,
},
}
);
const program = Effect.gen(function* () {
const server = yield* uploadServer;
// Subscribe for uploads completing
yield* broadcaster.subscribe("uploads:complete", (message: string) => {
const { uploadId, size } = JSON.parse(message);
console.log(`Upload complete: ${uploadId} (${size} bytes)`);
// Trigger flow processing
// Each server independently processes
});
// Server 1 completes upload
yield* broadcaster.publish("uploads:complete", JSON.stringify({
uploadId: "upl_server1_123",
size: 1048576,
completedBy: "server-1",
}));
// ALL servers (including server 1) receive notification
});
Effect.runSync(
program.pipe(
Effect.provide(
ioRedisEventBroadcaster({
redis: redis as any,
subscriberRedis: subscriberRedis as any,
})
)
)
);Example 2: Sentinel-Based Failover
Automatic failover if primary goes down:
import { ioRedisEventBroadcaster } from "@uploadista/event-broadcaster-ioredis";
import Redis from "ioredis";
import { Effect } from "effect";
const redis = new Redis({
sentinels: [
{ host: process.env.SENTINEL_1, port: 26379 },
{ host: process.env.SENTINEL_2, port: 26379 },
{ host: process.env.SENTINEL_3, port: 26379 },
],
name: "uploadista-primary",
sentinelPassword: process.env.SENTINEL_PASSWORD,
password: process.env.REDIS_PASSWORD,
});
const subscriberRedis = new Redis({
sentinels: [
{ host: process.env.SENTINEL_1, port: 26379 },
{ host: process.env.SENTINEL_2, port: 26379 },
{ host: process.env.SENTINEL_3, port: 26379 },
],
name: "uploadista-primary",
sentinelPassword: process.env.SENTINEL_PASSWORD,
password: process.env.REDIS_PASSWORD,
});
const broadcaster = ioRedisEventBroadcaster({
redis,
subscriberRedis,
});
// If Redis primary fails:
// 1. Sentinel detects failure
// 2. Promotes replica automatically
// 3. IORedis reconnects to new primary
// 4. Events continue flowing (transparent failover)Example 3: Load Balancing with Replica Reads
import { ioRedisEventBroadcaster } from "@uploadista/event-broadcaster-ioredis";
import Redis from "ioredis";
// Write to master, read from replicas
const redis = new Redis.Cluster(
[
{ host: "master", port: 6379 },
{ host: "replica-1", port: 6379 },
{ host: "replica-2", port: 6379 },
],
{
scaleReads: "slave", // Read from replicas
}
);
const subscriberRedis = new Redis.Cluster(
[
{ host: "master", port: 6379 },
{ host: "replica-1", port: 6379 },
{ host: "replica-2", port: 6379 },
],
{
scaleReads: "slave",
}
);
// Publishes always go to master
// Subscriptions distributed across replicas
const broadcaster = ioRedisEventBroadcaster({
redis: redis as any,
subscriberRedis: subscriberRedis as any,
});Performance Characteristics
| Operation | Latency | Throughput | Cluster Aware | |-----------|---------|-----------|--------------| | publish() | 1-2ms | 100k+ events/sec | ✅ | | subscribe() | 2-5ms | N/A | ✅ | | unsubscribe() | 1-2ms | N/A | ✅ | | Failover | 1-5s | Auto-recovery | ✅ |
Architecture Patterns
Cluster with Replicas
App Servers ──→ Pub Redis Master ──→ Replicate to ──→ Replica 1
Replica 2
Replica 3Sentinel Monitored
Sentinel 1 ┐
Sentinel 2 ├──→ Monitor ──→ Primary Redis ──→ Replica 1
Sentinel 3 ┘ Replica 2
↓
On failure: Promote Replica 1
Apps reconnect automaticallyMulti-Region Cluster
Region 1: Cluster Nodes ─┐
Region 2: Cluster Nodes ├──→ Single logical cluster
Region 3: Cluster Nodes ─┘ (apps in any region)Best Practices
1. Use Cluster for Scale
// Development: Single instance
const redis = new Redis();
// Production: Cluster
const redis = new Redis.Cluster(nodes);2. Configure Retry Strategy
const redis = new Redis({
maxRetriesPerRequest: 3,
retryStrategy: (times: number) => {
const delay = Math.min(times * 50, 2000);
return delay;
},
});3. Monitor Cluster Health
# Check cluster nodes
redis-cli -c CLUSTER NODES
# Check sentinel status
redis-cli -p 26379 SENTINEL MASTERSDeployment
Docker Compose with Cluster
version: "3"
services:
app:
environment:
REDIS_CLUSTER_NODES: redis1:6379,redis2:6379,redis3:6379
redis1:
image: redis:7-alpine
command: redis-server --cluster-enabled yes --cluster-config-file nodes.conf
redis2:
image: redis:7-alpine
command: redis-server --cluster-enabled yes --cluster-config-file nodes.conf
redis3:
image: redis:7-alpine
command: redis-server --cluster-enabled yes --cluster-config-file nodes.confKubernetes Deployment
apiVersion: v1
kind: ConfigMap
metadata:
name: redis-config
data:
redis.conf: |
cluster-enabled yes
cluster-config-file nodes.conf
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: redis-cluster
spec:
serviceName: redis
replicas: 6
template:
spec:
containers:
- name: redis
image: redis:7-alpine
ports:
- containerPort: 6379
- containerPort: 16379Related Packages
- @uploadista/core - Core types
- @uploadista/event-broadcaster-redis - Standard Redis broadcaster
- @uploadista/event-broadcaster-memory - Single-process broadcaster
- @uploadista/kv-store-ioredis - IORedis KV store
- @uploadista/server - Upload server
Troubleshooting
"Cluster connection failed"
Verify cluster nodes are reachable:
redis-cli -c
> CLUSTER INFO
# Should show: cluster_state:ok"Sentinel can't find master"
Check Sentinel configuration:
redis-cli -p 26379
> SENTINEL MASTERS
# Should list your master instanceHigh Latency in Cluster
Check cluster topology:
redis-cli -c
> CLUSTER SLOTS
# Verify even slot distributionLicense
See LICENSE in the main repository.
See Also
- EVENT_SYSTEM.md - Architecture guide
- Redis Broadcaster - Simpler Redis option
- ioredis Documentation - Official ioredis docs
- Redis Cluster Tutorial - Redis clustering
- Redis Sentinel Guide - HA with Sentinel
