@todorivanov/rx-mongo

v1.0.0

Reactive MongoDB library using RxJS

RxMongo - Reactive MongoDB with RxJS

RxMongo is a TypeScript/JavaScript library that wraps MongoDB operations in RxJS observables. Queries, inserts, updates, and aggregations return observables, so you can subscribe to their results and compose them with other RxJS streams instead of working with promises or callbacks.

Features

  • Reactive MongoDB operations: Perform CRUD operations, aggregations, and more using RxJS.
  • Observable-based APIs: Get data streams from MongoDB collections and work with them reactively.
  • Easy to integrate: Supports integration with Node.js, Express.js, and NestJS environments.
  • TypeScript support: Fully typed for a better development experience.

Installation

To install the package, you need to add rxjs and mongodb as dependencies along with @todorivanov/rx-mongo:

npm install @todorivanov/rx-mongo rxjs mongodb

Integration

  1. Node.js Integration

Here's how to integrate RxMongo into a basic Node.js application:

import { RxMongo } from "@todorivanov/rx-mongo";
import { MongoClient, Document } from "mongodb";

// Define an interface that extends Document
interface User extends Document {
  name: string;
  email: string;
  age: number;
}

// Initialize RxMongo and get the collection
async function main() {
  const client = await MongoClient.connect("mongodb://localhost:27017");
  const db = client.db("myDatabase");
  const rxMongo = new RxMongo(db);
  
  // Get collection as Observable
  rxMongo.getCollection<User>("users").subscribe((userCollection) => {

    // Example: Insert a user
    const newUser: User = {
      name: "John Doe",
      email: "john.doe@example.com",
      age: 30,
    };
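    userCollection.insertOne(newUser).subscribe({
      next: (result) => console.log("User inserted:", result),
      error: (err) => console.error("Insert failed:", err),
    });

    // Example: Query users
    userCollection.find({ age: { $gt: 25 } }).subscribe({
      next: (users) => console.log("Users older than 25:", users),
      error: (err) => console.error("Query failed:", err),
    });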
  });
}

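// Log connection failures instead of leaving an unhandled rejection
main().catch((err) => console.error("Startup failed:", err));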
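
The query above passes a literal filter, `{ age: { $gt: 25 } }`. When the bounds come from user input, it can help to build the filter in one place. The following is a minimal sketch, not part of RxMongo: `ageFilter`, `AgeCondition`, and `AgeFilter` are hypothetical names introduced here, and only the standard MongoDB operators `$gt` and `$lte` are assumed.

```typescript
// Comparison operators as they appear in a MongoDB query document.
type AgeCondition = { $gt?: number; $lte?: number };
type AgeFilter = { age?: AgeCondition };

// Hypothetical helper: builds the filter for "older than `olderThan`
// and at most `atMost`"; either bound may be omitted.
function ageFilter(olderThan?: number, atMost?: number): AgeFilter {
  const condition: AgeCondition = {};
  if (olderThan !== undefined) condition.$gt = olderThan;
  if (atMost !== undefined) condition.$lte = atMost;
  // With no bounds, return an empty filter, which matches every document.
  return Object.keys(condition).length > 0 ? { age: condition } : {};
}

console.log(JSON.stringify(ageFilter(25)));     // {"age":{"$gt":25}}
console.log(JSON.stringify(ageFilter(25, 40))); // {"age":{"$gt":25,"$lte":40}}
console.log(JSON.stringify(ageFilter()));       // {}
```

The result would then be passed to `userCollection.find(...)` in place of the literal filter.
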
  2. Express.js Integration

You can also use RxMongo in an Express.js environment. Here's an example of integrating it into an API:

import express from "express";
import { RxMongo, RxMongoCollection } from "@todorivanov/rx-mongo";
import { MongoClient, Document } from "mongodb";
import { firstValueFrom } from "rxjs";

// Define an interface for a Product document
interface Product extends Document {
  name: string;
  price: number;
  stock: number;
}

const app = express();
app.use(express.json());

let productCollection: RxMongoCollection<Product>;

// Initialize MongoDB and set up the collection
async function initMongo() {
  const client = await MongoClient.connect("mongodb://localhost:27017");
  const db = client.db("myDatabase");
  const rxMongo = new RxMongo(db);

  // Wait for the first emitted collection so it is assigned before any request is served
  productCollection = await firstValueFrom(rxMongo.getCollection<Product>("products"));
}

// Endpoint to add a product
app.post("/products", (req, res) => {
  const newProduct: Product = req.body;
  productCollection.insertOne(newProduct).subscribe({
    next: (result) => res.status(201).send(result),
    // Report driver errors instead of leaving the request hanging
    error: (err) => res.status(500).json({ error: String(err) }),
  });
});

// Endpoint to get all products
app.get("/products", (req, res) => {
  productCollection.find({}).subscribe({
    next: (products) => res.json(products),
    error: (err) => res.status(500).json({ error: String(err) }),
  });
});

// Connect to MongoDB first, then start the server, so that no request
// arrives before productCollection is assigned
initMongo()
  .then(() => {
    app.listen(3000, () => {
      console.log("Server and MongoDB initialized on port 3000");
    });
  })
  .catch((err) => console.error("Failed to initialize MongoDB:", err));
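
The POST endpoint above assigns `req.body` to a `Product` without checking it, so malformed input would reach the database. One possible guard is sketched below. It is not part of RxMongo: `ProductInput` and `isProductInput` are hypothetical names, and the rules (non-empty name, non-negative price, non-negative integer stock) are assumptions about what a valid product looks like.

```typescript
// Shape of the data a client is expected to send (the fields of the
// Product interface, without the MongoDB Document index signature).
interface ProductInput {
  name: string;
  price: number;
  stock: number;
}

// Hypothetical helper: narrows an unknown request body to ProductInput.
function isProductInput(body: unknown): body is ProductInput {
  if (typeof body !== "object" || body === null) return false;
  const candidate = body as Record<string, unknown>;
  return (
    typeof candidate.name === "string" &&
    candidate.name.trim().length > 0 &&
    typeof candidate.price === "number" &&
    Number.isFinite(candidate.price) &&
    candidate.price >= 0 &&
    typeof candidate.stock === "number" &&
    Number.isInteger(candidate.stock) &&
    candidate.stock >= 0
  );
}

console.log(isProductInput({ name: "Keyboard", price: 49.9, stock: 12 }));   // true
console.log(isProductInput({ name: "Keyboard", price: "49.9", stock: 12 })); // false
```

In the handler, a failed check could respond with `res.status(400)` before `insertOne` is called.
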
  3. NestJS Integration

You can easily integrate RxMongo into a NestJS project. Below is an example module and service using RxMongo with NestJS.

Create a DatabaseModule:

import { Module } from "@nestjs/common";
import { MongoClient } from "mongodb";
import { RxMongoService } from "./rx-mongo.service";

@Module({
  providers: [
    {
      provide: "DATABASE_CONNECTION",
      useFactory: async () => {
        const client = new MongoClient("mongodb://localhost:27017");
        await client.connect();
        return client.db("mydatabase"); // Use your MongoDB database
      },
    },
    RxMongoService,
  ],
  exports: [RxMongoService],
})
export class DatabaseModule {}

Create RxMongoService:

import { Injectable, Inject } from "@nestjs/common";
import { RxMongo } from "@todorivanov/rx-mongo";
import { Db } from "mongodb";

@Injectable()
export class RxMongoService {
  private rxMongo: RxMongo;

  constructor(@Inject("DATABASE_CONNECTION") db: Db) {
    this.rxMongo = new RxMongo(db); // Initialize RxMongo
  }

  getUsers() {
    return this.rxMongo.getCollection("users"); // Return users collection as Observable
  }
}
  4. API Overview

RxMongo

  • constructor(db: Db): Initialize RxMongo with a MongoDB Db instance.
  • getCollection<T>(collectionName: string, options?: CollectionOptions): Observable<RxMongoCollection<T>>: Returns an observable for accessing a specific collection wrapped with reactive operations.

RxMongoCollection<T>

All MongoDB collection methods are wrapped as RxJS observables:

  • countDocuments(filter: Filter<T>, options?: CountDocumentsOptions): Observable<number>: Count the documents matching the filter.
  • find(filter: Filter<T>, options?: FindOptions): Observable<WithId<T>[]>: Find documents in the collection that match the provided filter.
  • findOne(filter: Filter<T>, options?: FindOptions): Observable<T | null>: Find a single document.
  • insertOne(document: OptionalUnlessRequiredId<T>): Observable<InsertOneResult<T>>: Insert a single document into the collection.
  • insertMany(docs: ReadonlyArray<OptionalUnlessRequiredId<T>>, options?: BulkWriteOptions): Observable<InsertManyResult<T>>: Insert multiple documents.
  • updateOne(filter: Filter<T>, update: UpdateFilter<T>, options?: UpdateOptions): Observable<UpdateResult<T>>: Update a single document using update operators ($set, $inc, etc.).
  • replaceOne(filter: Filter<T>, replacement: WithoutId<T>, options?: ReplaceOptions): Observable<UpdateResult<T>>: Replace an entire document.
  • updateMany(filter: Filter<T>, update: UpdateFilter<T> | T[], options?: UpdateOptions): Observable<UpdateResult<T>>: Update multiple documents.
  • deleteOne(filter?: Filter<T>, options?: DeleteOptions): Observable<DeleteResult>: Delete a single document from the collection.
  • deleteMany(filter?: Filter<T>, options?: DeleteOptions): Observable<DeleteResult>: Delete multiple documents.
  • distinct(key: string, filter: Filter<T>, options?: DistinctOptions): Observable<any[]>: Get distinct values for a field.
  • aggregate(pipeline?: T[], options?: AggregateOptions): Observable<AggregationCursor<T>>: Perform aggregation operations.
  • bulkWrite(operations: ReadonlyArray<AnyBulkWriteOperation<T>>, options?: BulkWriteOptions): Observable<BulkWriteResult>: Perform bulk write operations.
  • Index management methods: createIndex, createIndexes, dropIndex, dropIndexes, indexes, listIndexes, indexExists

For a complete list of available operations, see the MongoDB Node.js driver documentation. This library exposes the corresponding collection methods as RxJS observables.
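
As an illustration of the update operators mentioned for `updateOne`, the sketch below builds a `$set` update document from a partial object. It is not part of RxMongo: `toSetUpdate` and `UserFields` are hypothetical names, and the returned shape is assumed to be compatible with the `UpdateFilter<T>` parameter listed above.

```typescript
// Hypothetical helper: turns a partial patch into a `$set` update document,
// skipping keys whose value is undefined so those fields are left untouched.
function toSetUpdate<T extends object>(patch: Partial<T>): { $set: Partial<T> } {
  const fields: Partial<T> = {};
  for (const key of Object.keys(patch) as Array<keyof T>) {
    if (patch[key] !== undefined) {
      fields[key] = patch[key];
    }
  }
  return { $set: fields };
}

// Example document fields, assumed for illustration only.
interface UserFields {
  name: string;
  email: string;
  age: number;
}

const ageUpdate = toSetUpdate<UserFields>({ age: 31, email: undefined });
console.log(JSON.stringify(ageUpdate)); // {"$set":{"age":31}}
```

The result would be passed as the second argument of `updateOne(filter, update)`. If the patch has no defined fields, `$set` is empty, and the caller may prefer to skip the update altogether.
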

Troubleshooting

TypeScript Errors

Make sure you're using MongoDB driver v6.0.0 or higher, which includes built-in TypeScript types. The @types/mongodb package is deprecated and should not be used.

Peer Dependency Warnings

This library requires rxjs (^7.0.0) and mongodb (^6.0.0) as peer dependencies. Install them if you haven't already:

npm install @todorivanov/rx-mongo rxjs mongodb

Connection Issues

Ensure your MongoDB connection string is correct and the MongoDB server is running. For development, you can use MongoDB locally or a cloud service like MongoDB Atlas.
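
A quick sanity check on the connection string can rule out the most common typos before connecting. The sketch below is an assumption-laden illustration, not part of RxMongo or the MongoDB driver: `checkMongoUri` is a hypothetical helper that only checks the scheme (`mongodb://` or `mongodb+srv://`) and that a host is present. It does not prove that the server is reachable.

```typescript
// Hypothetical helper: returns a human-readable problem, or null if the
// string at least looks like a MongoDB connection string.
function checkMongoUri(uri: string): string | null {
  const trimmed = uri.trim();
  if (trimmed.length === 0) return "connection string is empty";

  const match = /^(mongodb(?:\+srv)?):\/\/(.*)$/.exec(trimmed);
  if (match === null) {
    return 'connection string must start with "mongodb://" or "mongodb+srv://"';
  }

  // Keep only the part before the database path or the query options,
  // then drop optional credentials ("user:password@").
  const hostPart = match[2].split(/[\/?]/)[0];
  const hosts = hostPart.slice(hostPart.lastIndexOf("@") + 1);
  if (hosts.length === 0) return "connection string has no host";

  return null;
}

console.log(checkMongoUri("mongodb://localhost:27017")); // null
console.log(checkMongoUri("localhost:27017"));
```

A non-null result can be logged before calling `MongoClient.connect`, which makes a malformed string easier to tell apart from a server that is not running.
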

Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Author

Todor Ivanov