@sprout-idws/sprout-redis
v1.0.1
Published
Reusable NestJS Redis package: cache, lock, and stream modules
Maintainers
Readme
@sprout-idws/sprout-redis
NestJS integration for Redis (ioredis): shared client, layered cache (memory and/or Redis), distributed locks, and streams (publish, consume, retry, monitoring).
Purpose
- Standardize Redis connection settings from
ConfigService(REDIS_HOST,REDIS_PORT,REDIS_PASSWORD,REDIS_DB). - Provide injectable building blocks used across Sprout backends: cache abstraction, locking, and stream-based workers.
Installation
npm install @sprout-idws/sprout-redis @sprout-idws/sprout-context ioredis @nestjs/configPeer dependencies: @nestjs/common, @nestjs/config, @sprout-idws/sprout-context, ioredis (v5+).
Configuration
| Variable | Default | Description |
|----------|---------|-------------|
| REDIS_HOST | localhost | Redis host |
| REDIS_PORT | 6379 | Redis port |
| REDIS_PASSWORD | (none) | Optional auth |
| REDIS_DB | 0 | Database index |
Modules and exports
RedisBaseModule (@Global)
- Provider token
REDIS_CLIENT:ioredisinstance from env. - Exports:
REDIS_CLIENT,RedisLockService,RedisStreamService,StreamPublisherService. - Implements graceful shutdown in
OnModuleDestroy.
Import ContextModule from @sprout-idws/sprout-context is required internally; ensure your app registers ContextModule where stream/cache features need request context.
CacheModule (@Global)
Builds three CacheManager tokens (multi-level LRU + optional Redis):
| Token | Behavior |
|-------|----------|
| REDIS_CACHE_MANAGER | Redis only |
| IN_MEMORY_CACHE_MANAGER | In-memory LRU (max 500 entries) |
| IN_MEMORY_THEN_REDIS_CACHE_MANAGER | Memory first, then Redis |
CacheManager supports TTL and pluggable ICacheStore levels.
RedisStreamConsumerModule
Registers stream processing services:
StreamConsumerService,StreamMessageHandlerService,StreamRetryService,StreamConsumerMonitorService
Use with RedisBaseModule for consumers that read Redis streams, handle messages, retries, and monitoring.
Other notable exports
RedisLockService— distributed locks with configurable options.RedisStreamService,StreamPublisherService— stream I/O helpers.- Constants:
REDIS_CACHE_MANAGER,IN_MEMORY_CACHE_MANAGER,IN_MEMORY_THEN_REDIS_CACHE_MANAGER,DEFAULT_CACHE_TTL_SECONDS,CONSUMER_INTERVAL_TIME, and related types.
The build copies Lua assets used by the implementation (scripts/copy-lua.sh).
Examples
Register modules
RedisBaseModule is global and provides the Redis client, locking, and stream helpers. Add CacheModule for cache tokens. For stream consumers (read, retry loop, idle-consumer monitor), import RedisStreamConsumerModule.
Register ConfigModule so REDIS_* is available, and ContextModule from @sprout-idws/sprout-context wherever publishing or consumer handlers should propagate request context.
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { ContextModule } from '@sprout-idws/sprout-context';
import {
RedisBaseModule,
CacheModule,
RedisStreamConsumerModule,
} from '@sprout-idws/sprout-redis';
@Module({
imports: [
ConfigModule.forRoot({ isGlobal: true }),
ContextModule,
RedisBaseModule,
CacheModule,
RedisStreamConsumerModule, // optional: omit if you only use cache, locks, or publishing
],
})
export class AppModule {}Inject the Redis client (REDIS_CLIENT)
Use the shared ioredis instance for commands not wrapped by this package.
import { Inject, Injectable } from '@nestjs/common';
import Redis from 'ioredis';
@Injectable()
export class HealthService {
constructor(@Inject('REDIS_CLIENT') private readonly redis: Redis) {}
async ping(): Promise<string> {
return this.redis.ping();
}
}Layered cache (CacheManager)
Inject one of REDIS_CACHE_MANAGER, IN_MEMORY_CACHE_MANAGER, or IN_MEMORY_THEN_REDIS_CACHE_MANAGER. get calls notFoundFn when the key is missing at every level, then writes through all levels. requiredGet throws if the resolved value is null or undefined. set and delete apply to every level.
import { Inject, Injectable } from '@nestjs/common';
import {
CacheManager,
IN_MEMORY_THEN_REDIS_CACHE_MANAGER,
DEFAULT_CACHE_TTL_SECONDS,
} from '@sprout-idws/sprout-redis';
@Injectable()
export class UserCacheService {
constructor(
@Inject(IN_MEMORY_THEN_REDIS_CACHE_MANAGER)
private readonly cache: CacheManager
) {}
async getUser(id: string) {
return this.cache.get(
`user:${id}`,
async () => fetchUserFromDb(id),
{ ttlSeconds: DEFAULT_CACHE_TTL_SECONDS }
);
}
warmUser(id: string, user: unknown) {
this.cache.set(`user:${id}`, user, { ttlSeconds: 600 });
}
async invalidateUser(id: string) {
await this.cache.delete(`user:${id}`);
}
}REDIS_CACHE_MANAGER— Redis only; shared across app instances.IN_MEMORY_CACHE_MANAGER— process-local LRU (max 500 entries).IN_MEMORY_THEN_REDIS_CACHE_MANAGER— memory first; promotes hits to upper levels; on miss, fills memory and Redis.
Custom CacheManager (optional)
Build your own stack with InMemoryCacheStore and RedisCacheStore if you register providers manually (see cache.module.ts in this package for the Nest pattern).
import { CacheManager, InMemoryCacheStore, RedisCacheStore } from '@sprout-idws/sprout-redis';
import Redis from 'ioredis';
const redis = new Redis(/* ... */);
const cache = new CacheManager({
levels: [new InMemoryCacheStore({ maxSize: 1000 }), new RedisCacheStore(redis)],
defaultTtlSeconds: 300,
});Distributed locks (RedisLockService)
runWithLock acquires the lock, runs the callback, then releases. If the lock cannot be acquired after optional retries, it returns without invoking the callback.
import { Injectable } from '@nestjs/common';
import { RedisLockService } from '@sprout-idws/sprout-redis';
@Injectable()
export class ReconciliationService {
constructor(private readonly locks: RedisLockService) {}
async runOncePerCluster() {
await this.locks.runWithLock(
'lock:reconciliation:daily',
{ ttl: 600, retryDelay: 200, maxRetries: 5 },
async () => {
// only one holder across processes until TTL expires
}
);
}
}Manual acquire / release — when you need the lock token (e.g. work split across several steps):
const token = `my-lock:${Date.now()}:${Math.random()}:${process.pid}`;
const acquired = await this.locks.acquireLock('lock:resource:1', token, {
ttl: 120,
maxRetries: 10,
retryDelay: 50,
});
if (acquired) {
try {
// ...
} finally {
await this.locks.releaseLock('lock:resource:1', token);
}
}Publish to a stream
StreamPublisherService — use from HTTP/gRPC handlers: fills metadata and attaches the current encoded context from ContextService.
import { Injectable } from '@nestjs/common';
import { StreamPublisherService } from '@sprout-idws/sprout-redis';
@Injectable()
export class OrderEventsService {
constructor(private readonly publisher: StreamPublisherService) {}
async emitCreated(orderId: string) {
return this.publisher.publishToStream('orders:events', {
type: 'created',
orderId,
});
}
}RedisStreamService.publishToStream — full StreamMessage shape; if context is omitted, it defaults to the current encoded context.
import { Injectable } from '@nestjs/common';
import { RedisStreamService } from '@sprout-idws/sprout-redis';
@Injectable()
export class AdminPublisher {
constructor(private readonly streams: RedisStreamService) {}
async publish() {
await this.streams.publishToStream('my:stream', {
data: { action: 'reindex' },
metadata: {
timestamp: Date.now(),
streamName: 'my:stream',
retryCount: 0,
},
});
}
}Low-level stream helpers (RedisStreamService)
Create a consumer group, read pending-to-group messages, and acknowledge (used internally by the consumer stack; useful for custom tooling).
await this.streams.createConsumerGroup('orders:events', 'order-processors', '0');
const batch = await this.streams.readFromConsumerGroup({
streamName: 'orders:events',
groupName: 'order-processors',
consumerName: 'worker-1',
count: 10,
blockTime: 5000,
});
for (const msg of batch) {
// process msg.data, msg.context, msg.id
await this.streams.ackDelete('orders:events', 'order-processors', msg.id!);
}
const streamNames = await this.streams.getAllStreams('orders:*');
const groups = await this.streams.getConsumerGroups('orders:events');
const consumers = await this.streams.getConsumers('orders:events', 'order-processors');sendToDlq, delayedRetry, republishDelayedMessages, and republishInactiveConsumerMessages power retries, the DLQ, and the consumer monitor; call them directly only if you extend the same semantics.
Consume messages (StreamConsumerService)
Registers the stream’s delayed-retry key, ensures the consumer group exists, then polls every CONSUMER_INTERVAL_TIME (1s). The consumer name is suffixed with a unique id. Handler failures trigger retries through StreamRetryService; after maxRetries, messages go to the DLQ stream {streamName}:dlq.
import { Injectable, OnModuleInit } from '@nestjs/common';
import { StreamConsumerService } from '@sprout-idws/sprout-redis';
@Injectable()
export class OrderConsumer implements OnModuleInit {
constructor(private readonly consumer: StreamConsumerService) {}
onModuleInit() {
void this.consumer.consumeMessages(
{
streamName: 'orders:events',
groupName: 'order-processors',
consumerName: 'api-worker',
maxRetries: 3,
retryDelay: 5 * 60 * 1000, // ms before retry (scheduled via sorted set)
blockTime: 2000,
prefetchCount: 50,
maxConcurrency: 25,
},
async (entry) => {
// entry.data, entry.context (restored into ContextService for this callback)
await handleOrderEvent(entry.data);
}
);
}
}Process a batch yourself (StreamMessageHandlerService)
Normally used by StreamConsumerService. You can call processMessages if you already have StreamMessage[] (e.g. tests or a custom reader). Exposes getProcessingMessagesCount() / isMessageProcessing(id) for observability.
await this.messageHandler.processMessages({
streamName: 'orders:events',
groupName: 'order-processors',
maxRetries: 3,
retryDelay: 300_000,
maxConcurrency: 10,
messages: fetchedMessages,
messageHandler: async (entry) => {
/* ... */
},
});Retries and delayed republish (StreamRetryService)
When RedisStreamConsumerModule loads, StreamRetryService starts a periodic job that calls republishDelayedMessages on every registered delayed queue ({streamName}:delayed). StreamConsumerService.consumeMessages registers its stream automatically. You rarely call delayedRetry yourself; it is used on handler failure to re-queue the message after retryDelay.
Idle consumers and reclaim (StreamConsumerMonitorService)
Also starts on module init. Periodically scans Redis for stream keys, lists groups and consumers, and for consumers idle longer than 10 × CONSUMER_INTERVAL_TIME with pending messages, republishes those messages and removes the stale consumer name. No application code is required beyond importing RedisStreamConsumerModule.
Tests
npm test(from this package directory, or via the monorepo test task)
Repository
sprout-typescript-backend — packages/sprout-redis.
