@cloudsignal/ai-transport
v0.1.1
Published
Vercel AI SDK transport using CloudSignal MQTT — drop-in replacement for HTTP streaming with offline recovery, multi-device sync, and ACL control
Maintainers
Readme
@cloudsignal/ai-transport
Drop-in MQTT transport for the Vercel AI SDK. Replace HTTP streaming with CloudSignal MQTT — get offline recovery, multi-device sync, and broker-level access control. Zero UI changes.
import { useChat } from '@ai-sdk/react';
import { CloudSignalChatTransport } from '@cloudsignal/ai-transport';
const { messages, sendMessage } = useChat({
transport: new CloudSignalChatTransport({
api: '/api/chat',
authEndpoint: '/api/auth/mqtt',
wssUrl: 'wss://connect.cloudsignal.app:18885/',
}),
});Same messages. Same sendMessage. Same UI code. Different infrastructure.
What changes
| | HTTP streaming (default) | CloudSignal MQTT | |---|---|---| | Disconnection | Response lost, must re-request | Retained message delivers full response on reconnect | | Multi-device | Not possible (point-to-point) | Any number of clients subscribe to the same session | | Access control | App-level middleware | Broker-level ACL on every publish/subscribe | | Tenant isolation | App-level | Broker-enforced mountpoints | | Delivery guarantee | None | QoS 0 (tokens), QoS 1 (complete response) | | Per-message cost | N/A | N/A (vs Ably/Pusher per-message fees) | | Protocol | HTTP | MQTT (ISO/IEC 20922) — no vendor lock-in |
Install
npm install @cloudsignal/ai-transport @cloudsignal/mqtt-client ai @ai-sdk/react@cloudsignal/mqtt-client and ai are peer dependencies.
Quick Start
1. Client — swap the transport
// app/page.tsx
'use client';
import { useChat } from '@ai-sdk/react';
import { CloudSignalChatTransport } from '@cloudsignal/ai-transport';
const transport = new CloudSignalChatTransport({
api: '/api/chat',
authEndpoint: '/api/auth/mqtt',
wssUrl: process.env.NEXT_PUBLIC_CLOUDSIGNAL_WSS_URL!,
});
export default function Chat() {
const { messages, sendMessage } = useChat({ transport });
return (
<div>
{messages.map((m) => (
<div key={m.id}>
{m.role}: {m.parts.map((p) => p.type === 'text' ? p.text : null)}
</div>
))}
<form onSubmit={(e) => { e.preventDefault(); sendMessage({ text: e.currentTarget.input.value }); }}>
<input name="input" />
</form>
</div>
);
}2. Server — publish stream to MQTT
// app/api/chat/route.ts
import { streamText } from 'ai';
import { anthropic } from '@ai-sdk/anthropic';
import { publishStreamToMqtt } from '@cloudsignal/ai-transport/server';
import { getServerMqttClient } from '@/lib/mqtt-server';
export async function POST(request: Request) {
const { chatId, messages, requestId } = await request.json();
const mqttClient = await getServerMqttClient();
const result = streamText({
model: anthropic('claude-sonnet-4-20250514'),
messages,
});
// Publish AI stream to MQTT — response arrives via subscription, not HTTP
publishStreamToMqtt(mqttClient, chatId, result, { requestId }).catch(console.error);
return Response.json({ status: 'streaming', chatId }, { status: 202 });
}3. Auth endpoint — issue temporary MQTT credentials
// app/api/auth/mqtt/route.ts
export async function POST() {
const res = await fetch(`${process.env.CLOUDSIGNAL_TOKEN_SERVICE_URL}/v1/create`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
organization_id: process.env.CLOUDSIGNAL_ORG_ID,
secret_key: process.env.CLOUDSIGNAL_SECRET_KEY,
user_email: `user-${crypto.randomUUID().slice(0, 8)}@chat.cloudsignal.app`,
}),
});
const data = await res.json();
return Response.json({
username: data.mqtt_username,
password: data.token_password,
});
}4. Server MQTT client — singleton connection
// lib/mqtt-server.ts
import CloudSignalClient from '@cloudsignal/mqtt-client';
let client: CloudSignalClient | null = null;
export async function getServerMqttClient(): Promise<CloudSignalClient> {
if (client?.isConnected()) return client;
client = new CloudSignalClient({ preset: 'server' });
await client.connectWithToken({
host: process.env.CLOUDSIGNAL_MQTTS_URL!,
organizationId: process.env.CLOUDSIGNAL_ORG_ID!,
secretKey: process.env.CLOUDSIGNAL_SECRET_KEY!,
userEmail: '[email protected]',
});
return client;
}How It Works
1. useChat() calls transport.sendMessages()
2. Transport subscribes to MQTT topics chat/{chatId}/stream and chat/{chatId}/complete
3. Transport sends HTTP POST to /api/chat (returns 202 immediately)
4. Server calls streamText(), publishes each UIMessageChunk to MQTT (QoS 0)
5. Browser receives chunks via WebSocket subscription → ReadableStream → useChat renders
6. Server publishes full response as retained message to chat/{chatId}/complete (QoS 1)
7. If client disconnects and reconnects, broker delivers the retained complete messageThe HTTP POST only triggers the server. The actual response travels over MQTT. This is the key architectural difference from the default HTTP transport.
MQTT Topics
| Topic | QoS | Retained | Purpose |
|-------|-----|----------|---------|
| chat/{chatId}/stream | 0 | No | UIMessageChunk stream (each token) |
| chat/{chatId}/complete | 1 | Yes | Full response for offline recovery |
API
CloudSignalChatTransport
Implements the Vercel AI SDK ChatTransport interface.
new CloudSignalChatTransport({
// Required (one of):
client: existingCloudSignalClient, // Use a pre-connected client
// OR
authEndpoint: '/api/auth/mqtt', // Endpoint returning { username, password }
wssUrl: 'wss://connect.cloudsignal.app:18885/',
// Optional:
api: '/api/chat', // Server endpoint (default: '/api/chat')
headers: { Authorization: 'Bearer ...' }, // Extra headers (or a function returning them)
body: { model: 'claude-sonnet-4-20250514' }, // Extra body fields
streamTimeout: 60_000, // Close stream if no chunk arrives (ms, default: 60000)
reconnectTimeout: 1_000, // Wait for retained message on reconnect (ms, default: 1000)
});publishStreamToMqtt (server)
import { publishStreamToMqtt } from '@cloudsignal/ai-transport/server';
await publishStreamToMqtt(mqttClient, chatId, streamTextResult, {
requestId: 'req_abc123', // Matches client request for chunk filtering
streamQos: 0, // QoS for stream chunks (default: 0)
completeQos: 1, // QoS for retained complete message (default: 1)
});Security Best Practices
Credential isolation
Browser only sees: temporary MQTT username/password (60-min TTL)
Server only sees: CLOUDSIGNAL_SECRET_KEY, ANTHROPIC_API_KEY
Broker enforces: per-message ACL, mountpoint isolation, message size limitsThe organization's secret key (sk_...) never reaches the browser. The browser receives temporary credentials that expire and cannot be used to create new tokens.
Recommended ACL policy
{
"schema": "cloudsignal://acl/v2",
"default": "deny",
"global": [
{ "action": "sub", "topic": "chat/+/stream", "retain": false },
{ "action": "sub", "topic": "chat/+/complete" }
],
"rules": [
{
"action": "pub",
"topic": "chat/{user_id}/messages",
"binding": "user_id"
}
],
"publishers": [
{
"action": "pub",
"topic": "chat/#",
"retain": true,
"qos": [0, 1]
}
]
}This policy:
- Default deny — nothing is allowed unless explicitly permitted
- Global subscribe — any authenticated user can subscribe to chat streams (but not retain)
- Identity-bound publish — users can only publish to
chat/{their_user_id}/messages - Server publish — service accounts can publish to any chat topic with retain (for offline recovery)
- No client retain — clients cannot poison retained messages
Transport encryption
- Server → Broker: MQTTS (TLS on port 8883) — native MQTT, no WebSocket overhead
- Browser → Broker: WSS (TLS on port 18885) — MQTT over WebSocket Secure
Tenant isolation
Each organization operates in a separate VerneMQ mountpoint (org_short_id). Clients in different mountpoints cannot see each other's topics. This is a broker-level boundary — a bug in your application code cannot bypass it.
Edge Case Handling
| Scenario | Mitigation | |----------|-----------| | Server crashes mid-stream | Stream timeout closes the ReadableStream after configurable idle period | | HTTP POST returns error | Stream cleaned up immediately, error propagated to AI SDK | | Two rapid messages interleave | Request ID filtering — each stream ignores chunks from other requests | | Stale retained message from previous session | Chat ID validation on reconnectToStream | | Client disconnects mid-stream | Server continues, publishes retained complete message; client recovers on reconnect | | Subscription accumulation | Cleanup unsubscribes from stream topic on completion or abort | | Abort signal fired | Cleanup runs, stream closed, subscriptions removed | | Retained message never arrives | Configurable reconnect timeout (default 1s), resolves null |
Environment Variables
# Server-side (never exposed to browser)
ANTHROPIC_API_KEY=sk-ant-...
CLOUDSIGNAL_SECRET_KEY=sk_...
CLOUDSIGNAL_ORG_ID=your-org-uuid
CLOUDSIGNAL_TOKEN_SERVICE_URL=https://auth.cloudsignal.app
CLOUDSIGNAL_MQTTS_URL=mqtts://connect.cloudsignal.app:8883
# Client-side (safe to expose)
NEXT_PUBLIC_CLOUDSIGNAL_WSS_URL=wss://connect.cloudsignal.app:18885/Demo
See cloudsignal/ai-chat — a complete Next.js 16 chat app using this transport.
License
MIT
