@businys/ops
v0.8.0
Production MCP middleware: audit, rate limiting, reputation, metering, OpenTelemetry tracing, stdio bridge, and the Observer Mode dashboard.
Production middleware for MCP servers. Observer Mode, stdio Bridge, OpenTelemetry, Agent Lineage, rate limiting, metering, audit logging, and agent reputation — extracted from the production stack at businys.app.
npm install @businys/ops
See everything. Monetize anything. Trust completely. — businys.dev
Observer Mode
One import. No account. Dashboard at localhost:3100.
import { observe } from '@businys/ops';
const ops = await observe();
// Dashboard: http://localhost:3100
// Add ops.middleware to your MCP server pipeline
// Every tool call streams to the dashboard in real time
The dashboard shows every tool call: agent ID, tool name, duration, status, reputation score. SSE-powered, no polling.
Options
const ops = await observe({
port: 3200,
hostname: '0.0.0.0',
maxCalls: 500, // ring buffer size, default 200
});
// Programmatic access
const calls = await ops.storage.getCalls({ limit: 20 });
const stats = await ops.storage.getStats();
await ops.close();
Dashboard API
GET /api/calls paginated call history
GET /api/stats aggregated stats
GET /api/reputation/:id agent reputation record
GET /events SSE stream of new calls
stdio Bridge
Wrap any stdio MCP server as a managed Streamable HTTP endpoint with the full middleware pipeline. Works with any server — Node, Python, npx.
import { createBridge } from '@businys/ops';
const bridge = await createBridge(['node', 'my-server.js'], {
port: 3100,
// proxy: createMCPProxy({ rateLimit: { globalMax: 100 } }),
});
console.log(bridge.url); // http://localhost:3100
// GET /health → { status: "ok", serverInfo: { ... } }
// POST / → JSON-RPC; tools/call runs through full middleware pipeline
// Other methods → proxied directly to the child process
await bridge.close();
CLI (bridge)
npx @businys/ops bridge node ./my-server.js
npx @businys/ops bridge python server.py --port 3200 --rate-limit 50
npx @businys/ops bridge npx some-mcp-server --name my-tools
Options:
--port <n> HTTP port (default: 3100)
--host <host> Bind hostname (default: localhost)
--name <name> Server name for middleware context
--rate-limit <n> Max calls per agent per minute (default: 100)
--timeout <ms> Response timeout (default: 30000)
The bridge reads the X-Agent-Id and Authorization headers to identify agents, and propagates Agent Lineage headers automatically (see below).
OpenTelemetry
Pass any OTel-compatible Tracer — zero new dependencies. The middleware uses structural typing so it works with any OTel SDK version.
import { createMCPProxy } from '@businys/ops';
import { trace } from '@opentelemetry/api';
const proxy = createMCPProxy({
telemetry: {
tracer: trace.getTracer('my-mcp-server'),
recordInput: true, // attach input JSON as span attribute
inputMaxLength: 500, // truncate long inputs (default 500)
attributePrefix: 'mcp', // default
},
});
Span name: serverName/toolName. Standard attributes:
| Attribute | Value |
|---|---|
| mcp.tool.name | Tool name |
| mcp.tool.group | Tool group (prefix before first _) |
| mcp.tool.tier | core / craft / kit |
| mcp.agent.id | Agent identifier |
| mcp.server.name | Server name |
| mcp.tool.destructive | Boolean |
| mcp.duration_ms | Call duration |
| mcp.is_error | Boolean |
| mcp.status_code | If present in result |
Agent Lineage
A causal DAG that traces every tool call back to the originating human intent — through every agent delegation, with cryptographic integrity. Each node is SHA-256 chained to its parent, making the chain tamper-evident.
Built for the EU AI Act (August 2026 deadline), SOC 2 auditors, and multi-agent workflows.
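Conceptually, each node's hash covers its own payload plus its parent's hash, so editing any node invalidates every descendant. A minimal self-contained sketch of that chaining (illustrative node shape, not the package's actual schema):

```typescript
import { createHash } from 'node:crypto';

interface Node {
  id: string;
  parentId: string | null;
  payload: string; // e.g. tool name + input summary
  hash: string;
}

// SHA-256 over (parent hash, payload): the core of the tamper-evident chain
function nodeHash(parentHash: string | null, payload: string): string {
  return createHash('sha256')
    .update(parentHash ?? 'root')
    .update(payload)
    .digest('hex');
}

// Three-node chain: human intent → agent A → agent B
const root: Node = { id: 'n0', parentId: null, payload: 'human: refund order', hash: nodeHash(null, 'human: refund order') };
const a: Node = { id: 'n1', parentId: 'n0', payload: 'agent-a: lookup_order', hash: nodeHash(root.hash, 'agent-a: lookup_order') };
const b: Node = { id: 'n2', parentId: 'n1', payload: 'agent-b: issue_refund', hash: nodeHash(a.hash, 'agent-b: issue_refund') };

// Verification recomputes every hash from each node's parent
function verify(chain: Node[]): boolean {
  const byId = new Map<string, Node>(chain.map((n): [string, Node] => [n.id, n]));
  return chain.every((n) => {
    const parent = n.parentId ? byId.get(n.parentId) : null;
    return n.hash === nodeHash(parent?.hash ?? null, n.payload);
  });
}

console.log(verify([root, a, b])); // true
a.payload = 'agent-a: tampered';   // mutate one node…
console.log(verify([root, a, b])); // …and verification fails: false
```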
import { createMCPProxy, MemoryLineageStore, verifyLineage } from '@businys/ops';
const store = new MemoryLineageStore();
const proxy = createMCPProxy({
lineage: { store },
});
// After calls complete — verify the chain is intact
const result = await verifyLineage(rootId, store);
console.log(result.valid); // true iff all hashes check out
console.log(result.maxDepth); // delegation depth reached
console.log(result.errors); // [] if valid
Header propagation
When using the Bridge, lineage context propagates automatically via HTTP headers:
# Inbound (set by calling agent)
X-Lineage-Root: <rootId> originating human prompt
X-Lineage-Parent: <parentId> immediate parent node
X-Lineage-Depth: <number> delegation depth (0 = human)
# Outbound (returned by bridge in response)
X-Lineage-Id: <nodeId> id of the node created for this call
Downstream agents receive X-Lineage-Id and pass it forward as X-Lineage-Parent to build the chain across service boundaries.
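This hop-to-hop relay can be written as a tiny pure helper (hypothetical name, not part of the package):

```typescript
// Hypothetical helper for a downstream agent: given the node id the
// bridge returned (X-Lineage-Id) and the caller's own root/depth
// context, build the lineage headers for the next delegated call.
function nextHopHeaders(
  rootId: string,
  returnedNodeId: string,
  currentDepth: number,
): Record<string, string> {
  return {
    'X-Lineage-Root': rootId,                    // unchanged across the whole chain
    'X-Lineage-Parent': returnedNodeId,          // last node becomes the parent
    'X-Lineage-Depth': String(currentDepth + 1), // one more delegation hop
  };
}

console.log(nextHopHeaders('n0', 'n42', 1));
// { 'X-Lineage-Root': 'n0', 'X-Lineage-Parent': 'n42', 'X-Lineage-Depth': '2' }
```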
Custom LineageStore
import type { LineageStore, LineageNode } from '@businys/ops';
const myStore: LineageStore = {
async recordNode(node: LineageNode) { /* persist to DB */ },
async getChain(rootId: string) { /* return nodes sorted by timestamp */ },
async getNode(id: string) { /* lookup single node */ },
};
Full pipeline — createMCPProxy
Assembles the complete middleware stack in the correct order.
import { createMCPProxy, MemoryLineageStore } from '@businys/ops';
const proxy = createMCPProxy({
rateLimit: {
globalMax: 1000, // calls per window across all agents
groupMax: 100, // calls per window per agent
windowMs: 3_600_000 // 1 hour
},
lineage: { store: new MemoryLineageStore() },
// telemetry: { tracer },
auditLog: (entry) => console.log(entry), // default: stderr
disable: {
confirmation: true, // disable specific layers
},
middleware: [myCustomMiddleware], // append after built-ins
});
// proxy.middleware — the assembled pipeline
// proxy.storage — the StorageAdapter
// proxy.run(ctx, handler) — run pipeline directly
Default pipeline order:
1. Lineage — causal DAG node created before anything can block the call
2. Telemetry — OTel span wraps full duration (outermost after lineage)
3. Reputation — blocks/throttles agents by behavior score
4. Rate limit — per-agent + per-group sliding window
5. Confirmation — destructive actions require { confirm: true }
6. Metering — records every call to StorageAdapter
7. Audit — structured JSON to stderr
Individual middleware
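Each middleware wraps the rest of the pipeline through an execute(ctx, next) continuation. A minimal self-contained sketch of how such a stack composes and runs (simplified types, not the package's internals):

```typescript
interface Ctx { toolName: string; log: string[] }
type Handler = () => Promise<string>;
interface Middleware {
  name: string;
  execute(ctx: Ctx, next: Handler): Promise<string>;
}

// Fold the middleware list into one nested call chain, outermost first
function runPipeline(mws: Middleware[], ctx: Ctx, handler: Handler): Promise<string> {
  const chain = mws.reduceRight<Handler>(
    (next, mw) => () => mw.execute(ctx, next),
    handler,
  );
  return chain();
}

// A trivial middleware that records when it runs relative to the handler
const tag = (name: string): Middleware => ({
  name,
  async execute(ctx, next) {
    ctx.log.push(`${name}:before`);
    const result = await next();
    ctx.log.push(`${name}:after`);
    return result;
  },
});

const ctx: Ctx = { toolName: 'demo', log: [] };
runPipeline([tag('audit'), tag('meter')], ctx, async () => 'ok').then(() => {
  console.log(ctx.log); // [ 'audit:before', 'meter:before', 'meter:after', 'audit:after' ]
});
```

reduceRight makes the first middleware in the list the outermost layer, which is why pipeline order matters.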
import {
createReputationMiddleware,
createRateLimitMiddleware,
createAuditMiddleware,
createConfirmationMiddleware,
createMeteringMiddleware,
createTelemetryMiddleware,
createLineageMiddleware,
MemoryAdapter,
MemoryLineageStore,
} from '@businys/ops';
const storage = new MemoryAdapter();
const middleware = [
createLineageMiddleware({ store: new MemoryLineageStore() }),
createReputationMiddleware(storage),
createRateLimitMiddleware({ globalMax: 60 }),
createConfirmationMiddleware(),
createMeteringMiddleware({ storage }),
createAuditMiddleware(),
];
Agent reputation
Agents accumulate a score (0–1000, default 100) based on behavior. Scores drop on bad signals and recover on successes.
| Signal | Impact |
|---|---|
| Successful call | +1 |
| Error response | −5 |
| Loop detected | −50 |
| Rate limit breach | −25 |
| Schema violation | −25 |
| Score | Status |
|---|---|
| > 25 | Active — normal operation |
| ≤ 25 | Throttled — warning appended to every response |
| ≤ 0 | Blocked — call rejected |
Loop detection uses a ring buffer of the last 5 calls per agent. Three or more identical calls (same tool + same input hash) trigger a loop signal.
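Both mechanics are small; a rough sketch using the numbers above (illustrative only, not the package's exact implementation):

```typescript
type Signal = 'success' | 'error' | 'loop' | 'rate_limit' | 'schema';

// Score deltas from the signal table above
const DELTA: Record<Signal, number> = {
  success: 1, error: -5, loop: -50, rate_limit: -25, schema: -25,
};

const clamp = (n: number) => Math.max(0, Math.min(1000, n)); // score range 0–1000

function applySignal(score: number, sig: Signal): number {
  return clamp(score + DELTA[sig]);
}

function status(score: number): 'active' | 'throttled' | 'blocked' {
  if (score <= 0) return 'blocked';
  if (score <= 25) return 'throttled';
  return 'active';
}

// Loop check: last 5 (tool + input-hash) entries per agent;
// three or more matches for the incoming call trigger the signal
function isLoop(recent: string[], callHash: string): boolean {
  const window = [...recent, callHash].slice(-5);
  return window.filter((h) => h === callHash).length >= 3;
}

let score = 100;                          // default starting score
score = applySignal(score, 'loop');       // 50
score = applySignal(score, 'rate_limit'); // 25
console.log(status(score)); // 'throttled'
```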
CLI
npx @businys/ops observe Start dashboard at localhost:3100
npx @businys/ops observe --port 3200
npx @businys/ops status Print stats from a running instance
npx @businys/ops bridge node ./server.js Wrap a stdio server as HTTP
npx @businys/ops bridge python s.py --port 3200 --rate-limit 50
npx @businys/ops version
Custom storage
MemoryAdapter is the default — zero config, in-process ring buffer.
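A bounded buffer like that can be sketched in a few lines (illustrative, not the actual MemoryAdapter source):

```typescript
// Fixed-capacity buffer: new records push out the oldest
class RingBuffer<T> {
  private items: T[] = [];
  constructor(private readonly capacity: number) {}

  push(item: T): void {
    this.items.push(item);
    if (this.items.length > this.capacity) this.items.shift(); // drop oldest
  }

  // Newest-first, like a call-history view
  latest(limit: number): T[] {
    return this.items.slice(-limit).reverse();
  }
}

const calls = new RingBuffer<number>(3);
[1, 2, 3, 4].forEach((n) => calls.push(n)); // 1 is evicted
console.log(calls.latest(2)); // [ 4, 3 ]
```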
Bring your own backend by implementing StorageAdapter:
import type { StorageAdapter } from '@businys/ops';
class PostgresAdapter implements StorageAdapter {
async recordCall(record) { /* INSERT */ }
async getCalls(options) { /* SELECT */ }
async getStats() { /* aggregations */ }
async getReputation(agentId) { /* SELECT */ }
async updateReputation(agentId, sig) { /* UPSERT */ }
async checkLoop(agentId, hash) { /* ring buffer */ }
subscribe(listener) { return () => {}; }
}
Writing middleware
import type { Middleware } from '@businys/ops';
const myMiddleware: Middleware = {
name: 'my-middleware',
async execute(ctx, next) {
console.log('Before:', ctx.toolName);
const result = await next();
console.log('After:', result.isError ? 'error' : 'ok');
return result;
},
};
MiddlewareContext
interface MiddlewareContext {
toolName: string
toolGroup: string
toolTier: 'core' | 'craft' | 'kit'
method: string
path: string
input: Record<string, unknown>
agentId: string
serverName: string
startedAt: number // Date.now()
destructive: boolean // true = confirmation required
}
Links
MIT License © Hiatys Systems Ltd.
