dspx
v1.3.0
High-performance DSP library with native C++ acceleration and Redis state persistence
A high-performance DSP library with native C++ acceleration, Redis state persistence, and comprehensive time-series processing. Built for Node.js backends processing real-time biosignals, audio, and sensor data.
**v1.0.0 Release:** Fully tested (500+ tests passing), with a battle-tested architecture and comprehensive documentation. Ready for production workloads.
A modern DSP library built for Node.js backends processing real-time biosignals, audio streams, and sensor data. Features native C++ filters with full state serialization to Redis, enabling seamless processing across service restarts and distributed workers.
✨ Features
- 🚀 Native C++ Performance – Optimized circular buffers and filters with SIMD acceleration for real-time processing
- 🎯 SIMD Acceleration – AVX2/SSE2/NEON optimizations provide 2-8x speedup on batch operations and rectification
- ⚡ Optimal FIR Filters (NEW) – Pre-computed Parks-McClellan coefficients ship with library (30-50% faster than window-based)
- 🔧 TypeScript-First – Full type safety with excellent IntelliSense
- 📡 Low state serialization overhead – Less than 0.1ms latency for state save/load operations
- ⏱️ Time-Series Processing – Support for irregular timestamps and time-based windows
- 🔗 Fluent Pipeline API – Chain filter operations with method chaining
- 🎯 Zero-Copy Processing – Direct TypedArray manipulation for minimal overhead
- 📊 Multi-Channel Support – Process multi-channel signals (EMG, EEG, audio) with independent filter states per channel
- ⚡ Async Processing – Background thread processing to avoid blocking the Node.js event loop
- 🛡️ Crash Recovery – Resume processing from exact state after service restarts
🏗️ Architecture
The library uses a layered architecture with clear separation between TypeScript and C++ components:
graph TB
subgraph "TypeScript Layer (src/ts/)"
TS_API["TypeScript API<br/>bindings.ts"]
TS_TYPES["Type Definitions<br/>types.ts"]
TS_UTILS["Utilities<br/>CircularLogBuffer, TopicRouter"]
TS_REDIS["Redis Backend<br/>backends.ts"]
end
subgraph "N-API Bridge Layer"
NAPI["Node-API Bindings<br/>(native module)"]
end
subgraph "C++ Layer (src/native/)"
subgraph "dsp::adapters (N-API Adapters)"
ADAPTER_EXAMPLE["Example: MovingAverageStage<br/>(supports batch/moving modes)"]
ADAPTER_STATELESS["Example: RectifyStage<br/>(stateless only)"]
end
PIPELINE["DspPipeline.cc<br/>(Stage Orchestration)"]
subgraph "dsp::core (Pure C++ Algorithms)"
CORE_ENGINE["SlidingWindowFilter<br/>(Generic Stateful Engine)"]
CORE_FILTER["Example: MovingAverageFilter<br/>(wraps SlidingWindowFilter)<br/><b>Stateful - Moving Mode</b>"]
subgraph "Statistical Policies"
POLICY_EXAMPLE["Example: MeanPolicy<br/>(pure computation)<br/><b>Stateless - Batch Mode</b>"]
end
end
subgraph "dsp::utils (Data Structures)"
CIRCULAR["CircularBuffer<br/>(Array & Vector)<br/><b>Stateful Storage</b>"]
end
end
subgraph "External Services"
REDIS_DB[("Redis<br/>(State Persistence)")]
end
%% Connections
TS_API --> NAPI
TS_REDIS --> REDIS_DB
NAPI --> PIPELINE
PIPELINE --> ADAPTER_EXAMPLE
PIPELINE --> ADAPTER_STATELESS
ADAPTER_EXAMPLE --> CORE_FILTER
ADAPTER_EXAMPLE --> POLICY_EXAMPLE
CORE_FILTER --> CORE_ENGINE
CORE_ENGINE --> POLICY_EXAMPLE
CORE_ENGINE --> CIRCULAR
%% Styling
classDef tsLayer fill:#3178c6,stroke:#235a97,color:#fff
classDef cppCore fill:#00599c,stroke:#003f6f,color:#fff
classDef cppEngine fill:#1a8cff,stroke:#0066cc,color:#fff
classDef cppPolicy fill:#66b3ff,stroke:#3399ff,color:#000
classDef cppAdapter fill:#659ad2,stroke:#4a7ba7,color:#fff
classDef cppUtils fill:#a8c5e2,stroke:#7a9fbe,color:#000
classDef external fill:#dc382d,stroke:#a82820,color:#fff
class TS_API,TS_TYPES,TS_UTILS,TS_REDIS tsLayer
class CORE_FILTER cppCore
class CORE_ENGINE cppEngine
class POLICY_EXAMPLE cppPolicy
class ADAPTER_EXAMPLE,ADAPTER_STATELESS cppAdapter
class CIRCULAR cppUtils
class REDIS_DB external
Key Architectural Principles
1. Namespace Separation
- dsp::core: Pure C++ algorithms with no Node.js dependencies. Reusable in other C++ projects.
- dsp::adapters: N-API wrapper classes that expose core algorithms to JavaScript.
- dsp::utils: Shared data structures (circular buffers) used by core algorithms.
2. Policy-Based Design (Zero-Cost Abstraction)
The sliding window filters use compile-time polymorphism via template policies:
- SlidingWindowFilter<T, Policy>: Generic engine handling circular buffer and window logic
- Statistical Policies: Define specific computations (MeanPolicy, RmsPolicy, VariancePolicy, etc.)
- Zero overhead: Compiler inlines policy methods, achieving hand-written performance
- Extensibility: New statistical measures require only a new policy class
Example: MovingAverageFilter is a thin wrapper around SlidingWindowFilter<float, MeanPolicy<float>>
2.1 Layered State Management
State serialization follows a 3-tier delegation pattern where each layer manages only its own state:
Policy Layer → getState() returns policy-specific state (running sums, counters, etc.)
↓
SlidingWindowFilter → getState() returns {buffer contents, policy state}
↓
Filter Wrapper → getState() delegates to SlidingWindowFilter
Example Implementation:
// Policy manages its own statistical state
template <typename T>
struct MeanPolicy {
T m_sum{}; // Policy-specific state
auto getState() const -> T { return m_sum; }
void setState(T sum) { m_sum = sum; }
};
// SlidingWindowFilter coordinates buffer + policy state
template <typename T, typename Policy>
class SlidingWindowFilter {
public:
auto getState() const {
return std::make_pair(m_buffer.toVector(), m_policy.getState());
}
template <typename PolicyState>
void setState(const std::vector<T>& bufferData, const PolicyState& policyState) {
m_buffer.fromVector(bufferData);
m_policy.setState(policyState);
}
// (m_buffer and m_policy members omitted for brevity)
};
// Filter delegates without knowledge of internals
class MovingAverageFilter {
public:
auto getState() const { return m_filter.getState(); } // Clean delegation
// (m_filter member omitted for brevity)
};
Benefits:
- ✅ Separation of concerns: Each layer owns its state
- ✅ Type safety: Policies can have different state types
- ✅ Extensibility: Adding new policies doesn't change SlidingWindowFilter
- ✅ DRY principle: No duplicate state assembly code in each filter
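The same delegation pattern can be sketched in TypeScript. This is an illustration only: the `Policy` interface, `SlidingWindow`, and `MovingAverage` classes below are hypothetical names, not dspx exports, and a plain array stands in for the native circular buffer.

```typescript
// Hypothetical TypeScript analogue of the C++ layering; not part of the dspx API.
interface Policy<S> {
  onAdd(value: number): void;
  onRemove(value: number): void;
  result(count: number): number;
  getState(): S;
  setState(state: S): void;
}

// Policy layer: owns only its running sum.
class MeanPolicy implements Policy<number> {
  private sum = 0;
  onAdd(value: number): void { this.sum += value; }
  onRemove(value: number): void { this.sum -= value; }
  result(count: number): number { return count === 0 ? 0 : this.sum / count; }
  getState(): number { return this.sum; }
  setState(state: number): void { this.sum = state; }
}

// Window layer: owns the buffer and asks the policy for everything else.
class SlidingWindow<S> {
  private buffer: number[] = []; // plain array for brevity (shift() is O(n))
  private readonly windowSize: number;
  private readonly policy: Policy<S>;

  constructor(windowSize: number, policy: Policy<S>) {
    this.windowSize = windowSize;
    this.policy = policy;
  }

  push(value: number): number {
    if (this.buffer.length === this.windowSize) {
      this.policy.onRemove(this.buffer.shift() as number);
    }
    this.buffer.push(value);
    this.policy.onAdd(value);
    return this.policy.result(this.buffer.length);
  }

  getState(): { buffer: number[]; policyState: S } {
    return { buffer: [...this.buffer], policyState: this.policy.getState() };
  }

  setState(state: { buffer: number[]; policyState: S }): void {
    this.buffer = [...state.buffer];
    this.policy.setState(state.policyState);
  }
}

// Wrapper layer: delegates without knowing what the state contains.
class MovingAverage {
  private readonly window: SlidingWindow<number>;
  constructor(windowSize: number) {
    this.window = new SlidingWindow<number>(windowSize, new MeanPolicy());
  }
  push(value: number): number { return this.window.push(value); }
  getState(): { buffer: number[]; policyState: number } { return this.window.getState(); }
  setState(state: { buffer: number[]; policyState: number }): void { this.window.setState(state); }
}

// Save after five samples, then resume in a fresh instance.
const first = new MovingAverage(3);
[1, 2, 3, 4, 5].forEach((v) => first.push(v));
const saved = JSON.stringify(first.getState());

const resumed = new MovingAverage(3);
resumed.setState(JSON.parse(saved));
const next = resumed.push(6); // window becomes [4, 5, 6]
```

Because each layer serializes only what it owns, swapping `MeanPolicy` for another policy changes the type of `policyState` but leaves `SlidingWindow` untouched.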
3. Layered Design
- TypeScript Layer: User-facing API, type safety, Redis integration
- N-API Bridge: Zero-copy data marshaling between JS and C++
- C++ Core: High-performance DSP algorithms with optimized memory management
4. State Management
State serialization uses a layered delegation pattern (see section 2.1):
- Policy Layer: Manages statistical state (running sums, variance accumulators, etc.)
- SlidingWindowFilter: Coordinates buffer contents + policy state
- Filter Wrapper: Delegates to SlidingWindowFilter without knowledge of internals
Full state serialization to JSON enables:
- ✅ Redis persistence for distributed processing
- ✅ Process continuity across restarts
- ✅ State migration between workers
- ✅ Data integrity validation on deserialization
Each filter's getState() returns a tuple {bufferData: T[], policyState: PolicyState} that can be JSON-serialized through the TypeScript layer.
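As a rough illustration of what integrity validation on deserialization could look like, the sketch below checks a moving-average state object before it is loaded. The field names are borrowed from the moving-average state format shown under Redis State Persistence; the `validateMovingAverageState` helper and its rules are assumptions, not the checks dspx performs internally.

```typescript
// Field names follow the README's moving-average state example;
// the validation rules are illustrative assumptions.
interface ChannelState {
  buffer: number[];
  runningSum: number;
}

interface MovingAverageState {
  windowSize: number;
  numChannels: number;
  channels: ChannelState[];
}

function validateMovingAverageState(
  state: MovingAverageState,
  tolerance = 1e-6
): string[] {
  const errors: string[] = [];
  if (state.channels.length !== state.numChannels) {
    errors.push(`expected ${state.numChannels} channels, found ${state.channels.length}`);
  }
  state.channels.forEach((channel, index) => {
    if (channel.buffer.length > state.windowSize) {
      errors.push(`channel ${index}: buffer exceeds windowSize`);
    }
    // The running sum must agree with the buffer it summarizes.
    const sum = channel.buffer.reduce((acc, v) => acc + v, 0);
    if (Math.abs(sum - channel.runningSum) > tolerance) {
      errors.push(`channel ${index}: runningSum ${channel.runningSum} does not match buffer sum ${sum}`);
    }
  });
  return errors;
}

const good: MovingAverageState = JSON.parse(
  '{"windowSize":3,"numChannels":1,"channels":[{"buffer":[3,4,5],"runningSum":12}]}'
);
const corrupted: MovingAverageState = {
  ...good,
  channels: [{ buffer: [3, 4, 5], runningSum: 10 }],
};

const goodErrors = validateMovingAverageState(good);
const corruptedErrors = validateMovingAverageState(corrupted);
```

An empty error list means the state is internally consistent; anything else is a reason to discard the stored state and start from a fresh pipeline.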
5. Mode Architecture (MovingAverage, RMS, Variance, ZScoreNormalize)
- Batch Mode: Stateless processing, computes over entire input
- Moving Mode: Stateful processing with sliding window continuity
This separation enables:
- ✅ Unit testing of C++ algorithms independently
- ✅ Reuse of core DSP code in other projects
- ✅ Type-safe TypeScript API with IntelliSense
- ✅ Zero-copy performance through N-API
- ✅ Distributed processing with Redis state sharing
6. Native C++ Backend
- N-API Bindings: Direct TypedArray access for zero-copy processing
- Async Processing: Uses Napi::AsyncWorker to avoid blocking the event loop
- Optimized Data Structures: Circular buffers with O(1) operations
- Template-Based: Generic implementation supports int, float, double
7. Redis State Persistence
The state serialization includes:
- Circular buffer contents: All samples in order (oldest to newest)
- Running sums/squares: Maintained for O(1) calculations (moving average uses runningSum, RMS uses runningSumOfSquares)
- Per-channel state: Independent state for each audio channel
- Metadata: Window size, channel count, timestamps, filter type
State format examples:
Moving Average state:
{
"timestamp": 1761156820,
"stages": [
{
"index": 0,
"type": "movingAverage",
"state": {
"windowSize": 3,
"numChannels": 1,
"channels": [
{
"buffer": [3, 4, 5],
"runningSum": 12
}
]
}
}
],
"stageCount": 1
}
RMS state:
{
"timestamp": 1761168608,
"stages": [
{
"index": 0,
"type": "rms",
"state": {
"windowSize": 3,
"numChannels": 1,
"channels": [
{
"buffer": [6, -7, 8],
"runningSumOfSquares": 149
}
]
}
}
],
"stageCount": 1
}
8. Multi-Channel Processing
Each channel maintains its own independent filter state:
// 4-channel interleaved data: [ch1, ch2, ch3, ch4, ch1, ch2, ...]
const input = new Float32Array(4000); // 1000 samples × 4 channels
const pipeline = createDspPipeline();
pipeline.MovingAverage({ windowSize: 50 });
const output = await pipeline.process(input, {
sampleRate: 2000,
channels: 4,
});
// Each channel has its own circular buffer and running sum
📊 Comparison with Alternatives
| Feature            | dspx              | scipy/numpy         | dsp.js     | Web Audio API   |
| ------------------ | ----------------- | ------------------- | ---------- | --------------- |
| TypeScript Support | ✅ Native         | ❌ Python-only      | ⚠️ Partial | ✅ Browser-only |
| Performance        | ⚡⚡⚡ Native C++ | ⚡⚡⚡⚡            | ⚡ Pure JS | ⚡⚡⚡          |
| State Persistence  | ✅ Redis          | ❌ Manual           | ❌ None    | ❌ None         |
| Multi-Channel      | ✅ Built-in       | ✅ NumPy arrays     | ⚠️ Limited | ✅ AudioBuffer  |
| Node.js Backend    | ✅ Designed for   | ❌ Context switch   | ✅ Yes     | ❌ Browser      |
| Observability      | ✅ Callbacks      | ❌ Print statements | ❌ None    | ⚠️ Limited      |
🎯 Is This Library Right For You?
✅ Great Fit For:
- Node.js backends processing real-time biosignals (EMG, EEG, ECG)
- Audio streaming services applying filters at scale
- IoT gateways processing sensor data
- Distributed signal processing across multiple workers
- Teams wanting native C++ performance without leaving TypeScript
❌ Not Ideal For:
- Browser-only applications (use Web Audio API)
- Python-based ML pipelines (use SciPy/NumPy)
- Hard real-time embedded systems (use bare C/C++)
- Ultra-low latency (<1ms) requirements (Redis adds ~1-5ms)
⚡ Performance at a Glance
| Operation                        | Throughput       | Production-Ready                  |
| -------------------------------- | ---------------- | --------------------------------- |
| Native processing (no callbacks) | 22M samples/sec  | ✅ Maximum performance            |
| Batched callbacks                | 3.2M samples/sec | ✅ Recommended for production     |
| Individual callbacks             | 6.1M samples/sec | ⚠️ Development/debugging only     |
SIMD Acceleration: Batch operations and rectification are 2-8x faster with AVX2/SSE2 on x86_64 and NEON on ARM. See SIMD_OPTIMIZATIONS.md for details.
Recommendation: Use batched callbacks in production. Individual callbacks benchmark faster but block the Node.js event loop and can't integrate with real telemetry systems (Datadog, Loki).
🎯 FFT Performance Note
Pipeline vs Direct Processing:
The DSP pipeline adds ~0.3-0.4ms overhead per operation for:
- Channel management and stage chaining
- N-API boundary crossings
- Output buffer allocation
For maximum FFT performance (when you don't need multi-stage processing or Redis persistence), use the direct FftProcessor:
// ⚡ FASTER: Direct FFT (no pipeline overhead)
import { FftProcessor } from "dspx";
const processor = new FftProcessor(1024);
const output = processor.rfft(signal); // ~4-14x faster than fft.js
// 🔧 FLEXIBLE: Pipeline FFT (with overhead)
const pipeline = createDspPipeline();
pipeline.fft({ size: 1024, type: "rfft" });
const result = await pipeline.process(signal); // Still faster than pure JS, but ~0.3ms overhead
When to use each:
- FftProcessor: Single FFT operations, maximum speed, batch processing
- Pipeline: Multi-stage processing (filter→FFT→analysis), Redis state, complex workflows
With recent optimizations (loop unrolling, memcpy, single-channel fast paths), dspx FFT is now 4-14x faster than fft.js across all sizes when measured correctly. See FFT_BENCHMARK_FIX.md for benchmark methodology.
⚠️ ARM/Mobile Platform Notice
ARM NEON optimizations are experimental. Mobile/ARM devices face unique challenges:
- Thermal throttling - Performance degrades under sustained load
- Power constraints - Aggressive frequency scaling impacts benchmarks
- Memory hierarchy - Different cache behavior than x86_64 desktop CPUs
Current Status:
- ✅ Moving mode (streaming) - FIR filters use optimized circular buffers with NEON
- ⚠️ Batch mode - May not show speedup vs. scalar code on mobile devices
- ⚠️ FFT operations - Naive implementations may outperform native on some ARM chips
We welcome contributions! If you have ARM optimization expertise or access to ARM development hardware, please open an issue or PR. Mobile DSP is challenging and community input helps improve real-world performance.
📖 Read more: ARM Platform Status & Performance Notes
📦 Installation
npm install dspx
Smart Installation
dspx uses an intelligent installation strategy for optimal performance and ease of use:
x64/x86 Architectures (Intel/AMD): Uses prebuilt binaries
- ✅ No C++ compiler needed
- ⚡ Fast installation (~5 seconds)
- 🎯 Works out of the box
ARM Architectures (Apple Silicon, Raspberry Pi, AWS Graviton): Compiles locally
- 🔧 Requires C++ build tools (see below)
- 🚀 Optimized for your specific ARM CPU
- 💪 Better performance than generic prebuilds
Build Tools (ARM only)
If you're on ARM architecture, install build tools first:
macOS (Apple Silicon M1/M2/M3):
xcode-select --install
Linux (Raspberry Pi, AWS Graviton):
sudo apt-get update
sudo apt-get install build-essential
Windows (ARM64):
npm install --global windows-build-tools
Supported Platforms
- ✅ Node.js 18, 20, 22 (LTS versions)
- ✅ Windows x64, Linux x64, macOS x64
- ✅ macOS arm64 (Apple Silicon)
- ✅ Linux arm64 (AWS Graviton, Raspberry Pi 4+)
- ✅ Linux armv7 (Raspberry Pi 3)
🚀 Quick Start
Optimal FIR Filters (NEW - 30-50% Faster!)
Use pre-computed Parks-McClellan optimal coefficients for maximum performance:
import { FirFilter } from "dspx";
import {
OPTIMAL_LOWPASS_COEFFS,
getPowerLineNotch,
} from "dspx/optimal-fir-tables";
// Method 1: Pre-computed optimal lowpass (87 taps vs 128 traditional = 32% faster)
const filter = new FirFilter(OPTIMAL_LOWPASS_COEFFS.cutoff_0_2, true);
const filtered = filter.process(signal);
// Method 2: Power line interference removal (60 Hz notch)
const notch = new FirFilter(getPowerLineNotch(1000, 60), true);
const clean = notch.process(noisySignal);
// Method 3: Use in pipeline with Convolution stage
import { createDspPipeline, Convolution } from "dspx";
const pipeline = createDspPipeline();
pipeline.addStage(
new Convolution({ kernel: OPTIMAL_LOWPASS_COEFFS.cutoff_0_2 })
);
// ✅ Zero Python dependency - coefficients ship with the library!
// ✅ 30-50% fewer operations than window-based designs
// ✅ SIMD optimizations (AVX2/SSE2/NEON) apply automatically
📖 Complete Optimal FIR Guide →
Basic Usage (Sample-Based)
import { createDspPipeline } from "dspx";
// Create a processing pipeline
const pipeline = createDspPipeline();
// Add filters using method chaining
pipeline.MovingAverage({ mode: "moving", windowSize: 100 });
// Process samples (modifies input in-place for performance)
const input = new Float32Array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
const output = await pipeline.process(input, {
sampleRate: 2000,
channels: 1,
});
console.log(output); // Smoothed signal
NEW: Time-Series Processing with Timestamps
import { createDspPipeline } from "dspx";
// Create a pipeline with time-based window
const pipeline = createDspPipeline();
pipeline.MovingAverage({ mode: "moving", windowDuration: 5000 }); // 5 seconds
// Process data with explicit timestamps (e.g., from IoT sensors with network jitter)
const samples = new Float32Array([1.2, 3.4, 2.1, 4.5, 3.3]);
const timestamps = new Float32Array([0, 100, 250, 400, 500]); // milliseconds
const smoothed = await pipeline.process(samples, timestamps, { channels: 1 });
console.log(smoothed); // Time-aware smoothing
📚 Complete Time-Series Guide →
Processing Without Modifying Input
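// Use processCopy to preserve the original input
// (the output shown assumes `pipeline` was configured with MovingAverage({ mode: "moving", windowSize: 3 }))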
const input = new Float32Array([1, 2, 3, 4, 5]);
const output = await pipeline.processCopy(input, {
sampleRate: 2000,
channels: 1,
});
console.log(input); // [1, 2, 3, 4, 5] - unchanged
console.log(output); // [1, 1.5, 2, 3, 4] - smoothed
Low state serialization overhead
import { createDspPipeline } from "dspx";
import { v4 as uuidv4 } from "uuid";
import { createClient } from "redis";
let redis;
let redisAvailable = false;
try {
redis = createClient({ url: "redis://localhost:6379" });
await redis.connect();
await redis.ping();
redisAvailable = true;
} catch {}
const stateKey = `dspx-state-${uuidv4()}`;
const pipeline = createDspPipeline();
pipeline
.MovingAverage({ mode: "batch", windowSize: 10, windowDuration: 2 })
.Rms({ mode: "batch", windowSize: 10, windowDuration: 2 })
.Rectify({ mode: "half" });
await pipeline.process(new Float32Array([1, -1, 2, -2, 3, -3]), {
channels: 1,
});
const serialized = await pipeline.saveState();
if (redisAvailable) await redis.set(stateKey, serialized);
const pipeline2 = createDspPipeline();
pipeline2
.MovingAverage({ mode: "batch", windowSize: 10, windowDuration: 2 })
.Rms({ mode: "batch", windowSize: 10, windowDuration: 2 })
.Rectify({ mode: "half" });
await pipeline2.loadState(serialized);
const output = await pipeline2.process(new Float32Array([4, -4, 5, -5]), {
channels: 1,
});
console.log(output);
if (redisAvailable) {
redis.destroy();
console.log("\n✓ Disconnected from Redis");
}
Multi-Channel Processing
import { createDspPipeline } from "dspx";
const pipeline = createDspPipeline();
pipeline.MovingAverage({ windowSize: 50 });
// Process 4-channel EMG data
// Data format: [ch1_s1, ch2_s1, ch3_s1, ch4_s1, ch1_s2, ch2_s2, ...]
const fourChannelData = new Float32Array(4000); // 1000 samples × 4 channels
const output = await pipeline.process(fourChannelData, {
sampleRate: 2000,
channels: 4, // Each channel maintains its own filter state
});
// Each channel is processed independently with its own circular buffer
🔗 API Reference
Creating a Pipeline
const pipeline = createDspPipeline();
Process Methods
The library supports three processing modes:
// Mode 1: Legacy sample-based (auto-generates timestamps from sample rate)
await pipeline.process(samples: Float32Array, options: {
sampleRate: number; // Hz (required for timestamp generation)
channels: number;
});
// Mode 2: Time-based with explicit timestamps
await pipeline.process(samples: Float32Array, timestamps: Float32Array, options: {
channels: number; // sampleRate not needed
});
// Mode 3: Auto-sequential (no sample rate, generates [0,1,2,3,...])
await pipeline.process(samples: Float32Array, options: {
channels: number; // sampleRate omitted
});
See Time-Series Guide for detailed examples.
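To make the difference between Mode 1 and Mode 3 concrete, the sketch below generates the timestamps each mode would imply for a single channel. It assumes timestamps are expressed in milliseconds, as in the explicit-timestamp example; the helper names are hypothetical and the library's internal generation may differ.

```typescript
// Mode 1 analogue: evenly spaced timestamps derived from the sample rate.
// Assumption: milliseconds, matching the explicit-timestamp example.
function timestampsFromSampleRate(sampleCount: number, sampleRate: number): Float32Array {
  if (sampleRate <= 0) {
    throw new RangeError("sampleRate must be positive");
  }
  const periodMs = 1000 / sampleRate;
  const timestamps = new Float32Array(sampleCount);
  for (let i = 0; i < sampleCount; i++) {
    timestamps[i] = i * periodMs;
  }
  return timestamps;
}

// Mode 3 analogue: sequential indices [0, 1, 2, ...] when no rate is given.
function sequentialTimestamps(sampleCount: number): Float32Array {
  const timestamps = new Float32Array(sampleCount);
  for (let i = 0; i < sampleCount; i++) {
    timestamps[i] = i;
  }
  return timestamps;
}

const fromRate = timestampsFromSampleRate(4, 2000); // 0.5 ms between samples
const sequential = sequentialTimestamps(4);
```

Note that a Float32Array represents integers exactly only up to 2^24, so timestamps relative to the start of a stream are safer than epoch milliseconds.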
Available Filters
Currently Implemented
Matrix Analysis (PCA, ICA, Whitening)
// 1. Training Phase: Calculate transformation matrices (batch)
import { calculatePca, calculateIca, calculateWhitening } from "dspx";
const pca = calculatePca(trainingData, numChannels);
const ica = calculateIca(mixedSignals, numChannels);
const whitening = calculateWhitening(correlatedData, numChannels);
// 2. Application Phase: Apply transforms in pipeline (real-time)
pipeline
.PcaTransform({
pcaMatrix: pca.pcaMatrix,
mean: pca.mean,
numChannels,
numComponents,
})
.IcaTransform({
icaMatrix: ica.icaMatrix,
mean: ica.mean,
numChannels,
numComponents,
})
.WhiteningTransform({
whiteningMatrix: whitening.whiteningMatrix,
mean: whitening.mean, // assumed to mirror pca.mean / ica.mean
numChannels,
numComponents,
});
Matrix analysis algorithms for dimensionality reduction, blind source separation, and decorrelation:
| Algorithm                                | Purpose                                    | Use Cases                                                               |
| ---------------------------------------- | ------------------------------------------ | ----------------------------------------------------------------------- |
| PCA (Principal Component Analysis)       | Find directions of maximum variance        | Dimensionality reduction (8ch→3ch), noise filtering, feature extraction |
| ICA (Independent Component Analysis)     | Separate statistically independent sources | EEG artifact removal, cocktail party problem, EMG decomposition         |
| Whitening (ZCA)                          | Transform to identity covariance           | ICA preprocessing, feature normalization, decorrelation                 |
Architecture: Train-then-apply model
- Training Phase (batch): Compute transformation matrices from representative dataset
- Application Phase (real-time): Apply fixed matrices in pipeline for streaming data
Features:
- Eigen C++ library: Fast eigenvalue decomposition and matrix operations
- Dimensionality reduction: Extract top N principal components (e.g., 8 channels → 3 components)
- ICA convergence tracking: Monitor FastICA iterations and convergence status
- Regularization support: Whitening with configurable regularization parameter
- Multi-channel support: Process multi-channel biosignals, audio, sensor data
Quick Example - EEG Artifact Removal:
// Load 4-channel EEG with eye blink artifacts
const eegData = loadEegData(); // Float32Array, 10000 samples × 4 channels
// Train ICA to separate brain signals from artifacts
const ica = calculateIca(eegData, 4);
console.log(`Converged: ${ica.converged} in ${ica.iterations} iterations`);
// Apply ICA in real-time pipeline
const pipeline = createDspPipeline();
pipeline.IcaTransform({
icaMatrix: ica.icaMatrix,
mean: ica.mean,
numChannels: 4,
numComponents: 4,
});
const cleanEeg = await pipeline.process(streamingEeg, { channels: 4 });
Performance:
- Training phase: O(C² × N) for PCA/Whitening, O(C² × N × I) for ICA
- Application phase: O(C²) per sample (matrix-vector multiplication)
- Typical training: 2-100ms for 4-8 channels with 1000-10000 samples
- Real-time processing: >100k samples/sec for 8 channels
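The per-sample cost in the application phase comes from one matrix-vector product per frame. A minimal sketch of that step is shown below, assuming a row-major matrix of shape numComponents × numChannels and interleaved input; both layouts and the `applyTransform` name are assumptions for illustration, not the documented format of `pcaMatrix` or `icaMatrix`.

```typescript
// Applies y = W · (x - mean) to every interleaved frame.
// Assumptions: W is row-major (numComponents × numChannels), input is interleaved.
function applyTransform(
  input: Float32Array,
  matrix: Float32Array,
  mean: Float32Array,
  numChannels: number,
  numComponents: number
): Float32Array {
  if (input.length % numChannels !== 0) {
    throw new RangeError("input length must be a multiple of numChannels");
  }
  const frames = input.length / numChannels;
  const output = new Float32Array(frames * numComponents);
  for (let f = 0; f < frames; f++) {
    for (let k = 0; k < numComponents; k++) {
      let acc = 0;
      for (let c = 0; c < numChannels; c++) {
        acc += matrix[k * numChannels + c] * (input[f * numChannels + c] - mean[c]);
      }
      output[f * numComponents + k] = acc;
    }
  }
  return output;
}

// Two channels reduced to one component: the average of the centered channels.
const reduced = applyTransform(
  new Float32Array([1, 1, 3, 5]), // frames: [1, 1] and [3, 5]
  new Float32Array([0.5, 0.5]),   // 1 × 2 matrix
  new Float32Array([1, 1]),       // per-channel mean
  2,
  1
);
```

With numComponents equal to numChannels the inner loops run C² multiply-adds per frame, which matches the O(C²) figure above.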
📖 Full Guide: See docs/MATRIX_ANALYSIS_GUIDE.md for comprehensive examples, API reference, and real-world applications.
Moving Average Filter
// Batch Mode - Stateless, computes average over entire input
pipeline.MovingAverage({ mode: "batch" });
// Moving Mode - Stateful, sliding window with continuity
pipeline.MovingAverage({ mode: "moving", windowSize: number });
// NEW: Time-Based Window
pipeline.MovingAverage({ mode: "moving", windowDuration: number }); // milliseconds
Implements a simple moving average (SMA) filter with two modes:
Modes:
| Mode | Description | State | Output | Use Case |
| ---------- | ---------------------------------- | --------- | ------------------------------------------- | ----------------------------------- |
| "batch" | Computes average over entire input | Stateless | All samples have same value (mean of input) | Quality metrics, summary statistics |
| "moving" | Sliding window across samples | Stateful | Each sample smoothed by window | Real-time smoothing, trend analysis |
Parameters:
- mode: "batch" or "moving" - determines computation strategy
- windowSize: Number of samples to average over (optional for moving mode)
- windowDuration: Time-based window in milliseconds (optional for moving mode)
- At least one of windowSize or windowDuration must be specified for moving mode
Features:
- Batch mode: O(n) computation, no state between calls
- Moving mode: O(1) per sample with circular buffer and running sum
- Per-channel state for multi-channel processing
- Full state serialization to Redis
Example:
// Batch mode: Average of [1,2,3,4,5] = 3
const batch = createDspPipeline().MovingAverage({ mode: "batch" });
const result1 = await batch.process(new Float32Array([1, 2, 3, 4, 5]));
// result1 = [3, 3, 3, 3, 3]
// Moving mode: Window size 3
const moving = createDspPipeline().MovingAverage({
mode: "moving",
windowSize: 3,
});
const result2 = await moving.process(new Float32Array([1, 2, 3, 4, 5]));
// result2 = [1, 1.5, 2, 3, 4] (sliding window averages)
Use cases:
- Batch: Quality control metrics, batch statistics, data summarization
- Moving: Signal smoothing, noise reduction, trend analysis, low-pass filtering
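For intuition on how a windowDuration-based average differs from a sample-count window, here is a small sketch over irregular timestamps. It assumes the window covers samples newer than `t - windowMs` (exclusive) up to the current sample; the exact boundary rule used by dspx is not stated here, and `timeWindowAverage` is a hypothetical helper.

```typescript
// Time-based moving average over irregularly spaced samples.
// Assumption: a sample stays in the window while (now - its timestamp) < windowMs.
function timeWindowAverage(
  samples: Float32Array,
  timestamps: Float32Array,
  windowMs: number
): Float32Array {
  if (samples.length !== timestamps.length) {
    throw new RangeError("samples and timestamps must have the same length");
  }
  if (windowMs <= 0) {
    throw new RangeError("windowMs must be positive");
  }
  const out = new Float32Array(samples.length);
  let start = 0; // index of the oldest sample still inside the window
  let sum = 0;   // running sum of samples[start..i]
  for (let i = 0; i < samples.length; i++) {
    sum += samples[i];
    // Evict samples that have aged out relative to the current timestamp.
    while (timestamps[i] - timestamps[start] >= windowMs) {
      sum -= samples[start];
      start++;
    }
    out[i] = sum / (i - start + 1);
  }
  return out;
}

const averaged = timeWindowAverage(
  new Float32Array([1, 2, 3, 4]),
  new Float32Array([0, 100, 250, 400]), // milliseconds, with jitter
  200
);
// averaged: [1, 1.5, 2.5, 3.5]
```

The number of samples contributing to each output varies with the arrival times, which is the point of a duration-based window when the sampling interval is not constant.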
Convolution Filter
// Batch Mode - Process entire buffer with kernel
pipeline.Convolution({ mode: "batch", kernel: Float32Array });
// Moving Mode - Stateful streaming convolution
pipeline.Convolution({
mode: "moving",
kernel: Float32Array,
method: "auto" | "direct" | "fft", // auto (default): switches at kernel size 64
});
Implements high-performance 1D convolution with automatic FFT acceleration for large kernels. Supports arbitrary kernels for filtering, edge detection, feature extraction, and more.
Modes:
| Mode | Description | State | Output | Use Case |
| ---------- | ---------------------------------- | --------- | --------------------------------- | ---------------------------------------- |
| "batch" | Convolve kernel with entire buffer | Stateless | Full convolution of input | Offline processing, batch filtering |
| "moving" | Stateful streaming convolution | Stateful | Overlap-add with state continuity | Real-time filtering, streaming workloads |
Parameters:
- mode: "batch" or "moving" - determines computation strategy
- kernel: Float32Array - convolution kernel/filter coefficients
- method (optional): "auto" (default), "direct", or "fft"
  - "auto": Uses direct SIMD for kernel ≤ 64, FFT for kernel > 64
  - "direct": Always use direct convolution (SIMD-accelerated)
  - "fft": Always use FFT-based convolution (overlap-add for moving mode)
Features:
- Automatic FFT acceleration: Auto-switches at kernel size 64 for optimal performance
- SIMD optimization: AVX2/SSE2 accelerated direct convolution (4-8x throughput)
- Zero allocations: Pre-allocated buffers eliminate per-call overhead
- Cache-friendly: De-interleaved processing for multi-channel signals
- Overlap-add: Efficient chunked processing for moving mode FFT
- Per-channel state: Independent convolution state for each channel
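What "state continuity" means for moving mode can be shown with a direct-form streaming filter that remembers the last K-1 input samples between calls. This is a single-channel sketch of the idea, not the library's SIMD or overlap-add implementation, and `StreamingFir` is a hypothetical class.

```typescript
// Direct-form streaming FIR: keeps the last K-1 inputs so chunk boundaries are seamless.
class StreamingFir {
  private readonly kernel: Float32Array;
  private history: Float32Array; // last K-1 input samples from previous calls

  constructor(kernel: Float32Array) {
    this.kernel = kernel;
    this.history = new Float32Array(Math.max(kernel.length - 1, 0));
  }

  process(chunk: Float32Array): Float32Array {
    const k = this.kernel.length;
    const h = this.history.length;
    // Prepend the remembered samples so every output sees a full window.
    const extended = new Float32Array(h + chunk.length);
    extended.set(this.history, 0);
    extended.set(chunk, h);

    const out = new Float32Array(chunk.length);
    for (let n = 0; n < chunk.length; n++) {
      let acc = 0;
      for (let j = 0; j < k; j++) {
        acc += this.kernel[j] * extended[h + n - j];
      }
      out[n] = acc;
    }
    // Carry the newest K-1 samples into the next call.
    this.history = extended.slice(extended.length - h);
    return out;
  }
}

const firKernel = new Float32Array([0.5, 0.5]);

// One call over the whole signal...
const whole = new StreamingFir(firKernel).process(new Float32Array([1, 2, 3, 4]));

// ...matches two calls over consecutive chunks.
const chunked = new StreamingFir(firKernel);
const partA = chunked.process(new Float32Array([1, 2]));
const partB = chunked.process(new Float32Array([3, 4]));
```

Without the carried history, the first K-1 outputs of every chunk would be computed against zeros and the result would glitch at each chunk boundary.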
Performance Characteristics (Moving Mode):
| Kernel Size | Method Used   | Throughput (samples/sec) | vs Naive JS                             |
| ----------- | ------------- | ------------------------ | --------------------------------------- |
| K=8         | Direct (SIMD) | ~19M                     | Naive JS faster (110M) - see note below |
| K=32        | Direct (SIMD) | ~12M                     | Naive JS faster (36M) - see note below  |
| K=64        | Direct (SIMD) | ~7M                      | Naive JS faster (18M) - threshold point |
| K=128       | FFT (overlap) | 82M                      | 8.5x faster than naive JS (9.6M)        |
| K=256       | FFT (overlap) | 77M                      | 15x faster than naive JS (5M)           |
Example:
// Batch mode: Simple edge detection kernel
const edgeKernel = new Float32Array([-1, 0, 1]); // Horizontal gradient
const batch = createDspPipeline().Convolution({
mode: "batch",
kernel: edgeKernel,
});
const edges = await batch.process(signal);
// Moving mode: Gaussian smoothing (direct SIMD for K=5)
const gaussianKernel = new Float32Array([0.06, 0.24, 0.4, 0.24, 0.06]);
const moving = createDspPipeline().Convolution({
mode: "moving",
kernel: gaussianKernel,
method: "auto", // Uses direct SIMD (K ≤ 64)
});
const smoothed = await moving.process(noisySignal);
// Large kernel: Automatic FFT acceleration (K=128)
const largeKernel = new Float32Array(128).fill(1 / 128); // Moving average K=128
const fftConv = createDspPipeline().Convolution({
mode: "moving",
kernel: largeKernel,
method: "auto", // Auto-switches to FFT (~8.5x faster than naive JS at K=128)
});
const filtered = await fftConv.process(data);
Use Cases:
- Signal Processing: Custom FIR filters, bandpass/bandstop filters, matched filtering
- Computer Vision: Edge detection (Sobel, Prewitt), Gaussian blur, sharpening
- Feature Extraction: Wavelet convolution, pattern matching, template detection
- Audio Processing: Impulse response convolution, reverb, echo effects
- Time Series: Smoothing, differentiation, integration kernels
🚀 Performance Tip - Optimal FIR Filters:
Get 30-50% better performance using pre-computed Parks-McClellan coefficients that ship with the library:
// Method 1: Use pre-computed optimal coefficients (recommended)
import { FirFilter } from "dspx";
import { OPTIMAL_LOWPASS_COEFFS } from "dspx/optimal-fir-tables";
const filter = new FirFilter(OPTIMAL_LOWPASS_COEFFS.cutoff_0_2, true);
// ✅ 87 taps (optimal) vs 128 taps (window-based) = 32% faster!
// Method 2: Design custom optimal filters offline
// python scripts/design_optimal_fir.py --type lowpass --taps 87 --cutoff 0.2 --output lowpass.json
const coeffs = require("./lowpass.json");
const customFilter = new FirFilter(new Float32Array(coeffs), true);
Available Pre-computed Filters:
- Lowpass: 4 filters (61-127 taps) covering cutoffs 0.1π-0.4π
- Highpass: 3 filters (81-129 taps) covering cutoffs 0.1π-0.3π
- Bandpass: 3 filters (73-101 taps) for narrow/moderate/wide bands
- Notch: 50/60 Hz power line interference removal
All filters provide 30-50% better performance than traditional window-based designs while maintaining equivalent frequency response quality.
📖 Complete guides:
- Pre-computed Optimal Coefficients Approach - Zero-dependency solution
- Parks-McClellan Custom Design - For non-standard filters
- C++ vs TS Storage Analysis - Architecture decisions
Mathematical Properties:
- Linearity: Conv(a·x₁ + b·x₂, h) = a·Conv(x₁, h) + b·Conv(x₂, h)
- Commutative: x ⊛ h = h ⊛ x
- Associative: (x ⊛ h₁) ⊛ h₂ = x ⊛ (h₁ ⊛ h₂)
- FFT Equivalence: Time-domain convolution = Frequency-domain multiplication
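These properties are easy to confirm numerically. The sketch below uses a plain full (linear) convolution written for illustration, with small integer inputs so the comparisons are exact; it does not call into dspx.

```typescript
// Full linear convolution: output length is x.length + h.length - 1.
function convolve(x: number[], h: number[]): number[] {
  if (x.length === 0 || h.length === 0) {
    return [];
  }
  const y = new Array<number>(x.length + h.length - 1).fill(0);
  for (let i = 0; i < x.length; i++) {
    for (let j = 0; j < h.length; j++) {
      y[i + j] += x[i] * h[j];
    }
  }
  return y;
}

const sameSequence = (a: number[], b: number[]): boolean =>
  a.length === b.length && a.every((v, i) => v === b[i]);

const inputSignal = [1, 2, 3];
const kernelA = [1, -1];
const kernelB = [2, 0, 1];

// Commutative: x ⊛ h = h ⊛ x
const commutes = sameSequence(convolve(inputSignal, kernelA), convolve(kernelA, inputSignal));

// Associative: (x ⊛ h₁) ⊛ h₂ = x ⊛ (h₁ ⊛ h₂)
const cascaded = convolve(convolve(inputSignal, kernelA), kernelB);
const combined = convolve(inputSignal, convolve(kernelA, kernelB));
const associates = sameSequence(cascaded, combined);
```

Associativity is what allows two cascaded FIR stages to be replaced by a single stage whose kernel is the convolution of the two kernels.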
RMS (Root Mean Square) Filter
// Batch Mode - Stateless, computes RMS over entire input
pipeline.Rms({ mode: "batch" });
// Moving Mode - Stateful, sliding window with continuity
pipeline.Rms({ mode: "moving", windowSize: number });
Implements an efficient RMS filter with two modes:
Modes:
| Mode | Description | State | Output | Use Case |
| ---------- | ------------------------------ | --------- | ------------------------------------------ | ----------------------------------- |
| "batch" | Computes RMS over entire input | Stateless | All samples have same value (RMS of input) | Power measurement, batch analysis |
| "moving" | Sliding window across samples | Stateful | Each sample is RMS of window | Envelope detection, real-time power |
Parameters:
- mode: "batch" or "moving" - determines computation strategy
- windowSize: Number of samples to calculate RMS over (required for moving mode only)
Features:
- Batch mode: O(n) computation, no state between calls
- Moving mode: O(1) per sample with circular buffer and running sum of squares
- Per-channel state for multi-channel processing
- Full state serialization to Redis
- Always non-negative output (magnitude-based)
Example:
// Batch mode: RMS of [1, -2, 3, -4, 5] = sqrt((1 + 4 + 9 + 16 + 25)/5) = sqrt(11) ≈ 3.32
const batch = createDspPipeline().Rms({ mode: "batch" });
const result1 = await batch.process(new Float32Array([1, -2, 3, -4, 5]));
// result1 ≈ [3.32, 3.32, 3.32, 3.32, 3.32]
// Moving mode: Window size 3
const moving = createDspPipeline().Rms({ mode: "moving", windowSize: 3 });
const result2 = await moving.process(new Float32Array([1, -2, 3, -4, 5]));
// result2 = [1.0, 1.58, 2.16, 3.11, 4.08] - sliding window RMS
Use cases:
- Batch: Overall signal power, quality metrics, batch statistics
- Moving: Signal envelope detection, amplitude tracking, power measurement, feature extraction
Mean Absolute Value (MAV) Filter
// Batch Mode - Stateless, computes MAV over entire input
pipeline.MeanAbsoluteValue({ mode: "batch" });
// Moving Mode - Stateful, sliding window with continuity
pipeline.MeanAbsoluteValue({ mode: "moving", windowSize: number });
Implements an efficient Mean Absolute Value filter with two modes - commonly used in EMG signal analysis for muscle activity quantification.
Modes:
| Mode | Description | State | Output | Use Case |
| ---------- | ------------------------------ | --------- | ------------------------------------------ | ---------------------------------------- |
| "batch" | Computes MAV over entire input | Stateless | All samples have same value (MAV of input) | Global activity level, batch analysis |
| "moving" | Sliding window across samples | Stateful | Each sample is MAV of window | Real-time activity detection, transients |
Parameters:
- mode: "batch" or "moving" - determines computation strategy
- windowSize: Number of samples to calculate MAV over (required for moving mode only)
Features:
- Batch mode: O(n) computation, no state between calls
- Moving mode: O(1) per sample with circular buffer and running sum of absolute values
- Per-channel state for multi-channel EMG processing
- Full state serialization to Redis
- Always non-negative output
- Scales linearly with amplitude: MAV(k·x) = |k|·MAV(x)
Example:
// Batch mode: MAV of [1, -2, 3, -4, 5] = (|1| + |-2| + |3| + |-4| + |5|)/5 = 3.0
const batch = createDspPipeline().MeanAbsoluteValue({ mode: "batch" });
const result1 = await batch.process(new Float32Array([1, -2, 3, -4, 5]));
// result1 = [3.0, 3.0, 3.0, 3.0, 3.0]
// Moving mode: Window size 3
const moving = createDspPipeline().MeanAbsoluteValue({
mode: "moving",
windowSize: 3,
});
const result2 = await moving.process(new Float32Array([1, -2, 3, -4, 5]));
// result2 = [1.0, 1.5, 2.0, 3.0, 4.0] - sliding window MAV
Use cases:
- EMG Analysis: Muscle activity quantification, fatigue detection, gesture recognition
- Vibration Monitoring: Equipment health monitoring, anomaly detection
- Audio Processing: Envelope detection, dynamic range analysis
- Batch: Overall signal activity level, quality metrics
- Moving: Real-time transient detection, activity onset/offset detection, prosthetic control
Mathematical Properties:
- Non-negative: MAV(x) ≥ 0 for all signals
- Scales linearly with amplitude: MAV(k·x) = |k|·MAV(x)
- Bounded: MAV(x) ≤ max(|x|) for any window
- Reduces to mean: For non-negative signals, MAV = mean
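A small numeric check makes these properties concrete. The sketch below uses two hypothetical helpers, `batchMav` and `scaled`, written for illustration rather than taken from the library; note that the scaling property holds with the absolute value of the factor, since a negative factor cannot make MAV negative.

```typescript
// Hypothetical reference function (not the dspx implementation): batch MAV.
function batchMav(x: Float32Array): number {
  let sumAbs = 0;
  for (let i = 0; i < x.length; i++) sumAbs += Math.abs(x[i]);
  return x.length > 0 ? sumAbs / x.length : 0;
}

// Hypothetical helper: returns a copy of x multiplied by k.
function scaled(x: Float32Array, k: number): Float32Array {
  const y = new Float32Array(x.length);
  for (let i = 0; i < x.length; i++) y[i] = k * x[i];
  return y;
}

const mavInput = new Float32Array([1, -2, 3, -4, 5]);
const mavBase = batchMav(mavInput); // (1 + 2 + 3 + 4 + 5) / 5 = 3
const mavScaled = batchMav(scaled(mavInput, -2)); // |-2| * 3 = 6

// Largest absolute sample, used to check the bound MAV(x) <= max(|x|).
let mavPeak = 0;
for (let i = 0; i < mavInput.length; i++) {
  mavPeak = Math.max(mavPeak, Math.abs(mavInput[i]));
}

console.log(mavBase, mavScaled, mavBase <= mavPeak); // 3 6 true
```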
Rectify Filter
pipeline.Rectify(params?: { mode?: "full" | "half" });
Implements signal rectification with two modes: full-wave (absolute value) and half-wave (zero out negatives). Stateless operation with no internal buffers.
Parameters:
- mode (optional): Rectification mode
  - "full" (default): Full-wave rectification (absolute value) - converts all samples to positive
  - "half": Half-wave rectification - positive samples unchanged, negative samples → 0
Features:
- Negligible overhead - simple sample-by-sample transformation
- No internal state/buffers (stateless)
- Mode is serializable for pipeline persistence
- Works independently on each sample (no windowing)
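Because the filter is stateless, each mode reduces to a one-line rule per sample. The following is a minimal stand-alone sketch of that rule; `rectify` and `RectifyMode` are hypothetical names for illustration and do not reflect how the native code is structured.

```typescript
type RectifyMode = "full" | "half";

// Hypothetical stand-alone version of the per-sample rule (not the dspx implementation).
function rectify(input: Float32Array, mode: RectifyMode = "full"): Float32Array {
  const out = new Float32Array(input.length);
  for (let i = 0; i < input.length; i++) {
    const v = input[i];
    // Full-wave keeps the magnitude; half-wave clips negatives to zero.
    out[i] = mode === "full" ? Math.abs(v) : Math.max(0, v);
  }
  return out;
}

const rectInput = new Float32Array([1, -2, 3, -4, 5]);
const rectFull = rectify(rectInput); // [1, 2, 3, 4, 5]
const rectHalf = rectify(rectInput, "half"); // [1, 0, 3, 0, 5]
console.log(rectFull, rectHalf);
```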
Use cases:
- EMG signal pre-processing before envelope detection
- AC to DC conversion in audio/biosignal processing
- Preparing signals for RMS or moving average smoothing
- Feature extraction requiring positive-only values
Examples:
// Full-wave rectification (default) - converts to absolute value
const pipeline1 = createDspPipeline();
pipeline1.Rectify(); // or Rectify({ mode: "full" })
const bipolar = new Float32Array([1, -2, 3, -4, 5]);
const fullWave = await pipeline1.process(bipolar, {
sampleRate: 1000,
channels: 1,
});
console.log(fullWave); // [1, 2, 3, 4, 5] - all positive
// Half-wave rectification - zeros out negatives
const pipeline2 = createDspPipeline();
pipeline2.Rectify({ mode: "half" });
const halfWave = await pipeline2.process(new Float32Array([1, -2, 3, -4, 5]), {
sampleRate: 1000,
channels: 1,
});
console.log(halfWave); // [1, 0, 3, 0, 5] - negatives become zero
// Common pipeline: Rectify → RMS for EMG envelope
const emgPipeline = createDspPipeline();
emgPipeline
.Rectify({ mode: "full" }) // Convert to magnitude
.Rms({ windowSize: 50 }); // Calculate envelope
Variance Filter
pipeline.Variance(params: { mode: "batch" | "moving"; windowSize?: number });
Implements variance calculation to measure data spread and variability. Supports both stateless batch variance and stateful moving variance with a sliding window.
Parameters:
- mode: Variance calculation mode
  - "batch": Stateless - computes variance over entire batch, all output samples contain the same value
  - "moving": Stateful - computes variance over a sliding window, maintains state across process() calls
- windowSize: Required for "moving" mode - size of the sliding window
Features:
- Batch mode: O(1) space complexity, processes entire batch in two passes
- Moving mode: O(1) per-sample computation using circular buffer with running sums
- Maintains running sum and running sum of squares for efficient calculation
- Per-channel state for multi-channel processing
- Full state serialization to Redis including buffer contents and running values
- Variance is always non-negative (uses max(0, calculated) to handle floating-point errors)
Mathematical Note:
Variance is calculated as: Var(X) = E[X²] - (E[X])²
Where:
- E[X] is the mean (average) of values
- E[X²] is the mean of squared values
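The running-sum formulation can be sketched as follows. This is an illustrative reference, not the library's implementation: `movingVariance` is a hypothetical helper, and it assumes population variance (divide by the number of samples in the window) with warm-up over the samples seen so far.

```typescript
// Hypothetical helper (not part of dspx): moving population variance for one channel.
function movingVariance(input: Float32Array, windowSize: number): Float32Array {
  const ring = new Float64Array(windowSize); // raw samples currently in the window
  const out = new Float32Array(input.length);
  let sum = 0; // running sum of samples
  let sumSq = 0; // running sum of squared samples
  let filled = 0;
  let head = 0;

  for (let i = 0; i < input.length; i++) {
    const x = input[i];
    const oldest = ring[head]; // 0 during warm-up
    sum += x - oldest;
    sumSq += x * x - oldest * oldest;
    ring[head] = x;
    head = (head + 1) % windowSize;
    if (filled < windowSize) filled++;

    const mean = sum / filled; // E[X]
    const meanOfSquares = sumSq / filled; // E[X²]
    // Clamp at 0: the subtraction can go slightly negative in floating point.
    out[i] = Math.max(0, meanOfSquares - mean * mean);
  }
  return out;
}

const varDemo = movingVariance(new Float32Array([1, 2, 3, 4, 5]), 3);
console.log(varDemo); // approximately [0, 0.25, 0.67, 0.67, 0.67]
```

The E[X²] - (E[X])² form is what allows the O(1) update, at the cost of some cancellation error when the mean is large relative to the spread; that is the reason for the clamp.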
Use cases:
- Signal quality monitoring (detect signal stability)
- Anomaly detection (identify unusual variability)
- Feature extraction for machine learning (EMG, EEG analysis)
- Real-time variability tracking in biosignals
- Data consistency validation in sensor streams
Examples:
// Batch variance - stateless, entire batch → single variance value
const pipeline1 = createDspPipeline();
pipeline1.Variance({ mode: "batch" });
const data = new Float32Array([1, 2, 3, 4, 5]);
const output1 = await pipeline1.process(data, {
sampleRate: 1000,
channels: 1,
});
console.log(output1); // [2, 2, 2, 2, 2] - all values are the batch variance
// Moving variance - stateful, sliding window
const pipeline2 = createDspPipeline();
pipeline2.Variance({ mode: "moving", windowSize: 3 });
const output2 = await pipeline2.process(data, {
sampleRate: 1000,
channels: 1,
});
console.log(output2);
// [0, 0.25, 0.67, 0.67, 0.67] - variance evolves as window slides
// Window: [1] → [1,2] → [1,2,3] → [2,3,4] → [3,4,5]
// EMG variability monitoring pipeline
const emgPipeline = createDspPipeline()
.Rectify({ mode: "full" }) // Convert to magnitude
.Variance({ mode: "moving", windowSize: 100 }); // Track variability
// Multi-channel signal quality monitoring
const qualityPipeline = createDspPipeline().Variance({ mode: "batch" });
const fourChannelData = new Float32Array(4000); // 1000 samples × 4 channels
const variances = await qualityPipeline.process(fourChannelData, {
sampleRate: 2000,
channels: 4,
});
// Each channel gets its own variance value
Batch vs Moving Mode:
| Mode | State | Output | Use Case |
| -------- | --------- | ----------------------------- | ------------------------------ |
| batch | Stateless | All samples = single variance | Per-chunk quality assessment |
| moving | Stateful | Variance per sample (sliding) | Real-time variability tracking |
Performance:
- Batch mode: Two-pass algorithm (sum calculation, then variance), O(n) time
- Moving mode: Single-pass with circular buffer, O(1) per sample after warmup
- State persistence includes full circular buffer + running sums (can be large for big windows)
Z-Score Normalization Filter
pipeline.ZScoreNormalize(params: {
mode: "batch" | "moving";
windowSize?: number;
epsilon?: number;
});
Implements Z-Score normalization to standardize data to have mean 0 and standard deviation 1. Supports both stateless batch normalization and stateful moving normalization with a sliding window.
Z-Score Formula: (Value - Mean) / StandardDeviation
Parameters:
- mode: Normalization calculation mode
  - "batch": Stateless - computes mean and stddev over entire batch, normalizes all samples
  - "moving": Stateful - computes mean and stddev over sliding window, maintains state across process() calls
- windowSize: Required for "moving" mode - size of the sliding window
- epsilon: Small value to prevent division by zero when standard deviation is 0 (default: 1e-6)
Features:
- Batch mode: Normalizes entire dataset to mean=0, stddev=1 using global statistics
- Moving mode: Adaptive normalization using local window statistics
- Maintains running sum and running sum of squares for O(1) mean/stddev calculation
- Per-channel state for multi-channel processing
- Full state serialization including buffer contents and running values
- Epsilon handling prevents NaN when processing constant signals
Use cases:
- Machine learning preprocessing (feature standardization)
- Anomaly detection (outlier identification using ±3σ thresholds)
- Neural network input normalization
- EEG/EMG signal standardization for multi-channel processing
- Real-time data stream normalization with adaptive statistics
- Removing baseline drift and amplitude variations
Examples:
// Batch normalization - standardize entire dataset
const pipeline1 = createDspPipeline();
pipeline1.ZScoreNormalize({ mode: "batch" });
const data = new Float32Array([10, 20, 30, 40, 50]);
const output1 = await pipeline1.process(data, {
sampleRate: 1000,
channels: 1,
});
// All samples normalized to mean=0, stddev=1
// Output: [-1.414, -0.707, 0, 0.707, 1.414] (approximately)
// Moving normalization - adaptive standardization
const pipeline2 = createDspPipeline();
pipeline2.ZScoreNormalize({ mode: "moving", windowSize: 50 });
const streamData = new Float32Array(200); // Continuous stream
const output2 = await pipeline2.process(streamData, {
sampleRate: 1000,
channels: 1,
});
// Each sample normalized using local window statistics
// Adapts to changes in mean and variance over time
// Anomaly detection with z-score thresholds
const pipeline3 = createDspPipeline();
pipeline3.ZScoreNormalize({ mode: "moving", windowSize: 100 });
const sensorData = new Float32Array(500);
const zScores = await pipeline3.process(sensorData, {
sampleRate: 100,
channels: 1,
});
// Detect outliers (|z-score| > 3 indicates an anomaly)
const anomalies = [];
for (let i = 0; i < zScores.length; i++) {
if (Math.abs(zScores[i]) > 3.0) {
anomalies.push({ index: i, zScore: zScores[i] });
}
}
// Multi-channel EEG normalization
const eegPipeline = createDspPipeline();
eegPipeline.ZScoreNormalize({ mode: "moving", windowSize: 128 });
const fourChannelEEG = new Float32Array(2048); // 512 samples × 4 channels
const normalizedEEG = await eegPipeline.process(fourChannelEEG, {
sampleRate: 256,
channels: 4,
});
// Each EEG channel is normalized independently using its own sliding-window mean and stddev
// Custom epsilon for near-constant signals
const pipeline4 = createDspPipeline();
pipeline4.ZScoreNormalize({ mode: "batch", epsilon: 0.1 });
const nearConstant = new Float32Array([5.0, 5.001, 4.999, 5.0]);
const output4 = await pipeline4.process(nearConstant, {
sampleRate: 1000,
channels: 1,
});
// When stddev < epsilon, output is 0 (prevents division by tiny numbers)
Batch vs Moving Mode:
| Mode | State | Output | Use Case |
| -------- | --------- | ----------------------------------------------- | --------------------------------------------------- |
| batch | Stateless | All samples normalized using global mean/stddev | ML preprocessing, dataset standardization |
| moving | Stateful | Each sample normalized using local window stats | Real-time anomaly detection, adaptive normalization |
Performance:
- Batch mode: Computes the mean, then the stddev, then normalizes; O(n) time overall
- Moving mode: Single-pass with circular buffer, O(1) per sample after warmup
- Anomaly detection: O(1) threshold comparison after normalization
- State persistence includes full circular buffer + running sums
Mathematical Properties:
- Normalized data has mean = 0 (centered)
- Normalized data has standard deviation = 1 (scaled)
- |z| > 3 typically indicates an outlier (about 99.7% of normally distributed data lies within ±3σ)
- Preserves relative distances and relationships in the data
- Linear transformation (reversible if original mean/stddev are known)
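Batch mode and the reversibility property can be illustrated with a short sketch. `batchZScore` and `ZScoreResult` are hypothetical names, not the library's API; the sketch assumes population statistics (divide by n), which is consistent with the batch example above, and follows the documented rule that output is 0 when stddev is below epsilon.

```typescript
interface ZScoreResult {
  normalized: Float32Array;
  mean: number;
  stddev: number;
}

// Hypothetical reference function (not the dspx implementation).
function batchZScore(input: Float32Array, epsilon: number = 1e-6): ZScoreResult {
  const n = input.length;
  let sum = 0;
  for (let i = 0; i < n; i++) sum += input[i];
  const mean = n > 0 ? sum / n : 0;

  let sumSqDev = 0;
  for (let i = 0; i < n; i++) {
    const d = input[i] - mean;
    sumSqDev += d * d;
  }
  const stddev = n > 0 ? Math.sqrt(sumSqDev / n) : 0; // population stddev (assumption)

  const normalized = new Float32Array(n);
  for (let i = 0; i < n; i++) {
    // Below epsilon the signal is treated as constant and mapped to 0.
    normalized[i] = stddev < epsilon ? 0 : (input[i] - mean) / stddev;
  }
  return { normalized: normalized, mean: mean, stddev: stddev };
}

const zDemo = batchZScore(new Float32Array([10, 20, 30, 40, 50]));
console.log(zDemo.normalized); // approximately [-1.414, -0.707, 0, 0.707, 1.414]

// The transform is linear, so it can be undone when mean and stddev are kept.
const zRestored = new Float32Array(zDemo.normalized.length);
for (let i = 0; i < zRestored.length; i++) {
  zRestored[i] = zDemo.normalized[i] * zDemo.stddev + zDemo.mean;
}
console.log(zRestored); // approximately [10, 20, 30, 40, 50]
```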
Waveform Length (WL) Filter
pipeline.WaveformLength({ windowSize: number });
Implements waveform length calculation to measure the total path length traveled by a signal over a sliding window. WL is the sum of absolute differences between consecutive samples, commonly used in EMG signal analysis.
Parameters:
windowSize: Number of samples for the sliding window
Features:
- O(1) per-sample computation using circular buffer with running sum of differences
- Tracks previous sample to calculate absolute difference
- Per-channel state for multi-channel EMG processing
- Full state serialization including buffer and previous sample value
- Always non-negative output
- Sensitive to signal complexity and frequency content
Mathematical Definition:
For window of samples [x₁, x₂, ..., xₙ]:
WL = Σ|xᵢ₊₁ - xᵢ| for i = 1 to n-1
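The definition translates into a running sum of absolute differences. The sketch below is a hypothetical single-channel helper, not the library's implementation; it assumes that a window of n samples holds n - 1 differences and that the first sample contributes 0, and the library may handle window boundaries differently.

```typescript
// Hypothetical helper (not part of dspx): sliding-window waveform length for one channel.
function waveformLength(input: Float32Array, windowSize: number): Float32Array {
  const out = new Float32Array(input.length);
  if (windowSize < 2) return out; // a single-sample window has no path length

  const slots = windowSize - 1; // n samples hold n - 1 consecutive differences
  const ring = new Float64Array(slots);
  let runningSum = 0;
  let head = 0;

  for (let i = 1; i < input.length; i++) {
    const diff = Math.abs(input[i] - input[i - 1]);
    runningSum += diff - ring[head]; // add newest difference, drop the oldest
    ring[head] = diff;
    head = (head + 1) % slots;
    out[i] = Math.max(0, runningSum);
  }
  return out;
}

const wlInput = new Float32Array([1, 3, 2, 5, 4, 6]);
const wlLong = waveformLength(wlInput, 100); // window never fills: [0, 2, 3, 6, 7, 9]
const wlShort = waveformLength(wlInput, 3); // only the last 2 differences: [0, 2, 3, 4, 4, 3]
console.log(wlLong, wlShort);
```

The short-window result shows that WL is cumulative only while the window is filling; once old differences start leaving the window, the value can fall again.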
Use cases:
- EMG signal analysis and muscle activity quantification
- Detecting signal complexity and variability
- Gesture recognition and prosthetic control
- Fatigue detection (WL decreases with muscle fatigue)
- Feature extraction for classification algorithms
- Vibration monitoring and equipment diagnostics
Example:
// Basic waveform length computation
const pipeline = createDspPipeline();
pipeline.WaveformLength({ windowSize: 100 });
const signal = new Float32Array([1, 3, 2, 5, 4, 6]);
// Differences: |3-1|=2, |2-3|=1, |5-2|=3, |4-5|=1, |6-4|=2
// Cumulative WL: 0, 2, 3, 6, 7, 9
const result = await pipeline.process(signal, {
sampleRate: 1000,
channels: 1,
});
// result = [0, 2, 3, 6, 7, 9]
// EMG feature extraction pipeline
const emgPipeline = createDspPipeline();
emgPipeline
.Rectify({ mode: "full" }) // Convert to magnitude
.WaveformLength({ windowSize: 250 }); // 250ms window at 1kHz
// Multi-channel muscle monitoring
const multiChannelEMG = new Float32Array(4000); // 1000 samples × 4 channels
const wlFeatures = await emgPipeline.process(multiChannelEMG, {
sampleRate: 1000,
channels: 4,
});
// Each muscle gets independent WL calculation
Comparison with other features:
| Feature | Measures | Sensitivity | Use Case |
| ------- | --------------------------- | ------------------ | ---------------------- |
| WL | Total path length | High to complexity | Complexity/variability |
| MAV | Average magnitude | Amplitude level | Activity level |
| RMS | Root mean square | Power/energy | Signal strength |
| SSC | Slope direction changes | Frequency content | Oscillation rate |
| WAMP | Amplitude threshold crosses | Burst detection | Transient events |
Mathematical Properties:
- Non-negative: WL ≥ 0 for all signals
- Monotonic during warm-up: WL is non-decreasing while the window fills
- Frequency-dependent: Higher frequency → higher WL for same amplitude
- Scales linearly with amplitude: WL(k·x) = |k|·WL(x) for constant k
Slope Sign Change (SSC) Filter
pipeline.SlopeSignChange({ windowSize: number; threshold?: number });
Implements slope sign change counting to measure frequency content by detecting how many times the signal changes direction (slope changes from positive to negative or vice versa). Commonly used in EMG analysis.
Parameters:
- windowSize: Number of samples for the sliding window
- threshold: Minimum absolute difference to count as significant (default: 0)
Features:
- Counts direction changes (increasing→decreasing or decreasing→increasing)
- Threshold filtering to ignore noise and minor fluctuations
- Requires 2 previous samples for slope calculation
- O(1) per-sample computation using circular buffer
- Per-channel state for multi-channel processing
- Full state serialization including filter state (previous 2 samples, init count)
Mathematical Definition:
A slope sign change occurs at sample xᵢ if:
[(xᵢ - xᵢ₋₁) × (xᵢ - xᵢ₊₁)] ≥ threshold²
Where xᵢ₋₁ and xᵢ₊₁ are the previous and next samples.
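A causal version of this test can be sketched as below. `slopeSignChange` here is a hypothetical helper, not the library's code, and it makes two assumptions: a turning point is counted one sample late, once its "next" sample has arrived, and the product must be strictly positive so that flat segments are not counted when the threshold is 0. The library's exact comparison may differ.

```typescript
// Hypothetical helper (not part of dspx): sliding-window slope sign change count.
function slopeSignChange(
  input: Float32Array,
  windowSize: number,
  threshold: number = 0
): Float32Array {
  const out = new Float32Array(input.length);
  if (windowSize < 3) return out; // need previous, current, and next sample

  const slots = windowSize - 2; // n samples contain n - 2 interior points
  const ring = new Uint8Array(slots); // 1 where a sign change was detected
  let count = 0;
  let head = 0;

  for (let i = 2; i < input.length; i++) {
    // input[i - 1] is the candidate turning point; input[i] is its "next" sample.
    const product = (input[i - 1] - input[i - 2]) * (input[i - 1] - input[i]);
    const hit = product > 0 && product >= threshold * threshold ? 1 : 0;
    count += hit - ring[head]; // add newest flag, drop the oldest
    ring[head] = hit;
    head = (head + 1) % slots;
    out[i] = count;
  }
  return out;
}

const sscZigzag = slopeSignChange(new Float32Array([1, 3, 2, 4, 3, 5]), 100);
const sscRamp = slopeSignChange(new Float32Array([1, 2, 3, 4, 5, 6]), 100);
console.log(sscZigzag); // [0, 0, 1, 2, 3, 4]
console.log(sscRamp); // all zeros: a monotonic signal never changes direction
```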
Use cases:
- EMG frequency content analysis
- Detecting oscillations and vibrations
- Gesture recognition based on movement patterns
- Signal complexity measurement
- Muscle contraction rate estimation
- Feature extraction for pattern recognition
Example:
// Basic SSC with zero threshold
const pipeline = createDspPipeline();
pipeline.SlopeSignChange({ windowSize: 100, threshold: 0 });
const signal = new Float32Array([1, 3, 2, 4, 3, 5]);
// Slopes: + - + - +
// Sign changes counted at indices 2, 3, 4, 5 (each turning point is confirmed by the sample after it)
const result = await pipeline.process(signal, {
sampleRate: 1000,
channels: 1,
});
// result = [0, 0, 1, 2, 3, 4]
// SSC with noise threshold
const filteredPipeline = createDspPipeline();
filteredPipeline.SlopeSignChange({ windowSize: 200, threshold: 0.1 });
const noisySignal = new Float32Array(500);
// Only counts sign changes where |difference| > 0.1
const sscCount = await filteredPipeline.process(noisySignal, {
sampleRate: 1000,
channels: 1,
});
// Multi-channel EMG analysis
const emgPipeline = createDspPipeline();
emgPipeline
.Rectify({ mode: "full" })
.SlopeSignChange({ windowSize: 250, threshold: 0.05 });
const fourChannelEMG = new Float32Array(4000);
const sscFeatures = await emgPipeline.process(fourChannelEMG, {
sampleRate: 1000,
channels: 4,
});
Interpretation:
- Low SSC: Smooth, slow-changing signal (low frequency)
- High SSC: Rapidly oscillating signal (high frequency)
- Zero SSC: Monotonic signal (constantly increasing or decreasing)
- Threshold effect: Higher threshold → fewer counted changes (filters noise)
Mathematical Properties:
- Integer-valued: SSC is always a whole number (count)
- Non-negative: SSC ≥ 0
- Bounded: Max SSC = window_size - 2 (alternating signal)
- Frequency proxy: Approximately proportional to signal frequency
Willison Amplitude (WAMP) Filter
pipeline.WillisonAmplitude({ windowSize: number; threshold?: number });
Implements Willison Amplitude calculation to count the number of times the absolute difference between consecutive samples exceeds a threshold. Useful for detecting burst activity and transient events in EMG signals.
Parameters:
- windowSize: Number of samples for the sliding window
- threshold: Minimum absolute difference to count as significant (default: 0)
Features:
- Counts amplitude changes exceeding threshold
- Tracks previous sample for difference calculation
- O(1) per-sample computation using circular buffer
- Per-channel state for multi-channel processing
- Full state serialization including buffer and previous sample
- Sensitive to amplitude variations and burst activity
Mathematical Definition:
WAMP counts samples where:
|xᵢ - xᵢ₋₁| > threshold
Where xᵢ is the current sample and xᵢ₋₁ is the previous sample.
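The counting rule can be sketched with a ring buffer of 0/1 flags. `willisonAmplitude` below is a hypothetical single-channel helper written for illustration, not the library's implementation; it assumes a window of n samples holds n - 1 differences and uses the strict comparison from the definition above.

```typescript
// Hypothetical helper (not part of dspx): sliding-window Willison Amplitude.
function willisonAmplitude(
  input: Float32Array,
  windowSize: number,
  threshold: number = 0
): Float32Array {
  const out = new Float32Array(input.length);
  if (windowSize < 2) return out; // no consecutive pair fits in the window

  const slots = windowSize - 1;
  const ring = new Uint8Array(slots); // 1 where |difference| exceeded the threshold
  let count = 0;
  let head = 0;

  for (let i = 1; i < input.length; i++) {
    const hit = Math.abs(input[i] - input[i - 1]) > threshold ? 1 : 0;
    count += hit - ring[head]; // add newest flag, drop the oldest
    ring[head] = hit;
    head = (head + 1) % slots;
    out[i] = count;
  }
  return out;
}

const wampInput = new Float32Array([0, 0.5, 2.5, 2.6, 1.0, 3.5]);
const wampDemo = willisonAmplitude(wampInput, 100, 1.0);
console.log(wampDemo); // [0, 0, 1, 1, 2, 3]

// Raising the threshold can only lower (or keep) the count.
const wampLoose = willisonAmplitude(wampInput, 100, 0.05)[5]; // 5
const wampStrict = willisonAmplitude(wampInput, 100, 2.0)[5]; // 1
console.log(wampLoose, wampStrict);
```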
Use cases:
- EMG burst detection and muscle activation counting
- Transient event detection in sensor data
- Activity level quantification
- Feature extraction for gesture recognition
- Equipment vibration monitoring
- Signal quality assessment (detect dropouts/spikes)
Example:
// Basic WAMP with threshold
const pipeline = createDspPipeline();
pipeline.WillisonAmplitude({ windowSize: 100, threshold: 1.0 });
const signal = new Float32Array([0, 0.5, 2.5, 2.6, 1.0, 3.5]);
// Diffs: 0.5, 2.0, 0.1, -1.6, 2.5
// Exceeds: no, yes, no, yes, yes
const result = await pipeline.process(signal, {
sampleRate: 1000,
channels: 1,
});
// result = [0, 0, 1, 1, 2, 3]
// EMG burst detection
const burstPipeline = createDspPipeline();
burstPipeline
.Rectify({ mode: "full" })
.WillisonAmplitude({ windowSize: 200, threshold: 0.1 });
const emgData = new Float32Array(1000);
const burstCount = await burstPipeline.process(emgData, {
sampleRate: 1000,
channels: 1,
});
// High WAMP values indicate burst activity
// Multi-channel activity monitoring
const activityPipeline = createDspPipeline();
activityPipeline.WillisonAmplitude({ windowSize: 250, threshold: 0.05 });
const multiChannelData = new Float32Array(4000); // 4 channels
const activityLevel = await activityPipeline.process(multiChannelData, {
sampleRate: 1000,
channels: 4,
});
// Each channel independently tracks activity bursts
Interpretation:
- Low WAMP: Smooth signal with gradual changes
- High WAMP: Signal with frequent amplitude variations or bursts
- Zero WAMP: Constant signal or all changes below threshold
- Threshold effect: Higher threshold → fewer counted events
Comparison with SSC:
| Feature | Measures | Sensitivity | Best For |
| ------- | ----------------- | ---------------------- | --------------------- |
| WAMP | Amplitude changes | Large amplitude shifts | Burst detection |
| SSC | Direction changes | Frequency content | Oscillation counting |
| Both | Signal activity | Different aspects | Combined EMG features |
Mathematical Properties:
- Integer-valued: WAMP is a count (whole number)
- Non-negative: WAMP ≥ 0
- Bounded: Max WAMP = window_size - 1 (all samples exceed threshold)
- Threshold-dependent: WAMP decreases as threshold increases
🚧 Coming Very Soon
Resampling Operations (Expected in next few days):
- Decimate: Downsample by integer factor M with anti-aliasing filter
- Interpolate: Upsample by integer factor L with anti-imaging filter
- Resample: Rational resampling (L/M) for arbitrary rate conversion
- All implemented with efficient polyphase FIR filtering in C++
Other Planned Features:
- Transform Domain: STFT, Hilbert transform, wavelet transforms
- Feature Extraction: Zero-crossing rate, peak detection, autocorrelation
See the project roadmap for more details.
🔧 Advanced Features
For production deployments, the library provides comprehensive observability and monitoring capabilities:
Available Features
- Pipeline Callbacks - Monitor performance, errors, and samples with batched or individual callbacks
- Topic-Based Logging - Kafka-style hierarchical filtering for selective log subscription
- Topic Router - Fan-out routing to multiple backends (PagerDuty, Prometheus, Loki, etc.)
- Priority-Based Routing - 10-level priority system for fine-grained log filtering
- .tap() Debugging - Inspect intermediate pipeline results without breaking the flow
Quick Example
import { createDspPipeline, createTopicRouter } from "dspx";
// Production-grade routing to multiple backends
const router = createTopicRouter()
.errors(async (log) => await pagerDuty.alert(log))
.performance(async (log) => await prometheus.record(log))
.debug(async (log) => await loki.send(log))
.build();
const pipeline = createDspPipeline()
.pipeline({ onLogBatch: (logs) => router.routeBatch(logs) })
.MovingAverage({ windowSize: 10 })
.Rms({ windowSize: 5 });
📚 Full Advanced Features Documentation
Key highlights:
- Batched callbacks: 3.2M samples/sec (production-safe, non-blocking)
- Individual callbacks: 6.1M samples/sec (development only, blocks event loop)
- Native processing: 22M samples/sec (no callbacks)
Core Methods
process(input, options)
Process data in-place (modifies input buffer for performance):
const input = new Float32Array([1, 2, 3, 4, 5]);
const output = await pipeline.process(input, {
sampleRate: 2000,
channels: 1,
});
// input === output (same reference)
processCopy(input, options)
Process a copy of the data (preserves original):
const input = new Float32Array([1, 2, 3, 4, 5]);
const output = await pipeline.processCopy(input, {
sampleRate: 2000,
channels: 1,
});
// input !== output (different references)
saveState()
Serialize the current pipeline state to JSON:
const stateJson = await pipeline.saveState();
// Returns: JSON string with all filter states
await redis.set("dsp:state:key", stateJson);
loadState(stateJson)
Deserialize and restore pipeline state from JSON:
const stateJson = await redis.get("dsp:state:key");
if (stateJson) {
await pipeline.loadState(stateJson);
}
clearState()
Reset all filter states to initial values:
pipeline.clearState();
// All circular buffers cleared, running sums reset
listState()
Get a lightweight summary of the pipeline configuration (without full buffer data):
const pipeline = createDspPipeline()
.MovingAverage({ windowSize: 100 })
.Rectify({ mode: "full" })
.Rms({ windowSize: 50 });
// After processing some data
await pipeline.process(input, { sampleRate: 1000, channels: 1 });
const summary = pipeline.listState();
console.log(summary);
// {
// stageCount: 3,
// timestamp: 1761234567,
// stages: [
// {
// index: 0,
// type: 'movingAverage',
// windowSize: 100,
// numChannels: 1,
// bufferSize: 100,
// channelCount: 1
// },
// {
// index: 1,
// type: 'rectify',
// mode: 'full'
// },
// {
// index: 2,
// type: 'rms',
// windowSize: 50,
// numChannels: 1,
// bufferSize: 50,
// channelCount: 1
// }
// ]
// }
Use Cases:
- Monitoring dashboards: Expose pipeline configuration via HTTP endpoint
- Health checks: Verify pipeline structure and configuration
- Debugging: Quick inspection without parsing full state JSON
- Logging: Log pipeline configuration changes
- Size efficiency: ~17-80% smaller than saveState() depending on buffer sizes
Comparison with saveState():
| Method | Use Case | Contains Buffer Data | Size |
| ------------- | --------------------------- | -------------------- | ------- |
| listState() | Monitoring, debugging | No | Smaller |
| saveState() | Redis persistence, recovery | Yes | Larger |
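The difference between a full state snapshot and a lightweight summary can be illustrated with a toy filter. Everything in this sketch is hypothetical: `MovingAverageSketch`, its `summary()` method, and the `MovingAverageState` shape are stand-ins for illustration, and the actual dspx state JSON may look different. The point is only that restoring buffer contents and running sums lets a second instance continue exactly where the first stopped.

```typescript
// Hypothetical state shape for illustration only; the actual dspx state JSON may differ.
interface MovingAverageState {
  windowSize: number;
  buffer: number[];
  head: number;
  filled: number;
  sum: number;
}

class MovingAverageSketch {
  private readonly windowSize: number;
  private readonly buffer: Float64Array;
  private head = 0;
  private filled = 0;
  private sum = 0;

  constructor(windowSize: number) {
    this.windowSize = windowSize;
    this.buffer = new Float64Array(windowSize);
  }

  process(input: Float32Array): Float32Array {
    const out = new Float32Array(input.length);
    for (let i = 0; i < input.length; i++) {
      this.sum += input[i] - this.buffer[this.head]; // add newest, drop oldest
      this.buffer[this.head] = input[i];
      this.head = (this.head + 1) % this.windowSize;
      if (this.filled < this.windowSize) this.filled++;
      out[i] = this.sum / this.filled;
    }
    return out;
  }

  // Full state: everything needed to resume exactly, including buffer contents.
  saveState(): string {
    const buffer: number[] = [];
    for (let i = 0; i < this.buffer.length; i++) buffer.push(this.buffer[i]);
    const state: MovingAverageState = {
      windowSize: this.windowSize,
      buffer: buffer,
      head: this.head,
      filled: this.filled,
      sum: this.sum,
    };
    return JSON.stringify(state);
  }

  loadState(json: string): void {
    const state = JSON.parse(json) as MovingAverageState;
    if (state.windowSize !== this.windowSize) throw new Error("windowSize mismatch");
    for (let i = 0; i < this.buffer.length; i++) this.buffer[i] = state.buffer[i];
    this.head = state.head;
    this.filled = state.filled;
    this.sum = state.sum;
  }

  // Lightweight summary: configuration only, no buffer contents.
  summary(): string {
    return JSON.stringify({ type: "movingAverage", windowSize: this.windowSize });
  }
}

// Reference run without interruption.
const uninterrupted = new MovingAverageSketch(3).process(
  new Float32Array([1, 2, 3, 4, 5, 6])
);

// Worker A handles the first chunk, then persists its state.
const workerA = new MovingAverageSketch(3);
workerA.process(new Float32Array([1, 2, 3]));
const snapshotJson = workerA.saveState();

// Worker B restores the state and continues with the second chunk.
const workerB = new MovingAverageSketch(3);
workerB.loadState(snapshotJson);
const resumed = workerB.process(new Float32Array([4, 5, 6]));

console.log(resumed); // [3, 4, 5], identical to the tail of the uninterrupted run
console.log(workerB.summary().length < snapshotJson.length); // true
```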
📊 Use Cases
NEW: IoT Sensor Processing with Irregular Timestamps
import { createDspPipeline } from "dspx";
const pipeline = createDspPipeline();
pipeline.MovingAverage({ mode: "moving", windowDuration: 10000 }); // 10 second window
// Sensor data with network jitter (irregular intervals)
const sensorReadings = [
{ value: 23.5, timestamp: 1698889200000 },
{ value: 24.1, timestamp: 1698889200150 }, // 150ms later
{ value: 23.8, timestamp: 1698889200380 }, // 230ms later (jitter!)
{ value: 24.5, timestamp: 1698889200500 }, // 120ms later
];
const samples = new Float32Array(sensorReadings.map((r) => r.value));
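// Float32 cannot represent epoch milliseconds exactly (near 1.7e12 its resolution is coarser than two minutes),
// so pass offsets from the first reading; this assumes only time differences matter to the window.
const t0 = sensorReadings[0].timestamp;
const timestamps = new Float32Array(sensorReadings.map((r) => r.timestamp - t0));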
const smoothed = await pipeline.process(samples, timestamps, { channels: 1 });
// Properly handles irregular sampling intervals!
Streaming Data with Crash Recovery
import { createDspPipeline } from "dspx";
import { createClient } from "redis";
const redis = await createClient({ url: "redis://localhost:6379" }).connect();
const stateKey = "dsp:stream:sensor01";
const pipeline = createDspPipeline();
pipeline.MovingAverage({ windowSize: 100 });
// Restore state if processing was interrupted
const savedState = await redis.get(stateKey);
if (savedState) {
await pipeline.loadState(savedState);
console.log("Resumed from saved state");
}
// Process streaming chunks
for await (const chunk of sensorStream) {
const smoothed = await pipeline.process(chunk, {
sampleRate: 1000,
channels: 1,
});
// Save state after each chunk for crash recovery
const state = await pipeline.saveState();
await redis.set(stateKey, state);
await sendToAnalytics(smoothed);
}
Multi-Channel EMG Processing
import { createDspPipeline } from "dspx";
// Process 4-channel EMG with rectification + RMS envelope detection
const pipeline = createDspPipeline();
pipeline
.Rectify({ mode: "full" }) // Convert bipolar EMG to magnitude
.Rms({ windowSize: 50 }); // Calculate RMS envelope
// Interleaved 4-channel data
const emgData = new Float32Array(4000); // 1000 samples × 4 channels
const envelope = await pipeline.process(emgData, {
sampleRate: 2000,
channels: 4,
});
// Each channel maintains independent filter states
// Output is smooth envelope tracking muscle activation
Distributed Processing Across Workers
// Worker 1 processes first part
const worker1 = createDspPipe