@obarlik/streaming-pipeline-core
v1.0.3
Memory-efficient circular buffer streaming pipeline with universal processing - by Codechu
Streaming Pipeline Core

Memory-Efficient Circular Buffer System
A production-ready TypeScript library by Codechu
Features
Circular Buffer System
- Bounded memory usage - never grows, automatically compacts old data
- O(1) operations - peek, advance, and single-position lookahead/lookbehind run in constant time
- Configurable limits - set your own lookBehind/lookAhead buffer sizes
- Auto-refill streaming - continuous data flow without blocking
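The bounded-memory and constant-time claims above follow from a classic ring-buffer layout. Here is a minimal, hypothetical sketch of that idea; `WindowedRingBuffer`, `push`, `at`, and `advance` are illustrative names, not this package's API. Capacity is fixed at lookBehind + 1 + lookAhead, reads at an offset are O(1), and moving past the history limit drops the oldest item by moving an index rather than shifting data.

```typescript
// Hypothetical sketch of a windowed ring buffer. The class and method
// names are illustrative only; they are not this package's API.
class WindowedRingBuffer<T> {
  private readonly slots: (T | undefined)[];
  private head = 0; // physical index of the oldest retained item
  private count = 0; // retained items: history + current + lookahead
  private cursor = 0; // history items before the current position
  readonly lookBehindSize: number;
  readonly lookAheadSize: number;

  constructor(lookBehindSize: number, lookAheadSize: number) {
    this.lookBehindSize = lookBehindSize;
    this.lookAheadSize = lookAheadSize;
    // Fixed capacity: lookBehind + 1 (current) + lookAhead; it never grows.
    this.slots = new Array<T | undefined>(lookBehindSize + 1 + lookAheadSize);
  }

  // Unread items, including the current one.
  get available(): number {
    return this.count - this.cursor;
  }

  // Append an item; returns false while the lookahead window is full,
  // which tells a refill loop to wait until the reader advances.
  push(item: T): boolean {
    if (this.available >= this.lookAheadSize + 1) return false;
    this.slots[(this.head + this.count) % this.slots.length] = item;
    this.count++;
    return true;
  }

  // O(1) access relative to the current position:
  // negative = lookbehind, 0 = current, positive = lookahead.
  at(offset: number): T | undefined {
    const logical = this.cursor + offset;
    if (logical < 0 || logical >= this.count) return undefined;
    return this.slots[(this.head + logical) % this.slots.length];
  }

  peek(): T | undefined {
    return this.at(0);
  }

  // Move forward; history beyond lookBehindSize is dropped by moving
  // `head`, so no element is ever shifted (constant work per step).
  advance(steps = 1): void {
    this.cursor += Math.min(steps, this.available);
    while (this.cursor > this.lookBehindSize) {
      this.slots[this.head] = undefined; // release the reference
      this.head = (this.head + 1) % this.slots.length;
      this.count--;
      this.cursor--;
    }
  }
}

const buf = new WindowedRingBuffer<string>(2, 3);
"abcdef".split("").forEach((ch) => buf.push(ch)); // only "abcd" fits
buf.advance(3);
console.log(buf.peek(), buf.at(-1)); // d c
```

Having `push` return `false` acts as simple backpressure: a refill loop stops reading from its source until the consumer advances, so memory never exceeds the fixed capacity.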
Universal Processing
- Single interface - one `process()` method handles everything
- Text & Binary - supports both string and Uint8Array inputs
- Pattern matching - lookahead/lookbehind for complex parsing
- Real-time streaming - immediate chunk output as content processes
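The single-method contract can be pictured as a dispatch loop: pick a processor for the current position, emit its chunks, then move by `advance`. This is a hypothetical sketch over a plain string, not the package's driver; `Chunk`, `Processor`, and `runProcessors` are illustrative stand-ins, and consulting higher `priority` values first is an assumption.

```typescript
// Hypothetical, simplified stand-ins for the README's StreamChunk and
// processor types; this driver is an illustration, not the package's code.
interface Chunk {
  type: string;
  content: string;
}

interface Processor {
  name: string;
  priority: number;
  canProcess(input: string, pos: number): boolean;
  process(input: string, pos: number): { chunks: Chunk[]; advance: number };
}

function runProcessors(input: string, processors: Processor[]): Chunk[] {
  // Assumption: higher priority is consulted first.
  const ordered = [...processors].sort((a, b) => b.priority - a.priority);
  const out: Chunk[] = [];
  let pos = 0;
  while (pos < input.length) {
    const proc = ordered.find((p) => p.canProcess(input, pos));
    if (!proc) {
      // Nothing matched: pass the character through, merging plain text.
      const last = out[out.length - 1];
      if (last && last.type === "text") last.content += input[pos];
      else out.push({ type: "text", content: input[pos] });
      pos += 1;
      continue;
    }
    const { chunks, advance } = proc.process(input, pos);
    out.push(...chunks);
    pos += Math.max(1, advance); // always make progress
  }
  return out;
}

const heading: Processor = {
  name: "heading",
  priority: 10,
  canProcess: (input, pos) => input[pos] === "#",
  process(input, pos) {
    // Bounded lookahead window, similar to lookAheadString(100) in the README.
    const match = /^(#{1,6})[ \t]+(.+)/.exec(input.slice(pos, pos + 100));
    if (!match) return { chunks: [{ type: "text", content: "#" }], advance: 1 };
    return { chunks: [{ type: "heading", content: match[2] }], advance: match[0].length };
  },
};

// Yields a heading chunk "Title" followed by a text chunk "\nbody".
console.log(runProcessors("# Title\nbody", [heading]));
```

Clamping `advance` to at least 1 in the driver mirrors the README's advice that processors always move forward, so a single misbehaving processor cannot stall the stream.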
Production Ready
- TypeScript - full type safety and IntelliSense support
- Modular tests - organized unit, integration, and performance tests
- Zero dependencies - completely self-contained
- Memory efficient - bounded memory usage for infinite streams
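Handling unbounded input with bounded memory comes down to retaining only as much earlier input as a pattern can span. As a minimal illustration (a hypothetical helper, not part of the package), this function counts matches across chunk boundaries while carrying at most `needle.length - 1` characters from one chunk to the next.

```typescript
// Hypothetical sketch: count non-overlapping matches of `needle` in a
// chunked stream while carrying at most needle.length - 1 characters
// between chunks, so retained history stays bounded.
function countAcrossChunks(chunks: Iterable<string>, needle: string): number {
  if (needle.length === 0) throw new Error("needle must be non-empty");
  let carry = ""; // tail of earlier input that may begin a match
  let count = 0;
  for (const chunk of chunks) {
    const text = carry + chunk;
    let searchFrom = 0;
    let i = text.indexOf(needle, searchFrom);
    while (i !== -1) {
      count++;
      searchFrom = i + needle.length; // skip past the match (no overlaps)
      i = text.indexOf(needle, searchFrom);
    }
    // Keep only characters that could still start a match spanning the
    // next chunk, and never re-scan characters a match already consumed.
    const keepFrom = Math.max(searchFrom, text.length - (needle.length - 1));
    carry = text.slice(keepFrom);
  }
  return count;
}

// Both "**" markers are split across chunk boundaries.
console.log(countAcrossChunks(["This is *", "*bold*", "* text"], "**")); // 2
```

Memory per step is one chunk plus a carry shorter than the pattern, however long the stream runs; the same reasoning is why a lookahead limit must be at least as long as the longest pattern a processor needs to see.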
Quick Start
```bash
npm install @codechu/streaming-pipeline-core
```

Basic Usage
```typescript
import { PipelineFactory, MarkdownProcessor, HTMLRenderer } from '@codechu/streaming-pipeline-core';

// Create a text pipeline backed by the circular buffer
const pipeline = PipelineFactory.createTextPipeline({
  lookBehindSize: 256, // 256 chars of history
  lookAheadSize: 1024  // 1024 chars of lookahead for pattern matching
});

// Register a processor and a renderer
pipeline.registerProcessor(new MarkdownProcessor());
pipeline.registerRenderer(new HTMLRenderer());

// Stream processing
const markdown = `# Hello World
This is **bold** and *italic* text.`;

// Top-level `for await` requires an ES module (or wrap it in an async function)
for await (const html of pipeline.processStream(markdown, 'html')) {
  console.log(html); // HTML chunks, emitted as they are produced
}
```

Streaming from Large Sources
```typescript
import * as fs from 'node:fs';
import { PipelineFactory, MarkdownProcessor, HTMLRenderer } from '@codechu/streaming-pipeline-core';

// Create a pipeline tuned for large inputs
const pipeline = PipelineFactory.createHighPerformancePipeline();
pipeline.registerProcessor(new MarkdownProcessor());
pipeline.registerRenderer(new HTMLRenderer());

// Wrap a Node.js file stream in a web ReadableStream (network sources work the same way).
// With an encoding set, Node decodes multi-byte characters split across chunks correctly.
const fileStream = fs.createReadStream('large-document.md', { encoding: 'utf8' });
const readableStream = new ReadableStream({
  start(controller) {
    fileStream.on('data', chunk => controller.enqueue(chunk));
    fileStream.on('end', () => controller.close());
    fileStream.on('error', err => controller.error(err)); // propagate read errors
  }
});

// Process chunks as they arrive; the whole file is never loaded into memory
for await (const output of pipeline.processStream(readableStream, 'html')) {
  console.log(output);
}
```

Circular Buffer Architecture
Memory-Efficient Design
```text
Circular Buffer: [lookBehind] [current] [lookAhead]
                      ↑           ↑          ↑
                   History    Position    Future

Auto-compact: Old history is dropped automatically when the lookBehind limit is reached
Auto-refill:  New data is loaded automatically as needed
Bounded:      Buffer size = lookBehind + 1 + lookAhead (constant)
```

Configuration Options
```typescript
import { PipelineFactory } from '@codechu/streaming-pipeline-core';

// Small buffer for simple processing
const smallPipeline = PipelineFactory.createTextPipeline({
  lookBehindSize: 64,  // Minimal history
  lookAheadSize: 128   // Basic lookahead
});

// Large buffer for complex parsing
const largePipeline = PipelineFactory.createTextPipeline({
  lookBehindSize: 1024, // More context
  lookAheadSize: 4096   // Complex pattern matching
});

// Binary processing
const binaryPipeline = PipelineFactory.createBinaryPipeline({
  lookBehindSize: 256,
  lookAheadSize: 512
});
```

Creating Processors
Basic Processor
```typescript
import { BaseStreamProcessor } from '@codechu/streaming-pipeline-core';
// Type names as used in this README; assumed to be exported by the package
import type { StreamingContext, StreamChunk, TextCircularBuffer } from '@codechu/streaming-pipeline-core';

class MyProcessor extends BaseStreamProcessor {
  readonly name = 'my-processor';
  readonly priority = 10;

  canProcess(context: StreamingContext): boolean {
    // Handle positions that start a Markdown header
    const current = context.buffer.peekChar();
    return current === '#';
  }

  process(context: StreamingContext): { chunks: StreamChunk[]; advance: number } {
    const buffer = context.buffer as TextCircularBuffer;

    // Look ahead to find the complete pattern; the anchored regex expects
    // the lookahead string to begin at the current character
    const line = buffer.lookAheadString(100);
    const match = line.match(/^(#{1,6})\s+(.+)/);

    if (match) {
      const level = match[1].length;
      const text = match[2];
      return {
        chunks: [{
          type: 'heading',
          content: text,
          data: { level }
        }],
        advance: match[0].length // Move past the entire header
      };
    }

    // Fallback: always advance at least 1 to prevent infinite loops
    return { chunks: [], advance: 1 };
  }
}
```

Advanced Pattern Matching
```typescript
class AdvancedProcessor extends BaseStreamProcessor {
  readonly name = 'advanced';
  readonly priority = 15;
  readonly preferredLookBehind = 64;  // Needs preceding context
  readonly preferredLookAhead = 256;  // Room for longer patterns

  process(context: StreamingContext): { chunks: StreamChunk[]; advance: number } {
    const buffer = context.buffer as TextCircularBuffer;

    // Combine lookbehind and lookahead for context-sensitive matching
    const before = buffer.lookBehindString(10);
    const current = buffer.peekChar();
    const ahead = buffer.lookAheadString(50); // begins at the current character

    // Bold text (**text**), unless the opening * is escaped with a backslash
    if (current === '*' && ahead.startsWith('**') && !before.endsWith('\\')) {
      const endIndex = ahead.indexOf('**', 2);
      if (endIndex > 0) {
        const boldText = ahead.slice(2, endIndex);
        return {
          chunks: [{ type: 'bold', content: boldText }],
          advance: endIndex + 2 // Past the closing **
        };
      }
    }

    return { chunks: [], advance: 1 };
  }
}
```

Performance
Benchmarks
- Throughput: 500K+ chars/sec with complex markdown processing
- Memory: Constant usage regardless of input size
- Latency: First chunk < 10ms, average chunk latency < 1ms
- Scalability: Handles GB+ files with MB memory usage
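Throughput numbers like these depend on hardware and content. A small timing harness, such as this hypothetical `measureThroughput` sketch (not part of the package), gives a comparable chars/sec figure on your own machine; the regex rewrite is only a stand-in workload.

```typescript
// Hypothetical benchmark harness (not part of this package): times a
// text transform over repeated runs and reports characters per second.
function measureThroughput(
  transform: (input: string) => unknown,
  input: string,
  runs = 5,
): number {
  if (input.length === 0 || runs < 1) throw new Error("need non-empty input and runs >= 1");
  transform(input); // untimed warm-up run keeps one-time startup costs out
  const start = performance.now();
  for (let r = 0; r < runs; r++) transform(input);
  // Floor the elapsed time so very fast runs cannot divide by zero.
  const seconds = Math.max(performance.now() - start, 0.001) / 1000;
  return (input.length * runs) / seconds;
}

// Stand-in workload: a regex rewrite of **bold** spans.
const sample = "Some **bold** text and more words.\n".repeat(20_000);
const charsPerSec = measureThroughput(
  (s) => s.replace(/\*\*(.+?)\*\*/g, "<strong>$1</strong>"),
  sample,
);
console.log(`${Math.round(charsPerSec).toLocaleString()} chars/sec`);
```

Averaging over several runs smooths out timer noise; for asynchronous work such as a `processStream` loop, the same idea applies with the timed section awaited.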
Memory Efficiency
```typescript
// Process a 100MB file while buffering only ~6K characters
const pipeline = PipelineFactory.createTextPipeline({
  lookBehindSize: 2048, // 2048 chars of history
  lookAheadSize: 4096   // 4096 chars of lookahead
});
// Buffer window: 2048 + 1 + 4096 = 6,145 chars plus fixed overhead,
// independent of input size, even for infinite streams

// createInfiniteMarkdownStream() is a placeholder for any unbounded source
const infiniteStream = createInfiniteMarkdownStream();
for await (const output of pipeline.processStream(infiniteStream, 'html')) {
  // Memory stays bounded however long the stream runs
}
```

Testing
Organized Test Structure
```bash
# Run all test categories
npm test

# Specific test categories
npm run test:unit         # Component isolation tests
npm run test:integration  # End-to-end workflow tests
npm run test:performance  # Benchmarks and stress tests
npm run test:smoke        # Quick validation tests
```

Test Categories
- Unit Tests: CircularStreamBuffer, processor patterns, interface compliance
- Integration Tests: Complete pipeline workflows, error handling, real streams
- Performance Tests: Throughput, latency, memory usage, scalability benchmarks
Documentation
- API Reference - Complete interface documentation
- Architecture Guide - Circular buffer system design
- Examples - Processor and renderer examples
- Performance Guide - Optimization and benchmarks
Factory Patterns
Pre-configured Pipelines
```typescript
// Optimized for text processing
const textPipeline = PipelineFactory.createTextPipeline();

// Binary data processing
const binaryPipeline = PipelineFactory.createBinaryPipeline();

// High-performance preset for large content
const perfPipeline = PipelineFactory.createHighPerformancePipeline();
```

Custom Configuration
```typescript
import { StreamingPipeline } from '@codechu/streaming-pipeline-core';

const pipeline = new StreamingPipeline();
pipeline.configureBuffer({
  lookBehindSize: 512,
  lookAheadSize: 2048,
  encoding: 'utf-8',
  autoCompact: true
});
```

Use Cases
Perfect For
- Markdown parsers - Real-time preview with bounded memory
- Log analyzers - Process GB+ logs with constant memory
- Document processors - Streaming conversion with pattern matching
- Web scrapers - Parse HTML/XML as it streams
- Data transformers - CSV/JSON processing with lookahead/lookbehind
- Protocol parsers - Binary protocol parsing with context
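For the protocol-parser case, lookahead means inspecting a header before committing to a frame, and waiting when the frame is still incomplete. This sketch uses a made-up length-prefixed format (`[type: u8][length: u16 big-endian][payload]`) and a hypothetical `FrameParser` class; it illustrates the pattern, not the package's binary pipeline.

```typescript
// Hypothetical wire format for illustration: [type: u8][length: u16 BE][payload].
interface Frame {
  type: number;
  payload: Uint8Array;
}

class FrameParser {
  private pending = new Uint8Array(0); // incomplete tail from earlier chunks

  // Feed one chunk; returns every frame that is now complete.
  push(chunk: Uint8Array): Frame[] {
    const buf = new Uint8Array(this.pending.length + chunk.length);
    buf.set(this.pending, 0);
    buf.set(chunk, this.pending.length);

    const frames: Frame[] = [];
    let pos = 0;
    // Look ahead at the 3-byte header, then check the whole frame is present.
    while (buf.length - pos >= 3) {
      const length = (buf[pos + 1] << 8) | buf[pos + 2];
      if (buf.length - pos < 3 + length) break; // wait for more bytes
      frames.push({ type: buf[pos], payload: buf.slice(pos + 3, pos + 3 + length) });
      pos += 3 + length;
    }
    // At most one partial frame (under 65,538 bytes) is carried forward.
    this.pending = buf.slice(pos);
    return frames;
  }
}

const parser = new FrameParser();
console.log(parser.push(new Uint8Array([1, 0, 2, 72]))); // [] (payload incomplete)
console.log(parser.push(new Uint8Array([105]))); // one frame: type 1, payload bytes 72, 105 ("Hi")
```

Because the header bounds the frame size, the retained tail is bounded too, which is the same guarantee a fixed lookahead window gives a text parser.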
Production Examples
```typescript
// Real-time markdown editor
const editor = PipelineFactory.createTextPipeline({
  lookBehindSize: 128, // Small context keeps UI updates fast
  lookAheadSize: 256   // Enough to complete inline patterns
});

// Large file processor
const processor = PipelineFactory.createHighPerformancePipeline({
  lookBehindSize: 1024, // Rich context
  lookAheadSize: 4096   // Complex patterns
});

// Memory-constrained environment
const minimal = PipelineFactory.createTextPipeline({
  lookBehindSize: 32, // Minimal memory
  lookAheadSize: 64
});
```

Why Circular Buffer?
Memory Efficient
- Constant memory usage regardless of input size
- Automatic cleanup of old data
- No memory leaks or accumulation
Performance Optimized
- O(1) single-position access (peek, advance, lookahead/lookbehind at an offset)
- No shifting of buffered data: the position wraps around fixed storage
- Optimized for streaming workloads
Developer Friendly
- Simple single-method processor interface
- Configurable buffer sizes for different use cases
- Built-in patterns for common scenarios
Production Ready
- Comprehensive test coverage
- TypeScript support with full type safety
- Battle-tested circular buffer implementation
Stream efficiently with bounded memory!
Built by Codechu
