npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

redis-live-collection

v0.2.1

Published

Redis-based realtime collection with compare-and-set support

Readme

Redis Live Collection npm version Build Status

Redis-based realtime collection with compare-and-set support for NodeJS.

Installation

yarn add redis-live-collection

Design

A collection consists of key-value pairs, where keys and values are Buffers and have a numeric version. A collection uses a set of Redis keys:

  • <collectionName>:values: Hash mapping keys to values.
  • <collectionName>:keys: Sorted Set of keys with all scores set to 0.
  • <collectionName>:versions: Sorted Set of keys scored by version.
  • <collectionName>:changes: Stream of collection changes.
  • <collectionName>:revision: Stream ID of the last change entry.

By storing keys in a Sorted Set keys we get the ability to efficiently retrieve or remove lexicographic ranges of keys. Sorted Set versions allows to retrieve or remove ranges of versions.

A version is an arbitrary number (including Infinity). It is up to user to assign a meaning to versions or even use versions at all.

Versions enable compare-and-set operations, where you can atomically compare current version of a key with given number and only update the key if the condition is true.

Stream changes allows to watch a collection and receive updates in realtime.

Usage

Call initCommands on ioredis instance to initialize Lua scripts:

import * as IORedis from 'ioredis';
import {initCommands} from 'redis-live-collection';

const redis = new IORedis();
initCommands(redis);

Now you can invoke collection operations using this instance:

import {set, getAll, watch} from 'redis-live-collection';

await set(redis, 'my-collection', 'some-key', Buffer.from('value'));
const {revision, items} = await getAll(redis, 'my-collection');
const observable = watch(redis, 'my-collection', revision);

Connection Pool

Since reading from a Stream is blocking, to use watch you need to set up a connection pool. Example using generic-pool:

import {createPool} from 'generic-pool';
import {defer} from 'rxjs';
import {finalize} from 'rxjs/operators';

const redisPool = createPool(
  {
    async create() {
      const redis = new IORedis({
        lazyConnect: true,
      });
      initCommands(redis);

      await redis.connect();

      return redis;
    },
    async destroy(redis) {
      await redis.quit();
    },
  },
  {
    min: 1,
    max: 100,
    evictionRunIntervalMillis: 10_000,
    acquireTimeoutMillis: 10_000,
  },
);

const {revision, items} = await pool.use((redis) =>
  getAll(redis, 'my-collection'),
);

const observable = defer(() => pool.acquire()).pipe(
  concatMap((redis) =>
    watch(redis, 'my-collection', revision).pipe(
      finalize(() => {
        pool.release(redis);
      }),
    ),
  ),
);

Pipelining

It is possible to use some operations with Pipelining. For example, to transactionally get multiple fields:

import {transformGetArguments, transformGetReply} from 'redis-live-collection';

const rawResults = await redis
  .multi()
  .lcGetBuffer(...transformGetArguments('my-collection', 'key-1'))
  .lcGetBuffer(...transformGetArguments('my-collection', 'key-2'))
  .lcGetBuffer(...transformGetArguments('my-collection', 'key-3'))
  .exec();

const results = rawResults.map(([err, res]) => {
  if (err) {
    throw err;
  }

  return transformGetReply(res);
});

Cluster

With Redis Cluster, it is important that all the keys used by collection are placed on the same shard. To achieve this, make sure to have hash tags in collection names:

const collection = '{my-collection}';

await getAll(redis, collection);

API

Every operation returns a promise that resolves to an object containing revision string property, which is the collection revision after the operation is applied.

Every write operation has maxlen integer argument that specifies how much entries in the Stream of changes we want to retain. If you do high rate writes, the default of 1000 may be too small for watch to catch up. See Redis docs.

get

function get(
  redis: IORedis.Redis,
  collection: string,
  key: string | Buffer,
): Promise<{
  revision: string;
  value: Buffer | null;
  version: number;
}>;

Get a value of a single key. If the key does not exist, returned value is null and version is 0.

get can be used with Pipelining.

getAll

function getAll(
  redis: IORedis.Redis,
  collection: string,
): Promise<{
  revision: string;
  items: Array<{
    key: Buffer;
    version: number;
    value: Buffer;
  }>;
}>;

Get all key-value pairs.

getKeyRange

function getKeyRange(
  redis: IORedis.Redis,
  collection: string,
  min: string | Buffer,
  max: string | Buffer,
): Promise<{
  revision: string;
  items: Array<{
    key: Buffer;
    version: number;
    value: Buffer;
  }>;
}>;

Get all key-value pairs with keys lexicographically sorted between min and max. See Redis docs on how to specify intervals. Use makePrefixRange to get all keys starting with a prefix.

getKeyRange can be used with Pipelining.

Example:

// get keys from `a` (inclusive) to `z` (exclusive)
await getKeyRange(redis, 'my-collection', '[a', '(z');

// get all keys from `z` (inclusive) onwards
await getKeyRange(redis, 'my-collection', '[z', '+');

getVersionRange

function getVersionRange(
  redis: IORedis.Redis,
  collection: string,
  min: number | string,
  max: number | string,
): Promise<{
  revision: string;
  items: Array<{
    key: Buffer;
    version: number;
    value: Buffer;
  }>;
}>;

