@xuzhiyang/syncvar
v1.1.7
Published
WaterBear SDK - WebSocket 变量同步、锁、RPC、事件订阅和 Worker 执行能力
Maintainers
Readme
WaterBear SDK - WebSocket 变量同步 SDK
WaterBear SDK 提供一套基于 WebSocket 的变量同步、锁、RPC、事件订阅和 Worker 执行能力,帮助快速搭建协同编辑、状态共享与后台任务管理场景。
特性
- 🔄 实时变量同步 - 自动检测变化并同步到所有客户端
- 🔒 分布式锁 - 防止并发写入冲突
- 📡 RPC 调用 - 远程过程调用,支持双向通信
- 📢 事件订阅 - 基于 SSE 的服务器推送事件
- 🚀 MessagePack - 高效二进制传输格式,减少 30-50% 数据量
- 🛠️ Worker 执行 - 远程代码执行能力
在线演示
访问 在线演示地址 查看 SDK 实时交互效果。
安装
npm install @xuzhiyang/syncvar快速开始
Node.js 环境
import { WebSocketClient, SyncVar } from '@xuzhiyang/syncvar';
// 创建 WebSocket 连接
const ws = await WebSocketClient({
namespace: 'room1',
packFormat: 1, // MessagePack 格式(推荐)
onopen: () => console.log('已连接'),
});
// 创建 SyncVar 并绑定
const syncVar = new SyncVar();
await ws.bind(syncVar);
// 同步变量
const { position } = await syncVar.sync('position', {
reset: { position: { x: 0, y: 0 } },
});
// 监听变化
position.watch((value) => {
console.log('position 更新:', value);
});
// 修改变量(自动同步)
position.x = 10;
await syncVar.waitUpdated();
// RPC 调用
await syncVar.rpcBind(function rpc_add({ a, b }) {
return a + b;
});
const result = await syncVar.rpcCall('rpc_add', { a: 10, b: 20 });
console.log('RPC 结果:', result); // 30浏览器环境
方式 1:ESM 模块(推荐)
<script type="module">
import { WebSocketClient, SyncVar } from '@xuzhiyang/syncvar';
const ws = await WebSocketClient({
namespace: 'room1',
packFormat: 1,
});
const syncVar = new SyncVar();
await ws.bind(syncVar);
const { counter } = await syncVar.sync('counter', {
reset: { counter: 0 },
});
counter.watch((value) => {
document.getElementById('counter').textContent = value;
});
counter.value++;
</script>方式 2:CDN 引入
<!-- 引入 UMD 版本 -->
<script src="https://unpkg.com/@xuzhiyang/syncvar/dist/syncvar.js"></script>
<script>
// SDK 挂载到全局变量 WaterBear
const { WebSocketClient, SyncVar } = window.WaterBear;
(async function() {
const ws = await WebSocketClient({
namespace: 'room1',
packFormat: 1,
});
const syncVar = new SyncVar();
await ws.bind(syncVar);
const { counter } = await syncVar.sync('counter', {
reset: { counter: 0 },
});
counter.watch((value) => {
console.log('计数器:', value);
});
})();
</script>CommonJS 环境
const { WebSocketClient, SyncVar } = require('@xuzhiyang/syncvar');
const ws = await WebSocketClient({
namespace: 'room1',
packFormat: 1,
});
const syncVar = new SyncVar();
await ws.bind(syncVar);
// 使用方式与 ESM 相同TypeScript 环境
SDK 内置完整的 TypeScript 类型定义,开箱即用:
import { WebSocketClient, SyncVar, NetPackFormat } from '@xuzhiyang/syncvar';
interface Position {
x: number;
y: number;
}
// 完整的类型推断
const ws = await WebSocketClient({
namespace: 'room1',
packFormat: NetPackFormat.MessagePack,
onopen: (ws: WebSocket, event: Event) => {
console.log('已连接');
},
});
const syncVar = new SyncVar({ delay: 100 });
await ws.bind(syncVar);
// 类型推断为 Position
const { position } = await syncVar.sync<Position>('position', {
reset: { position: { x: 0, y: 0 } },
});
// 类型安全
position.watch((value: Position) => {
console.log('position 更新:', value.x, value.y);
});
// RPC 调用也有类型检查
await syncVar.rpcBind(function rpc_add({ a, b }: { a: number; b: number }) {
return a + b;
});
const result: number = await syncVar.rpcCall('rpc_add', { a: 10, b: 20 });TypeScript 类型包含:
- 所有 API 方法的类型定义
- 完整的接口和枚举
- 详细的 JSDoc 文档(IDE 中自动显示)
- 浏览器全局类型(UMD 模式下
window.WaterBear)
使用场景示例
协同编辑器
// 多人实时编辑文档
const document = await syncVar.sync('document', {
reset: { document: { title: '', content: '' } },
});
document.watch((value) => {
// 更新编辑器内容
editor.setValue(value.content);
});
// 编辑器变化时同步
editor.on('change', () => {
document.content = editor.getValue();
});实时计数器
const { counter } = await syncVar.sync('counter', {
reset: { counter: 0 },
});
// 获取锁后安全修改
await counter.lock();
counter.value++;
// 锁在下一个 sync 时自动释放在线用户列表
const { users } = await syncVar.sync('users', {
reset: { users: [] },
});
users.watch((value) => {
updateUserListUI(value);
});1. API 文档
3. WebSocketClient
创建方式
const ws = await WebSocketClient(options);参数说明
| 参数 | 类型 | 描述 |
| --- | --- | --- |
| options.url | string | 可选:WebSocket 服务地址,默认 wss://varserver.popx.com/ws/syncvar。 |
| options.namespace | string | 可选:命名空间/房间,隔离不同业务。 |
| options.guid | string | 可选:客户端 GUID,用于恢复会话或追踪。 |
| options.packFormat | number | 可选:数据传输格式,0 为 JSON 格式,1 为 MessagePack 格式,默认 1(MessagePack)。 |
| options.onopen | (ws, event) => void | 可选:连接成功后的回调。 |
| options.onclose | (event) => void | 可选:连接关闭时的回调。 |
| options.onerror | (error) => void | 可选:连接或消息处理异常时的回调。 |
| options.timeout | number | 可选:连接超时时间(毫秒),默认 5000。 |
| options.maxReconnectCount | number | 可选:最大重连次数,默认 100。 |
使用步骤
- 创建连接:
await WebSocketClient(options)。 - 处理
namespace、guid、packFormat、onopen/onerror/onclose以保证可观测性。 - 绑定:
await ws.bind(new SyncVar())或await ws.bind(new Worker(...))。
数据传输格式
WebSocketClient 支持两种数据传输格式:
- JSON 格式 (
packFormat = 0):传统文本传输,向后兼容 - MessagePack 格式 (
packFormat = 1):高效二进制传输,默认推荐
MessagePack 优势:
- 相比 JSON 减少 30-50% 数据传输量
- 更快的编码解码速度
- 支持更多数据类型(如二进制数据)
- 自动与服务器协商最优格式
使用示例:
// 使用 MessagePack 格式(推荐)
const ws = await WebSocketClient({
namespace: 'room1',
packFormat: 1, // MessagePack 格式
});
// 或使用 JSON 格式(兼容旧版本)
const wsJson = await WebSocketClient({
namespace: 'room1',
packFormat: 0, // JSON 格式
});示例
// MessagePack 格式(推荐)
const ws = await WebSocketClient({
namespace: 'room1',
guid: 'client-123',
packFormat: 1, // 使用高效的 MessagePack 格式
onopen: () => console.log('connected'),
onerror: (err) => console.error('ws error', err),
});
await ws.bind(new SyncVar());
// JSON 格式(向后兼容)
const wsJson = await WebSocketClient({
namespace: 'room1',
guid: 'client-123',
packFormat: 0, // 使用传统 JSON 格式
onopen: () => console.log('connected'),
onerror: (err) => console.error('ws error', err),
});
await wsJson.bind(new SyncVar());4. SyncVar
创建与参数
const syncVar = new SyncVar({ delay?, promiseTimeout? });| 参数 | 类型 | 描述 |
| --- | --- | --- |
| delay | number | 可选:变更打包延迟(毫秒),合并频繁修改,默认 50。 |
| promiseTimeout | number | 可选:与服务端交互的 Promise 超时时间(毫秒),默认 10000。 |
核心用法
- 绑定
ws.bind(syncVar)。 await syncVar.sync(names, { reset?, store? })获取代理,names支持字符串或数组。- 修改代理上属性自动收集 diff 并延迟发送。
- 通过
proxy.lock({ timeout? })安全写入,proxy.unlock()释放。 await syncVar.waitUpdated()等待$set回执。proxy.watch(cb)注册更新回调。rpcBind/rpcCall实现远程函数调用。
示例
const syncVar = new SyncVar({ delay: 100 });
await ws.bind(syncVar);
const { position } = await syncVar.sync('position', {
reset: { position: { x: 0, y: 0 } },
});
position.watch((value) => console.log('position updated', value));
position.x += 10;
await syncVar.waitUpdated();await syncVar.rpcBind(function rpc_sum({ a, b }) {
return a + b;
});
const ret = await syncVar.rpcCall('rpc_sum', { a: 3, b: 5 });5. SSE(单向推送)
创建与参数
const sse = new SSE({ namespace? });| 参数 | 类型 | 描述 |
| --- | --- | --- |
| namespace | string | 可选:隔离订阅范围。 |
典型流程
await sse.watch({ vars: object|array, events: object|array }):vars支持对象或数组格式。- 对象:
{ varName: ({ value }) => { ... }, ... }(键名为变量名) - 数组:
[function varName({ value }) { ... }, 'var2', ...]
- 对象:
events支持对象或数组格式。- 对象:
{ eventName: ({ data }) => { ... }, ... } - 数组:
[function eventName({ data }) { ... }, 'event2', ...]
- 对象:
sse.get(name)/sse.getAll()读取快照。await sse.emit(eventName, data)发送事件。sse.close()断开连接。
示例
const sse = new SSE({ namespace: 'room1' });
await sse.watch({
vars: {
position: ({ value }) => console.log('position', value),
test: ({ value }) => console.log('test', value)
},
events: {
myevent: ({ data }) => console.log('event', JSON.parse(data))
}
});
await sse.emit('myevent', { hello: 'world' });6. Worker
创建与参数
const worker = new Worker({ onlog?, onerror?, onexit?, promiseTimeout? });| 参数 | 类型 | 描述 |
| --- | --- | --- |
| onlog | (args) => void | 接收服务端日志。 |
| onerror | (error) => void | 接收服务端错误。 |
| onexit | (exitInfo) => void | 接收 worker 退出事件。 |
| promiseTimeout | number | 操作超时时间,默认 10000。 |
使用方式
await ws.bind(worker)。await worker.run({ code, name, workerData })启动。kill/attach/lists管理实例。- 监听
onlog/onerror/onexit反馈。
示例
const worker = new Worker({
onlog: (args) => console.log('worker log', args),
});
await ws.bind(worker);
await worker.run({ code: 'console.log("hi")', name: 'task-1' });
const list = await worker.lists();7. 推荐流程
- 建立
WebSocketClient。 - 创建
SyncVar并绑定后调用sync()获取 proxy。 - 注册
watch、调用lock、使用waitUpdated()监控。 - 需要 RPC 时使用
rpcBind/rpcCall。 - 仅需订阅时使用
SSE。 - 后台任务交给
Worker执行。 - 连接关闭或异常时调用
syncVar.destroy()/sse.close()清理。
