npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@flinbein/stateful-rpc

v0.3.2

Published

A lightweight TypeScript library for type-safe Remote Procedure Calls (RPC) with built-in state management. This library facilitates communication between client and server with a focus on maintaining state synchronization.

Downloads

43

Readme

@flinbein/stateful-rpc

A lightweight TypeScript library for type-safe Remote Procedure Calls (RPC) with built-in state management. This library facilitates communication between client and server with a focus on maintaining state synchronization.

Features:

  • Type-safe RPC communication
  • State synchronization
  • Event-based messaging system
  • Nested channels support
  • Promise-based API
  • Based on Proxy for dynamic method handling
  • Works with WebSockets, MessagePorts, WebRTC and more

Table of Contents

Installation

NPM

Run the following command to install the package:

npm install @flinbein/stateful-rpc

CDN

You can use the library directly in the browser via CDN:

<script type="module">
  import { RPCSource, RPCChannel } from 'https://cdn.jsdelivr.net/npm/@flinbein/stateful-rpc/+esm';
</script>

Basic Usage

The library consists of two main components:

  • RPCSource: The server-side component that defines methods and manages state
  • RPCChannel: The client-side component that communicates with the RPCSource

Basic example using WebSocket:

Server Side:

import RPCSource from '@flinbein/stateful-rpc/source';
import ws from 'ws';

// Create an RPC source with methods
export const calculator = new RPCSource({
  add: (a, b) => a + b,
  subtract: (a, b) => a - b,
  multiply: (a, b) => a * b,
  divide: (a, b) => {
    if (b === 0) throw "Division by zero";
    return a / b;
  }
});

// Create a connection handler for WebSocket
function createConnection(socket) {
  return (onMessage, onClose) => {
    // Handle incoming messages from the WebSocket
    socket.on("message", (data) => {
      onMessage(...JSON.parse(data.toString()));
    });
  
    // Handle WebSocket closure
    socket.on("close", () => onClose("Client disconnected"));

    // Return a function to send messages back to the client
    return (...args) => socket.send(JSON.stringify(args));
  };
}

// Create a WebSocket server
const wss = new ws.Server({ port: 8080 });

// Handle WebSocket connections
wss.on('connection', (socket) => {
  // Start the RPC service for this connection
  RPCSource.start(calculator, createConnection(socket));
});

Client Side:

import RPCChannel from '@flinbein/stateful-rpc/channel';

// Import the type of the server-side RPCSource
import type { calculator } from './server'; 

// Create a connection handler for WebSocket
function createConnection(socket: WebSocket){
  return (onMessage, onClose) => {
    socket.onmessage = (event) => {
      onMessage(...JSON.parse(event.data));
    };
    
    socket.onclose = () => onClose('Connection closed');
    
    return (...args) => socket.send(JSON.stringify(args));
  };
}

// Connect to WebSocket server
const socket = new WebSocket('ws://localhost:8080');

// wait for the WebSocket to open before using the RPC channel
await new Promise(resolve => socket.onopen = resolve);

// Create RPC channel
const channel = new RPCChannel<typeof calculator>(createConnection(socket));

const sum = await channel.add(5, 3);
console.log('5 + 3 =', sum); // 8

const product = await channel.multiply(4, 7);
console.log('4 * 7 =', product); // 28

try {
  await channel.divide(10, 0);
} catch (e) {
  console.error('Error:', e); // "Division by zero"
}

channel.close();

Channel Lifecycle

An RPCChannel goes through three states: pending, ready, and closed.

flowchart TD
  initialize(( )) -- new --> pending
  pending --> |"on('error'), on('close')"| closed
  pending --> |"on('ready'), on('state')"| ready
  ready --->|"on('state')"| ready
  ready -->|"on('close')"| closed

Pending

The channel is being established.

  • channel.ready is false.
  • channel.closed is false.
  • channel.promise is pending.
  • channel.state is undefined (not available yet).
  • You can use remote methods and nest channels. But they can throw exception if the channel is closed.

Ready

The channel is established and ready for communication.

  • channel.ready is true.
  • channel.closed is false.
  • channel.promise is resolved with the channel instance.
  • channel.state contains the current state from the server.
  • You can use remote methods and nest channels.

Closed

