use-workflow-stream
v0.1.0-alpha.0
Published
React hook for consuming Workflow DevKit streams with multi-stream support, auto-reconnection, and Zod validation
Maintainers
Readme
use-workflow-stream
⚠️ ALPHA: This package is in early development. APIs may change without notice. Use at your own risk.
React hook for consuming Workflow DevKit streams with multi-stream support, auto-reconnection, and Zod validation.
Features
- 🔄 Multiple Named Streams - Consume logs, metrics, and output simultaneously
- 🔌 Auto-Reconnection - Automatically resume after network interruptions
- ✅ Zod Validation - Optional runtime type checking with full type inference
- 📡 Format Support - JSON lines and Server-Sent Events (SSE)
- ⚛️ React 19+ - Built with modern React patterns (
useSyncExternalStore) - 🎯 TypeScript - Full type safety throughout
Installation
npm install use-workflow-stream zod
# or
pnpm add use-workflow-stream zod
# or
yarn add use-workflow-stream zodQuick Start
import { useWorkflowStream } from 'use-workflow-stream';
function MyComponent({ runId }: { runId: string }) {
const { chunks, latest, isLoading, error, isDone } = useWorkflowStream({
runId,
});
if (error) return <div>Error: {error.message}</div>;
if (isLoading && chunks.length === 0) return <div>Loading...</div>;
return (
<div>
<h2>Latest: {latest}</h2>
<ul>
{chunks.map((chunk, i) => (
<li key={i}>{chunk}</li>
))}
</ul>
{isDone && <p>Stream complete!</p>}
</div>
);
}Usage
Basic Usage
const { chunks, latest, isLoading, error, isDone } = useWorkflowStream({
runId: 'run_abc123',
});Multiple Named Streams
Consume multiple streams from the same workflow run:
const output = useWorkflowStream({ runId });
const logs = useWorkflowStream({ runId, namespace: 'logs' });
const metrics = useWorkflowStream({ runId, namespace: 'metrics' });With Zod Validation
Add runtime type checking with Zod schemas:
import { z } from 'zod';
const logSchema = z.object({
level: z.enum(['info', 'warn', 'error']),
message: z.string(),
timestamp: z.string().datetime(),
});
const { chunks, error } = useWorkflowStream({
runId: 'run_123',
schema: logSchema,
});With Callbacks
Handle stream events:
const { chunks } = useWorkflowStream({
runId: 'run_123',
onStart: (runId) => console.log('Stream started:', runId),
onChunk: (chunk, index) => console.log('Received chunk:', chunk),
onEnd: (count) => console.log('Stream ended, total chunks:', count),
onError: (error) => console.error('Stream error:', error),
});Conditional Streaming
Enable/disable streams dynamically:
const [showLogs, setShowLogs] = useState(false);
const logs = useWorkflowStream({
runId,
namespace: 'logs',
enabled: showLogs,
});Custom API Endpoint
Override the default API endpoint:
const { chunks } = useWorkflowStream({
runId: 'run_123',
api: '/api/custom-endpoint',
});API Reference
useWorkflowStream(options)
Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| runId | string | required | Workflow run ID |
| namespace | string | undefined | Optional stream namespace |
| schema | z.ZodSchema<T> | undefined | Optional Zod schema for validation |
| autoReconnect | boolean | true | Automatically reconnect on interruption |
| maxRetries | number | 3 | Maximum consecutive errors before giving up |
| fetch | typeof fetch | globalThis.fetch | Custom fetch implementation |
| api | string | '/api/trigger' | API endpoint base URL |
| startIndex | number | 0 | Start reading from this chunk index |
| enabled | boolean | true | Enable/disable the stream |
| onStart | (runId: string) => void | undefined | Called when stream starts |
| onChunk | (chunk: T, index: number) => void | undefined | Called for each chunk |
| onEnd | (chunkCount: number) => void | undefined | Called when stream ends |
| onError | (error: Error) => void | undefined | Called on error |
Return Value
| Property | Type | Description |
|----------|------|-------------|
| chunks | T[] | Array of all chunks received |
| latest | T \| undefined | Latest chunk (or undefined if none yet) |
| isLoading | boolean | Loading state |
| error | Error \| null | Error state |
| isDone | boolean | Stream has finished |
| chunkCount | number | Total chunks received |
| reconnect | () => void | Manually reconnect to stream |
| reset | () => void | Clear all chunks and reset |
How It Works
Transport Layer
The hook automatically detects and handles two stream formats:
- JSON Lines - Newline-delimited JSON (default Workflow format)
- Server-Sent Events (SSE) - Standard
text/event-stream
Binary Data
Binary data is automatically detected and decoded from base64:
// Workflow sends binary as: { data: "base64string" }
// Hook automatically decodes to: "decoded string"Auto-Reconnection
When autoReconnect is enabled (default), the hook will:
- Automatically retry on network errors
- Use exponential backoff (1s, 2s, 4s, ...)
- Resume from last chunk index to avoid duplicates
- Give up after
maxRetriesconsecutive failures
State Management
Built with React 19's useSyncExternalStore for:
- Efficient re-renders
- Concurrent rendering support
- Server-side rendering compatibility
Examples
See the examples/ directory for more:
basic.tsx- Basic usagemultiple-streams.tsx- Multiple named streamswith-validation.tsx- Zod validation
Development
# Install dependencies
pnpm install
# Build
pnpm build
# Run tests
pnpm test
# Type check
pnpm typecheckLicense
MIT
