npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@arcote.tech/arc-chat

v0.7.26

Published

Chat module with AI integration for Arc framework

Readme

@arcote.tech/arc-chat

Chat fragment dla Arc — Conversation/Message aggregate + AI generation listener

  • SSE streaming + React component. Builder API: chat(name).identifyBy(...).ai(...).build().

Ten dokument tłumaczy jak chat działa. Nie powtarza tego, co jest w kodzie — opisuje mental model, którego trzeba się trzymać przy każdej modyfikacji, żeby nie zepsuć architektury.


Mental model

DB jest jedynym źródłem prawdy o strukturze konwersacji. Stream jest tylko overlayem — ulotnym podglądem trwającej generacji. Timeline jest czystą funkcją obu — nigdy mutowalnym stanem.

Serwer: w trakcie generacji LLM streamuje chunki do stream-registry (in-memory, per messageId). Klient subskrybuje SSE po messageId i dostaje:

  1. init — snapshot aktualnego currentBlocks w momencie podłączenia
  2. live text_delta / tool_call_* — kolejne chunki
  3. done — koniec turnu (advisory — patrz niżej)

Dopiero gdy provider.streamComplete() zwróci pełen wynik, listener wywołuje completeAssistantTurn({ blocks })jedyny zapis treści do DB w całej turze, atomowo z flipem isGenerating: false. Następnie finalize(messageId) zamyka stream i po 5 s grace okresie drop'uje go z mapy.

Klient: chat-component.tsx NIE merguje kanałów imperatywnie. Trzy elementy:

  • liveQuery getByScope — struktura: wiadomości, finalne blocks, isGenerating / interrupted / error, tool_results,
  • useAssistantOverlays — per generujący row utrzymuje SSE i buduje overlay { blocks, status } przez shared reducer (applyStreamEvent — dokładnie ten sam kod, którym serwer akumuluje currentBlocks),
  • deriveTimeline(history, overlays, optimistic...) — czysta funkcja poza komponentem. Row isGenerating + overlay → renderuj overlay; bez overlaya → placeholder; interrupted → retry; zamknięty → finalne blocks z DB. Wyniki tooli zawsze z DB.

Autorytatywny koniec turnu to flip isGenerating: false w DB (przychodzi liveQuery razem z finalnymi blocks, atomowo w jednym rzędzie). SSE done i error tylko zdejmują caret wcześniej. Dzięki temu kolejność dostarczenia (done przed/po flipie DB, zgubione done, martwy socket) nie ma znaczenia — derywacja zawsze liczy się od najnowszego stanu obu źródeł.

To NIE jest event-sourcing dla streamingu. Snapshoty częściowej treści do DB były anti-pattern (niepotrzebny narzut, dublowanie stanu). Stream-registry to autorytatywne źródło live wartości; DB to autorytatywne źródło stanu po zamknięciu turnu.


Komponenty

src/
├─ aggregates/message.ts          Aggregate: pola, eventy, mutacje
├─ ordering.ts                    Kanoniczna kolejność rzędów (klient + serwer)
├─ listeners/
│  └─ ai-generation-listener.ts   Generation loop + 3 listenery (gen/resume/retry)
├─ routes/chat-stream-route.ts    GET /chat/:name/stream/:messageId (SSE)
│                                 + lazy repair osieroconych rzędów przy 410
├─ streaming/
│  ├─ blocks-reducer.ts           SHARED reducer eventy→blocks (serwer + klient)
│  └─ stream-registry.ts          In-memory per-messageId MessageStream
├─ react/
│  ├─ derive-timeline.ts          Czysta derywacja: (DB, overlays) → timeline + busy
│  ├─ use-assistant-overlays.ts   SSE per generujący row → overlay map
│  └─ chat-component.tsx          Cienki komponent: liveQuery + hook + derive + render
├─ tools/ask-questions.tsx        Reusable interactive tool
└─ chat-builder.ts                chat().identifyBy(...).ai(...).build()

Flow end-to-end

USER wpisuje "Cześć", klika Send
    │
    ▼
