npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

@aid-on/nagare

v0.0.2

Published

WASM-first, stream-centric library for edge computing

Readme

🌊 Nagare (流れ)

npm version License: MIT TypeScript Rust Tests


nagare (流れ) - "flow" in Japanese
Like a nagare carving its path through mountains, Nagare guides your data streams with grace and power

Nagare is a next-generation stream processing library that delivers 5-10x performance improvements over traditional JavaScript stream libraries. Built with Rust/WASM for compute-heavy workloads, SIMD acceleration, and designed specifically for edge computing environments like Cloudflare Workers.

Benchmarks

We include a checksum-verified benchmark suite to compare Nagare, RxJS and native loops on common streaming patterns.

  • Map + Filter (array sources)
  • Reduce (sum) over arrays
  • ConcatMap (outer × inner)

Run:

npm run build:ts && node benches/standard-suite.mjs

Notes:

  • The suite validates outputs via checksums to prevent “too good to be true” results.
  • You can toggle array fusion (JIT-compiled kernels) programmatically:
    • Nagare.setFusionEnabled(true|false)
    • Default runs report both fusion on/off.
  • For WebAssembly tests, use npm run test:wasm to run the Vitest suite with real WASM.

Fusion Toggle

Nagare.setFusionEnabled(false); // disable array kernels (semantic, slower)
Nagare.setFusionEnabled(true);  // enable array kernels (fast)
Nagare.setJitMode('fast');      // or 'off' to disable codegen

⚡ Performance First

🏆 Benchmark Results (vs RxJS, after WASM warmup)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Map + Filter     : 8.78x faster ⚡
Scan/Reduce      : 7.32x faster ⚡  
Complex Pipeline : 1.52x faster ⚡
Average Speedup  : 5.87x 🚀

Note: Performance gains are measured after WASM initialization. Initial cold-start includes ~50ms WASM loading overhead, making Nagare optimal for sustained processing workloads rather than one-off operations.

✨ Why Nagare?

🎯 Purpose-Built for Edge

  • First-class Cloudflare Durable Objects support
  • WebSocket hibernation for efficient real-time streams
  • Optimized for V8 isolates and edge runtimes

🚀 WASM-Powered Performance

  • Rust core compiled to WebAssembly
  • SIMD acceleration for numeric operations
  • Zero-copy BYOB streaming
  • JIT compilation and operator fusion

🛡️ Production Ready

  • Full TypeScript support with strict typing
  • Comprehensive error handling and recovery
  • Credit-based backpressure control
  • Battle-tested in production environments

🔧 Developer Experience

  • Familiar RxJS-like API
  • Tree-shakeable and optimized bundles
  • Extensive documentation and examples
  • Works in Node.js, browsers, and edge

📦 Installation

npm install @aid-on/nagare
yarn add @aid-on/nagare
pnpm add @aid-on/nagare

🚀 Quick Start

Basic Stream Processing

import { Nagare } from '@aid-on/nagare';

// Create and transform a stream
const stream = Nagare.from([1, 2, 3, 4, 5])
  .map(x => x * 2)
  .filter(x => x > 5)
  .scan((acc, x) => acc + x, 0);

// Subscribe with automatic cleanup
using subscription = stream.observe(
  value => console.log('Value:', value),
  {
    onComplete: () => console.log('Complete!'),
    onError: error => console.error('Error:', error)
  }
);

### Common Aggregates (JIT-optimized)

```typescript
// Numbers
const values = Nagare.from([3, 1, 4, 1, 5]);

const s = await values.sum();   // 14
const mn = await values.min();  // 1
const mx = await values.max();  // 5
const avg = await values.mean();// 2.8

// Find with predicate
const firstEven = await values.find(x => x % 2 === 0); // 4

These aggregates use the same array fast paths / fused operator chains as reduce, avoiding intermediate arrays.


### SIMD-Accelerated Processing

```typescript
// Process large Float32Arrays with SIMD
const audioData = new Float32Array(1_000_000);

const processed = await Nagare
  .from([audioData])
  .mapWasm('fft_transform')      // Fast Fourier Transform
  .mapWasm('noise_reduction')    // SIMD noise reduction
  .toArray();

