npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@fastcar/rpc

v0.2.9

Published

rpc,socket集成

Readme

@fastcar/core框架下整合的rpc框架

快速安装

npm install @fastcar/rpc

框架说明

  • 参考koa框架,结合koa-compose的思想,制定了一套rpc基本的通信规则
  • 顶层主要是RpcServer和RpcClient两个类的封装,其中RpcSevrer整合成了一个组件,直接在入口处配置即可
  • 消息通信采用长连接,一般按规定抽象成SocketServer和SocketClient,再由底层进行实现

框架类型支持

  • 从第三方依赖库上来说支持socket.io,mqtt,ws和grpc
  • 从长连接通讯协议来说支持ws,wss,mqtt,mqtts,grpc等

第三方依赖包安装

  • 使用socket.io时,安装 socket.io 和 socket.io-client

  • 使用mqtt时,安装 aedes 和 mqtt,如果要支持ws协议还需要装websocket-stream

  • 使用ws时,安装 ws

  • 使用grpc时,安装@grpc/proto-loader和@grpc/grpc-js

消息通信配置说明

//服务端配置
type RpcConfig = {
 list: SocketServerConfig[];
 retry: Required<RetryConfig>;
 limit: {
  //限流策略
  open: boolean; //是否开启 默认关闭
  pendingMaxSize: number; //服务端并发最大请求数
  pendingSessionMaxSize: number; //服务端单个会话的最大请求数
 };
 slowRPCInterval: number; //监控rpc的处理请求是否缓慢 默认为500ms
};

//客户端配置
type RpcClientConfig = {
 retryCount: number; //错误重试次数 默认三次
 retryInterval: number; //重试间隔 默认一秒
 maxMsgNum: number; //最大并发数
 timeout: number; //超时时间
} & SocketClientConfig;

type RetryConfig = {
 retryCount?: number;
 retryInterval?: number;
 timeout?: number; //超时时间
 maxMsgNum?: number;
 increase?: boolean; //是否按照等差递增 按照 1 2 3秒进行重发
};
//服务端连接配置
 type SocketServerConfig = {
 id: string; //编号名称
 type: SocketEnum; //具体为哪一种型号的连接器
 server: ServerConfig;
 extra?: any; //第三方拓展配置 用于灵活的调用第三方
 serviceType: string; //服务器用途类型 用于表名是何种服务器
 encode?: EncodeMsg; //编码解码
 decode?: DecodeMsg;
 codeProtocol?: CodeProtocolEnum; //约定协议 json protobuf
 secure?: SecureClientOptions;
 maxConnections?: number; //最大连接数 默认1024
 timeout?: number; //空闲超时时间 针对ws和grpc有效
} & { [key: string]: any };

//长连接配置
 type SocketClientConfig = {
 url: string; //连接地址
 type: SocketEnum; //具体为哪一种型号的连接器
 extra?: any; //第三方拓展参数
 encode?: EncodeMsg; //解码器
 decode?: DecodeMsg;
 disconnectInterval?: number;
 secure?: SecureClientOptions;
} & { [key: string]: any };
application:
    name: "simple"
settings:
    rpc:
        list:
            - {
                  id: "rpc-server-1",
                  type: "socket.io",
                  server: { port: 1235 },
                  extra: {},
                  serviceType: "rpc",
              }
            - {
                  id: "rpc-server-2",
                  type: "mqtt",
                  server: { port: 1236, protocol: "net" },
                  extra: {},
                  serviceType: "rpc", #如果是ws则协议连接为http
              }
            - {
                  id: "rpc-server-3",
                  type: "socket.io",
                  server: { port: 1237 },
                  extra: {},
                  serviceType: "rpc",
                  secure: { username: "user", password: "123456" }, #连接前进行校验
              }
            - {
                  id: "rpc-server-4",
                  type: "ws",
                  server: { port: 1238 },
                  serviceType: "rpc",
                  secure: { username: "user", password: "123456" }, #连接前进行校验
              }
            - {
                  id: "rpc-server-5",
                  type: "mqtt",
                  server:
                      {
                          port: 1239,
                          protocol: "https",
                          ssl:
                              {
                                  key: "./ssl/localhost-key.pem",
                                  cert: "./ssl/localhost-cert.crt",
                              },
                      },
                  serviceType: "rpc",
              }
            - {
              id: "rpc-server-6",
              type: grpc,
              server: {
                port: 1240,
                ssl: {
                  ca: "./cert/ca.crt",
                  key: "./cert/server.key",
                  cert: "./cert/server.crt",
                },
              },
              serviceType: "rpc",
              codeProtocol: "protobuf",
              extra: {
                checkClientCertificate: true,
              },
            }

