websocket-cross-server-adapter
v1.0.7
Published
A Node.js based WebSocket distributed communication framework that enables seamless multi-server collaboration for real-time communication, cross-server event handling, and scalable applications.
Maintainers
Keywords
Readme
🚀 一款基于 Node.js、支持多服务器协同通信的 WebSocket 分布式框架
目录
为什么要做这个框架?
原生 ws 只是通信基础,心跳、重连、消息回调和房间路由都得自己实现。
Node.js 单线程和内存限制,让它难以应付大量连接和复杂业务。要支持多进程或者分布式的多服务器协同和房间管理,必须采取分布式的架构,这就是做这个框架的原因。
实现原理
WebSocketCrossServerAdapter(服务端通信核心)
该适配器基于 Redis 的发布订阅机制,实现跨服务器的消息广播与事件同步,支持多节点去中心化通信,具备健康监测和自动恢复,保障高可用性。支持单服务器多进程及跨物理服务器部署,便于弹性扩展。
主要功能:
- 跨节点事件通信,支持回调/Promise
- 动态管理 Redis 节点,支持压缩传输
- 分布式房间广播和客户端追踪
- 本地优先响应,自动路由目标节点
- 热插拔扩容,无需重启
消息发送支持:
- 全局广播
- 单客户端精准发送
- 批量 socketId 发送
- 分布式房间广播
支持房间命名空间管理和跨节点统计(在线用户、房间人数等)。所有事件处理器可任意节点注册,跨节点事件可直接回调客户端,无需中转。
WebSocketConnector(客户端连接管理器)
一个轻量级、简洁的 WebSocket 客户端类,适用于任何基于标准 WebSocket 协议的平台,例如浏览器、Node.js、Electron、React Native、移动 App、小程序、Cocos Creator 等环境。内置心跳机制、断线重连、事件回调、延迟反馈等功能,逻辑清晰、易于集成,压缩后体积仅约 5KB,适合各类实时通信场景的前端接入。
支持功能:
- 断线重连
- 心跳保活机制
- 网络延迟检测(基于 ping-pong 实现)
- emit 支持回调与超时处理
- 延迟响应回调(可用于展示 loading 等)
- 支持参数注入(URL)
开始使用
npm install websocket-cross-server-adapter接口文档
使用示例
一. 单 WebSocket 服务器模式(非分布式)
如果你的项目仅需传统的单 WebSocket 服务器模式,则无需使用 Redis,也无需进行任何额外的分布式配置。
你只需像使用原生 ws 模块那样传入配置信息即可。框架会自动以单服务器模式运行。
传入的 ws 配置应使用对象形式,并遵循 ws 模块官方文档 中的配置说明。
server.js:
// server.js:
// 如果你不是在示例文件夹下运行,请将 require 地址换成包名:
// const { WebSocketCrossServerAdapter } = require('websocket-cross-server-adapter');
const WebSocketCrossServerAdapter = require('../../src/WebSocketCrossServerAdapter');
// 如果你用的是 ES Module,可以这样写:
// import { WebSocketCrossServerAdapter } from 'websocket-cross-server-adapter';
// 默认的配置端口
let port = 9000;
// 解析命令行参数,可以在node命令行加上以下参数,动态配置prot,例如:node server --port=9001
const args = process.argv.slice(2);
args.forEach(arg => {
if (arg.startsWith('--port=')) {
port = parseInt(arg.split('=')[1], 10);
}
});
console.log(`Using configured values - port: ${port}`);
const wsServer = new WebSocketCrossServerAdapter({
wsOptions: {
port
}
});
wsServer.onWebSocketEvent('connection', (socket, req) => {
console.log('Client connection');
// 使用 WebSocketCrossServerAdapter 的辅助方法 parseWsRequestParams 解析 req 对象,
// 获取客户端通过 WebSocketConnector 类创建连接时配置的参数信息,比如 token、自定义参数等等。
const data = wsServer.parseWsRequestParams(req);
console.log('Connection params:', data);
// ✅ 使用客户端传来的 id 建立映射。实际业务中应在此处进行完整的身份验证(如 token 鉴权)。
// 例如可使用 jsonwebtoken 模块校验 data.token,并根据验证结果决定是否继续。
// 然而,我们更推荐在 noServer 模式下,在 WebSocket 协议升级阶段就完成鉴权逻辑,效率更高、也更安全。
// ws 官方虽然提供了 verifyClient 参数用于连接时鉴权,但该 API 已不推荐使用,并可能在未来版本中移除。
// 👉 建议查阅 ws 官方文档中的 noServer 模式以及 `server.on('upgrade')` 相关用法,了解推荐的鉴权方式。
// 此处为了演示方便,仅直接使用客户端传来的 id。
if (data.params.id) {
const playerId = String(data.params.id);
console.log('The client’s ID is:' + playerId);
// 把 id 存储到 socket.playerId 中。具体存法请根据自身业务决定,
// 比如 socket.player = { playerId, name } 等等。
// 总之需确保能从 socket 上获取到该连接的唯一身份标识。
socket.playerId = playerId;
// 必须建立 id(必须为字符串类型)与 socket 实例的映射,
// 后续房间广播、单点、多点推送才能找到对应实例。
wsServer.setUserSocket(playerId, socket);
} else {
// 模拟鉴权失败,使用自定义关闭码(4011)关闭连接。
// 这里的代码应根据自身业务逻辑定义。
// 详细查看 API 客户端关于 close 事件部分解释。
socket.close(4011, 'Auth failure');
}
});
wsServer.onWebSocketEvent('close', (socket, req) => {
console.log('Client disconnected,id:' + socket.playerId);
if (socket.playerId) {
// 客户端断开连接时,请务必删除 ID 和 socket 实例的映射,
// 否则 socket 实例可能无法被释放,导致内存泄漏。
wsServer.removeUserSocket(socket.playerId);
}
});
wsServer.onWebSocketEvent('say', (socket, data, callback) => {
console.log(`Received 'say' event from client ${socket.playerId}:`, data);
if (callback) {
// 如果客户端使用 emit 的时候带有回调,或者使用 emitWithPromise 发送消息,
// 此时 callback 会为有效函数,此处可调用 callback 回传结果给客户端。
callback({ msg: 'I am a callback for your say event' });
}
});
wsServer.onWebSocketEvent('joinRoom', (socket, data, callback) => {
console.log(`Received 'joinRoom' event from client ${socket.playerId}:`, data);
if (socket.playerId) {
// 模拟加入testRoom,id为1000的房间
wsServer.joinRoom('testRoom', '1000', socket.playerId);
}
if (callback) {
callback({ msg: 'JoinRoom successfully' });
}
});
// 模拟定时发送广播
setInterval(() => {
wsServer.broadcast('serverSay', { msg: 'I’m sending this message to everyone' });
}, 15_000)
// 模拟定时向测试房间发送消息
setInterval(() => {
wsServer.broadcastToRoom('testRoom', '1000', 'roomSay', { msg: 'This is a message sent to the test room' });
},10_000)
client.js:
// client.js:
// const { WebSocketConnector } = require('websocket-cross-server-adapter');
const WebSocketConnector = require('../../src/WebSocketConnector');
// 默认的配置端口和客户端id
let port = 9000;
let id = 1;
// 解析命令行参数,可以在node命令行加上以下参数,动态配置prot和id,例如:node client --id=16 --port=9001
const args = process.argv.slice(2);
args.forEach(arg => {
if (arg.startsWith('--port=')) {
port = parseInt(arg.split('=')[1], 10);
} else if (arg.startsWith('--id=')) {
id = arg.split('=')[1];
}
});
console.log(`Using configured values - port: ${port}, id: ${id}`);
const client = new WebSocketConnector({
url: `ws://localhost:${port}`,
customParams: {
name: 'Sam',
id
},
// 可以通过关闭服务器端来测试以下不同参数设置时候的重连效果
//repeatLimit: 5,
//fastReconnectThreshold: 1,
});
client.on('open', () => {
console.log('Connect success')
})
client.on('close', (event) => {
console.log('onClose event:', event.code, event.reason);
if (event.code === 4001 ||
event.code === 4010 ||
event.code === 4011 ||
event.code === 4012
) {
// 手动断开连接或服务器在特定情况下强制注销 — 不应尝试重连
console.log('Connection closed manually or by forced logout/auth failure. No reconnection.');
// 虽然连接已关闭,但仍需禁止自动重连,并清理所有计时器和 WebSocket 实例等资源。
client.manualClose();
} else {
// 其他情况下,应手动触发重连
client.reconnect();
}
})
client.on('error', (event) => {
console.log('Connect on error');
});
client.on('reconnect', ({ repeat, timeout }) => {
console.log('Preparing for reconnection attempt #' + repeat + ', actual reconnection will occur in ' + timeout + ' ms');
})
client.on('repeat-limit', (repeatLimit) => {
console.log('Reached maximum reconnection attempts: ' + repeatLimit);
})
client.on('serverSay', (data) => {
console.log('Received serverSay event:');
console.log(data)
})
client.on('roomSay', (data) => {
console.log('Received roomSay event:');
console.log(data)
})
client.on('ping', () => {
console.log('Go to ping....')
})
client.on('pong', (speed) => {
// 在pong事件中可以测得当前网络延迟
console.log(`Network latency: ${speed} ms`);
})
setTimeout(async () => {
// 使用 Promise 方式发送带有回调的事件
let data = await client.emitWithPromise('say', { msg: 'I am a client with ID: ' + id + ', and I need your promise callback.' }, {
onPending: () => {
console.log('requesting...')
}
});
console.log('Received promise response:');
console.log(data);
}, 2000);
setTimeout(() => {
// 使用 callback 方式发送带有回调的事件
client.emit('say', { msg: 'I am a client with ID: ' + id + ', and I need your callback.' }, (err, data) => {
if (err) {
console.log('Callback error occurred');
console.log(err)
} else {
console.log('Received callback response:');
console.log(data)
}
}, {
onPending: () => {
console.log('requesting...')
},
callbackTimeout: 1000
})
}, 4000);
setTimeout(() => {
// 模拟加入测试房间
client.emit('joinRoom', { msg: 'I want to join the test room' }, (err, data) => {
if (err) {
console.log('JoinRoon Callback error occurred');
console.log(err)
} else {
console.log('JoinRoon Received callback response:');
console.log(data)
}
})
}, 6000);
使用方法
- 安装依赖
请在项目主目录下执行以下命令,安装所需依赖:
npm install- 进入示例目录
进入 examples/single-ws-server 目录:
cd examples/single-ws-server- 启动Websocket服务器
默认端口启动:
node server或者自定义端口启动:
node server --port=9001
- 启动客户端
默认配置:
node client
或者指定客户端 ID 和端口启动:
node client --id=16 --port=9001
⚠️ 注意:每个客户端的 id 必须唯一,不能重复,否则将导致连接冲突。
你可以通过使用不同的 id 启动多个客户端,以观察各种事件情况。 还可以通过关闭服务器来测试断线场景,观察客户端的重连事件信息,然后再重启服务器,以模拟以下流程: 断线 → 重连中 → 成功重新连接
补充说明1:定向发送消息
如果你想测试单点定向发送消息或多点定向发送消息的能力,
请参考 API 文档中关于以下函数的说明并自行测试:
它们支持向特定的客户端 Socket 连接发送事件消息。
补充说明2:WebSocket 启动方式(noServer / 指定已有 Server)
除了默认监听端口启动外,WebSocket 服务器还支持以下两种方式启动:
✅ 1. 使用已有 HTTP(S) Server 启动 WebSocket(共享端口)
当使用已有的 HTTP 或 HTTPS 服务器启动 WebSocket 服务时,WebSocket 将会与 HTTP(S) 共用同一个端口。
这是通过 HTTP 协议的“协议升级”(Protocol Upgrade)机制实现的。
- WebSocket 客户端最初会发送一个普通的 HTTP 请求,请求头中包含
Upgrade: websocket字段; - HTTP(S) 服务器接收到该请求后,会将连接“升级”为 WebSocket 协议;
- 此时由
ws.Server实例接管连接处理逻辑; - 最终,HTTP 请求和 WebSocket 连接共享同一个 TCP 端口(例如 8080 或 443)。
这种方式特别适用于你希望 Web 应用(如网页、API)和 WebSocket 服务共用同一个端口 的场景,可以避免占用多个端口,方便部署与管理。
详情请查看官方文档:ws GitHub - External HTTPS Server
你可以传入已有的 HTTP Server 实例启动 WebSocket 服务:
const http = require('http');
// const { WebSocketCrossServerAdapter } = require('websocket-cross-server-adapter');
const WebSocketCrossServerAdapter = require('../../src/WebSocketCrossServerAdapter');
const server = http.createServer();
const wsServer = new WebSocketCrossServerAdapter({
wsOptions: {
server
}
});
server.listen(9000, () => {
console.log('Server is running on port 9000');
});
wsServer.onWebSocketEvent('connection', (socket, req) => {
console.log('Client connection');
})
// ............................其他逻辑相同
✅ 2. 使用 noServer 模式(手动处理 upgrade 请求)
你可以通过 noServer 模式手动处理 HTTP 升级请求。这种方式适用于你希望完全控制 HTTP 服务和升级流程的场景,例如在一个服务器上同时处理 HTTP 请求和 WebSocket 连接。 适用于: 与现有 HTTP(S) 服务共用端口 需要自定义认证、权限验证等逻辑 更精细地控制连接行为
📚 详情请查看官方文档:
ws GitHub - noServer Mode
const http = require('http');
const WebSocketCrossServerAdapter = require('../../src/WebSocketCrossServerAdapter');
// const { WebSocketCrossServerAdapter } = require('websocket-cross-server-adapter');
const server = http.createServer();
const wsServer = new WebSocketCrossServerAdapter({
wsOptions: {
noServer: true
}
});
server.listen(9000, () => {
console.log('Server is running on port 9000');
});
server.on('upgrade', (req, socket, head) => {
// 1. 检查 Upgrade 头必须是 websocket
if (req.headers['upgrade']?.toLowerCase() !== 'websocket') {
socket.write('HTTP/1.1 400 Bad Request\r\n\r\n');
socket.destroy();
return;
}
const data = wsServer.parseWsRequestParams(req);
console.log('传递参数是:')
console.log(data)
const id = data.params.id;
console.log("连接的客户端id:" + id);
if (id) {
// 获取 wsServer 中的 WebSocket.Server 实例,并处理 WebSocket 协议升级请求
wsServer.getWss()?.handleUpgrade(req, socket, head, (ws) => {
// 模拟完成鉴权,绑定 playerId 到该 WebSocket 实例上
ws.playerId = String(id);
// 手动触发 'connection' 事件,使该连接走统一的连接处理逻辑
wsServer.getWss()?.emit('connection', ws, req);
})
} else {
// 模拟鉴权失败,返回 401 错误并关闭连接
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n'); // 发送拒绝连接的 HTTP 响应
socket.destroy(); // 销毁连接
}
});
wsServer.onWebSocketEvent('connection', (socket, req) => {
console.log('Client connection');
console.log('客户端id:' + socket.playerId);
//....................其他逻辑相同
})
// ............................其他逻辑相同✅ WebSocket 鉴权推荐方式
在真实业务场景中,建议在客户端发起连接请求时即完成用户身份认证,服务器在接收到连接请求时验证身份信息。 不要等连接建立成功后再进行鉴权然后断开,这样会导致服务器资源被不必要地占用,增加安全风险。如果必须在连接成功后进行鉴权,请务必实现认证超时关闭机制,或者定期检查并清理无效连接,防止服务器资源被恶意或无效连接耗尽。
推荐使用如 jsonwebtoken 等模块,对请求中携带的 token 进行验证。
同时,建议在正式发起 WebSocket 连接之前,先通过 HTTP 接口进行身份验证。
这是因为在 WebSocket 协议升级过程中,服务器返回的鉴权失败信息在不同平台和客户端的表现不一致,
很多情况下客户端无法准确接收到具体的错误状态和原因,导致重连或错误处理复杂且不可靠。
通过预先的 HTTP 鉴权,可以避免这些问题,提高客户端的用户体验和连接稳定性。
💡 示例总结
上述示例完整展示了在 非分布式架构下,使用单 WebSocket 服务器 进行通信的典型场景与关键能力
二. 跨服务通信模块(纯服务端通信)
在完成了第一章节中单 WebSocket 服务器的通信逻辑后,我们将进入服务端之间的通信范式 —— 跨服务器通信模块(CrossServer)。
该示例不依赖 WebSocket,仅聚焦于分布式环境中服务节点之间如何稳定、高效地进行消息传递与回调处理。
该模块涵盖以下关键能力:
- 节点间事件广播与接收
- 定向发送与全局广播机制
- 跨服务器的请求-响应流程(支持 Promise)
- 基于事件名的统一调度系统
- 错误与超时处理机制
适用场景:
适用于不同进程或跨物理机器的服务之间通信,例如:
- HTTP 服务器 与 图片服务器
- 主业务服务器 与 文件存储服务器
- 网关服务 与 AI 推理服务
- 多个逻辑节点之间的事件驱动通信
这为解耦系统架构、构建微服务体系提供了通用的通信机制。
💡 该模块是构建
WebSocketCrossServerAdapter的基础部分,理解此机制将帮助你深入掌握后续跨服通信的底层逻辑。
安装 Redis(Install Redis)
在使用本项目之前,你需要提前安装好 Redis 服务。 安装教程或相关资源:
- Redis 官网:https://redis.io/docs/getting-started/installation/
- Redis GitHub:https://github.com/redis/redis
- Redis Windows 安装包(Windows Build):https://github.com/tporadowski/redis/releases
安装完成 Redis 后,启动 redis 服务:
redis-server或者指定配置文件启动(windows平台)
redis-server redis.windows.conf你可以通过以下方式测试是否启动成功:
redis-cli ping如果返回:
PONG说明 Redis 服务已成功启动并正常运行。
启动多个 Redis 实例
你可以通过复制并修改 Redis 配置文件来启动多个实例,每个实例监听不同的端口。
示例步骤:
- 复制默认配置文件(假设在 Linux/macOS):
cp /etc/redis/redis.conf /etc/redis/redis-6380.conf
cp /etc/redis/redis.conf /etc/redis/redis-6381.conf- 修改新配置文件中的端口(如 redis-6380.conf):
port 6380- 启动多个 Redis 实例,指定对应配置文件:
redis-server /etc/redis/redis-6380.conf
redis-server /etc/redis/redis-6381.conf- 你也可以直接用命令行参数启动不同端口(适合测试):
redis-server --port 6380
redis-server --port 6381Windows 同样,通过复制并修改配置文件中的端口,运行多个 redis-server 进程:
redis-server redis-6380.conf
redis-server redis-6381.conf或者直接开启多个:
redis-server --port 6380
redis-server --port 6381注意事项
- 每个实例必须使用不同的端口。
- 如果需要开启远程访问,请参考官方配置文件说明,修改
bind配置项以允许对应主机连接。
本框架底层使用 ioredis 作为 Redis 客户端,所有 Redis 相关配置参数均直接传递给 ioredis。 具体的配置选项和使用方法,请参考 ioredis 官方文档 以获取详细说明和最佳实践。
- ioredis GitHub 仓库:https://github.com/redis/ioredis
示例开始
cserver.js:
// cserver.js
// const { WebSocketCrossServerAdapter } = require('websocket-cross-server-adapter');
const WebSocketCrossServerAdapter = require('../../src/WebSocketCrossServerAdapter');
// 填入你的 Redis 配置信息,支持多个实例,请确保 Redis 服务已启动
// 支持多个 Redis 节点,如果使用多个节点,则每次发布会根据设置的策略选择其中的一个节点进行发布,
// 从而实现“负载均衡”。不同的策略含义请参考 API 文档。
// 内部会维护各个节点的健康状态。
// 重要提示:至少需要提供一个 Redis 节点,跨服务通信才能正常工作。
const redisConfig = [
{ port: 6379, host: '127.0.0.1' },
//{ port: 6380, host: '127.0.0.1' },
// 可以添加更多节点
];
// 请务必确保启动多个服务器时,每个服务器的名称都唯一,避免冲突
let serverName = 'serverA';
// 解析命令行参数,可以在node命令行加上以下参数,动态配置serverName,例如:node cserver --name=serverA
const args = process.argv.slice(2);
args.forEach(arg => {
if (arg.startsWith('--name=')) {
serverName = arg.split('=')[1];
}
});
console.log(`Using configured values - serverName: ${serverName}`);
const crossServer = new WebSocketCrossServerAdapter({
redisConfig,
serverName,
// 注册监听redis节点的健康状态的事件函数,当健康状态发生变化的时候将触发
onRedisHealthChange: (health, info) => {
console.log(`Node health status changed:${health}`, info);
},
// 当频道订阅发生错误的时候触发,info对象包含:
onRedisSubscriptionError: (info) => {
console.log('onRedisSubscriptionError:', info);
}
});
// 注册跨服务器事件监听
crossServer.onCrossServerEvent('say', (data, callback) => {
console.log('Received "say" event from another server:', data);
// 如果发送方通过 callback 或 Promise 方式发送消息,则此时 callback 为有效函数,可以直接调用以回调响应结果
if (callback) {
callback({ msg: `Hi, this is server ${crossServer.getServerName()} responding to you` })
}
})
sender.js:
// sender.js
// const { WebSocketCrossServerAdapter } = require('websocket-cross-server-adapter');
const WebSocketCrossServerAdapter = require('../../src/WebSocketCrossServerAdapter');
const redisConfig = [
{ port: 6379, host: '127.0.0.1' },
//{ port: 6380, host: '127.0.0.1' },
];
let serverName = 'senderA';
const args = process.argv.slice(2);
args.forEach(arg => {
if (arg.startsWith('--name=')) {
serverName = arg.split('=')[1];
}
});
console.log(`Using configured values - serverName: ${serverName}`);
const crossServer = new WebSocketCrossServerAdapter({
redisConfig,
serverName,
onRedisHealthChange: (health, info) => {
console.log(`Node health status changed:${health}`, info);
},
onRedisSubscriptionError: (info) => {
console.log('onRedisSubscriptionError:', info);
}
});
// 注册跨服务器事件监听
// 如果发送目标 targetServer 包含了自己(即全局广播时没有排除自己,或者 targetServer 中包含了自己的 serverName),
// 那么本服务器也会响应自己发送的该事件。
// 该事件响应会直接在本地上下文中执行,不经过 Redis 频道中转。
// 因此,开发者无需为本地事件做额外处理,所有优化均由内部机制自动完成。
crossServer.onCrossServerEvent('say', (data, callback) => {
console.log('Received "say" event from another server:');
console.log(data);
if (callback) {
callback({ msg: `Hi, this is server ${crossServer.getServerName()} responding to you` })
}
})
// 发送say事件消息,不需要任何回调
setTimeout(() => {
crossServer.emitCrossServer('say', {
content: `Hi everyone, I am ${crossServer.getServerName()}`
},null, {
targetServer: [],
})
}, 3000);
// 以callback的方式发送say事件消息,需要目标服务器回调
setTimeout(() => {
// 每当接收到一个服务器响应都会执行一次回调函数
crossServer.emitCrossServer('say', {
content: `Hi everyone, I am ${crossServer.getServerName()}, please respond with your callback.`
}, (result) => {
console.log('Callback response result:', result);
if (result.success) {
console.log('Received server callback:', result.data);
console.log('Number of servers yet to respond:', result.remainingResponses);
} else {
// Timed out before collecting all responses
console.log('Error message:', result.error);
console.log('Number of servers that did not respond:', result.unrespondedCount);
}
}, {
targetServer: [],
expectedResponses: 3,
// exceptSelf: false,
// timeout: 2000,
})
}, 6000);
// 以Promise的方式发送say事件消息,需要目标服务器回调
setTimeout(async () => {
// Promise 会在所有预期响应(expectedResponses)全部完成后解决(resolved),如果超时未收到全部响应,也会解决,但此时 result.success 为 false。
let result = await crossServer.emitCrossServerWithPromise('say', {
content: `Hi everyone, I am ${crossServer.getServerName()}, please respond with your callback for the promise.`
}, {
targetServer: [],
expectedResponses: 3,
// exceptSelf: true,
// timeout: 2000,
})
console.log('Promise response result:', result);
if (result.success) {
console.log('All expected nodes responded:', result.responses);
} else {
console.log('Nodes that have responded so far:', result.responses);
console.log('Number of servers that did not respond: ' + result.unrespondedCount);
}
// 也可以使用then的方式
// crossServer.emitCrossServerWithPromise('say', {
// content: `Hi everyone, I am ${crossServer.getServerName()}, please respond with your callback for the promise.`
// }, {
// targetServer: [],
// expectedResponses: 3,
// // exceptSelf: true,
// // timeout: 2000,
// }).then((result) => {
// })
}, 15_000);
使用方法
请务必确保 Redis 服务已启动,且监听端口为 6379,本示例运行前需满足此条件。
- 请确保你已经在项目主目录执行过 npm install,以安装所需依赖。否则后续命令可能无法正常运行。
npm install- 进入 examples/cross-server 目录:
cd examples/cross-server- 快速启动多个服务器(推荐)
concurrently是一个工具,可以一条命令同时启动多个服务器实例:
npx concurrently "node cserver --name=serverA" "node cserver --name=serverB" "node cserver --name=serverC" "node cserver --name=serverD" "node cserver --name=serverE"📌 注意:虽然它们共用一个终端窗口输出日志,但每个服务器仍然是独立的 Node.js 进程,彼此之间完全隔离,只是 concurrently 将它们的控制台输出集中显示,便于观察。
- 手动启动服务器(更直观)
如果你希望每个服务器运行在自己的独立终端窗口中,便于查看日志或调试,可以分别手动启动: 启动一个默认服务器:
node cserver
或者启动一个带自定义名称的服务器:
node cserver --name=serverB
⚠️ 每个服务器名称必须唯一,这是保证分布式系统正常运行的前提,否则可能导致节点识别冲突或消息路由错误。
- 启动消息发送服务器
用于测试跨服务器通信的发送端:
node sender
或使用自定义名称:
node sender --name=senderB
成功启动后,你可以看到多个服务器之间的事件通信、回调响应等输出结果,验证系统的分布式通信能力。
你可以使用不同的参数配置来进行测试,例如:
排除自己不接收消息
指定目标服务器发送消息
设置超时时间
设置预期响应服务器个数
targetServer: []
空数组表示广播模式,所有服务器都将接收到消息。此时可配合exceptSelf: true来排除当前服务器自身不接收消息。targetServer: ['serverA', 'serverB']
指定目标服务器名称(支持多个),可实现定向发送消息,仅目标服务器会接收到事件。
更多细节请参考 API 文档中
emitCrossServer 与
emitCrossServerWithPromise。
跨服务通信示例总结
通过使用 WebSocketCrossServerAdapter 的跨服务器通信功能,你可以轻松实现多进程或分布式环境下各个服务器节点之间的高效通信。 无论是定向消息发送、广播消息、回调机制,还是多节点响应统计等多种场景需求,都能被很好地支持,助力构建稳定且灵活的分布式系统。
三. WebSocket + CrossServer 分布式通信示例(跨服务场景)
在前两个章节中,我们已经实现了以下功能:
单 WebSocket 服务器模式(非分布式)
展示了如何使用 WebSocket 在单一服务实例中进行客户端通信,包括事件监听、消息发送和回调响应等基本操作。跨服务器通信模块(纯服务端通信)
展示了不同服务节点之间如何通过 Redis 实现事件广播、定向发送和异步回调处理。
接下来我们将进入更高级的场景:将 WebSocket 与 CrossServer 模块结合,实现真正意义上的WebSocket 分布式通信。
示例开始
wsserver.js:
// wsserver.js
// const { WebSocketCrossServerAdapter } = require('websocket-cross-server-adapter');
const WebSocketCrossServerAdapter = require('../../src/WebSocketCrossServerAdapter');
const redisConfig = [
{ port: 6379, host: '127.0.0.1' },
//{ port: 6380, host: '127.0.0.1' },
];
const args = process.argv.slice(2);
let port = 9000;
let serverName = 'serverA';
args.forEach(arg => {
if (arg.startsWith('--port=')) {
port = parseInt(arg.split('=')[1], 10);
} else if (arg.startsWith('--name=')) {
serverName = arg.split('=')[1];
}
});
console.log(`Using configured values - serverName: ${serverName},port: ${port}`);
if (!(port && serverName)) {
throw new Error("Invalid port or server name");
}
const wsCrossServer = new WebSocketCrossServerAdapter({
redisConfig,
serverName,
wsOptions: {
port
}
});
wsCrossServer.onWebSocketEvent('connection', async (socket, req) => {
const data = wsCrossServer.parseWsRequestParams(req);
console.log(`[${wsCrossServer.getServerName()}] Client Connection params:`, data);
if (data.params.id) {
const playerId = String(data.params.id);
socket.playerId = playerId;
wsCrossServer.setUserSocket(playerId, socket);
} else {
socket.close(4011, 'Auth failure');
}
})
wsCrossServer.onWebSocketEvent('close', async (socket, req) => {
console.log(`[${wsCrossServer.getServerName()}] Client ${socket.playerId} disconnected`);
if (socket.playerId) {
wsCrossServer.removeUserSocket(socket.playerId);
}
})
wsCrossServer.onWebSocketEvent('joinRoom', (socket, data, callback) => {
if (socket.playerId && data && data.roomId) {
console.log(`[${wsCrossServer.getServerName()}] Client ${socket.playerId} wants to join room ${data.roomId}`);
wsCrossServer.joinRoom('chat', String(data.roomId), socket.playerId);
callback?.({ msg: 'Successfully joined the roomId:' + data.roomId });
} else {
callback?.({ msg: 'Failed to join the room' });
}
});
wsCrossServer.onWebSocketEvent('command', (socket, data, callback) => {
console.log(`[${wsCrossServer.getServerName()}] Received 'command' event from client ${socket.playerId}:`, data);
if (!data || typeof data.action !== 'string') {
callback?.({ msg: 'Failed to send message' });
return;
}
const { action, msg, toPlayerId, toPlayerIds, roomId } = data;
switch (action) {
case 'broadcast':
wsCrossServer.broadcast('say', { action, msg });
break;
case 'toPlayer':
if (toPlayerId) {
wsCrossServer.toSocketId(String(toPlayerId), 'say', { action, msg });
}
break;
case 'toPlayers':
if (Array.isArray(toPlayerIds)) {
wsCrossServer.toSocketIds(toPlayerIds, 'say', { action, msg });
}
break;
case 'toRoom':
if (roomId) {
wsCrossServer.broadcastToRoom('chat', String(roomId), 'say', { action, msg });
}
break;
default:
callback?.({ msg: 'Unknown action type' });
return;
}
callback?.({ msg: `Message sent successfully [action:${action}] ` });
});
clients.js:
// clients.js
// const { WebSocketConnector } = require('websocket-cross-server-adapter');
const WebSocketConnector = require('../../src/WebSocketConnector');
const totalClients = 50;
const basePort = 9000;
const portRange = 5;
// 随机决定要发送加入房间消息的客户端数量
const joinRoomCount = 10;
const joinRoomClientIds = new Set();
// 随机挑选10个客户端id
while (joinRoomClientIds.size < joinRoomCount) {
joinRoomClientIds.add(Math.floor(Math.random() * totalClients) + 1);
}
// 预定义一些房间ID
const roomIds = ['1000', '1001', '1002'];
// 模拟多个客户端加入不同的ws服务器
for (let i = 0; i < totalClients; i++) {
const port = basePort + (i % portRange);
const id = i + 1;
const client = new WebSocketConnector({
url: `ws://localhost:${port}`,
customParams: {
id: id
}
});
client.on('open', () => {
console.log(`[Client ${id},port:${port}] Connect success`)
// 如果这个客户端在随机加入列表里,发送加入房间消息
if (joinRoomClientIds.has(id)) {
client.emit('joinRoom', { roomId: roomIds[Math.floor(Math.random() * roomIds.length)] }, (err, data) => {
if (err) {
console.log(`[Client ${id},port:${port}] JoinRoom Callback error occurred`);
console.log(err)
} else {
console.log(`[Client ${id},port:${port}] Received joinRoom callback response:`);
console.log(data)
}
})
}
})
client.on('close', (event) => {
console.log(`[Client ${id},port:${port}] onClose event:`, event.code, event.reason);
})
client.on('say', (data) => {
console.log(`[Client ${id},port:${port}] Received say event:`, data);
});
}
boss.js:
// boss.js
// const { WebSocketConnector } = require('websocket-cross-server-adapter');
const WebSocketConnector = require('../../src/WebSocketConnector');
let port = 9000;
let id = 555;
const args = process.argv.slice(2);
args.forEach(arg => {
if (arg.startsWith('--port=')) {
port = parseInt(arg.split('=')[1], 10);
} else if (arg.startsWith('--id=')) {
id = arg.split('=')[1];
}
});
console.log(`Using configured values - port: ${port}, id: ${id}`);
const client = new WebSocketConnector({
url: `ws://localhost:${port}`,
customParams: {
id
}
});
client.on('open', () => {
console.log('Connect success')
})
client.on('close', (event) => {
console.log('onClose event:', event.code, event.reason);
})
client.on('say', (data) => {
console.log(`Received say event:`, data);
});
setTimeout(async () => {
client.emit('command', { action: 'broadcast', msg: 'Hello every one' }, (err, data) => {
if (err) {
console.log('Callback error occurred');
console.log(err)
} else {
console.log('Received callback response:');
console.log(data)
}
})
}, 6_000);
setTimeout(async () => {
client.emit('command', { action: 'toPlayer', msg: 'Hello player 13 ', toPlayerId: '13' }, (err, data) => {
if (err) {
console.log('Callback error occurred');
console.log(err)
} else {
console.log('Received callback response:');
console.log(data)
}
})
}, 9_000);
setTimeout(async () => {
client.emit('command', { action: 'toPlayers', msg: 'Hello group players ', toPlayerIds: ['3','10','25','37'] }, (err, data) => {
if (err) {
console.log('Callback error occurred');
console.log(err)
} else {
console.log('Received callback response:');
console.log(data)
}
})
}, 12_000);
setTimeout(async () => {
client.emit('command', { action: 'toRoom', msg: 'Hello room players ', roomId: '1000' }, (err, data) => {
if (err) {
console.log('Callback error occurred');
console.log(err)
} else {
console.log('Received callback response:');
console.log(data)
}
})
}, 15_000);
使用方法
请务必确保 Redis 服务已启动,且监听端口为 6379,本示例运行前需满足此条件。
- 启动五个 WebSocket 服务器
进入 examples/ws-cross-server 目录,使用以下命令通过 concurrently 同时启动五个不同名称和端口的 WebSocket 服务实例:
npx concurrently "node wsserver --name=serverA --port=9000" "node wsserver --name=serverB --port=9001" "node wsserver --name=serverC --port=9002" "node wsserver --name=serverD --port=9003" "node wsserver --name=serverE --port=9004"注意:concurrently 会将多个服务器日志集中显示在一个终端窗口中。如果你更喜欢每个服务器拥有独立窗口,可以手动分别执行下面的命令启动:
node wsserver --name=serverA --port=9000
node wsserver --name=serverB --port=9001
node wsserver --name=serverC --port=9002
node wsserver --name=serverD --port=9003
node wsserver --name=serverE --port=9004请确保每个服务器的名称唯一,避免节点名称冲突。
- 启动模拟客户端
执行以下命令启动 50 个模拟客户端,这些客户端会随机连接到上述任意一个服务器,并随机把一部分客户端加入到测试房间。
node clients- 启动模拟控制端发送指令
使用下面命令启动控制端客户端,用于模拟发送广播、点对点、群发、房间消息等多种指令:
node boss预期效果
运行上述示例后,你将观察到以下分布式通信特性生效:
- 即使客户端连接在不同的 WebSocket 服务器节点上,也能:
- ✅ 接收到全局广播消息(如
broadcast) - ✅ 正确收到点对点消息(如
toPlayer指定玩家) - ✅ 支持群发消息到多个指定客户端(如
toPlayers) - ✅ 成功接收房间内的定向消息(如
toRoom)
- ✅ 接收到全局广播消息(如
这些特性表明:
- 各 WebSocket 服务器节点之间通过 Redis 实现了事件同步与消息路由。
- 分布式环境下,消息发送逻辑与单服务器模式几乎保持一致,开发者无需额外关注服务器部署细节。
- 整个系统具备了真正意义上的 WebSocket 分布式通信能力。
示例总结
通过以上三个章节的示例,你可以循序渐进地从 单机 WebSocket 通信,到 服务器之间的跨节点通信,再到 WebSocket 客户端与跨服务器系统协同通信,完整了解整个分布式通信的工作流程与核心机制。每一阶段都紧扣实际场景,帮助你逐步建立起对 WebSocket 分布式架构的整体认知。
常见问题
- 常见问题
- 1. 如何实现客户端到服务器端再到逻辑服务器的消息转发与回调?
- 2. 服务器向客户端发送消息时支持回调吗?
- 3. WebSocket 连接断开后是否会自动退出房间?
- 4. 如何动态加入Redis节点?
- 5. 如何新增服务器节点?
- 6. 每个服务器节点的 Redis 配置必须一致吗?
- 7. 如何配置 Redis 发布节点选择策略?
- 8. Redis 节点配置多少个合适?
- 9. WebSocketCrossServerAdapter 何时应该启用 Redis 数据压缩功能?
- 10. WebSocket 服务器端到客户端的数据传输支持压缩吗?
- 11. 房间的命名空间该如何设计?
- 12. 在分布式 WebSocket 服务中,如何获取房间或者玩家相关的信息?
- 13. 在分布式 WebSocket 服务中,如何把用户分配到不同的 WebSocket 服务器?
- 14. WebSocketConnector 客户端断线后不会自动重连吗?
- 15. 前端环境如何使用 WebSocketConnector 类?
- 16. WebSocketConnector 客户端只能使用 URL 方式传递参数吗?
- 17. 如何安全且兼容地传递认证及其他敏感信息?
- 18. 为什么 WebSocket 还需要心跳机制?是不是有 close 事件就够了?
- 19. Node.js 服务该如何部署?有没有推荐的方式?
联系方式
如果你在使用过程中有任何问题或建议,欢迎随时与我联系交流。 你也可以通过 GitHub 仓库的 Issues 反馈问题或提出建议。
为了防止邮件被误归类到垃圾邮件,请在邮件主题或正文前面加上 [WebSocketCrossServerAdapter]。 邮箱:[email protected]
许可证
本项目基于 MIT 协议开源,具体内容请查看 LICENSE 文件。