// Float64 support
import { processFloat64Batch, simdMapMulAdd64 } from '@aid-on/nagare';
const doubles = new Float64Array(1_000_000);
const squared = await processFloat64Batch(doubles, 'square');

Real-time WebSocket Streams

import { createFromWebSocket } from '@aid-on/nagare';

const wsStream = createFromWebSocket(socket, { 
  binary: true,
  reconnect: true 
});

wsStream
  .map(msg => JSON.parse(msg.data))
  .filter(event => event.type === 'trade')
  .scan((volume, trade) => volume + trade.amount, 0)
  .observe(volume => {
    console.log('Total volume:', volume);
  });

🎨 Core Concepts

The Nagare Type

Nagare<T, E> is the core abstraction - a lazy, composable stream that can be transformed, merged, and observed.

// Multiple ways to create nagares
const fromArray = Nagare.from([1, 2, 3]);
const fromPromise = Nagare.from(Promise.resolve(42));
const fromInterval = nagare.interval(1000);
const fromWebStream = Nagare.fromReadableStream(stream);

// Chainable operators
const pipeline = source
  .map(transform)
  .filter(predicate)
  .debounce(300)
  .distinctUntilChanged()
  .scan(reducer, initial);

Backpressure Management

import { CreditController, AdaptiveBackpressure } from '@aid-on/nagare';

// Credit-based flow control
const credits = new CreditController({
  initialCredits: 100,
  lowWaterMark: 20,
  highWaterMark: 80
});

// Adaptive backpressure based on latency
const adaptive = new AdaptiveBackpressure({
  targetLatency: 50,  // ms
  initialRate: 100,
  minRate: 10,
  maxRate: 1000
});

Error Recovery

const resilient = nagare
  .map(riskyOperation)
  .rescue(error => {
    logger.error('Operation failed:', error);
    return fallbackValue;  // Recover with default
  })
  .retry(3, 1000)  // Retry up to 3 times
  .terminateOnErrorMode();  // Stop on critical errors

🌐 Edge Computing Features

Cloudflare Durable Objects

export class StreamProcessor extends DurableObject {
  private nagare?: Nagare<any>;

  async fetch(request: Request) {
    // Create a stateful stream processor
    this.nagare = Nagare
      .fromWebSocket(request)
      .map(this.processMessage)
      .buffer(100)
      .scan(this.aggregate, {});
      
    return new Response('Stream initialized');
  }
  
  async alarm() {
    // Process buffered data periodically
    const batch = await this.nagare?.take(100).toArray();
    await this.processBatch(batch);
  }
}

WebSocket with Hibernation

// Efficient WebSocket handling with hibernation
export class WebSocketDO extends DurableObject {
  async webSocketMessage(ws: WebSocket, message: string) {
    // Only wake up when messages arrive
    const result = await this.processMessage(message);
    ws.send(JSON.stringify(result));
  }
  
  async webSocketClose(ws: WebSocket) {
    // Clean up on disconnect
    this.state.deleteWebSocket(ws);
  }
}

📊 Advanced Examples

Financial Data Processing

// Real-time trade aggregation
const trades = Nagare.fromEventSource('/trades')
  .map(e => JSON.parse(e.data))
  .filter(t => t.symbol === 'BTC/USD')
  .windowedAggregate(100, 'mean')  // 100-trade moving average
  .scan((stats, price) => ({
    ...stats,
    vwap: (stats.vwap * stats.count + price) / (stats.count + 1),
    count: stats.count + 1
  }), { vwap: 0, count: 0 })
  .observe(stats => {
    dashboard.update(stats);
  });

IoT Sensor Data

// Process sensor telemetry with SIMD
const telemetry = Nagare
  .fromMQTT(mqttClient, 'sensors/+/data')
  .map(msg => new Float32Array(msg.payload))
  .mapWasm('kalman_filter')  // SIMD Kalman filtering
  .mapWasm('anomaly_detection')  // SIMD anomaly detection
  .filter(reading => reading.anomalyScore > 0.8)
  .observe(anomaly => {
    alerts.send(anomaly);
  });

Stream Orchestration

// Merge and combine multiple streams
const temperature = Nagare.fromSensor('temp');
const humidity = Nagare.fromSensor('humidity');
const pressure = Nagare.fromSensor('pressure');

// Merge all readings (interleaved)
const allReadings = nagare.merge(temperature, humidity, pressure);