消息使用示例说明

//客户端测试
class NotifyHandle implements RpcAsyncService {
 async handleMsg(url: string, data: Object): Promise<void | Object> {
  console.log("收到服务端消息", url, data);
  return {
   url,
   data: "来自客户端的消息---",
  };
 }
}

@Application
@BasePath(__dirname)
@BaseFilePath(__filename)
@EnableRPC
class APP {
 app!: FastCarApplication;
}
const appInstance = new APP();

describe("rpc交互测试", () => {
 it("服务端和客户端交互", async () => {
  let client1 = new RpcClient(
   {
    // url: "ws://localhost:1235",
    // type: SocketEnum.SocketIO,
    // url: "mqtt://localhost:1236",
    // type: SocketEnum.MQTT,
    url: "ws://localhost:1238",
    type: SocketEnum.WS,
    secure: { username: "user", password: "123456" },
   },
   new NotifyHandle()
  );
  await client1.start();
  let result = await client1.request("/hello");
  console.log("普通调用", result);
  let result2 = await client1.request("/head/hello");
  console.log("追加了head的url", result2);
  let sessionId = client1.getSessionId();
  let server: RpcServer = appInstance.app.getComponentByTarget(RpcServer);
  let result3 = await server.request(sessionId, "/server/test", "发送至客户端");
  console.log("服务端收到客户端应答", result3);
  let result4 = await client1.request("/asynchello");
  console.log("普通调用", result4);
 });

 it("客户端主动断开连接", async () => {
  let client2 = new RpcClient(
   {
    // url: "ws://localhost:1235",
    // type: SocketEnum.SocketIO,
    url: "mqtt://localhost:1236",
    type: SocketEnum.MQTT,
   },
   new NotifyHandle()
  );
  await client2.start();
  client2.stop("客户端主动关闭");
 });
 it("服务端主动断开客户端连接", async () => {
  let client3 = new RpcClient(
   {
    // url: "ws://localhost:1235",
    // type: SocketEnum.SocketIO,
    url: "mqtt://localhost:1236",
    type: SocketEnum.MQTT,
   },
   new NotifyHandle()
  );
  await client3.start();
  let sessionId = client3.getSessionId();
  let server: RpcServer = appInstance.app.getComponentByTarget(RpcServer);
  server.kickSessionId(sessionId, "服务端强制客户端下线");
 });


 it("服务端断线重连", async () => {
  let client4 = new RpcClient(
   {
    // url: "ws://localhost:1235",
    // type: SocketEnum.SocketIO,
    url: "mqtt://localhost:1236",
    type: SocketEnum.MQTT,
    retryCount: 3, //错误重试次数 默认三次
    retryInterval: 100, //重试间隔 默认一秒
    maxMsgNum: 10000, //最大消息并发数
    timeout: 3000,
    disconnectInterval: 1000,
   },
   new NotifyHandle()
  );
  await client4.start();
  let server: RpcServer = appInstance.app.getComponentByTarget(RpcServer);
  await server.stop();
  setTimeout(() => {
   server.start();
  }, 2000);
  // setTimeout(async () => {
  //  try {
  //   let result = await client4.request("/hello");
  //   console.log(result);
  //  } catch (e) {
  //   console.log(e);
  //  }
  // }, 2000);
 });

 it("连接认证测试", async () => {
  let client1 = new RpcClient(
   {
    url: "ws://localhost:1237",
    type: SocketEnum.SocketIO,
    retryCount: 3, //错误重试次数 默认三次
    retryInterval: 100, //重试间隔 默认一秒
    maxMsgNum: 10000, //最大消息并发数
    timeout: 3000,
    disconnectInterval: 1000,
   },
   new NotifyHandle()
  );
  let connect1 = await client1.start();
  console.assert(connect1 == true);
  if (!connect1) {
   client1.close();
  }
  let client2 = new RpcClient(
   {
    url: "ws://localhost:1237",
    type: SocketEnum.SocketIO,
    retryCount: 3, //错误重试次数 默认三次
    retryInterval: 100, //重试间隔 默认一秒
    maxMsgNum: 10000, //最大消息并发数
    timeout: 3000,
    disconnectInterval: 1000,
    secure: {
     username: "user",
     password: "123456",
    },
   },
   new NotifyHandle()
  );
  let connect2 = await client2.start();
  console.assert(connect2 == true);
 });

 it("ssl 测试", async () => {
  let client1 = new RpcClient(
   {
    url: "wss://localhost:1239",
    type: SocketEnum.MQTT,
    extra: {
     rejectUnauthorized: false, //当没有证书时这边设置为忽略
    },
   },
   new NotifyHandle()
  );
  await client1.start();
  let result = await client1.request("/hello");
  console.log("普通调用", result);
 });
});
 //优雅的调用请求
  let result = await ClientRequestStatic<HelloPBRequest, HelloPBReply>({
   url: "/pbhello",
   data: {
    message: "来自客户端的pb调用",
   },
   client: client1,
  });

