@s2-dev/resumable-stream

v2.1.0

Resumable streams based on s2.dev, inspired by Vercel's implementation.

Usage

To use this package, you need to create an S2 access token and basin to store all your streams.

  1. Sign up at s2.dev, generate an access token, and set it as S2_ACCESS_TOKEN in your env.

  2. Create a new basin from the Basins tab with the Create Stream on Append and Create Stream on Read options enabled, and set it as S2_BASIN in your env.

The incoming stream is batched and the batch size can be changed by setting S2_BATCH_SIZE. The maximum time to wait before flushing a batch can be tweaked by setting S2_LINGER_DURATION_MS to a duration in milliseconds.
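
A minimal .env sketch; the values shown are illustrative, not documented defaults:

S2_ACCESS_TOKEN=your-s2-access-token
S2_BASIN=your-basin-name
S2_BATCH_SIZE=50
S2_LINGER_DURATION_MS=100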

The package exposes three entry points:

  • @s2-dev/resumable-stream: a generic ReadableStream<string> resumer (createResumableStreamContext). Use for plain text streams or anything that isn't AI SDK.
  • @s2-dev/resumable-stream/aisdk: a thin helper over UIMessageChunk streams for the AI SDK's useChat. See the AI SDK section below.
  • @s2-dev/resumable-stream/tanstack-ai: a thin helper over TanStack AI StreamChunk streams. See the TanStack AI section below.

Generic resumer

import { createResumableStreamContext } from "@s2-dev/resumable-stream";
import { after, type NextRequest } from "next/server";

const streamContext = createResumableStreamContext({
  waitUntil: (promise) => {
    after(async () => {
      await promise;
    });
  },
});

// Placeholder for the example: any source yielding a ReadableStream<string>.
declare function makeTestStream(): ReadableStream<string>;

export async function POST(req: NextRequest, { params }: { params: Promise<{ streamId: string }> }) {
  const { streamId } = await params;
  const inputStream = makeTestStream();
  const stream = await streamContext.resumableStream(
    streamId,
    () => inputStream,
  );
  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
    },
  });
}

export async function GET(req: NextRequest, { params }: { params: Promise<{ streamId: string }> }) {
  const { streamId } = await params;
  const stream = await streamContext.resumeStream(
    streamId,
  );
  if (!stream) {
    return new Response("Stream is already done", {
      status: 422,
    });
  }
  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
    },
  });
}

How it works

Creation

  1. The input stream is immediately duplicated into two identical streams.
  2. One stream is returned to the caller for immediate consumption; the other is processed asynchronously (sketched below):
    • An initial fence command is appended to the S2 stream with a unique session token, claiming ownership of the stream to prevent races. S2 streams are created on the first append (configured at the basin level).
    • Data read from the duplicated input stream is continuously batched and flushed to the S2 stream whenever the batch fills or the linger timeout elapses.
    • When the input stream completes, a final fence command marking the stream as done is appended.
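
A minimal sketch of the creation flow, assuming the standard ReadableStream.tee(). appendFence, flushBatch, and appendDoneFence are hypothetical stand-ins for the library's S2 append calls, and the linger timeout is omitted for brevity:

// Hypothetical stand-ins for the library's S2 append operations.
declare function appendFence(stream: string, sessionToken: string): Promise<void>;
declare function flushBatch(stream: string, records: string[]): Promise<void>;
declare function appendDoneFence(stream: string): Promise<void>;

function persistToS2(input: ReadableStream<string>, streamName: string): ReadableStream<string> {
  const [toCaller, toS2] = input.tee(); // duplicate into two identical streams

  void (async () => {
    await appendFence(streamName, crypto.randomUUID()); // claim ownership
    const reader = toS2.getReader();
    let batch: string[] = [];
    for (;;) {
      const { done, value } = await reader.read();
      if (done) break;
      batch.push(value);
      if (batch.length >= 50) { // illustrative batch size (S2_BATCH_SIZE)
        await flushBatch(streamName, batch);
        batch = [];
      }
    }
    if (batch.length > 0) await flushBatch(streamName, batch);
    await appendDoneFence(streamName); // final fence: stream is done
  })();

  return toCaller; // handed back for immediate consumption
}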

Resumption

  1. A caller requests to resume an existing stream by ID.
  2. A stream is returned that reads data from the S2 stream and processes it (sketched below):
    • Data is read from the S2 stream from the beginning. S2 streams are also created on read (configured at the basin level), so a read that happens before any append does not race.
    • Data records are enqueued to the output stream controller for consumption.
    • If the sentinel fence command is encountered, the stream is closed.
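
A matching sketch of the resume path. readRecords is a hypothetical async iterator over S2 records, reading from the beginning of the stream:

// Hypothetical: iterate all S2 records for a stream, oldest first.
// "done" marks the sentinel fence appended when the producer finished.
declare function readRecords(
  stream: string,
): AsyncIterable<{ kind: "data" | "done"; body: string }>;

function resumeFromS2(streamName: string): ReadableStream<string> {
  return new ReadableStream<string>({
    async start(controller) {
      for await (const record of readRecords(streamName)) {
        if (record.kind === "done") {
          controller.close(); // sentinel fence: nothing more will arrive
          return;
        }
        controller.enqueue(record.body); // replay data to the consumer
      }
      // Producer still running: a real implementation keeps tailing here.
    },
  });
}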

AI SDK

The ./aisdk subpath makes AI SDK useChat streams resumable via S2. makeResumable tees the UIMessageChunk stream: one branch streams directly back to the client as SSE, the other is persisted to S2 for resumption. The wire format matches the AI SDK's createUIMessageStreamResponse, so the stock DefaultChatTransport works out of the box.

A runnable end-to-end demo (Bun server + vanilla-JS client): examples/ai-sdk-resumable-chat.

// lib/s2.ts
import { createResumableChat } from "@s2-dev/resumable-stream/aisdk";

export const chat = createResumableChat({
  accessToken: process.env.S2_ACCESS_TOKEN!,
  basin: process.env.S2_BASIN!,
});
// app/api/chat/route.ts
import { after } from "next/server";
import { convertToModelMessages, streamText, type UIMessage } from "ai";
import { openai } from "@ai-sdk/openai";
import { chat } from "@/lib/s2";

export async function POST(req: Request) {
  const { id, messages } = (await req.json()) as { id: string; messages: UIMessage[] };
  const streamName = `chat-${id}`;
  const result = streamText({
    model: openai("gpt-4o-mini"),
    messages: convertToModelMessages(messages),
  });

  return chat.makeResumable(streamName, result.toUIMessageStream(), {
    waitUntil: (promise) => {
      after(async () => {
        await promise;
      });
    },
  });
}
// app/api/chat/[id]/stream/route.ts
import { chat } from "@/lib/s2";

export async function GET(
  _req: Request,
  { params }: { params: Promise<{ id: string }> },
) {
  const { id } = await params;
  return chat.replay(`chat-${id}`);
}
// app/page.tsx
"use client";
import { useChat } from "@ai-sdk/react";
import { DefaultChatTransport } from "ai";

const transport = new DefaultChatTransport({
  api: "/api/chat",
  // `DefaultChatTransport` defaults reconnect to `${api}/${chatId}/stream`;
  // override via `prepareReconnectToStreamRequest` if your route shape differs.
});

export default function Chat() {
  // resume: true reconnects to an active stream for this chat on mount.
  const { messages } = useChat({ transport, resume: true });
  return null; // ...render messages here...
}

TanStack AI

The ./tanstack-ai subpath makes TanStack AI chat streams durable: S2 stores the stream chunks and replays them on reconnect.

For full chat apps, use mode: "session":

  • POST starts a generation and appends chunks to one durable S2 session log.
  • GET replays completed history and tails live chunks as SSE.
  • DELETE stops the active in-process generation; that generation writes its own RUN_FINISHED stop chunk while closing.

Configure the S2 basin to create streams on append/read. Streams are not created before replay or generation.

import {
  chat as tanstackChat,
  convertMessagesToModelMessages,
} from "@tanstack/ai";
import { openaiText } from "@tanstack/ai-openai";
import { createResumableChat } from "@s2-dev/resumable-stream/tanstack-ai";

// Platform-provided scheduler (e.g. Vercel's waitUntil or Cloudflare's
// ctx.waitUntil), declared here so the example type-checks; see Notes below.
declare const waitUntil: (promise: Promise<unknown>) => void;

const chat = createResumableChat({
  accessToken: process.env.S2_ACCESS_TOKEN!,
  basin: process.env.S2_BASIN!,
  mode: "session",
  enableStop: true,
});

export async function POST(req: Request) {
  const { id, messages } = await req.json();

  return chat.makeSessionResponse(`chat-${id}`, {
    messages,
    source: (messages, { abortController }) =>
      tanstackChat({
        adapter: openaiText("gpt-4o-mini"),
        messages: convertMessagesToModelMessages(messages),
        abortController,
      }),
    waitUntil,
  });
}

export async function GET(req: Request) {
  const url = new URL(req.url);
  return chat.replay(`chat-${url.searchParams.get("id")}`, {
    fromSeqNum: url.searchParams.has("from")
      ? Number(url.searchParams.get("from"))
      : undefined,
  });
}

export async function DELETE(req: Request) {
  const { id } = await req.json();
  return chat.stopSession(`chat-${id}`);
}

The ./tanstack-ai/client subpath exposes a useChat connection adapter for those endpoints. Hooks must run inside a component, so the example below takes chatId as a prop:

import { useChat } from "@tanstack/ai-react";
import { createS2Connection } from "@s2-dev/resumable-stream/tanstack-ai/client";

function ChatView({ chatId }: { chatId: string }) {
  const connection = createS2Connection({
    mode: "session",
    sendUrl: "/api/chat",
    stopUrl: "/api/chat",
    subscribeUrl: `/api/chat/replay?id=${chatId}`,
    body: { id: chatId },
  });

  const { messages, sendMessage, stop, isLoading, sessionGenerating } = useChat({
    connection,
    live: true,
  });

  const isGenerating = isLoading || sessionGenerating;

  function stopGeneration() {
    // stop() cancels the local request; connection.stop?.() also stops the
    // server-side generation and persistence (see Notes below).
    stop();
    void connection.stop?.();
  }

  // ...render messages and wire up sendMessage / stopGeneration...
}

Notes:

  • makeSessionResponse passes the submitted TanStack messages to source. It stores stream events only: the latest user text event and model chunks.
  • A session start reads only the last record to claim the next turn. stopSession does not scan the stream; it aborts the active local generation and lets the running persistence pipeline close the run.
  • Local stop tracking is opt-in. Set enableStop: true only when this server instance exposes stopSession; otherwise no active-generation map is created and stopSession returns 204.
  • Pass your platform's waitUntil when available. Without it, makeSessionResponse waits for persistence before returning so serverless runtimes do not abandon the stream after a 202 response.
  • First page load compacts completed history into an in-memory MESSAGES_SNAPSHOT SSE event. Snapshots are not stored on an S2 stream.
  • Replay frames carry the next S2 sequence number as the SSE id, so the client can reconnect from the last processed record (see the sketch after these notes).
  • stop() only cancels TanStack's local request. Call connection.stop?.() to stop server-side generation and persistence too. This is intentionally explicit so refresh/unmount does not stop a run.
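
A client-side sketch of those reconnect mechanics. chatId is assumed to be in scope, and the from query parameter matches the GET route shown earlier:

// Assumed to exist in the surrounding scope for this sketch.
declare const chatId: string;

let lastSeqNum: string | undefined;

function subscribe(from?: string) {
  const query = from ? `&from=${from}` : "";
  const source = new EventSource(`/api/chat/replay?id=${chatId}${query}`);
  source.onmessage = (event) => {
    lastSeqNum = event.lastEventId; // next S2 sequence number
    // ...apply the chunk in event.data...
  };
  source.onerror = () => {
    source.close();
    subscribe(lastSeqNum); // resume from the last processed record
  };
}

subscribe();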

For request-scoped streaming, keep the default mode: "single-use" and call makeResumable. Omit subscribeUrl on the client to stream chunks on the POST response; pass subscribeUrl only if the client should recover an active run on mount.
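
A sketch of the single-use shape. This assumes makeResumable on the ./tanstack-ai subpath mirrors the aisdk signature shown earlier (stream name, chunk stream, options); treat it as an outline rather than the definitive API:

import {
  chat as tanstackChat,
  convertMessagesToModelMessages,
} from "@tanstack/ai";
import { openaiText } from "@tanstack/ai-openai";
import { createResumableChat } from "@s2-dev/resumable-stream/tanstack-ai";

// Platform-provided scheduler, declared so the sketch type-checks.
declare const waitUntil: (promise: Promise<unknown>) => void;

// mode defaults to "single-use", so it is omitted here.
const chat = createResumableChat({
  accessToken: process.env.S2_ACCESS_TOKEN!,
  basin: process.env.S2_BASIN!,
});

export async function POST(req: Request) {
  const { id, messages } = await req.json();
  const stream = tanstackChat({
    adapter: openaiText("gpt-4o-mini"),
    messages: convertMessagesToModelMessages(messages),
  });
  // Assumed to mirror the aisdk makeResumable shown earlier.
  return chat.makeResumable(`chat-${id}`, stream, { waitUntil });
}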

Runnable examples: