Real-Time Data Streaming with MongoDB Change Streams
MongoDB 3.6 introduced a new feature called Change Streams. Change streams give applications real-time access to data changes, so they can react to every insert, update, or delete as it happens. Many users now expect instant notifications rather than updates delivered minutes or hours later, and some applications must push a real-time notification to all subscribed users for every single update. Change streams make this much easier. In this article, we will look at what MongoDB change streams are, what they offer, and how to use them, with some examples.
Defining Change Streams
A change stream is a real-time stream of the changes that occur in a collection, a database, or an entire deployment. For example, whenever a write (an insert, update, replace, or delete) happens in a watched collection, MongoDB emits a change event that describes the modification.
You open a change stream with the watch() method, for example db.collection.watch() in the mongo shell or MongoCollection.watch() in the Java driver. Under the hood, watch() runs an aggregation whose first stage is $changeStream, so you can add further aggregation stages to shape the events.
Example
db.myCollection.watch()
Change Streams Features
- Filtering Changes
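You can pass an aggregation pipeline to watch() so that you only receive events for the data you care about.
Example:
const pipeline = [ { $match: { "fullDocument.name": "Bob" } } ];
const changeStream = collection.watch(pipeline);
This pipeline only passes events whose changed document has name equal to "Bob". The document's fields appear under fullDocument in each change event, and update events only include fullDocument when the stream is opened with { fullDocument: "updateLookup" } (see Events with full documents below). Besides $match, a change stream pipeline can use stages such as $project, $addFields, $replaceRoot, and $redact.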
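Several stages can be combined in one pipeline. The following is a minimal sketch for the Node.js driver, reusing the name field from the example above; the helper name buildNamePipeline is hypothetical. It keeps only insert events for one name and trims each event with $project, while leaving _id alone because it holds the resume token.

```javascript
// Hypothetical helper: builds a change stream pipeline for a single name.
function buildNamePipeline(name) {
  return [
    // Change events expose the changed document under fullDocument
    { $match: { operationType: "insert", "fullDocument.name": name } },
    // Keep only what the consumer needs. An inclusion projection keeps _id
    // (the resume token) by default; removing it would break resuming.
    { $project: { operationType: 1, fullDocument: 1, documentKey: 1 } },
  ];
}

// Usage with a driver Collection:
//   const changeStream = collection.watch(buildNamePipeline("Bob"));
```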
- Resuming Change Streams
Resumability helps you avoid missing events after a failure. The _id field of every change event is a resume token that marks its position in the stream. After a transient error such as a dropped network connection, the driver automatically attempts to reopen the stream from the most recent resume token it has seen. If the whole application fails, however, that in-memory token is lost, so the client must persist the last processed token itself and pass it back through the resumeAfter option of watch() (MongoDB 4.2 also adds startAfter). Resuming only works while the token's position is still present in the oplog.
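Because the driver keeps the latest resume token only in memory, surviving a full application restart means saving tokens yourself. A minimal sketch for the Node.js driver, assuming a hypothetical tokenStore object with async load() and save(token) methods (backed by a file, a small collection, or similar). It pulls events one by one with hasNext()/next() so each token is saved in order, and only after its event has been handled.

```javascript
// Sketch of client-side resume token persistence.
// Assumptions: `collection` is a Node.js driver Collection; `tokenStore` is a
// hypothetical object with async load() and save(token); `handleEvent` is yours.
async function consumeWithResume(collection, tokenStore, handleEvent) {
  const savedToken = await tokenStore.load();
  // Resume right after the last fully handled event, if a previous run saved one
  const options = savedToken ? { resumeAfter: savedToken } : {};
  const changeStream = collection.watch([], options);
  try {
    // Pull events sequentially so tokens are persisted in stream order
    while (await changeStream.hasNext()) {
      const event = await changeStream.next();
      await handleEvent(event);
      // The event's _id is its resume token; save it only after handling
      await tokenStore.save(event._id);
    }
  } finally {
    await changeStream.close();
  }
}
```

If the process dies between handling an event and saving its token, that event is delivered again on restart, so handleEvent should be idempotent.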
- Ordered Change Streams
MongoDB uses a global logical clock to order change events across all members and shards of a cluster, so a receiver always gets notifications in the same order in which the operations were applied to the database.
- Events with full documents
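By default, insert and replace events include the full document, but update events only describe the change (an updateDescription with updatedFields and removedFields). To receive the full document for updates as well, pass { fullDocument: "updateLookup" } as an option to watch(). MongoDB then looks up the current majority-committed version of the document, which may already reflect later changes.
Example:
const collection = db.collection("myColl");
const changeStream = collection.watch([], { fullDocument: "updateLookup" });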
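Because update events without updateLookup only describe the delta, code that reads a field may have to look in two places. A small sketch, assuming the price field from the Stocks example later in this article; extractPrice is a hypothetical name:

```javascript
// Sketch: read `price` from a change event, with or without updateLookup.
function extractPrice(event) {
  // Inserts, replaces, and (with updateLookup) updates carry fullDocument.
  // With updateLookup it can be null if the document was deleted meanwhile.
  if (event.fullDocument && "price" in event.fullDocument) {
    return event.fullDocument.price;
  }
  // Plain update events only list the fields that changed
  const updated = event.updateDescription && event.updateDescription.updatedFields;
  if (updated && "price" in updated) {
    return updated.price;
  }
  return undefined; // e.g. deletes, or updates that did not touch price
}
```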
- Durability
Change streams only report changes that have been committed to a majority of the replica set members. Such writes cannot be rolled back, so an application never receives an event for data that later disappears in a rollback.
- Security/Access Control
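Change streams respect MongoDB's access control. A user can open a change stream only on collections they are allowed to read (they need the find and changeStream privilege actions, which the built-in read role grants), so you can control who receives which events through user roles.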
Example of Change Streams
In this example, we will open a change stream on the Stocks collection to get notified whenever a stock price reaches or exceeds a threshold (250 here).
- Set up the cluster
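Change streams require a replica set (or a sharded cluster), so first start a single-node replica set named rs0, the name used in the connection string later on:
mkdir -p ./data
mongod --dbpath ./data --replSet "rs0"
Then, in a second terminal, open the mongo shell and initiate the replica set:
rs.initiate()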
- Insert some records in the Stocks collection
use mydb
var docs = [
  { ticker: "AAPL", price: 210 },
  { ticker: "AAPL", price: 260 },
  { ticker: "AAPL", price: 245 },
  { ticker: "AAPL", price: 255 },
  { ticker: "AAPL", price: 270 }
];
db.Stocks.insertMany(docs)
- Set up the Node.js environment and install dependencies
mkdir mongo-proj && cd mongo-proj
npm init -y
npm install mongodb --save
- Subscribe to the changes
Create an index.js file with the following code:
const { MongoClient } = require("mongodb");

MongoClient.connect("mongodb://localhost:27017/?replicaSet=rs0")
  .then(client => {
    console.log("Connected to MongoDB server");
    // Select DB and collection
    const db = client.db("mydb");
    const collection = db.collection("Stocks");
    // Only pass events whose document has a price of at least 250
    const pipeline = [ { $match: { "fullDocument.price": { $gte: 250 } } } ];
    // Define the change stream
    const changeStream = collection.watch(pipeline);
    // Start listening for changes
    changeStream.on("change", event => {
      console.log(JSON.stringify(event));
    });
  })
  .catch(err => console.error("Could not start the change stream:", err));
Now run this file:
node index.js
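As written, index.js matches inserts (and replaces) but never plain updates, because update events carry no fullDocument unless the stream uses updateLookup. A minimal sketch of a variant that also alerts on price updates, assuming the same Stocks documents; priceAlertPipeline and watchPriceAlerts are hypothetical names, and onAlert is any callback you supply:

```javascript
// Hypothetical helper: match any event whose (looked-up) document is at or above the threshold
function priceAlertPipeline(threshold) {
  return [
    {
      $match: {
        operationType: { $in: ["insert", "update", "replace"] },
        "fullDocument.price": { $gte: threshold },
      },
    },
  ];
}

// `collection` is a Node.js driver Collection; `onAlert` receives a message string
function watchPriceAlerts(collection, threshold, onAlert) {
  // updateLookup gives update events a fullDocument for $match to test
  const changeStream = collection.watch(priceAlertPipeline(threshold), {
    fullDocument: "updateLookup",
  });
  changeStream.on("change", (event) => {
    const { ticker, price } = event.fullDocument;
    onAlert(`${ticker} reached ${price} (threshold ${threshold})`);
  });
  // Log stream errors instead of crashing on an unhandled "error" event
  changeStream.on("error", (err) => console.error("change stream error:", err));
  return changeStream; // call changeStream.close() to stop listening
}
```

With this variant, an update such as db.Stocks.updateOne({ price: 245 }, { $set: { price: 265 } }) would also produce an alert.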
- Insert a new record to trigger an event
db.Stocks.insertOne({ ticker: "AAPL", price: 280 })
Now check the console running index.js: it prints the change event sent by MongoDB.
Example response:
{
  "_id": { "_data": "825C5D51F70000000129295A1004E83608EE8F1B4FBABDCEE73D5BF31FC946645F696400645C5D51F73ACA83479B48DE6E0004" },
  "operationType": "insert",
  "clusterTime": "6655565945622233089",
  "fullDocument": { "_id": "5c5d51f73aca83479b48de6e", "ticker": "AAPL", "price": 280 },
  "ns": { "db": "mydb", "coll": "Stocks" },
  "documentKey": { "_id": "5c5d51f73aca83479b48de6e" }
}
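Every event has an operationType field that tells you what kind of change produced it. To listen only for certain kinds of changes, filter on this field in the pipeline, for example [ { $match: { operationType: { $in: ["update", "delete"] } } } ]. The main values are:
- insert
- replace (the whole document except its _id is replaced)
- update
- delete
- invalidate (the stream can no longer continue, for example because the watched collection was dropped or renamed; the stream closes after this event)
From MongoDB 4.0.1, streams also report drop, rename, and dropDatabase events.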
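A consumer that handles several kinds of changes can branch on operationType. A minimal sketch; routeChange and the handler names are hypothetical, and the comments summarize what each event type carries:

```javascript
// Sketch: dispatch a change event to a handler based on operationType.
// `handlers` is a hypothetical object supplied by the caller.
function routeChange(event, handlers) {
  switch (event.operationType) {
    case "insert":
    case "replace":
      // Both event types carry the complete new document
      return handlers.onDocument(event.fullDocument);
    case "update":
      // Only the delta, unless the stream uses fullDocument: "updateLookup"
      return handlers.onUpdate(event.documentKey._id, event.updateDescription);
    case "delete":
      // The document is gone; only its _id is reported
      return handlers.onDelete(event.documentKey._id);
    case "invalidate":
      // The server closes the stream after this event
      return handlers.onInvalidate();
    default:
      // e.g. drop, rename, or dropDatabase on MongoDB 4.0.1+
      return handlers.onOther ? handlers.onOther(event) : undefined;
  }
}
```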
Other Modes of Change Streams
Starting with MongoDB 4.0, you can also open a change stream against a whole database or an entire deployment, in the same way as against a collection. In the mongo shell:
Against a database: db.watch()
Against a deployment: db.getMongo().watch() (the Mongo.watch() method)
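In the Node.js driver the same pattern applies: MongoClient, Db, and Collection each have a watch(pipeline, options) method. A sketch of a hypothetical helper that picks the scope, assuming MongoDB 4.0 or later for the database and deployment cases:

```javascript
// Hypothetical helper: open a change stream at deployment, database, or collection scope.
// `client` is a connected Node.js driver MongoClient.
function openChangeStream(client, scope, { dbName, collName, pipeline = [], options = {} } = {}) {
  switch (scope) {
    case "deployment":
      // Every database in the deployment (MongoDB 4.0+)
      return client.watch(pipeline, options);
    case "database":
      // Every collection in one database (MongoDB 4.0+)
      return client.db(dbName).watch(pipeline, options);
    case "collection":
      // A single collection (MongoDB 3.6+)
      return client.db(dbName).collection(collName).watch(pipeline, options);
    default:
      throw new Error(`unknown scope: ${scope}`);
  }
}
```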
Conclusion
MongoDB change streams make it simple to integrate the frontend and backend in real time. For simpler publish/subscribe use cases, they let you use MongoDB itself as the event source, so you may not need to run a separate broker such as Kafka or RabbitMQ. If your application needs real-time information, this feature is well worth a look. I hope this post helps you get started with MongoDB change streams.