npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

mongo-realtime

v2.0.1

Published

A Node.js package that combines Socket.IO and MongoDB Change Streams to deliver real-time database updates to your WebSocket clients.

Readme

Mongo Realtime

A Node.js package that combines Socket.IO and MongoDB Change Streams to deliver real-time database updates to your WebSocket clients.

Banner

🚀 Features

  • Real-time updates: Automatically detects changes in MongoDB and broadcasts them via Socket.IO
  • Granular events: Emits specific events by operation type, collection, and document
  • Connection management: Customizable callbacks for socket connections/disconnections
  • TypeScript compatible: JSDoc annotations for better development experience

📦 Installation

npm install mongo-realtime

Setup

Prerequisites

  • MongoDB running as a replica set (required for Change Streams)
  • Node.js HTTP server (See below how to configure an HTTP server with Express)

Example setup

const express = require("express");
const http = require("http");
const MongoRealtime = require("mongo-realtime");

const app = express();
const server = http.createServer(app);

MongoRealtime.init({
  dbUri: "mongodb://localhost:27017/mydb",
  onDbConnect: (conn) => {
    console.log("Connected to db", conn.name);
  },
  onDbError: (err) => {
    console.log(err.message);
  },
  server: server,
  ignore: ["posts"], // ignore 'posts' collection
  onSocket: (socket) => {
    console.log(`Client connected: ${socket.id}`);
    socket.emit("welcome", { message: "Connection successful!" });
  },
  offSocket: (socket, reason) => {
    console.log(`Client disconnected: ${socket.id}, reason: ${reason}`);
  },
});

server.listen(3000, () => {
  console.log("Server started on port 3000");
});

Breaking Changes since v2.x.x

In older versions, streams receive all documents and send them to sockets. This can lead to performance issues with large collections.
From version 2.x.x, streams now require a limit (default 50 documents) from the client side to avoid overloading the server and network.
Receiving streamed docs now works like this:

socket.emit("realtime", {
  limit: 100,
  streamId: "users",
  reverse: true,
  registerId: "random-client-generated-id",
});
/*
 * Request 100 documents from 'users' stream (streamId defined server-side or collection name if autoStream)
 * 'reverse' to get the latest documents first
 * 'registerId' is a unique id generated by the client to identify this request. It will be used to listen to the response event.
 * The response return an object {results: [...], count: number, total:number, remaining:number, coll:string}
 */

socket.on("realtime:users:random-client-generated-id", (data) => {
  console.log("Received streamed documents:", data.results); // 100 max per
});

socket.emit("realtime", {
  limit: 50,
  streamId: "usersWithEmail", // make sure this stream is defined server-side
  registerId: "specific-request-id",
});

socket.on("realtime:usersWithEmail:specific-request-id", (data) => {
  console.log("Received streamed documents with email:", data.results); // 50 max per
});

// Make sure to listen before emitting the request to avoid missing the response
// To stop receiving streamed documents for a specific request just use socket.off

📋 API

MongoRealtime.init(options)

Initializes the socket system and MongoDB Change Streams.

Parameters

* means required

| Parameter | Type | Description | | --------------------- | ----------------- | -------------------------------------------------------------------------------- | | options.dbUri | String* | Database URI | | options.dbOptions | Object | Mongoose connection options | | options.onDbConnect | Function | Callback on successful database connection | | options.onDbError | Function | Callback on database connection error | | options.server | http.Server* | HTTP server to attach Socket.IO | | options.authentify | Function | Function to authenticate socket connections. Should return true if authenticated | | options.middlewares | Array[Function] | Array of Socket.IO middlewares | | options.onSocket | Function | Callback on socket connection | | options.offSocket | Function | Callback on socket disconnection | | options.watch | Array[String] | Collections to only watch. Listen to all when is empty | | options.ignore | Array[String] | Collections to only ignore. Overrides watch array | | options.autoStream | Array[String] | Collections to automatically stream to clients. Default is all | | options.debug | Boolean | Enable debug logs |

