npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@valentinkolb/sync

v2.2.0

Published

Distributed synchronization primitives for Bun and TypeScript

Readme

@valentinkolb/sync

Distributed synchronization primitives for Bun and TypeScript, backed by Redis.

Philosophy

  • Bun-native - Built for Bun. Uses Bun.redis, Bun.sleep, RedisClient directly. No Node.js compatibility layers.
  • Minimal dependencies - Only zod as a peer dependency. Everything else is Bun built-ins and Redis Lua scripts.
  • Composable building blocks - Nine focused primitives that work independently or together. job composes queue + topic internally, scheduler composes durable dispatch on top of job.
  • Consistent API - Every module follows the same pattern: moduleName({ id, ...config }) returns an instance. No classes, no .create(), no new.
  • Atomic by default - All Redis operations use Lua scripts for atomicity. No multi-step race conditions at the Redis level.
  • Schema-validated - Queue, topic, job, registry, and ephemeral payloads are validated with Zod at the boundary. Invalid data never enters Redis.

Features

  • Rate limiting - Sliding window algorithm with atomic Lua scripts
  • Distributed mutex - SET NX-based locking with retry, extend, and auto-expiry
  • Queue - Durable work queue with leases, DLQ, delayed messages, and idempotency
  • Topic - Pub/sub with consumer groups, at-least-once delivery, and live streaming
  • Job - Durable job processing built on queue + topic with retries, cancellation, and event sourcing
  • Scheduler - Distributed cron scheduler with idempotent registration, leader fencing, and durable dispatch via job submission
  • Registry - Generic typed service/config registry with native prefix queries, CAS, TTL-backed liveness, and push-based change streams
  • Ephemeral - TTL-based ephemeral key/value store with typed snapshots and event stream for upsert/touch/delete/expire
  • Retry utility - Small transport-aware retry helper with sensible defaults and per-call override

For complete API details, use the modular skills in skills/, especially each feature's references/api.md such as sync-registry, sync-scheduler, sync-job, and sync-topic.

Installation

bun add @valentinkolb/sync zod

Retry Utility

KISS transport retry helper for Redis/network hiccups. Defaults are exported as DEFAULT_RETRY_OPTIONS; override only when needed.

import {
  retry,
  DEFAULT_RETRY_OPTIONS,
  isRetryableTransportError,
} from "@valentinkolb/sync";

// default usage (recommended)
const value = await retry(() => fragileCall());

console.log(DEFAULT_RETRY_OPTIONS);
// {
//   attempts: 8,
//   minDelayMs: 100,
//   maxDelayMs: 2000,
//   factor: 2,
//   jitter: 0.2,
//   retryIf: isRetryableTransportError
// }

// per-call override (edge cases only)
const value2 = await retry(
  () => fragileCall(),
  {
    attempts: 12,
    maxDelayMs: 5_000,
    retryIf: isRetryableTransportError,
  },
);

Internal default behavior:

  • Queue/topic/registry/ephemeral stream({ wait: true }) and topic live() use transport retries to stay alive across brief outages.
  • One-shot calls (recv(), stream({ wait: false }), send()/pub()/submit()) keep explicit failure semantics unless you wrap them with retry(...).

Requires Bun and a Redis-compatible server (Redis 6.2+, Valkey, Dragonfly).

Install Agent Skills (optional)

This repository ships reusable agent skills in skills/.
Using the Vercel Skills CLI, install them with:

# list available skills from this repo
bunx skills add https://github.com/valentinkolb/sync --list

# install all skills (project-local)
bunx skills add https://github.com/valentinkolb/sync --skill '*'

# install selected skills (example)
bunx skills add https://github.com/valentinkolb/sync \
  --skill sync-registry \
  --skill sync-scheduler \
  --skill sync-job \
  --skill sync-retry

Rate Limit

Sliding window rate limiter. Atomic via Lua script.

import { ratelimit, RateLimitError } from "@valentinkolb/sync";

const limiter = ratelimit({
  id: "api",
  limit: 100,
  windowSecs: 60,
});

const result = await limiter.check("user:123");
// { limited: false, remaining: 99, resetIn: 58432 }

try {
  await limiter.checkOrThrow("user:123");
} catch (error) {
  if (error instanceof RateLimitError) {
    console.log(`Retry in ${error.resetIn}ms`);
  }
}

Mutex

Distributed lock with retry + jitter, TTL auto-expiry, and Lua-based owner-only release.

import { mutex, LockError } from "@valentinkolb/sync";

const m = mutex({ id: "checkout", defaultTtl: 5000 });

// Automatic acquire + release
const result = await m.withLock("order:123", async (lock) => {
  await m.extend(lock, 10_000); // extend if needed
  return await processOrder();
});

