[JAVA-2858] ChangeStream cursor raises a MongoSocketReadException when there is no event since last replica set election
Created: 10/May/18  Updated: 28/Oct/23  Resolved: 14/May/18

Status: Closed
Project: Java Driver
Component/s: Query Operations
Affects Version/s: 3.6.0
Fix Version/s: 3.6.4, 3.7.1

Type: Bug
Priority: Major - P3
Reporter: Wan Bachtiar
Assignee: Jeffrey Yemin
Resolution: Fixed
Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified
Environment:

openjdk version "1.8.0_151"
ubuntu v16.04


Issue Links:
Related
is related to CSHARP-2267 ChangeStream cursor raises resumable ... Closed

 Description   

While testing the fix in JAVA-2821 for v3.7.0 (and v3.8.0-beta2), I noticed that the change stream cursor resumes when the primary member fails, but only if there has been an event since the last election.

For example:
Consider a replica set with three members (PSS) on ports 27017, 27018, and 27019.
The primary member is currently on `27017`.

  1. Insert on the primary 27017 -> change stream event sent
  2. Stop the primary 27017 -> the cursor now works in v3.7.0+ and resumes correctly
  3. Wait for a new primary to be elected.
  4. Insert on the new primary 27018 -> change stream event sent
  5. Stop the current primary 27018 -> cursor resumes correctly.
  6. Wait for a new primary to be elected.
  7. This time, do not trigger any events on the watched collection (operations on another collection are fine). Stop the new primary 27019.
  8. The cursor throws com.mongodb.MongoSocketReadException.

Example code:

        MongoClient mongoClient;
        String mongoURI = "mongodb://localhost:27017,localhost:27018,localhost:27019/test?replicaSet=changestream";
        mongoClient = new MongoClient(new MongoClientURI(mongoURI));
 
        MongoDatabase database = mongoClient.getDatabase("dbname");
        MongoCollection<Document> collection = database.getCollection("collname");
        Block<ChangeStreamDocument<Document>> printBlock = new Block<ChangeStreamDocument<Document>>() {
            @Override
            public void apply(final ChangeStreamDocument<Document> changeStreamDocument) {
                System.out.println(changeStreamDocument);
            }
        };
        MongoCursor<ChangeStreamDocument<Document>> cursor = collection.watch().iterator();
        ChangeStreamDocument<Document> next = cursor.next();
        System.out.println(next);
        while(cursor.hasNext()){
            next = cursor.next();
            System.out.println(next);
        }

The stack trace:

INFO: Closed connection [connectionId{localValue:35, serverValue:4}] to ironhide.local:27019 because there was a socket exception raised by this connection.
Exception in thread "main" com.mongodb.MongoSocketReadException: Prematurely reached end of stream
	at com.mongodb.connection.SocketStream.read(SocketStream.java:87)
	at com.mongodb.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:538)
	at com.mongodb.connection.InternalStreamConnection.receiveMessage(InternalStreamConnection.java:409)
	at com.mongodb.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:281)
	at com.mongodb.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:247)
	at com.mongodb.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:98)
	at com.mongodb.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:441)
	at com.mongodb.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:70)
	at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:192)
	at com.mongodb.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:264)
	at com.mongodb.connection.DefaultServerConnection.command(DefaultServerConnection.java:126)
	at com.mongodb.connection.DefaultServerConnection.command(DefaultServerConnection.java:118)
	at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:222)
	at com.mongodb.operation.QueryBatchCursor.hasNext(QueryBatchCursor.java:115)
	at com.mongodb.operation.ChangeStreamBatchCursor$1.apply(ChangeStreamBatchCursor.java:58)
	at com.mongodb.operation.ChangeStreamBatchCursor$1.apply(ChangeStreamBatchCursor.java:55)
	at com.mongodb.operation.ChangeStreamBatchCursor.resumeableOperation(ChangeStreamBatchCursor.java:142)
	at com.mongodb.operation.ChangeStreamBatchCursor.hasNext(ChangeStreamBatchCursor.java:55)
	at com.mongodb.client.internal.MongoBatchCursorAdapter.hasNext(MongoBatchCursorAdapter.java:54)
	at com.tour.ChangeStream.main(ChangeStream.java:53)



 Comments   
Comment by Wan Bachtiar [ 12/Jun/18 ]

jeff.yemin,
I missed the ping notification for some reason, apologies.

FWIW, I just tested with v3.7.1 and can confirm that it's working as expected now.
Thanks.

Comment by Githook User [ 18/May/18 ]

Author:

{'username': 'jyemin', 'name': 'Jeff Yemin', 'email': 'jeff.yemin@10gen.com'}

Message: JAVA-2858: Ensure all change stream getMore commands are retried

This commit ensures that change streams are resumable even when watching
a collection that has no activity between failovers.
Branch: 3.6.x
https://github.com/mongodb/mongo-java-driver/commit/83ecdc23dc689aa59abd13f3fc7b8e6b10306a17

Comment by Jeffrey Yemin [ 14/May/18 ]

wan.bachtiar, thanks again for the report. If you're willing to check the fix against your reproducer, that would be much appreciated.

Comment by Githook User [ 14/May/18 ]

Author:

{'name': 'Jeff Yemin', 'email': 'jeff.yemin@10gen.com', 'username': 'jyemin'}

Message: JAVA-2858: Ensure all change stream getMore commands are retried

This commit ensures that change streams are resumable even when watching
a collection that has no activity between failovers.
Branch: 3.7.x
https://github.com/mongodb/mongo-java-driver/commit/84b5a3476fbb4666e7e5412c0a49251193a33670

Comment by Githook User [ 14/May/18 ]

Author:

{'name': 'Jeff Yemin', 'email': 'jeff.yemin@10gen.com', 'username': 'jyemin'}

Message: JAVA-2858: Ensure all change stream getMore commands are retried

This commit ensures that change streams are resumable even when watching
a collection that has no activity between failovers.
Branch: master
https://github.com/mongodb/mongo-java-driver/commit/0da4dc67f614ca1fb9da5923effe798bcb545a42

Comment by Jeffrey Yemin [ 10/May/18 ]

What's happening is probably this:

  1. App calls ChangeStreamBatchCursor#hasNext
  2. ChangeStreamBatchCursor#resumeableOperation issues a getMore command via QueryBatchCursor, which fails because the server has been killed
  3. ChangeStreamBatchCursor#resumeableOperation catches that exception and successfully re-executes the aggregate command on the new primary, which returns a cursor with an empty list of documents (since there were no changes in the collection this time)
  4. ChangeStreamBatchCursor#resumeableOperation issues another getMore command via QueryBatchCursor, which fails because the server has been killed. This time the call is outside the try-catch, so the exception escapes and the change stream is not resumed again.
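
The four steps above can be modeled without a running replica set. The following is only a sketch of the suspected control flow, not the driver's code: `SingleResumeDemo`, `SocketFailure`, and the scripted getMore outcomes are hypothetical stand-ins, and the real aggregate and resume-token handling are reduced to a counter.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of a "resume once" cursor; not the real ChangeStreamBatchCursor.
class SingleResumeDemo {
    // Stand-in for a socket-level error such as MongoSocketReadException.
    static class SocketFailure extends RuntimeException {
        SocketFailure(String message) {
            super(message);
        }
    }

    // Scripted results of successive getMore calls: true = succeeds, false = server was killed.
    private final Deque<Boolean> getMoreOutcomes = new ArrayDeque<>();
    int resumeCount = 0;

    SingleResumeDemo(boolean... outcomes) {
        for (boolean outcome : outcomes) {
            getMoreOutcomes.add(outcome);
        }
    }

    private boolean getMore() {
        if (!getMoreOutcomes.remove()) {
            throw new SocketFailure("Prematurely reached end of stream");
        }
        return true;
    }

    boolean hasNextResumeOnce() {
        try {
            return getMore();   // step 2: guarded attempt
        } catch (SocketFailure e) {
            // fall through to the single resume below
        }
        resumeCount++;          // step 3: stand-in for re-running the aggregate (empty first batch)
        return getMore();       // step 4: unguarded, so a second failure escapes to the caller
    }
}
```

With outcomes `(false, true)` the call recovers after one resume. With `(false, false)`, which corresponds to a second failover with no events in between, `SocketFailure` propagates to the caller.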

One way to fix this is to loop in resumeableOperation:

    <R> R resumeableOperation(final Function<BatchCursor<RawBsonDocument>, R> function) {
        while (true) {
            try {
                return function.apply(wrapped);
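            } catch (MongoNotPrimaryException e) {
                // Resumable: fall through and re-establish the change stream below
            } catch (MongoCursorNotFoundException e) {
                // Resumable: fall through and re-establish the change stream below
            } catch (MongoSocketException e) {
                // Resumable: fall through and re-establish the change stream below
            }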
            wrapped.close();
            wrapped = ((ChangeStreamBatchCursor<T>) changeStreamOperation.resumeAfter(resumeToken).execute(binding)).getWrapped();
            binding.release(); // release the new change stream batch cursor's reference to the binding
        }
    }
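
To see the effect of the loop in isolation, here is a self-contained sketch that swaps the driver types for hypothetical fakes. `ResumeLoopDemo`, `FakeCursor`, `SocketFailure`, and `openCursor` are illustrative names only; `openCursor` stands in for re-running the aggregate with the resume token, and the binding handling is left out.

```java
import java.util.function.Function;

// Hypothetical model of the looping resume; not the driver's implementation.
class ResumeLoopDemo {
    // Stand-in for a resumable error such as MongoSocketException.
    static class SocketFailure extends RuntimeException {
        SocketFailure(String message) {
            super(message);
        }
    }

    // Fake cursor that fails on every call if its server has been killed.
    static class FakeCursor {
        private final boolean serverKilled;
        boolean closed = false;

        FakeCursor(boolean serverKilled) {
            this.serverKilled = serverKilled;
        }

        boolean hasNext() {
            if (serverKilled) {
                throw new SocketFailure("Prematurely reached end of stream");
            }
            return true;
        }

        void close() {
            closed = true;
        }
    }

    private int failoversRemaining;
    private FakeCursor wrapped;
    int resumeCount = 0;

    ResumeLoopDemo(int failovers) {
        this.failoversRemaining = failovers;
        this.wrapped = openCursor();
    }

    // Stand-in for re-executing the aggregate: the new cursor lands on a
    // server that is about to be killed until the scripted failovers run out.
    private FakeCursor openCursor() {
        boolean killed = failoversRemaining > 0;
        if (killed) {
            failoversRemaining--;
        }
        return new FakeCursor(killed);
    }

    <R> R resumeableOperation(Function<FakeCursor, R> function) {
        while (true) {
            try {
                return function.apply(wrapped);
            } catch (SocketFailure e) {
                // Resumable: fall through and open a new cursor
            }
            wrapped.close();
            wrapped = openCursor();
            resumeCount++;
        }
    }
}
```

Calling `new ResumeLoopDemo(3).resumeableOperation(ResumeLoopDemo.FakeCursor::hasNext)` survives three consecutive failures, because every attempt runs inside the try block. The loop only swallows the listed exception types, so any other exception thrown while re-opening the cursor still propagates and ends the loop.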

The async version needs the same fix, though the code will look very different.
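
For the async case, a loop cannot wrap a callback, so one possible shape is to retry from inside the callback. This is a rough sketch under stated assumptions, not the driver's async code: the `Callback` interface, `AsyncResumeDemo`, and the scripted outcomes are all hypothetical, and the fake `getMoreAsync` completes synchronously to keep the example self-contained.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of an async resume; not the driver's implementation.
class AsyncResumeDemo {
    // Assumed (result, error) callback shape, defined here for illustration.
    interface Callback<T> {
        void onResult(T result, Throwable error);
    }

    // Stand-in for a resumable error such as MongoSocketException.
    static class SocketFailure extends RuntimeException {
        SocketFailure(String message) {
            super(message);
        }
    }

    // Scripted results of successive getMore calls: true = succeeds, false = server was killed.
    private final Deque<Boolean> getMoreOutcomes = new ArrayDeque<>();
    int resumeCount = 0;

    AsyncResumeDemo(boolean... outcomes) {
        for (boolean outcome : outcomes) {
            getMoreOutcomes.add(outcome);
        }
    }

    // Fake async getMore: reports failure through the callback instead of throwing.
    private void getMoreAsync(Callback<Boolean> callback) {
        if (getMoreOutcomes.remove()) {
            callback.onResult(true, null);
        } else {
            callback.onResult(null, new SocketFailure("Prematurely reached end of stream"));
        }
    }

    // Every attempt, including the ones after a resume, goes through the same error check.
    void hasNextAsync(Callback<Boolean> callback) {
        getMoreAsync((result, error) -> {
            if (error instanceof SocketFailure) {
                resumeCount++;            // stand-in for re-running the aggregate
                hasNextAsync(callback);   // retry instead of surfacing the error
            } else {
                callback.onResult(result, error);
            }
        });
    }
}
```

Because the fake completes synchronously, each retry here adds a stack frame; with a real asynchronous transport the retry would be scheduled from the I/O callback instead.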

Generated at Thu Feb 08 08:58:14 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.