@tashiscool/stream
v0.1.0
Published
Streaming utilities for real-time LLM responses
Maintainers
Readme
@llm-utils/stream
Streaming utilities for real-time LLM responses. Handle HTTP streaming, chunk aggregation, and JSONL parsing with ease.
Installation
pnpm add @llm-utils/stream
# or
npm install @llm-utils/streamFeatures
- HTTP Response Interception - Transform streaming responses without breaking semantics
- Chunk Aggregation - Aggregate chunks from multiple sources with lifecycle handlers
- JSONL Streaming - Parse and serialize JSONL streams (server and client)
- Type-Safe - Full TypeScript support with generics
Usage
Intercept and Transform Streaming Responses
import { interceptResponseWrites } from '@llm-utils/stream';
app.post('/chat', async (req, res) => {
const llmStream = await callLLM(req.body);
interceptResponseWrites(res, {
transform: async (chunk) => {
// Transform each chunk before sending to client
const data = JSON.parse(chunk);
const enriched = { ...data, timestamp: Date.now() };
return JSON.stringify(enriched) + '\n';
},
onFinish: async () => {
// Cleanup after stream ends
await saveConversation();
}
});
llmStream.pipe(res);
});Aggregate Chunks from Multiple Sources
import { createChunkAggregator } from '@llm-utils/stream';
const aggregator = createChunkAggregator({
onBegin: async (message) => {
console.log(`Starting message from node: ${message.nodeId}`);
},
onItem: async (message, delta) => {
// Stream delta to client
sendToClient(delta);
},
onEnd: async (message) => {
// Save complete message
await saveMessage(message);
},
onError: async (message, error) => {
console.error(`Error in ${message.nodeId}:`, error);
}
});
// Ingest structured chunks
for await (const chunk of structuredStream) {
await aggregator.ingest(chunk);
}
// Finalize any incomplete messages
await aggregator.finalizeAll();JSONL Streaming
import { streamJsonLines, parseJsonLines } from '@llm-utils/stream';
// Server: Stream objects as JSONL
app.get('/events', (req, res) => {
streamJsonLines(res, async function* () {
for (const event of events) {
yield event;
}
});
});
// Client: Parse JSONL stream
const response = await fetch('/events');
for await (const event of parseJsonLines(response)) {
handleEvent(event);
}API Reference
interceptResponseWrites(res, options)
Intercepts Express response writes for transformation.
interface InterceptOptions {
transform: (text: string) => Promise<string | undefined>;
onFinish?: () => Promise<void>;
onError?: (error: Error) => void;
}createChunkAggregator(handlers)
Creates a stateful chunk aggregator for multi-source streaming.
interface StructuredChunk {
nodeId: string;
runIndex: number;
itemIndex: number;
type: 'begin' | 'item' | 'end' | 'error';
data?: string;
error?: string;
}
interface ChunkHandlers {
onBegin?: (message: AggregatedMessage) => Promise<void>;
onItem?: (message: AggregatedMessage, delta: string) => Promise<void>;
onEnd?: (message: AggregatedMessage) => Promise<void>;
onError?: (message: AggregatedMessage, error?: string) => Promise<void>;
}streamJsonLines(res, generator)
Streams objects as JSONL to an HTTP response.
parseJsonLines(response)
Parses a JSONL stream from a fetch Response.
License
MIT
