@durable-streams/tanstack-ai-transport

v0.0.2

TanStack AI adapters for Durable Streams

@durable-streams/tanstack-ai-transport

TanStack AI adapters for Durable Streams.

What this package does

This package gives you:

  • A TanStack-compatible client connection (subscribe + send)
  • Server helpers to write TanStack chunks into Durable Streams
  • Chat-session helpers for echoing user prompts and enforcing JSON stream format
  • Snapshot materialization helpers for SSR hydration + resume offsets

Use this when you want chat sessions that survive refreshes/reconnects and can be shared across multiple clients.

Install

pnpm add @durable-streams/tanstack-ai-transport @durable-streams/client

Quick start

Client: create a durable session connection

import { durableStreamConnection } from "@durable-streams/tanstack-ai-transport"

const connection = durableStreamConnection({
  sendUrl: "/api/chat?id=chat_123",
  readUrl: "/api/chat-stream?id=chat_123",
  initialOffset: undefined,
})

Pass this connection to useChat from @tanstack/ai-react.

Server: write model chunks to durable stream

import { toDurableChatSessionResponse } from "@durable-streams/tanstack-ai-transport"

return toDurableChatSessionResponse({
  stream: {
    writeUrl,
    headers,
  },
  newMessages: [latestUserMessage],
  responseStream, // AsyncIterable<TanStackChunk>
})

This appends the new user message to the durable stream, pipes AI response chunks, and returns an empty success response.

Integration guide (recommended chat session flow)

1) Client connection

const connection = durableStreamConnection({
  sendUrl: `/api/chat?id=${chatId}`,
  readUrl: `/api/chat-stream?id=${encodeURIComponent(chatId)}`,
  initialOffset: resumeOffsetFromSSR,
})

Use with useChat({ id: chatId, connection, live: true }).

2) POST route (/api/chat)

  • Validate chat id
  • Build durable stream write URL
  • Keep newMessages explicit (usually latest user message)
  • Start your model stream (responseStream)
  • Return toDurableChatSessionResponse(...)
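The id-validation and URL-building steps above can be sketched as pure helpers. The id pattern and the durable write base URL are illustrative assumptions, not part of this package's API; adapt them to your own deployment:

```typescript
// Sketch: validate the incoming chat id and derive the durable write URL.
// DURABLE_BASE and the /streams/ path scheme are assumptions for illustration.
const DURABLE_BASE = "https://streams.example.com";

function isValidChatId(id: string): boolean {
  // Allow short alphanumeric ids with underscores/dashes only,
  // so ids cannot smuggle path segments into the upstream URL.
  return /^[A-Za-z0-9_-]{1,64}$/.test(id);
}

function buildWriteUrl(chatId: string): string {
  if (!isValidChatId(chatId)) throw new Error(`invalid chat id: ${chatId}`);
  return `${DURABLE_BASE}/streams/${encodeURIComponent(chatId)}`;
}
```

The handler would then start the model stream and hand writeUrl to toDurableChatSessionResponse as shown in the quick start.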

3) GET proxy route (/api/chat-stream)

  • Accept a chat id query param
  • Build upstream durable read URL on the server from that id
  • Forward query params like offset, live, etc.
  • Add durable read auth headers on the server
  • Return upstream body/headers to client
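The URL-forwarding part of this proxy can be sketched as a pure helper. The base URL is an assumption for illustration; the key point is that the upstream location is derived server-side from the id, and only known read parameters are forwarded:

```typescript
// Sketch: build the upstream durable read URL for the GET proxy route.
// READ_BASE is an assumed deployment detail, not part of this package.
const READ_BASE = "https://streams.example.com";

function buildUpstreamReadUrl(chatId: string, incoming: URLSearchParams): string {
  const url = new URL(`${READ_BASE}/streams/${encodeURIComponent(chatId)}`);
  // Forward only the read-related params the client is allowed to set;
  // everything else (auth, etc.) is added server-side via headers.
  for (const key of ["offset", "live"]) {
    const value = incoming.get(key);
    if (value !== null) url.searchParams.set(key, value);
  }
  return url.toString();
}
```

An allowlist (rather than forwarding everything) keeps clients from overriding server-controlled upstream parameters.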

4) SSR hydrate + resume

For chat page loaders:

  • materializeSnapshotFromDurableStream({ readUrl, headers })
  • send messages + offset to client
  • use that offset as initialOffset on durableStreamConnection

This avoids replaying the entire message history on the first subscribe.
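The loader-to-client handoff can be sketched as a pure function. The message shape and prop names here are assumptions for illustration; the snapshot itself would come from materializeSnapshotFromDurableStream:

```typescript
// Sketch: shape the SSR snapshot into hydration props. The offset from
// the snapshot becomes initialOffset for durableStreamConnection, so the
// first subscribe resumes after the materialized history.
type Snapshot = { messages: Array<unknown>; offset?: string };

function toHydrationProps(chatId: string, snapshot: Snapshot) {
  return {
    initialMessages: snapshot.messages, // hydrate the UI without replay
    connectionOptions: {
      sendUrl: `/api/chat?id=${encodeURIComponent(chatId)}`,
      readUrl: `/api/chat-stream?id=${encodeURIComponent(chatId)}`,
      initialOffset: snapshot.offset, // resume where the snapshot ended
    },
  };
}
```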

API reference

Client APIs

durableStreamConnection(options)

Creates a TanStack-compatible connection object with:

  • subscribe(abortSignal?) => AsyncIterable<TanStackChunk>
  • send(messages, data?, abortSignal?) => Promise<void>

DurableStreamConnectionOptions:

  • sendUrl: string (required) - where send(...) POSTs
  • readUrl?: string - where subscribe(...) reads from (defaults to sendUrl)
  • initialOffset?: string - initial durable offset for resuming
  • emitSnapshotOnSubscribe?: boolean (default true) - emit synthetic MESSAGES_SNAPSHOT on initial catch-up
  • headers?: HeadersInit - applied to both read and write requests
  • fetchClient?: typeof fetch - custom fetch implementation

Behavior:

  • send(...) POSTs { messages, data } to sendUrl
  • subscribe(...) reads durable JSON stream batches and yields TanStack chunks
  • internal offset is updated as batches arrive, so later subscribes continue from the latest seen offset

materializeSnapshotFromDurableStream(options)

Reads a non-live durable stream and materializes TanStack message state.

Input:

  • readUrl: string
  • headers?: HeadersInit
  • offset?: string

Returns:

  • { messages: Array<any>; offset?: string }

sanitizeChunkForStorage(chunk)

Removes duplicated content on TEXT_MESSAGE_CONTENT chunks (keeps delta) to reduce stored payload size.
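The described behavior can be illustrated with a minimal re-implementation sketch; the real chunk shape may carry more fields, and this assumes a type/delta/content layout:

```typescript
// Illustrative sketch only: TEXT_MESSAGE_CONTENT chunks keep the
// incremental `delta` but drop the cumulative `content` field, which
// duplicates what the stored deltas already encode.
type Chunk = { type: string; delta?: string; content?: string; [k: string]: unknown };

function sanitizeChunkSketch(chunk: Chunk): Chunk {
  if (chunk.type !== "TEXT_MESSAGE_CONTENT" || chunk.content === undefined) {
    return chunk; // other chunk types pass through untouched
  }
  const { content, ...rest } = chunk; // strip duplicated content, keep delta
  return rest;
}
```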

Server APIs

toDurableChatSessionResponse(options)

High-level helper for chat session writes.

ToDurableChatSessionResponseOptions:

  • stream: DurableChatSessionStreamTarget (writeUrl, headers, createIfMissing)
  • newMessages: DurableSessionMessage[] - explicitly appended prompt messages
  • responseStream: AsyncIterable<TanStackChunk> - model chunk source
  • mode?: "immediate" | "await" (default immediate)
  • waitUntil?: (promise: Promise<unknown>) => void

Notes:

  • chat session streams are always application/json
  • appends newMessages using toMessageEchoChunks(...)
  • sanitizes chunks before writing

toDurableStreamResponse(source, options)

Lower-level generic writer for arbitrary async chunk sources.

ToDurableStreamResponseOptions:

  • stream: DurableStreamTarget
  • mode?: "immediate" | "await"
  • waitUntil?: (promise: Promise<unknown>) => void
  • exposeLocationHeader?: boolean

ensureDurableChatSessionStream(streamTarget)

Ensures a durable stream exists for chat-session usage, enforcing application/json.

toMessageEchoChunks(message)

Converts a user/system/assistant message into:

  • TEXT_MESSAGE_START
  • TEXT_MESSAGE_CONTENT (if text exists)
  • TEXT_MESSAGE_END

This preserves explicit message IDs when provided.
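A minimal sketch of that conversion, where the message and chunk shapes are assumptions for illustration:

```typescript
// Illustrative sketch: echo a message as START / CONTENT / END chunks,
// preserving an explicit id when the message provides one and minting a
// fallback id otherwise.
type EchoMessage = { id?: string; role: string; text?: string };
type EchoChunk = { type: string; messageId: string; role?: string; delta?: string };

let fallbackCounter = 0;

function messageEchoChunksSketch(message: EchoMessage): EchoChunk[] {
  const messageId = message.id ?? `msg_${++fallbackCounter}`;
  const chunks: EchoChunk[] = [
    { type: "TEXT_MESSAGE_START", messageId, role: message.role },
  ];
  if (message.text) {
    // CONTENT chunk only when there is text, matching the list above
    chunks.push({ type: "TEXT_MESSAGE_CONTENT", messageId, delta: message.text });
  }
  chunks.push({ type: "TEXT_MESSAGE_END", messageId });
  return chunks;
}
```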

appendSanitizedChunksToStream(stream, chunks, contentType?)

Appends chunk array with sanitizeChunkForStorage applied.

pipeSanitizedChunksToStream(source, stream, contentType?)

Pipes async chunk source with sanitizeChunkForStorage applied.

Shared types

  • TanStackChunk
  • DurableSessionConnection
  • DurableStreamConnection (alias)
  • DurableStreamConnectionOptions
  • DurableChatSessionStreamTarget
  • DurableStreamTarget
  • ToDurableStreamResponseMode
  • ToDurableStreamResponseOptions
  • ToDurableChatSessionResponseOptions
  • DurableSessionMessage
  • DurableSessionMessagePart
  • WaitUntil

Response contract

toDurableStreamResponse(..., { mode: "immediate" })

  • Status: 201
  • Header: Location: <read-url>
  • Body: { "streamUrl": "<read-url>" }

toDurableStreamResponse(..., { mode: "await" })

  • Status: 200
  • Header: Location: <read-url>
  • Body: { "streamUrl": "<read-url>", "finalOffset": "<offset>" }

toDurableChatSessionResponse(..., { mode: "immediate" })

  • Status: 202
  • Body: empty

toDurableChatSessionResponse(..., { mode: "await" })

  • Status: 200
  • Body: empty
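On the caller side, the toDurableStreamResponse pointer bodies above could be interpreted like this (a sketch written against the contract, not an API this package exports):

```typescript
// Sketch: parse the pointer response documented above. `finalOffset`
// is only present in "await" mode (status 200); "immediate" is 201.
type StreamPointer = { streamUrl: string; finalOffset?: string };

function parsePointerBody(status: number, bodyText: string): StreamPointer {
  if (status !== 200 && status !== 201) {
    throw new Error(`unexpected status ${status}`);
  }
  const body = JSON.parse(bodyText) as StreamPointer;
  if (typeof body.streamUrl !== "string") {
    throw new Error("missing streamUrl in pointer response");
  }
  return body;
}
```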

waitUntil usage

return toDurableChatSessionResponse({
  stream: { writeUrl, headers },
  newMessages,
  responseStream,
  waitUntil: ctx.waitUntil.bind(ctx),
})

Use waitUntil in worker-style runtimes so background writes continue after the pointer response is returned.