常用功能集成说明

  • 会话的连接告知默认路由 /connect

  • 会话的离线默认路由 /disconnect

//示例
@Controller
export default class HelloController {
 @Autowired
 private rpcServer!: RpcServer;

 @RPCMethod()
 connect(session: ClientSession) {
  console.log("connect-----", session.sessionId);
  // //也可以这这里做一些权限校验的工作 如果不对则直接ko
  // this.rpcServer.kickSessionId(session.sessionId, "强制下线");
  return {
   code: 200,
   data: "socket is connect",
  };
 }

 @RPCMethod()
 disconnect({ session, reason }: DisconnectType) {
  console.log("disconnect-----", session.sessionId, reason);
  return {
   code: 200,
  };
 }
}
  • 连接前进行验证
//连接前的验证语法 实现接口
@RPCAuth
class Auth implements RpcAuthService {
 async auth(username: string, password: string, config: SocketServerConfig): Promise<boolean> {
  return config.secure?.username == username && config.secure.password == password;
 }
}
  • 消息重试次数和超时交由客户端进行管理,只需在配置中声明即可

  • 业务逻辑为统一的session 具有唯一的sessionId

//客户端会话值
type ClientSession = {
 sessionId: string;
 serverId: string;
 connectedTime: number; //连接的开始时间
 settings: Map<string | symbol, any>; //自定义设置项
};
  • 提供给客户端路由
@Controller
export default class HelloController {
  @RPCMethod() 
  hello() {
    return {
    code: 200,
    data: "我是一个快乐的rpc",
    };
  }

  @RPCMethod()
  async asynchello() { // 访问路径 /asynchello
    return new Promise((resolve) => {
    setTimeout(() => {
      resolve({
      code: 200,
      data: "这是一个异步rpc",
      });
    }, 200);
    });
  }
}

@Controller
@RPC("/head")
export default class HeadController {
  @RPCMethod()
  hello() {
    return {
    code: 200,
    data: "追加了头的url",
    };
  }
}
  • 编码默认协议支持json和protobuff。grpc仅支持pb协议
 it("protobuff格式传输示例", async () => {
  let client1 = new RpcClient(
   {
    url: "local.dev.com:1240",
    type: SocketEnum.Grpc,
    codeProtocol: CodeProtocolEnum.PROTOBUF,
    ssl: {
     ca: path.join(__dirname, "../resource/cert/ca.crt"),
     key: path.join(__dirname, "../resource/cert/client.key"),
     cert: path.join(__dirname, "../resource/cert/client.crt"),
    },
    extra: {
     options: {
      "grpc.ssl_target_name_override": "example",
      "grpc.default_authority": "example",
     },
    },
   },
   new NotifyHandle(),
   {
    retryCount: 0,
   }
  );

  client1.addProtoBuf({
   root: {
    protoPath: path.join(__dirname, "../../demo.proto"), //这边为绝对路径依赖
    service: "HelloPBController", //服务
   },
   // list: [
   //  {
   //   method: "pbhello",
   //   url: "/pbhello",
   //  },
   // ],
  });
  await client1.start();
  //优雅的调用请求
  let result = await ClientRequestStatic<HelloPBRequest, HelloPBReply>({
   url: "/pbhello",
   data: {
    message: "来自客户端的pb调用",
   },
   client: client1,
  });
  console.log("返回数据", result.data);
 });

注解说明

  • EnableRPC 开启rpc服务

  • RPC 路由头新增

  • RPCMethod 绑定路由方法

  • RPCMiddleware 增加链路的中间件

  • RPCAuth 强化初始连接是进行用户名和密码的拓展校验

  • RPCError 封装原有的响应和捕捉错误

更多用法

参考项目git地址 @fastcar/rpc/test 下的example内

项目开源地址