npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@sprout-idws/sprout-redis

v1.0.1

Published

Reusable NestJS Redis package: cache, lock, and stream modules

Readme

@sprout-idws/sprout-redis

NestJS integration for Redis (ioredis): shared client, layered cache (memory and/or Redis), distributed locks, and streams (publish, consume, retry, monitoring).

Purpose

  • Standardize Redis connection settings from ConfigService (REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_DB).
  • Provide injectable building blocks used across Sprout backends: cache abstraction, locking, and stream-based workers.

Installation

npm install @sprout-idws/sprout-redis @sprout-idws/sprout-context ioredis @nestjs/config

Peer dependencies: @nestjs/common, @nestjs/config, @sprout-idws/sprout-context, ioredis (v5+).

Configuration

| Variable | Default | Description | |----------|---------|-------------| | REDIS_HOST | localhost | Redis host | | REDIS_PORT | 6379 | Redis port | | REDIS_PASSWORD | (none) | Optional auth | | REDIS_DB | 0 | Database index |

Modules and exports

RedisBaseModule (@Global)

  • Provider token REDIS_CLIENT: ioredis instance from env.
  • Exports: REDIS_CLIENT, RedisLockService, RedisStreamService, StreamPublisherService.
  • Implements graceful shutdown in OnModuleDestroy.

Import ContextModule from @sprout-idws/sprout-context is required internally; ensure your app registers ContextModule where stream/cache features need request context.

CacheModule (@Global)

Builds three CacheManager tokens (multi-level LRU + optional Redis):

| Token | Behavior | |-------|----------| | REDIS_CACHE_MANAGER | Redis only | | IN_MEMORY_CACHE_MANAGER | In-memory LRU (max 500 entries) | | IN_MEMORY_THEN_REDIS_CACHE_MANAGER | Memory first, then Redis |

CacheManager supports TTL and pluggable ICacheStore levels.

RedisStreamConsumerModule

Registers stream processing services:

  • StreamConsumerService, StreamMessageHandlerService, StreamRetryService, StreamConsumerMonitorService

Use with RedisBaseModule for consumers that read Redis streams, handle messages, retries, and monitoring.

Other notable exports

  • RedisLockService — distributed locks with configurable options.
  • RedisStreamService, StreamPublisherService — stream I/O helpers.
  • Constants: REDIS_CACHE_MANAGER, IN_MEMORY_CACHE_MANAGER, IN_MEMORY_THEN_REDIS_CACHE_MANAGER, DEFAULT_CACHE_TTL_SECONDS, CONSUMER_INTERVAL_TIME, and related types.

The build copies Lua assets used by the implementation (scripts/copy-lua.sh).

Examples

Register modules

RedisBaseModule is global and provides the Redis client, locking, and stream helpers. Add CacheModule for cache tokens. For stream consumers (read, retry loop, idle-consumer monitor), import RedisStreamConsumerModule.

Register ConfigModule so REDIS_* is available, and ContextModule from @sprout-idws/sprout-context wherever publishing or consumer handlers should propagate request context.

import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { ContextModule } from '@sprout-idws/sprout-context';
import {
  RedisBaseModule,
  CacheModule,
  RedisStreamConsumerModule,
} from '@sprout-idws/sprout-redis';

@Module({
  imports: [
    ConfigModule.forRoot({ isGlobal: true }),
    ContextModule,
    RedisBaseModule,
    CacheModule,
    RedisStreamConsumerModule, // optional: omit if you only use cache, locks, or publishing
  ],
})
export class AppModule {}

Inject the Redis client (REDIS_CLIENT)

Use the shared ioredis instance for commands not wrapped by this package.

import { Inject, Injectable } from '@nestjs/common';
import Redis from 'ioredis';

@Injectable()
export class HealthService {
  constructor(@Inject('REDIS_CLIENT') private readonly redis: Redis) {}

  async ping(): Promise<string> {
    return this.redis.ping();
  }
}

