@alkdev/pubsub

v0.1.0

Type-safe publish/subscribe with pluggable event target adapters (in-process, Redis, WebSocket, Worker)

Type-safe publish/subscribe with pluggable event target adapters. Transport layer only — no call protocol or coordination semantics.

Every event is an EventEnvelope<TType, TPayload> with { type, id, payload }. Adapters implement the TypedEventTarget interface so you can swap transports without changing your subscribe logic.

Install

npm install @alkdev/pubsub

For Redis transport:

npm install ioredis

WebSocket and Worker adapters use built-in APIs — no additional dependencies.

Quick Start

In-Process (default)

import { createPubSub } from "@alkdev/pubsub";

type EventMap = {
  "user.created": { name: string };
  "order.placed": { orderId: string };
};

const pubsub = createPubSub<EventMap>();

pubsub.subscribe("user.created", (_, payload) => {
  console.log(`New user: ${payload.name}`);
});

pubsub.publish("user.created", "id-1", { name: "Alice" });

Redis

import { createPubSub } from "@alkdev/pubsub";
import { createRedisEventTarget } from "@alkdev/pubsub/event-target-redis";
import Redis from "ioredis";

const publishClient = new Redis();
const subscribeClient = new Redis();

const eventTarget = createRedisEventTarget({
  publishClient,
  subscribeClient,
});

const pubsub = createPubSub({ eventTarget });

WebSocket Client (browser/Node)

import { createPubSub } from "@alkdev/pubsub";
import { createWebSocketClientEventTarget } from "@alkdev/pubsub/event-target-websocket-client";

const ws = new WebSocket("ws://localhost:8080");
const eventTarget = createWebSocketClientEventTarget(ws);

const pubsub = createPubSub({ eventTarget });

WebSocket Server (Node)

import { createWebSocketServerEventTarget } from "@alkdev/pubsub/event-target-websocket-server";

const server = createWebSocketServerEventTarget({
  onConnection(spoke, ws) { /* new client connected */ },
  onDisconnection(spoke, ws) { /* client disconnected */ },
  maxBufferedAmount: 1_048_576,
  onBackpressure(ws, bufferedAmount) { /* optional backpressure signal */ },
});

// When a new WebSocket connects:
server.addConnection(ws);

// When it disconnects:
server.removeConnection(ws);

// Subscribe local handlers:
server.addEventListener("user.created:id-1", (event) => {
  // event.detail is the EventEnvelope
});

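// Publish to subscribed connections (the envelope follows the EventEnvelope shape):
const envelope = { type: "user.created", id: "id-1", payload: { name: "Alice" } };
server.dispatchEvent(new CustomEvent("user.created:id-1", { detail: envelope }));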

Worker (Host ↔ Thread)

// Host (main thread)
import { createWorkerHostEventTarget } from "@alkdev/pubsub/event-target-worker";

const worker = new Worker("./worker.js");
const eventTarget = createWorkerHostEventTarget(worker);

// Worker thread
import { createWorkerThreadEventTarget } from "@alkdev/pubsub/event-target-worker";

const eventTarget = createWorkerThreadEventTarget();
// Must be called inside a Worker context — throws if globalThis.postMessage is unavailable

Lifecycle

All transport adapters provide a close() method for graceful teardown:

const eventTarget = createRedisEventTarget({ publishClient, subscribeClient });
// ... subscribe and publish ...

eventTarget.close(); // unsubscribes all channels, removes listener, clears state

After close():

  • addEventListener, removeEventListener, and dispatchEvent are no-ops
  • Intercepted handlers (onmessage, onclose) are restored to their originals
  • Subscriptions are cleaned up (Redis channels unsubscribed, WebSocket __unsubscribe sent)
  • The underlying transport (Redis connection, WebSocket, Worker) is not destroyed — the caller owns it

close() is idempotent. Calling it multiple times is safe.
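
The close() contract described above can be illustrated with a small standalone sketch. It does not use the package's adapters: `ClosableTarget`, `teardownCount`, and the simplified listener signature are hypothetical names chosen for illustration, and the class only mimics the documented behavior (no-op methods after close(), and a second close() that does nothing).

```typescript
// Hypothetical stand-in for an adapter; not the library's implementation.
type Listener = (detail: unknown) => void;

class ClosableTarget {
  private listeners = new Map<string, Set<Listener>>();
  private closed = false;
  // Counts how many times teardown work actually ran.
  teardownCount = 0;

  addEventListener(type: string, listener: Listener): void {
    if (this.closed) return; // no-op after close()
    let set = this.listeners.get(type);
    if (!set) {
      set = new Set<Listener>();
      this.listeners.set(type, set);
    }
    set.add(listener);
  }

  removeEventListener(type: string, listener: Listener): void {
    if (this.closed) return; // no-op after close()
    const set = this.listeners.get(type);
    if (set) set.delete(listener);
  }

  dispatchEvent(type: string, detail: unknown): boolean {
    if (this.closed) return false; // no-op after close()
    const set = this.listeners.get(type);
    if (!set) return false;
    set.forEach((listener) => listener(detail));
    return set.size > 0;
  }

  close(): void {
    if (this.closed) return; // idempotent: repeated calls do nothing
    this.closed = true;
    this.listeners.clear();
    this.teardownCount += 1;
  }
}

const demoTarget = new ClosableTarget();
const received: unknown[] = [];
demoTarget.addEventListener("user.created", (detail) => {
  received.push(detail);
});
demoTarget.dispatchEvent("user.created", { name: "Alice" }); // delivered
demoTarget.close();
demoTarget.close(); // safe to call again
demoTarget.dispatchEvent("user.created", { name: "Bob" }); // ignored
```

Guarding every method with a single `closed` flag keeps teardown logic in one place, which is one straightforward way to make repeated close() calls safe.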

Operators

Operators transform AsyncIterable streams from subscribe():

import { createPubSub, pipe, filter, map, take } from "@alkdev/pubsub";

// EventMap is the type defined in the Quick Start example.
const pubsub = createPubSub<EventMap>();

const stream = pubsub.subscribe("user.created");

for await (const event of pipe(
  stream,
  filter((e) => e.payload.name.startsWith("A")),
  map((e) => e.payload.name),
  take(5),
)) {
  console.log(event);
}

Available operators: filter, map, pipe, take, reduce, toArray, batch, dedupe, window, flat, groupBy, chain, join.
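
To show how operators of this kind can compose over any AsyncIterable, here is a minimal standalone sketch. The names `filterOp`, `mapOp`, `takeOp`, `pipeOps`, and `fromArray` are hypothetical re-implementations written for illustration, not the package's own code, and an array-backed async generator stands in for the stream returned by subscribe(). `pipeOps` is fixed at three operators to keep the types simple.

```typescript
// An operator turns one async stream into another.
type Op<A, B> = (source: AsyncIterable<A>) => AsyncIterable<B>;

function filterOp<T>(predicate: (value: T) => boolean): Op<T, T> {
  return async function* (source) {
    for await (const value of source) {
      if (predicate(value)) yield value;
    }
  };
}

function mapOp<T, U>(fn: (value: T) => U): Op<T, U> {
  return async function* (source) {
    for await (const value of source) yield fn(value);
  };
}

function takeOp<T>(count: number): Op<T, T> {
  return async function* (source) {
    if (count <= 0) return;
    let seen = 0;
    for await (const value of source) {
      yield value;
      seen += 1;
      if (seen >= count) return; // leaving the loop also closes the source
    }
  };
}

// Applies exactly three operators, left to right.
function pipeOps<A, B, C, D>(
  source: AsyncIterable<A>,
  op1: Op<A, B>,
  op2: Op<B, C>,
  op3: Op<C, D>,
): AsyncIterable<D> {
  return op3(op2(op1(source)));
}

// Stand-in for a subscription stream (assumption: events arrive as envelopes).
type UserCreated = { type: "user.created"; id: string; payload: { name: string } };

async function* fromArray<T>(items: T[]): AsyncGenerator<T> {
  for (const item of items) yield item;
}

async function collectNames(): Promise<string[]> {
  const events: UserCreated[] = [
    { type: "user.created", id: "id-1", payload: { name: "Alice" } },
    { type: "user.created", id: "id-2", payload: { name: "Bob" } },
    { type: "user.created", id: "id-3", payload: { name: "Anna" } },
    { type: "user.created", id: "id-4", payload: { name: "Avery" } },
  ];
  const names: string[] = [];
  const stream = pipeOps(
    fromArray(events),
    filterOp((e: UserCreated) => e.payload.name.startsWith("A")),
    mapOp((e: UserCreated) => e.payload.name),
    takeOp<string>(2),
  );
  for await (const name of stream) names.push(name);
  return names; // the first two names starting with "A"
}
```

Because each operator is just a function from one AsyncIterable to another, the chain is lazy: nothing is consumed from the source until the `for await` loop pulls a value.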

EventEnvelope

All events are serialized as EventEnvelope:

interface EventEnvelope<TType = string, TPayload = unknown> {
  type: TType;
  id: string;
  payload: TPayload;
}

This is the cross-platform wire format. Adapters serialize/deserialize this automatically (JSON for Redis and WebSocket, structured clone for Worker).
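
As an illustration of the JSON case, the sketch below round-trips an envelope through a string. `encodeEnvelope` and `decodeEnvelope` are hypothetical helpers written for this example; the adapters perform this step internally and their actual validation rules are not documented here.

```typescript
interface EventEnvelope<TType = string, TPayload = unknown> {
  type: TType;
  id: string;
  payload: TPayload;
}

// Hypothetical helper: serialize an envelope for a text-based transport.
function encodeEnvelope(envelope: EventEnvelope): string {
  return JSON.stringify(envelope);
}

// Hypothetical helper: parse and minimally validate an incoming message.
function decodeEnvelope(raw: string): EventEnvelope {
  const parsed: unknown = JSON.parse(raw);
  if (typeof parsed !== "object" || parsed === null) {
    throw new Error("envelope must be a JSON object");
  }
  const candidate = parsed as Record<string, unknown>;
  const type = candidate["type"];
  const id = candidate["id"];
  if (typeof type !== "string" || typeof id !== "string" || !("payload" in candidate)) {
    throw new Error("envelope must have a string type, a string id, and a payload");
  }
  return { type, id, payload: candidate["payload"] };
}

const wire = encodeEnvelope({
  type: "user.created",
  id: "id-1",
  payload: { name: "Alice" },
});
const decoded = decodeEnvelope(wire);
```

One practical consequence of JSON serialization: values such as Date objects arrive as strings and Map or Set instances lose their contents, whereas structured clone preserves them. Payloads that must cross every transport are safest when limited to plain JSON-compatible data.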

Subscription Control Protocol

Event types starting with __ are reserved for internal use. Adapters use __subscribe and __unsubscribe control events to manage topic subscriptions across connections. User code must not define event types with the __ prefix.
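
Application code can guard against accidentally using a reserved name. The helpers below are a hypothetical sketch (`isReservedEventType` and `assertUserEventType` are not part of the package, and the README does not say whether the library enforces the rule itself):

```typescript
// Prefix reserved for internal control events such as __subscribe.
const RESERVED_PREFIX = "__";

// Hypothetical helper: true when the event type is reserved for internal use.
function isReservedEventType(type: string): boolean {
  return type.startsWith(RESERVED_PREFIX);
}

// Hypothetical helper: throws before a reserved name reaches publish().
function assertUserEventType(type: string): void {
  if (isReservedEventType(type)) {
    throw new Error(`Event type "${type}" uses the reserved "__" prefix`);
  }
}

assertUserEventType("user.created"); // passes silently
```

Calling such a check at the boundary where event names enter the system (for example, when they come from configuration) keeps user events from colliding with control events.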

TypeScript

Full type inference through EventMap:

type EventMap = {
  "user.created": { name: string; role: string };
  "order.placed": { orderId: string; total: number };
};

const pubsub = createPubSub<EventMap>();

pubsub.publish("user.created", "id-1", { name: "Alice", role: "admin" });
//                                               ^ full type checking on payload
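
For readers curious how this kind of inference can work, the sketch below ties the payload type to the event name with a generic key parameter. `TypedBus`, `on`, and `AnyHandler` are hypothetical names for a toy in-memory bus; this is an assumption about the general technique, not the package's actual type definitions.

```typescript
type EventMap = {
  "user.created": { name: string; role: string };
  "order.placed": { orderId: string; total: number };
};

// Internal storage type; handlers are stored with an untyped payload.
type AnyHandler = (id: string, payload: unknown) => void;

// Toy bus: K is inferred from the event name, and TMap[K] fixes the payload type.
class TypedBus<TMap extends Record<string, unknown>> {
  private handlers = new Map<keyof TMap, AnyHandler[]>();

  on<K extends keyof TMap>(
    type: K,
    handler: (id: string, payload: TMap[K]) => void,
  ): void {
    const list = this.handlers.get(type) ?? [];
    list.push(handler as AnyHandler);
    this.handlers.set(type, list);
  }

  publish<K extends keyof TMap>(type: K, id: string, payload: TMap[K]): void {
    for (const handler of this.handlers.get(type) ?? []) handler(id, payload);
  }
}

const bus = new TypedBus<EventMap>();
const log: string[] = [];

bus.on("order.placed", (id, payload) => {
  // payload is inferred as { orderId: string; total: number }
  log.push(`${id}:${payload.orderId}:${payload.total}`);
});

bus.publish("order.placed", "id-2", { orderId: "o-1", total: 42 });

// The following line would not compile: orderId must be a string and total is missing.
// bus.publish("order.placed", "id-3", { orderId: 7 });
```

The key idea is that the event name is a type parameter constrained to `keyof TMap`, so the compiler can look up the matching payload type at each call site.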

Exports

| Import | Description |
|--------|-------------|
| @alkdev/pubsub | Core: createPubSub, EventEnvelope, Repeater, operators |
| @alkdev/pubsub/event-target-redis | Redis adapter (peer dep: ioredis) |
| @alkdev/pubsub/event-target-websocket-client | WebSocket client adapter |
| @alkdev/pubsub/event-target-websocket-server | WebSocket server adapter |
| @alkdev/pubsub/event-target-worker | Worker host + thread adapters |

Upstream Attribution

Core createPubSub, TypedEventTarget, and operators are adapted from graphql-yoga (MIT). The Repeater class is inlined from @repeaterjs/repeater (MIT).

License

Dual-licensed under MIT or Apache-2.0. Portions adapted from upstream projects retain their MIT attribution.