@arcote.tech/arc-chat
v0.7.26
Published
Chat module with AI integration for Arc framework
Readme
@arcote.tech/arc-chat
Chat fragment dla Arc — Conversation/Message aggregate + AI generation listener
- SSE streaming + React component. Builder API:
chat(name).identifyBy(...).ai(...).build().
Ten dokument tłumaczy jak chat działa. Nie powtarza tego, co jest w kodzie — opisuje mental model, którego trzeba się trzymać przy każdej modyfikacji, żeby nie zepsuć architektury.
Mental model
DB jest jedynym źródłem prawdy o strukturze konwersacji. Stream jest tylko overlayem — ulotnym podglądem trwającej generacji. Timeline jest czystą funkcją obu — nigdy mutowalnym stanem.
Serwer: w trakcie generacji LLM streamuje chunki do stream-registry
(in-memory, per messageId). Klient subskrybuje SSE po messageId i dostaje:
init— snapshot aktualnegocurrentBlocksw momencie podłączenia- live
text_delta/tool_call_*— kolejne chunki done— koniec turnu (advisory — patrz niżej)
Dopiero gdy provider.streamComplete() zwróci pełen wynik, listener wywołuje
completeAssistantTurn({ blocks }) — jedyny zapis treści do DB w całej
turze, atomowo z flipem isGenerating: false. Następnie finalize(messageId)
zamyka stream i po 5 s grace okresie drop'uje go z mapy.
Klient: chat-component.tsx NIE merguje kanałów imperatywnie. Trzy elementy:
- liveQuery
getByScope— struktura: wiadomości, finalne blocks,isGenerating/interrupted/error, tool_results, useAssistantOverlays— per generujący row utrzymuje SSE i budujeoverlay { blocks, status }przez shared reducer (applyStreamEvent— dokładnie ten sam kod, którym serwer akumulujecurrentBlocks),deriveTimeline(history, overlays, optimistic...)— czysta funkcja poza komponentem. RowisGenerating+ overlay → renderuj overlay; bez overlaya → placeholder;interrupted→ retry; zamknięty → finalne blocks z DB. Wyniki tooli zawsze z DB.
Autorytatywny koniec turnu to flip isGenerating: false w DB (przychodzi
liveQuery razem z finalnymi blocks, atomowo w jednym rzędzie). SSE done i
error tylko zdejmują caret wcześniej. Dzięki temu kolejność dostarczenia
(done przed/po flipie DB, zgubione done, martwy socket) nie ma znaczenia —
derywacja zawsze liczy się od najnowszego stanu obu źródeł.
To NIE jest event-sourcing dla streamingu. Snapshoty częściowej treści do DB były anti-pattern (niepotrzebny narzut, dublowanie stanu). Stream-registry to autorytatywne źródło live wartości; DB to autorytatywne źródło stanu po zamknięciu turnu.
Komponenty
src/
├─ aggregates/message.ts Aggregate: pola, eventy, mutacje
├─ ordering.ts Kanoniczna kolejność rzędów (klient + serwer)
├─ listeners/
│ └─ ai-generation-listener.ts Generation loop + 3 listenery (gen/resume/retry)
├─ routes/chat-stream-route.ts GET /chat/:name/stream/:messageId (SSE)
│ + lazy repair osieroconych rzędów przy 410
├─ streaming/
│ ├─ blocks-reducer.ts SHARED reducer eventy→blocks (serwer + klient)
│ └─ stream-registry.ts In-memory per-messageId MessageStream
├─ react/
│ ├─ derive-timeline.ts Czysta derywacja: (DB, overlays) → timeline + busy
│ ├─ use-assistant-overlays.ts SSE per generujący row → overlay map
│ └─ chat-component.tsx Cienki komponent: liveQuery + hook + derive + render
├─ tools/ask-questions.tsx Reusable interactive tool
└─ chat-builder.ts chat().identifyBy(...).ai(...).build()Flow end-to-end
USER wpisuje "Cześć", klika Send
│
▼
Klient: pendingSends += optimistic user message (busy=true od razu)
sendMessage mutation (atomowo)
├─ emit assistantTurnStarted → projection: set empty assistant row
│ (isGenerating=true, brak blocks)
└─ emit messageSent → projection: set user row
→ triggeruje aiGenerationListener (async)
│
▼
liveQuery getByScope() pushuje obie wiadomości do klienta
├─ pendingSend settled → drop optimistic
└─ generatingIds = [assistantMsgId] → useAssistantOverlays otwiera SSE
│
▼
fetch /route/chat/:name/stream/:messageId
├─ subscribe(messageId) → init z currentBlocks snapshot
│ └─ Brak streamu → 410 → retry ×4 z backoffem
│ ├─ route: markInterrupted (lazy repair, jeśli row stary)
│ └─ po wyczerpaniu → overlay.status="gone" → UI "Interrupted"+Retry
▼
Listener: startStream() → provider.streamComplete(onChunk)
onChunk → publish(messageId, event)
├─ currentBlocks = applyStreamEvent(currentBlocks, event) ← shared reducer
└─ broadcast SSE do wszystkich subscribers
Klient: overlay.blocks = applyStreamEvent(overlay.blocks, event) ← TEN SAM reducer
deriveTimeline renderuje overlay (caret na ostatnim bloku)
│
▼
streamComplete zwraca pełen result.blocks
│
▼
completeAssistantTurn({ blocks, error? }) ← jedyny zapis treści do DB
│ (atomowo: blocks + isGenerating=false)
▼
finalize(messageId, { usage, finishReason })
├─ broadcast done (advisory — zdejmuje caret)
└─ setTimeout(delete, 5s) — grace dla late subscribers
│
▼
liveQuery update: isGenerating=false + blocks w JEDNYM rzędzie
├─ overlay GC (hook)
└─ deriveTimeline renderuje finalne blocks z DB
(te same klucze itemów co overlay → zero remount-flasha)Porządkowanie rzędów (ordering.ts)
Mutacje sendMessage / respondToTool / startStage / systemMessage
emitują assistantTurnStarted przed rzędem triggerującym (wymóg
async listenera), więc pre-utworzony placeholder asystenta ma createdAt
wcześniejszy niż pytanie, na które odpowiada. Samo orderBy createdAt
ustawiłoby odpowiedź przed pytaniem.
orderMessages() daje kanoniczną kolejność: createdAt → tie-break rolą
(user/system → tool_result → assistant) → _id → fix-up przesuwający
pierwszy assistant row sesji za jej trigger row. Używają go oba końce:
deriveTimeline (klient) i buildHistory (serwer — historia dla LLM).
Jeśli zmieniasz emit-order w mutacjach albo timestampy — zacznij od tego pliku.
Edge cases
Graceful reload mid-stream (F5)
Serwer i listener nadal generują. Klient po refresh:
- liveQuery zwraca assistant row z
isGenerating=true generatingIdszawiera ten row → hook otwiera SSEsubscribe(messageId)zwraca aktualnycurrentBlockswinitevent- deriveTimeline renderuje to, co już wygenerowane + kontynuuje live
Bez duplikacji — brak replay buffer'a chunków, jest jeden snapshot.
Reconnect (visibility / heartbeat / BFCache) działa identycznie: zabij
połączenie, otwórz nowe, init resetuje bazę overlaya.
Server restart mid-stream
Proces ginie z currentBlocks w pamięci → utrata. DB ma row
isGenerating=true, ale subscribe(messageId) zwraca null → 410.
- Route przy 410 woła
markInterrupted(lazy repair): jeśli row maisGenerating=truei jest starszy niż 10 s → emitgenerationInterrupted→ projection ustawiaisGenerating=false, interrupted=true. Trwałe i cross-client — każdy klient (też po F5) widzi interrupted z DB. - Równolegle klient po wyczerpaniu retry ustawia
overlay.status="gone"— natychmiastowy lokalny stan, zanim repair przejdzie przez liveQuery. - Klik Retry →
retryGeneration({ messageId })(akceptujeisGeneratingORAZinterruptedrows): emitassistantTurnStarted(fresh row) +retryRequested(projection usuwa interrupted row) →aiRetryListenerodpalarunGenerationLoop.
Błąd generacji
Error path listenera zapisuje error w rzędzie (projection
assistantTurnCompleted persystuje pole). Derywacja renderuje go z DB —
przeżywa F5 i reconnect. SSE error event jest tylko advisory.
Server tool call w środku tury
Po streamComplete z finishReason="tool_call":
completeAssistantTurn(blocks)— assistant row finalizowanyfinalize(messageId)— stream zamknięty- Każdy server tool:
saveToolResult→ tool_result row w DB - Następna iteracja loop'a:
startAssistantTurn→ nowy row (isGenerating=true) → nowymessageId→ nowy stream
Każda iteracja loop'a = osobny messageId = osobny stream. Input
pozostaje disabled przez całą pętlę dzięki regule busy w derywacji
(z DB, nie z lokalnego flagu): row isGenerating LUB ostatni assistant
row ma server-tool call bez wyniku LUB ostatni row to świeży tool_result
server-toola. Klauzule mają staleness cutoff (120 s) — gdy listener padł
między zapisami, busy degraduje się do enabled zamiast wisieć wiecznie.
Interactive tool (np. askQuestions)
completeAssistantTurn+finalize— pierwsza tura zamknięta- Listener returns (loop break)
- deriveTimeline: tool bez wyniku + nie-server → status
pending,hasWaitingInteractive=true→ input override - User odpowiada → optimistic
pendingToolResults(answer-view OD RAZU)respondToToolmutation (atomowoassistantTurnStarted+userResponded)
- tool_result row dociera liveQuery → optimistic entry GC, DB wygrywa
aiResumeListener→ kolejny turn streamuje — a answered tool pozostaje w answer-view, bo derywacja zawsze widzi resultMap z DB (nie ma guardu "w trakcie streamowania")
Stream-registry API
startStream(messageId) // idempotent. Listener woła przed publish
publish(messageId, event) // applyStreamEvent + broadcast SSE
subscribe(messageId): { // route handler. null → 410
stream, currentBlocks
} | null
finalize(messageId, finalDetails?) // broadcast done, close, delete po 5s
isActive(messageId): boolean // health check
getCurrentBlocks(messageId) // debug/test, readonlyPublishableEvent to subset ChatStreamEvent bez init/done/messageId —
init/done emit'uje registry, messageId wstrzykuje się automatycznie.
Key invariants
Derywacja (klient):
- Timeline jest CZYSTĄ funkcją
(history, overlays, optimistic) → items. Nigdy nie pisz do timeline'u imperatywnie — każdy merge dwóch kanałów przez mutowalny stan + flagę trybu kończył się produkcyjnymi race'ami (watchdogi/backstopy/nonce w git log to historia tych prób). - SSE reader aktualizuje WYŁĄCZNIE overlay (nigdy timeline) i robi to
wyłącznie przez
applyStreamEvent. - Klucze itemów (
${msgId}_t${n},toolCallId) są IDENTYCZNE dla overlaya i finalnych blocks — przejście stream→DB nie remountuje elementów. - Stale overlay dla zamkniętego rowa jest ignorowany przez derywację (GC w hooku to optymalizacja, nie poprawność).
Live wartość:
currentBlocksw stream-registry jest jedynym źródłem prawdy dla treści in-progress assistanta; jedyna semantyka akumulacji toapplyStreamEvent(blocks-reducer.ts) — zmiana TYLKO tam, inaczej klient i serwer się rozjadą- inwariant reconnectu: snapshot w punkcie K + replay od K == pełny replay
(test w
blocks-reducer.test.ts) partialBlocks/partialLastSeq/sekwencery NIE ISTNIEJĄ —initsnapshot załatwia reconnect; jeśli pojawi się PR dodający je, odrzuć
DB:
- Assistant row z
isGenerating=truemablocks=undefined - Po
assistantTurnCompletedrow maisGenerating=false+blocksfinal- ewentualny
error— atomowo, w jednym rzędzie (na tym stoi cała derywacja: klient nigdy nie zobaczy flipa bez finalnej treści)
- ewentualny
- Treść NIGDY nie ląduje w DB chunk po chunku
interrupted=trueustawia wyłączniegenerationInterrupted(lazy repair w route przy 410, próg wieku 10 s)
Stream lifecycle:
startStream(messageId)PRZED pierwszympublish(listener gwarantuje, synchronicznie przed 1. awaitem)finalize(messageId)POcompleteAssistantTurn(DB → in-memory order)- Każda iteracja generation loop'a → osobny
messageId→ osobny stream
Subscribe:
- Pierwszy event po
subscribe()to ZAWSZEinit subscribe()zwracanull(→ 410) tylko gdy stream nie istnieje w mapie (poza grace window)
Gotchas dla modyfikacji
assistantTurnStarted emit'owany PRZED messageSent/userResponded/
retryRequested w jednej mutacji. Powód: async listener reaguje na
to drugie i potrzebuje, żeby assistant row już istniał w DB. Konsekwencja:
placeholder ma wcześniejszy createdAt niż trigger — dlatego istnieje
fix-up w ordering.ts. Zmieniasz jedno → sprawdź drugie.
buildHistory w listenerze pomija assistant rows z isGenerating=true
&& !blocks. Czyli interrupted rows (przed retryRequested projection)
oraz fresh rows w trakcie generacji nie trafiają do LLM history. Po retry
fresh row też jest skip'owany — historia kończy się na ostatniej user
message, LLM kontynuuje od niej.
Server-tool execution loop NIE używa stream-registry. Po finalize dla
tury z tool_calls kolejne publish byłyby no-opem. Server tool results
trafiają do klienta przez liveQuery (saveToolResult → tool_result row).
To świadome — następna tura ma własny stream, a derywacja i tak czyta
wyniki tooli z DB.
queueMicrotask w pętli readera SSE (use-assistant-overlays). NIE
zamieniaj na setTimeout(0): Chrome throttluje timeouty w kartach w tle
do ≥1 s — pętla zamienia się w 1-event/s freeze. Microtaski nie są
throttlowane.
Brak retencji buforów eventów. Klient, który podłączy się 6 s po
finalize, dostanie 410. Brak ?afterSeq, brak replay. Po zamknięciu turnu
klient ma final blocks z DB i nie potrzebuje SSE; 410 dla świeżego rowa
obsługuje retry z backoffem + lazy repair.
Pola tekstowe z JSON-em (blocks, content) mają DWA kształty.
W świeżym in-memory store to stringi, ale adapter Postgresa auto-parsuje
kolumny tekstowe wyglądające jak JSON (deserializeValue) — po hydracji
store'a z bazy (restart serwera) te same pola przychodzą jako gotowe
tablice/obiekty, również do klienta przez liveQuery. Każde miejsce czytające
msg.blocks / msg.content musi tolerować oba kształty (parseBlocks w
derive-timeline, buildHistory w listenerze). Regresja "po odświeżeniu
znikają wiadomości asystenta" brała się dokładnie stąd.
Reguła busy ma staleness cutoff. Klauzule "tool bez wyniku" i "świeży tool_result" wygasają po 120 s braku aktywności w DB — celowo: lepszy przedwcześnie aktywny input (zachowanie sprzed redesignu) niż chat zablokowany na zawsze po crashu listenera.
Powiązane fragmenty
@arcote.tech/arc-ai— provider abstraction,StreamChunk,ChatStreamEvent, tool system, billing@arcote.tech/arc-ai-{openai,claude,gemini}— implementacje providerów@arcote.tech/arc-ds— DS components: Chat, ChatMessage, ChatInput, ChatToolLog, ChatLabels@arcote.tech/arc-ai-voice— VoiceTextarea (dyktowanie głosowe out-of-the-box)
