@mqttkit/asyncapi
v0.2.0
Published
AsyncAPI 3.0 documentation generator and HTTP plugin for mqttkit.
Downloads
76
Maintainers
Readme
@mqttkit/asyncapi
mqttkit 的 AsyncAPI 3.0 文档生成器与 HTTP 插件。读取 MqttRouter 路由元数据生成 AsyncAPI 文档,通过 HTTP 同时提供 JSON、YAML 和可浏览的文档页面。
安装
bun add @mqttkit/core @mqttkit/asyncapi使用
import { aedes } from '@mqttkit/aedes'
import { asyncapi } from '@mqttkit/asyncapi'
import { MqttApp, router } from '@mqttkit/core'
const app = new MqttApp()
.use(aedes({ tcp: { port: 1883 } }))
.use(
router()
.topic('devices/:uid/events', {
qos: 1,
schema: {
type: 'object',
required: ['temperature'],
properties: { temperature: { type: 'number' } },
},
async onMessage(ctx) {
console.log(ctx.params.uid, ctx.payload.toString())
},
meta: {
summary: '设备遥测上报',
description: '所属用户上报设备遥测数据。',
tags: ['device'],
},
}),
)
.use(
asyncapi({
info: { title: 'mqttkit demo', version: '0.1.0' },
servers: { tcp: { host: 'localhost:1883', protocol: 'mqtt' } },
port: 9000,
}),
)
await app.listen()
// http://localhost:9000/docs
// http://localhost:9000/asyncapi.json
// http://localhost:9000/asyncapi.yamlSchema 集成
插件读取每个路由的 schema 并作为 messages.<id>.payload 输出。根据 schema 类型行为不同:
| schema 类型 | 文档里的结果 |
| -------------------------------------------- | ----------------------------------------------------- |
| 普通 JSON Schema 对象 | 原样使用 |
| TypeBox Type.X(...) (原始) | 原样使用(TypeBox schema 本身就是 JSON Schema) |
| 带 ~jsonSchema 字段的 Standard Schema | 使用挂上的 JSON Schema |
| 没挂 JSON Schema 的 Standard Schema | 占位:{ description: 'Validated by <vendor>' } |
要让 zod schema 在文档里输出完整 JSON Schema,用 @mqttkit/zod 的 jsonify():
import { jsonify } from '@mqttkit/zod'
import { z } from 'zod'
router().topic('users/:id', {
schema: jsonify(z.object({ name: z.string(), age: z.number().int() })),
/* ... */
})TypeBox 不需要额外步骤——通过 @mqttkit/typebox 注册 typeboxProvider 做运行时校验,文档里自动就有完整 schema。
路由注解约定
插件读取 TopicConfig 的以下字段:
| 字段 | 映射到 |
| -------------- | -------------------------------------------- |
| schema | messages.<id>.payload(JSON Schema) |
| qos | bindings.mqtt.qos |
| retain | bindings.mqtt.retain |
| publish | 非 false 时生成 send 操作 |
| subscribe | 非 false 时生成 receive 操作 |
| meta.summary | 操作 summary |
| meta.description | channel 描述 |
| meta.tags | channel 标签 |
| meta.examples | 消息示例 |
| meta.message.name / meta.message.contentType | 消息 ID 与 contentType |
插件参数
asyncapi({
info: { title, version, description? }, // 必填
servers?: { [id]: { host, protocol, description? } },
prefix?: '/api', // URL 前缀
port?: 9000, // 未传 `server` 时自起 HTTP server
host?: '127.0.0.1',
server?: existingHttpServer, // 附加到已有的 http.Server
})接到已有 HTTP 框架
如果不想让插件单独起 HTTP 端口,可以用 createAsyncApiHandlers 拿到 path 与带缓存的 body builder,挂到 Elysia、Hono、Express、Fastify 等任何框架。
import { aedes } from '@mqttkit/aedes'
import { createAsyncApiHandlers } from '@mqttkit/asyncapi'
import { MqttApp, router } from '@mqttkit/core'
import { Elysia } from 'elysia'
const mqttApp = new MqttApp()
.use(aedes({ tcp: { port: 1883 } }))
.use(router().topic('devices/:uid/events', { /* ... */ }))
await mqttApp.listen()
const docs = createAsyncApiHandlers(mqttApp, {
info: { title: 'mqttkit', version: '0.1.0' },
servers: { tcp: { host: 'localhost:1883', protocol: 'mqtt' } },
prefix: '/docs/mqtt',
})
new Elysia()
.get(docs.paths.json, ({ set }) => {
set.headers['content-type'] = 'application/json; charset=utf-8'
return docs.json()
})
.get(docs.paths.yaml, ({ set }) => {
set.headers['content-type'] = 'application/yaml; charset=utf-8'
return docs.yaml()
})
.get(docs.paths.docs, ({ set }) => {
set.headers['content-type'] = 'text/html; charset=utf-8'
return docs.html()
})
.listen(3000)docs.paths 会得到 { json: '/docs/mqtt/asyncapi.json', yaml: '/docs/mqtt/asyncapi.yaml', docs: '/docs/mqtt/docs' }。运行时新增路由后调用 docs.invalidate() 触发重建。
与 MQTT-over-WebSocket 共用同一端口
如果希望用同一个 HTTP 端口既提供 docs,又提供 MQTT-over-WebSocket,让 aedes 复用已有的 http.Server,再把 Elysia 的 handle(Request) 桥接到这个 server 上即可。可运行版本见 examples/asyncapi-elysia:
import { createServer } from 'node:http'
import { Readable } from 'node:stream'
import { aedes } from '@mqttkit/aedes'
import { createAsyncApiHandlers } from '@mqttkit/asyncapi'
import { MqttApp } from '@mqttkit/core'
import { Elysia } from 'elysia'
const httpServer = createServer()
const mqttApp = new MqttApp()
.use(aedes({
tcp: { port: 1883 },
ws: { server: httpServer, path: '/mqtt' }, // 复用同一个 http.Server
}))
// ... routers
await mqttApp.listen()
const docs = createAsyncApiHandlers(mqttApp, { /* ... */ })
const elysia = new Elysia()
.get(docs.paths.docs, ({ set }) => { set.headers['content-type']='text/html'; return docs.html() })
.get(docs.paths.json, ({ set }) => { set.headers['content-type']='application/json'; return docs.json() })
httpServer.on('request', async (req, res) => {
const url = `http://${req.headers.host}${req.url}`
const body = req.method && req.method !== 'GET' && req.method !== 'HEAD'
? (Readable.toWeb(req) as unknown as ReadableStream)
: null
const response = await elysia.handle(new Request(url, {
method: req.method, headers: req.headers as any, body, duplex: 'half',
} as RequestInit))
res.statusCode = response.status
response.headers.forEach((v, k) => res.setHeader(k, v))
if (response.body) for await (const chunk of response.body as any) res.write(chunk)
res.end()
})
httpServer.listen(3300)
// mqtt: mqtt://localhost:1883 (TCP)
// ws://localhost:3300/mqtt (MQTT-over-WS,和 docs 同端口)
// docs: http://localhost:3300/docs/mqtt/docs编程式接口
import { buildAsyncApi, renderAsyncApiHtml } from '@mqttkit/asyncapi'
const doc = buildAsyncApi(app, { info: { title: 'docs', version: '1.0.0' } })
const html = renderAsyncApiHtml(doc, '/asyncapi.json')备注
- 文档页面通过 CDN 加载
@asyncapi/react-component,离线环境请直接消费 JSON 或 YAML 端点。 - 生成的文档符合 AsyncAPI 3.0,可被
@asyncapi/parser校验通过。
