@solncebro/websocket-engine
v0.2.0
Reliable WebSocket client with reconnect, heartbeat and typed messages
Reliable WebSocket client for Node.js with automatic reconnection, ping/pong heartbeat, typed messages, optional authentication phase, and request/response pattern.
Installation
yarn add @solncebro/websocket-engine
npm install @solncebro/websocket-engine
Features
- Automatic reconnection: exponential backoff with jitter; fast reconnect for specific close codes (1001, 1006, 1011–1014)
- Heartbeat: TCP ping/pong by default, or application-level JSON ping (e.g. Bybit { op: 'ping' })
- Connection timeout: handshake timeout with retry
- Auth phase: optional onOpen async callback for authentication before the connection is considered ready
- Send: controller.sendToConnectedSocket(data) for outbound messages
- Request/response: controller.waitForMessage(predicate, timeout) to await a specific incoming message by any criteria (e.g. reqId)
- Typed messages: optional parseMessage for type-safe payloads
- Notifications: onNotify callback for alerts; the process exits with code 1 after max retries (suitable for PM2 restart)
Requirements
- Node.js 16+
- TypeScript 5.x (optional, for types)
Usage
Basic (no auth)
import { createReliableWebSocket } from "@solncebro/websocket-engine";

// pinoLogger and sendTelegramAlert are assumed to be defined elsewhere in the application.
interface StreamMessage {
  type: string;
  data: unknown;
}

const controller = createReliableWebSocket<StreamMessage>({
  url: "wss://stream.example.com",
  label: "market-stream",
  logger: pinoLogger,
  parseMessage: (rawData) => JSON.parse(rawData.toString()) as StreamMessage,
  onMessage: (message) => {
    console.log(message.type, message.data);
  },
  onNotify: async (message) => {
    await sendTelegramAlert(message);
  },
});

// Later, when the stream is no longer needed: stop reconnecting and close the socket.
controller.close();

With authentication (e.g. Bybit trading WebSocket)
import crypto from "crypto";
import {
  createReliableWebSocket,
  WebSocketOpenContext,
} from "@solncebro/websocket-engine";

// API_KEY, SECRET, pinoLogger and sendTelegramAlert are assumed to be defined elsewhere.
interface BybitMessage {
  op?: string;
  retCode?: number;
  retMsg?: string;
  reqId?: string;
  data?: unknown;
}

const controller = createReliableWebSocket<BybitMessage>({
  url: "wss://stream.bybit.com/v5/trade",
  label: "bybit-trade",
  logger: pinoLogger,
  parseMessage: (rawData) => JSON.parse(rawData.toString()) as BybitMessage,
  // Runs on every (re)connect; the connection is ready only after this resolves.
  onOpen: async ({ send, waitForMessage }: WebSocketOpenContext<BybitMessage>) => {
    const expires = Date.now() + 10000;
    const signature = crypto
      .createHmac("sha256", SECRET)
      .update(`GET/realtime${expires}`)
      .digest("hex");
    send({ op: "auth", args: [API_KEY, expires, signature] });
    const response = await waitForMessage((message) => message.op === "auth", 10000);
    if (response.retMsg !== "OK") {
      throw new Error(`Auth failed: ${response.retMsg}`);
    }
  },
  // Application-level heartbeat: JSON ping, matched against the JSON pong.
  heartbeat: {
    buildPayload: () => ({ op: "ping" }),
    isResponse: (msg) => msg.op === "pong",
  },
  onMessage: (message) => {
    if (message.op === "order.create") {
      // handle order response
    }
  },
  onReconnectSuccess: () => {
    console.log("Reconnected and re-authenticated");
  },
  onNotify: async (message) => {
    await sendTelegramAlert(message);
  },
});

// Send an order and await its specific response by reqId
const sendOrder = async (orderParams: Record<string, unknown>) => {
  const reqId = `req_${Date.now()}`;
  controller.sendToConnectedSocket({
    reqId,
    op: "order.create",
    args: [orderParams],
  });
  return controller.waitForMessage((message) => message.reqId === reqId, 30000);
};

API
createReliableWebSocket<TMessage>(args)
Returns a ReliableWebSocketController<TMessage>.
Arguments
| Property | Type | Required | Description |
|----------|------|----------|-------------|
| url | string | Yes | WebSocket URL |
| label | string | Yes | Identifier for logs and notifications |
| logger | WebSocketLogger | Yes | Logger with debug, info, warn, error, fatal |
| onMessage | (message: TMessage) => void | Yes | Called for each incoming message (not intercepted by waitForMessage or heartbeat) |
| parseMessage | (rawData: RawData) => TMessage | No | Parse raw data to TMessage; default: pass-through |
| onOpen | (context: WebSocketOpenContext<TMessage>) => Promise<void> | No | Async setup phase after connect (e.g. auth). Connection is not considered ready until this resolves. |
| onReconnectSuccess | () => void | No | Called after a successful reconnection (not on first connect) |
| onNotify | (message: string) => void \| Promise<void> | No | Called on connection issues and before process exit |
| heartbeat | WebSocketHeartbeatOptions<TMessage> | No | Application-level heartbeat (JSON ping/pong). When provided, TCP ping is disabled. |
| configuration | Partial<WebSocketConfiguration> | No | Override default timeouts and retry behaviour |
Controller
| Method | Description |
|--------|-------------|
| close() | Stops reconnection, clears timers, rejects pending waiters, closes the socket |
| getStatus() | Returns current WebSocketStatus |
| sendToConnectedSocket(data) | Send data; string is sent as-is, anything else is JSON.stringify-ed. Throws if not connected. |
| waitForMessage(predicate, timeoutMilliseconds) | Returns a Promise<TMessage> that resolves with the first incoming message matching predicate. The message is not passed to onMessage. Rejects on timeout or connection close. |
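
The waitForMessage contract in the table above (resolve on the first match, reject on timeout or connection close) can be pictured as a small registry of pending waiters. The following is a minimal sketch under that reading, not the library's actual implementation; WaiterRegistry, wait, offer, and rejectAll are hypothetical names introduced only for illustration.

```typescript
// Hypothetical sketch of predicate-based waiters; not taken from the library source.
type Predicate<T> = (message: T) => boolean;

interface Waiter<T> {
  predicate: Predicate<T>;
  resolve: (message: T) => void;
  reject: (reason: Error) => void;
  timer: ReturnType<typeof setTimeout>;
}

class WaiterRegistry<T> {
  private waiters: Waiter<T>[] = [];

  get size(): number {
    return this.waiters.length;
  }

  // Register a waiter; the promise rejects if nothing matches before the timeout.
  wait(predicate: Predicate<T>, timeoutMilliseconds: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const waiter: Waiter<T> = {
        predicate,
        resolve,
        reject,
        timer: setTimeout(() => {
          const index = this.waiters.indexOf(waiter);
          if (index !== -1) this.waiters.splice(index, 1);
          reject(new Error(`No matching message within ${timeoutMilliseconds} ms`));
        }, timeoutMilliseconds),
      };
      this.waiters.push(waiter);
    });
  }

  // Offer an incoming message; returns true if a waiter consumed it,
  // in which case the caller should not forward it to onMessage.
  offer(message: T): boolean {
    const waiter = this.waiters.find((candidate) => candidate.predicate(message));
    if (waiter === undefined) return false;
    this.waiters.splice(this.waiters.indexOf(waiter), 1);
    clearTimeout(waiter.timer);
    waiter.resolve(message);
    return true;
  }

  // Reject every pending waiter, e.g. on disconnect or close().
  rejectAll(reason: Error): void {
    const pending = this.waiters;
    this.waiters = [];
    for (const waiter of pending) {
      clearTimeout(waiter.timer);
      waiter.reject(reason);
    }
  }
}
```

Clearing the timer on both resolution and rejection keeps stale timeouts from firing after a waiter has already settled.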
WebSocketStatus
- CONNECTING: initial connection attempt
- CONNECTED: connected (and auth passed if onOpen was provided)
- DISCONNECTED: disrupted, reconnect scheduled
- RECONNECTING: reconnect in progress (includes onOpen phase)
- FAILED: closed by user or max retries exceeded
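
Because sendToConnectedSocket throws when the socket is not connected, callers may want to check the status first. The sketch below assumes the status values are comparable to the string names listed above (the library may expose them as an enum instead); StatusName, SendCapable, and trySend are hypothetical names, not part of the package.

```typescript
// Assumption: status values match the names listed in the README.
type StatusName =
  | "CONNECTING"
  | "CONNECTED"
  | "DISCONNECTED"
  | "RECONNECTING"
  | "FAILED";

// Minimal structural view of the controller used by this sketch.
interface SendCapable {
  getStatus(): StatusName;
  sendToConnectedSocket(data: unknown): void;
}

// Sends only when the connection is ready; returns false instead of throwing.
function trySend(controller: SendCapable, data: unknown): boolean {
  if (controller.getStatus() !== "CONNECTED") {
    return false;
  }
  controller.sendToConnectedSocket(data);
  return true;
}
```

Returning a boolean lets the caller decide whether to queue, drop, or retry the payload while a reconnect is in progress.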
WebSocketOpenContext
Passed to onOpen:
| Property | Description |
|----------|-------------|
| send | Send to the open socket (for use during onOpen) |
| waitForMessage | Same as controller.waitForMessage |
WebSocketHeartbeatOptions
| Property | Description |
|----------|-------------|
| buildPayload | Returns the JSON object to send as a ping |
| isResponse | Returns true if the message is a pong. Matching messages are not passed to onMessage. |
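
To illustrate how buildPayload and isResponse might interact with the missedPongThreshold configuration option, here is a timer-free sketch of the bookkeeping. It is an assumption about the general technique, not the library's code; in particular, resetting the missed-pong counter on every pong is assumed, and HeartbeatTracker with its methods is a hypothetical name.

```typescript
// Hypothetical shape mirroring the two documented heartbeat properties.
interface HeartbeatOptionsSketch<T> {
  buildPayload: () => unknown;
  isResponse: (message: T) => boolean;
}

class HeartbeatTracker<T> {
  private options: HeartbeatOptionsSketch<T>;
  private missedPongThreshold: number;
  private missedPongs = 0;

  constructor(options: HeartbeatOptionsSketch<T>, missedPongThreshold: number) {
    this.options = options;
    this.missedPongThreshold = missedPongThreshold;
  }

  // Serialized ping to send on each ping interval.
  nextPing(): string {
    return JSON.stringify(this.options.buildPayload());
  }

  // Returns true when the message is a pong and should be swallowed
  // instead of being forwarded to onMessage.
  handleIncoming(message: T): boolean {
    if (!this.options.isResponse(message)) return false;
    this.missedPongs = 0; // assumption: any pong resets the counter
    return true;
  }

  // Call when no pong arrived within the pong timeout.
  // Returns true once the connection should be terminated.
  recordMissedPong(): boolean {
    this.missedPongs += 1;
    return this.missedPongs >= this.missedPongThreshold;
  }
}
```

With the Bybit-style options from the usage example, nextPing() yields the JSON text for { op: "ping" } and any message with op equal to "pong" is treated as the response.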
Configuration (configuration)
| Option | Default | Description |
|--------|---------|-------------|
| maxRetryAttempts | 15 | Max reconnection attempts before process exit |
| initialRetryDelay | 1000 | Initial delay (ms) for exponential backoff |
| maxRetryDelay | 30000 | Cap (ms) for backoff delay |
| retryDelayMultiplier | 1.8 | Backoff multiplier |
| connectionTimeout | 30000 | Handshake timeout (ms) |
| pingInterval | 15000 | Ping interval (ms) |
| pongTimeout | 10000 | Pong wait timeout (ms) |
| heartbeatGracePeriod | 3000 | Delay before first ping (ms) |
| fastReconnectCodes | [1001, 1006, 1011, 1012, 1013, 1014] | Close codes that use short reconnect delay |
| missedPongThreshold | 3 | Missed pongs before terminating connection |
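
The retry options above describe a capped exponential backoff. As a rough sketch, assuming the delay for a zero-based attempt n is initialRetryDelay × retryDelayMultiplier^n capped at maxRetryDelay, the defaults give about 1000, 1800, 3240, 5832, 10498, and 18896 ms for the first six attempts and 30000 ms from then on. The exact formula and the jitter strategy are not documented here, so the ±20% jitter below is purely an assumption, and computeBaseDelay and applyJitter are hypothetical helpers.

```typescript
// Subset of the documented configuration relevant to backoff.
interface BackoffConfiguration {
  initialRetryDelay: number;
  maxRetryDelay: number;
  retryDelayMultiplier: number;
}

// Defaults copied from the configuration table.
const defaultBackoff: BackoffConfiguration = {
  initialRetryDelay: 1000,
  maxRetryDelay: 30000,
  retryDelayMultiplier: 1.8,
};

// Capped exponential delay for a zero-based attempt number (assumed formula).
function computeBaseDelay(
  attempt: number,
  configuration: BackoffConfiguration = defaultBackoff,
): number {
  const uncapped =
    configuration.initialRetryDelay *
    Math.pow(configuration.retryDelayMultiplier, attempt);
  return Math.min(uncapped, configuration.maxRetryDelay);
}

// Symmetric jitter; the 20% ratio is an illustrative assumption.
// `random` is injectable so the function can be tested deterministically.
function applyJitter(
  delay: number,
  jitterRatio: number = 0.2,
  random: () => number = Math.random,
): number {
  const offset = (random() * 2 - 1) * jitterRatio * delay;
  return Math.round(delay + offset);
}
```

Jitter spreads reconnect attempts from many clients over time, so they do not all hit the server at the same instant after an outage.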
Behaviour
- On close or error, reconnection is scheduled with exponential backoff.
- If onOpen throws, the connection is terminated and reconnect is triggered (with the same backoff logic). Retry counters are only reset after onOpen resolves successfully.
- After maxRetryAttempts failed attempts, onNotify is awaited with a critical message, then process.exit(1) runs (PM2 or similar will restart the app).
- waitForMessage pending promises are rejected when the connection is disrupted or close() is called.
- Heartbeat response messages and waitForMessage-intercepted messages are never passed to onMessage.
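
The interception rule in the last point can be sketched as a routing function. This is an illustrative model, not the library's code: the order in which heartbeat responses and waiters are checked is an assumption, and IncomingRouter and routeIncoming are hypothetical names.

```typescript
type Route = "heartbeat" | "waiter" | "onMessage";

interface IncomingRouter<T> {
  // Present only when an application-level heartbeat is configured.
  isHeartbeatResponse?: (message: T) => boolean;
  // Predicates of currently pending waitForMessage calls.
  pendingPredicates: Array<(message: T) => boolean>;
  onMessage: (message: T) => void;
}

// Decides where an incoming message goes; only unclaimed messages reach onMessage.
function routeIncoming<T>(router: IncomingRouter<T>, message: T): Route {
  if (router.isHeartbeatResponse !== undefined && router.isHeartbeatResponse(message)) {
    return "heartbeat";
  }
  const index = router.pendingPredicates.findIndex((predicate) => predicate(message));
  if (index !== -1) {
    // A waiter is one-shot: it is removed once it has matched.
    router.pendingPredicates.splice(index, 1);
    return "waiter";
  }
  router.onMessage(message);
  return "onMessage";
}
```

One practical consequence: an onMessage handler that also looks for responses carrying a reqId will not see those already claimed by a pending waitForMessage call.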
License
ISC
