streamx-rpc
v0.0.1
Published
A very simple JSON-based RPC protocol on top of [streamx](https://github.com/streamxorg/streamx) streams.
Similar to JSON-RPC 2.0, but with support for bidirectional streams.
Works over any duplex stream. Support for WebSockets (in the browser and in Node.js) is included directly.
Usage
On the server, with express:
const RPC = require('streamx-rpc/websocket')
const express = require('express')
const expressWS = require('express-ws')

const app = express()
// express-ws patches the app in place to add app.ws()
expressWS(app)

app.ws('/rpc', (ws, req) => {
  const rpc = new RPC(ws)

  // plain request/response method
  rpc.register('sum', (args) => {
    return args.a + args.b
  })

  // streaming method: rows written by the client come back transformed
  rpc.registerStream('transform', (stream, args) => {
    stream.on('data', row => {
      if (args.upper) stream.write(row.toUpperCase())
      else stream.write(row.toLowerCase())
    })
  })
})

app.listen(9000)
In a Node.js client:
const RPC = require('streamx-rpc/websocket')

async function main () {
  // the path must match the route registered on the server
  const rpc = new RPC('ws://localhost:9000/rpc')

  const sum = await rpc.call('sum', { a: 3, b: 7 })
  console.log(sum)
  // 10

  const stream = rpc.callStream('transform', { upper: true })
  stream.on('data', console.log)
  stream.write('hello')
  stream.write('world')
  // HELLO
  // WORLD
}

main().catch(console.error)
In the browser:
<script type="module">
  import RPC from './node_modules/streamx-rpc/dist/browser/websocket.js'
  // or, with a bundler that understands CommonJS:
  // import RPC from 'streamx-rpc/websocket.js'
  const rpc = new RPC('ws://localhost:9000/rpc')
  // then use rpc.call and rpc.callStream as in the Node.js client
</script>
API
const rpc = new RPC(opts)
Create a new RPC handler. The supported opts and their defaults are:
- ndjson: true. Set to false to neither add nor expect a line ending after each message. Only usable on a message-based transport (not on a stream with arbitrary chunks).
- forceAck: false. Require that helo messages are exchanged before accepting requests. If false, helo and ack messages are still sent but not required.
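To illustrate why line endings matter on a stream with arbitrary chunks, here is a minimal sketch of newline-delimited JSON framing. It is not the library's implementation: encodeMessage and createDecoder are hypothetical helpers, the message fields are invented for illustration, and chunks are assumed to be strings.

```javascript
// Hypothetical helpers, not part of streamx-rpc.

// Serialize one message as a single line of JSON.
function encodeMessage (msg) {
  return JSON.stringify(msg) + '\n'
}

// Returns a function that accepts arbitrary string chunks and returns
// the complete messages seen so far. Partial lines are buffered.
function createDecoder () {
  let buffer = ''
  return function decode (chunk) {
    buffer += chunk
    const lines = buffer.split('\n')
    buffer = lines.pop() // trailing element is an incomplete line (or '')
    return lines.filter(line => line.length > 0).map(line => JSON.parse(line))
  }
}

const decode = createDecoder()
const wire = encodeMessage({ id: 1, method: 'sum' }) +
  encodeMessage({ id: 2, method: 'transform' })

// Simulate a transport that cuts the data in the middle of a message.
const firstBatch = decode(wire.slice(0, 10))
const secondBatch = decode(wire.slice(10))
console.log(firstBatch.length, secondBatch.length)
// 0 2
```

On a message-based transport such as WebSockets, each frame already carries exactly one message, which is why the delimiter can be dropped there.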
rpc is a Duplex stream that can be piped into a transport stream:
const net = require('net')
const rpc = new RPC()
// net.connect takes the port and host as separate arguments
const stream = net.connect(3000, 'localhost')
stream.pipe(rpc).pipe(stream)
rpc.register(method, handler)
Register a method. method is any string. handler is a function. It may be an async function or return a promise. The resolved value of the promise (or the return value, if handler is a sync function) is returned to the caller.
handler is called with (args, env).
rpc.registerStream(method, handler)
Register a streaming method. handler is called with (stream, args, env), where stream is a Duplex stream.
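A stream handler can be exercised without a network connection. The sketch below assumes Node's built-in stream module as a stand-in for the streamx streams the library uses; createPair is a hypothetical helper that wires two Duplex streams back to back.

```javascript
const { Duplex } = require('stream')

// Hypothetical helper: whatever is written to one end becomes
// readable on the other end.
function createPair () {
  const a = new Duplex({
    objectMode: true,
    read () {},
    write (chunk, encoding, callback) { b.push(chunk); callback() }
  })
  const b = new Duplex({
    objectMode: true,
    read () {},
    write (chunk, encoding, callback) { a.push(chunk); callback() }
  })
  return [a, b]
}

// A handler with the (stream, args) shape used by registerStream.
function transform (stream, args) {
  stream.on('data', row => {
    stream.write(args.upper ? row.toUpperCase() : row.toLowerCase())
  })
}

const [callerEnd, handlerEnd] = createPair()
transform(handlerEnd, { upper: true })

// Collect what the handler sends back to the caller.
const rows = []
callerEnd.on('data', row => rows.push(row))
callerEnd.write('hello')
callerEnd.write('world')
```

Data is delivered asynchronously, so rows is only filled on a later tick of the event loop.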
rpc.use(middleware)
Register a middleware function. The function is called before a method is invoked. middleware is called with ({ method, args, env }). It may return a promise, which is awaited, and it can throw an error to abort the request and return the error instead. It can also modify or add values on the env object. Modifying args is possible but discouraged.
const result = await rpc.call(method, args, env)
Call a method on the remote end. args and env can be anything that JSON.stringify accepts. Returns a promise that resolves to the returned value.
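Because values pass through JSON, some of them change shape or disappear on the way. The sketch below uses a hypothetical roundTrip helper to show what standard JSON.stringify and JSON.parse do to a few common values; it makes no claim about additional processing inside the library.

```javascript
// Hypothetical helper: simulate what a JSON-based transport does to a value.
function roundTrip (value) {
  return JSON.parse(JSON.stringify(value))
}

const sent = {
  a: 3,
  when: new Date(0), // serialized as an ISO 8601 string
  missing: undefined, // dropped from objects
  callback: () => 1 // functions are dropped as well
}

const decoded = roundTrip(sent)
console.log(decoded)
// { a: 3, when: '1970-01-01T00:00:00.000Z' }
```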
const stream = rpc.callStream(method, args, env)
Calls a streaming method on the remote end. Returns a Duplex stream.