@bemedev/rx-pausable

v1.2.0

A pausable wrapper for RxJS observables with start, stop, pause, and resume controls

Features

  • 🎛️ State Management: Control observable streams with start, stop, pause, and resume
  • 🔁 Restartable: Call start() again after stop() to fully restart the stream with a clean slate
  • Fixed Source: The source observable is bound at creation time and never changes — createPausable cannot be replayed with a different source
  • 🗺️ State Inspector: Read the current state ('stopped' | 'running' | 'paused') via the state property
  • 🔄 Command Interface: Programmatic control via command() method
  • 🧩 Smart Buffering: Values emitted during pause are buffered and re-emitted after resume() with correct relative timing
  • 📦 Flexible Observers: Support for function, partial, full, or no observer
  • 🔗 Dynamic Subscribe: Add extra observers at any time via subscribe() — returns an RxJS Subscription for granular control
  • 🔌 Proper Lifecycle: Automatic subscription management and cleanup per start() call
  • 💪 TypeScript: Full type safety with TypeScript support
  • Well Tested: 100% test coverage with comprehensive test suite

Installation

npm install @bemedev/rx-pausable
pnpm add @bemedev/rx-pausable
yarn add @bemedev/rx-pausable

Usage

Basic Example

import { interval } from 'rxjs';
import createPausable from '@bemedev/rx-pausable';

// Create a pausable wrapper around an observable
const pausable = createPausable(interval(1000), value =>
  console.log('Value:', value),
);

// Start emitting values
pausable.start();
// Output: Value: 0, Value: 1, Value: 2...

console.log(pausable.state); // 'running'

// Pause the stream (buffers incoming values)
pausable.pause();

console.log(pausable.state); // 'paused'

// Resume the stream (re-emits buffered values with correct timing)
pausable.resume();

// Stop completely
pausable.stop();

console.log(pausable.state); // 'stopped'

With Full Observer

import { Subject } from 'rxjs';
import createPausable from '@bemedev/rx-pausable';

const source$ = new Subject<number>();

const pausable = createPausable(source$, {
  next: value => console.log('Next:', value),
  error: err => console.error('Error:', err),
  complete: () => console.log('Complete!'),
});

pausable.start();
source$.next(1); // Output: Next: 1
source$.complete(); // Output: Complete!

Using Command Interface

import { of } from 'rxjs';
import createPausable from '@bemedev/rx-pausable';

const pausable = createPausable(of(1, 2, 3), console.log);

// Control via commands
pausable.command('start');
pausable.command('pause');
pausable.command('resume');
pausable.command('stop');

Without Observer (Side Effects Only)

import { interval } from 'rxjs';
import { tap } from 'rxjs/operators';
import createPausable from '@bemedev/rx-pausable';

const source$ = interval(1000).pipe(
  tap(value => console.log('Side effect:', value)),
);

const pausable = createPausable(source$);
pausable.start();

API

createPausable<T>(source$: Observable<T>, observer?: Observer<T> | ((value: T) => void))

Creates a pausable wrapper around an RxJS observable.

Note: The source$ observable is fixed at creation time and cannot be changed afterwards. To switch to a different source, call createPausable again to create a new instance. The source is never mutated or replaced internally.

Parameters:

  • source$: The source Observable to wrap. Bound permanently to this instance — cannot be swapped after creation.
  • observer (optional): Can be:
    • A function (value: T) => void
    • A partial observer { next?: ..., error?: ..., complete?: ... }
    • A full observer { next: ..., error: ..., complete: ... }
    • undefined (for observables with side effects only)
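
The accepted observer shapes can all be reduced to one full observer. The following is a minimal sketch of such a normalization, not the package's actual code: toFullObserver, ObserverLike, and noop are hypothetical names introduced here, and the local FullObserver interface stands in for the RxJS Observer type so the snippet runs without dependencies.

```typescript
// Local stand-in for RxJS's Observer<T>, so the sketch has no dependencies.
interface FullObserver<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

// Hypothetical union of the shapes the observer parameter accepts.
type ObserverLike<T> =
  | ((value: T) => void)
  | Partial<FullObserver<T>>
  | undefined;

const noop = (): void => {};

// Hypothetical helper: fill in any missing callback with a no-op.
function toFullObserver<T>(observer?: ObserverLike<T>): FullObserver<T> {
  if (typeof observer === 'function') {
    return { next: observer, error: noop, complete: noop };
  }
  return {
    next: observer?.next ?? noop,
    error: observer?.error ?? noop,
    complete: observer?.complete ?? noop,
  };
}

const seen: number[] = [];
toFullObserver<number>(v => seen.push(v)).next(42); // function form
toFullObserver<number>({ complete: noop }).next(1); // partial form: next is a no-op
toFullObserver<number>().complete();                // no observer at all
console.log(seen); // [ 42 ]
```

Normalizing once at the boundary means the rest of the code can call next, error, and complete without checking which form the caller supplied.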

Returns: An object with control methods:

start(): void

Starts (or restarts) the stream. Only works from the stopped state.

Each call fully resets internal state: a fresh internal Subject is created, the event buffer is cleared, and timing references are reset. This means calling start() after stop() is safe and produces a clean new subscription to the source observable.

stop(): void

Stops the stream. Cancels any pending resume timers and sets the instance back to the stopped state, from which it can be restarted with start(). Can be called from any non-stopped state.

pause(): void

Pauses the stream. Incoming values from the source are buffered. Only works from the running state.

resume(): void

Resumes the stream and re-emits all buffered values (in order, with relative timing). Only works from the paused state.

subscribe(observer: ((value: T) => void) | Partial<Observer<T>>): Subscription

Subscribes an additional observer to the internal subject. Unlike the observer passed to createPausable, this subscriber is registered dynamically and receives the same forwarded events subject to the same pause/resume/stop lifecycle controls.

Returns an RxJS Subscription that can be used to unsubscribe.

import { Subject } from 'rxjs';
import { createPausable } from '@bemedev/rx-pausable';

const source$ = new Subject<number>();
const pausable = createPausable(source$, v => console.log('primary:', v));

const sub = pausable.subscribe(v => console.log('secondary:', v));

pausable.start();
source$.next(1); // primary: 1 / secondary: 1

sub.unsubscribe();
source$.next(2); // primary: 2 (secondary no longer receives)

command(action: 'start' | 'stop' | 'pause' | 'resume'): void

Executes a control action programmatically.
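
When the action name comes from outside the program (a CLI argument, a message payload), it helps to validate it before calling command(). The sketch below is illustrative only: ACTIONS, isAction, dispatch, and the Commandable interface are hypothetical names, and the object returned by createPausable is merely assumed to satisfy Commandable. A recording stand-in is used so the snippet runs without the package.

```typescript
const ACTIONS = ['start', 'stop', 'pause', 'resume'] as const;
type Action = (typeof ACTIONS)[number];

// Minimal structural type for anything exposing command(); the object
// returned by createPausable is assumed (not verified) to satisfy it.
interface Commandable {
  command(action: Action): void;
}

// Type guard: narrows arbitrary input to a valid action name.
function isAction(input: string): input is Action {
  return (ACTIONS as readonly string[]).includes(input);
}

// Hypothetical helper: forward valid input and report whether it was accepted.
function dispatch(target: Commandable, input: string): boolean {
  if (!isAction(input)) return false;
  target.command(input);
  return true;
}

// Stand-in target that records the commands it receives.
const received: Action[] = [];
const fake: Commandable = {
  command: action => {
    received.push(action);
  },
};

console.log(dispatch(fake, 'pause'));   // true
console.log(dispatch(fake, 'restart')); // false
console.log(received);                  // [ 'pause' ]
```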

state: 'stopped' | 'running' | 'paused' (read-only)

Returns the current state of the pausable wrapper.

State Machine

The pausable wrapper implements a simple state machine:

  ┌─────────────────────────────────────────────────┐
  │                    start()                       │
  ▼                                                  │
stopped ──start()──> running ──pause()──> paused     │
            │                                │        │
            │           stop()               │        │
            └──────────────────────────────► │        │
                                             ▼        │
                                           stopped ───┘
                                             ▲
                                             │ stop()
                                           paused ──resume()──> running

States:

  • stopped: Initial state (or after stop()); not forwarding values. Calling start() resets all internal state and begins a fresh subscription.
  • running: Actively forwarding values from source to observer
  • paused: Stream suspended; source values are buffered until resume()

Valid transitions:

| Current state | Action     | Next state |
| ------------- | ---------- | ---------- |
| stopped       | start()    | running    |
| running       | pause()    | paused     |
| running       | stop()     | stopped    |
| paused        | resume()   | running    |
| paused        | stop()     | stopped    |

Invalid transitions are silently ignored:

  • start() when already running or paused
  • pause() when stopped or already paused
  • resume() when stopped or running
  • stop() when already stopped
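
The transition rules above can be modeled as a small lookup table. This is a sketch of the documented behavior only, not the package's internals: transitions, nextState, and run are hypothetical names, and an invalid transition is modeled as returning the current state unchanged.

```typescript
type State = 'stopped' | 'running' | 'paused';
type Action = 'start' | 'stop' | 'pause' | 'resume';

// Transition table mirroring the valid transitions listed above.
const transitions: Record<State, Partial<Record<Action, State>>> = {
  stopped: { start: 'running' },
  running: { pause: 'paused', stop: 'stopped' },
  paused: { resume: 'running', stop: 'stopped' },
};

// Invalid transitions are ignored: the current state is returned unchanged.
function nextState(current: State, action: Action): State {
  return transitions[current][action] ?? current;
}

// Replay a sequence of actions, starting from the initial 'stopped' state.
function run(actions: Action[]): State {
  return actions.reduce<State>(nextState, 'stopped');
}

console.log(run(['start', 'pause', 'resume'])); // running
console.log(run(['pause', 'resume']));          // stopped (both ignored)
console.log(run(['start', 'start', 'stop']));   // stopped
```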

Behavior Notes

Restart Semantics

Calling stop() followed by start() performs a full restart: all internal state (event buffer, timing references, internal subject) is discarded and recreated fresh. The observer originally passed to createPausable is re-subscribed automatically.

import { interval } from 'rxjs';
import { createPausable } from '@bemedev/rx-pausable';

const values: number[] = [];
const pausable = createPausable(interval(1000), v =>
  values.push(v as number),
);

pausable.start();
// ... after some time: values = [0, 1, 2]

pausable.stop();
// values stay as-is; no more emissions

pausable.start(); // fresh restart — buffer and timing reset
// ... after some time: values = [0, 1, 2, 0, 1, 2, ...]
//                                ↑ first run   ↑ second run (restarts from 0)
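
The second run starts again from 0 because interval() is a cold observable: each new subscription gets its own counter. The dependency-free sketch below models that idea with a generator factory instead of RxJS. ColdSource, counter, and takeFromFreshRun are hypothetical names used only for illustration.

```typescript
// A cold source modeled as a factory: every call yields an independent
// sequence, much as each subscription to interval() gets its own counter.
type ColdSource<T> = () => Iterator<T>;

function* counter(): Generator<number> {
  let n = 0;
  while (true) yield n++;
}

// Hypothetical helper: begin a fresh run and collect its first `count` values.
function takeFromFreshRun<T>(source: ColdSource<T>, count: number): T[] {
  const iterator = source(); // fresh "subscription", fresh state
  const out: T[] = [];
  for (let i = 0; i < count; i++) {
    const step = iterator.next();
    if (step.done) break;
    out.push(step.value);
  }
  return out;
}

const collected = [
  ...takeFromFreshRun(counter, 3), // first run
  ...takeFromFreshRun(counter, 3), // second run, after a restart
];

console.log(collected.join(', ')); // 0, 1, 2, 0, 1, 2
```

A hot source such as a Subject has no per-subscription state, so a restart would simply pick up whatever the source emits next.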

Pause/Resume Semantics

When you call pause(), the wrapper buffers all values emitted by the source. When you call resume(), all buffered values are re-emitted to the observer in order, with their original relative timestamps, before resuming normal forwarding.

This means no values are lost during a pause, regardless of whether the source is hot or cold:

import { Subject } from 'rxjs';
import { createPausable } from '@bemedev/rx-pausable';

const subject$ = new Subject<number>();
const pausable = createPausable(subject$, console.log);
pausable.start();
subject$.next(1); // Output: 1
pausable.pause();
subject$.next(2); // Buffered (no output yet)
subject$.next(3); // Buffered (no output yet)
pausable.resume();
// Output: 2, 3 (buffered values re-emitted)
subject$.next(4); // Output: 4
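
One way to picture "relative timing" is to record when each buffered value arrived and replay it with the same offset after resume(). The sketch below assumes offsets are measured from the moment of the pause; the package may measure them differently, and Buffered and replayPlan are hypothetical names, not part of its API.

```typescript
// A buffered value together with the time (in ms) at which the source emitted it.
interface Buffered<T> {
  value: T;
  at: number;
}

// Hypothetical helper: for each buffered value, compute how long after
// resume() it would be re-emitted if its offset from the pause is preserved.
function replayPlan<T>(buffer: Buffered<T>[], pausedAt: number) {
  return buffer.map(({ value, at }) => ({ value, delay: at - pausedAt }));
}

// pause() at t=1000; the source then emits 2 at t=1200 and 3 at t=1700.
const plan = replayPlan(
  [
    { value: 2, at: 1200 },
    { value: 3, at: 1700 },
  ],
  1000,
);

console.log(JSON.stringify(plan));
// [{"value":2,"delay":200},{"value":3,"delay":700}]
```

Under this model the values keep both their order and their spacing (500 ms apart here) when they are replayed.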

Errors emitted during pause are also buffered and propagated after resume():

import { Subject } from 'rxjs';
import { createPausable } from '@bemedev/rx-pausable';

const subject$ = new Subject<number>();
const pausable = createPausable(subject$, {
  next: v => console.log('Next:', v),
  error: e => console.error('Error:', e),
});
pausable.start();
subject$.next(1); // Output: Next: 1
pausable.pause();
subject$.error(new Error('oops')); // Buffered
pausable.resume();
// Output: Error: Error: oops (propagated after resume)

TypeScript

Full TypeScript support with generic types:

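import { Observable, of } from 'rxjs';
import createPausable from '@bemedev/rx-pausable';

interface MyData {
  id: number;
  value: string;
}

// Placeholder source for illustration; substitute your own Observable<MyData>.
const source$: Observable<MyData> = of({ id: 1, value: 'example' });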
const pausable = createPausable(source$, (data: MyData) => {
  console.log(data.id, data.value);
});

Requirements

  • Node.js >= 22 (LTS)
  • RxJS >= 7.0.0 (peer dependency)

Development

# Install dependencies
pnpm install

# Run tests
pnpm test

# Run tests in watch mode
pnpm test:watch

# Build
pnpm build

# Lint
pnpm lint

Contributing

Contributions are welcome! Please follow the test numbering rules defined in .github/rules/test.rules.md when adding tests.

License

MIT

Author

chlbri

Changelog

See CHANGELOG.md for details.