@ebowwa/jsonl-hft
v1.3.1
Generic HFT-grade JSONL parser with sub-microsecond latency.
NO HARDCODED FIELDS - Consumer defines what fields to extract.
Features
- Parallel Parsing: Multi-core processing with rayon
- GZIP Support: Read and write compressed files
- Schema Validation: Validate JSONL against type schemas
- Type Inference: Automatic field type detection
- Error Recovery: Continue parsing on malformed lines
- Statistics: Real-time parsing metrics
- Memory-Mapped I/O: Efficient large file handling
- SIMD Optimized: memchr-accelerated byte search (~19 GiB/s)
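The newline scan that memchr accelerates in the Rust core can be pictured in plain TypeScript, where `Buffer.indexOf` plays the same role (an illustrative sketch only, not the library's implementation):

```typescript
// Illustrative: split a JSONL buffer into per-line buffers by scanning
// for '\n' (0x0a), the same boundary search the Rust core does with memchr.
function splitLines(buf: Buffer): Buffer[] {
  const lines: Buffer[] = [];
  let start = 0;
  while (start < buf.length) {
    const nl = buf.indexOf(0x0a, start);
    if (nl === -1) {
      lines.push(buf.subarray(start)); // trailing line without newline
      break;
    }
    lines.push(buf.subarray(start, nl));
    start = nl + 1;
  }
  return lines;
}
```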
Performance
| Benchmark | Time | Throughput |
|-----------|------|------------|
| parse_line | 232 ns | ~606 MiB/s |
| find_field (first) | 15 ns | ~5 GiB/s |
| find_field (last) | 58 ns | ~1.3 GiB/s |
| pool_parser (1000 lines) | 113 µs | ~435 MiB/s |
| memchr_search | 3.9 ns | ~19.7 GiB/s |
Installation
bun add @ebowwa/jsonl-hft
Usage
Basic Parsing
import { parseDir, parseFile, parseBuffer, getVersion } from "@ebowwa/jsonl-hft";
// Define what fields you want to extract
const fields = ["session_id", "timestamp", "role", "message.content"];
// Parse a directory (recursive, parallel)
const entries = parseDir("/path/to/jsonl/files", fields);
// Parse a single file
const fileEntries = parseFile("/path/to/file.jsonl", fields);
// Parse a buffer
const bufferEntries = parseBuffer(jsonlBuffer, fields);
// Get version
console.log(getVersion()); // e.g. "1.3.1"
GZIP Support
import { isGzip, parseGzipFile, writeGzip } from "@ebowwa/jsonl-hft";
// Check if file is gzip compressed
if (isGzip("/path/to/file.jsonl.gz")) {
// Parse compressed file
const entries = parseGzipFile("/path/to/file.jsonl.gz", fields);
}
// Write to gzip file
const result = writeGzip("/path/to/output.jsonl.gz", jsonlData, 9);
console.log(`Compression ratio: ${result.compressionRatio}`);
Parallel Parsing
import { parseFileParallel, parseFilesParallel, parseDirParallel } from "@ebowwa/jsonl-hft";
// Parse single file with parallel chunks
const result = parseFileParallel("/path/to/large.jsonl", fields, 0); // 0 = auto chunk size
console.log(`Parsed ${result.linesProcessed} lines in ${result.parseTimeNs}ns`);
// Parse multiple files in parallel
const files = ["/path/a.jsonl", "/path/b.jsonl", "/path/c.jsonl"];
const multiResult = parseFilesParallel(files, fields);
// Parse directory in parallel
const dirResult = parseDirParallel("/path/to/jsonl/dir", fields);
Schema Validation
import { validateFile, FieldType } from "@ebowwa/jsonl-hft";
const schema = [
{ name: "session_id", expectedType: FieldType.String, required: true },
{ name: "timestamp", expectedType: FieldType.String, required: true },
{ name: "value", expectedType: FieldType.Number, required: false },
];
const validation = validateFile("/path/to/data.jsonl", schema);
if (!validation.isValid) {
console.log(`Found ${validation.errorCount} errors`);
validation.errors.forEach(err => {
console.log(`Line ${err.lineNumber}: ${err.errorMessage}`);
});
}
Type Inference
import { inferFieldTypes, FieldType } from "@ebowwa/jsonl-hft";
const line = Buffer.from('{"id":"123","count":42,"active":true}');
const fields = ["id", "count", "active"];
const types = inferFieldTypes(line, fields);
// types[0] = FieldType.String
// types[1] = FieldType.Number
// types[2] = FieldType.Boolean
Statistics
import { getStats, resetStats, parseFileWithStats } from "@ebowwa/jsonl-hft";
// Parse with stats collection
const result = parseFileWithStats("/path/to/file.jsonl", fields);
// Get global stats
const stats = getStats();
console.log(`Throughput: ${stats.throughputMiBs} MiB/s`);
console.log(`Avg latency: ${stats.avgLatencyNs} ns`);
// Reset stats
resetStats();
Error Recovery
import { parseFileWithRecovery } from "@ebowwa/jsonl-hft";
const result = parseFileWithRecovery("/path/to/dirty.jsonl", fields);
console.log(`Successful: ${result.stats.successfulLines}`);
console.log(`Failed: ${result.stats.failedLines}`);
// Access errors
result.errors.forEach(err => {
console.log(`Line ${err.lineNumber}: ${err.errorType}`);
});
Batch Parsing
import { parseBatch, freeBatch } from "@ebowwa/jsonl-hft";
const buffers = [buf1, buf2, buf3];
const lengths = [buf1.length, buf2.length, buf3.length];
const result = parseBatch(buffers, lengths, fields);
console.log(`Parsed ${result.count} entries`);
// Cleanup
freeBatch(result);
Streaming API
import { parseStream } from "@ebowwa/jsonl-hft";
// Process each line with a callback
parseStream("/path/to/large.jsonl", fields, (entry, lineNumber) => {
// Process entry
console.log(`Line ${lineNumber}:`, entry);
return true; // Continue processing
});
Field Specification
Fields can be:
- Simple: "session_id", "timestamp", "role"
- Nested (dot notation): "message.content", "metadata.user.id"
The parser extracts only the fields you request - no wasted parsing.
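What a dot-notation spec like "message.content" resolves to can be sketched in plain TypeScript (an illustrative reimplementation; the real extraction happens in Rust without materializing JS objects):

```typescript
// Illustrative: walk a parsed object along a dot-separated path.
// Returns undefined when any segment is missing or non-object.
function extractField(obj: unknown, path: string): unknown {
  let cur: any = obj;
  for (const key of path.split(".")) {
    if (cur === null || typeof cur !== "object") return undefined;
    cur = cur[key];
  }
  return cur;
}

const entry = JSON.parse('{"role":"user","message":{"content":"hi"}}');
extractField(entry, "message.content"); // "hi"
extractField(entry, "role");            // "user"
```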
API Reference
Parsing Functions
| Function | Description |
|----------|-------------|
| parseDir | Parse all JSONL files in directory recursively |
| parseFile | Parse single file with memory-mapped I/O |
| parseBuffer | Parse from buffer/string |
| parseFileParallel | Parallel file parsing with chunks |
| parseFilesParallel | Parse multiple files in parallel |
| parseDirParallel | Parallel directory parsing |
| parseGzipFile | Parse gzip-compressed file |
| parseFileWithStats | Parse with statistics collection |
| parseFileWithRecovery | Parse with error recovery |
| parseBatch | Parse multiple buffers |
| parseStream | Streaming callback-based parsing |
Validation Functions
| Function | Description |
|----------|-------------|
| validateFile | Validate JSONL against schema |
| inferFieldTypes | Infer types for fields |
Output Functions
| Function | Description |
|----------|-------------|
| writeGzip | Write to gzip-compressed file |
| writeFile | Write to uncompressed file |
Utility Functions
| Function | Description |
|----------|-------------|
| getVersion | Get library version |
| isGzip | Check if file is gzip compressed |
| getStats | Get global parsing statistics |
| resetStats | Reset statistics counters |
Types
enum FieldType {
Unknown = 0,
String = 1,
Number = 2,
Boolean = 3,
Null = 4,
Array = 5,
Object = 6,
}
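Since inferFieldTypes returns these numeric codes, a small helper (hypothetical, not part of the library) can make logs readable:

```typescript
// Hypothetical helper: map FieldType codes back to names for logging.
// The array indices mirror the enum values above (Unknown = 0 … Object = 6).
const FIELD_TYPE_NAMES = [
  "Unknown", "String", "Number", "Boolean", "Null", "Array", "Object",
] as const;

function fieldTypeName(code: number): string {
  return FIELD_TYPE_NAMES[code] ?? "Unknown";
}

fieldTypeName(1); // "String"
```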
interface StatsResult {
totalLines: bigint;
successfulLines: bigint;
failedLines: bigint;
totalBytes: bigint;
parseTimeNs: bigint;
avgLatencyNs: bigint;
minLatencyNs: bigint;
maxLatencyNs: bigint;
throughputMiBs: number;
}
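throughputMiBs is derivable from totalBytes and parseTimeNs; as a sanity check, the arithmetic looks like this (an illustrative calculation, not the library's code):

```typescript
// Illustrative: derive MiB/s from the bigint counters in StatsResult.
function throughputMiBs(totalBytes: bigint, parseTimeNs: bigint): number {
  if (parseTimeNs === 0n) return 0;
  const seconds = Number(parseTimeNs) / 1e9;
  return Number(totalBytes) / (1024 * 1024) / seconds;
}

// 1 MiB parsed in 1 ms -> 1000 MiB/s
throughputMiBs(1_048_576n, 1_000_000n);
```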
interface ParallelResult {
entries: GenericEntry[];
count: number;
linesProcessed: bigint;
parseTimeNs: bigint;
}
interface WriteResult {
success: boolean;
bytesWritten: bigint;
compressedBytes: bigint;
compressionRatio: number;
errorMessage?: string;
}
Build
cd packages/src/rust/jsonl-hft
bun run build
Architecture
┌─────────────────────────────────────────────────────────┐
│ TypeScript (control plane) │
│ │ │
│ ▼ FFI call │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Rust Parser (data plane) │ │
│ │ • Memory-mapped I/O (memmap2) │ │
│ │ • Parallel processing (rayon) │ │
│ │ • Zero allocation hot path │ │
│ │ • SIMD-friendly byte scanning (memchr) │ │
│ │ • GZIP compression (flate2/zlib-ng) │ │
│ │ • Schema validation │ │
│ │ • Type inference │ │
│ │ • Statistics collection │ │
│ └─────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
License
MIT
