[JAVA-4283] Calling watch() for changestream will send getMore command forever
Created: 30/Aug/21  Updated: 27/Oct/23  Resolved: 03/Sep/21

Status: Closed
Project: Java Driver
Component/s: Change Streams
Affects Version/s: None
Fix Version/s: None

Type: Task
Priority: Unknown
Reporter: James Chen
Assignee: Valentin Kavalenka
Resolution: Works as Designed
Votes: 0
Labels: external-user
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Documentation Changes: Not Needed

 Description   

Env: mongo-java-reactivestreams v4.3.1

Steps to reproduce:

Call

com.mongodb.reactivestreams.client.internal.MongoCollectionImpl#watch(java.lang.Class<TResult>)

to open a change stream on a collection.

 

Logs:

 

2021-08-30 03:40:24.773 DEBUG S famuhnax [ ] Thread-18 o.m.d.p.command : Sending command '{"getMore": 4558344685700738737, "collection": "leader", "batchSize": 2147483647, "$db": "turms-config-dev", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1630294824, "i": 1}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "vmhYdKf2Q3ah5F7PmGpIQA==", "subType": "04"}}}}' with request id 274 to database turms-config-dev on connection [connectionId{localValue:32, serverValue:168}] to server localhost:29017
2021-08-30 03:40:24.773 DEBUG S famuhnax [ ] Thread-15 o.m.d.p.command : Execution of command with request id 274 completed successfully in 7.26 ms on connection [connectionId{localValue:32, serverValue:168}] to server localhost:29017
2021-08-30 03:40:27.151 DEBUG S famuhnax [ ] tter-2-thread-1 o.m.d.p.command : Sending command '{"getMore": 4558344685700738737, "collection": "leader", "batchSize": 2147483647, "$db": "turms-config-dev", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1630294825, "i": 1}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "vmhYdKf2Q3ah5F7PmGpIQA==", "subType": "04"}}}}' with request id 276 to database turms-config-dev on connection [connectionId{localValue:32, serverValue:168}] to server localhost:29017
2021-08-30 03:40:27.152 DEBUG S famuhnax [ ] Thread-7 o.m.d.p.command : Execution of command with request id 276 completed successfully in 10.02 ms on connection [connectionId{localValue:32, serverValue:168}] to server localhost:29017
2021-08-30 03:40:28.026 DEBUG S famuhnax [ ] tter-2-thread-1 o.m.d.p.command : Sending command '{"getMore": 4558344685700738737, "collection": "leader", "batchSize": 2147483647, "$db": "turms-config-dev", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1630294825, "i": 1}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "vmhYdKf2Q3ah5F7PmGpIQA==", "subType": "04"}}}}' with request id 282 to database turms-config-dev on connection [connectionId{localValue:32, serverValue:168}] to server localhost:29017

As the logs show, the driver sends a getMore command with the same cursor ID every time ("getMore": 4558344685700738737), so it never stops.

I suspect the bug is caused by

com.mongodb.internal.operation.AsyncQueryBatchCursor.CommandResultSingleResultCallback#onResult

which parses the cursor ID from the response and reuses that same cursor ID to request the next batch, looping forever.



 Comments   
Comment by Valentin Kavalenka [ 03/Sep/21 ]

Hi eurekajameschen@gmail.com,

What you described is the intended behavior of a ChangeStreamPublisher object you get from com.mongodb.reactivestreams.client.MongoCollection.watch(Class): it watches for changes in the collection and publishes corresponding change events. The way ChangeStreamPublisher, or ChangeStreamIterable in synchronous API, does this is by issuing getMore commands, usually until the corresponding org.reactivestreams.Subscription / MongoChangeStreamCursor is cancelled / closed.

Each getMore command is blocked by the server until either its maxTimeMS expires, or there are new change events to return. You may change this duration via ChangeStreamPublisher.maxAwaitTime(long, TimeUnit) / ChangeStreamIterable.maxAwaitTime(long, TimeUnit), as per the description of maxAwaitTimeMS in https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#driver-api.
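
The behavior described above can be pictured with a small toy model. This is only a sketch that uses the Java standard library, not the driver's actual implementation: FakeServer, watch and ChangeStreamLoopSketch are hypothetical names, the cursor ID is simply the one from the logs in this ticket, and the wait passed to getMore stands in for what ChangeStreamPublisher.maxAwaitTime(long, TimeUnit) would configure in the real driver. It shows the same cursor ID being reused on every getMore, each call blocking until an event arrives or the wait expires, and the loop ending only when the consumer stops.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy model of a change stream cursor; NOT the MongoDB driver's implementation. */
public class ChangeStreamLoopSketch {

    /** Hypothetical stand-in for the server side of an awaitable cursor. */
    static final class FakeServer {
        // Cursor ID copied from the logs in this ticket; it stays the same for the cursor's lifetime.
        final long cursorId = 4558344685700738737L;
        private final BlockingQueue<String> events = new LinkedBlockingQueue<>();
        int getMoreCount = 0;

        void publish(String event) {
            events.add(event);
        }

        /** Blocks until an event is available or maxAwaitMs elapses; the batch may be empty. */
        List<String> getMore(long id, long maxAwaitMs) throws InterruptedException {
            if (id != cursorId) {
                throw new IllegalArgumentException("unknown cursor " + id);
            }
            getMoreCount++;
            List<String> batch = new ArrayList<>();
            String first = events.poll(maxAwaitMs, TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                events.drainTo(batch); // return everything that is already queued
            }
            return batch;
        }
    }

    /** Client loop: repeats getMore with the same cursor ID until at least `wanted` events are seen. */
    static List<String> watch(FakeServer server, long maxAwaitMs, int wanted) throws InterruptedException {
        List<String> seen = new ArrayList<>();
        while (seen.size() < wanted) { // leaving the loop models cancelling / closing the stream
            seen.addAll(server.getMore(server.cursorId, maxAwaitMs));
        }
        return seen;
    }

    public static void main(String[] args) throws Exception {
        FakeServer server = new FakeServer();
        Thread writer = new Thread(() -> {
            try {
                Thread.sleep(120);
                server.publish("insert #1");
                Thread.sleep(120);
                server.publish("insert #2");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        writer.start();
        List<String> seen = watch(server, 50, 2);
        writer.join();
        // Several getMore calls come back empty before the events arrive.
        System.out.println(seen + " after " + server.getMoreCount + " getMore calls");
    }
}
```

In this model a shorter wait produces more getMore round trips that return empty batches, while a longer wait produces fewer; in both cases the repeated getMore with an unchanged cursor ID is the normal polling pattern rather than a runaway loop.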

Comment by James Chen [ 30/Aug/21 ]

The version of MongoDB is: v4.4.8

Generated at Thu Feb 08 09:01:41 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.