use-resumable-stream

v0.3.3

useChat, but resumable. Survives tab switch, refresh, and network drops.

A React hook library for resumable SSE streams — designed for LLM generation, long-running tasks, and any streaming API that needs to survive tab switches, page refreshes, and network drops.

Why not Vercel AI SDK?

| Feature | Vercel AI SDK | use-resumable-stream |
|---|:---:|:---:|
| SSE streaming | ✅ | ✅ |
| Resume after disconnect | ❌ | ✅ |
| Survive tab switch | ❌ | ✅ |
| Custom backend protocol | ❌ | ✅ |
| Two-phase task ID (pending → real) | ❌ | ✅ |
| Multi-task parallel | ❌ | ✅ |
| Auto-resume list | ❌ | ✅ |
| Vercel-only backend | Required | Not required |

Installation

npm install use-resumable-stream
# or
pnpm add use-resumable-stream

Quick Start

1. Wrap your app with StreamProvider

// app/layout.tsx (Next.js) or main.tsx (Vite)
import { StreamProvider } from 'use-resumable-stream';

export default function RootLayout({ children }) {
  return (
    <StreamProvider>
      {children}
    </StreamProvider>
  );
}

2. Define your transport and event reducer

import { createSseTransport } from 'use-resumable-stream';

// Define the shape of your streaming data
interface ChatData {
  content: string;
  reasoning: string;
}

// Create a transport that connects to your SSE endpoint
const startTransport = createSseTransport<{ prompt: string }>({
  url: '/api/chat/start',
  parseMessage: (raw) => {
    const msg = raw as { event_type: string; seq: number; data: unknown };
    switch (msg.event_type) {
      case 'META':
        return { type: 'META', realId: (msg.data as { work_id: string }).work_id, seq: msg.seq };
      case 'DELTA':
        return { type: 'DELTA', delta: (msg.data as { content: string }).content, seq: msg.seq };
      case 'SNAPSHOT':
        return { type: 'SNAPSHOT', snapshot: (msg.data as { content: string }).content, seq: msg.seq };
      case 'STOP':
        return { type: 'STOP', seq: msg.seq };
      case 'ERROR':
        return { type: 'ERROR', message: (msg.data as { message: string }).message, seq: msg.seq };
      case 'HEARTBEAT':
        return { type: 'HEARTBEAT' };
      default:
        return null;
    }
  },
});

const resumeTransport = createSseTransport<{ workId: string }>({
  url: '/api/chat/resume',
  buildBody: (body, resumeFromSeq) => ({
    work_id: body.workId,
    resume_from_seq: resumeFromSeq,
  }),
  parseMessage: startTransport.parseMessage, // reuse same parser
});
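
The resume transport above reuses the parser through startTransport.parseMessage. Whether the transport object actually exposes that property is not confirmed in this README, so a cautious alternative is to define the parser once as a standalone function and pass it to both transports. The sketch below assumes the same wire format as the example (event_type, seq, data); the names parseChatMessage, isRecord, and asText are hypothetical, and the extra validation is an addition rather than something the library requires.

```typescript
// Event shape copied from the "Stream Event Types" section of this README.
type StreamEvent =
  | { type: 'META'; realId: string; seq: number }
  | { type: 'DELTA'; delta: unknown; seq: number }
  | { type: 'SNAPSHOT'; snapshot: unknown; seq: number }
  | { type: 'STOP'; realId?: string; seq: number }
  | { type: 'ERROR'; message: string; seq: number }
  | { type: 'HEARTBEAT' };

// Hypothetical helpers: narrow unknown JSON without unchecked casts.
const isRecord = (v: unknown): v is Record<string, unknown> =>
  typeof v === 'object' && v !== null;

const asText = (v: unknown, fallback = ''): string =>
  typeof v === 'string' ? v : fallback;

// Standalone parser (hypothetical name). Returns null for anything it
// does not recognise, mirroring the `default: return null` branch above.
function parseChatMessage(raw: unknown): StreamEvent | null {
  if (!isRecord(raw)) return null;
  const eventType = raw.event_type;
  if (eventType === 'HEARTBEAT') return { type: 'HEARTBEAT' };

  const seq = raw.seq;
  if (typeof eventType !== 'string' || typeof seq !== 'number') return null;

  const rawData = raw.data;
  const data: Record<string, unknown> = isRecord(rawData) ? rawData : {};

  switch (eventType) {
    case 'META': {
      const workId = data.work_id;
      return typeof workId === 'string' ? { type: 'META', realId: workId, seq } : null;
    }
    case 'DELTA':
      return { type: 'DELTA', delta: asText(data.content), seq };
    case 'SNAPSHOT':
      return { type: 'SNAPSHOT', snapshot: asText(data.content), seq };
    case 'STOP':
      return { type: 'STOP', seq };
    case 'ERROR':
      return { type: 'ERROR', message: asText(data.message, 'Unknown error'), seq };
    default:
      return null;
  }
}
```

With this in place, both createSseTransport calls can take parseMessage: parseChatMessage, and neither depends on the shape of the other transport object.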

3. Use the hook in your component

import { useResumableStream } from 'use-resumable-stream';

function ChatView({ activeWorkId }: { activeWorkId: string | null }) {
  const { state, start, resume, cancel } = useResumableStream<ChatData, { prompt: string }>(
    activeWorkId,
    {
      startTransport,
      resumeTransport,
      initialData: () => ({ content: '', reasoning: '' }),

      // ⭐ The core: event reducer
      // Empty SNAPSHOT won't overwrite existing content (battle-tested guard)
      onEvent: (data, event) => {
        switch (event.type) {
          case 'DELTA':
            return { ...data, content: data.content + (event.delta as string) };
          case 'SNAPSHOT': {
            const snap = event.snapshot as string;
            return snap ? { ...data, content: snap } : data; // guard: empty snapshot won't overwrite
          }
          default:
            return data;
        }
      },

      // Keep connection alive when component unmounts (tab switch won't kill it)
      keepAliveOnUnmount: true,

      // Called when generation completes
      onCompleted: async (taskKey, realId) => {
        console.log('Generation complete:', realId);
        await refetchWorkList(); // refresh your list
      },
    },
  );

  const handleGenerate = () => {
    start({ prompt: 'Write a poem about the sea' });
  };

  return (
    <div>
      <button onClick={handleGenerate} disabled={state.status === 'streaming'}>
        Generate
      </button>
      <button onClick={cancel} disabled={state.status !== 'streaming'}>
        Stop
      </button>

      {state.status === 'connecting' && <p>Connecting...</p>}
      {state.status === 'streaming' && <p>Generating... (seq: {state.lastSeq})</p>}
      {state.status === 'failed' && <p>Error: {state.errorMessage}</p>}

      <div>{state.data.content}</div>
    </div>
  );
}
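
The onEvent reducer is a pure function, so it can be pulled out of the component and exercised without React or a network connection. A minimal sketch, assuming string payloads: the name chatEventReducer matches the identifier used in step 4, while ReducerEvent is a trimmed-down, hypothetical stand-in for the library's StreamEvent type.

```typescript
interface ChatData {
  content: string;
  reasoning: string;
}

// Hypothetical, reduced event type: only the fields the reducer reads.
type ReducerEvent =
  | { type: 'DELTA'; delta: unknown }
  | { type: 'SNAPSHOT'; snapshot: unknown }
  | { type: 'META' | 'STOP' | 'ERROR' | 'HEARTBEAT' };

function chatEventReducer(data: ChatData, event: ReducerEvent): ChatData {
  switch (event.type) {
    case 'DELTA': {
      // Append incremental text; ignore non-string payloads.
      const delta = typeof event.delta === 'string' ? event.delta : '';
      return { ...data, content: data.content + delta };
    }
    case 'SNAPSHOT': {
      // Guard: an empty snapshot must not wipe content already received.
      const snap = typeof event.snapshot === 'string' ? event.snapshot : '';
      return snap ? { ...data, content: snap } : data;
    }
    default:
      return data;
  }
}

// Replaying a short event sequence through the reducer.
const replayedEvents: ReducerEvent[] = [
  { type: 'DELTA', delta: 'Hello' },
  { type: 'DELTA', delta: ', sea' },
  { type: 'SNAPSHOT', snapshot: '' }, // ignored by the guard
  { type: 'STOP' },
];
const initialChatData: ChatData = { content: '', reasoning: '' };
const replayedData = replayedEvents.reduce(chatEventReducer, initialChatData);
// replayedData.content === 'Hello, sea'
```

Keeping the reducer at module scope also lets the same function be passed to both useResumableStream and AutoResumeList.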

4. Auto-resume in-progress tasks on page load

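import { useSyncExternalStore } from 'react';
// useStreamStore is assumed to be exported alongside AutoResumeList
import { AutoResumeList, useStreamStore } from 'use-resumable-stream';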

function WorkListPage() {
  const { data: workList } = useWorkList();

  return (
    <div>
      {/* Automatically resumes all in-progress tasks */}
      <AutoResumeList
        taskKeys={workList
          ?.filter(w => w.status === 'GENERATING')
          .map(w => String(w.work_id)) ?? []}
        resumeTransport={resumeTransport}
        initialData={() => ({ content: '', reasoning: '' })}
        onEvent={chatEventReducer} // the onEvent reducer from step 3, extracted into a named function
        onCompleted={async () => {
          await refetchWorkList();
        }}
      />

      {workList?.map(work => (
        <WorkCard key={work.work_id} workId={String(work.work_id)} />
      ))}
    </div>
  );
}

// In WorkCard, subscribe to the stream state
function WorkCard({ workId }: { workId: string }) {
  const store = useStreamStore<ChatData>();
  const streamState = useSyncExternalStore(
    (listener) => store.subscribe(workId, listener),
    () => store.getState(workId),
  );

  if (streamState?.status === 'streaming') {
    return <div className="generating">{streamState.data.content}</div>;
  }
  // ...
}

API Reference

<StreamProvider>

Application-level provider. Must wrap your entire app.

⭐ Recommended pattern: Always create the store explicitly and pass it to both <StreamProvider> and createTaskRunner to ensure they share the same store instance.

// ✅ Recommended: explicit shared store
import { createStreamStore, createTaskRunner, StreamProvider } from 'use-resumable-stream';

const store = createStreamStore();

export default function RootLayout({ children }) {
  return (
    <StreamProvider store={store}>
      {children}
    </StreamProvider>
  );
}

// In your hook/component:
const runner = createTaskRunner(store, { resumeTransport, initialData, onEvent });

⚠️ Anti-pattern: dual store split

If you pass no store to <StreamProvider>, it creates its own internal store (storeB). If you also call createTaskRunner(myStore, ...) with a separately created myStore (storeA), AutoResumeList reads from storeB while your runner writes to storeA — they never see each other.

// ❌ Wrong: two independent stores
const myStore = createStreamStore();          // storeA
const runner = createTaskRunner(myStore, ...);

<StreamProvider>                              // creates storeB internally
  <AutoResumeList ... />                      // reads storeB → never sees runner's writes
</StreamProvider>

Props (store is optional):

<StreamProvider store={optionalCustomStore}>
  {children}
</StreamProvider>

useResumableStream(activeTaskKey, options)

Core hook for managing a resumable stream.

Parameters:

  • activeTaskKey: TaskKey | null — The currently active task key to subscribe to
  • options: ResumableStreamOptions<TData, TStartBody, TResumeBody>

Options:

| Option | Type | Default | Description |
|---|---|---|---|
| startTransport | Transport | - | Transport for starting new tasks |
| resumeTransport | Transport | - | Transport for resuming tasks |
| initialData | () => TData | Required | Factory for initial data |
| onEvent | (data, event) => data | Required | Event reducer |
| keepAliveOnUnmount | boolean | true | Keep connection alive on unmount |
| retryDelays | number[] | [1000, 2000, 4000] | Retry backoff delays (ms) |
| onCompleted | (key, realId) => void | - | Called on STOP event |
| onFailed | (key, message) => void | - | Called on ERROR event |
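
How the library consumes retryDelays internally is not documented here. One plausible reading, sketched below, is one retry per array entry, after which the task is marked failed ("retries exhausted" in the Stream Status section). The function names nextRetryDelay and exponentialDelays are hypothetical.

```typescript
const DEFAULT_RETRY_DELAYS: number[] = [1000, 2000, 4000];

// Returns the wait (ms) before the next retry, or null when retries are exhausted.
// failedAttempts counts consecutive failed connection attempts so far (1-based).
function nextRetryDelay(
  failedAttempts: number,
  retryDelays: number[] = DEFAULT_RETRY_DELAYS,
): number | null {
  const index = failedAttempts - 1;
  if (index < 0 || index >= retryDelays.length) return null;
  return retryDelays[index] ?? null;
}

// Builds an exponential schedule: base, base * 2, base * 4, ...
function exponentialDelays(baseMs: number, retries: number): number[] {
  return Array.from({ length: retries }, (_, i) => baseMs * 2 ** i);
}
```

Under this reading, exponentialDelays(1000, 3) reproduces the default schedule, and passing a longer array gives a flaky mobile connection more chances before the task moves to failed.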

Returns:

| Property | Type | Description |
|---|---|---|
| state | StreamState<TData> | Current stream state |
| start | (body, options?) => { pendingKey, realId } | Start a new task. Pass options.pendingKey to use a pre-generated key |
| resume | (taskKey, fromSeq?) => void | Manually resume a task |
| cancel | () => void | Cancel current connection |
| getTaskState | (taskKey) => StreamState \| undefined | Read any task's state |

createSseTransport(options)

Creates an SSE transport using fetch + ReadableStream (supports POST).

const transport = createSseTransport({
  url: '/api/stream',
  parseMessage: (raw) => { /* return StreamEvent or null */ },
  buildBody: (body, resumeFromSeq) => ({ ...body, resume_from_seq: resumeFromSeq }),
  headers: { 'X-Custom-Header': 'value' },
  withCredentials: true, // default
});
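
Because the browser's EventSource only issues GET requests, a fetch-based transport has to split the response body into SSE events itself. The sketch below illustrates that general technique and is not the library's internal code: it takes the text accumulated so far, returns the complete data payloads, and hands back the unfinished tail for the next chunk. The name extractSsePayloads is hypothetical, and only \n and \r\n line endings are handled.

```typescript
interface SseParseResult {
  payloads: string[]; // one entry per complete event (its joined `data:` lines)
  rest: string;       // incomplete tail, to be prepended to the next chunk
}

function extractSsePayloads(buffer: string): SseParseResult {
  // Normalise CRLF so events are always separated by a blank line ("\n\n").
  const normalized = buffer.replace(/\r\n/g, '\n');
  const blocks = normalized.split('\n\n');
  // The last block has no terminating blank line yet, so it is not complete.
  const rest = blocks.pop() ?? '';

  const payloads: string[] = [];
  blocks.forEach((block) => {
    const dataLines = block
      .split('\n')
      .filter((line) => line.startsWith('data:'))
      // Drop the field name and the single optional space after the colon.
      .map((line) => line.slice(5).replace(/^ /, ''));
    if (dataLines.length > 0) payloads.push(dataLines.join('\n'));
  });

  return { payloads, rest };
}
```

Each payload would then typically be passed through JSON.parse and on to parseMessage. Comment lines (those starting with a colon) and other fields such as event: or id: are skipped by the filter.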

<AutoResumeList>

Automatically resumes all in-progress tasks from a list.

<AutoResumeList
  taskKeys={['123', '456', '789']}
  resumeTransport={resumeTransport}
  initialData={() => initialState}
  onEvent={reducer}
  onCompleted={handleCompleted}
  store={myStore}  // optional: inject explicit store to avoid dual-store split
/>

| Prop | Type | Description |
|---|---|---|
| taskKeys | string[] | Task keys to resume (only real keys, not pending) |
| resumeTransport | Transport | Transport for resuming |
| initialData | () => TData | Initial data factory |
| onEvent | (data, event) => data | Event reducer |
| store | StreamStore | Optional explicit store. Pass the same store used by your TaskRunner to avoid dual-store split |
| onCompleted | (key, realId) => void | Called on STOP |
| onFailed | (key, message) => void | Called on ERROR |
| retryDelays | number[] | Retry backoff delays |

Stream Event Types

type StreamEvent =
  | { type: 'META'; realId: string; seq: number }      // Server assigned real ID
  | { type: 'DELTA'; delta: unknown; seq: number }     // Incremental data
  | { type: 'SNAPSHOT'; snapshot: unknown; seq: number } // Full snapshot (overwrite)
  | { type: 'STOP'; realId?: string; seq: number }     // Natural end
  | { type: 'ERROR'; message: string; seq: number }    // Business error
  | { type: 'HEARTBEAT' };                             // Keepalive (no seq advance)
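
The seq field is what makes resuming possible: the client remembers the last sequence number it applied and sends it back as resume_from_seq. A minimal sketch of that bookkeeping follows. It assumes seq increases strictly, so any event at or below the last applied seq is treated as a replayed duplicate; that duplicate rule is an assumption, not documented library behaviour, and SeqEvent and trackSeq are hypothetical names.

```typescript
// Hypothetical reduced event type: only `type` and `seq` matter here.
type SeqEvent =
  | { type: 'HEARTBEAT' }
  | { type: 'META' | 'DELTA' | 'SNAPSHOT' | 'STOP' | 'ERROR'; seq: number };

interface SeqDecision {
  lastSeq: number; // value to send as resume_from_seq on the next resume
  apply: boolean;  // whether the event should be passed to the reducer
}

function trackSeq(lastSeq: number, event: SeqEvent): SeqDecision {
  // Heartbeats only keep the connection alive; they never advance seq.
  if (event.type === 'HEARTBEAT') return { lastSeq, apply: false };
  // Assumed rule: already-seen sequence numbers are replayed duplicates.
  if (event.seq <= lastSeq) return { lastSeq, apply: false };
  return { lastSeq: event.seq, apply: true };
}
```

If the backend instead replays from resume_from_seq inclusively or exclusively in a different way, the comparison in the duplicate check would need to be adjusted to match.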

Stream Status

type StreamStatus =
  | 'idle'        // Not started
  | 'connecting'  // Establishing connection
  | 'streaming'   // Receiving data
  | 'completed'   // Finished (STOP event)
  | 'failed';     // Error (ERROR event or retries exhausted)

Architecture

┌─────────────────────────────────────────────────────────┐
│  Your React Components                                  │
│  useResumableStream() / AutoResumeList                  │
└──────────────────────┬──────────────────────────────────┘
                       │ useSyncExternalStore
┌──────────────────────▼──────────────────────────────────┐
│  StreamStore (singleton, app-level)                     │
│  Map<TaskKey, StreamState> + Listeners                  │
│  pendingKey → realKey alias mapping                     │
└──────────────────────┬──────────────────────────────────┘
                       │ read/write
┌──────────────────────▼──────────────────────────────────┐
│  TaskRunner                                             │
│  - Manages SSE connections                              │
│  - Exponential backoff retry                            │
│  - pendingKey → realKey migration                       │
│  - Deduplication (no double connections)                │
└──────────────────────┬──────────────────────────────────┘
                       │ connect()
┌──────────────────────▼──────────────────────────────────┐
│  Transport (pluggable)                                  │
│  createSseTransport() / custom WebSocket / HTTP polling │
└─────────────────────────────────────────────────────────┘
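
To make the StreamStore layer in the diagram more concrete, here is a simplified sketch of a keyed store with a pendingKey → realKey alias map. It is not the library's implementation; KeyedStore and its method names are hypothetical, chosen so that subscribe and getState have the shapes React's useSyncExternalStore expects (a subscribe function that returns an unsubscribe function, and a snapshot getter).

```typescript
type TaskKey = string;
type Listener = () => void;

class KeyedStore<TState> {
  private states = new Map<TaskKey, TState>();
  private aliases = new Map<TaskKey, TaskKey>(); // pendingKey -> realKey
  private listeners = new Map<TaskKey, Set<Listener>>();

  // Follow the alias if the key was a pending key that has been migrated.
  private resolve(key: TaskKey): TaskKey {
    return this.aliases.get(key) ?? key;
  }

  // Notify every listener whose subscribed key resolves to the changed key,
  // so subscribers that registered under the pending key still get updates.
  private notify(changedKey: TaskKey): void {
    this.listeners.forEach((set, subscribedKey) => {
      if (this.resolve(subscribedKey) === changedKey) set.forEach((l) => l());
    });
  }

  getState(key: TaskKey): TState | undefined {
    return this.states.get(this.resolve(key));
  }

  setState(key: TaskKey, state: TState): void {
    const resolved = this.resolve(key);
    this.states.set(resolved, state); // callers pass a new object on each change
    this.notify(resolved);
  }

  subscribe(key: TaskKey, listener: Listener): () => void {
    const set = this.listeners.get(key) ?? new Set<Listener>();
    this.listeners.set(key, set);
    set.add(listener);
    return () => {
      set.delete(listener);
    };
  }

  // Called when the server assigns the real ID (the META event).
  migrate(pendingKey: TaskKey, realKey: TaskKey): void {
    const state = this.states.get(pendingKey);
    this.states.delete(pendingKey);
    this.aliases.set(pendingKey, realKey);
    if (state !== undefined) this.states.set(realKey, state);
    this.notify(realKey);
  }
}
```

The alias lookup is what lets a component that subscribed with the pending key keep rendering after the real ID arrives, which is the blank-screen case listed under Battle-tested Guards.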

Battle-tested Guards

This library was extracted from a production LLM writing app. Every guard below was added after a real bug:

| Guard | Problem it solves |
|---|---|
| keepAliveOnUnmount: true | Tab switch kills connection → progress lost |
| Empty SNAPSHOT check in onEvent | Resume returns empty snapshot → content wiped |
| isPendingKey() check before network | Pending key sent to backend → 400 error |
| Terminal state check before resume | Completed task re-subscribed → content overwritten |
| Double terminal guard inside runResume | Race: STOP arrives after resume() guard passes → status reverts from completed to connecting |
| useSyncExternalStore (not SWR cache) | Write to SWR cache, read from subscription → no update |
| Alias map (pendingKey → realKey) | activeWorkId switches before realId arrives → blank screen |
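
Several of these guards amount to one precondition check before a resume request goes out. The sketch below combines them under stated assumptions: the 'pending:' prefix is made up for illustration (the real pending-key format and the signature of isPendingKey() are not shown in this README), and treating failed as terminal alongside completed is also an assumption. looksPending and shouldResume are hypothetical names.

```typescript
type StreamStatus = 'idle' | 'connecting' | 'streaming' | 'completed' | 'failed';

// Hypothetical pending-key format, for illustration only.
const PENDING_PREFIX = 'pending:';

function looksPending(taskKey: string): boolean {
  return taskKey.startsWith(PENDING_PREFIX);
}

function shouldResume(taskKey: string, status: StreamStatus | undefined): boolean {
  // Pending keys are client-side placeholders; the backend does not know them.
  if (looksPending(taskKey)) return false;
  // Terminal guard: never reopen a finished task (failed is assumed terminal).
  if (status === 'completed' || status === 'failed') return false;
  // Deduplication: a connection for this task is already in flight.
  if (status === 'connecting' || status === 'streaming') return false;
  // Unknown or idle tasks are safe to resume.
  return true;
}
```

The "double terminal guard" row corresponds to evaluating a check like this a second time, immediately before the status is written to connecting, because a STOP event can land between the first check and that write.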

License

MIT