valkey-pubsub
v1.0.4
Provides a Valkey-based publish-subscribe mechanism for decoupled communication between components. Provides support for Mercurius subscriptions using Valkey.
valkey-pubsub
A lightweight, TypeScript-native Pub/Sub implementation designed for use with Mercurius GraphQL, built on top of Valkey (Redis-compatible).
Supports broadcasting messages to multiple listeners and enables easy event-driven design for GraphQL subscriptions or internal messaging systems.
Features
- 🔁 Broadcast-style delivery: All listeners receive each message
- ⚡ Fast and simple API: Push, subscribe, destroy
- ✅ Compatible with Mercurius PubSub interface
- 🧪 Tested with Jest
- 🧱 Type-safe and modular — written in pure TypeScript
Installation
pnpm add valkey-pubsub
Usage
Create the PubSub instance
import ValkeyPubSub, { PubSubGenericQueue } from "valkey-pubsub";
const pubsub = await ValkeyPubSub.create({
  addresses: [{ host: "localhost", port: 6379 }],
  clusterMode: false,
});
Subscribe to a topic
const queue = new PubSubGenericQueue<string>();
await pubsub.subscribe("my-topic", queue);
queue.onItem((message) => {
  console.log("Received:", message);
});
Publish a message
await pubsub.publish({
  topic: "my-topic",
  payload: { hello: "world" },
});
Usage with Mercurius
The design was originally driven by the subscription model in Mercurius.
const pubsub = await ValkeyPubSub.create();
// `server` is the Fastify instance; the decorator makes the instance available as server.pubsub
server.decorate("pubsub", pubsub);
server.register(mercurius, {
  schema,
  resolvers: await resolvers,
  loaders: await loaders,
  // Context for queries and mutations
  context: async (request: FastifyRequest) => {
    return {
      request,
      db: server.db,
      valkey: server.valkey,
      pubsub: server.pubsub,
      logger: server.log,
    } as ServerDecorators;
  },
  subscription: {
    // Context for subscriptions
    context: async (_server, request) => {
      return {
        request,
        db: server.db,
        valkey: server.valkey,
        pubsub: server.pubsub,
        logger: server.log,
      } as ServerDecorators;
    },
    pubsub: server.pubsub,
  },
  graphiql: false, // ℹ️ cannot use in place with helmet
  allowBatchedQueries: true,
  path: "/graphql", // 👈 Restricts GraphQL to this endpoint
  prefix: "/",
});
API
ValkeyPubSub
ValkeyPubSub.create(config): Promise<ValkeyPubSub>
Creates a PubSub instance.
Config options:
- addresses: Array of { host, port } objects (defaults to Valkey on localhost:6379)
- protocol: 'RESP2' or 'RESP3' (default: 'RESP3')
- clusterMode: true or false (default: false)
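The defaults listed above can be pictured as a small config-resolution step. The sketch below is illustrative only: `SketchConfig`, `SketchAddress`, and `resolveConfig` are hypothetical names, not exports of valkey-pubsub, and the package's actual types may differ.

```typescript
// Hypothetical shape mirroring the documented options; not the package's real types.
type SketchAddress = { host: string; port: number };

interface SketchConfig {
  addresses?: SketchAddress[];
  protocol?: "RESP2" | "RESP3";
  clusterMode?: boolean;
}

// Fill in the documented default for any option the caller omits.
function resolveConfig(config: SketchConfig = {}): Required<SketchConfig> {
  return {
    addresses: config.addresses ?? [{ host: "localhost", port: 6379 }],
    protocol: config.protocol ?? "RESP3",
    clusterMode: config.clusterMode ?? false,
  };
}

// Only clusterMode is overridden; addresses and protocol fall back to the defaults.
const resolved = resolveConfig({ clusterMode: true });
console.log(resolved);
```

Under these assumptions, calling `ValkeyPubSub.create()` with no arguments would behave as if `resolveConfig()` had been applied first.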
pubsub.subscribe(topic, queue)
- Subscribes a queue to a topic. Every published message will be pushed to the queue and delivered to all its listeners.
pubsub.publish({ topic, payload }, callback?)
- Publishes a message to the specified topic. Payload will be stringified before being sent.
pubsub.cleanup()
- Cleans up all open connections and subscriptions.
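The subscribe/publish flow described above can be sketched without a Valkey server. The in-memory stand-in below is not the package's implementation: `SketchBroker` and `Sink` are hypothetical names, and it assumes the payload is encoded with `JSON.stringify` on publish and decoded with `JSON.parse` on delivery, which the README does not confirm.

```typescript
// In-memory stand-in for the documented subscribe/publish flow (no Valkey involved).
type Sink<T> = { push(value: T): void };

class SketchBroker {
  // topic name -> every queue subscribed to that topic
  private topics: Record<string, Sink<unknown>[]> = {};

  subscribe<T>(topic: string, queue: Sink<T>): void {
    const sinks = this.topics[topic] ?? [];
    sinks.push(queue as Sink<unknown>);
    this.topics[topic] = sinks;
  }

  publish(message: { topic: string; payload: unknown }): void {
    // Assumption: this string is what would travel over the wire.
    const wire = JSON.stringify(message.payload);
    for (const sink of this.topics[message.topic] ?? []) {
      sink.push(JSON.parse(wire)); // each subscriber gets its own decoded copy
    }
  }
}

const broker = new SketchBroker();
const received: unknown[] = [];
broker.subscribe("my-topic", { push: (value) => received.push(value) });
broker.subscribe("my-topic", { push: (value) => received.push(value) });
broker.publish({
  topic: "my-topic",
  payload: { hello: "world", when: new Date(0) },
});
console.log(received);
```

If the stringification is JSON-based as assumed here, values such as `Date` arrive as ISO strings rather than `Date` objects, so payloads are best kept to plain JSON data.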
PubSubGenericQueue
PubSubGenericQueue<T>
A generic in-memory delivery queue that stores items and allows multiple listeners.
Methods:
- push(value: T): Pushes a value onto the queue
- onItem(callback: (value: T) => void): Registers a callback for new items
- isEmpty(): Returns true if the queue is empty
- size(): Returns the current number of pending items
- destroy(): Destroys the queue and runs any registered close callbacks
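To make the queue semantics concrete, here is a minimal sketch of a broadcast queue with the same method names. It is a guess at the documented behaviour, not the package's source: `SketchQueue` and `onClose` are hypothetical names, and the choice to buffer items until the first listener registers is an assumption.

```typescript
// Illustrative only: mirrors the documented method names, not the real implementation.
class SketchQueue<T> {
  private pending: T[] = [];
  private listeners: Array<(value: T) => void> = [];
  private closeCallbacks: Array<() => void> = [];

  push(value: T): void {
    if (this.listeners.length === 0) {
      this.pending.push(value); // nobody listening yet: hold the item
      return;
    }
    for (const listener of this.listeners) listener(value); // broadcast to all listeners
  }

  onItem(callback: (value: T) => void): void {
    this.listeners.push(callback);
    // Assumption: items buffered before the first listener are flushed to it.
    const backlog = this.pending;
    this.pending = [];
    for (const value of backlog) callback(value);
  }

  // Hypothetical registration method; the README does not name one.
  onClose(callback: () => void): void {
    this.closeCallbacks.push(callback);
  }

  isEmpty(): boolean {
    return this.pending.length === 0;
  }

  size(): number {
    return this.pending.length;
  }

  destroy(): void {
    this.pending = [];
    this.listeners = [];
    for (const callback of this.closeCallbacks) callback();
    this.closeCallbacks = [];
  }
}

const sketchQueue = new SketchQueue<string>();
sketchQueue.push("early"); // buffered: no listener yet
const sizeBeforeListener = sketchQueue.size();
const seenA: string[] = [];
const seenB: string[] = [];
sketchQueue.onItem((message) => seenA.push(message)); // receives the buffered item
sketchQueue.onItem((message) => seenB.push(message));
sketchQueue.push("late"); // delivered to both listeners
console.log(seenA, seenB);
```

In this sketch every listener registered at push time sees the item, which matches the broadcast-style delivery described in the feature list.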
License
MIT © Chris Schuld
Contact
- Email - twitter handle @ gmail.com
- X - @cbschuld
Contributing
Yes, thank you! Please update the docs and tests and add your name to the package.json file.
