

Stream Processor


A high-performance data stream processor with event sourcing, windowed aggregations (tumbling, sliding, and session), and configurable concurrency for real-time processing of high-throughput event streams.

Table of Contents

  • Features
  • Installation
  • Quick Start
  • Usage Examples
  • API Reference
  • Configuration Options
  • Advanced Use Cases
  • Performance Tuning
  • Troubleshooting
  • Contributing
  • License

Features

  • Event Sourcing: Capture and process events in real-time from multiple sources
  • Windowed Aggregations: Support for tumbling, sliding, and session windows
  • Concurrency: Parallel processing with configurable parallelism levels
  • DSL Interface: Fluent API for building stream processing pipelines
  • Extensible: Custom sources, sinks, transformations, and aggregations
  • High Performance: Optimized for throughput using modern concurrency patterns
  • Back-pressure Handling: Built-in mechanisms to handle varying data rates
  • Fault Tolerance: Recovery strategies for handling failures gracefully
  • Metrics & Monitoring: Built-in instrumentation for observability

Installation

npm install stream-processor

Or using yarn:

yarn add stream-processor

Quick Start

import { StreamDSL } from 'stream-processor';
import { HttpSource } from 'stream-processor/sources';
import { ConsoleSink } from 'stream-processor/sinks';

// Create a simple pipeline that processes HTTP events
const pipeline = new StreamDSL()
  .from(new HttpSource({ port: 3000 }))
  .map(event => ({ ...event, timestamp: Date.now() }))
  .window('tumbling', 1000)
  .count()
  .to(new ConsoleSink());

// Start processing
pipeline.start();

Usage Examples

Using the DSL

The Domain Specific Language (DSL) approach provides a fluent, chainable API for building processing pipelines:

import { StreamDSL } from 'stream-processor';
import { KafkaSource } from 'stream-processor/sources';
import { ElasticsearchSink } from 'stream-processor/sinks';

const pipeline = new StreamDSL()
  // Consume from Kafka topic
  .from(new KafkaSource({ 
    brokers: ['localhost:9092'], 
    topic: 'user-events',
    groupId: 'processor-group'
  }))
  
  // Parse JSON events
  .map(event => JSON.parse(event.value))
  
  // Add processing timestamp
  .map(event => ({ 
    ...event, 
    processedAt: Date.now() 
  }))
  
  // Filter out test events
  .where(event => !event.isTest)
  
  // Group by user ID
  .keyBy(event => event.userId)
  
  // Count events per user in 5-minute sliding windows
  .window('sliding', {
    size: 5 * 60 * 1000,    // 5 minutes
    slide: 60 * 1000        // Slide every 1 minute
  })
  .count()
  
  // Alert on high activity
  .where(counts => counts.value > 100)
  
  // Send to Elasticsearch
  .to(new ElasticsearchSink({
    node: 'http://localhost:9200',
    index: 'user-activity-alerts'
  }));

pipeline.start();

Using the Core API

For more complex scenarios or finer control, the Core API provides direct access to the underlying components:

// Note: count, PrometheusSink, and WindowedBloomFilter are used below; their
// import paths follow the module layout used elsewhere in this README.
import { StreamProcessor, WindowedBloomFilter } from 'stream-processor';
import { KafkaSource } from 'stream-processor/sources';
import { ElasticsearchSink, PrometheusSink } from 'stream-processor/sinks';
import { map, filter, count } from 'stream-processor/operators';

// Create a processor with a Kafka source
const processor = new StreamProcessor(
  new KafkaSource({
    brokers: ['localhost:9092'],
    topic: 'user-events',
    groupId: 'processor-group'
  }), 
  {
    name: 'user-activity-monitor',
    parallelism: 4,
    bufferSize: 10000,
    checkpointInterval: 1000,
    errorPolicy: 'retry'
  }
);

// Build the processing pipeline
const pipeline = processor
  // Parse and enrich events
  .addTransformation({
    name: 'parse-and-enrich',
    transform: input => input.pipe(
      map(event => {
        const parsed = JSON.parse(event.value);
        return {
          ...parsed,
          processedAt: Date.now(),
          region: getRegionFromIp(parsed.ipAddress)
        };
      })
    )
  })
  
  // Filter out test events
  .addFilter({
    name: 'production-only',
    filter: input => input.pipe(
      filter(event => !event.isTest && event.environment === 'production')
    )
  })
  
  // Add custom business logic
  .addTransformation({
    name: 'risk-scoring',
    transform: input => input.pipe(
      map(event => ({
        ...event,
        riskScore: calculateRiskScore(event)
      }))
    )
  })
  
  // Use a bloom filter to detect duplicate events
  .addFilter({
    name: 'deduplication',
    filter: new WindowedBloomFilter({
      size: 10000,
      errorRate: 0.001,
      windowSize: 60 * 60 * 1000  // 1 hour
    })
  })
  
  // Aggregate by user ID
  .addAggregator(
    {
      name: 'user-activity-count',
      keyBy: event => event.userId,
      aggregate: input => input.pipe(count())
    },
    {
      type: 'sliding',
      size: 5 * 60 * 1000,  // 5 minutes
      slide: 60 * 1000      // Slide every 1 minute
    }
  );

// Add a sink for high-activity users
pipeline.addSink(
  new ElasticsearchSink({
    node: 'http://localhost:9200',
    index: 'user-activity-metrics',
    batchSize: 100
  }),
  input => input.pipe(filter(metric => metric.value > 100))
);

// Add monitoring sink for all metrics
pipeline.addSink(
  new PrometheusSink({
    port: 9090,
    path: '/metrics',
    metricName: 'user_activity_count'
  })
);

// Start processing
pipeline.start();

API Reference

Sources

Stream Processor includes several built-in sources:

| Source | Description | Options |
|--------|-------------|---------|
| HttpSource | Listens for HTTP requests | { port, path, method } |
| KafkaSource | Consumes from Kafka topics | { brokers, topic, groupId, ... } |
| FileSource | Reads from file(s) | { path, watch, encoding } |
| S3Source | Reads from AWS S3 | { bucket, prefix, credentials } |
| WebSocketSource | Listens for WebSocket connections | { port, path } |
| SQLSource | Polls a database table | { connection, query, interval } |
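
For example, wiring one of the built-in sources into the DSL follows the same pattern as the Quick Start (a minimal sketch; the FileSource options are the ones listed in the table above):

import { StreamDSL } from 'stream-processor';
import { FileSource } from 'stream-processor/sources';
import { ConsoleSink } from 'stream-processor/sinks';

// Tail a log file and echo every line to the console
const filePipeline = new StreamDSL()
  .from(new FileSource({ path: './events.log', watch: true, encoding: 'utf8' }))
  .to(new ConsoleSink());

filePipeline.start();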

Creating a custom source:

import { Source } from 'stream-processor';

class MyCustomSource extends Source {
  constructor(options) {
    super(options);
  }
  
  initialize() {
    // Setup your source
  }
  
  start() {
    // Start emitting events
  }
  
  stop() {
    // Clean up resources
  }
}
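
As a concrete sketch, a timer-driven source might look like the following. This assumes the Source base class exposes an emit(event) method for handing events to the pipeline; check the actual base class for the exact hook.

import { Source } from 'stream-processor';

// Hypothetical source that emits a tick event once per second.
// Assumes Source provides emit(event); adjust if the base-class API differs.
class TickSource extends Source {
  initialize() {
    this.timer = null;
  }

  start() {
    this.timer = setInterval(() => {
      this.emit({ type: 'tick', at: Date.now() });
    }, 1000);
  }

  stop() {
    clearInterval(this.timer);
    this.timer = null;
  }
}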

Transformations

Transformations modify each event in the stream:

| Transformation | Description |
|----------------|-------------|
| map | Changes each event using a mapping function |
| flatMap | Maps and flattens the results |
| deduplicate | Removes duplicate events |
| enrich | Adds data to events from external sources |

Example:

// Using the DSL
pipeline
  .map(event => ({ ...event, processed: true }))
  .flatMap(event => event.items.map(item => ({ ...item, parentId: event.id })))
  .deduplicate(event => event.id)
  .enrich(async event => {
    const userData = await userService.getUser(event.userId);
    return { ...event, userData };
  });

// Using the Core API
processor.addTransformation({
  name: 'enrich-events',
  transform: input => input.pipe(
    map(event => ({ ...event, processed: true })),
    flatMap(event => event.items.map(item => ({ ...item, parentId: event.id }))),
    deduplicate(event => event.id),
    enrich(async event => {
      const userData = await userService.getUser(event.userId);
      return { ...event, userData };
    })
  )
});

Filters

Filters determine which events continue in the pipeline:

| Filter | Description |
|--------|-------------|
| where | Keeps events that match a predicate |
| sample | Samples events at a specified rate |
| throttle | Limits the rate of events |
| WindowedBloomFilter | Efficient approximate deduplication |

Example:

// Using the DSL
pipeline
  .where(event => event.value > 10)
  .sample(0.1)  // Sample 10% of events
  .throttle(1000);  // Max 1000 events per second

// Using the Core API
processor
  .addFilter({
    name: 'high-value-events',
    filter: input => input.pipe(filter(event => event.value > 10))
  })
  .addFilter({
    name: 'sampling',
    filter: input => input.pipe(sample(0.1))
  })
  .addFilter({
    name: 'rate-limiting',
    filter: input => input.pipe(throttle(1000))
  });

Windows and Aggregations

Windows group events for aggregation:

| Window Type | Description |
|-------------|-------------|
| tumbling | Fixed-size, non-overlapping windows |
| sliding | Fixed-size, overlapping windows |
| session | Dynamic windows based on activity |
| global | Single window for all events |

Aggregation functions:

| Aggregation | Description |
|-------------|-------------|
| count | Counts events |
| sum | Sums a specified field |
| avg | Calculates average of a field |
| min | Finds minimum value |
| max | Finds maximum value |
| topK | Gets top K elements |
| distinct | Counts distinct values |
| percentile | Calculates percentiles (approximate) |

Example:

// Using the DSL
pipeline
  .keyBy(event => event.userId)
  .window('sliding', {
    size: 5 * 60 * 1000,    // 5 minutes
    slide: 60 * 1000        // Slide every 1 minute
  })
  .aggregate([
    count().as('eventCount'),
    sum(e => e.value).as('totalValue'),
    avg(e => e.duration).as('avgDuration'),
    percentile(e => e.responseTime, 95).as('p95ResponseTime')
  ]);

// Using the Core API
processor.addAggregator(
  {
    name: 'user-metrics',
    keyBy: event => event.userId,
    aggregate: input => input.pipe(
      aggregate([
        count().as('eventCount'),
        sum(e => e.value).as('totalValue'),
        avg(e => e.duration).as('avgDuration'),
        percentile(e => e.responseTime, 95).as('p95ResponseTime')
      ])
    )
  },
  {
    type: 'sliding',
    size: 5 * 60 * 1000,
    slide: 60 * 1000
  }
);
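
Session windows close after a period of inactivity rather than at fixed intervals. A minimal sketch (the gap option name is an assumption; the README does not spell out the session-window parameters):

// Count events per user session, where a session ends after
// 30 minutes without activity. The 'gap' option name is assumed.
pipeline
  .keyBy(event => event.userId)
  .window('session', { gap: 30 * 60 * 1000 })
  .aggregate([
    count().as('eventsPerSession'),
    sum(e => e.value).as('sessionValue')
  ]);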

Sinks

Sinks output processed data:

| Sink | Description | Options |
|------|-------------|---------|
| ConsoleSink | Outputs to console | { format } |
| FileSink | Writes to file | { path, format, append } |
| KafkaSink | Produces to Kafka | { brokers, topic, ... } |
| ElasticsearchSink | Writes to Elasticsearch | { node, index, ... } |
| HttpSink | Makes HTTP requests | { url, method, headers } |
| S3Sink | Writes to AWS S3 | { bucket, prefix, credentials } |
| PrometheusSink | Exposes metrics | { port, path, metricName } |

Creating a custom sink:

import { Sink } from 'stream-processor';

class MyCustomSink extends Sink {
  constructor(options) {
    super(options);
  }
  
  initialize() {
    // Setup your sink
  }
  
  write(event) {
    // Write the event
  }
  
  close() {
    // Clean up resources
  }
}
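
For example, a custom sink that forwards each event to an HTTP endpoint could look like this (a hypothetical sketch; batching and error handling are omitted, and whether write may be async depends on the actual base class):

import { Sink } from 'stream-processor';

// Hypothetical sink that POSTs each event as JSON to a webhook URL.
// Requires a runtime with a global fetch (Node.js 18+).
class MyWebhookSink extends Sink {
  constructor(options) {
    super(options);
    this.url = options.url;
  }

  initialize() {
    // Stateless HTTP sink: nothing to set up
  }

  async write(event) {
    await fetch(this.url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(event)
    });
  }

  close() {
    // No persistent resources to release
  }
}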

Configuration Options

The StreamProcessor constructor accepts these configuration options:

{
  // Basic settings
  name: string;              // Name of the processor
  parallelism: number;       // Number of parallel workers (default: # of CPU cores)
  bufferSize: number;        // Event buffer size (default: 1000)
  
  // Advanced settings
  maxRetries: number;        // Maximum retries for failed events (default: 3)
  backoffPolicy: 'fixed' | 'exponential'; // Retry backoff strategy (default: exponential)
  errorPolicy: 'fail' | 'skip' | 'retry'; // How to handle errors (default: retry)
  
  // Checkpointing & recovery
  checkpointInterval: number; // Milliseconds between checkpoints (default: 60000)
  checkpointStorage: 'memory' | 'file' | 'redis'; // Where to store checkpoints
  checkpointConfig: object;   // Configuration for checkpoint storage
  
  // Monitoring & metrics
  metrics: boolean;           // Enable metrics (default: true)
  metricsPort: number;        // Port for metrics server (default: 9090)
  logLevel: 'debug' | 'info' | 'warn' | 'error'; // Logging level (default: info)
}
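
For example, a processor configured with a subset of these options (a minimal sketch; the source is just one of the built-ins from the Sources table):

import { StreamProcessor } from 'stream-processor';
import { FileSource } from 'stream-processor/sources';

const processor = new StreamProcessor(
  new FileSource({ path: './events.log', watch: true }),
  {
    name: 'file-event-processor',
    parallelism: 4,              // number of parallel workers
    bufferSize: 5000,
    errorPolicy: 'retry',
    maxRetries: 5,
    backoffPolicy: 'exponential',
    checkpointInterval: 30000,   // checkpoint every 30 seconds
    checkpointStorage: 'file',
    metrics: true,
    logLevel: 'info'
  }
);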

Advanced Use Cases

Fraud Detection System

const fraudDetectionPipeline = new StreamDSL()
  .from(new KafkaSource({ 
    brokers: ['kafka:9092'], 
    topic: 'transactions'
  }))
  .map(event => JSON.parse(event.value))
  .keyBy(tx => tx.userId)
  
  // Calculate velocity: how many transactions in last 5 minutes
  .window('sliding', { size: 5 * 60 * 1000, slide: 10 * 1000 })
  .count().as('txVelocity')
  
  // Detect anomalous behavior
  .where(metric => metric.value > 10) // More than 10 transactions in 5 minutes
  
  // Enrich with historical data
  .map(async alert => {
    const userHistory = await getUserTransactionHistory(alert.key);
    return {
      ...alert,
      historicalAverage: userHistory.avgTxPerHour,
      riskScore: calculateRiskScore(alert.value, userHistory)
    };
  })
  
  // High risk alerts go to security team
  .where(alert => alert.riskScore > 0.8)
  .to(new AlertSink({ 
    endpoint: 'https://security-api/fraud-alerts',
    headers: { 'Authorization': 'Bearer token' }
  }));

IoT Sensor Monitoring

const iotMonitoringPipeline = new StreamDSL()
  .from(new MQTTSource({ 
    brokerUrl: 'mqtt://broker.hivemq.com', 
    topic: 'sensors/#' 
  }))
  .map(event => JSON.parse(event.payload))
  
  // Add timestamp and device metadata
  .enrich(async reading => {
    const deviceInfo = await deviceRegistry.getDevice(reading.deviceId);
    return {
      ...reading,
      timestamp: Date.now(),
      deviceType: deviceInfo.type,
      location: deviceInfo.location
    };
  })
  
  // Group by device type and location
  .keyBy(reading => `${reading.deviceType}:${reading.location}`)
  
  // Monitor for anomalies in 1-minute windows
  .window('tumbling', 60 * 1000)
  .aggregate([
    avg(r => r.temperature).as('avgTemp'),
    min(r => r.temperature).as('minTemp'),
    max(r => r.temperature).as('maxTemp'),
    stdDev(r => r.temperature).as('tempStdDev')
  ])
  
  // Detect anomalies
  .where(stats => {
    // If standard deviation is high or temps are outside normal range
    return stats.tempStdDev > 10 || 
           stats.maxTemp > 100 || 
           stats.minTemp < 0;
  })
  
  // Send to time-series database
  .to(new InfluxDBSink({
    url: 'http://influxdb:8086',
    database: 'sensor_metrics',
    measurement: 'temperature_anomalies'
  }))
  
  // Also trigger alerts
  .to(new WebhookSink({
    url: 'https://monitoring-service/alerts',
    method: 'POST'
  }));

Log Processing and Analytics

const logAnalyticsPipeline = new StreamDSL()
  .from(new FileSource({ 
    path: '/var/log/application*.log', 
    watch: true 
  }))
  
  // Parse log lines
  .map(line => parseLogLine(line))
  
  // Filter out healthy responses
  .where(log => log.level === 'ERROR' || log.responseTime > 500)
  
  // Group by API endpoint
  .keyBy(log => log.endpoint)
  
  // Analyze in 5-minute windows
  .window('tumbling', 5 * 60 * 1000)
  .aggregate([
    count().as('errorCount'),
    avg(l => l.responseTime).as('avgResponseTime'),
    percentile(l => l.responseTime, 95).as('p95ResponseTime'),
    percentile(l => l.responseTime, 99).as('p99ResponseTime'),
    distinct(l => l.userId).as('affectedUsers')
  ])
  
  // Store metrics
  .to(new ElasticsearchSink({
    node: 'http://elasticsearch:9200',
    index: 'api_metrics',
    idField: 'endpoint'
  }))
  
  // Expose metrics for Grafana
  .to(new PrometheusSink({
    port: 9090,
    metrics: {
      errorCount: 'counter',
      avgResponseTime: 'gauge',
      p95ResponseTime: 'gauge',
      p99ResponseTime: 'gauge',
      affectedUsers: 'gauge'
    }
  }));

Performance Tuning

Memory Management

Control memory usage by adjusting buffer sizes and backpressure:

const pipeline = new StreamProcessor(mySource, {
  bufferSize: 5000,                  // Increase buffer for bursty workloads
  maxPendingEvents: 10000,           // Set max pending events before backpressure
  backpressureStrategy: 'drop',      // 'drop' or 'block'
  gcThreshold: 0.8                   // Trigger GC when heap usage reaches 80%
});

Parallelism and Partitioning

Optimize for multi-core systems:

const pipeline = new StreamProcessor(mySource, {
  parallelism: 8,                    // Process on 8 parallel workers
  partitioningStrategy: 'consistent-hash', // How to distribute work
  partitioningKey: event => event.userId, // Key for partitioning
  workerAffinityEnabled: true        // Pin workers to CPU cores
});

Batching and Windowing

Configure batch processing for higher throughput:

const pipeline = new StreamDSL()
  .from(mySource)
  .batch({
    size: 1000,                      // Process up to 1000 events at once
    timeout: 100                     // Or every 100ms, whichever comes first
  })
  .map(batch => processBatch(batch))
  .window('tumbling', {
    size: 60000,                     // 1-minute windows
    allowedLateness: 10000,          // Handle late events up to 10s
    watermarkStrategy: 'system-time' // Or 'event-time' with extractTimestamp
  })
  .aggregate(myAggregation);
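
When events carry their own timestamps, the 'event-time' strategy mentioned in the comment above can be paired with a timestamp extractor (a sketch; the extractTimestamp option shape and the event.timestamp field are assumptions):

const eventTimePipeline = new StreamDSL()
  .from(mySource)
  .window('tumbling', {
    size: 60000,                                 // 1-minute windows in event time
    allowedLateness: 10000,                      // accept events up to 10s late
    watermarkStrategy: 'event-time',
    extractTimestamp: event => event.timestamp   // assumed field name
  })
  .aggregate(myAggregation);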

Troubleshooting

Common Issues and Solutions

| Issue | Possible Causes | Solutions |
|-------|-----------------|-----------|
| Memory leaks | Unclosed resources, reference cycles | Use pipeline.cleanup(), check for dangling event listeners |
| High latency | Buffer overflow, GC pauses | Adjust buffer sizes, tune Node.js heap/GC settings |
| Data loss | Unreliable sources, error handling | Enable checkpointing, implement retry logic |
| Pipeline stalls | Blocking operations | Use async operations, increase parallelism |
| Slow sinks | Network issues, back pressure | Implement batching, monitor sink performance |
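
One common way to avoid leaked resources is to shut the pipeline down cleanly when the process exits (a sketch; pipeline.cleanup() comes from the table above, while pipeline.stop() as the counterpart to start() is an assumption):

// Hypothetical graceful-shutdown hook
process.on('SIGTERM', async () => {
  await pipeline.stop();     // assumed counterpart to pipeline.start()
  await pipeline.cleanup();  // release sources, sinks, and event listeners
  process.exit(0);
});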

Debugging Tools

The library provides tools to help diagnose issues:

// Enable debug logging
const pipeline = new StreamDSL({ logLevel: 'debug' });

// Use the built-in metrics to identify bottlenecks
pipeline.metrics.enable({
  console: true,           // Print metrics to console
  prometheus: 9090,        // Expose Prometheus metrics
  statsd: {                // Send to StatsD
    host: 'localhost',
    port: 8125,
    prefix: 'stream-processor'
  }
});

// Add checkpoint for recovery in case of failures
pipeline.checkpoint({
  interval: 30000,         // Checkpoint every 30 seconds
  storage: 'redis',        // Use Redis for checkpoint storage
  config: {
    host: 'localhost',
    port: 6379
  }
});

// Debug specific components
pipeline.debug('transformation1');   // Debug a specific transformation

Error Handling Strategies

// Global error handler
const pipeline = new StreamDSL({
  errorPolicy: 'retry',
  maxRetries: 3,
  backoffPolicy: 'exponential',
  deadLetterQueue: new KafkaSink({
    topic: 'error-events',
    brokers: ['localhost:9092'] 
  })
});

// Component-specific error handling
pipeline
  .from(mySource)
  .map(event => {
    try {
      return transform(event);
    } catch (error) {
      pipeline.logger.warn('Transform error', { error, event });
      return { ...event, error: error.message, status: 'failed' };
    }
  })
  .to(mySink);

Contributing

We welcome contributions to the Stream Processor project!

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

Development Setup

# Clone the repository
git clone https://github.com/yourusername/stream-processor.git
cd stream-processor

# Install dependencies
npm install

# Run tests
npm test

# Build the project
npm run build

# Run the linter
npm run lint

Code Style

We use ESLint and Prettier to maintain code quality. Please run:

npm run lint
npm run format

before submitting your PR.

License

This project is licensed under the MIT License - see the LICENSE file for details.