Get all key-value pairs with version between min and max (inclusive). Infinity is allowed. See Redis docs on how to specify exclusive intervals.

getVersionRange can be used with Pipelining.

watch

function watch(
  redis: IORedis.Redis,
  collection: string,
  lastRevision: string,
  blockMs: number = 2500,
): Observable<ChangeEvent[]>;

Return RxJS Observable that emits changes to a collection happening after lastRevision. Changes are emitted as arrays of events of the following shape:

type ChangeEvent = SetEvent | RemoveEvent;

type SetEvent = {
  type: 'set';
  revision: string;
  key: Buffer;
  version: number;
  value: Buffer;
};

type RemoveEvent = {
  type: 'remove';
  revision: string;
  key: Buffer;
};

Example:

// observable that emits the whole collection as a Map on each change
const collectionObservable = defer(() => getAll(redis, 'my-collection')).pipe(
  concatMap(({items, revision}) => {
    const initialState = new Map();

    for (const {key, value, version} of items) {
      initialState.set(key.toString(), {value, version});
    }

    return concat(
      of(initialState),
      watch(redis, 'my-collection', revision).pipe(
        scan((state, changes) => {
          const nextState = new Map(state);

          for (const event of changes) {
            if (event.type === 'set') {
              const {value, version} = event;

              nextState.set(event.key.toString(), {value, version});
            } else if (event.type === 'remove') {
              nextState.delete(event.key.toString());
            }
          }

          return nextState;
        }, initialState),
      ),
    );
  }),
);

set

function set(
  redis: IORedis.Redis,
  collection: string,
  key: string | Buffer,
  value: Buffer,
  version: number = Infinity,
  maxlen: number = 1000,
): Promise<{
  revision: string;
}>;

Update the key-value pair, overwriting previous value, if any. See API for details on maxlen argument.

set can be used with Pipelining.

compareAndSet

function compareAndSet(
  redis: IORedis.Redis,
  collection: string,
  key: string | Buffer,
  compareOperator: '<' | '<=' | '==' | '!=' | '>=' | '>',
  compareVersion: number,
  value: Buffer,
  version: number = Infinity,
  maxlen: number = 1000,
): Promise<{
  revision: string;
  success: boolean;
}>;

Update the key-value pair only if a comparison condition holds on the previous version of the pair. If a previous value does not exist, its version is considered to be 0. See API for details on maxlen argument.

compareAndSet can be used with Pipelining.

Example:

// only update if previous version is older
const newVersion = Date.now();
await compareAndSet(
  redis,
  'my-collection',
  'some-key',
  '<',
  newVersion,
  Buffer.from('payload'),
  newVersion,
);

// only update if the key does not exist
await compareAndSet(
  redis,
  'my-collection',
  'some-key',
  '==',
  0,
  Buffer.from('payload'),
);

remove

function remove(
  redis: IORedis.Redis,
  collection: string,
  key: string | Buffer,
  maxlen: number = 1000,
): Promise<{
  revision: string;
}>;

Delete the key-value pair, if it exists. See API for details on maxlen argument.

remove can be used with Pipelining.

compareAndRemove

function compareAndRemove(
  redis: IORedis.Redis,
  collection: string,
  key: string | Buffer,
  compareOperator: '<' | '<=' | '==' | '!=' | '>=' | '>',
  compareVersion: number,
  maxlen: number = 1000,
): Promise<{
  revision: string;
  success: boolean;
}>;

Delete the key-value pair only if a comparison condition holds on the previous version of the pair. If a previous value does not exist, its version is considered to be 0. See API for details on maxlen argument.

compareAndRemove can be used with Pipelining.

removeKeyRange

function removeKeyRange(
  redis: IORedis.Redis,
  collection: string,
  min: string | Buffer,
  max: string | Buffer,
  maxlen: number = 1000,
): Promise<{
  revision: string;
  removedCount: number;
}>;

Delete all key-value pairs with keys lexicographically sorted between min and max. See Redis docs on how to specify intervals. Use makePrefixRange to delete all keys starting with a prefix. See API for details on maxlen argument.

removeKeyRange can be used with Pipelining.

removeVersionRange

function removeVersionRange(
  redis: IORedis.Redis,
  collection: string,
  min: number | string,
  max: number | string,
  maxlen: number = 1000,
): Promise<{
  revision: string;
  removedCount: number;
}>;

Delete all key-value pairs with version between min and max (inclusive). Infinity is allowed. See Redis docs on how to specify exclusive intervals. See API for details on maxlen argument.

removeVersionRange can be used with Pipelining.

makePrefixRange

function makePrefixRange(prefix: string | Buffer): [Buffer, Buffer];

Make a [min, max] range of all keys starting with a prefix. Useful with getKeyRange and removeKeyRange operations:

const [min, max] = makePrefixRange('prefix:');

await getKeyRange(redis, 'my-collection', min, max);
await removeKeyRange(redis, 'my-collection', min, max);

initCommands

function initCommands(redis: IORedis.Redis): void;

Enable collection operations on ioredis instance by defining Lua commands.

Development

Start Redis:

docker-compose up -d

Run tests:

yarn test