@zintrust/redis-rpc
v2.4.6
Redis RPC backend for BullMQ queue operations in ZinTrust.
@zintrust/redis-rpc provides an HTTP RPC boundary for Redis-backed BullMQ operations. It is designed for runtimes that cannot safely create direct Redis or BullMQ connections, such as Cloudflare Workers, queue dashboards, remote worker controllers, and thin application processes.
Instead of asking every package to emulate Redis commands or BullMQ Lua scripts, the caller sends a typed intent to a backend-owned RPC server. The RPC server owns the real Redis connection, BullMQ queues, queue events, and worker lifecycle state. Application job execution stays in the runtime that has the application processors.
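To make "typed intent" concrete, here is a minimal sketch of the JSON envelope and the HTTP call that carries it. The field names (requestId, service, method, payload), the /rpc path, and the x-redis-rpc-secret header come from the example request later in this README; the type names and the buildRpcCall helper are hypothetical and are not part of the package.

```typescript
// Hypothetical envelope type; field names mirror the curl example in this README.
type RpcRequest = {
  requestId: string;
  service: string;
  method: string;
  payload: Record<string, unknown>;
};

// Shape of the arguments one would hand to fetch(). Kept local so the sketch
// does not depend on DOM or Node typings.
type HttpCall = {
  url: string;
  method: 'POST';
  headers: Record<string, string>;
  body: string;
};

// Build the HTTP call for one intent. Nothing is sent here.
function buildRpcCall(baseUrl: string, secret: string, request: RpcRequest): HttpCall {
  return {
    // Strip trailing slashes so the path is not doubled.
    url: `${baseUrl.replace(/\/+$/, '')}/rpc`,
    method: 'POST',
    headers: {
      'content-type': 'application/json',
      'x-redis-rpc-secret': secret,
    },
    body: JSON.stringify(request),
  };
}

const call = buildRpcCall('http://127.0.0.1:8794/', 'example-secret', {
  requestId: 'example-1',
  service: 'queue',
  method: 'getJobCounts',
  payload: { target: 'emails' },
});
console.log(call.url); // http://127.0.0.1:8794/rpc
```

The caller never names a Redis command or a Lua script; it only states which service and method it wants, and the backend decides how to run it.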
When to use it
Use Redis RPC when:
- USE_REDIS_PROXY=true and REDIS_RPC_URL are both configured.
- Queue producers run in an environment where direct TCP Redis is unavailable.
- Queue monitor or worker dashboard code needs queue state without constructing local BullMQ clients.
- You want a single backend process to own Redis credentials and BullMQ coordination details.
Use direct BullMQ/Redis when your process can safely connect to Redis and does not need a proxy boundary.
Package layout
- server.ts starts the HTTP RPC server and exposes /health and /rpc.
- backend.ts implements the default services for queues, workers, queue monitor reads, and raw Redis calls.
- client.ts is the low-level HTTP client.
- adapters.ts contains convenience wrappers used by @zintrust/queue-redis, @zintrust/queue-monitor, and worker tooling.
- env.ts reads runtime configuration from .env and process.env.
- types.ts contains the public request, client, backend, and server types.
- test-bullmq.mjs is an end-to-end smoke test against a real Redis instance.
Installation
npm install @zintrust/redis-rpc

For monorepo development, this package is built like the other ZinTrust packages:

npm --prefix packages/redis-rpc run build

Configuration
The server and client read these variables:
| Variable | Default | Purpose |
| -------------------------- | ------------------------------------ | --------------------------------------------------------------- |
| REDIS_RPC_URL | empty | Full client base URL, for example https://queues.example.com. |
| REDIS_RPC_HOST | 127.0.0.1 | Server listen host and client host fallback. |
| REDIS_RPC_PORT | 8794 | Server listen port and client port fallback. |
| REDIS_RPC_SECRET | REDIS_PROXY_SECRET or APP_KEY | Shared secret sent as x-redis-rpc-secret. |
| REDIS_RPC_REDIS_HOST | REDIS_HOST or 127.0.0.1 | Redis host used by the RPC backend. |
| REDIS_RPC_REDIS_PORT | REDIS_PORT or 6379 | Redis port used by the RPC backend. |
| REDIS_RPC_REDIS_PASSWORD | REDIS_PASSWORD | Redis password used by the RPC backend. |
| REDIS_RPC_REDIS_DB | REDIS_QUEUE_DB, REDIS_DB, or 0 | Redis database used for queue operations. |
| REDIS_RPC_BULLMQ_PREFIX | BULLMQ_PREFIX or bull | BullMQ key prefix used by the backend. |
| REDIS_RPC_TIMEOUT_MS | 30000 | Client-side timeout used by integrations. |
| REDIS_RPC_RETRY_MAX | 2 | Client-side retry count used by integrations. |
| REDIS_RPC_RETRY_DELAY_MS | 500 | Client-side retry delay used by integrations. |
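The fallback chains in the Default column can be read as "first variable that is set wins". The following is a sketch of that resolution for the Redis-side settings only. The helper names are hypothetical, and treating an empty string as unset is an assumption; the package's own env.ts may behave differently.

```typescript
type Env = Record<string, string | undefined>;

// Return the first non-empty value among the given variable names.
// Assumption: an empty string counts as "not set".
function firstSet(env: Env, names: string[]): string | undefined {
  for (const name of names) {
    const value = env[name];
    if (value !== undefined && value !== '') return value;
  }
  return undefined;
}

// Hypothetical helper that resolves the Redis side of the table above.
function resolveBackendRedisConfig(env: Env) {
  return {
    host: firstSet(env, ['REDIS_RPC_REDIS_HOST', 'REDIS_HOST']) ?? '127.0.0.1',
    port: Number(firstSet(env, ['REDIS_RPC_REDIS_PORT', 'REDIS_PORT']) ?? '6379'),
    password: firstSet(env, ['REDIS_RPC_REDIS_PASSWORD', 'REDIS_PASSWORD']),
    db: Number(firstSet(env, ['REDIS_RPC_REDIS_DB', 'REDIS_QUEUE_DB', 'REDIS_DB']) ?? '0'),
    prefix: firstSet(env, ['REDIS_RPC_BULLMQ_PREFIX', 'BULLMQ_PREFIX']) ?? 'bull',
  };
}

// Only the generic variables are set, so the REDIS_RPC_* names fall through.
const config = resolveBackendRedisConfig({ REDIS_HOST: 'redis.internal', REDIS_QUEUE_DB: '2' });
console.log(config);
```

In this sketch a project that already defines REDIS_HOST and REDIS_QUEUE_DB needs no extra REDIS_RPC_REDIS_* variables unless the RPC backend should point at a different Redis instance.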
Custom request headers
The client supports injecting extra HTTP headers into every outgoing RPC request. This uses the same environment-variable convention as all other ZinTrust proxy adapters (REDIS_PROXY_HEADERS_* for the redis-proxy, MYSQL_PROXY_HEADERS_* for MySQL, and so on).
Pattern: REDIS_RPC_PROXY_HEADERS_{HEADER_NAME}=value
Underscores in HEADER_NAME are converted to hyphens to form the actual header name.
| Environment variable | HTTP header sent |
| --- | --- |
| REDIS_RPC_PROXY_HEADERS_X_Tenant_Id=abc | x-tenant-id: abc |
| REDIS_RPC_PROXY_HEADERS_Authorization=Bearer t | authorization: Bearer t |
| REDIS_RPC_PROXY_HEADERS_X_Trace_Id=xyz | x-trace-id: xyz |
| REDIS_RPC_PROXY_HEADERS_X_Custom_Header=foo | x-custom-header: foo |
These headers are read once when createRedisRpcClient() is called and are sent on every request. They are merged after x-redis-rpc-secret, so they cannot accidentally overwrite authentication.
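The naming rule can be illustrated with a small, self-contained sketch. The function below is hypothetical and is not the package's rpcClientHeaders(); it only mirrors the documented behaviour: strip the REDIS_RPC_PROXY_HEADERS_ prefix, turn underscores into hyphens, and lower-case the result as shown in the table. Skipping empty values is an assumption.

```typescript
type Env = Record<string, string | undefined>;

const HEADER_PREFIX = 'REDIS_RPC_PROXY_HEADERS_';

// Collect extra headers from an env-like object.
// Returns undefined when no matching variable is present.
function headersFromEnv(env: Env): Record<string, string> | undefined {
  const headers: Record<string, string> = {};
  for (const [key, value] of Object.entries(env)) {
    // Assumption: unset or empty variables are ignored.
    if (!key.startsWith(HEADER_PREFIX) || value === undefined || value === '') continue;
    const name = key.slice(HEADER_PREFIX.length).replace(/_/g, '-').toLowerCase();
    if (name !== '') headers[name] = value;
  }
  return Object.keys(headers).length > 0 ? headers : undefined;
}

const envHeaders = headersFromEnv({
  REDIS_RPC_PROXY_HEADERS_X_Tenant_Id: 'abc',
  REDIS_RPC_PROXY_HEADERS_Authorization: 'Bearer t',
  UNRELATED_VARIABLE: '1',
});
console.log(envHeaders); // { 'x-tenant-id': 'abc', authorization: 'Bearer t' }
```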
You can also pass headers directly in code. Programmatic headers take priority over env-sourced ones when a key collides:
import { createRedisRpcClient } from '@zintrust/redis-rpc';

const client = createRedisRpcClient({
  baseUrl: process.env.REDIS_RPC_URL,
  secret: process.env.REDIS_RPC_SECRET,
  headers: {
    'x-tenant-id': 'abc',
    'x-trace-id': '123',
  },
});

Both sources are combined: { ...rpcClientHeaders(), ...options.headers }. To inspect which headers were auto-detected from env, call the exported helper directly:

import { rpcClientHeaders } from '@zintrust/redis-rpc';

console.log(rpcClientHeaders()); // { 'x-tenant-id': 'abc', ... } or undefined

Set both USE_REDIS_PROXY=true and REDIS_RPC_URL to make supported ZinTrust packages select Redis RPC automatically. USE_REDIS_PROXY=true by itself does not enable Redis RPC; it only says the process is allowed to use a Redis proxy transport.
Running the server
From the repository:
tsx packages/redis-rpc/server.ts

From a ZinTrust project with @zintrust/redis-rpc installed:

zin redis-rpc
# or
zin s redis-rpc

CLI overrides:

zin redis-rpc --host 0.0.0.0 --port 8794 --redis-host 127.0.0.1 --redis-db 1

Health check:

curl http://127.0.0.1:8794/health

Example RPC request:

curl -X POST http://127.0.0.1:8794/rpc \
  -H 'content-type: application/json' \
  -H "x-redis-rpc-secret: $REDIS_RPC_SECRET" \
  -d '{
    "requestId": "example-1",
    "service": "queue",
    "method": "add",
    "payload": {
      "target": "emails",
      "args": ["send", {"to":"[email protected]"}, {"attempts":3}]
    }
  }'

Client usage
import { createRedisRpcClient } from '@zintrust/redis-rpc';

const client = createRedisRpcClient({
  baseUrl: process.env.REDIS_RPC_URL,
  secret: process.env.REDIS_RPC_SECRET,
});

const job = await client.queue('add', {
  target: 'emails',
  args: ['send', { to: '[email protected]' }, { attempts: 3 }],
});

const counts = await client.queue('getJobCounts', { target: 'emails' });

Adapter usage
import {
  createBullMqRpcQueue,
  createQueueMonitorRpcDriver,
  createWorkerRpcRuntime,
} from '@zintrust/redis-rpc/adapters';

const queue = createBullMqRpcQueue('emails');
await queue.add('send', { to: '[email protected]' }, { attempts: 3 });

const monitor = createQueueMonitorRpcDriver();
const snapshot = await monitor.getSnapshot(['emails']);

const workers = createWorkerRpcRuntime();
await workers.startWorker('emails', 'emails-worker', { processor: 'echo' });

Built-in services
queue
Supported methods:
- add / enqueue
- dequeue
- ack
- fail / nack
- get / getJob
- getJobs
- getJobCounts / counts
- count / length
- pause
- resume
- drain
- clean
- removeJob
- retryJob
- promoteJob
- obliterate
- closeQueue
Queue requests identify the queue with payload.target, payload.queueName, or payload.queue.
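A sketch of how a handler might resolve the queue name from those three fields follows. The precedence order (target, then queueName, then queue) and the error message are assumptions; the README only states that any of the three is accepted.

```typescript
type QueuePayload = { target?: unknown; queueName?: unknown; queue?: unknown };

// Hypothetical resolver: returns the first usable queue name.
// Assumption: target wins over queueName, which wins over queue.
function resolveQueueName(payload: QueuePayload): string {
  for (const candidate of [payload.target, payload.queueName, payload.queue]) {
    if (typeof candidate === 'string' && candidate.trim() !== '') return candidate.trim();
  }
  throw new Error('Queue request is missing target, queueName, or queue');
}

console.log(resolveQueueName({ queueName: 'emails' })); // emails
console.log(resolveQueueName({ target: 'reports', queue: 'emails' })); // reports
```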
dequeue is for pull-based runtimes. It uses BullMQ's own atomic waiting-to-active transition and returns { id, name, payload, attempts }. The caller must report the result with ack or fail / nack. ack accepts an optional return value as payload.returnValue, payload.returnvalue, or the second positional arg. fail / nack accepts an optional failure reason as payload.reason or the second positional arg.
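The pull cycle described above (dequeue, run locally, then ack or fail) can be sketched as follows. QueueCall is a stand-in for client.queue(method, payload) so the example runs without a server. Passing the job id as the first positional arg and treating a null result from dequeue as "no job available" are assumptions that the README does not confirm.

```typescript
// Job shape returned by dequeue, as described above.
type PulledJob = { id: string; name: string; payload: unknown; attempts: number };

// Minimal stand-in for client.queue(method, payload).
type QueueCall = (method: string, payload: Record<string, unknown>) => Promise<unknown>;

type Processor = (job: PulledJob) => Promise<unknown>;

// Pull one job, run it locally, and report the outcome to the RPC backend.
async function processOne(
  queue: QueueCall,
  target: string,
  processor: Processor
): Promise<'empty' | 'acked' | 'failed'> {
  // Assumption: dequeue yields null or undefined when nothing is waiting.
  const job = (await queue('dequeue', { target })) as PulledJob | null | undefined;
  if (job == null) return 'empty';

  let returnValue: unknown;
  try {
    returnValue = await processor(job);
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    // Assumption: the job id is the first positional arg.
    await queue('fail', { target, args: [job.id], reason });
    return 'failed';
  }

  await queue('ack', { target, args: [job.id], returnValue });
  return 'acked';
}

// In-memory fake so the sketch can run without Redis or an RPC server.
const calls: string[] = [];
const fakeQueue: QueueCall = async (method) => {
  calls.push(method);
  return method === 'dequeue' ? { id: '1', name: 'send', payload: {}, attempts: 0 } : null;
};

const outcome = processOne(fakeQueue, 'emails', async () => 'sent');
outcome.then((result) => console.log(result, calls));
```

Keeping the ack call outside the try block means a transport error while acknowledging is not misreported as a job failure.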
worker
Supported methods:
- start / startWorker / startAppWorker
- restart / restartWorker / restartAppWorker
- stop / stopWorker
- list
The Redis RPC backend does not import application processor modules and does not create BullMQ Worker consumers. Worker lifecycle calls persist desired state in Redis under <BULLMQ_PREFIX>:__rpc_workers. The record includes worker name, queue name, optional processor spec, concurrency, status, and update time.
This keeps Redis RPC as a coordination service. A Node worker process, Cloudflare Worker, container, or other runtime that already has the application code should pull jobs with queue.dequeue, execute the processor locally, then complete the job with queue.ack or queue.fail.
worker.list returns the persisted lifecycle records plus queue-discovery placeholders for BullMQ queues that have no registered worker record. Placeholders use source: "bullmq-discovery" and names like <queueName>:redis-rpc; persisted records use source: "redis-rpc-registry".
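Because worker.list mixes two kinds of entries, a dashboard will usually want to separate them. This sketch relies only on the documented source values; the name and queueName field names are assumptions about the record shape.

```typescript
// Hypothetical entry shape; only the two source values are documented.
type WorkerListEntry = {
  name: string;
  queueName: string;
  source: 'bullmq-discovery' | 'redis-rpc-registry';
};

// Split a worker.list result into registered workers and queues that
// exist in BullMQ but have no registered worker record.
function splitWorkerList(entries: WorkerListEntry[]) {
  return {
    registered: entries.filter((entry) => entry.source === 'redis-rpc-registry'),
    unclaimedQueues: entries
      .filter((entry) => entry.source === 'bullmq-discovery')
      .map((entry) => entry.queueName),
  };
}

const split = splitWorkerList([
  { name: 'emails-worker', queueName: 'emails', source: 'redis-rpc-registry' },
  { name: 'reports:redis-rpc', queueName: 'reports', source: 'bullmq-discovery' },
]);
console.log(split.unclaimedQueues); // [ 'reports' ]
```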
queue-monitor
Supported methods:
- getSnapshot
- getEvents
- getRecentJobsForQueue
@zintrust/queue-monitor uses this service automatically when both USE_REDIS_PROXY=true and REDIS_RPC_URL are configured.
redis
Supported methods:
- ping
- call
- pipeline
- multi
Use raw Redis calls sparingly. Prefer queue and monitor methods because they preserve the BullMQ abstraction and keep Redis command details out of callers.
call accepts either positional args ({ "args": ["SET", "key", "value", "EX", 60] }) or a command plus args. Object-style option arguments are expanded for Redis clients, so cache calls such as set(key, value, { EX: 60 }) work through RPC. pipeline and multi accept a commands array:
{
  "service": "redis",
  "method": "multi",
  "payload": {
    "commands": [
      { "command": "SET", "args": ["key1", "value1"] },
      { "command": "INCRBY", "args": ["counter", 1] }
    ],
    "transaction": true
  }
}

Custom services
import { createRedisRpcBackend, listenRedisRpcServer } from '@zintrust/redis-rpc/server';

const backend = createRedisRpcBackend();

backend.registerService('reports', async ({ method, payload }) => {
  if (method !== 'refresh') throw new Error(`Unsupported reports method: ${method}`);
  return { accepted: true, reportId: payload.reportId };
});

await listenRedisRpcServer({ backend, port: 8794 });

ZinTrust integrations
- @zintrust/queue-redis routes enqueue, dequeue, ack, length, and drain through Redis RPC only when USE_REDIS_PROXY=true and REDIS_RPC_URL is configured.
- @zintrust/queue-monitor reads snapshots, job counts, and recent jobs, and performs retry operations, through Redis RPC in the same explicit mode.
- Core Redis transport uses Redis RPC for raw Redis commands when both flags are set, so cache and lock code can use Redis RPC without the older Redis HTTP proxy. When USE_REDIS_PROXY=true but REDIS_RPC_URL is empty, core Redis transport falls back to the legacy Redis HTTP proxy.
- @zintrust/cache-redis supports Redis RPC for the documented cache surface: get, set, delete, clear, has, increment, decrement, getRedisClient().pipeline(), and getRedisClient().multi().
- Core env scaffolding includes the REDIS_RPC_* variables so generated projects can opt in without hand-editing config types.
- The existing queue HTTP gateway remains available for signed /api/_sys/queue/rpc traffic; when the queue driver is in Redis RPC mode, that gateway delegates to Redis RPC-backed queue operations.
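The selection rule for the core Redis transport can be summarised as a small decision function. This is a sketch, not the actual implementation: the function and type names are hypothetical, comparing against the literal string 'true' is an assumption, and returning 'direct' when USE_REDIS_PROXY is not enabled is inferred from the "When to use it" section.

```typescript
type Env = Record<string, string | undefined>;
type RedisTransport = 'redis-rpc' | 'legacy-http-proxy' | 'direct';

// Decide which transport a process would use for raw Redis commands.
function selectRedisTransport(env: Env): RedisTransport {
  // Assumption: only the exact string 'true' enables proxy transports.
  const proxyAllowed = env['USE_REDIS_PROXY'] === 'true';
  const rpcUrl = (env['REDIS_RPC_URL'] ?? '').trim();

  if (!proxyAllowed) return 'direct';
  // Proxy is allowed: Redis RPC needs a URL, otherwise use the legacy proxy.
  return rpcUrl !== '' ? 'redis-rpc' : 'legacy-http-proxy';
}

console.log(selectRedisTransport({ USE_REDIS_PROXY: 'true', REDIS_RPC_URL: 'https://queues.example.com' })); // redis-rpc
console.log(selectRedisTransport({ USE_REDIS_PROXY: 'true' })); // legacy-http-proxy
console.log(selectRedisTransport({ REDIS_RPC_URL: 'https://queues.example.com' })); // direct
```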
Verification
Run the focused checks:
npm --prefix packages/redis-rpc run type-check
npm --prefix packages/redis-rpc run build
npm --prefix packages/queue-redis run build
npm --prefix packages/queue-monitor run build
npm --prefix packages/workers run build

Run the BullMQ smoke test when a Redis instance is available:

npm --prefix packages/redis-rpc run test

The smoke test starts the RPC server, verifies Redis PING, adds jobs, exercises queue lifecycle calls, reads queue monitor snapshots, removes jobs, drains, cleans, and obliterates the temporary queue.