Klient: pendingSends += optimistic user message (busy=true od razu)
sendMessage mutation (atomowo)
    ├─ emit assistantTurnStarted  → projection: set empty assistant row
    │                                (isGenerating=true, brak blocks)
    └─ emit messageSent           → projection: set user row
                                  → triggeruje aiGenerationListener (async)
    │
    ▼
liveQuery getByScope() pushuje obie wiadomości do klienta
    ├─ pendingSend settled → drop optimistic
    └─ generatingIds = [assistantMsgId] → useAssistantOverlays otwiera SSE
    │
    ▼
fetch /route/chat/:name/stream/:messageId
    ├─ subscribe(messageId) → init z currentBlocks snapshot
    │     └─ Brak streamu → 410 → retry ×4 z backoffem
    │           ├─ route: markInterrupted (lazy repair, jeśli row stary)
    │           └─ po wyczerpaniu → overlay.status="gone" → UI "Interrupted"+Retry
    ▼
Listener: startStream() → provider.streamComplete(onChunk)
    onChunk → publish(messageId, event)
        ├─ currentBlocks = applyStreamEvent(currentBlocks, event)   ← shared reducer
        └─ broadcast SSE do wszystkich subscribers
    Klient: overlay.blocks = applyStreamEvent(overlay.blocks, event) ← TEN SAM reducer
    deriveTimeline renderuje overlay (caret na ostatnim bloku)
    │
    ▼
streamComplete zwraca pełen result.blocks
    │
    ▼
completeAssistantTurn({ blocks, error? })  ← jedyny zapis treści do DB
    │                                         (atomowo: blocks + isGenerating=false)
    ▼
finalize(messageId, { usage, finishReason })
    ├─ broadcast done (advisory — zdejmuje caret)
    └─ setTimeout(delete, 5s) — grace dla late subscribers
    │
    ▼
liveQuery update: isGenerating=false + blocks w JEDNYM rzędzie
    ├─ overlay GC (hook)
    └─ deriveTimeline renderuje finalne blocks z DB
       (te same klucze itemów co overlay → zero remount-flasha)

Porządkowanie rzędów (ordering.ts)

Mutacje sendMessage / respondToTool / startStage / systemMessage emitują assistantTurnStarted przed rzędem triggerującym (wymóg async listenera), więc pre-utworzony placeholder asystenta ma createdAt wcześniejszy niż pytanie, na które odpowiada. Samo orderBy createdAt ustawiłoby odpowiedź przed pytaniem.

orderMessages() daje kanoniczną kolejność: createdAt → tie-break rolą (user/system → tool_result → assistant) → _id → fix-up przesuwający pierwszy assistant row sesji za jej trigger row. Używają go oba końce: deriveTimeline (klient) i buildHistory (serwer — historia dla LLM). Jeśli zmieniasz emit-order w mutacjach albo timestampy — zacznij od tego pliku.


Edge cases

Graceful reload mid-stream (F5)

Serwer i listener nadal generują. Klient po refresh:

  1. liveQuery zwraca assistant row z isGenerating=true
  2. generatingIds zawiera ten row → hook otwiera SSE
  3. subscribe(messageId) zwraca aktualny currentBlocks w init event
  4. deriveTimeline renderuje to, co już wygenerowane + kontynuuje live

Bez duplikacji — brak replay buffer'a chunków, jest jeden snapshot. Reconnect (visibility / heartbeat / BFCache) działa identycznie: zabij połączenie, otwórz nowe, init resetuje bazę overlaya.

Server restart mid-stream

Proces ginie z currentBlocks w pamięci → utrata. DB ma row isGenerating=true, ale subscribe(messageId) zwraca null → 410.

  1. Route przy 410 woła markInterrupted (lazy repair): jeśli row ma isGenerating=true i jest starszy niż 10 s → emit generationInterrupted → projection ustawia isGenerating=false, interrupted=true. Trwałe i cross-client — każdy klient (też po F5) widzi interrupted z DB.
  2. Równolegle klient po wyczerpaniu retry ustawia overlay.status="gone" — natychmiastowy lokalny stan, zanim repair przejdzie przez liveQuery.
  3. Klik Retry → retryGeneration({ messageId }) (akceptuje isGenerating ORAZ interrupted rows): emit assistantTurnStarted (fresh row) + retryRequested (projection usuwa interrupted row) → aiRetryListener odpala runGenerationLoop.

