ts-web-pubsub-client
v1.1.1
Published
Lightweight Azure Web PubSub client wrapper with typed event support, auto-reconnect, and retry.
Maintainers
Readme
ts-web-pubsub-client
Lightweight Azure Web PubSub client wrapper for TypeScript. Provides typed custom events, automatic reconnection with message recovery, send-retry with deduplication, and full dependency injection for testing.
[[TOC]]
Overview
This package wraps @azure/web-pubsub-client and solves the problems that come up when using the Azure SDK directly in a TypeScript project:
| Raw Azure SDK | This package |
|---|---|
| No typed event routing — one server-message handler for everything | Named on("event", handler) per event type |
| Basic protocol — no message recovery after disconnect | Reliable protocol — server buffers and replays missed messages |
| Token captured once — expires silently on reconnect | URL factory called on every reconnect for fresh tokens |
| No send retry — fire and forget | Exponential back-off retry with ackId deduplication |
| No input validation — cryptic Azure errors | PubSubError with typed codes before anything reaches Azure |
| Not injectable — impossible to unit test | All dependencies have interfaces, injected via constructor |
| Manual presence/join sends on every reconnect | sendOnConnect config — auto-sends a list of events on every connect/reconnect |
| Must use callback to queue pre-connect messages | sendWhenConnected() — queues a message and sends it the moment the connection opens |
Installation
npm install ts-web-pubsub-clientRequirements:
- Node.js 18+ or a modern browser
- TypeScript 4.7+ (optional but recommended)
Core Concepts
How messages flow
Azure Web PubSub Service
│
│ WebSocket (reliable protocol)
▼
AzureEventHandler ← handles raw Azure events
│ OnServerDataMessageArgs.message.data (ServerDataMessage)
│ OnGroupDataMessageArgs.message.data (GroupDataMessage)
│ data: JSONTypes | ArrayBuffer
│
│ MessageParser: parses { event, data } envelope
▼
EventBus ← routes by event name
│
▼
your on("chat", handler) ← your code receives typed data field directlyMessage envelope
Every message sent or received through this package uses this JSON structure:
{ "event": "chat", "data": { "text": "hello", "from": "Alice" } }The event field is the routing key. The data field is what your handler receives.
Your server MUST send messages in this exact format. Here is what actually happens internally, using the real Azure SDK types:
Azure SDK fires:
OnServerDataMessageArgs → { message: ServerDataMessage }
ServerDataMessage → { kind: "serverData", dataType: "text" | "json" | ..., data: JSONTypes | ArrayBuffer }
JSONTypes = string | number | boolean | object
data field (when dataType = "text") = '{"event":"chat","data":{...}}' ← JSON string
data field (when dataType = "json") = { event: "chat", data: {...} } ← already an object
↓ package parses this into { event, data } envelope
↓ if "event" field is missing or blank → silently dropped
↓
client.on("chat", handler) → handler(data)Supported dataType values:
| dataType | data type | Parsed by this package |
|---|---|---|
| "text" | string | Yes — parsed as JSON string |
| "json" | object | Yes — used directly as object |
| "binary" | ArrayBuffer | No — silently dropped |
| "protobuf" | ArrayBuffer | No — silently dropped |
Send messages from your server using dataType: "text" or dataType: "json".
What gets silently dropped:
// Missing event field — handler will never fire
{ "text": "hello", "from": "Alice" }
// Flat objects without event field — not routed
{ "type": "ChatMessage", "text": "hello" }
// Binary data (ArrayBuffer) — not supported, silently dropped
dataType: "binary"What works:
// Correct — has event + data envelope, sent as text or json
{ "event": "chat", "data": { "text": "hello", "from": "Alice" } } // dataType: "text" or "json"Reliable protocol
The package always uses WebPubSubJsonReliableProtocol. This means:
- The Azure service assigns a sequence number to every message
- During a disconnect, the service buffers messages for a few minutes
- On reconnect, the client sends the last received sequence number and the service replays anything missed
- Groups are automatically rejoined after reconnection
This is invisible to you — it just works.
Quick Start
import { PubSubClient } from "ts-web-pubsub-client";
// 1. Create the client
const client = new PubSubClient(
{
getClientAccessUrl: async () => {
const res = await fetch("/api/pubsub/negotiate");
return (await res.json()).url;
},
// Auto-send these events on every connect AND reconnect — no onConnected callback needed
sendOnConnect: [
{ event: "presence", data: { status: "online", userId: "u1" } },
],
},
{
onConnected: (id) => console.log("Connected:", id),
onDisconnected: (msg) => console.warn("Disconnected:", msg),
onStopped: () => console.error("Stopped — create a new instance to reconnect"),
}
);
// 2. Register handlers BEFORE connecting
client.on<{ text: string; from: string }>("chat", (data) => {
console.log(`${data.from}: ${data.text}`);
});
// 3. Connect (sendOnConnect events fire automatically when connected)
await client.connect();
// 4. Send immediately
await client.send("chat", { text: "hello", from: "Alice" });
// 5. Or queue a message to send the moment the connection opens
client.sendWhenConnected("notification", { msg: "client ready" });API Reference
Constructor
new PubSubClient(config, callbacks?, deps?)| Parameter | Type | Required | Description |
|---|---|---|---|
| config | PubSubConfig | Yes | Connection settings — URL, retry strategy, Azure options |
| callbacks | ConnectionCallbacks | No | Lifecycle hooks for connection state changes |
| deps | PubSubClientDeps | No | Inject mocks for unit testing without a real WebSocket |
Methods
connect()
await client.connect(): Promise<void>Opens the WebSocket connection. Retries on failure using the connectRetry strategy (default: exponential back-off, 5 retries).
| Client state when called | Behaviour |
|---|---|
| Not connected | Opens WebSocket. Retries on failure per strategy. |
| connect() already in progress | No-op — concurrent call is ignored. |
| Already connected | No-op. |
| onStopped has fired | Throws PubSubError("CLIENT_STOPPED") — create a new instance. |
Throws:
| Code | When |
|---|---|
| CLIENT_STOPPED | Called after onStopped fired. Create a new PubSubClient. |
disconnect()
client.disconnect(): voidCloses the connection gracefully. Safe to call before connect() — no-op if not connected. After disconnect() you can call connect() again.
| Client state when called | Behaviour |
|---|---|
| Connected | Closes WebSocket. onStopped will NOT fire (not treated as an error). |
| connect() in progress | Cancels. |
| Not connected | No-op. |
on(event, handler)
client.on<TData>(event: string, handler: (data: TData) => void): thisSubscribe to a named custom event. Returns this for chaining.
| Parameter | Type | Description |
|---|---|---|
| event | string | Routing key — matches the event field in the incoming { event, data } envelope. |
| handler | (data: TData) => void | Called with the data field from the envelope. |
| Returns | this | Enables chaining: .on(...).on(...).on(...) |
Throws: PubSubError("INVALID_EVENT_NAME") if event is empty or whitespace-only.
client
.on<ChatMessage>("chat", handleChat)
.on<UserEvent>("user-joined", handleUserJoined)
.on<Notification>("notification", handleNotification);off(event, handler)
client.off<TData>(event: string, handler: (data: TData) => void): thisUnsubscribe a previously registered handler. Must pass the same function reference used in on() — anonymous functions cannot be unsubscribed.
| Parameter | Type | Description |
|---|---|---|
| event | string | The event name used when subscribing. |
| handler | (data: TData) => void | The exact function reference passed to on(). |
| Returns | this | Enables chaining. |
Throws: PubSubError("INVALID_EVENT_NAME") if event is empty or whitespace-only.
send(event, data, options?)
await client.send<TData>(event: string, data: TData, options?: SendOptions): Promise<void>Send a custom event to the application server as a Web PubSub user-event. By default wraps data in a { event, data } envelope and sends with automatic retry.
| Parameter | Type | Required | Description |
|---|---|---|---|
| event | string | Yes | Event name. Must be non-empty. |
| data | TData | Yes | Any JSON-serialisable value. |
| options | SendOptions | No | Per-call overrides for retry, envelope wrapping, and dataType. |
| Returns | Promise<void> | — | Resolves when the Azure service acknowledges the message. |
Throws:
| Code | When |
|---|---|
| NOT_CONNECTED | Called before connect() or after disconnect(). |
| CLIENT_STOPPED | Called after onStopped fired. |
| INVALID_EVENT_NAME | event is empty or whitespace-only. |
sendWhenConnected(event, data, options?)
await client.sendWhenConnected<TData>(event: string, data: TData, options?: SendOptions): Promise<void>Queue a custom event to be sent as soon as the client is connected.
| When called | Behaviour |
|---|---|
| Already connected | Sends immediately — equivalent to send(). |
| Not yet connected | Held in queue. Sent the moment connected event fires. |
| Client is stopped | Rejects immediately with CLIENT_STOPPED. |
| disconnect() called while queued | Queue is preserved — items flush on the next connect() call. |
Unlike send(), this does not throw NOT_CONNECTED — it waits for the connection.
Queued items are sent concurrently when the connection opens. Each resolves or rejects independently.
Use this for one-off messages (e.g. session init payloads) that should fire once when the connection opens. Use
sendOnConnectin config for messages that must fire on every connect AND reconnect (e.g. presence).
| Parameter | Type | Required | Description |
|---|---|---|---|
| event | string | Yes | Event name. Must be non-empty. |
| data | TData | Yes | Any JSON-serialisable value. |
| options | SendOptions | No | Per-call overrides for retry, envelope wrapping, and dataType. |
| Returns | Promise<void> | — | Resolves when the Azure service acknowledges the message. |
Throws:
| Code | When |
|---|---|
| CLIENT_STOPPED | Called after onStopped fired. |
| INVALID_EVENT_NAME | event is empty or whitespace-only. |
// Queue before connect() is called — sends the moment the connection opens
client.sendWhenConnected("session-init", { sessionId: "abc", userId: "u1" });
await client.connect(); // ← session-init fires here automatically
// Or queue during a retry window — sends when the retry eventually succeeds
client.connect(); // might be retrying
client.sendWhenConnected("ready", { timestamp: Date.now() });sendToGroup(group, event, data, options?)
await client.sendToGroup<TData>(group: string, event: string, data: TData, options?: SendOptions): Promise<void>Broadcast a custom event to all clients that have joined a group. By default wraps data in a { event, data } envelope and sends with automatic retry.
| Parameter | Type | Required | Description |
|---|---|---|---|
| group | string | Yes | Target group name. Must be non-empty. |
| event | string | Yes | Event name. Must be non-empty. |
| data | TData | Yes | Any JSON-serialisable value. |
| options | SendOptions | No | Per-call overrides for retry, envelope wrapping, and dataType. |
| Returns | Promise<void> | — | Resolves when the Azure service acknowledges the message. |
Throws:
| Code | When |
|---|---|
| NOT_CONNECTED | Called before connect() or after disconnect(). |
| CLIENT_STOPPED | Called after onStopped fired. |
| INVALID_GROUP_NAME | group is empty or whitespace-only. |
| INVALID_EVENT_NAME | event is empty or whitespace-only. |
joinGroup(group)
await client.joinGroup(group: string): Promise<void>Join a group on the Azure service. Once joined, messages broadcast to the group via sendToGroup() are delivered to this client's on() handlers. When autoRejoinGroups: true (default), the client rejoins automatically after every reconnection.
| Parameter | Type | Required | Description |
|---|---|---|---|
| group | string | Yes | Group name. Must be non-empty. |
| Returns | Promise<void> | — | Resolves when the Azure service confirms the join. |
Throws:
| Code | When |
|---|---|
| NOT_CONNECTED | Called before connect() or after disconnect(). |
| CLIENT_STOPPED | Called after onStopped fired. |
| INVALID_GROUP_NAME | group is empty or whitespace-only. |
leaveGroup(group)
await client.leaveGroup(group: string): Promise<void>Leave a group. After leaving, group messages will no longer be delivered to this client.
| Parameter | Type | Required | Description |
|---|---|---|---|
| group | string | Yes | Group name. Must be non-empty. |
| Returns | Promise<void> | — | Resolves when the Azure service confirms the leave. |
Throws:
| Code | When |
|---|---|
| NOT_CONNECTED | Called before connect() or after disconnect(). |
| CLIENT_STOPPED | Called after onStopped fired. |
| INVALID_GROUP_NAME | group is empty or whitespace-only. |
Configuration Reference
PubSubConfig
Passed as the first argument to the constructor.
| Property | Type | Required | Default | Description |
|---|---|---|---|---|
| getClientAccessUrl | string \| ClientAccessUrlFactory | Yes | — | Static URL or async factory. Factory is called on every (re)connect — use it in production to always get a fresh token. |
| autoRejoinGroups | boolean | No | true | Automatically rejoin all previously joined groups after every reconnection. |
| connectRetry | ConnectRetryStrategy | No | built-in exp. back-off | Custom retry schedule for the initial connect() call. |
| sendOnConnect | AutoSendItem[] | No | [] | Events to auto-send on every connect and reconnect. No callback needed — the package fires them automatically. |
| sendEnvelope | boolean | No | true | Whether to wrap outgoing payloads in a { event, data } envelope. Set to false when the server expects the raw payload directly. Overridable per-call via SendOptions. |
| sendDataType | "text" \| "application/json" | No | "text" | The dataType passed to the Azure SDK when sending. Overridable per-call via SendOptions. |
| clientOptions | clientOptions | No | {} | Azure SDK options forwarded to WebPubSubClient. autoRejoinGroups is excluded (managed by this package). |
ClientAccessUrlFactory:
type ClientAccessUrlFactory = () => Promise<string> | string;// Static URL — dev/testing only. Token expires, not suitable for production.
{ getClientAccessUrl: "wss://my-service.webpubsub.azure.com/client/hubs/chat?access_token=..." }
// Factory — recommended for production. Fresh token on every connect.
{
getClientAccessUrl: async () => {
const res = await fetch("/api/pubsub/negotiate");
return (await res.json()).url;
}
}AutoSendItem
Each entry in sendOnConnect is an AutoSendItem:
interface AutoSendItem {
event: string; // event name — must be non-empty
data: unknown; // any JSON-serialisable payload
retryOptions?: RetryOptions; // optional per-item retry override
}Items are sent concurrently on every connected event (initial connect + every auto-reconnect). Errors after retry exhaustion are logged via console.error — the connection remains open and other items are unaffected.
const client = new PubSubClient({
getClientAccessUrl: async () => (await fetch("/api/negotiate")).json().url,
sendOnConnect: [
// Presence — tell the server this user is online
{ event: "presence", data: { status: "online", userId: currentUser.id } },
// Room join — re-announce current room on every reconnect
{ event: "join-room", data: { roomId: activeRoom.id } },
// Critical init — override retry for this specific item
{ event: "session-ready", data: { version: APP_VERSION }, retryOptions: { maxRetries: 5 } },
],
});Tip:
sendOnConnectfires on every reconnect too — perfect for presence and room membership announcements that must stay current after a network drop.
ConnectRetryStrategy
type ConnectRetryStrategy = (attempt: number, error: unknown) => number | null;| Parameter | Type | Description |
|---|---|---|
| attempt | number | Retry number — 1 = first retry after the initial failure, 2 = second retry, … |
| error | unknown | The error thrown by the failed azure.start() call. |
| Returns | number \| null | Milliseconds to wait before the next attempt, or null to stop retrying and throw. |
Default built-in schedule (used when connectRetry is not set):
| Attempt | Delay before this attempt | |---|---| | 1 (first try) | 0 ms — immediate | | 2 (retry 1) | 1 000 ms | | 3 (retry 2) | 2 000 ms | | 4 (retry 3) | 4 000 ms | | 5 (retry 4) | 8 000 ms | | 6 (retry 5) | 16 000 ms → throws |
Custom strategy examples:
// No retry — fail immediately on first error
connectRetry: () => null
// Fixed 2 s delay, max 3 retries
connectRetry: (attempt) => attempt > 3 ? null : 2_000
// Retry forever with exponential back-off capped at 30 s
connectRetry: (attempt) => Math.min(1000 * 2 ** (attempt - 1), 30_000)
// Log + exponential back-off
connectRetry: (attempt, error) => {
console.warn(`connect attempt ${attempt} failed:`, error);
return attempt > 5 ? null : 1000 * Math.pow(2, attempt - 1);
}ConnectionCallbacks
Passed as the second argument to the constructor. All callbacks are optional.
| Callback | Signature | When it fires |
|---|---|---|
| onConnected | (connectionId: string) => void | WebSocket opened — initial connect AND after every auto-reconnect |
| onDisconnected | (message?: string) => void | Connection dropped — auto-recovery is in progress (not a final failure) |
| onStopped | () => void | All Azure reconnect attempts exhausted — client is permanently stopped |
| onRejoinGroupFailed | (group: string, error: Error) => void | Reconnected but could not auto-rejoin a group (autoRejoinGroups: true only) |
| onConnectRetry | (attempt: number, error: unknown) => void | connect() failed and will retry — fires before each retry, NOT after the final failure |
const callbacks: ConnectionCallbacks = {
onConnected(connectionId) {
updateStatus("connected"); // safe to send messages now
},
onDisconnected(message) {
updateStatus("disconnected"); // show "reconnecting…" — recovery is automatic
},
onStopped() {
updateStatus("stopped"); // permanent — must create a new PubSubClient
showError("Connection lost. Please refresh.");
},
onRejoinGroupFailed(group, error) {
console.error(`Could not rejoin "${group}":`, error.message);
},
onConnectRetry(attempt, error) {
console.warn(`connect() attempt ${attempt} failed — retrying…`, error);
},
};SendOptions
Passed as the optional last argument to send(), sendWhenConnected(), and sendToGroup(). All fields are optional and override the global defaults set in PubSubConfig.
| Property | Type | Default | Description |
|---|---|---|---|
| retryOptions | RetryOptions | — | Override retry behaviour for this call only. |
| sendEnvelope | boolean | PubSubConfig.sendEnvelope (default true) | Override envelope wrapping for this call only. |
| sendDataType | "text" \| "application/json" | PubSubConfig.sendDataType (default "text") | Override the dataType for this call only. |
// Override retry only
await client.send("audit-log", data, { retryOptions: { maxRetries: 6, baseDelayMs: 300 } });
// Send raw payload without envelope (server expects plain JSON, not { event, data })
await client.send("ping", rawPayload, { sendEnvelope: false });
// Fail fast + raw + json dataType — all three at once
await client.send("typing", data, {
retryOptions: { maxRetries: 0 },
sendEnvelope: false,
sendDataType: "application/json",
});RetryOptions
Used inside SendOptions as options.retryOptions. Controls retry behaviour for that specific call.
| Property | Type | Default | Description |
|---|---|---|---|
| maxRetries | number | 3 | Maximum number of retries after the first failure. 0 = no retry. |
| baseDelayMs | number | 200 | Initial delay in ms before the first retry. Doubles on each subsequent retry (exponential back-off). |
Back-off schedule with defaults (maxRetries: 3, baseDelayMs: 200):
| Attempt | Delay before this attempt | |---|---| | 1 (first try) | 0 ms — immediate | | 2 (retry 1) | 200 ms | | 3 (retry 2) | 400 ms | | 4 (retry 3) | 800 ms → throws if still failing |
// More aggressive retry for critical messages
await client.send("audit-log", data, { retryOptions: { maxRetries: 6, baseDelayMs: 300 } });
// Fail fast — fire-and-forget events
await client.send("typing", data, { retryOptions: { maxRetries: 0 } });clientOptions — Azure SDK pass-through
PubSubConfig.clientOptions is forwarded directly to the underlying Azure WebPubSubClient.
autoRejoinGroupsis excluded — usePubSubConfig.autoRejoinGroupsinstead.
| Property | Type | Default | Description |
|---|---|---|---|
| autoReconnect | boolean | true | Whether the Azure SDK automatically reconnects after a connection drop. |
| messageRetryOptions | WebPubSubRetryOptions | — | Azure-level retry for joinGroup, leaveGroup, sendToGroup, sendEvent at the SDK level. |
| reconnectRetryOptions | WebPubSubRetryOptions | — | Azure-level retry for automatic reconnection. Only applies when autoReconnect: true. |
| protocol | WebPubSubClientProtocol | JsonReliableProtocol | Wire protocol. This package always enforces WebPubSubJsonReliableProtocol — any value set here is ignored. |
WebPubSubRetryOptions
| Property | Type | Default | Description |
|---|---|---|---|
| maxRetries | number | 3 | Number of retry attempts. |
| retryDelayInMs | number | — | Base delay between retries in ms. Used in both Fixed and Exponential modes. |
| maxRetryDelayInMs | number | — | Maximum delay cap between retries. Only applies in Exponential mode. |
| mode | "Fixed" \| "Exponential" | "Fixed" | Retry timing mode. |
PubSubError
Extends Error. Always carries a typed code property — use it instead of parsing message strings.
class PubSubError extends Error {
readonly code: PubSubErrorCode; // typed error code
readonly message: string; // human-readable description
readonly name: "PubSubError";
}try {
await client.send("chat", data);
} catch (err) {
if (err instanceof PubSubError) {
switch (err.code) {
case "NOT_CONNECTED": showToast("Still connecting…"); break;
case "CLIENT_STOPPED": showToast("Connection lost. Refresh to reconnect."); break;
default: console.error(err.message);
}
}
}Error codes:
| Code | Thrown by | When |
|---|---|---|
| NOT_CONNECTED | send, sendToGroup, joinGroup, leaveGroup | Called before connect() completes, or after disconnect(). |
| CLIENT_STOPPED | connect, send, sendWhenConnected, sendToGroup, joinGroup, leaveGroup | Called after onStopped fired. Create a new PubSubClient to reconnect. |
| INVALID_EVENT_NAME | on, off, send, sendWhenConnected, sendToGroup | event argument is empty string or whitespace-only. |
| INVALID_GROUP_NAME | joinGroup, leaveGroup, sendToGroup | group argument is empty string or whitespace-only. |
| INVALID_CONFIG | Constructor | getClientAccessUrl is an empty string. |
PubSubClientDeps
Passed as the optional third argument to the constructor. Inject mock implementations to test without a real WebSocket connection.
| Property | Type | Description |
|---|---|---|
| azureClient | IAzureWebPubSubClient | Replace the real Azure WebPubSubClient. Implement start, stop, on, joinGroup, leaveGroup, sendEvent, sendToGroup. |
| eventBus | IEventBus | Replace the internal EventBus. Implement subscribe, unsubscribe, dispatch, hasListeners. |
| messageParser | IMessageParser | Replace the internal MessageParser. Implement parse(raw): PubSubMessage \| null. |
import type { IAzureWebPubSubClient } from "ts-web-pubsub-client";
const mockAzure: IAzureWebPubSubClient = {
start: vi.fn().mockResolvedValue(undefined),
stop: vi.fn(),
joinGroup: vi.fn().mockResolvedValue(undefined),
leaveGroup: vi.fn().mockResolvedValue(undefined),
sendEvent: vi.fn().mockResolvedValue(undefined),
sendToGroup: vi.fn().mockResolvedValue(undefined),
on: vi.fn(),
};
const client = new PubSubClient(
{ getClientAccessUrl: "wss://unused" },
{},
{ azureClient: mockAzure }
);Type Glossary
All types exported from this package.
| Type | Kind | Description |
|---|---|---|
| PubSubConfig | interface | First constructor argument — URL, retry, sendOnConnect, Azure pass-through options |
| AutoSendItem | interface | One entry in sendOnConnect — { event, data, retryOptions? } |
| ConnectionCallbacks | interface | Second constructor argument — lifecycle hooks |
| PubSubClientDeps | interface | Third constructor argument — dependency injection for testing |
| RetryOptions | interface | Per-call retry tuning for send(), sendWhenConnected(), and sendToGroup() |
| ConnectRetryStrategy | type | (attempt: number, error: unknown) => number \| null — custom initial connect retry |
| ClientAccessUrlFactory | type | () => Promise<string> \| string — factory variant of getClientAccessUrl |
| PubSubMessage<TData> | interface | Wire envelope: { event: string, data: TData } — used by server and parser |
| EventHandler<TData> | type | (data: TData) => void — handler passed to on() / off() |
| PubSubError | class | Thrown for consumer mistakes. Has code: PubSubErrorCode and message: string. |
| PubSubErrorCode | type | "NOT_CONNECTED" \| "CLIENT_STOPPED" \| "INVALID_EVENT_NAME" \| "INVALID_GROUP_NAME" \| "INVALID_CONFIG" |
| IAzureWebPubSubClient | interface | Contract for the Azure WebSocket client (inject via deps.azureClient) |
| IEventBus | interface | Contract for the internal event bus (inject via deps.eventBus) |
| IMessageParser | interface | Contract for the message parser (inject via deps.messageParser) |
Features
Typed custom events
Every event has a name and a typed payload. TypeScript will catch payload shape mismatches at compile time.
interface OrderUpdate {
orderId: string;
status: "processing" | "shipped" | "delivered";
updatedAt: string;
}
client.on<OrderUpdate>("order-update", (data) => {
// data.orderId ✓ string
// data.status ✓ "processing" | "shipped" | "delivered"
// data.mistyped ✗ TypeScript error
});Auto-reconnection and message recovery
When the connection drops:
Connection drops
│
▼
1. RECOVER — reconnect with same connection ID
Service replays buffered messages (missed during disconnect)
│
▼ recovery fails
2. RECONNECT — new connection ID
Groups rejoined automatically (autoRejoinGroups: true)
│
▼ all attempts exhausted
3. STOPPED — onStopped fires, client is doneNo code required from you. Use onConnected, onDisconnected, and onStopped callbacks to update your UI.
Auto-send on connect / reconnect (sendOnConnect)
Pass a list of events in config and the package sends them automatically every time the connection opens — including after every automatic reconnect. No callback wiring needed.
const client = new PubSubClient({
getClientAccessUrl: async () => (await fetch("/api/negotiate")).json().url,
sendOnConnect: [
{ event: "presence", data: { status: "online", userId: currentUser.id } },
{ event: "join-room", data: { roomId: activeRoom } },
],
});
await client.connect();
// ↑ "presence" and "join-room" are sent automatically after connected fires.
// On every reconnect they fire again — presence stays current without any extra code.When to use sendOnConnect vs onConnected callback:
| Scenario | Use |
|---|---|
| Presence / room join that must re-fire on every reconnect | sendOnConnect |
| Multiple events that all re-fire on every reconnect | sendOnConnect |
| Conditional logic before sending (e.g. check a flag) | onConnected callback |
| Update UI state on connect | onConnected callback |
Queue a one-off message before connecting (sendWhenConnected)
Send a message that fires exactly once — the moment the connection opens. Useful for session-init payloads or messages that must be queued before connect() is called.
// Queue BEFORE connect() — fires automatically when connected
client.sendWhenConnected("session-init", { sessionId: "abc", userId: currentUser.id });
await client.connect(); // ← session-init fires here
// Or queue during a retry window
client.connect(); // retrying in background
client.sendWhenConnected("ready", { timestamp: Date.now() }); // queued, sends when connectedDifferences from sendOnConnect:
| Feature | sendOnConnect (config) | sendWhenConnected() (method) |
|---|---|---|
| Fires on reconnect | Yes — every time | No — once only |
| Configured at | Constructor | Call site |
| Result | Fire-and-forget | Promise<void> you can await |
| Use for | Presence, room membership | Session init, one-time payloads |
Groups
Groups let you broadcast messages to a subset of connected clients — like chat rooms, notification channels, or tenant-scoped feeds.
// Join
await client.joinGroup("tenant-abc");
// Send to everyone in the group
await client.sendToGroup("tenant-abc", "notification", {
title: "New report available",
reportId: "rpt-123",
});
// Leave
await client.leaveGroup("tenant-abc");Groups are rejoined automatically after reconnection when autoRejoinGroups: true.
Send retry with deduplication
send() and sendToGroup() retry on failure using exponential back-off. On each retry the same ackId is reused, so the Azure service can deduplicate — a message that was delivered before the error was thrown will not be delivered twice.
Dependency injection for testing
All collaborators implement public interfaces and can be replaced via the deps constructor argument. No real WebSocket is needed in tests.
import type { IAzureWebPubSubClient } from "ts-web-pubsub-client";
const mockAzure: IAzureWebPubSubClient = {
start: vi.fn().mockResolvedValue(undefined),
stop: vi.fn(),
joinGroup: vi.fn().mockResolvedValue(undefined),
leaveGroup: vi.fn().mockResolvedValue(undefined),
sendEvent: vi.fn().mockResolvedValue(undefined),
sendToGroup: vi.fn().mockResolvedValue(undefined),
on: vi.fn(),
};
const client = new PubSubClient(
{ getClientAccessUrl: "wss://unused" },
{},
{ azureClient: mockAzure }
);Usage Examples
Example 1 — Basic setup (plain TypeScript)
// pubsub/client.ts
import { PubSubClient } from "ts-web-pubsub-client";
import apiClient from "../lib/apiClient";
import { currentUser } from "../auth";
export const client = new PubSubClient(
{
getClientAccessUrl: async () => {
const res = await apiClient.get("/pubsub/negotiate");
return res.data.url;
},
autoRejoinGroups: true,
// Auto-sent on every connect and reconnect — no onConnected callback needed
sendOnConnect: [
{ event: "presence", data: { status: "online", userId: currentUser.id } },
],
},
{
onConnected: (id) => console.log("[PubSub] connected:", id),
onDisconnected: () => console.warn("[PubSub] disconnected — recovering"),
onStopped: () => console.error("[PubSub] stopped"),
}
);// main.ts
import { client } from "./pubsub/client";
import { registerAllHandlers } from "./pubsub/events";
registerAllHandlers();
await client.connect();Example 2 — React with a custom hook
// hooks/usePubSub.ts
import { useEffect, useRef, useState } from "react";
import { PubSubClient } from "ts-web-pubsub-client";
import type { ConnectionCallbacks } from "ts-web-pubsub-client";
export type ConnectionStatus = "idle" | "connecting" | "connected" | "disconnected" | "stopped";
export function usePubSub(getClientAccessUrl: () => Promise<string>) {
const [status, setStatus] = useState<ConnectionStatus>("idle");
const clientRef = useRef<PubSubClient | null>(null);
useEffect(() => {
const callbacks: ConnectionCallbacks = {
onConnected: () => setStatus("connected"),
onDisconnected: () => setStatus("disconnected"),
onStopped: () => setStatus("stopped"),
};
const instance = new PubSubClient({ getClientAccessUrl }, callbacks);
clientRef.current = instance;
setStatus("connecting");
instance.connect();
return () => instance.disconnect();
}, []);
return { client: clientRef.current, status };
}// App.tsx
import { usePubSub } from "./hooks/usePubSub";
export function App() {
const { client, status } = usePubSub(async () => {
const res = await fetch("/api/pubsub/negotiate");
return (await res.json()).url;
});
useEffect(() => {
if (!client) return;
client.on<{ text: string }>("chat", (data) => console.log(data.text));
}, [client]);
return <span>Status: {status}</span>;
}Example 3 — React context (recommended for multi-component apps)
// context/PubSubContext.tsx
import { createContext, useContext, useEffect, useRef, useState } from "react";
import { PubSubClient } from "ts-web-pubsub-client";
import type { ConnectionCallbacks } from "ts-web-pubsub-client";
type Status = "idle" | "connecting" | "connected" | "disconnected" | "stopped";
interface PubSubContextValue {
client: PubSubClient | null;
status: Status;
}
const PubSubContext = createContext<PubSubContextValue>({ client: null, status: "idle" });
export function PubSubProvider({ children }: { children: React.ReactNode }) {
const [status, setStatus] = useState<Status>("idle");
const clientRef = useRef<PubSubClient | null>(null);
useEffect(() => {
const callbacks: ConnectionCallbacks = {
onConnected: () => setStatus("connected"),
onDisconnected: () => setStatus("disconnected"),
onStopped: () => setStatus("stopped"),
};
const instance = new PubSubClient(
{
getClientAccessUrl: async () => {
const res = await fetch("/api/pubsub/negotiate");
return (await res.json()).url;
},
},
callbacks
);
clientRef.current = instance;
setStatus("connecting");
instance.connect();
return () => instance.disconnect();
}, []);
return (
<PubSubContext.Provider value={{ client: clientRef.current, status }}>
{children}
</PubSubContext.Provider>
);
}
export const usePubSubContext = () => useContext(PubSubContext);// components/StatusBadge.tsx
import { usePubSubContext } from "../context/PubSubContext";
const labels = {
idle: "Not connected",
connecting: "Connecting…",
connected: "Live",
disconnected: "Reconnecting…",
stopped: "Connection lost",
};
export function StatusBadge() {
const { status } = usePubSubContext();
return <span>{labels[status]}</span>;
}Example 4 — Zustand integration
// stores/pubsub.store.ts
import { create } from "zustand";
import { PubSubClient, PubSubError } from "ts-web-pubsub-client";
type Status = "idle" | "connecting" | "connected" | "disconnected" | "stopped";
interface PubSubState {
client: PubSubClient | null;
status: Status;
init: () => void;
teardown: () => void;
}
export const usePubSubStore = create<PubSubState>((set, get) => ({
client: null,
status: "idle",
init() {
const instance = new PubSubClient(
{
getClientAccessUrl: async () => {
const res = await fetch("/api/pubsub/negotiate");
return (await res.json()).url;
},
},
{
onConnected: () => set({ status: "connected" }),
onDisconnected: () => set({ status: "disconnected" }),
onStopped: () => set({ status: "stopped" }),
}
);
set({ client: instance, status: "connecting" });
instance.connect();
},
teardown() {
get().client?.disconnect();
set({ client: null, status: "idle" });
},
}));// App.tsx
import { useEffect } from "react";
import { usePubSubStore } from "./stores/pubsub.store";
export function App() {
const { init, teardown, status } = usePubSubStore();
useEffect(() => {
init();
return () => teardown();
}, []);
return <span>{status}</span>;
}Example 5 — Event handlers in separate files
// pubsub/events/chat.handler.ts
import { client } from "../client";
export interface ChatMessage {
id: string;
text: string;
from: string;
roomId: string;
sentAt: string;
}
function handleChat(data: ChatMessage): void {
// update your store, dispatch to Redux, etc.
console.log(`[${data.roomId}] ${data.from}: ${data.text}`);
}
export function registerChatHandler(): void { client.on<ChatMessage>("chat", handleChat); }
export function unregisterChatHandler(): void { client.off<ChatMessage>("chat", handleChat); }// pubsub/events/notification.handler.ts
import { client } from "../client";
export interface Notification {
id: string;
title: string;
body: string;
severity: "info" | "warning" | "error";
}
function handleNotification(data: Notification): void {
console.log(`[${data.severity}] ${data.title}`);
}
export function registerNotificationHandler(): void { client.on<Notification>("notification", handleNotification); }
export function unregisterNotificationHandler(): void { client.off<Notification>("notification", handleNotification); }// pubsub/events/index.ts ← single entry point for all handlers
export { registerChatHandler, unregisterChatHandler } from "./chat.handler";
export { registerNotificationHandler, unregisterNotificationHandler } from "./notification.handler";
export { registerPresenceHandlers, unregisterPresenceHandlers } from "./presence.handler";// pubsub/index.ts ← the only file the rest of your app imports from
import { client } from "./client";
import {
registerChatHandler,
registerNotificationHandler,
registerPresenceHandlers,
} from "./events";
export async function startPubSub(): Promise<void> {
registerChatHandler();
registerNotificationHandler();
registerPresenceHandlers();
await client.connect();
}
export function stopPubSub(): void {
client.disconnect();
}
export { client };Example 6 — Typing indicator
// pubsub/events/typing.handler.ts
import { client } from "../client";
interface TypingEvent { userId: string; roomId: string; }
let hideTypingTimer: ReturnType<typeof setTimeout> | null = null;
function handleTyping(data: TypingEvent): void {
showTypingIndicator(data.userId);
// Auto-hide if no further events arrive within 2 seconds
if (hideTypingTimer) clearTimeout(hideTypingTimer);
hideTypingTimer = setTimeout(() => hideTypingIndicator(), 2000);
}
export function registerTypingHandler(): void { client.on<TypingEvent>("typing", handleTyping); }
export function unregisterTypingHandler(): void { client.off<TypingEvent>("typing", handleTyping); }Sending a typing event:
// Throttle to avoid flooding — send at most once every 800ms
let lastTypingSent = 0;
function onInputChange(): void {
const now = Date.now();
if (now - lastTypingSent < 800) return;
lastTypingSent = now;
client.send("typing", { userId: currentUserId, roomId: activeRoom })
.catch(() => {}); // typing events are fire-and-forget
}Example 7 — Manual connection control
// components/ChatRoom.tsx
import { useRef, useState } from "react";
import { PubSubClient, PubSubError } from "ts-web-pubsub-client";
import { registerChatHandler, unregisterChatHandler } from "../pubsub/events/chat.handler";
export function ChatRoom({ roomId }: { roomId: string }) {
const clientRef = useRef<PubSubClient | null>(null);
const [joined, setJoined] = useState(false);
const [error, setError] = useState<string | null>(null);
async function join() {
try {
const instance = new PubSubClient({
getClientAccessUrl: async () => {
const res = await fetch(`/api/pubsub/negotiate?room=${roomId}`);
return (await res.json()).url;
},
});
clientRef.current = instance;
registerChatHandler();
await instance.connect();
await instance.joinGroup(roomId);
setJoined(true);
} catch (err) {
if (err instanceof PubSubError) setError(err.message);
}
}
function leave() {
unregisterChatHandler();
clientRef.current?.leaveGroup(roomId);
clientRef.current?.disconnect();
clientRef.current = null;
setJoined(false);
}
if (error) return <p>Error: {error}</p>;
return joined
? <button onClick={leave}>Leave room</button>
: <button onClick={join}>Join room</button>;
}Example 8 — Error handling
import { PubSubClient, PubSubError } from "ts-web-pubsub-client";
async function safeSend(event: string, data: unknown): Promise<void> {
try {
await client.send(event, data);
} catch (err) {
if (!(err instanceof PubSubError)) throw err; // re-throw unknown errors
switch (err.code) {
case "NOT_CONNECTED":
showToast("Still connecting — please try again in a moment");
break;
case "CLIENT_STOPPED":
showToast("Connection lost. Refresh the page to reconnect.");
break;
case "INVALID_EVENT_NAME":
case "INVALID_GROUP_NAME":
// These are programmer errors — they should never reach production
console.error("[BUG]", err.message);
break;
default:
console.error("PubSub send failed:", err.message);
}
}
}Example 9 — Presence and session init with sendOnConnect + sendWhenConnected
// pubsub/client.ts
import { PubSubClient } from "ts-web-pubsub-client";
import { getAuthToken } from "../auth";
// Presence event that must fire on EVERY connect and reconnect
const client = new PubSubClient(
{
getClientAccessUrl: async () => {
const { url } = await (await fetch("/api/pubsub/negotiate")).json();
return url;
},
sendOnConnect: [
// Sent automatically after every connect — keeps presence status current
{ event: "presence", data: { status: "online" } },
// Re-announce active room membership on every reconnect
{ event: "join-room", data: { roomId: getActiveRoom() } },
],
},
{
onConnected: (id) => console.log("connected:", id),
onDisconnected: () => updateStatusBadge("reconnecting"),
onStopped: () => updateStatusBadge("offline"),
}
);
export { client };// app-init.ts — queue a one-off session-init before connecting
import { client } from "./pubsub/client";
import { getSession } from "./auth";
// sendWhenConnected queues the message and sends it once the connection opens.
// Unlike sendOnConnect, this does NOT repeat on reconnect.
client.sendWhenConnected("session-init", {
sessionId: getSession().id,
appVersion: APP_VERSION,
locale: navigator.language,
});
// Register all event handlers
client.on("chat", handleChat);
client.on("notification", handleNotification);
// Connect — "session-init" fires automatically when the connection opens.
// "presence" and "join-room" also fire now (and on every future reconnect).
await client.connect();Consumer Project Structure
Minimal structure — single feature
src/
pubsub/
client.ts ← PubSubClient singleton
events.ts ← all on() registrations in one file
App.tsxStandard structure — multiple features
src/
pubsub/
client.ts ← PubSubClient singleton
index.ts ← startPubSub() / stopPubSub()
events/
chat.handler.ts ← "chat" event
notification.handler.ts
presence.handler.ts
index.ts ← re-exports all register/unregister
hooks/
usePubSubStatus.ts ← connection status hook
App.tsxLarge app structure — domain-driven
src/
pubsub/
client.ts ← singleton client
index.ts ← startPubSub() / stopPubSub()
features/
chat/
chat.handler.ts ← registers chat events, updates chat store
chat.store.ts
Chat.tsx
notifications/
notification.handler.ts
notification.store.ts
NotificationBell.tsx
presence/
presence.handler.ts
presence.store.ts
OnlineUsers.tsx
shared/
hooks/
usePubSubStatus.ts
components/
ConnectionBadge.tsxEach feature folder owns its own handler file. No cross-feature imports. The pubsub/index.ts imports from each feature and calls registerXxxHandler() in one place.
Testing Guide
Unit test — event handler
// events/chat.handler.test.ts
import { describe, it, expect, vi } from "vitest";
import { EventBus } from "ts-web-pubsub-client";
describe("chat handler", () => {
it("logs the message", () => {
const bus = new EventBus();
const spy = vi.fn();
bus.subscribe("chat", spy);
bus.dispatch("chat", { text: "hello", from: "Alice" });
expect(spy).toHaveBeenCalledWith({ text: "hello", from: "Alice" });
});
});Unit test — PubSubClient with mocked Azure
// client/pub-sub-client.test.ts
import { describe, it, expect, vi, beforeEach } from "vitest";
import { PubSubClient, PubSubError } from "ts-web-pubsub-client";
import type { IAzureWebPubSubClient } from "ts-web-pubsub-client";
function buildMockAzure(): IAzureWebPubSubClient {
return {
start: vi.fn().mockResolvedValue(undefined),
stop: vi.fn(),
joinGroup: vi.fn().mockResolvedValue(undefined),
leaveGroup: vi.fn().mockResolvedValue(undefined),
sendEvent: vi.fn().mockResolvedValue(undefined),
sendToGroup: vi.fn().mockResolvedValue(undefined),
on: vi.fn(),
};
}
describe("PubSubClient", () => {
let mockAzure: IAzureWebPubSubClient;
let client: PubSubClient;
beforeEach(() => {
mockAzure = buildMockAzure();
client = new PubSubClient(
{ getClientAccessUrl: "wss://unused" },
{},
{ azureClient: mockAzure }
);
});
it("calls azure.start on connect", async () => {
await client.connect();
expect(mockAzure.start).toHaveBeenCalledOnce();
});
it("does not call start twice if already connected", async () => {
await client.connect();
await client.connect();
expect(mockAzure.start).toHaveBeenCalledOnce();
});
it("throws NOT_CONNECTED if send called before connect", async () => {
await expect(client.send("chat", {}))
.rejects.toMatchObject({ code: "NOT_CONNECTED" });
});
it("throws INVALID_EVENT_NAME for blank event", async () => {
await client.connect();
await expect(client.send(" ", {}))
.rejects.toMatchObject({ code: "INVALID_EVENT_NAME" });
});
it("routes incoming server-message to on() handler", () => {
// Get the listener registered for "server-message"
const listeners = new Map<string, (e: unknown) => void>();
vi.mocked(mockAzure.on).mockImplementation((event: string, fn: (e: unknown) => void) => {
listeners.set(event, fn);
});
const newClient = new PubSubClient(
{ getClientAccessUrl: "wss://unused" },
{},
{ azureClient: mockAzure }
);
const handler = vi.fn();
newClient.on("chat", handler);
// Simulate incoming message — mirrors the real Azure SDK shape:
// OnServerDataMessageArgs → { message: ServerDataMessage }
// ServerDataMessage → { kind: "serverData", dataType: "text", data: JSONTypes | ArrayBuffer }
listeners.get("server-message")?.({
message: { kind: "serverData", dataType: "text", data: JSON.stringify({ event: "chat", data: { text: "hi" } }) },
});
expect(handler).toHaveBeenCalledWith({ text: "hi" });
});
});Unit test — MessageParser in isolation
// core/message-parser.test.ts
import { describe, it, expect } from "vitest";
import { MessageParser } from "ts-web-pubsub-client";
describe("MessageParser", () => {
const parser = new MessageParser();
it("parses a valid JSON string", () => {
const result = parser.parse('{"event":"chat","data":{"text":"hi"}}');
expect(result).toEqual({ event: "chat", data: { text: "hi" } });
});
it("parses a plain object", () => {
const result = parser.parse({ event: "chat", data: { text: "hi" } });
expect(result).toEqual({ event: "chat", data: { text: "hi" } });
});
it("returns null for missing event field", () => {
expect(parser.parse({ data: { text: "hi" } })).toBeNull();
});
it("returns null for blank event name", () => {
expect(parser.parse({ event: " ", data: {} })).toBeNull();
});
it("returns null for non-JSON string", () => {
expect(parser.parse("not json")).toBeNull();
});
it("returns null for null input", () => {
expect(parser.parse(null)).toBeNull();
});
});Common Mistakes
Mistake 1 — Listening to "server-message" as an event name
// WRONG — this registers "server-message" as a custom event name in the EventBus,
// NOT the Azure transport event. It will never fire for incoming server messages.
client.on("server-message", (data) => {
console.log(data); // ✗ never called
});The Azure SDK fires a raw "server-message" transport event internally (OnServerDataMessageArgs, where message.data is JSONTypes | ArrayBuffer). This package handles that raw event for you, parses the { event, data } envelope from message.data, and dispatches by the envelope's event field. You register handlers by the envelope's event field, not by the Azure transport event name.
// CORRECT — register by the event name inside the envelope
client.on("chat", (data) => {
console.log(data); // ✓ called with the "data" field from the envelope
});Mistake 2 — Server sending flat messages without an envelope
If your server sends a flat object without the event field, your handlers will never fire:
// Server sends (missing event field):
{ "value": "ok" } // ✗ no "event" field → silently dropped
{ "type": "notification", "title": "Hello" } // ✗ no "event" field → silently droppedYour server must wrap the payload in a { event, data } envelope:
// Server sends:
{ "event": "ODResponse", "data": { "value": "ok" } } // ✓
{ "event": "notification", "data": { "title": "Hello" } } // ✓Client then registers:
client.on("ODResponse", (data) => console.log(data.value));
client.on("notification", (data) => console.log(data.title));Mistake 3 — Calling send() before connect()
// WRONG — connect() not awaited yet
client.connect();
await client.send("chat", data); // ✗ throws PubSubError("NOT_CONNECTED")
// CORRECT — await connect() first
await client.connect();
await client.send("chat", data); // ✓
// ALSO CORRECT — use sendWhenConnected() to queue and auto-send when connected
client.sendWhenConnected("chat", data); // ✓ queued, fires the moment connection opens
await client.connect();Mistake 4 — Creating a new client on every render (React)
// WRONG — new WebSocket on every render
function MyComponent() {
const client = new PubSubClient(...); // ✗ new instance every render
}
// CORRECT — stable ref
function MyComponent() {
const clientRef = useRef(new PubSubClient(...)); // ✓ created once
}Reconnection Behaviour
| Scenario | What happens |
|---|---|
| Brief network blip (< buffer window) | Connection recovers silently. Missed messages are replayed. onDisconnected then onConnected fire. |
| Network down for a few minutes | Same as above if within buffer window. |
| Network down beyond buffer window | Reconnects but missed messages during the gap are lost. |
| Azure service restart | Client reconnects to the new instance automatically. |
| Expired token on reconnect | URL factory is called fresh — new token is fetched. |
| All reconnect attempts fail | onStopped fires. Create a new PubSubClient to try again. |
send() does not queue offline messages — it throws PubSubError("NOT_CONNECTED") when not connected. Show the user feedback or use sendWhenConnected() if you want the message to wait:
// Option A — fail immediately, show feedback
try {
await client.send("chat", data);
} catch (err) {
if (err instanceof PubSubError && err.code === "NOT_CONNECTED") {
showToast("Not connected. Message not sent.");
}
}
// Option B — queue the message; it sends the moment the connection opens
await client.sendWhenConnected("chat", data);React StrictMode Warning
In development, React 18 StrictMode mounts every component twice. If you create and connect your client inside a useEffect without a cleanup function, you will end up with two simultaneous WebSocket connections.
// BAD — two connections in StrictMode
useEffect(() => {
client.connect();
}, []);
// GOOD — cleanup always closes the connection
useEffect(() => {
client.connect();
return () => client.disconnect();
}, []);Keep the client instance in a useRef, not useState. A WebSocket connection is not UI state.
Source Structure
src/
client/
pub-sub-client.ts Public API — connect, disconnect, on, off, send, groups
azure-event-handler.ts Handles all Azure WebSocket events, routes to EventBus
guards.ts Input validation — throws PubSubError with typed codes
core/
event-bus.ts Subscribe / unsubscribe / dispatch (no Azure dependency)
message-parser.ts Raw unknown → typed PubSubMessage | null
retry.ts Exponential back-off with ackId deduplication
azure/
client-factory.ts Creates the underlying Azure WebPubSubClient
errors/
pub-sub-error.ts PubSubError with typed PubSubErrorCode
types/
config.ts PubSubConfig, ClientAccessUrlFactory, PubSubClientDeps
message.ts PubSubMessage, EventHandler
callbacks.ts ConnectionCallbacks, RetryOptions
interfaces.ts IEventBus, IMessageParser, IAzureWebPubSubClient
index.ts Package entry point — all public exports