// Combine latest values from each
const combined = nagare.combine(temperature, humidity, pressure)
  .map(([t, h, p]) => ({
    temperature: t,
    humidity: h,
    pressure: p,
    timestamp: Date.now()
  }));

🏗️ Architecture

┌─────────────────────────────────────────────────┐
│            TypeScript API Layer                  │
│         (Reactive operators, type safety)        │
├─────────────────────────────────────────────────┤
│           WASM Bridge (wasm-bindgen)            │
│        (Automatic memory management)             │
├─────────────────────────────────────────────────┤
│             Rust Core Engine                     │
│   ┌──────────┐ ┌──────────┐ ┌──────────┐      │
│   │   SIMD   │ │  Buffer  │ │   Flow   │      │
│   │ Kernels  │ │   Pool   │ │ Control  │      │
│   └──────────┘ └──────────┘ └──────────┘      │
├─────────────────────────────────────────────────┤
│            Runtime Adapters                      │
│     (Node.js / Browser / Edge Workers)          │
└─────────────────────────────────────────────────┘

📚 Complete API Reference

Stream Creation

Nagare.from(source)           // From iterable/async iterable
Nagare.of(...values)          // From values
Nagare.empty()                // Empty stream
nagare.range(0, 100)          // Numeric range
nagare.interval(1000)         // Periodic emissions
createFromWebSocket(ws)      // WebSocket stream
createFromEventSource(url)   // Server-sent events
createFromFetch(url, opts)   // HTTP polling

Transformation Operators

map(fn)                      // Transform values
filter(predicate)            // Filter values
scan(reducer, seed)          // Accumulate values
take(n)                      // Take first n
skip(n)                      // Skip first n
distinctUntilChanged()       // Emit on change
debounce(ms)                 // Debounce emissions
throttle(ms)                 // Throttle emissions
buffer(size)                 // Buffer values
bufferTime(ms)              // Time-based buffer
pairwise()                   // Emit consecutive pairs
startWith(...values)         // Prepend values

Combination Operators

merge(...nagares)             // Merge streams
fork(predicate)             // Split stream
concatMap(fn)               // Sequential flatten
switchMap(fn)               // Cancel previous
combineLatest(...nagares)    // Combine latest values
withLatestFrom(nagare)       // Sample other stream

Error Handling

rescue(handler)             // Recover from errors
retry(count, delay)         // Retry on failure
terminateOnErrorMode()      // Stop on error

WASM Operators

mapWasm(kernel, params)     // Apply WASM kernel
windowedAggregate(n, op)    // SIMD aggregation

🧪 Testing

# Run all tests
npm test

# Run benchmarks
npm run bench

# Run specific benchmark
npm run bench:rxjs
npm run bench:wasm

# Test in browser
npm run test:browser

🚢 Deployment

Cloudflare Workers

# wrangler.toml
name = "nagare-app"
main = "dist/worker.js"
compatibility_date = "2024-01-01"

[[durable_objects.bindings]]
name = "STREAM_DO"
class_name = "StreamProcessor"
npm run build:worker
wrangler deploy

Node.js

// Automatic WASM loading
import { nagare } from '@aid-on/nagare';

const result = await nagare
  .from(data)
  .mapWasm('simd_kernel')
  .toArray();

Browser

<script type="module">
  import { Nagare } from 'https://cdn.skypack.dev/@aid-on/nagare';
  
  const stream = Nagare.from([1, 2, 3])
    .map(x => x * 2)
    .observe(console.log);
</script>

🔗 Links

📄 License

MIT © Aid-On


combineLatest

nagare.combineLatest(a, b, ...) behaves like RxJS combineLatest: it emits only after all sources have produced at least one value, then re-emits when any source updates.

const ab = nagare.combineLatest(streamA, streamB);
const out = await ab.toArray();

nagare.combine(...) remains a zip-like pairing of next values.

concatMapArray (fast path)

When each outer element expands to a small, fixed-size array, concatMapArray avoids generator overhead and can preallocate output:

import { concatMapArray, Nagare } from '@aid-on/nagare';

const out = await concatMapArray((x: number) => [x, x + 1])(Nagare.from([1,2,3])).toArray();
// => [1,2, 2,3, 3,4]