# @s2-dev/resumable-stream
Resumable streams based on s2.dev, inspired by Vercel's implementation.
## Usage
To use this package, you need to create an S2 access token and a basin to store all your streams.

1. Sign up at s2.dev, generate an access token, and set it as `S2_ACCESS_TOKEN` in your env.
2. Create a new basin from the **Basins** tab with the **Create Stream on Append** and **Create Stream on Read** options enabled, and set it as `S2_BASIN` in your env.
The incoming stream is batched, and the batch size can be changed by setting `S2_BATCH_SIZE`. The maximum time to wait before flushing a batch can be tweaked by setting `S2_LINGER_DURATION_MS` to a duration in milliseconds.
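For example, a minimal `.env` sketch (all values are placeholders, and the batching values shown are illustrative, not documented defaults):

```bash
# .env (placeholder values)
S2_ACCESS_TOKEN=<your-access-token>
S2_BASIN=<your-basin-name>
# Optional: tune batching of the incoming stream
S2_BATCH_SIZE=100
S2_LINGER_DURATION_MS=25
```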
The package exposes three entry points:

- `@s2-dev/resumable-stream`: a generic `ReadableStream<string>` resumer (`createResumableStreamContext`). Use it for plain text streams or anything that isn't AI SDK.
- `@s2-dev/resumable-stream/aisdk`: a thin helper over `UIMessageChunk` streams for the AI SDK's `useChat`. See the AI SDK section below.
- `@s2-dev/resumable-stream/tanstack-ai`: a thin helper over TanStack AI `StreamChunk` streams. See the TanStack AI section below.
### Generic resumer
```ts
import { createResumableStreamContext } from "@s2-dev/resumable-stream";
import { after, type NextRequest } from "next/server";

// Placeholder producer: substitute your own ReadableStream<string> source.
declare function makeTestStream(): ReadableStream<string>;

const streamContext = createResumableStreamContext({
  waitUntil: (promise) => {
    after(async () => {
      await promise;
    });
  },
});

export async function POST(req: NextRequest, { params }: { params: Promise<{ streamId: string }> }) {
  const { streamId } = await params;
  const inputStream = makeTestStream();
  const stream = await streamContext.resumableStream(streamId, () => inputStream);
  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
    },
  });
}

export async function GET(req: NextRequest, { params }: { params: Promise<{ streamId: string }> }) {
  const { streamId } = await params;
  const stream = await streamContext.resumeStream(streamId);
  if (!stream) {
    return new Response("Stream is already done", {
      status: 422,
    });
  }
  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
    },
  });
}
```

## How it works
### Creation
- The input stream is immediately duplicated into two identical streams (see the sketch after this list).
- One stream is returned to the caller for immediate consumption, and the other is processed asynchronously:
  - An initial fence command is appended to the S2 stream with a unique session token, claiming ownership of the stream to prevent any races. S2 streams are created on the first append (configured at the basin level).
  - Data read from the duplicated input stream is continuously batched and flushed to the S2 stream when the batch is full or a timeout occurs.
  - When the input stream completes, a final fence command marking the stream as done is appended.
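Conceptually, the creation path looks like the following sketch. The `appendFence` and `appendBatch` helpers are hypothetical stand-ins for the package's internal S2 writes, not part of its API:

```ts
// Conceptual sketch only; not the package's actual internals.

// Hypothetical stand-ins for the internal S2 writes.
async function appendFence(kind: "start" | "done"): Promise<void> { /* append a fence record */ }
async function appendBatch(chunk: string): Promise<void> { /* batch, flush on size/linger */ }

function createSketch(input: ReadableStream<string>): ReadableStream<string> {
  // Duplicate the input: tee() yields two identical streams over one source.
  const [forCaller, forS2] = input.tee();

  // Persist the second branch to S2 in the background.
  void (async () => {
    await appendFence("start"); // claim ownership with a unique session token
    for await (const chunk of forS2) {
      await appendBatch(chunk); // flushed when the batch fills or the linger timeout fires
    }
    await appendFence("done"); // mark the stream complete
  })();

  return forCaller; // returned for immediate consumption
}
```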
### Resumption
- A caller requests to resume an existing stream by ID.
- A stream is returned that reads data from the S2 stream and processes it (see the sketch after this list):
  - Data is read from the S2 stream from the beginning. S2 streams are also created on read (configured at the basin level) if a read happens before an append, to prevent any races.
  - Data records are enqueued to the output stream controller for consumption.
  - If a sentinel fence command is encountered, the stream is closed.
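A minimal sketch of that replay loop, with a hypothetical `readS2Records` generator standing in for the actual S2 read session:

```ts
// Conceptual sketch only; not the package's actual internals.
type S2Record = { kind: "fence" | "data"; body: string };

// Hypothetical generator standing in for an S2 read session that starts
// at the beginning of the stream and tails live appends.
async function* readS2Records(streamId: string): AsyncGenerator<S2Record> { /* yield S2 records */ }

function resumeSketch(streamId: string): ReadableStream<string> {
  return new ReadableStream<string>({
    async start(controller) {
      for await (const record of readS2Records(streamId)) {
        if (record.kind === "fence" && record.body === "done") {
          break; // sentinel fence: the producer finished
        }
        if (record.kind === "data") {
          controller.enqueue(record.body); // replay persisted data to the consumer
        }
      }
      controller.close();
    },
  });
}
```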
## AI SDK

The `./aisdk` subpath makes AI SDK `useChat` streams resumable via S2. `makeResumable` tees the `UIMessageChunk` stream: one branch streams directly back to the client as SSE, the other is persisted to S2 for resumption. The wire format matches the AI SDK's `createUIMessageStreamResponse`, so the stock `DefaultChatTransport` works out of the box.

A runnable end-to-end demo (Bun server + vanilla-JS client): `examples/ai-sdk-resumable-chat`.
```ts
// lib/s2.ts
import { createResumableChat } from "@s2-dev/resumable-stream/aisdk";

export const chat = createResumableChat({
  accessToken: process.env.S2_ACCESS_TOKEN!,
  basin: process.env.S2_BASIN!,
});
```

```ts
// app/api/chat/route.ts
import { after } from "next/server";
import { convertToModelMessages, streamText, type UIMessage } from "ai";
import { openai } from "@ai-sdk/openai";
import { chat } from "@/lib/s2";

export async function POST(req: Request) {
  const { id, messages } = (await req.json()) as { id: string; messages: UIMessage[] };
  const streamName = `chat-${id}`;

  const result = streamText({
    model: openai("gpt-4o-mini"),
    messages: convertToModelMessages(messages),
  });

  return chat.makeResumable(streamName, result.toUIMessageStream(), {
    waitUntil: (promise) => {
      after(async () => {
        await promise;
      });
    },
  });
}
```

```ts
// app/api/chat/[id]/stream/route.ts
import { chat } from "@/lib/s2";

export async function GET(
  _req: Request,
  { params }: { params: Promise<{ id: string }> },
) {
  const { id } = await params;
  return chat.replay(`chat-${id}`);
}
```

```ts
// app/page.tsx
"use client";
import { useChat } from "@ai-sdk/react";
import { DefaultChatTransport } from "ai";

const transport = new DefaultChatTransport({
  api: "/api/chat",
  // `DefaultChatTransport` defaults reconnect to `${api}/${chatId}/stream`;
  // override via `prepareReconnectToStreamRequest` if your route shape differs.
});

export default function Chat() {
  const chat = useChat({ transport, resume: true });
  return null; // render your chat UI here
}
```

## TanStack AI
The `./tanstack-ai` subpath makes TanStack AI chat streams durable: S2 stores the stream chunks and replays them on reconnects.

For full chat apps, use `mode: "session"`:
- `POST` starts a generation and appends chunks to one durable S2 session log.
- `GET` replays completed history and tails live chunks as SSE.
- `DELETE` stops the active in-process generation; that generation writes its own `RUN_FINISHED` stop chunk while closing.
Configure the S2 basin to create streams on append/read. Streams are not created before replay or generation.
```ts
import {
  chat as tanstackChat,
  convertMessagesToModelMessages,
} from "@tanstack/ai";
import { openaiText } from "@tanstack/ai-openai";
import { createResumableChat } from "@s2-dev/resumable-stream/tanstack-ai";

const chat = createResumableChat({
  accessToken: process.env.S2_ACCESS_TOKEN!,
  basin: process.env.S2_BASIN!,
  mode: "session",
  enableStop: true,
});

export async function POST(req: Request) {
  const { id, messages } = await req.json();
  return chat.makeSessionResponse(`chat-${id}`, {
    messages,
    source: (messages, { abortController }) =>
      tanstackChat({
        adapter: openaiText("gpt-4o-mini"),
        messages: convertMessagesToModelMessages(messages),
        abortController,
      }),
    waitUntil, // your platform's waitUntil, when available (see Notes below)
  });
}

export async function GET(req: Request) {
  const url = new URL(req.url);
  return chat.replay(`chat-${url.searchParams.get("id")}`, {
    fromSeqNum: url.searchParams.has("from")
      ? Number(url.searchParams.get("from"))
      : undefined,
  });
}

export async function DELETE(req: Request) {
  const { id } = await req.json();
  return chat.stopSession(`chat-${id}`);
}
```

The `./tanstack-ai/client` subpath exposes a `useChat` connection adapter for those endpoints:
```ts
import { useChat } from "@tanstack/ai-react";
import { createS2Connection } from "@s2-dev/resumable-stream/tanstack-ai/client";

// `chatId` comes from your app state.
const connection = createS2Connection({
  mode: "session",
  sendUrl: "/api/chat",
  stopUrl: "/api/chat",
  subscribeUrl: `/api/chat/replay?id=${chatId}`,
  body: { id: chatId },
});

const { messages, sendMessage, stop, isLoading, sessionGenerating } = useChat({
  connection,
  live: true,
});

const isGenerating = isLoading || sessionGenerating;

function stopGeneration() {
  stop();
  void connection.stop?.();
}
```

Notes:
- `makeSessionResponse` passes the submitted TanStack messages to `source`. It stores stream events only: the latest user text event and model chunks.
- Session starts read only the last record to claim the next turn.
- `stopSession` does not scan the stream, i.e. it aborts the active local generation and lets the running persistence pipeline close the run.
- Local stop tracking is opt-in. Set `enableStop: true` only when this server instance exposes `stopSession`; otherwise no active-generation map is created and `stopSession` returns 204.
- Pass your platform's `waitUntil` when available. Without it, `makeSessionResponse` waits for persistence before returning so serverless runtimes do not abandon the stream after a 202 response.
- First page load compacts completed history into an in-memory `MESSAGES_SNAPSHOT` SSE event. Snapshots are not stored on an S2 stream.
- Replay frames carry the next S2 sequence number as the SSE `id`, so the client can reconnect from the last processed record (see the sketch after this list).
- `stop()` only cancels TanStack's local request. Call `connection.stop?.()` to stop server-side generation and persistence too. This is intentionally explicit so refresh/unmount does not stop a run.
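To illustrate the sequence-number point, a hand-rolled SSE client could feed the last `id` it saw back into the `from` query parameter of the `GET` handler above. This is only a sketch; the `createS2Connection` adapter already does the equivalent for you:

```ts
// Sketch: manual SSE reconnection driven by the replayed S2 sequence numbers.
function subscribe(chatId: string, fromSeqNum?: number): EventSource {
  const url = new URL("/api/chat/replay", window.location.origin);
  url.searchParams.set("id", chatId);
  if (fromSeqNum !== undefined) url.searchParams.set("from", String(fromSeqNum));

  const source = new EventSource(url);
  let lastSeqNum = fromSeqNum;

  source.onmessage = (event) => {
    // Each frame's SSE id carries the next S2 sequence number.
    lastSeqNum = Number(event.lastEventId);
    // ...process event.data...
  };
  source.onerror = () => {
    source.close();
    // Resume from the last processed record instead of replaying everything.
    subscribe(chatId, lastSeqNum);
  };
  return source;
}
```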
For request-scoped streaming, keep the default `mode: "single-use"` and call `makeResumable`. Omit `subscribeUrl` on the client to stream chunks on the POST response; pass `subscribeUrl` only if the client should recover an active run on mount.
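For instance, a single-use client connection might look like the sketch below; `chatId` is assumed to come from your app state, and the URL mirrors the session example above:

```ts
import { createS2Connection } from "@s2-dev/resumable-stream/tanstack-ai/client";

declare const chatId: string; // assumed to come from your app state

// Single-use mode is the default, so `mode` can be omitted.
const connection = createS2Connection({
  sendUrl: "/api/chat",
  body: { id: chatId },
  // `subscribeUrl` omitted: chunks stream back on the POST response itself.
  // Add it only if the client should reattach to an active run on mount.
});
```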
Runnable examples:

- `examples/tanstack-ai-chat`: small local S2 session chat.
