Severalnines Blog
The automation and management blog for open source databases

Real Time Data Streaming with MongoDB Change Streams

Change Streams, introduced in MongoDB 3.6, give applications immediate access to data changes as they happen. Users now expect notifications within moments rather than minutes or hours later, and for some applications it is critical to push real-time notifications to every subscribed user for each update. Change streams make this straightforward. In this article, we will look at MongoDB change streams and their applications, with some examples.

Defining Change Streams

A change stream is a real-time stream of the changes that occur in a collection, a database, or an entire deployment. For example, whenever a write (insert, update, or delete) occurs in a watched collection, MongoDB emits a change event describing what was modified.

A change stream is opened much like an aggregation: under the hood it is a $changeStream aggregation stage, and in practice you call the watch() method on a collection. In the mongo shell this is db.collection.watch(); the drivers expose an equivalent method, for example MongoCollection.watch() in the Java driver.

Example

db.myCollection.watch()

Change Streams Features

  • Filtering Changes

    You can filter the changes to get event notifications for some targeted data only.

    Example:

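    pipeline = [
       {
         $match: { "fullDocument.name": "Bob" }
       } ];
    changeStream = collection.watch(pipeline);

    This code makes sure that you receive events only for documents whose name equals Bob. The filter is applied to the change event rather than to the stored document, so the field is addressed as fullDocument.name. You can build other pipelines in the same way to filter the change stream.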

  • Resuming Change Streams

    This feature helps ensure that no events are lost when a failure occurs. Each event in the stream carries a resume token, which can be used to restart the stream from that specific point. After a transient network failure, the MongoDB driver tries to re-establish the stream automatically using the most recent resume token. After a complete application failure, however, the client must have stored the resume token itself in order to resume the stream. Resuming only works while the corresponding entry is still present in the oplog.

  • Ordered Change Streams

    MongoDB uses a global logical clock to order change stream events across all replicas and shards of a cluster, so the receiver always gets notifications in the same order in which the operations were applied to the database.

  • Events with full documents

    For update events, MongoDB by default returns only the changed fields rather than the whole document. You can configure the change stream to return the full document as well by passing { fullDocument: "updateLookup" } as the options argument of the watch method.
    Example:

    collection = db.collection("myColl")
    changeStream = collection.watch([], { fullDocument: "updateLookup" })

  • Durability

    Change streams only deliver events for data that has been committed to a majority of the replica set members. Because events are generated only from majority-committed data, a change you are notified about will not be rolled back later.

  • Security/Access Control

    Change streams respect MongoDB's access controls. Users can open change streams only on collections for which they have read permission, so access to change streams can be managed through user roles.
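The resume behaviour described in the list above can be sketched in Node.js. This is a minimal sketch, not the driver's built-in retry logic: watchWithResume, loadToken and saveToken are hypothetical names, and the token store is assumed to be whatever durable storage the application already has. It relies only on the driver's collection.watch(pipeline, options) call, the resumeAfter option, and the _id field of each event, which holds the resume token.

```javascript
// Minimal sketch: resume a change stream from a token the application saved.
// watchWithResume, loadToken and saveToken are hypothetical helpers, not driver APIs.
async function watchWithResume(collection, pipeline, loadToken, saveToken, onChange) {
  const options = {};
  const token = await loadToken();
  if (token) {
    // resumeAfter restarts the stream right after the event that carried this token
    options.resumeAfter = token;
  }
  const changeStream = collection.watch(pipeline, options);
  changeStream.on("change", (event) => {
    // event._id is the resume token; persist it only after the event was handled
    Promise.resolve(onChange(event))
      .then(() => saveToken(event._id))
      .catch((err) => console.error("Failed to process change event:", err));
  });
  return changeStream;
}
```

Saving the token only after the handler has finished means that an event may be delivered again after a crash but is not skipped, so the handler should tolerate duplicates.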

Example of Change Streams

In this example, we will create a change stream on the Stocks collection to get notified whenever a stock price goes above a threshold.

  • Setup the cluster

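    To use change streams, we first have to create a replica set. Run the following command to start a single-node replica set, then run rs.initiate() once from the mongo shell to initialize it. The name rs0 matches the replicaSet parameter in the connection string used later.

    mongod --dbpath ./data --replSet rs0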
  • Insert some records in the Stocks collection

    use mydb
    var docs = [
     { ticker: "AAPL", price: 210 },
     { ticker: "AAPL", price: 260 },
     { ticker: "AAPL", price: 245 },
     { ticker: "AAPL", price: 255 },
     { ticker: "AAPL", price: 270 }
    ];
    db.Stocks.insertMany(docs)
  • Setup node environment and install dependencies

    mkdir mongo-proj && cd mongo-proj
    npm init -y
    npm install mongodb --save
  • Subscribe for the changes

    Create an index.js file and put the following code in it.

    const { MongoClient } = require("mongodb");

    MongoClient.connect("mongodb://localhost:27017/?replicaSet=rs0")
      .then(client => {
        console.log("Connected to MongoDB server");
        // Select DB and collection
        const db = client.db("mydb");
        const collection = db.collection("Stocks");
        // Only pass through events whose document has a price of 250 or more
        const pipeline = [
          {
            $match: { "fullDocument.price": { $gte: 250 } }
          }
        ];
        // Define the change stream
        const changeStream = collection.watch(pipeline);
        // Start listening for changes
        changeStream.on("change", event => {
          console.log(JSON.stringify(event));
        });
        // Report stream errors instead of failing silently
        changeStream.on("error", err => console.error(err));
      })
      .catch(err => console.error("Connection failed:", err));

    Now run this file:

    node index.js
  • Insert a new record in db to receive an update

    db.Stocks.insert({ ticker: "AAPL", price: 280 })

    Now check the console where index.js is running; you will see the change event sent by MongoDB.
    Example response:

    {
      "_id": {
        "_data": "825C5D51F70000000129295A1004E83608EE8F1B4FBABDCEE73D5BF31FC946645F696400645C5D51F73ACA83479B48DE6E0004"
      },
      "operationType": "insert",
      "clusterTime": "6655565945622233089",
      "fullDocument": {
        "_id": "5c5d51f73aca83479b48de6e",
        "ticker": "AAPL",
        "price": 280
      },
      "ns": { "db": "mydb", "coll": "Stocks" },
      "documentKey": { "_id": "5c5d51f73aca83479b48de6e" }
    }
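The events printed by the listener can be turned into a notification with a small handler. The following is only a sketch: formatPriceAlert is a hypothetical helper name, and it assumes events shaped like the example response, with ticker and price fields under fullDocument.

```javascript
// Minimal sketch: turn a change event into an alert message.
// formatPriceAlert is a hypothetical helper, not part of the MongoDB driver.
function formatPriceAlert(event, threshold) {
  // Only insert events are handled here; other event types may lack fullDocument
  if (event.operationType !== "insert" || !event.fullDocument) {
    return null;
  }
  const { ticker, price } = event.fullDocument;
  if (typeof price !== "number" || price < threshold) {
    return null;
  }
  return `${ticker} crossed ${threshold}: now ${price}`;
}
```

Inside the change listener, this could be called as formatPriceAlert(event, 250) and the result pushed to subscribers whenever it is not null.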

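The operationType field of each event tells you what kind of change occurred. You can match on it in the pipeline to listen for specific types of changes in a collection:

  • insert
  • replace (the whole document is replaced, but its _id stays the same)
  • update
  • delete
  • invalidate (emitted when the stream can no longer continue, for example after the watched collection is dropped or renamed)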
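Filtering on the operation type can be combined with a filter on the document fields. The sketch below builds such a pipeline; buildChangeFilter is a hypothetical helper name, and the price field is taken from the Stocks example.

```javascript
// Build a pipeline that keeps only the chosen operation types at or above a price.
// buildChangeFilter is a hypothetical helper name, not a driver API.
function buildChangeFilter(operationTypes, minPrice) {
  return [
    {
      $match: {
        operationType: { $in: operationTypes },
        "fullDocument.price": { $gte: minPrice }
      }
    }
  ];
}

const priceFilter = buildChangeFilter(["insert", "update", "replace"], 250);
// Update events only carry fullDocument when the stream is opened with updateLookup:
// collection.watch(priceFilter, { fullDocument: "updateLookup" });
console.log(JSON.stringify(priceFilter));
```

Delete events never carry a fullDocument, so a pipeline that matches on fullDocument fields will not pass them through.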

Other Modes of Change Streams

You can open a change stream against a database or a whole deployment in the same way as against a collection. This is available from MongoDB version 4.0 onwards. Here are the mongo shell commands to open a change stream against a database and a deployment.

Against DB: db.watch()
Against deployment: db.getMongo().watch()
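In the Node.js driver, the same three scopes are reached through watch() on the Collection, Db and MongoClient objects. The following sketch wraps them in one function; openChangeStream and the scope names are hypothetical and only serve the illustration.

```javascript
// Open a change stream at collection, database, or deployment scope.
// openChangeStream and the scope names are hypothetical; watch() itself exists
// on Collection, Db and MongoClient in the Node.js driver.
function openChangeStream(client, scope, dbName, collName, pipeline = []) {
  switch (scope) {
    case "collection":
      return client.db(dbName).collection(collName).watch(pipeline);
    case "database":
      return client.db(dbName).watch(pipeline);
    case "deployment":
      return client.watch(pipeline);
    default:
      throw new Error(`Unknown scope: ${scope}`);
  }
}
```

With a connected client, openChangeStream(client, "database", "mydb") would report changes from every collection in mydb, with the ns field of each event identifying the collection.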

Conclusion

MongoDB change streams simplify real-time integration between the backend and the frontend. They let you use MongoDB for a pub/sub model, so for simpler use cases you may not need to manage separate Kafka or RabbitMQ deployments. If your application requires real-time information, this feature is well worth checking out. I hope this post gets you started with MongoDB change streams.