rrq-ts
v0.11.1
Published
RRQ TypeScript producer and runner runtime.
Readme
RRQ TypeScript
TypeScript/JavaScript client for RRQ, the distributed job queue with a Rust-powered orchestrator.
What is RRQ?
RRQ (Reliable Redis Queue) is a distributed job queue that separates the hard parts (scheduling, retries, locking, timeouts) into a Rust orchestrator while letting you write job handlers in your preferred language. It uses Redis as the source of truth with atomic operations for reliability.
Why RRQ?
- Language flexibility - Write job handlers in TypeScript, Python, or Rust
- Rust orchestrator - The complex distributed systems logic is handled by battle-tested Rust code
- Production features built in - Retries, dead letter queues, timeouts, cron scheduling, distributed tracing
- Redis simplicity - No separate databases; everything lives in Redis with predictable semantics
This Package
This package provides:
- Producer client - Enqueue jobs from Node.js/Bun applications
- Runner runtime - Execute job handlers written in TypeScript
Works with Node.js 20+ and Bun.
Quick Start
1. Install
npm install rrq-ts
# or
bun add rrq-ts2. Enqueue jobs (Producer)
import { RRQClient } from "rrq-ts";
const client = new RRQClient({
config: { redisDsn: "redis://localhost:6379/0" },
});
const jobId = await client.enqueue("send_email", {
params: { to: "[email protected]", template: "welcome" },
queueName: "emails",
maxRetries: 5,
});
console.log(`Enqueued job: ${jobId}`);
await client.close();3. Write job handlers (Runner)
import { RunnerRuntime, Registry } from "rrq-ts";
const registry = new Registry();
registry.register("send_email", async (request) => {
const { to, template } = request.params;
// Your email sending logic here
await sendEmail(to, template);
return { sent: true, to };
});
const runtime = new RunnerRuntime(registry);
// RRQ launches runners with: --tcp-socket host:port
await runtime.runFromArgs();4. Configure (rrq.toml)
[rrq]
redis_dsn = "redis://localhost:6379/0"
default_runner_name = "node"
[rrq.runners.node]
type = "socket"
cmd = ["node", "dist/runner.js"]
tcp_port = 9000
pool_size = 4
max_in_flight = 105. Run
# Install the RRQ orchestrator
cargo install rrq
# or download from releases
# Start the orchestrator (spawns runners automatically)
rrq worker run --config rrq.tomlProducer API
Enqueue Options
interface EnqueueOptions {
params?: Record<string, unknown>; // Job parameters
queueName?: string; // Target queue (default: "default")
jobId?: string; // Custom job ID
maxRetries?: number; // Max retry attempts
jobTimeoutSeconds?: number; // Execution timeout
resultTtlSeconds?: number; // How long to keep results
enqueueTime?: Date; // Explicit enqueue timestamp
deferUntil?: Date; // Schedule for specific time
deferBySeconds?: number; // Delay execution
traceContext?: Record<string, string>; // Distributed tracing
}Producer Config
interface ProducerConfig {
redisDsn: string;
queueName?: string;
maxRetries?: number;
jobTimeoutSeconds?: number;
resultTtlSeconds?: number;
idempotencyTtlSeconds?: number;
correlationMappings?: Record<string, string>; // e.g. { tenant_id: "params.tenant.id" }
}Unique Jobs (Idempotency)
const jobId = await client.enqueueWithUniqueKey(
"process_order",
"order-123", // unique key
{ params: { orderId: "123" } },
);Rate Limiting
const jobId = await client.enqueueWithRateLimit("sync_user", {
params: { userId: "456" },
rateLimitKey: "user-456",
rateLimitSeconds: 60,
});
if (jobId === null) {
console.log("Rate limited");
}Debouncing
await client.enqueueWithDebounce("save_document", {
params: { docId: "789" },
debounceKey: "doc-789",
debounceSeconds: 5,
});Job Status
const status = await client.getJobStatus(jobId);
console.log(status);Runner API
Handler Signature
type Handler = (
request: ExecutionRequest,
signal: AbortSignal,
) => Promise<ExecutionOutcome | unknown> | ExecutionOutcome | unknown;Cancellation Behavior
- Handlers receive an
AbortSignal. - Runner
cancelrequests and deadline timeouts abort that signal. - Pass the signal to downstream APIs (
fetch, database clients, etc.) so in-flight work stops promptly. - Keep handlers idempotent and retry-safe for libraries that do not support cancellation.
Execution Request
interface ExecutionRequest {
protocol_version: string;
job_id: string;
request_id: string;
function_name: string;
params: Record<string, unknown>;
context: {
job_id: string;
attempt: number;
queue_name: string;
enqueue_time: string;
deadline?: string | null;
trace_context?: Record<string, string> | null;
correlation_context?: Record<string, string> | null;
worker_id?: string | null;
};
}Outcome Types
// Success
const success: ExecutionOutcome = {
job_id: jobId,
request_id: requestId,
status: "success",
result: { result: "data" },
};
// Failure (may be retried)
const failure: ExecutionOutcome = {
job_id: jobId,
request_id: requestId,
status: "error",
error: { message: "Something went wrong" },
};
// Explicit retry after delay
const retry: ExecutionOutcome = {
job_id: jobId,
request_id: requestId,
status: "retry",
error: { message: "Rate limited" },
retry_after_seconds: 60,
};OpenTelemetry
import { RunnerRuntime, Registry, OtelTelemetry } from "rrq-ts";
const runtime = new RunnerRuntime(registry, new OtelTelemetry());
// RRQ launches runners with: --tcp-socket host:port
await runtime.runFromArgs();Producer FFI Setup
The producer uses a Rust FFI library for consistent behavior across languages. The library is loaded from:
RRQ_PRODUCER_LIB_PATHenvironment variablerrq-ts/bin/<platform>-<arch>/(for published packages)rrq-ts/bin/(for development)
Build from source:
cargo build -p rrq-producer --release
# Copy to bin/darwin-arm64/ or bin/linux-x64/ as appropriateRelated Packages
| Package | Language | Purpose | | ----------------------------------------------------- | ---------- | -------------------------------- | | rrq-ts | TypeScript | Producer + runner (this package) | | rrq | Python | Producer + runner | | rrq | Rust | Orchestrator binary | | rrq-producer | Rust | Native producer | | rrq-runner | Rust | Native runner |
Requirements
- Node.js 20+ or Bun
- Redis 5.0+
- RRQ orchestrator binary
License
MIT