The channel is closed and cannot be used anymore.

  • channel.ready is false.
  • channel.closed is true.
  • channel.promise is resolved or rejected with the close reason.
  • channel.state contains the last known state from the server.
  • You cannot use remote methods or nest channels.

Built-in Events

Channels have lifecycle events that you can listen to:

const channel = new RPCChannel(messageHandler);

// Listen for channel ready event
channel.on('ready', () => {
  console.log('Channel is ready!');
  console.assert(channel.ready === true);
});

// Listen for channel close event
channel.on('close', (reason) => {
  console.log('Channel closed with reason:', reason);
  console.assert(channel.closed === true);
});

// Listen for channel error event
channel.on('error', (error) => {
  console.error('Channel error:', error);
});

// Wait for channel to be ready
try {
  await channel.promise;
  console.log('Channel is ready!');
} catch (error) {
  console.error('Failed to establish channel:', error);
}

Calling Remote Methods

You can define methods on the server side by passing an object to the RPCSource constructor.

These methods should return:

Server side:

export const mathService = new RPCSource({
  ping: () => "pong",
  logToServer: (...message) => console.log("Message from client:", ...message),
  math: { // you can nest methods
    square: (x) => x * x,
  },
});

RPCSource.start(mathService, connection);

Client side:

const channel = new RPCChannel<typeof mathService>(connection);
// call method
console.log(await channel.ping()); // "pong"
// call nested method
console.log(await channel.math.square(5)); // 25
// call method without waiting for a response
channel.logToServer.notify("hello from client"); // void

Working with State

The RPCSource maintains a state that clients can access.
When the state changes, all connected clients are notified.

Server Side:

export const counter = new RPCSource({
  increment: () => {
    // Use counter.setState to update the state
    counter.setState((state) => state + 1);
    return counter.state;
  },
  decrement: () => {
    counter.setState((state) => state - 1);
    return counter.state;
  }
}, 0); // Initial state is 0
// on socket connection
RPCSource.start(counter, connection);

Client Side:

const channel = new RPCChannel<typeof counter>(connection);

// Wait for the channel to receive the initial state
await channel.promise;

// Get the current state
console.log('Current count:', channel.state); // 0

// Listen for state changes
channel.on('state', (newState, oldState) => {
  console.log(`Counter changed from ${oldState} to ${newState}`);
});

// Increment the counter
console.log('New count:', await channel.increment()); // 1
console.log('State is synchronized:', channel.state === 1); // true

Nested Channels

RPCSource allows you to create nested channels for more complex applications.
You can create a method that returns another RPCSource. When the client calls this method as constructor, a new channel is created.

Server Side:

const userSource = new RPCSource({
  updateName: (name) => {
    userSource.setState({ ...userSource.state, name });
    return userSource.state;
  }
}, { name: "Guest", email: "" });

// Create a main source that provides access to the inner source
export const mainService = new RPCSource({
  User: () => userSource
});

// Start the RPC service with the main source
RPCSource.start(mainService, connection);

Client Side:

// Create the main channel
const mainChannel = new RPCChannel<typeof mainService>(connection);

// Create a nested channel
const userChannel = new mainChannel.User();

// Wait until the channel receives the initial state
await userChannel.promise; 

// Access the nested channel's state
console.log('User:', userChannel.state); // { name: "Guest", email: "" }

// Update user information
await userChannel.updateName("John Doe");
console.log('Updated user:', userChannel.state); // { name: "John Doe", email: "" }

Another way to define nested channels is to use class-based approach:

class User extends RPCSource.with("$", { name: "Guest", email: "" }) {
  $updateName(name) {
    this.setState({ ...this.state, name });
    return this.state;
  }
};

export const mainService = new RPCSource({
  User: User // use the class as constructor of nested channel
});

// Start the RPC service with the main source
RPCSource.start(mainService, connection);

The difference is that in this case new mainChannel.User() will create a new instance of User class for each call.

3rd way is to use RPCSource.Channel to create nested channels manually:

const userSource = new RPCSource({
	updateName: (name) => {
		userSource.setState({...userSource.state, name});
		return userSource.state;
	}
}, {name: "Guest", email: ""});

// Create a main source that provides access to the inner source
export const mainService = new RPCSource({
	User: () => {
		const channel = new RPCSource.Channel(userSource);
		setTimeout(() => channel.close(), 5000); // auto close after 5 seconds
		return channel;
  }
});

Custom Events

