npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@hasan-akbari/ngx-stream-transport

v1.0.3

Published

A robust, signal-based Angular library for Server-Sent Events (SSE) and stream transport with auto-reconnect, state management, custom headers, and query params support.

Readme

ngx-stream-transport

npm version License: MIT

A modern, robust, and highly reactive Server-Sent Events (SSE) client library designed exclusively for Angular 19+.

Built entirely around Angular's new Signals API, it provides a strictly-typed, standalone-compatible, and memory-safe approach to managing real-time data streams without relying on RxJS or third-party SSE polyfills for core logic.

Key Value Propositions

  • 100% Signal-Based: Exposes state, messages, and error as reactive Signals. No .subscribe() or async pipe required.
  • Standalone Ready: Ships with provideStreamTransport() for modern Angular bootstrapping.
  • Robust Auto-Reconnection: Configurable retry logic (delay, max retries) seamlessly handled under the hood.
  • Dynamic Authentication: Supports static tokens, custom headers, and dynamic async token providers (e.g., refreshing JWTs before connecting).
  • HTTP Methods & Body: Built-in support for sending Request Body and Query Parameters via POST, PUT, etc.
  • Interceptors: Powerful middleware to intercept requests, responses, and errors locally or globally.
  • Strictly Typed: Zero any types. Full generic support for state schemas and message payloads.
  • Memory Safe: Built-in integration with Angular's DestroyRef to prevent lingering sockets and memory leaks when components are destroyed.

🚀 Quick Start Guide

1. Installation

Install the library via npm:

npm install @hasan-akbari/ngx-stream-transport

2. Configuration (Bootstrap)

Register the library providers in your application's bootstrap phase:

import { bootstrapApplication } from '@angular/platform-browser';
import { AppComponent } from './app/app.component';
import { provideStreamTransport } from 'ngx-stream-transport';

bootstrapApplication(AppComponent, {
  providers: [
    provideStreamTransport()
  ]
}).catch(err => console.error(err));

3. Usage in a Component

Inject the StreamClientFactory, create an instance bound to the component's lifecycle, and map the signals directly to your template.

import { Component, OnInit, inject, DestroyRef } from '@angular/core';
import { CommonModule } from '@angular/common';
import { StreamClientFactory } from 'ngx-stream-transport';

@Component({
  selector: 'app-live-feed',
  standalone: true,
  imports: [CommonModule],
  template: `
    <div class="feed">
      <h3>Connection State: {{ stream.state() }}</h3>
      
      <div *ngIf="stream.error() as err" style="color: red">
        Error: {{ err.message }}
        <button (click)="stream.reconnect()">Retry Now</button>
      </div>

      <ul>
        <!-- Directly map signals in template -->
        <li *ngFor="let msg of stream.messages()">
          {{ msg.event }}: {{ msg.data | json }}
        </li>
      </ul>
    </div>
  `
})
export class LiveFeedComponent implements OnInit {
  private factory = inject(StreamClientFactory);
  
  // Creates an isolated stream client that auto-disconnects when the component is destroyed
  public stream = this.factory.create(inject(DestroyRef));

  ngOnInit() {
    this.stream.connect(
      'https://api.example.com/events', 
      false, // withCredentials
      3,     // maxRetries
      2000,  // retryDelay (ms)
      {
        staticToken: 'your-auth-token'
      }
    );
  }
}

📚 Public API Summary

StreamClient<TState, TMessage>

The core service managing the SSE lifecycle.

Properties (Signals & Observables)

| Property | Type | Description | |---|---|---| | state | Signal<ConnectionState> | Tracks current connection state (idle, connecting, connected, paused, reconnecting, error, disconnecting, disconnected). | | messages | Signal<StreamMessage<TMessage>[]> | Immutable array of successfully parsed and stored messages. Perfect for *ngFor in templates. | | message$ | Observable<StreamMessage<TMessage>> | RxJS Observable emitting each message individually. Recommended for high-speed streams (like AI chat tokens) to prevent Angular Signal coalescing. | | lastMessage | Signal<StreamMessage<TMessage> \| undefined> | Emits the most recently received message. Warning: May skip intermediate messages during high-speed streaming due to Signal coalescing. | | error | Signal<Error \| null \| undefined> | Emits the latest Error encountered during connection or parsing. |

Methods

| Method | Description | |---|---| | connect(url, withCredentials, maxRetries, retryDelay, authConfig, method, body, queryParams) | Initiates connection. Supports auto-retry, dynamic auth configurations, HTTP methods (requires polyfill), request body (requires polyfill), and dynamically appending query parameters to the URL. Validates HTTP methods and query parameters. | | reconnect() | Manually drops the active connection, resets retry counters, and immediately attempts to reconnect using the last provided config. | | disconnect() | Gracefully closes the connection, clears internal buffers/state, and transitions to Idle. | | pause() | Gracefully drops the connection to pause stream, changing state to Paused. | | resume() | Restores connection from a Paused state using the previous configuration. |

StreamClientFactory

Used to generate isolated instances of StreamClient.

| Method | Description | |---|---| | create<TState, TMessage>(destroyRef?: DestroyRef): StreamClient | Generates a new, isolated client instance. Passing Angular's DestroyRef ensures disconnect() is automatically called upon destruction. |

