npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

mongodb-cursor-observable

v0.1.0

Published

Mongodb cursor iterator using observable

Readme

Overview

This library offers 4 different observables to traverse the results of MongoDB cursors.

It offers 4 observable categories, each one with a higher level of functionality than the last:

  • next$
  • iterator$
  • cursor$
  • bufferedCursor$

next$

Overview

The next$ observable simply emit a batch of documents (each document being emitted separately) from the cursor that is passed to it before completing.

It is a warm observable: it will wait until it is subscribed on before emiting, but as it operates directly on the cursor passed to it, separate subscribers will just iterate further on the same cursor rather than each get a fresh one.

Signature

next$(cursor, batchSize)

It will return an observable that emits documents.

  • cursor: The MongoDB cursor that will be iterated on
  • batchSize: The maximum number of documents to return before completing (note that if the cursor has fewer documents left to iterate on, fewer documents will be emitted)

Example

const mongodb = require('mongodb');
const Rx = require('rxjs');
const lib = require('mongodb-cursor-observable');
const next$ = lib.next$;

mongodb.MongoClient.connect('mongodb://database:27017').then((conn) => {
    let collection = conn.db('test').collection('someCollection');

    console.log('We will now display the first 100 results of the cursor');
    Rx.Observable.of(collection.find({}).batchSize(100))
        .mergeMap((cursor) => next$(cursor, 100))
        .subscribe(
            (doc) => {console.log(doc);},
            (err) => {console.log(err);},
            () => {console.log('Iteration complete!');}
        );
})

iterator$

Overview

The iterator$ allows you to iterate over a mongoDB cursor, returing a batch of results and allowing you to trigger the next batch via an eventEmitter.

It will provide a cue when it is at the end of a batch by emitting the value 'more' and it will also emit the value 'end' (and then complete) when the cursor is exhausted.

While it technically relies on the cursor that is passed to it as an argument, it makes a separate copy of that cursor before iterating and thus behave like a cold observable (ie, will only start emitting after subscription and different subscribers will get fresh results and not interfere with one another)

Also note that this observable will not automatically emit the first batch of results after subscription (the first batch has to be triggered with the eventEmitter also). It you want to first batch to auto-emit, you can wrap this observable in another one that emits the first batch after subscription (as shown in the example below).

Signature

iterator$(cursor, batchSize, nextEmitter)

