@clamator/over-redis

v0.1.9

Redis-streams transport for clamator (pre-1.0).

@clamator/over-redis

Redis-streams transport for clamator. Implements the Transport interface from @clamator/protocol so JSON-RPC traffic flows over Redis streams between processes — typically a TS service and a Py service, or two TS services on different hosts.

Install

npm install @clamator/over-redis @clamator/protocol ioredis zod

⚠️ Required: declare zod in your own package.json.

zod is a peer dependency. Add "zod": "^3.23.0" to your package's dependencies even if you don't import zod directly. Without it, TypeScript can see two distinct ZodType identities and reject mixed-source schemas. See @clamator/protocol's install section for the full rationale.

Quickstart

Define the contract in TypeScript:

import { z } from 'zod';
import { defineContract, defineMethod, defineNotification } from '@clamator/protocol';

export const arithContract = defineContract('arith', {
  add: defineMethod({
    params: z.object({ a: z.number(), b: z.number() }),
    result: z.object({ sum: z.number() }),
  }),
  ping: defineNotification({ params: z.object({}) }),
});

(Verbatim from ts/packages/over-redis/tests/contracts/arith.ts:1-10.)

Run @clamator/codegen to emit a typed client and a service interface from that contract:

npx @clamator/codegen --src contracts --out-ts generated --ts-contract-import '../contracts/arith.js'

Server-side — register handlers and start:

import type IORedis from 'ioredis';
import { RedisRpcServer } from '../src/index.js';
import { arithContract } from './contracts/arith.js';
import type { ArithService } from './generated/arith.js';

export async function buildArithServer(opts: { redis: IORedis; keyPrefix: string }) {
  const server = new RedisRpcServer({ redis: opts.redis, keyPrefix: opts.keyPrefix }); // injected redis= not closed by stop() — caller owns lifecycle; omit to let transport own it
  const handlers: ArithService = {
    add: async ({ a, b }) => ({ sum: a + b }),
    ping: async (_params) => {},
  };
  server.registerService(arithContract, handlers); // must precede start() — post-start registrations are silently ignored, no consumer group or read loop is created
  await server.start();
  return server;
}

(Verbatim from ts/packages/over-redis/tests/server.ts:1-15. In your own code, replace ../src/index.js with @clamator/over-redis.)

Client-side — call the typed proxy:

import type IORedis from 'ioredis';
import { RedisRpcClient } from '../src/index.js';
import { ArithClient } from './generated/arith.js';

export async function callArith(opts: { redis: IORedis; keyPrefix: string }) {
  const client = new RedisRpcClient({ redis: opts.redis, keyPrefix: opts.keyPrefix, defaultTimeoutMs: 3000 }); // 3 s here, overriding the 30 s default; the timeout covers the full round-trip (xadd → handler → reply); no auto-retry on disconnect; timeouts are not propagated to the server (it completes the handler and writes a reply the client ignores)
  await client.start();
  const arith = new ArithClient(client);
  const r = await arith.add({ a: 2, b: 3 });
  await client.stop();
  return r;
}

(Verbatim from ts/packages/over-redis/tests/client.ts:1-12. In your own code, replace ../src/index.js with @clamator/over-redis.)

server.start() returns once each registered service has its consumer group created and its read loop spawned; it does not block. Your application controls the server's lifetime. Call await server.stop() to shut down — drains in-flight handlers up to graceMs (default 5 s) before disconnecting. start() and stop() are both idempotent (calling either twice is a no-op); once stop() has been called, calling start() again raises — create a new instance to restart. Client-side cancellation (e.g., an aborted Promise) is not propagated to the server; the server completes the handler and writes a reply that the canceled caller never reads.

A single server can host multiple services. Call registerService(contract, handlers) once per contract before start(); each service gets its own consumer group keyed by the service name. Registrations after start() are silently ignored — no consumer group or read loop is created for them.

By default the connection is built from $REDIS_URL (or redis://localhost:6379). Pass redisUrl for a different URL, or redis for a pre-built ioredis instance.
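The fallback order described above can be sketched as a small pure function. This is a sketch of the documented behavior, not the library's actual code; `resolveRedisUrl` and the `Env` type are hypothetical names introduced here for illustration.

```typescript
// Hypothetical helper: mirrors the documented fallback order only.
type Env = Record<string, string | undefined>;

function resolveRedisUrl(redisUrl: string | undefined, env: Env): string {
  // 1. explicit redisUrl option, 2. $REDIS_URL, 3. local default
  return redisUrl ?? env["REDIS_URL"] ?? "redis://localhost:6379";
}
```

Passing the environment in as a parameter (for example `process.env`) keeps the function easy to test.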

keyPrefix is used as a literal Redis key prefix — clamator does not parse it. Any string Redis accepts as a key works, including slashes, colons, and embedded path-like separators (e.g., my-app/tenant-42). Pick a keyPrefix that doesn't collide with non-clamator usage of the same Redis instance: clamator owns only keys under its prefix (see "Keys owned" below), but a sibling app writing to those same keys would corrupt clamator's streams (and vice versa).

Consumer-group cleanup on server crash. clamator never calls XGROUP DELCONSUMER. A crashed or stopped server leaves its <service>:<instanceId> consumer entry in the group with whatever pending entries it had unacknowledged (those get reclaimed by XAUTOCLAIM after consumerClaimIdleMs). The consumer entry itself persists. Long-lived deployments with frequent restarts accumulate dead-consumer entries; periodic operator cleanup via XINFO CONSUMERS <stream> <group> + XGROUP DELCONSUMER for entries with idle time well past your reclaim window is recommended.
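One way an operator script might pick which consumer entries to delete is sketched below. It assumes the raw XINFO CONSUMERS reply arrives as one flat field/value array per consumer (the shape Redis returns for this command). The names `parseConsumers` and `selectDeadConsumers` and the idle threshold are illustrative choices, not part of clamator.

```typescript
// Hypothetical operator helpers; not part of @clamator/over-redis.
type ConsumerInfo = { name: string; pending: number; idleMs: number };

// Turn [["name", "x", "pending", 0, "idle", 123], ...] into typed records.
function parseConsumers(reply: ReadonlyArray<ReadonlyArray<unknown>>): ConsumerInfo[] {
  return reply.map((flat) => {
    const fields = new Map<string, unknown>();
    for (let i = 0; i + 1 < flat.length; i += 2) {
      fields.set(String(flat[i]), flat[i + 1]);
    }
    return {
      name: String(fields.get("name")),
      pending: Number(fields.get("pending")),
      idleMs: Number(fields.get("idle")),
    };
  });
}

// Keep only consumers that are idle past the threshold AND have nothing pending.
// Deleting a consumer that still has pending entries would make those entries
// unclaimable, so such consumers are left for XAUTOCLAIM to drain first.
function selectDeadConsumers(consumers: ConsumerInfo[], minIdleMs: number): string[] {
  return consumers
    .filter((c) => c.pending === 0 && c.idleMs >= minIdleMs)
    .map((c) => c.name);
}
```

With ioredis, the raw reply would come from `redis.xinfo('CONSUMERS', stream, group)`, and each selected name would then be passed to `redis.xgroup('DELCONSUMER', stream, group, name)`. A threshold of several multiples of consumerClaimIdleMs is a reasonable starting point, though the right value depends on your deployment.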

Sharing one injected redis instance across multiple RedisRpcServer and RedisRpcClient instances — and across your application's other Redis usage on the same instance — is safe. Each server/client manages its own subscription internally: for blocking stream reads (XREADGROUP, XREAD on the reply stream), the transport calls redis.duplicate() to obtain a dedicated connection so the injected one stays available for non-blocking ops (XADD, XACK, XAUTOCLAIM).

Per-client reply streams are bounded: the server XADDs replies with MAXLEN ~ replyStreamMaxLen (default 1024) and the client DELs its reply stream on stop(). If a client process crashes without calling stop(), the reply-stream key persists with up to ~1024 entries until manually deleted; there is no Redis-side TTL. From the server's perspective the abandoned reply stream is harmless — XADD to a stream nobody is reading is still a normal stream write; the bounded MAXLEN keeps memory usage finite.

Lifecycle integration

start() returns immediately, leaving the server running in the background — your application owns the wait-and-shutdown loop. The canonical pattern is to await a Promise that resolves when SIGTERM/SIGINT arrives, with the signal handler triggering await server.stop():

import type IORedis from 'ioredis';
import { RedisRpcServer } from '../src/index.js';
import { arithContract } from './contracts/arith.js';
import type { ArithService } from './generated/arith.js';

// Long-running server that stops gracefully on SIGTERM/SIGINT.
// Wire pattern: start the server, then await a Promise that resolves when a
// signal arrives. The signal handler triggers stop() and resolves.
export async function runArithServer(opts: { redis: IORedis; keyPrefix: string }) {
  const server = new RedisRpcServer({ redis: opts.redis, keyPrefix: opts.keyPrefix });
  const handlers: ArithService = {
    add: async ({ a, b }) => ({ sum: a + b }),
    ping: async (_p) => {},
  };
  server.registerService(arithContract, handlers);
  await server.start();
  await new Promise<void>((resolve) => {
    const stop = async () => {
      await server.stop();
      resolve();
    };
    process.once('SIGTERM', stop);
    process.once('SIGINT', stop);
  });
}

(Verbatim from ts/packages/over-redis/tests/server-lifecycle.example.ts:1-25. In your own code, replace ../src/index.js with @clamator/over-redis.)

Key surface

  • RedisRpcServer({ keyPrefix, redis?, redisUrl?, instanceId?, consumerClaimIdleMs?, replyStreamMaxLen?, shutdownGraceMs? }): registerService(contract, handlers), start(), stop({ graceMs? }). Defaults: consumerClaimIdleMs: 60_000, replyStreamMaxLen: 1024, shutdownGraceMs: 5_000, stop.graceMs: 5_000.
  • RedisRpcClient({ keyPrefix, redis?, redisUrl?, defaultTimeoutMs?, instanceId? }): start(), stop(). Default defaultTimeoutMs: 30_000. The instance is also a ClamatorClient, so it can be wrapped by a generated *Client proxy.

Client lifetime and fan-out

RedisRpcClient and RedisRpcServer are stateful: each spawns a background reply/consumer loop, calls redis.duplicate() for a dedicated blocking connection, and (RedisRpcClient) maintains a per-instance reply-stream key in Redis. Construct once and keep alive for the application's lifetime — do not construct/destroy per call.

A keyPrefix identifies a backend, not a service. One RedisRpcClient can back many service proxies — wrap it with each generated *Client:

import { RedisRpcClient } from '../src/index.js';
import { ArithClient } from './generated/arith.js';
import { LoggerClient } from './generated/logger.js';

// One keyPrefix-pinned RedisRpcClient backs many service proxies.
export async function callMultipleServices(keyPrefix: string) {
  const client = new RedisRpcClient({ keyPrefix });
  await client.start();
  const arith = new ArithClient(client);
  const logger = new LoggerClient(client);
  const sum = await arith.add({ a: 2, b: 3 });
  await logger.log({ msg: `sum=${sum.sum}` });
  await client.stop();
  return sum;
}

(Verbatim from ts/packages/over-redis/tests/multi-service.example.ts:1-15. In your own code, replace ../src/index.js with @clamator/over-redis.)

For multiple backends, construct one RedisRpcClient per keyPrefix and hold them in named variables. The same injected redis instance can back every client, so the marginal cost of an additional keyPrefix is one background task + one duplicated TCP connection + one reply-stream key in Redis. The "multiple backends, one keyPrefix per backend" topology degenerates the worker-pool semantics to a pool of one per keyPrefix — the keyPrefix itself acts as the multi-tenant routing key, and each client's traffic stays within its own backend's command and reply streams.

Call await client.stop() on each client during application shutdown to drain the reply loop and DEL the reply-stream key.

Per-call timeout override

Each generated proxy method accepts an optional opts object whose timeoutMs overrides the client's defaultTimeoutMs for that single call: await arith.add({ a: 2, b: 3 }, { timeoutMs: 60_000 }). When omitted, the client's defaultTimeoutMs applies. Notification proxy methods don't accept opts — they have no reply to wait for. The override is round-trip wall-time (xadd → handler → reply); cancellation and retry semantics are otherwise unchanged.
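The override rule can be illustrated with a small standalone sketch. It is not the library's implementation; `effectiveTimeoutMs` and `withTimeout` are hypothetical names that only model how a per-call value might take precedence over the client default, and how a wall-clock timeout rejects the caller without stopping the underlying work.

```typescript
// Hypothetical model of the timeout rule; not clamator's code.
type CallOpts = { timeoutMs?: number };

// Per-call timeoutMs wins; otherwise the client's default applies.
function effectiveTimeoutMs(defaultTimeoutMs: number, opts?: CallOpts): number {
  return opts?.timeoutMs ?? defaultTimeoutMs;
}

// Reject after timeoutMs of wall time. The wrapped work is NOT cancelled,
// which matches the documented behavior that the server still finishes.
function withTimeout<T>(work: Promise<T>, timeoutMs: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error(`timed out after ${timeoutMs} ms`)),
      timeoutMs,
    );
    work.then(
      (value) => {
        clearTimeout(timer);
        resolve(value);
      },
      (err) => {
        clearTimeout(timer);
        reject(err);
      },
    );
  });
}
```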

Custom-command extension

To extend an existing engine with user-defined commands without running codegen, build a Contract by hand and register it on the same RedisRpcServer that already hosts the codegen-emitted services. Each registerService call is independent — the dispatcher does not care whether a contract came from codegen or was authored inline.

import type IORedis from 'ioredis';
import { defineContract, defineMethod } from '@clamator/protocol';
import { z } from 'zod';
import { RedisRpcServer } from '../src/index.js';
import { arithContract } from './contracts/arith.js';
import type { ArithService } from './generated/arith.js';

// Hand-built contract for user-defined commands. Same shape as a codegen-
// emitted contract; just authored inline instead of imported from a generated
// module. Use this pattern when adding services to an engine at registration
// time without going through the codegen pipeline (e.g., user-supplied
// custom commands collected at boot).
const customCommandsContract = defineContract('custom-commands', {
  echo: defineMethod({
    params: z.object({ msg: z.string() }),
    result: z.object({ msg: z.string() }),
  }),
});

// One RedisRpcServer hosts both the codegen-emitted `arith` service and the
// hand-built `custom-commands` service. registerService must be called for
// each contract before start(); each gets its own consumer group keyed by
// the contract's service name.
export async function buildExtendedServer(opts: { redis: IORedis; keyPrefix: string }) {
  const server = new RedisRpcServer({ redis: opts.redis, keyPrefix: opts.keyPrefix });
  const arithHandlers: ArithService = {
    add: async ({ a, b }) => ({ sum: a + b }),
    ping: async (_p) => {},
  };
  const customHandlers = {
    echo: async ({ msg }: { msg: string }) => ({ msg }),
  };
  server.registerService(arithContract, arithHandlers);
  server.registerService(customCommandsContract, customHandlers);
  await server.start();
  return server;
}

(Verbatim from ts/packages/over-redis/tests/custom-commands.example.ts:1-37. In your own code, replace ../src/index.js with @clamator/over-redis.)

Before reaching for this pattern, see @clamator/protocol's "Hand-built contracts" section — runtime contract construction defeats the contract guarantee, and is rarely the right tool. The fixture above is for the case where the set of services is known at startup but assembled from multiple sources (e.g., codegen-emitted core + user-registered extensions).

Worker-pool semantics

Multiple RedisRpcServer instances sharing the same keyPrefix form a competing-consumers pool: each call is processed by exactly one instance. They share a single Redis consumer group per service (named <service>); each server is a unique consumer (named <service>:<instanceId>). XREADGROUP delivers each request to exactly one server. A reclaim loop (XAUTOCLAIM) re-delivers messages unacknowledged for consumerClaimIdleMs (default 60,000 ms). Delivery semantics are at-least-once. To run a single-consumer scenario, run one server.

Handlers must be idempotent. A handler whose execution exceeds consumerClaimIdleMs is reclaimed and re-dispatched to another consumer (or itself), so the same request may run more than once. A client timeout does not propagate to the server (see the comment on the RedisRpcClient constructor in the Quickstart), so a request the client gave up on may still complete server-side.
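A minimal sketch of one idempotency technique follows: deduplicating by a caller-supplied key. It assumes the client puts a unique key into the request params (called `requestKey` in the usage note, a hypothetical field), since this README does not say whether handlers can see a request id from the envelope. `OnceByKey` is an illustrative name.

```typescript
// Hypothetical in-memory dedupe; suitable for a single server process only.
class OnceByKey<R> {
  private readonly results = new Map<string, Promise<R>>();

  // Run `work` the first time a key is seen; later calls with the same key
  // get the same promise back instead of running the work again.
  run(key: string, work: () => Promise<R>): Promise<R> {
    const existing = this.results.get(key);
    if (existing !== undefined) return existing;
    const started = work();
    this.results.set(key, started);
    // Forget failed attempts so a redelivery can retry them.
    started.catch(() => {
      this.results.delete(key);
    });
    return started;
  }
}
```

A handler could then look like `charge: ({ requestKey, amount }) => once.run(requestKey, () => doCharge(amount))`, where `charge` and `doCharge` are placeholders. In a worker pool, a redelivery may land on a different server, so the dedupe state would need to live in a shared store rather than in process memory. The map here also grows without bound, and a real version would need eviction.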

On start. The server's consumer loop reads new entries via XREADGROUP with id >. Pending entries from a prior session — entries XREADGROUPed but not XACKed before a crash — are reclaimed via XAUTOCLAIM after consumerClaimIdleMs (default 60s) elapses; new entries arriving in the meantime are processed normally.

Per-service dispatch is serialized within a single server. Each registered service has its own consumer loop that reads up to 16 messages per XREADGROUP poll and processes them one at a time (await per message; no detached tasks). Multiple services registered on the same server run their own consumer loops concurrently, but two requests for the same service on the same server are not parallelized.

Single-server in-order invariant. Within one RedisRpcServer instance, this serialization is a documented invariant: a request arriving after another on the same service observes the full effect of the prior request's handler before its own handler runs. Handlers can rely on previous-call mutations being visible (state machines, per-aggregate updates) without explicit locking. The invariant survives as long as handler latency stays well under consumerClaimIdleMs (default 60s) — a handler exceeding the reclaim threshold can be redelivered while still running, which violates the order.

Multi-server / worker-pool ordering is not yet finalized — clamator is pre-1.0. The current behavior under worker-pool fan-out (multiple RedisRpcServer instances sharing the same keyPrefix) is competing-consumers via XREADGROUP, with no in-order guarantee across servers. That falls out of the XREADGROUP design rather than being a stable contract. If you need ordered processing today, the supported pattern is partition by keyPrefix: assign a unique keyPrefix per ordering domain (e.g., per tenant, per aggregate root, per database) and run exactly one server per keyPrefix. Richer multi-server ordering primitives (e.g., sticky transactions, contract-level partition keys) are candidates for a future minor release.
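The partition-by-keyPrefix pattern can be sketched as a naming function plus a registry that hands out exactly one object per keyPrefix. Both `keyPrefixForDomain` and `PerPrefixRegistry` are hypothetical helpers, and the `base/kind-id` naming scheme is only one possible convention (it follows the my-app/tenant-42 example given earlier).

```typescript
// Hypothetical helpers for partitioning by ordering domain.
function keyPrefixForDomain(base: string, kind: string, id: string): string {
  return `${base}/${kind}-${id}`;
}

// Caches one instance (for example one client) per keyPrefix.
class PerPrefixRegistry<T> {
  private readonly byPrefix = new Map<string, T>();
  private readonly create: (keyPrefix: string) => T;

  constructor(create: (keyPrefix: string) => T) {
    this.create = create;
  }

  get(keyPrefix: string): T {
    let entry = this.byPrefix.get(keyPrefix);
    if (entry === undefined) {
      entry = this.create(keyPrefix);
      this.byPrefix.set(keyPrefix, entry);
    }
    return entry;
  }
}
```

On the client side, the factory passed to the registry might construct a RedisRpcClient for the given keyPrefix. Starting and stopping those clients is left out of this sketch.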

To process one service's requests in parallel within a single server (when ordering is not required), have your handler return after kicking off the work as a detached task (e.g., void doActualWork()); the consumer-loop dispatch becomes effectively non-blocking and the reply confirms acceptance, not completion.
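A sketch of the detached-task pattern is shown below. `makeDetachedHandler`, the `Accepted` result shape, and the `onError` callback are assumptions made for illustration. The result type in your contract would have to match whatever the handler returns.

```typescript
// Hypothetical wrapper: turns a long-running job into an "accept and return" handler.
type Accepted = { accepted: true };

function makeDetachedHandler<P>(
  doActualWork: (params: P) => Promise<void>,
  onError: (err: unknown) => void,
): (params: P) => Promise<Accepted> {
  return async (params) => {
    // Not awaited: the handler returns right away, so the consumer loop
    // can pick up the next message while the work continues.
    // The catch is essential; without it a failure becomes an unhandled rejection.
    void doActualWork(params).catch(onError);
    return { accepted: true };
  };
}
```

From the server's point of view the handler is finished as soon as it returns. Detached work is therefore presumably not covered by the stop() drain or by redelivery, and a failure only surfaces through `onError`.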

Single-consumer case. For single-server deployments (one server per backend), worker-pool semantics degenerate trivially: the consumer group has one consumer, every request goes to that consumer, and no fan-out concerns apply.

Fire-and-forget operations

Operations the caller doesn't need a reply for — telemetry, cache invalidations, status pings — should be modeled as notifications in the contract (defineNotification on the TS side; MethodEntry(result_model=None, ...) on the Py side). The generated proxy emits a typed notification method that returns once the request envelope is XADDed to Redis; it does not wait for the server to process.

import { RedisRpcClient } from '../src/index.js';
import { ArithClient } from './generated/arith.js';

// Fire-and-forget: notification proxies return once the request is queued in Redis;
// they do not wait for the server to process. Handlers must be idempotent — see
// "Worker-pool semantics" for the at-least-once delivery details.
export async function fireNotification(keyPrefix: string) {
  const client = new RedisRpcClient({ keyPrefix });
  await client.start();
  const arith = new ArithClient(client);
  await arith.ping({});
  await client.stop();
}

(Verbatim from ts/packages/over-redis/tests/fire-and-forget.example.ts:1-13. In your own code, replace ../src/index.js with @clamator/over-redis.)

The await resolves once the message is on the stream. It does not confirm the server received, processed, or finished the call. Notification handlers run under the same at-least-once delivery semantics as method handlers — design them to be idempotent.

Long-running background processes

clamator's RPC surface is request/reply (plus fire-and-forget for notifications). It does not provide a server-to-client streaming or progress channel: each typed proxy call is a single round-trip. Monitoring and controlling long-running background processes (start, stop, query state, report progress, cancel, sequential and parallel children, persistence across restarts) is a different concern from RPC; for that, look at Optio, a Python process-management framework designed for that purpose.

Re-entrancy

Cross-service or cross-keyPrefix re-entrancy is safe. A handler can await client.call(...) against a different service, or a different keyPrefix via a different RedisRpcClient, without issue — different streams, different consumer groups, no shared lock.

Same-service same-server re-entrancy deadlocks. A handler that calls await client.call('myservice', 'foo', ...) to invoke its own service's method on the same server will deadlock: the consumer loop reading that service's stream is held by the outer handler, so the inner request sits in the stream forever, and the outer handler waits forever for the inner reply.

Don't go through the RPC layer for in-process composition. If a handler needs the logic of another method on the same service, factor that logic into a regular function or call the other handler directly (handlers in the same handlers literal share scope; class-based handlers can call this.foo(...)). The RPC layer is for wire-side routing; once a request is dispatched, you have direct access to your own code, and a function call achieves the same result with zero serialization, zero validation overhead, and no deadlock risk.
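The in-process composition advice might look like this in practice. `addCore` and the `addThree` method are hypothetical; the point is only that both handlers call a plain function rather than going back through the RPC layer.

```typescript
// Shared logic lives in an ordinary function, outside the RPC layer.
type AddParams = { a: number; b: number };

function addCore({ a, b }: AddParams): number {
  return a + b;
}

// Both handlers reuse addCore directly: no serialization, no stream
// round-trip, and no chance of the same-service deadlock.
const composedHandlers = {
  add: async (p: AddParams) => ({ sum: addCore(p) }),
  // Hypothetical second method that needs the same logic twice.
  addThree: async (p: AddParams & { c: number }) => ({
    sum: addCore({ a: addCore(p), b: p.c }),
  }),
};
```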

Authorization

clamator has no authorization at the RPC layer. Any process that can read/write this Redis instance can call any registered method or send any notification — there is no caller identity in the wire envelope.

Apply caller-identity checks at the boundary: a gateway (HTTP server, message-bus filter, etc.) enforces who-can-call-what before invoking the typed proxy. Deploy Redis behind a network you trust (TLS, AUTH, ACLs, private VPC); the transport assumes the substrate is already restricted to authenticated participants.
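A gateway-side check could be as simple as an allow-list consulted before the typed proxy is invoked. The `Policy` shape, the `service.method` string format, and `isAllowed` are illustrative assumptions. How callers are authenticated in the first place is outside this sketch.

```typescript
// Hypothetical allow-list: caller id -> set of "service.method" strings.
type Policy = ReadonlyMap<string, ReadonlySet<string>>;

// Deny by default: unknown callers and unlisted methods are both rejected.
function isAllowed(policy: Policy, caller: string, service: string, method: string): boolean {
  return policy.get(caller)?.has(`${service}.${method}`) ?? false;
}
```

The gateway would call `isAllowed` with the authenticated caller's id and reject the request before any clamator proxy method runs.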

Keys owned under keyPrefix

| Pattern | Type | Purpose |
|---|---|---|
| <keyPrefix>:cmds:<service> | stream | inbound command stream per service; servers consume via XREADGROUP, clients write via XADD |
| <keyPrefix>:replies:<instanceId> | stream | per-client reply stream; servers write replies via XADD, the client reads via XREAD; deleted by client stop() |
| <service> | consumer group | competing-consumers pool name (lives inside the cmds stream's metadata; not a top-level key) |
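For operator scripts that need to inspect or clean up these keys, the patterns above can be written as small builder functions. The function names are hypothetical. The formats follow the table and the <service>:<instanceId> consumer naming described under "Worker-pool semantics".

```typescript
// Hypothetical helpers that reproduce the documented key and name patterns.
function commandStreamKey(keyPrefix: string, service: string): string {
  return `${keyPrefix}:cmds:${service}`;
}

function replyStreamKey(keyPrefix: string, instanceId: string): string {
  return `${keyPrefix}:replies:${instanceId}`;
}

// Consumer name inside the <service> group on the command stream.
function consumerName(service: string, instanceId: string): string {
  return `${service}:${instanceId}`;
}
```

For example, `commandStreamKey('my-app/tenant-42', 'arith')` gives the stream to pass to XINFO CONSUMERS, with the service name as the group.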

When to reach for this vs. @clamator/over-memory

  • @clamator/over-memory — tests, embedded scenarios, anything single-process.
  • @clamator/over-redis — cross-process, cross-host, durable streams, production.

Links