RPCSource provides a powerful event system that allows the server to emit events to connected clients.

Server Side:

import RPCSource from '@flinbein/stateful-rpc/source';

// Define the event types using TypeScript
export const notificationService = new RPCSource({}).withEventTypes<{
  alive: [aliveSignal: boolean],
  alert: { // you can use nested events
    warning: [message: string, level: number],
    error: [code: string, message: string]
  }
}>();

// Start the RPC service with the main source
RPCSource.start(notificationService, connection);

// Later, emit events to all connected clients
notificationService.emit("alive", true);
notificationService.emit(["alert", "warning"], "System maintenance soon", 2);
notificationService.emit(["alert", "error"], "ERR_001", "Connection lost");

Client Side:

const channel = new RPCChannel<typeof notificationService>(connection);

// Listen for custom events
channel.on("alive", (aliveSignal) => {
  console.log(`notificationService is alive: ${aliveSignal}`);
});

// Listen for nested events using dot notation
channel.alert.on("warning", (message, level) => {
  console.log(`Warning (level ${level}): ${message}`);
});

// Or use array path for nested events
channel.on(["alert", "error"], (code, message) => {
  console.log(`Error ${code}: ${message}`);
});

You cannot use reserved event names as string: state, close, error, ready.
But you can use them as an array path:

// server
source.emit(["state"], "custom state event", 123);
// client
channel.on(['state'], (...eventData) => {
  // this is a custom event named "state", not the built-in state event.
  console.log(...eventData); // "custom state event" 123
});

Class-Based RPCSource

The constructor of RPCSource can accept a string (prefix) instead of an object with methods.
In this case, all methods of the object whose names start with the specified prefix will be used as remote methods.

const myService = new RPCSource("$");
// by default, `source` does not contain methods starting with `$`
// but you can add them later:
myService.$ping = () => "pong"; // add the "ping" method

you can extend class RPCSource to define methods:

class MyService extends RPCSource<"$", string> {
  constructor(initialState) {
    super("$", initialState);
  }
  $ping() {
    return "pong";
  }
}
const myService = new MyService("default state");

You can use the static method RPCSource.with(prefix, state?). This method returns a class inherited from RPCSource, and the specified parameters will be automatically passed to its constructor.

// Server-side
class Calculator extends RPCSource.with("$") { // "$" is bound to the constructor
  // Prefix is automatically recognized
  $multiply(this: undefined, a: number, b: number) {
    return a * b;
  }
  
  $divide(this: undefined, a: number, b: number) {
    if (b === 0) throw new Error("Division by zero");
    return a / b;
  }
}

RPCSource.start(new Calculator(), connection);

// Client-side
const calculator = new RPCChannel<Calculator>(connection);
const result = await calculator.multiply(5, 3); // Calls $multiply on the server

Context in RPC Methods

You can provide a context object when starting the RPC service:

// Server side
const users = new Map();

const userService = new RPCSource({
  // 'this.context' will be the context object (socket)
  setName: function(this: RPCSource, name: string) {
    users.set(this.context, name);
    return true;
  },
  getName: function(this: RPCSource) {
    return users.get(this.context);
  }
});


wss.on('connection', (socket) => {
  // Provide the socket as context
  RPCSource.start(userService, createConnection(socket), {context: socket});
});

// You cannot access the channel from outside the method
userService.context; // throws error

Normalization

Method RPCSource.normalize(normalizer, originFn) allows you to wrap methods with a normalizer function.

Parameters

  • normalizer: it takes an array of arguments and returns:
    • false: normalization fails
    • true: normalization passes, original arguments are used
    • any[]: normalization passes, new args will be used as arguments for the original function
  • originFn: the original function to be wrapped.

Returns

A new function that will be called if normalization passes.

Examples:


// assert all arguments are numbers
function assertAllAreNumbers(params: unknown[]): params is number[] {
  return params.every(arg => typeof arg === "number"); // returns boolean
}
// convert all arguments to numbers
function convertAllToNumbers(params: unknown[]): number[] {
  return params.map(arg => Number(arg)); // returns mapped values
}

const calculator = new RPCSource({
  add: RPCSource.normalize(assertAllAreNumbers, function(...args) {
    return args.reduce((a, b) => a + b, 0);
  }),
  multiply: RPCSource.normalize(convertAllToNumbers, function(...args) {
    return args.reduce((a, b) => a * b, 1);
  })
});