Błąd generacji

Error path listenera zapisuje error w rzędzie (projection assistantTurnCompleted persystuje pole). Derywacja renderuje go z DB — przeżywa F5 i reconnect. SSE error event jest tylko advisory.

Server tool call w środku tury

Po streamComplete z finishReason="tool_call":

  1. completeAssistantTurn(blocks) — assistant row finalizowany
  2. finalize(messageId) — stream zamknięty
  3. Każdy server tool: saveToolResult → tool_result row w DB
  4. Następna iteracja loop'a: startAssistantTurn → nowy row (isGenerating=true) → nowy messageId → nowy stream

Każda iteracja loop'a = osobny messageId = osobny stream. Input pozostaje disabled przez całą pętlę dzięki regule busy w derywacji (z DB, nie z lokalnego flagu): row isGenerating LUB ostatni assistant row ma server-tool call bez wyniku LUB ostatni row to świeży tool_result server-toola. Klauzule mają staleness cutoff (120 s) — gdy listener padł między zapisami, busy degraduje się do enabled zamiast wisieć wiecznie.

Interactive tool (np. askQuestions)

  1. completeAssistantTurn + finalize — pierwsza tura zamknięta
  2. Listener returns (loop break)
  3. deriveTimeline: tool bez wyniku + nie-server → status pending, hasWaitingInteractive=true → input override
  4. User odpowiada → optimistic pendingToolResults (answer-view OD RAZU)
    • respondToTool mutation (atomowo assistantTurnStarted + userResponded)
  5. tool_result row dociera liveQuery → optimistic entry GC, DB wygrywa
  6. aiResumeListener → kolejny turn streamuje — a answered tool pozostaje w answer-view, bo derywacja zawsze widzi resultMap z DB (nie ma guardu "w trakcie streamowania")

Stream-registry API

startStream(messageId)              // idempotent. Listener woła przed publish
publish(messageId, event)           // applyStreamEvent + broadcast SSE
subscribe(messageId): {             // route handler. null → 410
  stream, currentBlocks
} | null
finalize(messageId, finalDetails?)  // broadcast done, close, delete po 5s
isActive(messageId): boolean        // health check
getCurrentBlocks(messageId)         // debug/test, readonly

PublishableEvent to subset ChatStreamEvent bez init/done/messageIdinit/done emit'uje registry, messageId wstrzykuje się automatycznie.


Key invariants

Derywacja (klient):

  • Timeline jest CZYSTĄ funkcją (history, overlays, optimistic) → items. Nigdy nie pisz do timeline'u imperatywnie — każdy merge dwóch kanałów przez mutowalny stan + flagę trybu kończył się produkcyjnymi race'ami (watchdogi/backstopy/nonce w git log to historia tych prób).
  • SSE reader aktualizuje WYŁĄCZNIE overlay (nigdy timeline) i robi to wyłącznie przez applyStreamEvent.
  • Klucze itemów (${msgId}_t${n}, toolCallId) są IDENTYCZNE dla overlaya i finalnych blocks — przejście stream→DB nie remountuje elementów.
  • Stale overlay dla zamkniętego rowa jest ignorowany przez derywację (GC w hooku to optymalizacja, nie poprawność).

Live wartość:

  • currentBlocks w stream-registry jest jedynym źródłem prawdy dla treści in-progress assistanta; jedyna semantyka akumulacji to applyStreamEvent (blocks-reducer.ts) — zmiana TYLKO tam, inaczej klient i serwer się rozjadą
  • inwariant reconnectu: snapshot w punkcie K + replay od K == pełny replay (test w blocks-reducer.test.ts)
  • partialBlocks/partialLastSeq/sekwencery NIE ISTNIEJĄinit snapshot załatwia reconnect; jeśli pojawi się PR dodający je, odrzuć