// Throws LockError if lock cannot be acquired
await m.withLockOrThrow("order:123", async () => {
  await doExclusiveWork();
});

// Manual acquire/release
const lock = await m.acquire("order:123");
if (lock) {
  try { /* work */ } finally { await m.release(lock); }
}

Queue

Durable work queue with at-least-once delivery, lease-based visibility, delayed messages, idempotency, and dead-letter queue.

import { z } from "zod";
import { queue } from "@valentinkolb/sync";

const q = queue({
  id: "mail.send",
  schema: z.object({ to: z.string().email(), subject: z.string() }),
  delivery: { defaultLeaseMs: 60_000, maxDeliveries: 5 },
  limits: { maxMessageAgeMs: 7 * 24 * 60 * 60 * 1000 },
});

// Send
await q.send({
  data: { to: "[email protected]", subject: "Welcome" },
  idempotencyKey: "welcome:[email protected]",
  delayMs: 5_000,               // optional: deliver after 5s
  meta: { traceId: "abc-123" }, // optional metadata
});

// Receive + process
const msg = await q.recv({ wait: true, timeoutMs: 30_000 });
if (msg) {
  try {
    await sendMail(msg.data);
    await msg.ack();
  } catch (error) {
    await msg.nack({ delayMs: 5_000, error: String(error) });
  }
}

// Stream processing
for await (const m of q.stream()) {
  await handle(m.data);
  await m.ack();
}

// Multiple readers (independent blocking clients)
const reader = q.reader();
const msg2 = await reader.recv({ signal: abortController.signal });

Queue features

  • Lease-based delivery: Messages are invisible to other consumers while leased. Call msg.touch() to extend.
  • Dead-letter queue: After maxDeliveries failed attempts, messages move to DLQ.
  • Delayed messages: send({ delayMs }) or nack({ delayMs }) for retry delays.
  • Idempotency: send({ idempotencyKey }) deduplicates within a configurable TTL.
  • Multi-tenant: Pass tenantId to send() and recv() for isolated queues.
  • AbortSignal: Pass signal to recv() for graceful shutdown.
  • Transport resilience in stream loops: stream({ wait: true }) auto-retries transient transport errors and keeps consuming after short Redis outages.
  • One-shot semantics stay explicit: recv() and stream({ wait: false }) keep direct error semantics (no hidden infinite retry wrapper).

Topic

Pub/sub with Redis Streams. Supports consumer groups (at-least-once, load-balanced) and live streaming (best-effort, all events).

import { z } from "zod";
import { topic } from "@valentinkolb/sync";

const t = topic({
  id: "order.events",
  schema: z.object({ type: z.string(), orderId: z.string() }),
  retentionMs: 7 * 24 * 60 * 60 * 1000,
});

// Publish
await t.pub({
  data: { type: "order.confirmed", orderId: "o1" },
  idempotencyKey: "confirm:o1",
  meta: { source: "checkout" },
});

// Consumer group reader (at-least-once, load-balanced across consumers)
const reader = t.reader("mailer");
for await (const event of reader.stream()) {
  await sendConfirmationEmail(event.data);
  await event.commit();
}

// Multiple groups receive the same events independently
const analytics = t.reader("analytics");
const billing = t.reader("billing");

// Live stream (best-effort, no consumer group, no commit needed)
for await (const event of t.live({ signal: ac.signal })) {
  console.log(event.data);
}

// Replay from a specific cursor
for await (const event of t.live({ after: "0-0" })) {
  // receives all stored events from the beginning
}

Topic features

  • Consumer groups: Each group tracks its own position. Multiple consumers in the same group load-balance.
  • Live streaming: t.live() uses XREAD for real-time, best-effort delivery to all listeners.
  • Replay: Pass after: "0-0" to live() to replay all stored events.
  • Retention: Automatic XTRIM based on retentionMs.
  • Multi-tenant: Pass tenantId to pub() and recv() for isolated streams.
  • AbortSignal: Pass signal to recv(), stream(), and live().
  • Transport resilience in stream loops: reader().stream({ wait: true }) and live() auto-retry transient transport errors by default.
  • One-shot semantics stay explicit: recv() and stream({ wait: false }) keep direct error semantics.

Job

Durable job processing built on queue + topic. Supports retries with backoff, cancellation, event sourcing, and graceful shutdown.

import { z } from "zod";
import { job } from "@valentinkolb/sync";

const sendOrderMail = job({
  id: "mail.send-order",
  schema: z.object({ orderId: z.string(), to: z.string().email() }),
  defaults: { maxAttempts: 3, backoff: { kind: "exp", baseMs: 1000 } },
  process: async ({ ctx, input }) => {
    // ctx.signal is aborted on timeout or error
    if (ctx.signal.aborted) return;

    await ctx.heartbeat(); // extend lease
    await ctx.step({ id: "send", run: () => mailProvider.send(input) });
    return { ok: true };
  },
});