////////// client //////////
await channel.add(1, 2, 3); // returns 6
await channel.multiply(5, "5"); // returns 25
await channel.add(1, "2", 3); // throws normalization error

You can use libraries like Zod to normalize method arguments:

import z as * from "zod";

const normalizeSize = z.tuple([
  z.number().int().min(0), // first argument must be a non-negative integer
  z.literal(["px", "em", "%"]).optional() // second argument must be one of the specified strings or undefined
]).parse;

const sizeStore = new RPCSource({
  setSize: RPCSource.normalize(normalizeSize, function(size, units = "px") {
    // typeof size is inferred as number
    // typeof units is inferred as "px" | "em" | "%"
    return this.setState(`${size} ${units}`);
  })
}, "0px");

//////////  client side //////////
await channel.setSize(100) // sets state to "100 px"
await channel.setSize(-5, "em") // throws normalization error by Zod

You can also normalize a context object:

function normalizeContext(this: RPCSource, args: any[]) {
  if (!this.context.url.includes("?admin")) throw new Error("wrong context");
  return true;
}
const rpc = new RPCSource({
  echo: RPCSource.normalize(normalizeContext, function(value) {
    return "Echo: " + value;
  }),
});
//////////  client side //////////
await channel.echo("i am admin") // throws "wrong context" error if context.url does not include "?admin"

API Reference

class RPCSource

The server-side component that handles remote procedure calls.

import RPCSource from "@flinbein/stateful-rpc/source";

Constructor

new RPCSource(methods, initialState = undefined)

| Constructor Parameter | Type | Description | |-----------------------|---------------------|------------------------| | methods | object with methods | methods | | initialState | <T> | Optional initial state |

Methods

| Method | Description | |-----------------------|-------------------------------------------------------------------------------------------------------------------------------------------------| | setState(newState) | Updates the state and notifies all clients. You can pass a value or a function that receives the current state and returns the new state. | | emit(event, ...args)| Emits an event to all connected clients | | dispose(reason?) | Closes all connections and disposes the source | | withState<S>(state?)| Sets a new state type and optionally a new state value | | withEventTypes<E>() | Sets a new events type |

Static Methods

| Method | Description | |------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------| | start(rpcSource, connection, options?) | Starts the RPC service with the given source. Returns a function to close all channels. | | with(methods_or_prefix, state?) | Binds parameters to RPCSource constructor. Returns a class that extends RPCSource. Used for class-based RPCSource. | | normalize(normalizer, originFn) | Wraps method with a normalizer function. Returns a new function that will be called if normalization passes. See normalization |

Properties

| Property | Type | Description | |---------------------|----------------------------------------------|--------------------------------------------------------------------------------------------------| | readonly state | <T> | The current state | | readonly disposed | boolean | Whether the source is disposed | | readonly channel | RPCSource.Channel | Current channel. Available remote methods via this.channel | | readonly context | any | The context object provided when starting the RPCSource. Available in methods via this.context |

Static Properties

| Property | Type | Description | |--------------------|----------------------------------------------|--------------------------------------------------------------------------------| | readonly channel | RPCSource.Channel | Current channel.Available in remote constructors via new.target.channel | | readonly context | any | Current context.Available in remote constructors via new.target.context |

class RPCChannel

The client-side component that communicates with an RPCSource.

Constructor

new RPCChannel<S>(connectionHandler, options?)

| Constructor Parameters | Type | Description | |------------------------------|-----------------------|--------------------------------------------------------------------------------------------| | <S> | extends RPCSource | Type of the source | | connectionHandler | Function | A function that handles message sending and receiving | | options? | object | Configuration options | | options.getNextChannelId? | Function | Function to generate unique channel IDs | | options.connectionTimeout? | number\|AbortSignal | Timeout for establishing the connection. Channel will be closed if not ready in time. |

You can specify a generic type <S> that corresponds to the RPCSource being used. In this case, TypeScript will automatically add the corresponding methods and constructors to the channel, and will also allow the use of typed events.

You can also specify types manually in this format:

new RPCChannel<
  methods: { /*...*/ },
  events: {/*...*/},
  state: /*...*/
>(connectionHandler, options?)

If you need an RPCChannel on the server side, you can create it directly from an existing RPCSource:

new RPCChannel(rpcSource, options?)