Configuration Interfaces

AuthConfig

interface AuthConfig {
  staticToken?: string;
  tokenProvider?: () => string | Promise<string>;
  customHeaders?: Record<string, string>; // Requires EventSource polyfill in browser
}

🛠 Advanced Features

Dynamic Authentication

If your tokens expire, you can use tokenProvider to dynamically fetch a fresh token before every connection or reconnection attempt.

this.stream.connect('...', false, 5, 2000, {
  tokenProvider: async () => {
    return await this.authService.getFreshToken();
  }
});

State Tracking via Effects

Since the library is signal-based, you can use Angular's effect() to run side effects based on connection state:

import { effect, inject } from '@angular/core';
import { ConnectionState } from 'ngx-stream-transport';

constructor() {
  effect(() => {
    if (this.stream.state() === ConnectionState.Reconnecting) {
      this.toastService.warning('Network unstable. Reconnecting...');
    }
  });
}

Interceptors (Middleware)

Interceptors provide a powerful way to modify outgoing requests, intercept incoming messages before they hit the store, or format errors.

The library supports two levels of interceptors: Global Interceptors (applied to all streams) and Local Interceptors (applied to a specific stream via the Factory). If both are used, Global interceptors run before Local interceptors.

1. Global Interceptors

Use Case: Adding a global Authentication token, logging all network traffic, or prepending a base URL.

Step A: Create an Interceptor

import { Injectable } from '@angular/core';
import { StreamInterceptor, StreamRequestConfig, StreamMessage } from 'ngx-stream-transport';

@Injectable()
export class GlobalStreamInterceptor implements StreamInterceptor {
  
  // Modify request before it is sent
  interceptRequest(req: StreamRequestConfig): StreamRequestConfig {
    return {
      ...req,
      headers: { ...req.headers, 'Authorization': 'Bearer YOUR_TOKEN' }
    };
  }

  // Intercept messages. Return null to drop the message.
  interceptMessage(msg: StreamMessage): StreamMessage | null {
    if (msg.event === 'ignore-this') return null; 
    return msg;
  }

  // Intercept and format errors
  interceptError(err: Error): Error {
    return new Error(`[Global Error]: ${err.message}`);
  }
}

Step B: Register Globally Provide it during application bootstrap:

bootstrapApplication(AppComponent, {
  providers: [
    provideStreamTransport({
      interceptors: [GlobalStreamInterceptor]
    })
  ]
});

2. Local (Factory-Level) Interceptors

Use Case: Modifying queries or headers for one specific component (e.g., adding a timestamp to a specific chat stream) without affecting other streams in the application.

How to use: Pass an array of interceptors directly into the StreamClientFactory.create() method inside your component:

import { Component, inject, DestroyRef } from '@angular/core';
import { StreamClientFactory, StreamRequestConfig } from 'ngx-stream-transport';

@Component({
  selector: 'app-chat',
  template: `...`
})
export class ChatComponent {
  private factory = inject(StreamClientFactory);
  
  // Create an isolated stream with its own specific interceptor
  public stream = this.factory.create(inject(DestroyRef), [
    {
      interceptRequest: (req: StreamRequestConfig) => {
        console.log('This only runs for the ChatComponent stream!');
        return {
          ...req,
          queryParams: {
            ...req.queryParams,
            chatRoomId: 'room-123'
          }
        };
      }
    }
  ]);
}

Advantages of Local Interceptors:

  • Isolation: Changes don't leak to other stream connections in your app.
  • Context-Aware: You have direct access to component state or specific variables when defining the interceptor locally.

🛑 Troubleshooting & Type Safety Notes

  • EventSource Headers (Resolved): Previously, standard browser EventSource did not support custom headers (like Authorization), HTTP methods other than GET, or a request body. As of version 1.0.1, the library internally bundles and utilizes sse.js. You no longer need to manually polyfill EventSource in your project; custom headers, authentication tokens, and HTTP POST methods work out of the box automatically.
  • High-Speed Streaming & Signal Coalescing: Angular effect() and computed() signals coalesce multiple synchronous updates into a single tick. If you are building high-frequency event streams (like AI streaming tokens), relying solely on the lastMessage signal may cause you to drop intermediate tokens. For these specific use-cases, subscribe to the message$ RxJS Observable instead, which guarantees sequential, uncoalesced delivery of every event.
  • Strict Typing: By default, messages() has a payload type of unknown. Always pass generic types to the factory: this.factory.create<MyState, MyPayloadType>().
  • Memory Leaks: If you do not pass a DestroyRef to the factory, you must manually call this.stream.disconnect() in your component's ngOnDestroy hook.
  • Request Body & HTTP Methods: Only use POST, PUT, or PATCH if you are sending a body. The client actively validates and blocks requests that attempt to send a body with GET, HEAD, OPTIONS, or TRACE.
  • Query Params URL Encoding: Query params are securely encoded and validated against invalid characters to prevent injection issues.

🔗 Links & Resources

🤝 Contribution Guidelines

We welcome pull requests!

  1. Ensure all new code utilizes Angular Signals.
  2. Run npm run test:lib to verify unit tests pass.
  3. Verify strict TypeScript compilation.