Layered cache (CacheManager)

Inject one of REDIS_CACHE_MANAGER, IN_MEMORY_CACHE_MANAGER, or IN_MEMORY_THEN_REDIS_CACHE_MANAGER. get calls notFoundFn when the key is missing at every level, then writes through all levels. requiredGet throws if the resolved value is null or undefined. set and delete apply to every level.

import { Inject, Injectable } from '@nestjs/common';
import {
  CacheManager,
  IN_MEMORY_THEN_REDIS_CACHE_MANAGER,
  DEFAULT_CACHE_TTL_SECONDS,
} from '@sprout-idws/sprout-redis';

@Injectable()
export class UserCacheService {
  constructor(
    @Inject(IN_MEMORY_THEN_REDIS_CACHE_MANAGER)
    private readonly cache: CacheManager
  ) {}

  async getUser(id: string) {
    return this.cache.get(
      `user:${id}`,
      async () => fetchUserFromDb(id),
      { ttlSeconds: DEFAULT_CACHE_TTL_SECONDS }
    );
  }

  warmUser(id: string, user: unknown) {
    this.cache.set(`user:${id}`, user, { ttlSeconds: 600 });
  }

  async invalidateUser(id: string) {
    await this.cache.delete(`user:${id}`);
  }
}
  • REDIS_CACHE_MANAGER — Redis only; shared across app instances.
  • IN_MEMORY_CACHE_MANAGER — process-local LRU (max 500 entries).
  • IN_MEMORY_THEN_REDIS_CACHE_MANAGER — memory first; promotes hits to upper levels; on miss, fills memory and Redis.

Custom CacheManager (optional)

Build your own stack with InMemoryCacheStore and RedisCacheStore if you register providers manually (see cache.module.ts in this package for the Nest pattern).

import { CacheManager, InMemoryCacheStore, RedisCacheStore } from '@sprout-idws/sprout-redis';
import Redis from 'ioredis';

const redis = new Redis(/* ... */);
const cache = new CacheManager({
  levels: [new InMemoryCacheStore({ maxSize: 1000 }), new RedisCacheStore(redis)],
  defaultTtlSeconds: 300,
});

Distributed locks (RedisLockService)

runWithLock acquires the lock, runs the callback, then releases. If the lock cannot be acquired after optional retries, it returns without invoking the callback.

import { Injectable } from '@nestjs/common';
import { RedisLockService } from '@sprout-idws/sprout-redis';

@Injectable()
export class ReconciliationService {
  constructor(private readonly locks: RedisLockService) {}

  async runOncePerCluster() {
    await this.locks.runWithLock(
      'lock:reconciliation:daily',
      { ttl: 600, retryDelay: 200, maxRetries: 5 },
      async () => {
        // only one holder across processes until TTL expires
      }
    );
  }
}

Manual acquire / release — when you need the lock token (e.g. work split across several steps):

const token = `my-lock:${Date.now()}:${Math.random()}:${process.pid}`;
const acquired = await this.locks.acquireLock('lock:resource:1', token, {
  ttl: 120,
  maxRetries: 10,
  retryDelay: 50,
});
if (acquired) {
  try {
    // ...
  } finally {
    await this.locks.releaseLock('lock:resource:1', token);
  }
}

Publish to a stream

StreamPublisherService — use from HTTP/gRPC handlers: fills metadata and attaches the current encoded context from ContextService.

import { Injectable } from '@nestjs/common';
import { StreamPublisherService } from '@sprout-idws/sprout-redis';

@Injectable()
export class OrderEventsService {
  constructor(private readonly publisher: StreamPublisherService) {}

  async emitCreated(orderId: string) {
    return this.publisher.publishToStream('orders:events', {
      type: 'created',
      orderId,
    });
  }
}

RedisStreamService.publishToStream — full StreamMessage shape; if context is omitted, it defaults to the current encoded context.

import { Injectable } from '@nestjs/common';
import { RedisStreamService } from '@sprout-idws/sprout-redis';