Properties

| Property | Type | Description | |-----------|-----------------|---------------------------------------------------| | state | <T> | The current state from the server | | closed | boolean | Whether the channel is closed | | ready | boolean | Whether the channel is ready | | promise | Promise<this> | A promise that resolves when the channel is ready | | then | undefined | Reserved |

Methods

| Method | Description | |------------------------|---------------------------------------------------------------------------| | close(reason?) | Closes the channel | | on(event, handler) | Adds an event listener | | once(event, handler) | Adds a one-time event listener | | off(event, handler) | Removes an event listener | | *(...args) | Proxy-based methods to call remote methods | | *.notify(...args) | Proxy-based methods to call remote methods without waiting for a response |

Events

| Event | Parameters | Description | |-----------|---------------------------|---------------------------------------------------------------------------------------------| | "close" | [reason] | Emitted when the channel is closed. Handler receives the close reason. | | "error" | [reason] | Emitted when an error occurs. Handler receives the error object. | | "ready" | [ ] | Emitted when the channel is ready. | | "state" | [newState, oldState?] | Emitted when the state changes. Handler receives the new state and old state (if available) | | * | [...*] | Custom events defined in the RPCSource. Handler receives event arguments. | | [...*] | [...*] | Nested custom events. Handler receives event arguments. |

class RPCSource.Channel

The remote channel associated with a client's RPCChannel.

Properties

| Property | Type | Description | |-----------|-----------------|----------------------------------------------------------| | closed | boolean | Whether the remote channel is closed | | ready | boolean | Whether the remote channel is established | | source | RPCSource | The associated RPCSource | | context | any | The context object provided when starting the RPCSource | | promise | Promise<this> | A promise that resolves when the remote channel is ready |

Methods

| Method | Description | |------------------------|-----------------------------------------------| | close(reason?) | Closes the remote channel | | on(event, handler) | Adds an event listener | | once(event, handler) | Adds a one-time event listener | | off(event, handler) | Removes an event listener | | emit(event, ...args) | Emits an event to associated client's channel |

Events

| Event | Parameters | Description | |-----------|---------------------------|------------------------------------------------------------------------| | "close" | [reason] | Emitted when the channel is closed. Handler receives the close reason. | | "error" | [reason] | Emitted when an error occurs. Handler receives the error object. | | "ready" | [ ] | Emitted when the channel is ready. |

Interaction Diagram

////////// server //////////
const red = new RPCSource({/*...*/});
const blue = new RPCSource({/*...*/});

const wss = new WebSocket.Server({/*...*/});
wss.on("connection", (ws) => {
  RPCSource.start(red, createConnection(ws, {prefix: "red"}));
  RPCSource.start(blue, createConnection(ws, {prefix: "blue"}));
})

////////// client 1 //////////
const ws1 = new WebSocket("ws://...");
// Connect to "Red" and "Blue" services
const channelRed = new RPCChannel<typeof red>(createConnection(ws1, {prefix: "red"}));
const channelBlue = new RPCChannel<typeof blue>(createConnection(ws1, {prefix: "blue"}));

////////// client 2 //////////
const ws2 = new WebSocket("ws://...");
// Connect to "Blue" service twice
const channelBlue1 = new RPCChannel<typeof red>(createConnection(ws2, {prefix: "blue"}));
const channelBlue2 = new RPCChannel<typeof blue>(createConnection(ws2, {prefix: "blue"}));

