@hasan-akbari/ngx-stream-transport
v1.0.3
Published
A robust, signal-based Angular library for Server-Sent Events (SSE) and stream transport with auto-reconnect, state management, custom headers, and query params support.
Maintainers
Readme
ngx-stream-transport
A modern, robust, and highly reactive Server-Sent Events (SSE) client library designed exclusively for Angular 19+.
Built entirely around Angular's new Signals API, it provides a strictly-typed, standalone-compatible, and memory-safe approach to managing real-time data streams without relying on RxJS or third-party SSE polyfills for core logic.
Key Value Propositions
- 100% Signal-Based: Exposes
state,messages, anderroras reactive Signals. No.subscribe()orasyncpipe required. - Standalone Ready: Ships with
provideStreamTransport()for modern Angular bootstrapping. - Robust Auto-Reconnection: Configurable retry logic (delay, max retries) seamlessly handled under the hood.
- Dynamic Authentication: Supports static tokens, custom headers, and dynamic async token providers (e.g., refreshing JWTs before connecting).
- HTTP Methods & Body: Built-in support for sending Request Body and Query Parameters via
POST,PUT, etc. - Interceptors: Powerful middleware to intercept requests, responses, and errors locally or globally.
- Strictly Typed: Zero
anytypes. Full generic support for state schemas and message payloads. - Memory Safe: Built-in integration with Angular's
DestroyRefto prevent lingering sockets and memory leaks when components are destroyed.
🚀 Quick Start Guide
1. Installation
Install the library via npm:
npm install @hasan-akbari/ngx-stream-transport2. Configuration (Bootstrap)
Register the library providers in your application's bootstrap phase:
import { bootstrapApplication } from '@angular/platform-browser';
import { AppComponent } from './app/app.component';
import { provideStreamTransport } from 'ngx-stream-transport';
bootstrapApplication(AppComponent, {
providers: [
provideStreamTransport()
]
}).catch(err => console.error(err));3. Usage in a Component
Inject the StreamClientFactory, create an instance bound to the component's lifecycle, and map the signals directly to your template.
import { Component, OnInit, inject, DestroyRef } from '@angular/core';
import { CommonModule } from '@angular/common';
import { StreamClientFactory } from 'ngx-stream-transport';
@Component({
selector: 'app-live-feed',
standalone: true,
imports: [CommonModule],
template: `
<div class="feed">
<h3>Connection State: {{ stream.state() }}</h3>
<div *ngIf="stream.error() as err" style="color: red">
Error: {{ err.message }}
<button (click)="stream.reconnect()">Retry Now</button>
</div>
<ul>
<!-- Directly map signals in template -->
<li *ngFor="let msg of stream.messages()">
{{ msg.event }}: {{ msg.data | json }}
</li>
</ul>
</div>
`
})
export class LiveFeedComponent implements OnInit {
private factory = inject(StreamClientFactory);
// Creates an isolated stream client that auto-disconnects when the component is destroyed
public stream = this.factory.create(inject(DestroyRef));
ngOnInit() {
this.stream.connect(
'https://api.example.com/events',
false, // withCredentials
3, // maxRetries
2000, // retryDelay (ms)
{
staticToken: 'your-auth-token'
}
);
}
}📚 Public API Summary
StreamClient<TState, TMessage>
The core service managing the SSE lifecycle.
Properties (Signals & Observables)
| Property | Type | Description |
|---|---|---|
| state | Signal<ConnectionState> | Tracks current connection state (idle, connecting, connected, paused, reconnecting, error, disconnecting, disconnected). |
| messages | Signal<StreamMessage<TMessage>[]> | Immutable array of successfully parsed and stored messages. Perfect for *ngFor in templates. |
| message$ | Observable<StreamMessage<TMessage>> | RxJS Observable emitting each message individually. Recommended for high-speed streams (like AI chat tokens) to prevent Angular Signal coalescing. |
| lastMessage | Signal<StreamMessage<TMessage> \| undefined> | Emits the most recently received message. Warning: May skip intermediate messages during high-speed streaming due to Signal coalescing. |
| error | Signal<Error \| null \| undefined> | Emits the latest Error encountered during connection or parsing. |
Methods
| Method | Description |
|---|---|
| connect(url, withCredentials, maxRetries, retryDelay, authConfig, method, body, queryParams) | Initiates connection. Supports auto-retry, dynamic auth configurations, HTTP methods (requires polyfill), request body (requires polyfill), and dynamically appending query parameters to the URL. Validates HTTP methods and query parameters. |
| reconnect() | Manually drops the active connection, resets retry counters, and immediately attempts to reconnect using the last provided config. |
| disconnect() | Gracefully closes the connection, clears internal buffers/state, and transitions to Idle. |
| pause() | Gracefully drops the connection to pause stream, changing state to Paused. |
| resume() | Restores connection from a Paused state using the previous configuration. |
StreamClientFactory
Used to generate isolated instances of StreamClient.
| Method | Description |
|---|---|
| create<TState, TMessage>(destroyRef?: DestroyRef): StreamClient | Generates a new, isolated client instance. Passing Angular's DestroyRef ensures disconnect() is automatically called upon destruction. |
Configuration Interfaces
AuthConfig
interface AuthConfig {
staticToken?: string;
tokenProvider?: () => string | Promise<string>;
customHeaders?: Record<string, string>; // Requires EventSource polyfill in browser
}🛠 Advanced Features
Dynamic Authentication
If your tokens expire, you can use tokenProvider to dynamically fetch a fresh token before every connection or reconnection attempt.
this.stream.connect('...', false, 5, 2000, {
tokenProvider: async () => {
return await this.authService.getFreshToken();
}
});State Tracking via Effects
Since the library is signal-based, you can use Angular's effect() to run side effects based on connection state:
import { effect, inject } from '@angular/core';
import { ConnectionState } from 'ngx-stream-transport';
constructor() {
effect(() => {
if (this.stream.state() === ConnectionState.Reconnecting) {
this.toastService.warning('Network unstable. Reconnecting...');
}
});
}Interceptors (Middleware)
Interceptors provide a powerful way to modify outgoing requests, intercept incoming messages before they hit the store, or format errors.
The library supports two levels of interceptors: Global Interceptors (applied to all streams) and Local Interceptors (applied to a specific stream via the Factory). If both are used, Global interceptors run before Local interceptors.
1. Global Interceptors
Use Case: Adding a global Authentication token, logging all network traffic, or prepending a base URL.
Step A: Create an Interceptor
import { Injectable } from '@angular/core';
import { StreamInterceptor, StreamRequestConfig, StreamMessage } from 'ngx-stream-transport';
@Injectable()
export class GlobalStreamInterceptor implements StreamInterceptor {
// Modify request before it is sent
interceptRequest(req: StreamRequestConfig): StreamRequestConfig {
return {
...req,
headers: { ...req.headers, 'Authorization': 'Bearer YOUR_TOKEN' }
};
}
// Intercept messages. Return null to drop the message.
interceptMessage(msg: StreamMessage): StreamMessage | null {
if (msg.event === 'ignore-this') return null;
return msg;
}
// Intercept and format errors
interceptError(err: Error): Error {
return new Error(`[Global Error]: ${err.message}`);
}
}Step B: Register Globally Provide it during application bootstrap:
bootstrapApplication(AppComponent, {
providers: [
provideStreamTransport({
interceptors: [GlobalStreamInterceptor]
})
]
});2. Local (Factory-Level) Interceptors
Use Case: Modifying queries or headers for one specific component (e.g., adding a timestamp to a specific chat stream) without affecting other streams in the application.
How to use:
Pass an array of interceptors directly into the StreamClientFactory.create() method inside your component:
import { Component, inject, DestroyRef } from '@angular/core';
import { StreamClientFactory, StreamRequestConfig } from 'ngx-stream-transport';
@Component({
selector: 'app-chat',
template: `...`
})
export class ChatComponent {
private factory = inject(StreamClientFactory);
// Create an isolated stream with its own specific interceptor
public stream = this.factory.create(inject(DestroyRef), [
{
interceptRequest: (req: StreamRequestConfig) => {
console.log('This only runs for the ChatComponent stream!');
return {
...req,
queryParams: {
...req.queryParams,
chatRoomId: 'room-123'
}
};
}
}
]);
}Advantages of Local Interceptors:
- Isolation: Changes don't leak to other stream connections in your app.
- Context-Aware: You have direct access to component state or specific variables when defining the interceptor locally.
🛑 Troubleshooting & Type Safety Notes
- EventSource Headers (Resolved): Previously, standard browser
EventSourcedid not support custom headers (likeAuthorization), HTTP methods other thanGET, or a request body. As of version1.0.1, the library internally bundles and utilizessse.js. You no longer need to manually polyfillEventSourcein your project; custom headers, authentication tokens, and HTTPPOSTmethods work out of the box automatically. - High-Speed Streaming & Signal Coalescing: Angular
effect()andcomputed()signals coalesce multiple synchronous updates into a single tick. If you are building high-frequency event streams (like AI streaming tokens), relying solely on thelastMessagesignal may cause you to drop intermediate tokens. For these specific use-cases, subscribe to themessage$RxJS Observable instead, which guarantees sequential, uncoalesced delivery of every event. - Strict Typing: By default,
messages()has a payload type ofunknown. Always pass generic types to the factory:this.factory.create<MyState, MyPayloadType>(). - Memory Leaks: If you do not pass a
DestroyRefto the factory, you must manually callthis.stream.disconnect()in your component'sngOnDestroyhook. - Request Body & HTTP Methods: Only use
POST,PUT, orPATCHif you are sending abody. The client actively validates and blocks requests that attempt to send a body withGET,HEAD,OPTIONS, orTRACE. - Query Params URL Encoding: Query params are securely encoded and validated against invalid characters to prevent injection issues.
🔗 Links & Resources
- Repository: https://github.com/hasan-akbari/ngx-stream-transport
- NPM Package: https://www.npmjs.com/package/@hasan-akbari/ngx-stream-transport
- Issue Tracker: GitHub Issues
🤝 Contribution Guidelines
We welcome pull requests!
- Ensure all new code utilizes Angular Signals.
- Run
npm run test:libto verify unit tests pass. - Verify strict TypeScript compilation.