@Injectable()
export class AdminPublisher {
  constructor(private readonly streams: RedisStreamService) {}

  async publish() {
    await this.streams.publishToStream('my:stream', {
      data: { action: 'reindex' },
      metadata: {
        timestamp: Date.now(),
        streamName: 'my:stream',
        retryCount: 0,
      },
    });
  }
}

Low-level stream helpers (RedisStreamService)

Create a consumer group, read pending-to-group messages, and acknowledge (used internally by the consumer stack; useful for custom tooling).

await this.streams.createConsumerGroup('orders:events', 'order-processors', '0');

const batch = await this.streams.readFromConsumerGroup({
  streamName: 'orders:events',
  groupName: 'order-processors',
  consumerName: 'worker-1',
  count: 10,
  blockTime: 5000,
});

for (const msg of batch) {
  // process msg.data, msg.context, msg.id
  await this.streams.ackDelete('orders:events', 'order-processors', msg.id!);
}

const streamNames = await this.streams.getAllStreams('orders:*');
const groups = await this.streams.getConsumerGroups('orders:events');
const consumers = await this.streams.getConsumers('orders:events', 'order-processors');

sendToDlq, delayedRetry, republishDelayedMessages, and republishInactiveConsumerMessages power retries, the DLQ, and the consumer monitor; call them directly only if you extend the same semantics.

Consume messages (StreamConsumerService)

Registers the stream’s delayed-retry key, ensures the consumer group exists, then polls every CONSUMER_INTERVAL_TIME (1s). The consumer name is suffixed with a unique id. Handler failures trigger retries through StreamRetryService; after maxRetries, messages go to the DLQ stream {streamName}:dlq.

import { Injectable, OnModuleInit } from '@nestjs/common';
import { StreamConsumerService } from '@sprout-idws/sprout-redis';

@Injectable()
export class OrderConsumer implements OnModuleInit {
  constructor(private readonly consumer: StreamConsumerService) {}

  onModuleInit() {
    void this.consumer.consumeMessages(
      {
        streamName: 'orders:events',
        groupName: 'order-processors',
        consumerName: 'api-worker',
        maxRetries: 3,
        retryDelay: 5 * 60 * 1000, // ms before retry (scheduled via sorted set)
        blockTime: 2000,
        prefetchCount: 50,
        maxConcurrency: 25,
      },
      async (entry) => {
        // entry.data, entry.context (restored into ContextService for this callback)
        await handleOrderEvent(entry.data);
      }
    );
  }
}

Process a batch yourself (StreamMessageHandlerService)

Normally used by StreamConsumerService. You can call processMessages if you already have StreamMessage[] (e.g. tests or a custom reader). Exposes getProcessingMessagesCount() / isMessageProcessing(id) for observability.

await this.messageHandler.processMessages({
  streamName: 'orders:events',
  groupName: 'order-processors',
  maxRetries: 3,
  retryDelay: 300_000,
  maxConcurrency: 10,
  messages: fetchedMessages,
  messageHandler: async (entry) => {
    /* ... */
  },
});

Retries and delayed republish (StreamRetryService)

When RedisStreamConsumerModule loads, StreamRetryService starts a periodic job that calls republishDelayedMessages on every registered delayed queue ({streamName}:delayed). StreamConsumerService.consumeMessages registers its stream automatically. You rarely call delayedRetry yourself; it is used on handler failure to re-queue the message after retryDelay.

Idle consumers and reclaim (StreamConsumerMonitorService)

Also starts on module init. Periodically scans Redis for stream keys, lists groups and consumers, and for consumers idle longer than 10 × CONSUMER_INTERVAL_TIME with pending messages, republishes those messages and removes the stale consumer name. No application code is required beyond importing RedisStreamConsumerModule.

Tests

npm test

(from this package directory, or via the monorepo test task)

Repository

sprout-typescript-backendpackages/sprout-redis.