////////// shared //////////
function createConnection(ws, {prefix}) {
  return (onMessage, onClose) => {
    ws.on("message", (data) => {
      const [prefixReceived, ...rest] = JSON.parse(data.toString());
      if (prefixReceived === prefix) onMessage(...rest);
    });
    ws.on("close", () => onClose("Connection closed"));
    return (...args) => ws.send(JSON.stringify([prefix, ...args]));
  };
}
flowchart TB
    subgraph c1["client 1"]
        C11["RPCChannel #lt;Red#gt;"]
        ws1((("Websocket")))
        C12["RPCChannel #lt;Blue#gt;"]
    end
    subgraph c2["client 2"]
        C21["RPCChannel #lt;Blue#gt;"]
        C22["RPCChannel #lt;Blue#gt;"]
        ws2((("Websocket")))
    end
    subgraph s1["server"]
        S1["new RPCSource #lt;Red#gt;"]
        S2["new RPCSource #lt;Blue#gt;"]
        wss1((("Websocket")))
        wss2((("Websocket")))
        start11[/"RPCSource.start(red)"/]
        start12[/"RPCSource.start(blue)"/]
        start21[/"RPCSource.start(blue)"/]
        S1C11["RPCSource.Channel #lt;Red#gt;"]
        S1C12["RPCSource.Channel #lt;Blue#gt;"]
        S1C21["RPCSource.Channel #lt;Blue#gt;"]
        S1C22["RPCSource.Channel #lt;Blue#gt;"]
    end
    ws1 == new RPSChannel(ws) ==> C11 & C12
    ws2 == new RPSChannel(ws) ==> C21 & C22
    wss1 ==> start12
    start11 ==> S1C11
    start21 ==> S1C21 & S1C22
    C11 <-. channel data .......-> S1C11
    C12 <-. channel data .......-> S1C12
    C21 <-. channel data .......-> S1C21
    C22 <-. channel data .......-> S1C22
    S2 ===> start12
    S2 ===> start21
    ws1 <-. communication ...-> wss1
    ws2 <-. communication ...-> wss2
    S1 ===> start11
    wss1 ==> start11
    wss2 ==> start21
    start12 ==> S1C12
    
    style C11 stroke:red
    style C12 stroke:blue
    style C21 stroke:blue
    style C22 stroke:blue
    style S1 stroke:red
    style S2 stroke:blue
    style S1C11 stroke:red
    style S1C12 stroke:blue
    style S1C21 stroke:blue
    style S1C22 stroke:blue
    
    linkStyle 8 stroke:red
    linkStyle 9 stroke:blue
    linkStyle 10 stroke:blue
    linkStyle 11 stroke:blue

Using with WebSocket

A simple example of using the library with a WebSocket server and client using the ws library.

Server:

import ws from "ws";
import { RPCSource } from "@flinbein/stateful-rpc";

// Create an RPC source with methods and initial state
export const rpcSource = new RPCSource({
    ping: () => "pong",
    echo: (msg) => msg,
});

function createConnection(socket) {
  return (onMessage, onClose) => {
    // Handle incoming messages from the WebSocket
    socket.on("message", (data) => {
      onMessage(...JSON.parse(data.toString()));
    });
  
    // Handle WebSocket closure
    socket.on("close", () => onClose("Client disconnected"));

    // Return a function to send messages back to the client
    return (...args) => {
      socket.send(JSON.stringify(args));
    };
  };
}

const wss = new ws.Server({ port: 8080 });

wss.on("connection", (socket) => {
  RPCSource.start(rpcSource, createConnection(socket), {context: socket});
});

Client:

import { RPCChannel } from "@flinbein/stateful-rpc";
import type { rpcSource } from "./backend";

function createConnection(socket: WebSocket) {
  return (onMessage, onClose) => {
    socket.onmessage = (event) => {
      onMessage(...JSON.parse(event.data));
    };
    socket.onclose = () => onClose("Connection closed");
    return (...args) => {
      socket.send(JSON.stringify(args));
    };
  };
}

// Connect to WebSocket server
const socket = new WebSocket("ws://localhost:8080");

// Wait for the WebSocket to open before using the RPC channel
await new Promise(resolve => socket.onopen = resolve);

// Create RPC channel
const rpc = new RPCChannel<typeof rpcSource>(createConnection(socket));

// Use the RPC channel
console.log(await rpc.ping()); // "pong"
console.log(await rpc.echo("Hello, World!")); // "Hello, World!"

You can implement message serialization in the createConnection function as you see fit. This example uses JSON, which limits the types of data that can be transmitted.

Using with SharedWorker

SharedWorker:

import RPCSource from "@flinbein/stateful-rpc/source";

const source = new RPCSource({
  broadcast: (eventName: string, ...args: any) => {
    source.emit(eventName, ...args);
  },
  setState: (value: any) => void source.setState(value)
});

function createConnection(port: MessagePort) {
  return (onMessage: Function, onClose: Function) => {
    port.addEventListener("message", (event) => {
      const [type, ...args] = event.data;
      if (type === "message") return onMessage(...args);
      if (type === "close") return onClose(args[0]);
    });
    port.start();
    return (...args: any[]) => port.postMessage(["message", ...args]);
  }
}

