sveltekit-inngest
v1.1.0
Typed realtime subscriptions for SvelteKit using Inngest + SSE.
This package is bus-first:
- one global SSE endpoint
- static server channel registry
- multi-channel subscriptions in one RealtimeManager
- per-channel topic authorization
- optional per-message reauthorization (fail-closed)
The preferred client API is getRealtimeBusState(...).onMessage(...) for handling
live realtime events. Retained topic-state helpers are still available when you
want the latest value as Svelte 5 state (.current) instead of $store syntax.
Features
- Typed topic payloads from inngest/realtime channel definitions
- One global realtime endpoint with channel routing
- Per-channel auth with topic filtering (allowedTopics)
- Optional per-message reauthorization before each emit
- Optional server and client failure callbacks
- Preferred typed message handling with getRealtimeBusState(...).onMessage(...)
- Optional retained-value reads with getRealtimeBusTopicJson() and deprecated getRealtimeBusTopicState()
- Multi-subscription diffing in RealtimeManager
- Built-in health events: connecting, connected, degraded
Requirements
Peer dependencies:
- svelte (Svelte 5)
- @sveltejs/kit
- sveltekit-sse
- inngest
Installation
bun add sveltekit-inngest
# or
pnpm add sveltekit-inngest
# or
npm install sveltekit-inngest

Quick Start (Install to Live Events)
1. Define channels and topics
// src/lib/realtime/channels.ts
import { realtime } from "inngest/realtime";
import { z } from "zod";

const messageSchema = z.object({
  message: z.string(),
});

export const demoChannel = realtime.channel({
  name: "demo",
  topics: {
    message: {
      schema: messageSchema,
    },
    "admin-message": {
      schema: messageSchema,
    },
  },
});

export const userChannel = realtime.channel({
  name: (userId: string) => `user:${userId}`,
  topics: {
    message: {
      schema: messageSchema,
    },
  },
});

2. Configure Inngest
// src/lib/server/inngest.ts
import { Inngest, eventType, staticSchema } from "inngest";

type DemoMessageEvent = {
  message: string;
};

export const demoMessageEvent = eventType("app/demo.message", {
  schema: staticSchema<DemoMessageEvent>(),
});

export const inngest = new Inngest({
  id: "my-app",
});

3. Publish to topics from an Inngest function
// src/lib/server/functions.ts
import { demoMessageEvent, inngest } from "$lib/server/inngest";
import { demoChannel } from "$lib/realtime/channels";

export const demoRealtime = inngest.createFunction(
  { id: "demo-realtime", triggers: [demoMessageEvent] },
  async ({ event, step }) => {
    await step.realtime.publish("publish message", demoChannel.message, {
      message: event.data.message,
    });
  }
);

4. Expose Inngest serve endpoint
// src/routes/api/inngest/+server.ts
import { serve } from "inngest/sveltekit";
import { inngest } from "$lib/server/inngest";
import { demoRealtime } from "$lib/server/functions";

export const { GET, POST, PUT } = serve({
  client: inngest,
  functions: [demoRealtime],
});

5. Create one global realtime endpoint
// src/routes/api/events/+server.ts
import { createRealtimeEndpoint } from "sveltekit-inngest/server";
import { demoChannel, userChannel } from "$lib/realtime/channels";
import { inngest } from "$lib/server/inngest";

export const POST = createRealtimeEndpoint({
  inngest,
  reauthorizeOnEachMessage: true,
  channels: {
    demo: {
      channel: demoChannel,
      authorize: ({ locals, topics }) => {
        if (!locals?.user) {
          throw new Error("Missing authenticated user");
        }
        if (locals.user.role === "admin") return true;
        return {
          allowedTopics: topics.filter((topic) => topic === "message"),
        };
      },
    },
    user: {
      channel: userChannel,
      channelParams: (_event, requestedChannelId) => {
        return requestedChannelId.startsWith("user:")
          ? requestedChannelId.slice("user:".length)
          : "";
      },
      authorize: () => true,
    },
  },
  onFailure: ({ stage, message }) => {
    console.error(`[realtime:${stage}]`, message);
    return { message };
  },
});

6. Provide subscriptions with RealtimeManager
<!-- src/routes/+page.svelte -->
<script lang="ts">
  import { RealtimeManager } from "sveltekit-inngest/client";
  import { demoChannel, userChannel } from "$lib/realtime/channels";
  import RealtimePanel from "./realtime-panel.svelte";

  const subscriptions = [
    { channel: demoChannel },
    { channel: userChannel, channelParams: "alice" },
  ];
</script>

<RealtimeManager endpoint="/api/events" {subscriptions}>
  <RealtimePanel />
</RealtimeManager>

7. Handle messages with onMessage() (preferred)
<!-- src/routes/realtime-panel.svelte -->
<script lang="ts">
  import {
    getRealtimeBusState,
    getRealtimeBusTopicState,
  } from "sveltekit-inngest/client";
  import { demoChannel } from "$lib/realtime/channels";

  const { health, onMessage } = getRealtimeBusState(demoChannel);
  let liveMessages = $state<string[]>([]);

  onMessage("message", async ({ data }) => {
    liveMessages = [data.message, ...liveMessages].slice(0, 5);
  });

  // Optional: keep the latest value around for display-oriented UI.
  const message = getRealtimeBusTopicState<typeof demoChannel, "message">(
    demoChannel,
    "message"
  );
  const adminMessage = getRealtimeBusTopicState<typeof demoChannel, "admin-message">(
    demoChannel,
    "admin-message"
  );
</script>

<p>Connection: {health.current?.status ?? "connecting"}</p>
<pre>{JSON.stringify(liveMessages, null, 2)}</pre>
<pre>{JSON.stringify(message.current, null, 2)}</pre>
<pre>{JSON.stringify(adminMessage.current, null, 2)}</pre>

Use onMessage(...) as the default way to react to incoming full topic envelopes.
getRealtimeBusTopicState(...) is deprecated: use onMessage(...). This will be
removed in a future major release.
If you still need a retained latest-value read for rendering, prefer
getRealtimeBusTopicJson(...).
8. Send an event
Send demoMessageEvent.create(...) to Inngest, for example:
await inngest.send(
  demoMessageEvent.create({
    message: "hello realtime",
  })
);

API Reference
Import paths
| Import path | Exports |
| --- | --- |
| sveltekit-inngest | Client exports (RealtimeManager, getRealtimeBus*, client types) |
| sveltekit-inngest/client | Same as root client exports |
| sveltekit-inngest/server | createRealtimeEndpoint and server types |
| sveltekit-inngest/server/createRealtimeEndpoint | Direct endpoint factory and server types |
Client exports
Runtime exports:
- RealtimeManager
- getRealtimeBus()
- getRealtimeBusState(channel, channelParams?)
- onMessage(topic, handler) via getRealtimeBusState(...)
- getRealtimeBusTopicJson(channel, topic, options?)
- getRealtimeBusTopicState(channel, topic, options?)
Type exports include:
- RealtimeManagerProps, RealtimeSubscription, RealtimeResolvedSubscription
- RealtimeClientFailureContext, RealtimeClientFailureSource
- HealthPayload, HealthStatus
- TopicKey<TChannel>, TopicData<TChannel, TTopic>
- RealtimeBusState<TChannel>, RealtimeTopicHandler<TChannel, TTopic>, RealtimeUnsubscribe
- RealtimeTopicEnvelope, RealtimeTopicMessage
- RealtimeRequestParams, ReactiveCurrent, RealtimeTopicState, RealtimeHealthState
Server exports
Runtime export:
createRealtimeEndpoint(options)
Type exports include:
- RealtimeEndpointOptions
- RealtimeChannelConfig
- RealtimeAuthorizeContext
- RealtimeReauthorizeContext
- RealtimeHealthCheckOptions
- RealtimeServerFailureContext
- RealtimeServerFailureStage
createRealtimeEndpoint(options)
Creates a SvelteKit POST request handler for the global realtime bus.
createRealtimeEndpoint({
  inngest,
  channels: {
    [registryKey]: {
      channel,
      channelParams, // optional: string | (event, requestedChannelId, requestParams) => string | Promise<string>
      authorize, // required
      reauthorize, // optional
      reauthorizeOnEachMessage, // optional channel override
    },
  },
  reauthorizeOnEachMessage, // optional global default
  healthCheck, // optional
  onFailure, // optional
});

Endpoint options
| Option | Required | Description |
| --- | --- | --- |
| inngest | Yes | Inngest client used for token generation and subscription |
| channels | Yes | Static channel registry used to resolve incoming payload.channel |
| reauthorizeOnEachMessage | No | Global default for per-message reauthorization (default: false) |
| healthCheck.enabled | No | Enable or disable interval health ticks (default: true) |
| healthCheck.intervalMs | No | Health tick interval in ms (default: 15000) |
| onFailure | No | Failure hook for request-time and stream-time errors |
Channel config options
| Option | Required | Description |
| --- | --- | --- |
| channel | Yes | Realtime.Channel or channel definition function |
| channelParams | No | Static string or resolver for channel builders |
| authorize | Yes | Initial request authorization callback |
| reauthorize | No | Optional per-message guard callback |
| reauthorizeOnEachMessage | No | Per-channel override of global reauth behavior |
authorize(context) result contract
- true: allow all requested topics
- false: deny request (403)
- { allowedTopics }: allow only a subset of the requested topics
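The three result shapes can be read as a small decision function. The following is a minimal sketch of one plausible interpretation, not the package's implementation: `AuthorizeResult` and `resolveAllowedTopics` are hypothetical names, and treating an empty filtered set as a denial follows the 403 row in the status matrix.

```typescript
// Hypothetical local type; the package's real types may differ.
type AuthorizeResult = boolean | { allowedTopics: string[] };

// Interpret an authorize() result against the requested topics.
// Returns the topics to stream, or null when the request should be denied (403).
function resolveAllowedTopics(
  result: AuthorizeResult,
  requested: readonly string[]
): string[] | null {
  if (result === true) return requested.slice();
  if (result === false) return null;
  // Keep only topics that were both requested and allowed.
  const granted = requested.filter(
    (topic) => result.allowedTopics.indexOf(topic) !== -1
  );
  // Filtering down to zero topics is treated like a denial.
  return granted.length > 0 ? granted : null;
}
```

With the Quick Start endpoint, a non-admin user requesting both demo topics would end up with only "message" under this reading.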
authorize context
| Field | Description |
| --- | --- |
| event | Full SvelteKit RequestEvent |
| locals | event.locals |
| request | Raw Request |
| channelId | Resolved channel name |
| topics | Requested topic list |
| params | Sanitized request params (string \| number \| boolean \| null) |
reauthorize(context) additions
reauthorize receives the same fields as authorize, plus:
| Field | Description |
| --- | --- |
| messageTopic | Topic from the current realtime message |
| message | Raw message envelope |
onFailure(failure)
Called for request and stream failures.
- Return void to keep the original message.
- Return { message } to override the client-facing message.
Override behavior:
- request failures return JSON { error: message }
- stream failures use health.detail = message
failure.stage is one of:
- request-validation
- channel-resolution
- topic-validation
- authorization
- reauthorization
- stream
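As an illustration of the override contract, the sketch below logs every failure but only replaces the client-facing message for auth-related stages. The stage names come from the list above; the `FailureContext` shape (only `stage` and `message`) and the "Not allowed" text are assumptions made for this example.

```typescript
type FailureStage =
  | "request-validation"
  | "channel-resolution"
  | "topic-validation"
  | "authorization"
  | "reauthorization"
  | "stream";

// Assumed minimal context; the real failure object may carry more fields.
type FailureContext = { stage: FailureStage; message: string };

// Returning undefined keeps the original message; returning { message }
// overrides what the client sees.
function onFailure({ stage, message }: FailureContext): { message: string } | undefined {
  console.error(`[realtime:${stage}]`, message);
  if (stage === "authorization" || stage === "reauthorization") {
    // Avoid leaking internal auth details to the client.
    return { message: "Not allowed" };
  }
  return undefined;
}
```

A handler like this could be passed as the onFailure option of createRealtimeEndpoint, assuming its signature is compatible with the package's server failure context type.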
<RealtimeManager />
Props:
| Prop | Required | Description |
| --- | --- | --- |
| endpoint | No | Realtime endpoint URL. Default: "/api/events" |
| subscriptions | Yes | Array of RealtimeSubscription |
| onFailure | No | Client-side failure callback |
| children | No | Svelte snippet children |
RealtimeSubscription shape:
| Field | Required | Description |
| --- | --- | --- |
| channel | Yes | Realtime.Channel or channel definition |
| channelParams | No | String param for channel builders |
| topics | No | Explicit topic list (defaults to all channel topics) |
| params | No | Request params sent to endpoint |
Notes:
- Duplicate subscriptions resolving to the same channelId throw immediately.
- Topic lists are normalized (deduped and sorted) before connection signatures are computed.
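The two notes above can be sketched as follows. This is an illustrative approximation only: `normalizeTopics`, `connectionSignature`, and `assertUniqueChannelIds` are hypothetical helpers, and the signature format and error text are assumptions rather than what the package produces.

```typescript
// Dedupe and sort so that equivalent topic lists compare equal.
function normalizeTopics(topics: readonly string[]): string[] {
  return topics
    .filter((topic, index) => topics.indexOf(topic) === index)
    .sort();
}

// Assumed signature format: a stable JSON string of channel id plus topics.
function connectionSignature(channelId: string, topics: readonly string[]): string {
  return JSON.stringify([channelId, normalizeTopics(topics)]);
}

// Throw as soon as two subscriptions resolve to the same channel id.
function assertUniqueChannelIds(channelIds: readonly string[]): void {
  channelIds.forEach((id, index) => {
    if (channelIds.indexOf(id) !== index) {
      throw new Error(`Duplicate subscription for channel "${id}"`);
    }
  });
}
```

Because the topic list is normalized first, reordering or repeating topics in a subscription would not change its signature under this scheme.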
Hook reference
getRealtimeBus()
Returns the full bus context from RealtimeManager.
Throws if called outside a RealtimeManager subtree.
getRealtimeBusState(channel, channelParams?)
Preferred client entrypoint for consuming realtime messages.
Returns channel-scoped context:
- channelId
- topics
- select(eventName) from sveltekit-sse
- health as Svelte 5 state wrapper (health.current)
- onMessage(topic, handler) for future-only typed topic listeners
Throws when the requested channel is not currently active.
onMessage(topic, handler):
- receives the full typed realtime envelope for the selected topic
- returns an unsubscribe function
- auto-cleans on component destroy when registered during component setup
- skips the retained initial store value, so it only fires for future messages
- logs thrown/rejected handler errors without affecting connection health
Use onMessage(...) by default for side effects, orchestration, and live event
handling.
Use getRealtimeBusTopicJson(...) when you want retained latest-value reads for
UI rendering.
getRealtimeBusTopicJson(channel, topic, options?)
Optional retained-value helper. Returns Readable<TOutput | null> for the latest
parsed message matching the selected topic.
options:
| Option | Description |
| --- | --- |
| channelParams | Channel builder param for composite channels |
| map(message) | Map envelope to custom output |
| or({ error, raw, previous }) | JSON parse fallback to recover/retain value |
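A possible options object is sketched below. The envelope and fallback shapes are assumptions: this README names `data`, `topic`, `error`, `raw`, and `previous`, but their exact types are not documented here, so `MessageEnvelope` and `FallbackContext` are hypothetical.

```typescript
// Hypothetical shapes for illustration only.
type MessageEnvelope = { topic: string; data: { message: string } };
type FallbackContext = { error: unknown; raw: string; previous: string | null };

const latestTextOptions = {
  // Reduce the full envelope to the one field the UI renders.
  map: (message: MessageEnvelope): string => message.data.message,
  // On a JSON parse failure, keep showing the previous value.
  or: ({ previous }: FallbackContext): string | null => previous,
};
```

An object like this would be passed as the third argument, for example getRealtimeBusTopicJson(demoChannel, "message", latestTextOptions), assuming the option callbacks accept these signatures.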
getRealtimeBusTopicState(channel, topic, options?)
Deprecated: use onMessage(...). This will be removed in a future major release.
Legacy retained-value helper. Same behavior as getRealtimeBusTopicJson, but
wrapped in Svelte 5 state-first shape:
- read with .current
- type: ReactiveCurrent<TOutput | null>
Request Payload
Realtime manager sends this JSON payload to your endpoint:
{
  "channel": "demo",
  "topics": ["message", "admin-message"],
  "params": { "scope": "limited" }
}

Rules:
- channel is required
- topics is optional (defaults to all topics in the resolved channel)
- params values are normalized to primitives or null
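The params rule can be pictured with a small sanitizer. This is a sketch under one assumption that the README does not spell out: any value that is not a string, number, or boolean collapses to null. `sanitizeParams` is a hypothetical helper, not a package export.

```typescript
type ParamValue = string | number | boolean | null;

// Assumption: non-primitive values (objects, arrays, undefined) become null.
function sanitizeParams(input: Record<string, unknown>): Record<string, ParamValue> {
  const out: Record<string, ParamValue> = {};
  for (const key of Object.keys(input)) {
    const value = input[key];
    if (
      typeof value === "string" ||
      typeof value === "number" ||
      typeof value === "boolean"
    ) {
      out[key] = value;
    } else {
      out[key] = null;
    }
  }
  return out;
}
```

Under this reading, authorize callbacks can rely on params holding only flat, JSON-safe values.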
Runtime Behavior and Status Matrix
Request-time status codes
| Status | Stage | Meaning |
| --- | --- | --- |
| 400 | request-validation | Invalid JSON body or missing channel |
| 400 | channel-resolution | Requested channel is not available |
| 500 | channel-resolution | Multiple registry entries match requested channel |
| 400 | topic-validation | Unknown requested topics |
| 403 | authorization | authorize denied or filtered to zero topics |
| 500 | channel-resolution | Channel builder/params resolution failure |
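The channel-resolution rows can be sketched as a lookup over the resolved registry names: zero matches is a client error, more than one match is a server misconfiguration. `resolveRegistryEntry` and its input shape are hypothetical; the messages are taken from the Troubleshooting section.

```typescript
type Resolution =
  | { ok: true; registryKey: string }
  | { ok: false; status: 400 | 500; message: string };

// `resolvedNames` maps each registry key to the channel name it resolves to
// for this request (an assumed input shape for the sketch).
function resolveRegistryEntry(
  requested: string,
  resolvedNames: Record<string, string>
): Resolution {
  const matches = Object.keys(resolvedNames).filter(
    (key) => resolvedNames[key] === requested
  );
  const [first, ...rest] = matches;
  if (first === undefined) {
    return { ok: false, status: 400, message: "Requested channel is not available" };
  }
  if (rest.length > 0) {
    return { ok: false, status: 500, message: "Realtime channel registry is ambiguous" };
  }
  return { ok: true, registryKey: first };
}
```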
Health stream semantics
Server emits health events:
- connecting: stream bootstrap started
- connected: stream is live
- degraded: failure occurred; stream closes fail-closed
When degraded, health.detail carries the resolved error message (including onFailure overrides).
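On the client, the three statuses can be mapped to display text. The sketch below assumes a payload with `status` and an optional `detail`, matching how health.current?.status and health.detail are used in this README; the local `HealthPayload` type and the label strings are illustrative, not the package's own.

```typescript
type HealthStatus = "connecting" | "connected" | "degraded";
// Assumed shape; the exported HealthPayload type may include more fields.
type HealthPayload = { status: HealthStatus; detail?: string };

function describeHealth(health: HealthPayload | null | undefined): string {
  // No health event yet: treat as still connecting, like the Quick Start fallback.
  if (!health) return "Connecting...";
  switch (health.status) {
    case "connecting":
      return "Connecting...";
    case "connected":
      return "Live";
    case "degraded":
      // detail carries the resolved error message when the stream fails.
      return `Disconnected: ${health.detail ?? "unknown error"}`;
  }
}
```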
Per-message reauthorization semantics
If reauthorizeOnEachMessage is enabled, each message is checked before emit:
- the message must include a valid configured topic
- the topic must be in the originally authorized topic set
- if reauthorize exists, it must return exactly true
- otherwise authorize is re-run with topics: [message.topic]
Any failure degrades and closes the stream before the message is emitted.
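The check sequence above can be sketched as a single predicate. This is a simplified, synchronous approximation: the real callbacks receive a full context and may be async, `ReauthChecks` and `mayEmit` are hypothetical names, and how a filtered `{ allowedTopics }` result is treated on re-run is an assumption.

```typescript
type AuthorizeResult = boolean | { allowedTopics: string[] };

// Hypothetical bundle of what the endpoint knows about one stream.
type ReauthChecks = {
  configuredTopics: readonly string[];
  authorizedTopics: readonly string[];
  authorize: (topics: string[]) => AuthorizeResult;
  reauthorize?: (messageTopic: string) => unknown;
};

function mayEmit(message: { topic?: unknown }, checks: ReauthChecks): boolean {
  const topic = message.topic;
  // 1. The message must carry a topic that the channel actually defines.
  if (typeof topic !== "string" || checks.configuredTopics.indexOf(topic) === -1) {
    return false;
  }
  // 2. The topic must be in the originally authorized set.
  if (checks.authorizedTopics.indexOf(topic) === -1) return false;
  // 3. A reauthorize callback, when present, must return exactly true.
  if (checks.reauthorize) return checks.reauthorize(topic) === true;
  // 4. Otherwise authorize is re-run for just this topic.
  const result = checks.authorize([topic]);
  if (typeof result === "boolean") return result;
  // Assumption: a filtered result passes only if it still includes the topic.
  return result.allowedTopics.indexOf(topic) !== -1;
}
```

A false result here corresponds to the fail-closed path: the stream degrades and closes instead of emitting the message.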
Patterns
Composite channels
Composite channel entries are matched by resolved channel name.
const userChannel = realtime.channel({
  name: (userId: string) => `user:${userId}`,
  topics: {},
});

createRealtimeEndpoint({
  inngest,
  channels: {
    user: {
      channel: userChannel,
      channelParams: (_event, requestedChannelId) => {
        return requestedChannelId.slice("user:".length);
      },
      authorize: () => true,
    },
  },
});

Nested RealtimeManager components
You can nest managers to scope subscriptions by subtree. Each manager creates and owns connections for its own subscriptions list.
Local Demo Setup
This repository includes a full demo app.
1. Configure env
cp example.env .env

example.env includes:
- INNGEST_DEV
- INNGEST_BASE_URL
- INNGEST_EVENT_KEY
- optional INNGEST_SIGNING_KEY
2. Start local Inngest dev server
docker compose up

This uses the repository's docker-compose.yaml and points Inngest to http://host.docker.internal:5173/api/inngest.
3. Run SvelteKit app
bun run dev

Then open:
- http://localhost:5173/ (main demo)
- http://localhost:5173/nested (nested manager demo)
Development and Release Commands
# type and svelte diagnostics
bun run check
# package-quality build + publint
bun run build
# release gate (check + build + npm pack dry run)
bun run release:verify

Test scripts are intentionally not included right now.
Troubleshooting
getRealtimeBus() requires <RealtimeManager>...
A hook is being called outside a RealtimeManager subtree.
Realtime channel "..." is not active
The requested channel (or channelParams) does not match any active subscription.
Request fails with 400 Requested channel is not available
- payload.channel does not match any resolved registry channel name
- channelParams resolver produced unexpected channel name
Request fails with 500 Realtime channel registry is ambiguous
Multiple registry entries resolve to the same channel.name. Ensure one unique match per requested channel.
Stream degrades immediately
Check health.detail and server onFailure logs. Common causes:
- per-message reauthorization denied
- invalid emitted topic envelope
- thrown error inside auth callbacks
Client receives no topic updates
- ensure topic is included in subscription or allowed by auth filter
- verify published messages include matching topic
- verify channel IDs and channelParams are consistent between client and server
Breaking Changes (Bus-First Release)
This release is intentionally not backward compatible with older single-channel APIs.
Removed client APIs:
- getRealtime()
- getRealtimeState()
- getRealtimeTopicJson(topic, ...)
- getRealtimeTopicState(topic, ...)
Removed server shape:
- createRealtimeEndpoint({ channel, channelParams, authorize })
Migration note:
- channelArgs was renamed to channelParams (server + client). No alias is provided.
Use:
- createRealtimeEndpoint({ channels: { ... } })
- <RealtimeManager subscriptions={...} />
- getRealtimeBus* hooks
License
MIT