// Submit
const id = await sendOrderMail.submit({
  input: { orderId: "o1", to: "[email protected]" },
  key: "mail:o1",         // idempotency key
  delayMs: 5_000,         // schedule for later
  maxAttempts: 3,
  backoff: { kind: "exp", baseMs: 1000, maxMs: 30_000 },
});

// Wait for completion
const terminal = await sendOrderMail.join({ id, timeoutMs: 60_000 });
// terminal.status: "completed" | "failed" | "cancelled" | "timed_out"

// Cancel
await sendOrderMail.cancel({ id, reason: "user-request" });

// Event stream
const events = sendOrderMail.events(id);
for await (const e of events.reader("orchestrator").stream({ wait: false })) {
  console.log(e.data.type); // "submitted" | "started" | "heartbeat" | "retry" | "completed" | "failed" | "cancelled"
  await e.commit();
}

// Live events
for await (const e of events.live({ signal: ac.signal })) {
  console.log(e.data.type);
}

// Graceful shutdown
sendOrderMail.stop();

Job features

  • Automatic retries: Fixed or exponential backoff with configurable max attempts.
  • Lease timeout: Jobs that exceed leaseMs are automatically timed out.
  • Cancellation: Cancel in-flight or queued jobs. Workers detect cancellation between steps.
  • Event sourcing: Every state transition emits a typed event to a per-job topic.
  • Idempotent submit: Pass key to deduplicate submissions atomically.
  • AbortSignal: ctx.signal is aborted on timeout, error, or cancellation.
  • Graceful shutdown: stop() signals the worker loop to exit.
  • Per-job state TTL: Each job's state has its own Redis TTL (7 days default).
  • Worker transport resilience: Internal worker receive loop auto-retries transient transport errors and self-recovers after short Redis outages.

Scheduler

Distributed cron scheduler for horizontally scaled apps. Registration is idempotent per schedule id; one active leader dispatches due slots and submits durable jobs.

import { z } from "zod";
import { job, scheduler } from "@valentinkolb/sync";

const cleanup = job({
  id: "cleanup-temp",
  schema: z.object({ scope: z.string() }),
  process: async ({ input }) => {
    await runCleanup(input.scope);
  },
});

const sched = scheduler({
  id: "platform",
  onMetric: (metric) => console.log(metric),
});

sched.start();

await sched.register({
  id: "cleanup-hourly",    // idempotent key
  cron: "0 * * * *",       // every hour
  tz: "Europe/Berlin",
  job: cleanup,
  input: { scope: "tmp" },
  misfire: "skip",         // default: do not replay backlog
  meta: { owner: "ops" },
});

await sched.triggerNow({
  id: "cleanup-hourly",
  key: "ops-manual-run-1", // optional but recommended for retry-safe manual triggers
});

Scheduler features

  • Idempotent upsert registration: repeated register({ id, ... }) creates once, then updates in place.
  • No fixed leader pod: leadership uses a renewable Redis lease (mutex) with epoch fencing.
  • Durable dispatch: each cron slot maps to deterministic job key (scheduleId:slotTs) to prevent duplicates.
  • Durable manual trigger: triggerNow({ id, key? }) submits immediately through the same durable job path. Durability begins once triggerNow() returns a jobId.
  • Misfire policies: skip (default), catch_up_one, catch_up_all with cap (maxCatchUpRuns).
  • Failure isolation: submit retry + backoff, dispatch DLQ, configurable threshold for auto-advance after repeated failures.
  • Handler safety: optional strictHandlers mode (default true) relinquishes leadership when required handlers are missing.

Manual trigger notes:

  • triggerNow() does not require start() and does not alter cron state (nextRunAt, misfire handling, due slots).
  • triggerNow() reuses the registered schedule input. If you need custom input per run, call the underlying job.submit(...) directly.
  • Pass key for retry-safe idempotent manual triggering. Without key, repeated calls create additional runs.

Registry

Typed service/config registry with exact-key reads, native prefix listing, compare-and-swap, optional TTL-backed liveness, and stream-based change notifications.

import { z } from "zod";
import { registry } from "@valentinkolb/sync";

const services = registry({
  id: "services",
  schema: z.object({
    appId: z.string(),
    kind: z.enum(["instance", "setting", "flag"]),
    url: z.string().url().optional(),
  }),
});

await services.upsert({
  key: "apps/contacts/instances/i-1",
  value: { appId: "contacts", kind: "instance", url: "https://contacts-1.internal" },
  ttlMs: 15_000,
});

