@cloudsignal/mcp-over-mqtt
v0.1.5
Published
Model Context Protocol (MCP) over MQTT 5 — reference implementation and draft protocol spec, with per-tool broker-enforced ACL.
Maintainers
Readme
@cloudsignal/mcp-over-mqtt
MCP over MQTT — a reference implementation of the Model Context Protocol using MQTT 5 as the transport, with a draft protocol spec. Ships with broker-enforced per-tool ACL (identity-bound) so a compromised agent can only call tools it's authorized for — enforcement is at the wire, not in application code.
- Protocol-first: SPEC.md defines the wire protocol.
- Production-validated: running in the CloudSignal GTM agent system.
- Broker-agnostic API: designed for any MQTT 5 broker. v1.0 will verify EMQX, HiveMQ, Mosquitto.
- Companion to
@cloudsignal/agent: shares the same namespace and ACL model — A2A + MCP on a single broker.
Install
npm install @cloudsignal/mcp-over-mqtt @cloudsignal/mqtt-clientQuickstart — server
import { MCPServer, defineTool } from '@cloudsignal/mcp-over-mqtt';
const queryWatchConfigs = defineTool({
name: 'query_watch_configs',
description: 'List active watch configurations',
input_schema: {
type: 'object',
properties: {
platform: { type: 'string' },
},
additionalProperties: false,
},
allowed_callers: ['scout', 'analyst'],
handler: async (args, ctx) => {
console.log(`[${ctx.client}] querying with`, args);
// return real data from your DB
return { configs: [] };
},
});
const server = new MCPServer({
name: 'gtm-mcp-server',
namespace: 'gtm',
tools: [queryWatchConfigs],
broker: 'mqtts://connect.cloudsignal.app:8883',
credentials: {
orgId: 'org_...',
secretKey: 'sk_...',
userEmail: '[email protected]',
tokenServiceUrl: 'https://auth.cloudsignal.app',
},
});
await server.start();Quickstart — client
import { MCPClient } from '@cloudsignal/mcp-over-mqtt';
const client = new MCPClient({
clientId: 'scout',
namespace: 'gtm',
broker: 'mqtts://connect.cloudsignal.app:8883',
credentials: { /* same shape as server */ },
});
await client.start();
const result = await client.call('query_watch_configs', { platform: 'reddit' });
console.log(result);Streaming
Tools that want to emit intermediate output:
const longRunning = defineTool({
name: 'crawl_site',
description: 'Crawl a website, emitting progress',
supports_streaming: true,
input_schema: { type: 'object', properties: { url: { type: 'string' } } },
handler: async ({ url }, ctx) => {
for (let i = 0; i < 10; i++) {
await ctx.stream?.(`Crawled ${i * 10}%`);
await new Promise((r) => setTimeout(r, 500));
}
return { url, pages_scraped: 42 };
},
});Client subscribes to the stream:
const result = await client.call('crawl_site', { url: 'https://example.com' }, {
onStream: (partial) => console.log('progress:', partial),
timeoutMs: 120_000,
});Security — the real story
The unique value: broker-level per-tool ACL. ACL v2 policy example:
{
"rules": [
{ "topic": "gtm/mcp/tools/query_watch_configs/call",
"action": "pub", "binding": "agent_id",
"agents": ["scout", "analyst"] },
{ "topic": "gtm/mcp/tools/create_social_post/call",
"action": "pub", "binding": "agent_id",
"agents": ["operator"] },
{ "topic": "gtm/mcp/clients/{$self}/responses",
"action": "sub", "binding": "agent_id" }
]
}Scout CANNOT call create_social_post — the broker rejects the publish.
No application-layer trust needed. An exploited agent can only touch tools
it was authorized for.
Protocol
See SPEC.md — topic structure, Tool Card schema, call/response envelopes, security model, MQTT 3.1.1 compatibility, interop with A2A over MQTT.
DRAFT spec, v0.1. Stabilized in v1.0.
License
MIT
