stream-guard
v1.0.1
The circuit breaker for Node.js streams. Prevent memory leaks, slow loris attacks, and zombie connections. Enforces timeouts, heap limits, and stall detection with zero dependencies.
Production-grade circuit breaker for Node.js streams. Prevents leaks, hangs, and OOM crashes.
The Problem: Why Streams Are Dangerous
Node.js streams are powerful but dangerous by default:
| Threat | Description | Impact |
|--------|-------------|--------|
| Slow Loris Attack | Malicious client sends data extremely slowly | Server hangs forever, file descriptors exhausted |
| Zombie Connections | Stream stops emitting data but never closes | Memory leak, connection pool exhaustion |
| Infinite Data | Unbounded streams consume unlimited memory | OOM crash, service restart |
| Memory Leaks | Event listeners pile up on long-lived streams | Gradual memory growth until crash |
The "Just Use setTimeout" Fallacy
// ❌ This is NOT enough
const timeout = setTimeout(() => {
  stream.destroy();
}, 30000);
stream.on('data', () => {
  clearTimeout(timeout);
  // Now what? Stream could still stall between chunks!
});
The setTimeout approach fails because:
- It doesn't detect stalls between chunks
- It doesn't limit total bytes transferred
- It doesn't protect against memory exhaustion
- Timer cleanup is error-prone and often missed
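To make the first and last points concrete, here is a minimal sketch of what a hand-rolled stall detector has to do: re-arm an idle timer on every chunk and tear everything down when the stream closes. This is an illustration under stated assumptions, not stream-guard's actual implementation; `watchForStalls` and `StallWatch` are hypothetical names introduced only for this example.

```typescript
import { Readable } from 'node:stream';

// Hypothetical handle returned by the sketch; not part of stream-guard.
interface StallWatch {
  stop(): void;       // Remove listeners and clear the pending timer
  isArmed(): boolean; // True while an idle timer is pending
}

// Destroys `stream` if no 'data' event arrives for `idleMs` milliseconds.
// Caveat: attaching a 'data' listener switches the stream into flowing mode.
function watchForStalls(stream: Readable, idleMs: number): StallWatch {
  let timer: ReturnType<typeof setTimeout> | undefined;

  const arm = (): void => {
    if (timer !== undefined) clearTimeout(timer);
    timer = setTimeout(() => {
      timer = undefined;
      stream.destroy(new Error(`No data for ${idleMs}ms`));
    }, idleMs);
  };

  const stop = (): void => {
    if (timer !== undefined) clearTimeout(timer);
    timer = undefined;
    stream.off('data', arm);
    stream.off('close', stop);
  };

  stream.on('data', arm);   // Every chunk re-arms the idle timer
  stream.on('close', stop); // Cleanup on normal end, error, or destroy
  arm();                    // Also covers a stream that never sends a first chunk

  return { stop, isArmed: () => timer !== undefined };
}
```

Even this small version needs two listeners, one timer, and a cleanup path, and it still does nothing about byte limits or memory usage.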
The Solution: stream-guard
stream-guard wraps any Node.js stream with configurable safety constraints:
import { guard } from 'stream-guard';
import { createReadStream } from 'node:fs';
// ✅ Protected stream - will be destroyed if any limit is violated
const stream = guard(createReadStream('data.json'), {
  timeout: 30_000, // Absolute timeout: 30 seconds max
  stalled: 5_000, // Stall detection: 5 seconds without data = dead
  maxBytes: 10 * 1024 * 1024, // Byte limit: 10MB max
});
stream.pipe(response);
When a limit is violated, the stream is immediately destroyed with a typed error:
import { guard, StreamTimeoutError, StreamStalledError } from 'stream-guard';
stream.on('error', (err) => {
  if (err instanceof StreamTimeoutError) {
    console.log(`Timeout after ${err.elapsedMs}ms`);
  } else if (err instanceof StreamStalledError) {
    console.log(`Stalled for ${err.idleMs}ms`);
  }
});
Installation
npm install stream-guard
yarn add stream-guard
pnpm add stream-guard
API Reference
guard(stream, options)
Wraps a stream with safety constraints. Returns the same stream instance.
function guard<T extends GuardableStream>(stream: T, options?: GuardOptions): T
Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| timeout | number | - | Absolute timeout in milliseconds. Stream is destroyed if it doesn't complete within this time. |
| stalled | number | - | Stall protection in milliseconds. Stream is destroyed if no data flows for this duration. |
| maxBytes | number | - | Byte limit. Stream is destroyed if total bytes exceed this value. |
| maxHeap | number | - | Heap limit in bytes. Stream is destroyed if process.memoryUsage().heapUsed exceeds this. |
| heapCheckInterval | number | 100 | Interval in ms between heap checks. |
| heapCheckChunks | number | - | Check heap every N chunks instead of by time interval. |
| onDestroy | (error: Error) => void | - | Callback when stream is destroyed by guard. |
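The table's limit semantics can be summarized as a single decision function. The sketch below is an assumption-laden illustration, not the library's code: `LimitOptions`, `StreamSnapshot`, and `findViolation` are hypothetical names, the option shape is reconstructed from the table, and the order in which limits are checked is an arbitrary choice.

```typescript
// Assumed shape, reconstructed from the options table; the library's real
// GuardOptions declaration may differ.
interface LimitOptions {
  timeout?: number;  // ms
  stalled?: number;  // ms
  maxBytes?: number; // bytes
  maxHeap?: number;  // bytes
}

// Hypothetical snapshot of a guarded stream at one moment in time.
interface StreamSnapshot {
  elapsedMs: number;        // Time since the guard was attached
  idleMs: number;           // Time since the last chunk
  bytesTransferred: number; // Total bytes seen so far
  heapUsed: number;         // e.g. process.memoryUsage().heapUsed
}

type Constraint = 'timeout' | 'stalled' | 'maxBytes' | 'maxHeap';

// Returns the first violated constraint, or null if all limits hold.
// Unset options are skipped, matching the "-" defaults in the table.
function findViolation(opts: LimitOptions, snap: StreamSnapshot): Constraint | null {
  if (opts.timeout !== undefined && snap.elapsedMs >= opts.timeout) return 'timeout';
  if (opts.stalled !== undefined && snap.idleMs >= opts.stalled) return 'stalled';
  if (opts.maxBytes !== undefined && snap.bytesTransferred > opts.maxBytes) return 'maxBytes';
  if (opts.maxHeap !== undefined && snap.heapUsed > opts.maxHeap) return 'maxHeap';
  return null;
}

// 2048 bytes against a 1024-byte limit, well inside the 30s timeout.
const violation = findViolation(
  { timeout: 30_000, maxBytes: 1024 },
  { elapsedMs: 1_200, idleMs: 0, bytesTransferred: 2048, heapUsed: 0 },
);
console.log(violation); // → 'maxBytes'
```

Keeping the decision separate from timers and listeners makes the limit logic easy to unit test with fixed numbers.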
Returns
The same stream instance with guards attached. Works seamlessly with .pipe().
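Returning the same instance, rather than a wrapper, is what keeps existing `.pipe()` chains intact. As a rough sketch of that pattern (not stream-guard's source), the hypothetical helper `limitBytes` below attaches a byte counter to a stream and hands back the very object it received. It assumes a non-object-mode stream whose chunks are Buffers or strings.

```typescript
import { Readable } from 'node:stream';

// Hypothetical helper: counts bytes and destroys the stream once `maxBytes`
// is exceeded. The generic parameter preserves the caller's concrete type.
function limitBytes<T extends Readable>(stream: T, maxBytes: number): T {
  let seen = 0;

  const onData = (chunk: Buffer | string): void => {
    seen += typeof chunk === 'string' ? Buffer.byteLength(chunk) : chunk.length;
    if (seen > maxBytes) {
      stream.off('data', onData);
      stream.destroy(new Error(`Byte limit exceeded: ${seen} > ${maxBytes}`));
    }
  };

  stream.on('data', onData);
  // Drop the counter once the stream is closed for any reason.
  stream.once('close', () => stream.off('data', onData));

  return stream; // Same object, so callers can keep chaining .pipe()
}
```

Because the return type is `T`, a guarded `fs.ReadStream` or `http.IncomingMessage` keeps its specific type and methods.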
createGuard(defaultOptions)
Creates a reusable guard factory with preset defaults.
const apiGuard = createGuard({
  timeout: 30_000,
  maxBytes: 5 * 1024 * 1024,
});
// All streams use these defaults
apiGuard(stream1);
apiGuard(stream2, { stalled: 5000 }); // Can still override
isGuarded(stream)
Check if a stream has already been guarded.
if (!isGuarded(stream)) {
  guard(stream, options);
}
getGuardMetadata(stream)
Access guard metadata for monitoring.
const metadata = getGuardMetadata(stream);
console.log(metadata.guardId); // Unique guard ID
console.log(metadata.getBytesTransferred()); // Bytes so far
console.log(metadata.getElapsedTime()); // Time since guard attached
console.log(metadata.isActive()); // Guard still active?
metadata.release(); // Manually remove guards
Error Classes
All errors extend StreamGuardError and include:
- timestamp: When the error occurred
- constraint: Which limit was violated
| Error Class | Constraint | Properties |
|-------------|------------|------------|
| StreamTimeoutError | timeout | timeoutMs, elapsedMs |
| StreamStalledError | stalled | stalledMs, idleMs |
| StreamLimitError | maxBytes | maxBytes, bytesTransferred |
| StreamHeapError | maxHeap | maxHeap, heapUsed |
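For readers who want to see how a hierarchy like this fits together, the following sketch declares a base error carrying `constraint` and `timestamp`, two subclasses with the properties from the table, and a type guard. It is illustrative only: the `Demo`-prefixed classes are hypothetical stand-ins rather than the classes exported by stream-guard, and representing `timestamp` as a `Date` is an assumption.

```typescript
type DemoConstraint = 'timeout' | 'stalled' | 'maxBytes' | 'maxHeap';

// Hypothetical base class mirroring the shared fields described above.
class DemoGuardError extends Error {
  readonly timestamp: Date = new Date(); // Assumed to be a Date in this sketch

  constructor(message: string, readonly constraint: DemoConstraint) {
    super(message);
    this.name = new.target.name;
    // Keeps instanceof working when compiling to ES5 targets.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

class DemoTimeoutError extends DemoGuardError {
  constructor(readonly timeoutMs: number, readonly elapsedMs: number) {
    super(`Timed out after ${elapsedMs}ms (limit ${timeoutMs}ms)`, 'timeout');
  }
}

class DemoLimitError extends DemoGuardError {
  constructor(readonly maxBytes: number, readonly bytesTransferred: number) {
    super(`Transferred ${bytesTransferred} bytes (limit ${maxBytes})`, 'maxBytes');
  }
}

// Type guard: narrows an unknown error to the base class.
function isDemoGuardError(err: unknown): err is DemoGuardError {
  return err instanceof DemoGuardError;
}
```

A shared `constraint` field lets handlers branch or emit metrics without an `instanceof` check per subclass.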
Type Guards
import {
  isStreamGuardError,
  isStreamTimeoutError,
  isStreamStalledError,
  isStreamLimitError,
  isStreamHeapError,
} from 'stream-guard';
stream.on('error', (err) => {
  if (isStreamGuardError(err)) {
    metrics.increment(`stream.guard.${err.constraint}`);
  }
});
Usage Examples
HTTP File Upload Protection
import { guard, StreamLimitError } from 'stream-guard';
import express from 'express';
import fs from 'node:fs';
const app = express();
app.post('/upload', (req, res) => {
  const upload = guard(req, {
    timeout: 60_000, // 1 minute max
    stalled: 10_000, // 10s stall = abort
    maxBytes: 50 * 1024 * 1024, // 50MB max
  });
  upload.on('error', (err) => {
    if (err instanceof StreamLimitError) {
      res.status(413).send('File too large');
    } else {
      res.status(408).send('Upload timeout');
    }
  });
  upload.pipe(fs.createWriteStream('/uploads/file'));
});
Database Export with Memory Protection
import { guard } from 'stream-guard';
const exportStream = guard(db.query('SELECT * FROM huge_table').stream(), {
  timeout: 300_000, // 5 minutes
  maxHeap: 500 * 1024 * 1024, // 500MB heap limit
  heapCheckInterval: 50,
  onDestroy: (err) => {
    logger.error('Export aborted', { error: err.message });
    alertOps('Database export failed - memory limit');
  },
});
exportStream.pipe(csvTransform).pipe(response);
Microservice Request Streaming
import { createGuard } from 'stream-guard';
import { Readable } from 'node:stream';
// Reusable guard for all API responses
const apiGuard = createGuard({
  timeout: 30_000,
  stalled: 5_000,
  maxBytes: 10 * 1024 * 1024,
});
async function fetchFromService(url: string) {
  const response = await fetch(url);
  // response.body is null for responses without a body
  if (!response.body) throw new Error(`No response body from ${url}`);
  return apiGuard(Readable.fromWeb(response.body));
}
WebSocket Backpressure Protection
import { guard } from 'stream-guard';
ws.on('connection', (socket) => {
  const guarded = guard(socket, {
    stalled: 30_000, // Destroy if no data flows for 30s
    maxBytes: 100 * 1024 * 1024, // 100MB per connection
  });
  guarded.on('error', (err) => {
    logger.warn('WebSocket terminated', { reason: err.message });
    socket.terminate();
  });
});
Why stream-guard?
| Feature | setTimeout | stream.timeout() | stream-guard |
|---------|--------------|-------------------|------------------|
| Absolute timeout | ✅ | ❌ | ✅ |
| Stall detection | ❌ | ❌ | ✅ |
| Byte limits | ❌ | ❌ | ✅ |
| Heap protection | ❌ | ❌ | ✅ |
| Automatic cleanup | ❌ | ❌ | ✅ |
| Typed errors | ❌ | ❌ | ✅ |
| Zero dependencies | N/A | N/A | ✅ |
| Works with pipe() | ❌ | ✅ | ✅ |
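The "Automatic cleanup" row is the one hand-rolled timers most often get wrong. As a hedged illustration of what cleanup involves (again, not the library's implementation), the sketch below pairs an absolute deadline with `finished()` from `node:stream`, so the timer is cleared as soon as the stream is done for any reason. `withDeadline` and `Deadline` are hypothetical names.

```typescript
import { Readable, finished } from 'node:stream';

// Hypothetical handle for this sketch; not part of stream-guard.
interface Deadline {
  cancel(): void;       // Clear the timer and stop watching the stream
  isPending(): boolean; // True while the deadline timer is still armed
}

// Destroys `stream` if it has not finished within `ms` milliseconds.
function withDeadline(stream: Readable, ms: number): Deadline {
  let stopWatching: () => void = () => {};

  let timer: ReturnType<typeof setTimeout> | undefined = setTimeout(() => {
    timer = undefined;
    stream.destroy(new Error(`Deadline of ${ms}ms exceeded`));
  }, ms);

  const cancel = (): void => {
    if (timer !== undefined) clearTimeout(timer);
    timer = undefined;
    stopWatching();
  };

  // finished() calls back once the stream ends, errors, or is destroyed,
  // and returns a function that removes the listeners it registered.
  stopWatching = finished(stream, () => cancel());

  return { cancel, isPending: () => timer !== undefined };
}
```

Without the `finished()` hook, a stream that completes early would leave its timer pending until the deadline fires, which is the kind of leak the table refers to.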
Best Practices
1. Always set a timeout for network streams
// Every external stream should have an absolute timeout
guard(httpResponse, { timeout: 30_000 });
2. Use stall detection for user-facing uploads
// Slow uploads are often attacks or broken clients
guard(uploadStream, { stalled: 10_000 });
3. Combine with byte limits for untrusted input
guard(request, {
  timeout: 60_000,
  stalled: 5_000,
  maxBytes: 10 * 1024 * 1024,
});
4. Monitor with onDestroy callback
guard(stream, {
  timeout: 30_000,
  onDestroy: (err) => {
    metrics.increment('stream.killed', { reason: err.constraint });
  },
});
5. Use metadata for debugging
const guarded = guard(stream, options);
const progressTimer = setInterval(() => {
  const meta = getGuardMetadata(guarded);
  if (!meta?.isActive()) {
    clearInterval(progressTimer); // Stop polling once the guard is no longer active
    return;
  }
  logger.debug('Stream progress', {
    bytes: meta.getBytesTransferred(),
    elapsed: meta.getElapsedTime(),
  });
}, 5000);
Requirements
- Node.js 18.0.0 or later
- TypeScript 5.0+ (for type definitions)
License
MIT © 2026
Contributing
- Fork the repository
- Create a feature branch (git checkout -b feature/amazing)
- Run tests (npm test)
- Commit changes (git commit -m 'Add amazing feature')
- Push to branch (git push origin feature/amazing)
- Open a Pull Request