Static Properties and Methods

  • MongoRealtime.addStream(streamId, collection, filter): Manually add a list stream for a specific collection and filter
  • MongoRealtime.connection: MongoDB connection
  • MongoRealtime.collections: Array of database collections
  • MongoRealtime.io: Socket.IO server instance
  • MongoRealtime.init(options): Initialize the package with options
  • MongoRealtime.listen(event, callback): Add an event listener for database changes
  • MongoRealtime.notifyListeners(event, data): Manually emit an event to all listeners
  • MongoRealtime.removeStream(streamId): Remove a previously added stream by id
  • MongoRealtime.removeListener(event, callback): Remove a specific event listener or all listeners for an event
  • MongoRealtime.removeAllListeners(): Remove all event listeners
  • MongoRealtime.sockets(): Returns an array of connected sockets

🎯 Emitted Events

The package automatically emits six types of events for each database change:

Event Types

| Event | Description | Example | | ---------------------------------- | ----------------- | ------------------------------------------ | | db:change | All changes | Any collection change | | db:{type} | By operation type | db:insert, db:update, db:delete | | db:change:{collection} | By collection | db:change:users, db:change:posts | | db:{type}:{collection} | Type + collection | db:insert:users, db:update:posts | | db:change:{collection}:{id} | Specific document | db:change:users:507f1f77bcf86cd799439011 | | db:{type}:{collection}:{id} | Type + document | db:insert:users:507f1f77bcf86cd799439011 | | realtime:{streamId}:{registerId} | By stream | realtime:myStreamId:registerId |

Event listeners

You can add serverside listeners to those db events to trigger specific actions on the server:

function sendNotification(change) {
  const userId = change.docId; // or change.documentKey._id
  NotificationService.send(userId, "Welcome to DB");
}

MongoRealtime.listen("db:insert:users", sendNotification);

Adding many callback to one event

MongoRealtime.listen("db:insert:users", anotherAction);
MongoRealtime.listen("db:insert:users", anotherAction2);

Removing event listeners

MongoRealtime.removeListener("db:insert:users", sendNotification); // remove this specific action from this event
MongoRealtime.removeListener("db:insert:users"); // remove all actions from this event
MongoRealtime.removeAllListeners(); // remove all listeners

Event Payload Structure

Each event contains the full MongoDB change object:

{
  "_id": {...},
  "col":"users", // same as ns.coll
  "docId":"...", // same as documentKey._id
  "operationType": "insert|update|delete|replace",
  "documentKey": { "_id": "..." },
  "ns": { "db": "mydb", "coll": "users" },
  "fullDocument": {...},
  "fullDocumentBeforeChange": {...}
}

🔨 Usage Examples

Server-side - Listening to specific events

MongoRealtime.init({
  dbUri: "mongodb://localhost:27017/mydb",
  server: server,
  onSocket: (socket) => {
    socket.on("subscribe:users", () => {
      socket.join("users-room");
    });
  },
});

MongoRealtime.io.to("users-room").emit("custom-event", data);

Client-side - Receiving updates

<!DOCTYPE html>
<html>
  <head>
    <script src="/socket.io/socket.io.js"></script>
  </head>
  <body>
    <script>
      const socket = io();

      socket.on("db:change", (change) => {
        console.log("Detected change:", change);
      });

      socket.on("db:insert:users", (change) => {
        console.log("New user:", change.fullDocument);
      });

      const userId = "507f1f77bcf86cd799439011";
      socket.on(`db:update:users:${userId}`, (change) => {
        console.log("Updated user:", change.fullDocument);
      });

      socket.on("db:delete", (change) => {
        console.log("Deleted document:", change.documentKey);
      });
    </script>
  </body>
</html>

Error Handling

MongoRealtime.init({
  dbUri: "mongodb://localhost:27017/mydb",
  server: server,
  onSocket: (socket) => {
    socket.on("error", (error) => {
      console.error("Socket error:", error);
    });
  },
  offSocket: (socket, reason) => {
    if (reason === "transport error") {
      console.log("Transport error detected");
    }
  },
});

🔒 Security

Socket Authentication

You can provide an authentify function in the init options to authenticate socket connections.
The function receives the token (from socket.handshake.auth.token or socket.handshake.headers.authorization) and the socket object.
When setted, it rejects connections based on this logic:

  • Token not provided -> error NO_TOKEN_PROVIDED
  • Token invalid or returns false -> error UNAUTHORIZED
  • Any other error -> error AUTH_ERROR
  • Return true to accept the connection