onconnect = (event: MessageEvent) => {
  const port = event.ports[0];
  RPCSource.start(source, createConnection(port), {context: port});
  port.start();
  port.postMessage(["ready"]);
}

export type { source };

Client:

import RPCChannel from "@flinbein/stateful-rpc/channel";
import type { source } from "./sharedWorker.js"

function createConnection(port: MessagePort) {
  return (onMessage: Function, onClose: Function) => {
    port.addEventListener("message", (event) => {
      const [type, ...args] = event.data;
      if (type === "message") return onMessage(...args);
      if (type === "close") return onClose(args[0]);
    });
    // cleanup connection on unload
    window.addEventListener("beforeunload", () => {
      port.postMessage(["close", "closed by unload"]);
    })
    return (...args: any[]) => port.postMessage(["message", ...args]);
  }
}

const sharedWorker = new SharedWorker("./sharedWorker.js", { type: "module" });

// wait for ready
await new Promise(resolve => {
  const abortMessageListenerCtrl = new AbortController();
  sharedWorker.port.addEventListener("message", (event: MessageEvent) => {
    if (event.data[0] !== "ready") return;
    abortMessageListenerCtrl.abort();
    resolve(true);
  }, {signal: abortMessageListenerCtrl.signal});
  sharedWorker.port.start();
})

const channel = new RPCChannel<typeof source>(createConnection(sharedWorker.port));

// work with channel in different tabs:
channel.on("chatMessage", (from, message) => {
  console.log(`[${from}]: ${message}`);
});
channel.broadcast.notify("chatMessage", "Me", "Hello!");
await channel.setState("new-shared-state");

sharedWorker.port.postMessage(["close", "closed by user"]);
sharedWorker.port.close();

Using with WebRTC DataChannel

Hosting side:

const rpcSource = new RPCSource({/*,,,*/});
const pc = new RTCPeerConnection(/*,,,*/);
const dataChannel = pc.createDataChannel("rpc", {negotiated: true, id: 0});

dataChannel.addEventListener("open", () => {
	
  RPCSource.start(rpcSource, (onMessage, onClose) => {
    dataChannel.addEventListener("message", ({data}) => onMessage(...JSON.parse(data)));
    dataChannel.addEventListener("close", onClose);
	
    return (...args) => dataChannel.send(JSON.stringify(args));
  }, {context: dataChannel});
  
});
// ... signaling code to establish the connection

Joining side:

const pc = new RTCPeerConnection(/*,,,*/);
const dataChannel = pc.createDataChannel("rpc", {negotiated: true, id: 0});

dataChannel.addEventListener("open", () => {

  const rpcChannel = new RPCChannel<typeof rpcSource>((onMessage, onClose) => {
    dataChannel.addEventListener("message", ({data}) => onMessage(...JSON.parse(data)));
    dataChannel.addEventListener("close", onClose);
    
    return (...args) => channel.send(JSON.stringify(args));
  });

  // use rpcChannel...
});
// ... signaling code to establish the connection

Using with Other Transports

You can use the library with any transport that can send and receive messages as arrays. You need to implement a wrapper function that takes send and close functions and returns a sendToRemote function.

new RPCChannel((onMessage, onClose) => {
  return sendToRemote;
});
  • You should call onMessage(...args) when receiving a message intended for the RPCChannel.
  • You should call onClose(reason?) when the connection is broken. In this case, the current channel and all its child channels will be closed with the specified reason.
  • You should return a sendToRemote(...args) function that will send messages to the server.

You can additionally specify a getNextChannelId function in the RPCChannel constructor options to generate channel IDs. They should be unique for each channel within a single connection. This will reduce the size of messages as channel IDs will be smaller.

let i = 0;
new RPCChannel((onMessage, onClose) => {/*...*/}, {
  getNextChannelId: () => i++
});

On the server side, the wrapper is created similarly:

RPCSource.start(rpcSource, (onMessage, onClose) => {
  return sendToRemote;
});

You can specify the maximum number of channels that can be created by the client. You can also specify a context that will be available in RPCSource methods via this.channel.context.

const closeAllChannels = RPCSource.start(rpcSource, (onMessage, onClose) => {/*...*/}, {
  maxChannelsPerClient: 1000, // default is Infinity
  context: socket
});

You can call closeAllChannels(reason?) to close all channels associated with this connection.

License

MIT