It will return an observable that emits documents.

  • cursor: The MongoDB cursor that will be iterated on (note that a separate copy of that cursor is made so if you set the original cursor to never timeout, you'll have to clean it manually afterwards)
  • batchSize: The maximum number of documents that will be emitted per batch before waiting for nextEmitter to emit next (again, the number of results emitted in the final batch may be smaller than this amount if the cursor has fewer results to return before exhaustion)
  • nextEmitter: The eventEmitter instance that the observable will listen to for 'next' events in order to emit each batch of documents.

Example

const mongodb = require('mongodb');
const Rx = require('rxjs');

const lib = require('mongodb-cursor-observable');
const iterator$ = lib.iterator$;
const CursorEmitter = lib.CursorEmitter;

mongodb.MongoClient.connect('mongodb://database:27017').then((conn) => {
    let collection = conn.db('test').collection('someCollection');

    //We user the eventEmitter derivative provided by the library there,
    //but any derivative of eventEmitter would do
    const nextEmitter = new CursorEmitter();

    //iterator that will emit 'next' right after subscription for the first batch of results
    const autoStartIterator$ = (cursor, batchSize, nextEmitter) => {
        return Rx.Observable.create((subscriber) => {
            iterator$(cursor, batchSize,  nextEmitter)
                .subscribe(
                    subscriber.next.bind(subscriber),
                    subscriber.error.bind(subscriber),
                    subscriber.complete.bind(subscriber)
                );
            nextEmitter.emit('next');
        });
    }

    console.log('We will now display all the results of the cursor');
    Rx.Observable.of(collection.find({}).batchSize(100))
        .mergeMap((cursor) => autoStartIterator$(cursor, 100, nextEmitter))
        .subscribe(
            (doc) => {
                if(doc === 'more') {
                    console.log('100 results traversed already. On to the next 100...');
                    nextEmitter.emit('next');
                } else if(doc === 'end') {
                    console.log('The cursor is now exhausted. The observable will now complete.');
                } else {
                    console.log(doc);
                }
            },
            (err) => {console.log(err);},
            () => {console.log('Iteration complete!');}
        );
})

cursor$

Overview

The cursor$ observable will emit documents from the provided cursor argument until exhaustion.

Like the iterator$ observable, it is cold, but unlike the iterator$ observable, the documents will be emitted automatically until exhaustion without the need to manually trigger batches.

However, documents are still iterated in batches under the hood and this observable optionally allows the caller to introduce a delay (in milliseconds) between batches which will result in a corresponding pause between the emittion of the last document of a batch and the first document of the next batch.

The optional delay might come in handy if, for example, you have a background job that needs to perform more database operations (updates, deletions, etc) based on cursor results and you don't want to risk the database becoming less responsive to other traffic during that processing.

Signature

cursor$(cursor, batchSize, batchInterval)

It will return an observable that emits documents.

  • cursor: The MongoDB cursor that will be iterated on (note that a separate copy of that cursor is made so if you set the original cursor to never timeout, you'll have to clean it manually afterwards)
  • batchSize: The number of documents that will be emitted for each batch. Note that if batchInterval is 0 (ie, no delay between batches), this value will have no visible outward effect.
  • batchInterval: The number of milliseconds to pause between the emittion of the last document in a batch and the first document in the next batch. If the value is 0, then there is no pause.

Example

const mongodb = require('mongodb');
const Rx = require('rxjs');

const lib = require('mongodb-cursor-observable');
const cursor$ = lib.cursor$;

mongodb.MongoClient.connect('mongodb://database:27017').then((conn) => {
    let collection = conn.db('test').collection('someCollection');

    console.log('We will now display all the results of the cursor');
    console.log('There will be a pause of 200ms for every 500th document displayed...')
    Rx.Observable.of(collection.find({}).batchSize(500))
        .mergeMap((cursor) => cursor$(cursor, 500, 200))
        .subscribe(
            (doc) => {console.log(doc);},
            (err) => {console.log(err);},
            () => {console.log('Iteration complete!');}
        );
})

bufferedCursor$

Overview

The bufferedCursor$ observable is almost the same as the cursor$ observable, with one notable difference: instead of emitting individual documents, it emits arrays of documents.

The number of documents in each emitted array is the batch size, except probably for the last emitted array (due to cursor exhaustion when the batch size is not a divisor of the number of documents returned by the cursor).

Signature

bufferedCursor$(cursor, batchSize, batchInterval)

It will return an observable that emits arrays of documents.

  • cursor: The MongoDB cursor that will be iterated on (note that a separate copy of that cursor is made so if you set the original cursor to never timeout, you'll have to clean it manually afterwards)
  • batchSize: The number of documents that will be present in each emitted array (except probably the last emitted array before cursor exhaustion which is likely to contain fewer documents)
  • batchInterval: The number of milliseconds to pause between emittions. If the value is 0, then there is no pause.

Example

const mongodb = require('mongodb');
const Rx = require('rxjs');

const lib = require('mongodb-cursor-observable');
const bufferedCursor$ = lib.bufferedCursor$;

mongodb.MongoClient.connect('mongodb://database:27017').then((conn) => {
    let collection = conn.db('test').collection('someCollection');

    console.log('We will now display all the results of the cursor grouped in arrays of size 20');
    console.log('There will be a pause of 50ms between every emittion...')
    Rx.Observable.of(collection.find({}).batchSize(20))
        .mergeMap((cursor) => bufferedCursor$(cursor, 20, 50))
        .subscribe(
            (docs) => {console.log(docs);},
            (err) => {console.log(err);},
            () => {console.log('Iteration complete!');}
        );
})