@cloudsignal/ai-transport

v0.1.1

Vercel AI SDK transport using CloudSignal MQTT — drop-in replacement for HTTP streaming with offline recovery, multi-device sync, and ACL control

Drop-in MQTT transport for the Vercel AI SDK. Replace HTTP streaming with CloudSignal MQTT — get offline recovery, multi-device sync, and broker-level access control. Zero UI changes.

import { useChat } from '@ai-sdk/react';
import { CloudSignalChatTransport } from '@cloudsignal/ai-transport';

const { messages, sendMessage } = useChat({
  transport: new CloudSignalChatTransport({
    api: '/api/chat',
    authEndpoint: '/api/auth/mqtt',
    wssUrl: 'wss://connect.cloudsignal.app:18885/',
  }),
});

Same messages. Same sendMessage. Same UI code. Different infrastructure.

What changes

| | HTTP streaming (default) | CloudSignal MQTT |
|---|---|---|
| Disconnection | Response lost, must re-request | Retained message delivers full response on reconnect |
| Multi-device | Not possible (point-to-point) | Any number of clients subscribe to the same session |
| Access control | App-level middleware | Broker-level ACL on every publish/subscribe |
| Tenant isolation | App-level | Broker-enforced mountpoints |
| Delivery guarantee | None | QoS 0 (tokens), QoS 1 (complete response) |
| Per-message cost | N/A | N/A (vs Ably/Pusher per-message fees) |
| Protocol | HTTP | MQTT (ISO/IEC 20922), no vendor lock-in |

Install

npm install @cloudsignal/ai-transport @cloudsignal/mqtt-client ai @ai-sdk/react

@cloudsignal/mqtt-client and ai are peer dependencies.

Quick Start

1. Client — swap the transport

// app/page.tsx
'use client';

import { useChat } from '@ai-sdk/react';
import { CloudSignalChatTransport } from '@cloudsignal/ai-transport';

const transport = new CloudSignalChatTransport({
  api: '/api/chat',
  authEndpoint: '/api/auth/mqtt',
  wssUrl: process.env.NEXT_PUBLIC_CLOUDSIGNAL_WSS_URL!,
});

export default function Chat() {
  const { messages, sendMessage } = useChat({ transport });

  return (
    <div>
      {messages.map((m) => (
        <div key={m.id}>
          {m.role}: {m.parts.map((p) => p.type === 'text' ? p.text : null)}
        </div>
      ))}
      <form onSubmit={(e) => { e.preventDefault(); sendMessage({ text: e.currentTarget.input.value }); }}>
        <input name="input" />
      </form>
    </div>
  );
}

2. Server — publish stream to MQTT

// app/api/chat/route.ts
import { streamText } from 'ai';
import { anthropic } from '@ai-sdk/anthropic';
import { publishStreamToMqtt } from '@cloudsignal/ai-transport/server';
import { getServerMqttClient } from '@/lib/mqtt-server';

export async function POST(request: Request) {
  const { chatId, messages, requestId } = await request.json();

  const mqttClient = await getServerMqttClient();

  const result = streamText({
    model: anthropic('claude-sonnet-4-20250514'),
    messages,
  });

  // Publish AI stream to MQTT — response arrives via subscription, not HTTP
  publishStreamToMqtt(mqttClient, chatId, result, { requestId }).catch(console.error);

  return Response.json({ status: 'streaming', chatId }, { status: 202 });
}

3. Auth endpoint — issue temporary MQTT credentials

// app/api/auth/mqtt/route.ts
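export async function POST() {
  // Exchange the server-side secret for short-lived MQTT credentials.
  const res = await fetch(`${process.env.CLOUDSIGNAL_TOKEN_SERVICE_URL}/v1/create`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      organization_id: process.env.CLOUDSIGNAL_ORG_ID,
      secret_key: process.env.CLOUDSIGNAL_SECRET_KEY,
      user_email: `user-${crypto.randomUUID().slice(0, 8)}@chat.cloudsignal.app`,
    }),
  });

  // Fail loudly instead of handing undefined credentials to the browser.
  if (!res.ok) {
    return Response.json({ error: 'Failed to create MQTT credentials' }, { status: 502 });
  }

  const data = await res.json();
  // Only the temporary credentials are returned; the secret key stays on the server.
  return Response.json({
    username: data.mqtt_username,
    password: data.token_password,
  });
}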

4. Server MQTT client — singleton connection

// lib/mqtt-server.ts
import CloudSignalClient from '@cloudsignal/mqtt-client';

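let client: CloudSignalClient | null = null;
let connecting: Promise<CloudSignalClient> | null = null;

export async function getServerMqttClient(): Promise<CloudSignalClient> {
  if (client?.isConnected()) return client;

  // Share one in-flight connection attempt between concurrent requests,
  // so a cold start does not open several broker connections.
  if (!connecting) {
    connecting = connect().finally(() => {
      connecting = null;
    });
  }
  return connecting;
}

async function connect(): Promise<CloudSignalClient> {
  const next = new CloudSignalClient({ preset: 'server' });
  await next.connectWithToken({
    host: process.env.CLOUDSIGNAL_MQTTS_URL!,
    organizationId: process.env.CLOUDSIGNAL_ORG_ID!,
    secretKey: process.env.CLOUDSIGNAL_SECRET_KEY!,
    userEmail: '[email protected]',
  });

  client = next;
  return next;
}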

How It Works

1. useChat() calls transport.sendMessages()
2. Transport subscribes to MQTT topics chat/{chatId}/stream and chat/{chatId}/complete
3. Transport sends HTTP POST to /api/chat (returns 202 immediately)
4. Server calls streamText(), publishes each UIMessageChunk to MQTT (QoS 0)
5. Browser receives chunks via WebSocket subscription → ReadableStream → useChat renders
6. Server publishes full response as retained message to chat/{chatId}/complete (QoS 1)
7. If client disconnects and reconnects, broker delivers the retained complete message

The HTTP POST only triggers the server. The actual response travels over MQTT. This is the key architectural difference from the default HTTP transport.
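
Steps 4 to 7 can be pictured from the client's side as a small reconciliation routine: streamed pieces are appended as they arrive, and the retained complete message replaces whatever was accumulated. The sketch below is a simplification under stated assumptions. The envelope shapes (`requestId`, `delta`, `text`) and the `ResponseAssembler` name are hypothetical, and the real transport works with `UIMessageChunk` objects rather than plain text.

```typescript
// Assumed wire shapes for illustration only; the package's real payloads may differ.
interface StreamEnvelope {
  requestId: string;
  delta: string; // one streamed piece of text (QoS 0, may be lost)
}

interface CompleteEnvelope {
  requestId: string;
  text: string; // the full response (QoS 1, retained)
}

// Hypothetical helper: merges best-effort stream chunks with the retained
// complete message for a single request.
class ResponseAssembler {
  private readonly requestId: string;
  private text = "";
  private done = false;

  constructor(requestId: string) {
    this.requestId = requestId;
  }

  // Append a streamed delta, ignoring chunks that belong to other requests
  // or that arrive after the complete message.
  onStream(msg: StreamEnvelope): void {
    if (msg.requestId !== this.requestId || this.done) return;
    this.text += msg.delta;
  }

  // The complete message is authoritative: it overwrites partial text, which
  // repairs any chunks missed while the client was disconnected.
  onComplete(msg: CompleteEnvelope): void {
    if (msg.requestId !== this.requestId) return;
    this.text = msg.text;
    this.done = true;
  }

  snapshot(): { text: string; done: boolean } {
    return { text: this.text, done: this.done };
  }
}
```

A client that receives "Hel", misses "lo" while offline, and then gets the retained message on reconnect ends up with the full text, because the complete message overwrites the partial buffer instead of being appended to it.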

MQTT Topics

| Topic | QoS | Retained | Purpose |
|-------|-----|----------|---------|
| chat/{chatId}/stream | 0 | No | UIMessageChunk stream (each token) |
| chat/{chatId}/complete | 1 | Yes | Full response for offline recovery |
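
If you build or inspect these topic names in your own code (for logging or tests, say), small helpers keep the layout in one place. This is a minimal sketch: `streamTopic`, `completeTopic`, and `parseChatTopic` are hypothetical names, not exports of the package, and rejecting chat IDs that contain `/`, `+`, or `#` is a precaution assumed here rather than documented behavior.

```typescript
// Hypothetical helpers; illustrative only, not part of the package's API.
type ChatTopicKind = "stream" | "complete";

interface ParsedChatTopic {
  chatId: string;
  kind: ChatTopicKind;
}

// A chat ID becomes one topic level, so it must not contain the level
// separator or the MQTT wildcard characters.
function assertValidChatId(chatId: string): void {
  if (chatId.length === 0 || /[\/+#\u0000]/.test(chatId)) {
    throw new Error(`Invalid chatId for use in a topic: ${JSON.stringify(chatId)}`);
  }
}

function streamTopic(chatId: string): string {
  assertValidChatId(chatId);
  return `chat/${chatId}/stream`;
}

function completeTopic(chatId: string): string {
  assertValidChatId(chatId);
  return `chat/${chatId}/complete`;
}

// Returns null for anything that is not chat/{chatId}/stream or
// chat/{chatId}/complete.
function parseChatTopic(topic: string): ParsedChatTopic | null {
  const parts = topic.split("/");
  if (parts.length !== 3 || parts[0] !== "chat") return null;
  const chatId = parts[1];
  const kind = parts[2];
  if (!chatId || (kind !== "stream" && kind !== "complete")) return null;
  return { chatId, kind };
}
```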

API

CloudSignalChatTransport

Implements the Vercel AI SDK ChatTransport interface.

new CloudSignalChatTransport({
  // Required (one of):
  client: existingCloudSignalClient,      // Use a pre-connected client
  // OR
  authEndpoint: '/api/auth/mqtt',         // Endpoint returning { username, password }
  wssUrl: 'wss://connect.cloudsignal.app:18885/',

  // Optional:
  api: '/api/chat',                       // Server endpoint (default: '/api/chat')
  headers: { Authorization: 'Bearer ...' }, // Extra headers (or a function returning them)
  body: { model: 'claude-sonnet-4-20250514' },   // Extra body fields
  streamTimeout: 60_000,                  // Close stream if no chunk arrives (ms, default: 60000)
  reconnectTimeout: 1_000,               // Wait for retained message on reconnect (ms, default: 1000)
});
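
The "one of" requirement in the options above can be checked before the transport is constructed. The sketch below is hypothetical: `ConnectionOptions` and `resolveConnectionMode` are illustrative names rather than package exports, and preferring `client` when both forms are supplied is an assumption, not documented behavior.

```typescript
// Hypothetical option shape mirroring the connection fields listed above.
interface ConnectionOptions {
  client?: object; // a pre-connected client instance
  authEndpoint?: string;
  wssUrl?: string;
}

type ConnectionMode = "existing-client" | "token-auth";

// Decide which connection path a set of options describes, or throw a
// descriptive error when neither form is complete.
function resolveConnectionMode(opts: ConnectionOptions): ConnectionMode {
  // Assumption: an explicit client takes precedence over endpoint settings.
  if (opts.client !== undefined) return "existing-client";

  if (opts.authEndpoint && opts.wssUrl) {
    if (!/^wss?:\/\//.test(opts.wssUrl)) {
      throw new Error(`wssUrl must be a ws:// or wss:// URL, got ${opts.wssUrl}`);
    }
    return "token-auth";
  }

  throw new Error("Provide either `client`, or both `authEndpoint` and `wssUrl`.");
}
```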

publishStreamToMqtt (server)

import { publishStreamToMqtt } from '@cloudsignal/ai-transport/server';

await publishStreamToMqtt(mqttClient, chatId, streamTextResult, {
  requestId: 'req_abc123',   // Matches client request for chunk filtering
  streamQos: 0,              // QoS for stream chunks (default: 0)
  completeQos: 1,            // QoS for retained complete message (default: 1)
});

Security Best Practices

Credential isolation

Browser only sees:  temporary MQTT username/password (60-min TTL)
Server only sees:   CLOUDSIGNAL_SECRET_KEY, ANTHROPIC_API_KEY
Broker enforces:    per-message ACL, mountpoint isolation, message size limits

The organization's secret key (sk_...) never reaches the browser. The browser receives temporary credentials that expire and cannot be used to create new tokens.

Recommended ACL policy

{
  "schema": "cloudsignal://acl/v2",
  "default": "deny",
  "global": [
    { "action": "sub", "topic": "chat/+/stream", "retain": false },
    { "action": "sub", "topic": "chat/+/complete" }
  ],
  "rules": [
    {
      "action": "pub",
      "topic": "chat/{user_id}/messages",
      "binding": "user_id"
    }
  ],
  "publishers": [
    {
      "action": "pub",
      "topic": "chat/#",
      "retain": true,
      "qos": [0, 1]
    }
  ]
}

This policy:

  • Default deny — nothing is allowed unless explicitly permitted
  • Global subscribe — any authenticated user can subscribe to chat streams (but not retain)
  • Identity-bound publish — users can only publish to chat/{their_user_id}/messages
  • Server publish — service accounts can publish to any chat topic with retain (for offline recovery)
  • No client retain — clients cannot poison retained messages
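
To reason about what the policy permits, it helps to evaluate it by hand. The sketch below implements standard MQTT topic-filter matching (`+` for one level, `#` for the remaining levels) and a simplified default-deny check. It is not CloudSignal's ACL engine: the `AclRule` shape, the `isAllowed` function, and the way `{user_id}` is substituted are assumptions for illustration, and the `retain`, `qos`, and `binding` fields are ignored.

```typescript
type AclAction = "pub" | "sub";

// Simplified rule shape (assumption): only action and topic are modeled.
interface AclRule {
  action: AclAction;
  topic: string;
}

// Standard MQTT filter matching: "+" matches exactly one level and "#"
// matches the parent level plus everything below it. "$"-prefixed system
// topics are not treated specially here.
function topicMatches(filter: string, topic: string): boolean {
  const f = filter.split("/");
  const t = topic.split("/");
  for (let i = 0; i < f.length; i++) {
    const level = f[i];
    if (level === "#") return i === f.length - 1; // only valid as the last level
    if (i >= t.length) return false;
    if (level !== "+" && level !== t[i]) return false;
  }
  return f.length === t.length;
}

// Default deny: a request passes only if some rule explicitly matches it.
function isAllowed(rules: AclRule[], action: AclAction, topic: string, userId: string): boolean {
  // A user ID containing wildcards or separators could widen a rule.
  if (/[+#\/]/.test(userId)) return false;
  return rules.some((rule) => {
    const bound = rule.topic.split("{user_id}").join(userId);
    return rule.action === action && topicMatches(bound, topic);
  });
}

// The rules a browser client would be subject to under the policy above.
const clientRules: AclRule[] = [
  { action: "sub", topic: "chat/+/stream" },
  { action: "sub", topic: "chat/+/complete" },
  { action: "pub", topic: "chat/{user_id}/messages" },
];
```

With these rules, a client with user ID `u1` may publish to `chat/u1/messages` but not to `chat/u2/messages` or `chat/u1/complete`, which is what keeps clients from overwriting retained responses.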

Transport encryption

  • Server → Broker: MQTTS (TLS on port 8883) — native MQTT, no WebSocket overhead
  • Browser → Broker: WSS (TLS on port 18885) — MQTT over WebSocket Secure

Tenant isolation

Each organization operates in a separate VerneMQ mountpoint (org_short_id). Clients in different mountpoints cannot see each other's topics. This is a broker-level boundary — a bug in your application code cannot bypass it.

Edge Case Handling

| Scenario | Mitigation |
|----------|-----------|
| Server crashes mid-stream | Stream timeout closes the ReadableStream after configurable idle period |
| HTTP POST returns error | Stream cleaned up immediately, error propagated to AI SDK |
| Two rapid messages interleave | Request ID filtering: each stream ignores chunks from other requests |
| Stale retained message from previous session | Chat ID validation on reconnectToStream |
| Client disconnects mid-stream | Server continues, publishes retained complete message; client recovers on reconnect |
| Subscription accumulation | Cleanup unsubscribes from stream topic on completion or abort |
| Abort signal fired | Cleanup runs, stream closed, subscriptions removed |
| Retained message never arrives | Configurable reconnect timeout (default 1s), resolves null |
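
The idle-timeout mitigation in the first row can be modeled as a deadline that is pushed back every time a chunk arrives. The sketch below is one way to express that logic, not the package's implementation. `IdleDeadline` is a hypothetical name, and the clock is passed in explicitly so the behavior is deterministic and easy to test.

```typescript
// Hypothetical helper: tracks whether a stream has been silent for longer
// than the configured timeout. Times are plain millisecond timestamps.
class IdleDeadline {
  private readonly timeoutMs: number;
  private lastActivityMs: number;

  constructor(timeoutMs: number, nowMs: number) {
    this.timeoutMs = timeoutMs;
    this.lastActivityMs = nowMs;
  }

  // Call whenever a chunk arrives to push the deadline back.
  touch(nowMs: number): void {
    this.lastActivityMs = nowMs;
  }

  // True once no chunk has arrived for at least timeoutMs.
  isExpired(nowMs: number): boolean {
    return nowMs - this.lastActivityMs >= this.timeoutMs;
  }
}
```

In a real transport this check would typically run on a timer with `Date.now()` as the clock, closing the stream and unsubscribing once `isExpired` returns true. The 60000 ms default mentioned for `streamTimeout` would be the natural value for `timeoutMs`.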

Environment Variables

# Server-side (never exposed to browser)
ANTHROPIC_API_KEY=sk-ant-...
CLOUDSIGNAL_SECRET_KEY=sk_...
CLOUDSIGNAL_ORG_ID=your-org-uuid
CLOUDSIGNAL_TOKEN_SERVICE_URL=https://auth.cloudsignal.app
CLOUDSIGNAL_MQTTS_URL=mqtts://connect.cloudsignal.app:8883

# Client-side (safe to expose)
NEXT_PUBLIC_CLOUDSIGNAL_WSS_URL=wss://connect.cloudsignal.app:18885/
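
A missing server-side variable otherwise surfaces only when the first request fails, so it can be worth validating the set at startup. This is a minimal sketch: `loadServerEnv` is a hypothetical helper, not something the package provides, and the variable list is copied from the block above.

```typescript
// Server-side variable names taken from the list above.
const REQUIRED_SERVER_VARS = [
  "ANTHROPIC_API_KEY",
  "CLOUDSIGNAL_SECRET_KEY",
  "CLOUDSIGNAL_ORG_ID",
  "CLOUDSIGNAL_TOKEN_SERVICE_URL",
  "CLOUDSIGNAL_MQTTS_URL",
] as const;

type ServerVar = (typeof REQUIRED_SERVER_VARS)[number];
type ServerEnv = Record<ServerVar, string>;

// Hypothetical helper: returns a fully typed config object, or throws one
// error naming every missing variable.
function loadServerEnv(env: Record<string, string | undefined>): ServerEnv {
  const missing = REQUIRED_SERVER_VARS.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(", ")}`);
  }
  const out = {} as ServerEnv;
  for (const name of REQUIRED_SERVER_VARS) {
    out[name] = env[name] as string;
  }
  return out;
}
```

On the server this would be called once as `loadServerEnv(process.env)`. The `NEXT_PUBLIC_` variable is left out on purpose, since it is read in the browser bundle rather than on the server.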

Demo

See cloudsignal/ai-chat — a complete Next.js 16 chat app using this transport.

License

MIT