@alkdev/pubsub
v0.1.0
Published
Type-safe publish/subscribe with pluggable event target adapters (in-process, Redis, WebSocket, Worker)
Maintainers
Readme
@alkdev/pubsub
Type-safe publish/subscribe with pluggable event target adapters. Transport layer only — no call protocol or coordination semantics.
Every event is an EventEnvelope<TType, TPayload> with { type, id, payload }. Adapters implement the TypedEventTarget interface so you can swap transports without changing your subscribe logic.
Install
npm install @alkdev/pubsubFor Redis transport:
npm install ioredisWebSocket and Worker adapters use built-in APIs — no additional dependencies.
Quick Start
In-Process (default)
import { createPubSub } from "@alkdev/pubsub";
type EventMap = {
"user.created": { name: string };
"order.placed": { orderId: string };
};
const pubsub = createPubSub<EventMap>();
pubsub.subscribe("user.created", (_, payload) => {
console.log(`New user: ${payload.name}`);
});
pubsub.publish("user.created", "id-1", { name: "Alice" });Redis
import { createPubSub } from "@alkdev/pubsub";
import { createRedisEventTarget } from "@alkdev/pubsub/event-target-redis";
import Redis from "ioredis";
const publishClient = new Redis();
const subscribeClient = new Redis();
const eventTarget = createRedisEventTarget({
publishClient,
subscribeClient,
});
const pubsub = createPubSub({ eventTarget });WebSocket Client (browser/Node)
import { createPubSub } from "@alkdev/pubsub";
import { createWebSocketClientEventTarget } from "@alkdev/pubsub/event-target-websocket-client";
const ws = new WebSocket("ws://localhost:8080");
const eventTarget = createWebSocketClientEventTarget(ws);
const pubsub = createPubSub({ eventTarget });WebSocket Server (Node)
import { createWebSocketServerEventTarget } from "@alkdev/pubsub/event-target-websocket-server";
const server = createWebSocketServerEventTarget({
onConnection(spoke, ws) { /* new client connected */ },
onDisconnection(spoke, ws) { /* client disconnected */ },
maxBufferedAmount: 1_048_576,
onBackpressure(ws, bufferedAmount) { /* optional backpressure signal */ },
});
// When a new WebSocket connects:
server.addConnection(ws);
// When it disconnects:
server.removeConnection(ws);
// Subscribe local handlers:
server.addEventListener("user.created:id-1", (event) => {
// event.detail is the EventEnvelope
});
// Publish to subscribed connections:
server.dispatchEvent(new CustomEvent("user.created:id-1", { detail: envelope }));Worker (Host ↔ Thread)
// Host (main thread)
import { createWorkerHostEventTarget } from "@alkdev/pubsub/event-target-worker";
const worker = new Worker("./worker.js");
const eventTarget = createWorkerHostEventTarget(worker);// Worker thread
import { createWorkerThreadEventTarget } from "@alkdev/pubsub/event-target-worker";
const eventTarget = createWorkerThreadEventTarget();
// Must be called inside a Worker context — throws if globalThis.postMessage is unavailableLifecycle
All transport adapters provide a close() method for graceful teardown:
const eventTarget = createRedisEventTarget({ publishClient, subscribeClient });
// ... subscribe and publish ...
eventTarget.close(); // unsubscribes all channels, removes listener, clears stateAfter close():
addEventListener,removeEventListener, anddispatchEventare no-ops- Intercepted handlers (
onmessage,onclose) are restored to their originals - Subscriptions are cleaned up (Redis channels unsubscribed, WebSocket
__unsubscribesent) - The underlying transport (Redis connection, WebSocket, Worker) is not destroyed — the caller owns it
close() is idempotent. Calling it multiple times is safe.
Operators
Operators transform AsyncIterable streams from subscribe():
import { pipe, filter, map, take, batch } from "@alkdev/pubsub";
const pubsub = createPubSub<EventMap>();
const stream = pubsub.subscribe("user.created");
for await (const event of pipe(
stream,
filter((e) => e.payload.name.startsWith("A")),
map((e) => e.payload.name),
take(5),
)) {
console.log(event);
}Available operators: filter, map, pipe, take, reduce, toArray, batch, dedupe, window, flat, groupBy, chain, join.
EventEnvelope
All events are serialized as EventEnvelope:
interface EventEnvelope<TType = string, TPayload = unknown> {
type: TType;
id: string;
payload: TPayload;
}This is the cross-platform wire format. Adapters serialize/deserialize this automatically (JSON for Redis and WebSocket, structured clone for Worker).
Subscription Control Protocol
Event types starting with __ are reserved for internal use. Adapters use __subscribe and __unsubscribe control events to manage topic subscriptions across connections. User code must not define event types with the __ prefix.
TypeScript
Full type inference through EventMap:
type EventMap = {
"user.created": { name: string; role: string };
"order.placed": { orderId: string; total: number };
};
const pubsub = createPubSub<EventMap>();
pubsub.publish("user.created", "id-1", { name: "Alice", role: "admin" });
// ^ full type checking on payloadExports
| Import | Description |
|--------|-------------|
| @alkdev/pubsub | Core: createPubSub, EventEnvelope, Repeater, operators |
| @alkdev/pubsub/event-target-redis | Redis adapter (peer dep: ioredis) |
| @alkdev/pubsub/event-target-websocket-client | WebSocket client adapter |
| @alkdev/pubsub/event-target-websocket-server | WebSocket server adapter |
| @alkdev/pubsub/event-target-worker | Worker host + thread adapters |
Upstream Attribution
Core createPubSub, TypedEventTarget, and operators are adapted from graphql-yoga (MIT). The Repeater class is inlined from @repeaterjs/repeater (MIT).
License
Dual-licensed under MIT or Apache-2.0. Portions adapted from upstream projects retain their MIT attribution.