function authenticateSocket(token, socket) {
  const verify = AuthService.verifyToken(token);
  if (verify) {
    socket.user = verify.user; // attach user info to socket
    return true; // should return true to accept the connection
  }
  return false;
}

MongoRealtime.init({
  dbUri: "mongodb://localhost:27017/mydb",
  server: server,
  authentify: authenticateSocket,
  middlewares: [
    (socket, next) => {
      console.log(`User is authenticated: ${socket.user.email}`);
      next();
    },
  ],
  offSocket: (socket, reason) => {
    console.log(`Socket ${socket.id} disconnected: ${reason}`);
  },
});

Setup streams

The server will automatically emit a list of filtered documents from the specified collections after each change.
Each list stream requires an unique streamId, the collection name, and an optional filter function that returns a boolean or a promise resolving to a boolean. Clients receive the list on the event db:stream:{streamId}.\

MongoRealtime.init({
  dbUri: "mongodb://localhost:27017/mydb",
  server: server,
  autoStream: ["users"], // automatically stream users collection only
});

MongoRealtime.addStream("users", "users", (doc) => !!doc.email); // will throw an error as streamId 'users' already exists

MongoRealtime.removeStream("users"); // remove the previous stream
MongoRealtime.addStream("users", "users", (doc) => !!doc.email); // client can listen to db:stream:users

MongoRealtime.addStream("usersWithEmail", "users", (doc) => !!doc.email); // client can listen to db:stream:usersWithEmail

⚠️ NOTICE

When autoStream is not set, all collections are automatically streamed and WITHOUT any filter.
That means that if you have a posts collection, all documents from this collection will be sent to the clients on each change.\

MongoRealtime.init({
  dbUri: "mongodb://localhost:27017/mydb",
  server: server,
  autoStream: [], // stream no collection automatically (you can add your own filtered streams later)
});
// or
MongoRealtime.init({
  dbUri: "mongodb://localhost:27017/mydb",
  server: server,
  // Stream all collections automatically but you can override them
}):

MongoRealtime.addStream("postsWithTitle", "posts", (doc) => !!doc.title); // client can listen to db:stream:posts
MongoRealtime.addStream("users", "users", (doc) => !!doc.email); // will not throw an error cause 'users' is already streamed but with no filter

Usecase for id based streams

MongoRealtime.init({
  dbUri: "mongodb://localhost:27017/mydb",
  server: server,
  authentify: (token, socket) => {
    try {
      socket.uid = decodeToken(token).uid; // setup user id from token
      return true;
    } catch (error) {
      return false;
    }
  },
  onSocket: (socket) => {
    // setup a personal stream for each connected user
    MongoRealtime.addStream(
      `userPost:${socket.uid}`,
      "posts",
      (doc) => doc._id == socket.uid
    );
  },
  offSocket: (socket) => {
    // clean up when user disconnects
    MongoRealtime.removeStream(`userPost:${socket.uid}`);
  },
});

// ...
// or activate stream from a controller or middleware
app.get("/my-posts", (req, res) => {
  const { user } = req;
  MongoRealtime.addStream(
    `userPosts:${user._id}`,
    "posts",
    (doc) => doc.authorId === user._id
  );

  res.send("Stream activated");
});

Usecase with async filter

// MongoRealtime.init({...});

MongoRealtime.addStream("authorizedUsers", "users", async (doc) => {
  const isAdmin = await UserService.isAdmin(doc._id);
  return isAdmin && doc.email.endsWith("@mydomain.com");
});

MongoRealtime.addStream(
  "bestPosts",
  "posts",
  async (doc) => doc.likes > (await PostService.getLikesThreshold())
);

📚 Dependencies

  • socket.io: WebSocket management
  • mongoose: MongoDB ODM with Change Streams support

🐛 Troubleshooting

MongoDB must be in Replica Set mode

To use Change Streams, MongoDB must be running as a replica set. For local development, you can initiate a single-node replica set:

mongod --replSet rs0

rs.initiate()

For any other issues, open an issue on the GitHub repository.

📄 License

MIT

🤝 Contributing

Contributions are welcome! Feel free to open an issue or submit a pull request.