sveltekit-inngest

v1.1.0

Typed realtime subscriptions for SvelteKit using Inngest + SSE.

This package is bus-first:

  • one global SSE endpoint
  • static server channel registry
  • multi-channel subscriptions in one RealtimeManager
  • per-channel topic authorization
  • optional per-message reauthorization (fail-closed)

The preferred client API is getRealtimeBusState(...).onMessage(...) for handling live realtime events. Retained topic-state helpers are still available when you want the latest value as Svelte 5 state (.current) instead of $store syntax.

Features

  • Typed topic payloads from inngest/realtime channel definitions
  • One global realtime endpoint with channel routing
  • Per-channel auth with topic filtering (allowedTopics)
  • Optional per-message reauthorization before each emit
  • Optional server and client failure callbacks
  • Preferred typed message handling with getRealtimeBusState(...).onMessage(...)
  • Optional retained-value reads with getRealtimeBusTopicJson() and deprecated getRealtimeBusTopicState()
  • Multi-subscription diffing in RealtimeManager
  • Built-in health events: connecting, connected, degraded

Requirements

Peer dependencies:

  • svelte (Svelte 5)
  • @sveltejs/kit
  • sveltekit-sse
  • inngest

Installation

bun add sveltekit-inngest
# or
pnpm add sveltekit-inngest
# or
npm install sveltekit-inngest

Quick Start (Install to Live Events)

1. Define channels and topics

// src/lib/realtime/channels.ts
import { realtime } from "inngest/realtime";
import { z } from "zod";

const messageSchema = z.object({
  message: z.string(),
});

export const demoChannel = realtime.channel({
  name: "demo",
  topics: {
    message: {
      schema: messageSchema,
    },
    "admin-message": {
      schema: messageSchema,
    },
  },
});

export const userChannel = realtime.channel({
  name: (userId: string) => `user:${userId}`,
  topics: {
    message: {
      schema: messageSchema,
    },
  },
});

2. Configure Inngest

// src/lib/server/inngest.ts
import { Inngest, eventType, staticSchema } from "inngest";

type DemoMessageEvent = {
  message: string;
};

export const demoMessageEvent = eventType("app/demo.message", {
  schema: staticSchema<DemoMessageEvent>(),
});

export const inngest = new Inngest({
  id: "my-app",
});

3. Publish to topics from an Inngest function

// src/lib/server/functions.ts
import { demoMessageEvent, inngest } from "$lib/server/inngest";
import { demoChannel } from "$lib/realtime/channels";

export const demoRealtime = inngest.createFunction(
  { id: "demo-realtime", triggers: [demoMessageEvent] },
  async ({ event, step }) => {
    await step.realtime.publish("publish message", demoChannel.message, {
      message: event.data.message,
    });
  }
);

4. Expose Inngest serve endpoint

// src/routes/api/inngest/+server.ts
import { serve } from "inngest/sveltekit";
import { inngest } from "$lib/server/inngest";
import { demoRealtime } from "$lib/server/functions";

export const { GET, POST, PUT } = serve({
  client: inngest,
  functions: [demoRealtime],
});

5. Create one global realtime endpoint

// src/routes/api/events/+server.ts
import { createRealtimeEndpoint } from "sveltekit-inngest/server";
import { demoChannel, userChannel } from "$lib/realtime/channels";
import { inngest } from "$lib/server/inngest";

export const POST = createRealtimeEndpoint({
  inngest,
  reauthorizeOnEachMessage: true,
  channels: {
    demo: {
      channel: demoChannel,
      authorize: ({ locals, topics }) => {
        if (!locals?.user) {
          throw new Error("Missing authenticated user");
        }

        if (locals.user.role === "admin") return true;

        return {
          allowedTopics: topics.filter((topic) => topic === "message"),
        };
      },
    },
    user: {
      channel: userChannel,
      channelParams: (_event, requestedChannelId) => {
        return requestedChannelId.startsWith("user:")
          ? requestedChannelId.slice("user:".length)
          : "";
      },
      authorize: () => true,
    },
  },
  onFailure: ({ stage, message }) => {
    console.error(`[realtime:${stage}]`, message);
    return { message };
  },
});

6. Provide subscriptions with RealtimeManager

<!-- src/routes/+page.svelte -->
<script lang="ts">
  import { RealtimeManager } from "sveltekit-inngest/client";
  import { demoChannel, userChannel } from "$lib/realtime/channels";
  import RealtimePanel from "./realtime-panel.svelte";

  const subscriptions = [
    { channel: demoChannel },
    { channel: userChannel, channelParams: "alice" },
  ];
</script>

<RealtimeManager endpoint="/api/events" {subscriptions}>
  <RealtimePanel />
</RealtimeManager>

7. Handle messages with onMessage() (preferred)

<!-- src/routes/realtime-panel.svelte -->
<script lang="ts">
  import {
    getRealtimeBusState,
    getRealtimeBusTopicState,
  } from "sveltekit-inngest/client";
  import { demoChannel } from "$lib/realtime/channels";

  const { health, onMessage } = getRealtimeBusState(demoChannel);
  let liveMessages = $state<string[]>([]);

  onMessage("message", async ({ data }) => {
    liveMessages = [data.message, ...liveMessages].slice(0, 5);
  });

  // Optional: keep the latest value around for display-oriented UI.
  const message = getRealtimeBusTopicState<typeof demoChannel, "message">(
    demoChannel,
    "message"
  );

  const adminMessage = getRealtimeBusTopicState<typeof demoChannel, "admin-message">(
    demoChannel,
    "admin-message"
  );
</script>

<p>Connection: {health.current?.status ?? "connecting"}</p>
<pre>{JSON.stringify(liveMessages, null, 2)}</pre>
<pre>{JSON.stringify(message.current, null, 2)}</pre>
<pre>{JSON.stringify(adminMessage.current, null, 2)}</pre>

Use onMessage(...) as the default way to react to incoming topic envelopes. getRealtimeBusTopicState(...) is deprecated and will be removed in a future major release; use onMessage(...) instead. If you still need a retained latest-value read for rendering, prefer getRealtimeBusTopicJson(...).

8. Send an event

Send demoMessageEvent.create(...) to Inngest, for example:

await inngest.send(
  demoMessageEvent.create({
    message: "hello realtime",
  })
);

API Reference

Import paths

| Import path | Exports |
| --- | --- |
| sveltekit-inngest | Client exports (RealtimeManager, getRealtimeBus*, client types) |
| sveltekit-inngest/client | Same as root client exports |
| sveltekit-inngest/server | createRealtimeEndpoint and server types |
| sveltekit-inngest/server/createRealtimeEndpoint | Direct endpoint factory and server types |

Client exports

Runtime exports:

  • RealtimeManager
  • getRealtimeBus()
  • getRealtimeBusState(channel, channelParams?)
  • onMessage(topic, handler) via getRealtimeBusState(...)
  • getRealtimeBusTopicJson(channel, topic, options?)
  • getRealtimeBusTopicState(channel, topic, options?)

Type exports include:

  • RealtimeManagerProps, RealtimeSubscription, RealtimeResolvedSubscription
  • RealtimeClientFailureContext, RealtimeClientFailureSource
  • HealthPayload, HealthStatus
  • TopicKey<TChannel>, TopicData<TChannel, TTopic>
  • RealtimeBusState<TChannel>, RealtimeTopicHandler<TChannel, TTopic>, RealtimeUnsubscribe
  • RealtimeTopicEnvelope, RealtimeTopicMessage
  • RealtimeRequestParams, ReactiveCurrent, RealtimeTopicState, RealtimeHealthState

Server exports

Runtime export:

  • createRealtimeEndpoint(options)

Type exports include:

  • RealtimeEndpointOptions
  • RealtimeChannelConfig
  • RealtimeAuthorizeContext
  • RealtimeReauthorizeContext
  • RealtimeHealthCheckOptions
  • RealtimeServerFailureContext
  • RealtimeServerFailureStage

createRealtimeEndpoint(options)

Creates a SvelteKit POST request handler for the global realtime bus.

createRealtimeEndpoint({
  inngest,
  channels: {
    [registryKey]: {
      channel,
      channelParams, // optional: string | (event, requestedChannelId, requestParams) => string | Promise<string>
      authorize, // required
      reauthorize, // optional
      reauthorizeOnEachMessage, // optional channel override
    },
  },
  reauthorizeOnEachMessage, // optional global default
  healthCheck, // optional
  onFailure, // optional
});

Endpoint options

| Option | Required | Description |
| --- | --- | --- |
| inngest | Yes | Inngest client used for token generation and subscription |
| channels | Yes | Static channel registry used to resolve incoming payload.channel |
| reauthorizeOnEachMessage | No | Global default for per-message reauthorization (false by default) |
| healthCheck.enabled | No | Enable/disable interval health ticks (true by default) |
| healthCheck.intervalMs | No | Health tick interval in ms (15000 by default) |
| onFailure | No | Failure hook for request-time and stream-time errors |

Channel config options

| Option | Required | Description |
| --- | --- | --- |
| channel | Yes | Realtime.Channel or channel definition function |
| channelParams | No | Static string or resolver for channel builders |
| authorize | Yes | Initial request authorization callback |
| reauthorize | No | Optional per-message guard callback |
| reauthorizeOnEachMessage | No | Per-channel override of global reauth behavior |

authorize(context) result contract

  • true: allow all requested topics
  • false: deny request (403)
  • { allowedTopics }: allow only subset of requested topics
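
The contract above can be read as a small decision function. The following is a minimal sketch under stated assumptions, not the package's implementation: `AuthorizeResult`, `Resolution`, and `resolveAuthorizeResult` are hypothetical names introduced here, and the 403 outcome for an empty filtered set follows the request-time status table in this README.

```typescript
// Hypothetical types for illustration; the package's real types may differ.
type AuthorizeResult = boolean | { allowedTopics: string[] };

interface Resolution {
  status: 200 | 403;
  topics: string[];
}

// Maps an authorize() result onto the set of topics the stream may carry.
function resolveAuthorizeResult(
  result: AuthorizeResult,
  requestedTopics: string[]
): Resolution {
  if (result === false) return { status: 403, topics: [] };
  if (result === true) return { status: 200, topics: requestedTopics.slice() };

  // Keep only topics that were both requested and explicitly allowed.
  const allowed = new Set(result.allowedTopics);
  const topics = requestedTopics.filter((topic) => allowed.has(topic));

  // Filtering down to zero topics is treated the same as a denial.
  if (topics.length === 0) return { status: 403, topics: [] };
  return { status: 200, topics };
}
```

Under this reading, returning `{ allowedTopics: ["message"] }` for a request that asked for `message` and `admin-message` would narrow the subscription to `message` only.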

authorize context

| Field | Description |
| --- | --- |
| event | Full SvelteKit RequestEvent |
| locals | event.locals |
| request | Raw Request |
| channelId | Resolved channel name |
| topics | Requested topic list |
| params | Sanitized request params (string \| number \| boolean \| null) |

reauthorize(context) additions

reauthorize receives the same fields as authorize, plus:

| Field | Description |
| --- | --- |
| messageTopic | Topic from the current realtime message |
| message | Raw message envelope |

onFailure(failure)

Called for request and stream failures.

  • Return void to keep the original message.
  • Return { message } to override the client-facing message.

Override behavior:

  • request failures return JSON { error: message }
  • stream failures use health.detail = message

failure.stage is one of:

  • request-validation
  • channel-resolution
  • topic-validation
  • authorization
  • reauthorization
  • stream
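
To make the override rules concrete, here is a minimal sketch of how a client-facing message could be resolved from an `onFailure` result. All names (`FailureStage`, `Failure`, `OnFailure`, `resolveClientMessage`, `toRequestBody`, `toDegradedHealth`) are hypothetical and only mirror the behavior described above; the package's internal code may be structured differently.

```typescript
// Stage names are taken from the list above.
type FailureStage =
  | "request-validation"
  | "channel-resolution"
  | "topic-validation"
  | "authorization"
  | "reauthorization"
  | "stream";

interface Failure {
  stage: FailureStage;
  message: string;
}

type OnFailure = (failure: Failure) => void | { message: string };

// Returning nothing keeps the original message; { message } overrides it.
function resolveClientMessage(failure: Failure, onFailure?: OnFailure): string {
  const override = onFailure ? onFailure(failure) : undefined;
  if (typeof override === "object" && override !== null) {
    return override.message;
  }
  return failure.message;
}

// Request failures surface as a JSON body of the form { error: message }.
function toRequestBody(message: string): { error: string } {
  return { error: message };
}

// Stream failures surface through the degraded health event's detail field.
function toDegradedHealth(message: string): { status: "degraded"; detail: string } {
  return { status: "degraded", detail: message };
}
```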

<RealtimeManager />

Props:

| Prop | Required | Description |
| --- | --- | --- |
| endpoint | No | Realtime endpoint URL. Default: "/api/events" |
| subscriptions | Yes | Array of RealtimeSubscription |
| onFailure | No | Client-side failure callback |
| children | No | Svelte snippet children |

RealtimeSubscription shape:

| Field | Required | Description |
| --- | --- | --- |
| channel | Yes | Realtime.Channel or channel definition |
| channelParams | No | String param for channel builders |
| topics | No | Explicit topic list (defaults to all channel topics) |
| params | No | Request params sent to endpoint |

Notes:

  • Duplicate subscriptions resolving to the same channelId throw immediately.
  • Topic lists are normalized (deduped and sorted) before connection signatures are computed.
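
The two notes above can be illustrated with a short sketch. The function names and the signature format are assumptions made for this example (the real connection signature is internal to the package); only the dedupe-and-sort normalization and the throw on duplicate channelId come from the notes.

```typescript
interface ResolvedSubscription {
  channelId: string;
  topics: string[];
}

// Dedupe and sort so that ["b", "a", "b"] and ["a", "b"] compare equal.
function normalizeTopics(topics: string[]): string[] {
  return Array.from(new Set(topics)).sort();
}

// Hypothetical signature format: channel id plus the normalized topic list.
function connectionSignature(subscription: ResolvedSubscription): string {
  return JSON.stringify([
    subscription.channelId,
    normalizeTopics(subscription.topics),
  ]);
}

// Two subscriptions resolving to the same channelId are rejected up front.
function assertUniqueChannelIds(subscriptions: ResolvedSubscription[]): void {
  const seen = new Set<string>();
  for (const subscription of subscriptions) {
    if (seen.has(subscription.channelId)) {
      throw new Error(
        `Duplicate subscription for channel "${subscription.channelId}"`
      );
    }
    seen.add(subscription.channelId);
  }
}
```

Because the topic list is normalized first, reordering or repeating topics in a subscription would not change its signature in this sketch.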

Hook reference

getRealtimeBus()

Returns the full bus context from RealtimeManager.

Throws if called outside a RealtimeManager subtree.

getRealtimeBusState(channel, channelParams?)

Preferred client entrypoint for consuming realtime messages.

Returns channel-scoped context:

  • channelId
  • topics
  • select(eventName) from sveltekit-sse
  • health as Svelte 5 state wrapper (health.current)
  • onMessage(topic, handler) for future-only typed topic listeners

Throws when the requested channel is not currently active.

onMessage(topic, handler):

  • receives the full typed realtime envelope for the selected topic
  • returns an unsubscribe function
  • auto-cleans on component destroy when registered during component setup
  • skips the retained initial store value, so it only fires for future messages
  • logs thrown/rejected handler errors without affecting connection health
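
The listener semantics above (future-only delivery, an unsubscribe function, isolated handler errors) can be modeled with a small standalone class. This is an illustrative sketch only: `TopicSource` is a hypothetical stand-in, not part of the package, and it does not model the automatic cleanup on component destroy, which depends on Svelte's lifecycle.

```typescript
type Handler<T> = (message: T) => void | Promise<void>;
type Unsubscribe = () => void;

class TopicSource<T> {
  private listeners = new Set<Handler<T>>();
  private retained: T | null = null;

  // Latest value, as a retained-value helper would expose it.
  get current(): T | null {
    return this.retained;
  }

  // Future-only: the retained value is never replayed to a new listener.
  onMessage(handler: Handler<T>): Unsubscribe {
    this.listeners.add(handler);
    return () => {
      this.listeners.delete(handler);
    };
  }

  publish(message: T): void {
    this.retained = message;
    for (const handler of Array.from(this.listeners)) {
      try {
        // Log rejected promises without interrupting delivery.
        Promise.resolve(handler(message)).catch((error) =>
          console.error("[onMessage]", error)
        );
      } catch (error) {
        // Log synchronous throws the same way.
        console.error("[onMessage]", error);
      }
    }
  }
}
```

A listener registered after a message was published sees only later messages, while `current` still reflects the most recent one.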

Use onMessage(...) by default for side effects, orchestration, and live event handling.

Use getRealtimeBusTopicJson(...) when you want retained latest-value reads for UI rendering.

getRealtimeBusTopicJson(channel, topic, options?)

Optional retained-value helper. Returns Readable<TOutput | null> for the latest parsed message matching the selected topic.

options:

| Option | Description |
| --- | --- |
| channelParams | Channel builder param for composite channels |
| map(message) | Map envelope to custom output |
| or({ error, raw, previous }) | JSON parse fallback to recover/retain value |
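
As a rough illustration of how `map` and `or` might interact, the sketch below parses a raw JSON string, applies `map` on success, and calls `or` only when parsing fails. `readTopicJson` and `JsonOptions` are hypothetical names, and returning `null` when no `or` is supplied is an assumption; the package's actual fallback behavior is not documented here.

```typescript
interface JsonOptions<TEnvelope, TOutput> {
  map?: (message: TEnvelope) => TOutput;
  or?: (context: {
    error: unknown;
    raw: string;
    previous: TOutput | null;
  }) => TOutput | null;
}

function readTopicJson<TEnvelope, TOutput = TEnvelope>(
  raw: string,
  previous: TOutput | null,
  options: JsonOptions<TEnvelope, TOutput> = {}
): TOutput | null {
  let envelope: TEnvelope;
  try {
    envelope = JSON.parse(raw) as TEnvelope;
  } catch (error) {
    // Parse failure: let the caller recover, for example by keeping `previous`.
    // Assumption: without an `or` callback the value falls back to null.
    return options.or ? options.or({ error, raw, previous }) : null;
  }
  // Parse success: project the envelope, or pass it through unchanged.
  return options.map ? options.map(envelope) : (envelope as unknown as TOutput);
}
```

With `or: ({ previous }) => previous`, a malformed payload would leave the last good value in place rather than clearing it.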

getRealtimeBusTopicState(channel, topic, options?)

Deprecated: use onMessage(...). This will be removed in a future major release.

Legacy retained-value helper. Same behavior as getRealtimeBusTopicJson, but wrapped in Svelte 5 state-first shape:

  • read with .current
  • type: ReactiveCurrent<TOutput | null>

Request Payload

RealtimeManager sends this JSON payload to your endpoint:

{
  "channel": "demo",
  "topics": ["message", "admin-message"],
  "params": { "scope": "limited" }
}

Rules:

  • channel is required
  • topics is optional (defaults to all topics in the resolved channel)
  • params values are normalized to primitives or null
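
The three rules can be sketched as a small parser. This is an assumption-laden illustration rather than the endpoint's real code: `parseRealtimeRequest` and `normalizeParams` are hypothetical names, and mapping every non-primitive param value to `null` is one plausible reading of "normalized to primitives or null".

```typescript
type ParamValue = string | number | boolean | null;

interface ParsedRequest {
  channel: string;
  topics: string[];
  params: Record<string, ParamValue>;
}

// Assumption: objects, arrays, and non-finite numbers all become null.
function normalizeParams(input: unknown): Record<string, ParamValue> {
  const output: Record<string, ParamValue> = {};
  if (typeof input !== "object" || input === null || Array.isArray(input)) {
    return output;
  }
  for (const [key, value] of Object.entries(input)) {
    const isPrimitive =
      typeof value === "string" ||
      typeof value === "boolean" ||
      (typeof value === "number" && Number.isFinite(value));
    output[key] = isPrimitive ? (value as ParamValue) : null;
  }
  return output;
}

// channelTopics stands in for the topics of the resolved channel.
function parseRealtimeRequest(body: unknown, channelTopics: string[]): ParsedRequest {
  const payload = (typeof body === "object" && body !== null ? body : {}) as Record<string, unknown>;
  if (typeof payload.channel !== "string" || payload.channel.length === 0) {
    throw new Error("Missing channel"); // would map to a 400 response
  }
  const topics = Array.isArray(payload.topics)
    ? payload.topics.filter((topic): topic is string => typeof topic === "string")
    : channelTopics.slice(); // omitted topics default to every channel topic
  return { channel: payload.channel, topics, params: normalizeParams(payload.params) };
}
```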

Runtime Behavior and Status Matrix

Request-time status codes

| Status | Stage | Meaning |
| --- | --- | --- |
| 400 | request-validation | Invalid JSON body or missing channel |
| 400 | channel-resolution | Requested channel is not available |
| 500 | channel-resolution | Multiple registry entries match requested channel |
| 400 | topic-validation | Unknown requested topics |
| 403 | authorization | authorize denied or filtered to zero topics |
| 500 | channel-resolution | Channel builder/params resolution failure |

Health stream semantics

Server emits health events:

  • connecting: stream bootstrap started
  • connected: stream is live
  • degraded: failure occurred; stream closes fail-closed

When degraded, health.detail carries the resolved error message (including onFailure overrides).

Per-message reauthorization semantics

If reauthorizeOnEachMessage is enabled, each message is checked before emit:

  1. message must include a valid configured topic
  2. topic must be in originally authorized topic set
  3. if reauthorize exists, it must return exactly true
  4. otherwise authorize is re-run with topics: [message.topic]

Any failure degrades and closes the stream before the message is emitted.
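
The four checks can be sketched as a single guard that returns whether a message may be emitted. This is a simplified model, not the package's code: `mayEmit` and `ReauthConfig` are hypothetical names, the callbacks are kept synchronous for brevity, and treating an `{ allowedTopics }` result in step 4 as "the topic must be in the list" is an assumption.

```typescript
type AuthorizeResult = boolean | { allowedTopics: string[] };

interface ReauthConfig {
  configuredTopics: string[]; // topics defined on the channel
  authorizedTopics: string[]; // topics granted by the initial authorize()
  authorize: (topics: string[]) => AuthorizeResult;
  reauthorize?: (message: { topic: string }) => unknown;
}

function mayEmit(message: { topic?: unknown }, config: ReauthConfig): boolean {
  const topic = message.topic;

  // 1. The message must carry a valid configured topic.
  if (typeof topic !== "string" || !config.configuredTopics.includes(topic)) {
    return false;
  }
  // 2. The topic must be in the originally authorized set.
  if (!config.authorizedTopics.includes(topic)) return false;

  try {
    // 3. If reauthorize exists, only an exact `true` passes.
    if (config.reauthorize) return config.reauthorize({ topic }) === true;

    // 4. Otherwise authorize is re-run for just this topic.
    const result = config.authorize([topic]);
    return typeof result === "boolean"
      ? result
      : result.allowedTopics.includes(topic);
  } catch {
    // Fail closed: a throwing callback counts as a denial.
    return false;
  }
}
```

In the real endpoint a `false` outcome corresponds to degrading and closing the stream before the message is emitted.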

Patterns

Composite channels

Composite channel entries are matched by resolved channel name.

const userChannel = realtime.channel({
  name: (userId: string) => `user:${userId}`,
  topics: {},
});

createRealtimeEndpoint({
  inngest,
  channels: {
    user: {
      channel: userChannel,
      channelParams: (_event, requestedChannelId) => {
        return requestedChannelId.slice("user:".length);
      },
      authorize: () => true,
    },
  },
});
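
To show what "matched by resolved channel name" could mean in practice, the sketch below resolves each registry entry's channel name for the requested id and compares it with the request. `RegistryEntry`, `resolveName`, and `matchRegistry` are hypothetical names; the status codes and error strings are taken from the status table and Troubleshooting section of this README.

```typescript
interface RegistryEntry {
  // Resolves the concrete channel name this entry would serve for a request.
  resolveName: (requestedChannelId: string) => string;
}

type Match =
  | { status: 200; key: string }
  | { status: 400 | 500; error: string };

function matchRegistry(
  registry: Record<string, RegistryEntry>,
  requestedChannelId: string
): Match {
  const keys = Object.entries(registry)
    .filter(([, entry]) => entry.resolveName(requestedChannelId) === requestedChannelId)
    .map(([key]) => key);

  if (keys.length === 0) {
    return { status: 400, error: "Requested channel is not available" };
  }
  if (keys.length > 1) {
    return { status: 500, error: "Realtime channel registry is ambiguous" };
  }
  return { status: 200, key: keys[0] };
}

// Example registry mirroring the demo and user channels from the Quick Start.
const exampleRegistry: Record<string, RegistryEntry> = {
  demo: { resolveName: () => "demo" },
  user: {
    resolveName: (id) =>
      `user:${id.startsWith("user:") ? id.slice("user:".length) : ""}`,
  },
};
```

In this model, a request for `user:alice` matches only the `user` entry, while a request for an unknown id matches nothing and yields the 400 case.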

Nested RealtimeManager components

You can nest managers to scope subscriptions by subtree. Each manager creates and owns connections for its own subscriptions list.

Local Demo Setup

This repository includes a full demo app.

1. Configure env

cp example.env .env

example.env includes:

  • INNGEST_DEV
  • INNGEST_BASE_URL
  • INNGEST_EVENT_KEY
  • optional INNGEST_SIGNING_KEY

2. Start local Inngest dev server

docker compose up

This uses the repository's docker-compose.yaml and points Inngest to http://host.docker.internal:5173/api/inngest.

3. Run SvelteKit app

bun run dev

Then open:

  • http://localhost:5173/ (main demo)
  • http://localhost:5173/nested (nested manager demo)

Development and Release Commands

# type and svelte diagnostics
bun run check

# package-quality build + publint
bun run build

# release gate (check + build + npm pack dry run)
bun run release:verify

Test scripts are intentionally not included right now.

Troubleshooting

getRealtimeBus() requires <RealtimeManager>...

A hook is being called outside a RealtimeManager subtree.

Realtime channel "..." is not active

The requested channel (or channelParams) does not match any active subscription.

Request fails with 400 Requested channel is not available

  • payload.channel does not match any resolved registry channel name
  • channelParams resolver produced unexpected channel name

Request fails with 500 Realtime channel registry is ambiguous

Multiple registry entries resolve to the same channel.name. Ensure one unique match per requested channel.

Stream degrades immediately

Check health.detail and server onFailure logs. Common causes:

  • per-message reauthorization denied
  • invalid emitted topic envelope
  • thrown error inside auth callbacks

Client receives no topic updates

  • ensure topic is included in subscription or allowed by auth filter
  • verify published messages include matching topic
  • verify channel IDs and channelParams are consistent between client and server

Breaking Changes (Bus-First Release)

This release is intentionally not backward compatible with older single-channel APIs.

Removed client APIs:

  • getRealtime()
  • getRealtimeState()
  • getRealtimeTopicJson(topic, ...)
  • getRealtimeTopicState(topic, ...)

Removed server shape:

  • createRealtimeEndpoint({ channel, channelParams, authorize })

Migration note:

  • channelArgs was renamed to channelParams (server + client). No alias is provided.

Use:

  • createRealtimeEndpoint({ channels: { ... } })
  • <RealtimeManager subscriptions={...} />
  • getRealtimeBus* hooks

License

MIT