[SERVER-42885] Registering multiple change stream watchers over single db connection Created: 19/Aug/19  Updated: 06/Dec/22

Status: Backlog
Project: Core Server
Component/s: None
Affects Version/s: None
Fix Version/s: None

Type: Improvement Priority: Major - P3
Reporter: Piyush Katariya Assignee: Backlog - Query Execution
Resolution: Unresolved Votes: 3
Labels: change-streams-improvements
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Related
related to SERVER-36835 High mongod CPU usage on multiple cha... Backlog
is related to DOCS-11270 [Server] Large number of change strea... Closed
Assigned Teams:
Query Execution
Participants:

 Description   

Currently, a change stream requires one dedicated connection for each watcher to be notified through the MongoDB driver.

In a moderately complex reactive application or a microservice architecture, having hundreds of watchers is common, but since the MongoDB driver consumes one exclusive DB connection for each registered watcher, this becomes a scaling problem.

A more reasonable design would allow registering/multiplexing multiple watchers over a single DB connection.
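
In the meantime, applications can approximate this multiplexing in client code. As a minimal sketch (assuming PyMongo against a replica set; the database name "app" and collection names "orders" and "users" are hypothetical), a single database-level change stream can fan events out to many in-process subscribers, so only one server-side cursor is held regardless of how many logical watchers exist:

import pymongo

client = pymongo.MongoClient()  # assumes a replica set or sharded cluster
db = client["app"]

# One server-side cursor, many logical "watchers": route each event to the
# callbacks registered for its source collection.
watchers = {}  # collection name -> list of callbacks

def register(coll_name, callback):
    watchers.setdefault(coll_name, []).append(callback)

register("orders", lambda change: print("orders:", change["operationType"]))
register("users", lambda change: print("users:", change["operationType"]))

# db.watch() opens a single change stream over every collection in the
# database (PyMongo 3.7+, MongoDB 4.0+).
with db.watch(pipeline=[]) as stream:
    for change in stream:
        coll = change.get("ns", {}).get("coll")
        for callback in watchers.get(coll, []):
            callback(change)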



 Comments   
Comment by Shane Harvey [ 12/Oct/20 ]

Most drivers have an alternative change stream iteration pattern that avoids the need to monopolize a connection per change stream cursor. For example, in PyMongo the naive iteration pattern looks like this:

import logging

import pymongo.errors

logger = logging.getLogger(__name__)

# coll is assumed to be a pymongo Collection obtained elsewhere.
with coll.watch(pipeline=[]) as stream:
    try:
        for change in stream:
            # Change received
            print(change)
    except pymongo.errors.PyMongoError as e:
        logger.error("Unrecoverable watch error: %s", e)
        raise

The above loop will hog a connection from the driver's connection pool, especially when no changes are available from the server. This essentially limits the number of concurrent change streams to the max number of connections (configured with maxPoolSize). However, we can reduce the contention on connections by using ChangeStream.try_next() with a lower maxAwaitTimeMS and a client side sleep as discussed in MOTOR-625:

import logging
import time

import pymongo.errors

logger = logging.getLogger(__name__)

with coll.watch(pipeline=[],
                max_await_time_ms=1) as stream:
    try:
        while stream.alive:
            change = stream.try_next()
            if change is None:
                # Sleep to reduce contention on connections and avoid flooding
                # the server with getMores when no changes are available.
                time.sleep(1)
                continue
            # Change received
            print(change)
    except pymongo.errors.PyMongoError as e:
        logger.error("Unrecoverable watch error: %s", e)
        raise

With the above approach, many change stream cursors can share a single driver connection.
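
To make the sharing concrete, here is a minimal sketch (assuming PyMongo; the database and collection names are hypothetical) that polls several change stream cursors from one thread. Each try_next() call borrows a pooled connection only for the duration of a single getMore, so the cursors time-share connections instead of pinning one each:

import time

import pymongo

client = pymongo.MongoClient()
db = client["app"]

# Open several short-poll change streams; none of them pins a connection
# between getMore calls.
streams = [
    db[name].watch(pipeline=[], max_await_time_ms=1)
    for name in ("orders", "users", "audit")
]
try:
    while any(s.alive for s in streams):
        saw_change = False
        for stream in streams:
            change = stream.try_next()
            if change is not None:
                saw_change = True
                print(change)
        if not saw_change:
            time.sleep(1)  # back off when every stream is idle
finally:
    for stream in streams:
        stream.close()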

Comment by Mitar [ 18/Sep/19 ]

Related: https://jira.mongodb.org/browse/DOCS-11270

Comment by Piyush Katariya [ 09/Sep/19 ]

MongoDB as a single database has been immensely helpful in meeting many sorts of requirements.

IMHO this feature has great implications for the growth of MongoDB and its spirit of innovation in the NoSQL world.

Oftentimes, while architecting shared-nothing and microservice applications that need to be responsive and interact with each other (pull, push, pub-sub) depending on the user data received, I have to include and maintain message queue and topic tools like Kafka/RabbitMQ/Redis, and then gracefully integrate them to achieve reactive data behaviour, streaming ETL, background jobs, etc. If this feature gets implemented, it will greatly reduce the dependence on messaging middleware.

Not everyone is at Google, Facebook, or Netflix scale; in fact, the majority of software has small to moderately complex data ingestion/growth rates.

Comment by Piyush Katariya [ 09/Sep/19 ]

The MongoDB core team can decide the optimal way of doing this, as they know the internals.

But I have one suggestion which might be helpful.

We could have two dedicated TCP connections from the driver to each node in the replica set to serve any number of watchers: one for receiving all updates, and a second for sending acknowledgements back to the MongoDB server.

To guard against abusive driver clients, the server could have a configuration parameter, something like "maxWatchers", with a default of 100. Depending on the size of the machine (CPU cores + RAM), it could be increased or decreased manually.
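
Purely as an illustration of this proposal ("maxWatchers" is hypothetical and does not exist in the server), such a knob might be tuned the way existing server parameters are, e.g. via the setParameter admin command from PyMongo:

import pymongo

client = pymongo.MongoClient()

# Hypothetical: "maxWatchers" is the parameter proposed in this comment,
# not a real server parameter. setParameter itself is a real admin command.
client.admin.command("setParameter", 1, maxWatchers=200)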

 
