@stdiobus/node
v1.1.0
Published
Native Node.js binding for stdio_bus - AI agent transport layer. No external binary required.
Downloads
411
Maintainers
Readme
@stdiobus/node
Native Node.js binding for stdio_bus - the AI agent transport layer.
Features
- No external binary required - Native addon includes libstdio_bus
- High performance - Direct C integration, no process spawning
- Cross-platform - Native on macOS/Linux, Docker on Windows
- Multiple transport modes - Embedded, TCP, or Unix socket
- Session-based routing - Automatic message routing to workers
- Worker lifecycle management - Automatic restart with backoff
Installation
npm install @stdiobus/nodePrebuilt binaries: macOS (x64, arm64), Linux (x64, arm64). Windows via Docker backend.
Quick Start
Embedded Mode (Default)
Messages are sent/received directly via the JavaScript API:
const { StdioBus } = require('@stdiobus/node');
const bus = new StdioBus({
configJson: {
pools: [{ id: 'echo', command: 'node', args: ['./echo-worker.js'], instances: 1 }]
}
});
await bus.start();
bus.send(JSON.stringify({
jsonrpc: '2.0',
id: '1',
method: 'echo',
params: { message: 'hello' }
}));
// poll() returns messages from workers
const messages = bus.poll(500);
console.log('Response:', JSON.parse(messages[0]));
await bus.stop();[stdio_bus] INFO: stdio_bus instance created
[stdio_bus] INFO: stdio_bus started with 1 workers (mode: embedded)
[echo-worker] Started, waiting for NDJSON messages on stdin...
Response: {"jsonrpc":"2.0","id":"1","result":{"echo":{"message":"hello"},"method":"echo","timestamp":"..."}}
[stdio_bus] INFO: Initiating graceful shutdown
[stdio_bus] INFO: All workers stoppedReal-World Usage (ACP Agent)
Full ACP protocol flow: initialize agent, create session, send prompt. Requires an ACP-compatible worker and appropriate credentials.
const { StdioBus } = require('@stdiobus/node');
const bus = new StdioBus({
configJson: {
pools: [{ id: 'acp-worker', command: 'node', args: ['./acp-worker.js'], instances: 1 }]
}
});
await bus.start();
// 1. Initialize agent
const init = await bus.request('initialize', {
protocolVersion: 1,
clientInfo: { name: 'my-app', version: '1.0.0' },
clientCapabilities: {}
}, { agentId: 'my-agent', timeout: 60000 });
console.log('Agent:', init.agentInfo);
// 2. Create session
const session = await bus.request('session/new', {
cwd: process.cwd(),
mcpServers: []
}, { agentId: 'my-agent' });
const sessionId = session.sessionId;
// 3. Send prompt
const result = await bus.request('session/prompt', {
sessionId,
prompt: [{ type: 'text', text: 'What is 2+2?' }]
}, { agentId: 'my-agent' });
console.log('Response:', result.text);
await bus.stop();TCP Mode
Accept external client connections over TCP:
const { StdioBus } = require('@stdiobus/node');
const bus = new StdioBus({
configJson: {
pools: [{ id: 'worker', command: 'node', args: ['./worker.js'], instances: 4 }]
},
listenMode: 'tcp',
tcpHost: '0.0.0.0',
tcpPort: 8080
});
await bus.start();
console.log('Listening on TCP port 8080');
console.log('Workers:', bus.getWorkerCount());
// Monitor connections
setInterval(() => {
console.log('Connected clients:', bus.getClientCount());
}, 5000);
// Clients can connect with: nc localhost 8080
// And send NDJSON messagesUnix Socket Mode
Accept connections via Unix domain socket:
const { StdioBus } = require('@stdiobus/node');
const bus = new StdioBus({
configJson: {
pools: [{ id: 'worker', command: 'node', args: ['./worker.js'], instances: 2 }]
},
listenMode: 'unix',
unixPath: '/tmp/stdiobus.sock'
});
await bus.start();
console.log('Listening on /tmp/stdiobus.sock');
// Clients can connect with: nc -U /tmp/stdiobus.sockConfiguration
Configuration is passed programmatically via configJson:
const { StdioBus } = require('@stdiobus/node');
const bus = new StdioBus({
configJson: {
pools: [
{
id: 'mcp-worker',
command: 'node',
args: ['./worker.js'],
instances: 4
}
],
limits: {
max_input_buffer: 1048576,
max_output_queue: 4194304,
max_restarts: 5,
restart_window_sec: 60
}
}
});File-based config is also supported for backward compatibility:
const bus = new StdioBus({ configPath: './config.json' });configJson and configPath are mutually exclusive.
API Reference
Constructor
new StdioBus(options)Options:
| Option | Type | Required | Default | Description |
|--------|------|----------|---------|-------------|
| configJson | object | * | - | Programmatic config (mutually exclusive with configPath) |
| configPath | string | * | - | Path to JSON config file (mutually exclusive with configJson) |
| listenMode | string | No | 'none' | Transport mode: 'none', 'tcp', or 'unix' (native only) |
| tcpHost | string | No | '127.0.0.1' | TCP bind address (native tcp mode) |
| tcpPort | number | Yes* | - | TCP port (required for native tcp mode) |
| unixPath | string | Yes | - | Unix socket path (*required for native unix mode) |
| logLevel | number | No | 1 | Log level: 0=DEBUG, 1=INFO, 2=WARN, 3=ERROR (native only) |
Backend selection (backend option):
'auto'(default): Uses native on macOS/Linux, docker on Windows'native': Force native backend (fails on Windows)'docker': Force Docker backend (works everywhere)
Methods
bus.getBackendType()
Returns the backend being used: 'native' or 'docker'.
bus.onMessage(handler)
Register a message handler. Called for each message received from workers.
bus.onMessage((msg) => {
const data = JSON.parse(msg);
console.log('Received:', data);
});bus.start(): Promise<void>
Start the bus and spawn worker processes.
bus.stop(timeoutSec?: number): Promise<void>
Stop the bus gracefully. Workers receive SIGTERM and have timeoutSec seconds to exit (default: 30).
bus.send(message: string): boolean
Send a JSON-RPC message to workers. Returns true if queued successfully.
bus.request(method, params?, options?): Promise<any>
Send a request and wait for response with automatic correlation.
const result = await bus.request('tools/list', {}, {
timeout: 30000, // Timeout in ms (default: 30000)
sessionId: 'abc123' // Optional session ID for routing
});bus.getState(): number
Get current bus state:
| Constant | Value | Description |
|----------|-------|-------------|
| BusState.CREATED | 0 | Created but not started |
| BusState.STARTING | 1 | Workers being spawned |
| BusState.RUNNING | 2 | Running and accepting messages |
| BusState.STOPPING | 3 | Graceful shutdown in progress |
| BusState.STOPPED | 4 | Fully stopped |
bus.getStats(): object
Get statistics:
{
messagesIn: 100, // Messages sent to workers
messagesOut: 100, // Messages received from workers
bytesIn: 10240, // Total bytes sent
bytesOut: 20480, // Total bytes received
workerRestarts: 0, // Number of worker restarts
routingErrors: 0, // Messages that couldn't be routed
clientConnects: 5, // Client connections (TCP/Unix modes)
clientDisconnects: 2 // Client disconnections (TCP/Unix modes)
}bus.getWorkerCount(): number
Get number of running workers.
bus.getClientCount(): number
Get number of connected clients (TCP/Unix modes only, returns 0 in embedded mode).
bus.getListenMode(): string
Get the listen mode: 'none', 'tcp', or 'unix'.
bus.isRunning(): boolean
Check if bus is in RUNNING state.
bus.destroy(): void
Destroy the bus and release all resources.
Constants
const { BusState, ListenMode } = require('@stdiobus/node');
// Bus states
BusState.CREATED // 0
BusState.STARTING // 1
BusState.RUNNING // 2
BusState.STOPPING // 3
BusState.STOPPED // 4
// Listen modes
ListenMode.NONE // 'none'
ListenMode.TCP // 'tcp'
ListenMode.UNIX // 'unix'Docker Mode (Windows & Cross-Platform)
Run stdio_bus in a Docker container. Required on Windows, optional on macOS/Linux.
const { StdioBus } = require('@stdiobus/node');
const bus = new StdioBus({
configPath: './config.json',
backend: 'docker',
docker: {
image: 'stdiobus/stdiobus:node20',
pullPolicy: 'if-missing'
}
});
await bus.start();
console.log('Backend:', bus.getBackendType()); // 'docker'
// Same API as native backend
const result = await bus.request('tools/list', {});
await bus.stop();Docker options:
| Option | Default | Description |
|--------|---------|-------------|
| image | stdiobus/stdiobus:node20 | Docker image to use |
| pullPolicy | if-missing | never, if-missing, or always |
| enginePath | docker | Path to docker CLI |
| startupTimeoutMs | 15000 | Container startup timeout |
| containerNamePrefix | stdiobus | Container name prefix |
| extraArgs | [] | Extra docker run arguments |
| env | {} | Environment variables |
Requirements: Docker Desktop installed and running.
Examples
Echo Server (TCP)
const { StdioBus } = require('@stdiobus/node');
const bus = new StdioBus({
configPath: './echo-config.json',
listenMode: 'tcp',
tcpHost: '0.0.0.0',
tcpPort: 9000
});
await bus.start();
console.log('Echo server running on port 9000');
process.on('SIGINT', async () => {
console.log('Shutting down...');
await bus.stop();
process.exit(0);
});MCP Proxy
const { StdioBus } = require('@stdiobus/node');
const bus = new StdioBus({
configPath: './mcp-config.json',
listenMode: 'unix',
unixPath: '/tmp/mcp-proxy.sock'
});
await bus.start();
// Workers handle MCP protocol, clients connect via Unix socketArchitecture
sequenceDiagram
participant App as Node.js Application
participant JS as @stdiobus/node
participant NAPI as binding.c (N-API)
participant Lib as libstdio_bus.a
participant W as Worker Processes
App->>JS: new StdioBus(config)
JS->>NAPI: create(options)
NAPI->>Lib: stdio_bus_create()
App->>JS: bus.start()
JS->>NAPI: start()
NAPI->>Lib: stdio_bus_start()
Lib->>W: fork/exec workers
App->>JS: bus.send(message)
JS->>NAPI: send(msg)
NAPI->>Lib: stdio_bus_ingest()
Lib->>W: route to worker (stdin)
W->>Lib: response (stdout)
Lib->>NAPI: on_message callback
NAPI->>JS: poll() returns messages
JS->>App: onMessage(handler)
App->>JS: bus.stop()
JS->>NAPI: stop()
NAPI->>Lib: stdio_bus_stop()
Lib->>W: SIGTERMThe C library handles:
- Worker process management - Fork/exec, monitoring, restart with backoff
- Message routing - Session-based routing with round-robin assignment
- NDJSON framing - Line-delimited JSON-RPC messages
- Backpressure - Per-connection output queues with limits
- TCP/Unix listeners - Accept external client connections
License
Apache-2.0