DB:

  • Assistant row z isGenerating=true ma blocks=undefined
  • Po assistantTurnCompleted row ma isGenerating=false + blocks final
    • ewentualny erroratomowo, w jednym rzędzie (na tym stoi cała derywacja: klient nigdy nie zobaczy flipa bez finalnej treści)
  • Treść NIGDY nie ląduje w DB chunk po chunku
  • interrupted=true ustawia wyłącznie generationInterrupted (lazy repair w route przy 410, próg wieku 10 s)

Stream lifecycle:

  • startStream(messageId) PRZED pierwszym publish (listener gwarantuje, synchronicznie przed 1. awaitem)
  • finalize(messageId) PO completeAssistantTurn (DB → in-memory order)
  • Każda iteracja generation loop'a → osobny messageId → osobny stream

Subscribe:

  • Pierwszy event po subscribe() to ZAWSZE init
  • subscribe() zwraca null (→ 410) tylko gdy stream nie istnieje w mapie (poza grace window)

Gotchas dla modyfikacji

assistantTurnStarted emit'owany PRZED messageSent/userResponded/ retryRequested w jednej mutacji. Powód: async listener reaguje na to drugie i potrzebuje, żeby assistant row już istniał w DB. Konsekwencja: placeholder ma wcześniejszy createdAt niż trigger — dlatego istnieje fix-up w ordering.ts. Zmieniasz jedno → sprawdź drugie.

buildHistory w listenerze pomija assistant rows z isGenerating=true && !blocks. Czyli interrupted rows (przed retryRequested projection) oraz fresh rows w trakcie generacji nie trafiają do LLM history. Po retry fresh row też jest skip'owany — historia kończy się na ostatniej user message, LLM kontynuuje od niej.

Server-tool execution loop NIE używa stream-registry. Po finalize dla tury z tool_calls kolejne publish byłyby no-opem. Server tool results trafiają do klienta przez liveQuery (saveToolResult → tool_result row). To świadome — następna tura ma własny stream, a derywacja i tak czyta wyniki tooli z DB.

queueMicrotask w pętli readera SSE (use-assistant-overlays). NIE zamieniaj na setTimeout(0): Chrome throttluje timeouty w kartach w tle do ≥1 s — pętla zamienia się w 1-event/s freeze. Microtaski nie są throttlowane.

Brak retencji buforów eventów. Klient, który podłączy się 6 s po finalize, dostanie 410. Brak ?afterSeq, brak replay. Po zamknięciu turnu klient ma final blocks z DB i nie potrzebuje SSE; 410 dla świeżego rowa obsługuje retry z backoffem + lazy repair.

Pola tekstowe z JSON-em (blocks, content) mają DWA kształty. W świeżym in-memory store to stringi, ale adapter Postgresa auto-parsuje kolumny tekstowe wyglądające jak JSON (deserializeValue) — po hydracji store'a z bazy (restart serwera) te same pola przychodzą jako gotowe tablice/obiekty, również do klienta przez liveQuery. Każde miejsce czytające msg.blocks / msg.content musi tolerować oba kształty (parseBlocks w derive-timeline, buildHistory w listenerze). Regresja "po odświeżeniu znikają wiadomości asystenta" brała się dokładnie stąd.

Reguła busy ma staleness cutoff. Klauzule "tool bez wyniku" i "świeży tool_result" wygasają po 120 s braku aktywności w DB — celowo: lepszy przedwcześnie aktywny input (zachowanie sprzed redesignu) niż chat zablokowany na zawsze po crashu listenera.


Powiązane fragmenty

  • @arcote.tech/arc-ai — provider abstraction, StreamChunk, ChatStreamEvent, tool system, billing
  • @arcote.tech/arc-ai-{openai,claude,gemini} — implementacje providerów
  • @arcote.tech/arc-ds — DS components: Chat, ChatMessage, ChatInput, ChatToolLog, ChatLabels
  • @arcote.tech/arc-ai-voice — VoiceTextarea (dyktowanie głosowe out-of-the-box)