await services.touch({ key: "apps/contacts/instances/i-1" });

const active = await services.list({
  prefix: "apps/contacts/instances/",
  status: "active",
});

const snap = await services.list({ prefix: "apps/" });
const ac = new AbortController();

for await (const ev of services.reader({ prefix: "apps/", after: snap.cursor }).stream({ signal: ac.signal })) {
  console.log(ev.type);
}

Registry features

  • Native prefix listing: list({ prefix }) uses Redis-side lex indexing instead of client-side filtering.
  • Typed payloads: values are validated with Zod on write and parsed on read.
  • CAS updates: cas({ key, version, value }) is atomic in Redis.
  • Optional liveness: records with ttlMs are live and refreshed via touch().
  • Expired-state visibility: recently expired entries can be queried via status: "expired".
  • Snapshot + cursor: list() returns a cursor for replay-safe handoff into reader().
  • Scoped streams: watch a single key, a namespace prefix, or the whole registry.
  • Namespace-safe prefixes: list({ prefix }) and reader({ prefix }) expect slash-suffixed namespace prefixes such as apps/contacts/.
  • Bounded growth: configurable maxEntries, payload-size limits, root-stream retention, per-stream max length, and tombstone retention.

Ephemeral

Typed ephemeral key/value store with TTL semantics and stream events. Useful for short-lived state like presence, worker heartbeats, or temporary coordination hints.

import { z } from "zod";
import { ephemeral } from "@valentinkolb/sync";

const presence = ephemeral({
  id: "presence",
  schema: z.object({ nodeId: z.string(), status: z.enum(["up", "down"]) }),
  ttlMs: 30_000,
});

await presence.upsert({ key: "worker:42", value: { nodeId: "42", status: "up" } });
await presence.touch({ key: "worker:42" }); // extend TTL

const snap = await presence.snapshot();
console.log(snap.entries.length, snap.cursor);

for await (const event of presence.reader({ after: snap.cursor }).stream()) {
  console.log(event.type);
}

Ephemeral features

  • TTL-first model: each key has an independent expiration and can be extended with touch().
  • Bounded capacity: configurable maxEntries and payload-size limits.
  • Typed values: schema validation on write and read-path parsing.
  • Change stream: emits upsert, touch, delete, expire, and overflow events.
  • Snapshot + cursor: take a consistent snapshot and continue with stream replay.
  • Tenant isolation: optional per-operation tenantId for keyspace isolation.
  • Transport resilience in stream loops: reader().stream({ wait: true }) auto-retries transient transport errors by default.

Testing

bun test --preload ./tests/preload.ts
# fault-tolerance suites
bun run test:fault
# all test files
bun run test:all

Requires a Redis-compatible server on localhost:6399 (configured in tests/preload.ts).

Contributing

Setup

git clone https://github.com/valentinkolb/sync.git
cd sync
bun install

Running tests

You need a Redis-compatible server on port 6399. The easiest way is Docker/Podman:

docker run -d --name valkey -p 6399:6379 valkey/valkey:latest
bun test --preload ./tests/preload.ts
bun run test:fault

Project structure

src/
  ratelimit.ts       # Sliding window rate limiter
  mutex.ts           # Distributed lock
  queue.ts           # Durable work queue
  topic.ts           # Pub/sub with consumer groups
  job.ts             # Job processing (composes queue + topic)
  scheduler.ts       # Distributed cron scheduler (durable dispatch via job)
  registry.ts        # Typed registry with prefix queries, CAS, and liveness
  ephemeral.ts       # TTL-based ephemeral store with event stream
  retry.ts           # Generic transport-aware retry utility
  internal/
    cron.ts          # Cron parsing/next timestamp + timezone validation
    job-utils.ts     # Job helper functions (retry, timeout, parsing)
    topic-utils.ts   # Stream entry parsing helpers
tests/
  *.test.ts          # Integration tests (require Redis)
  *-utils.unit.test.ts  # Pure unit tests
  preload.ts         # Sets REDIS_URL for test environment
index.ts             # Public API exports
skills/              # Modular agent skills + per-feature API references (including sync-registry)

Guidelines

  • Keep it minimal. No abstractions for one-time operations.
  • Every Redis mutation must be in a Lua script for atomicity.
  • Validate at boundaries (user input), trust internal data.
  • All modules follow the moduleName({ id, ...config }) factory pattern.
  • Tests go in tests/. Use test:q, test:t, etc. as prefix in tests to avoid collisions. Each test file has a beforeEach that cleans up its own keys.
  • Run bun test --preload ./tests/preload.ts before submitting a PR. All tests must pass.

License

MIT