[JAVA-4432] The Subscriber of a publisher never gets its onError method called after a MongoTimeoutException Created: 04/Jan/22  Updated: 28/Oct/23  Resolved: 28/Feb/22

Status: Closed
Project: Java Driver
Component/s: Change Streams
Affects Version/s: 4.3.0
Fix Version/s: 4.5.1

Type: Bug Priority: Major - P3
Reporter: Filipe Tavares Assignee: Ross Lawley
Resolution: Fixed Votes: 0
Labels: external-user
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Duplicate
is duplicated by JAVA-4546 Reactive change stream stuck after co... Closed

 Description   

Summary

A bug introduced after version 4.2.3 of the MongoDB Java driver causes the `onError` method of a subscriber subscribed to a `ChangeStreamPublisher` never to be called when a `MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches ReadPreferenceServerSelector{readPreference=primary}.` occurs.

The expected behavior is that the `Subscriber` method `onError` is called once the timeout (30 seconds) expires while waiting for the mongo service (a single mongo node running as a replica set) to come back up.

This is extremely critical because we have code that relies on this behavior.

 

How to Reproduce

You can easily reproduce this bug using the code below. Start the main program and then stop the mongo service (running as a replica set). With version 4.2.3 or below, `onError` is called after the default timeout (30 seconds); with later versions it is never called.

 

import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
 
import java.util.Collections;
import java.util.List;
 
public class Main {
 
    public static void main(String[] args) throws InterruptedException {
        MongoClient reactiveMongoClient = MongoClients.create("mongodb://localhost:27017");
 
        Bson projection = Projections.include("operationType", "documentKey", "fullDocument", "updateDescription");
        List<? extends Bson> pipeline = Collections.singletonList(Aggregates.project(projection));
        final ChangeStreamPublisher<Document> publisher = reactiveMongoClient.getDatabase("test")
                .getCollection("collectionTest")
                .watch(pipeline)
                .fullDocument(FullDocument.UPDATE_LOOKUP);
 
        publisher.subscribe(new Subscriber<ChangeStreamDocument<Document>>() {
            @Override
            public void onSubscribe(final Subscription s) {
                s.request(Long.MAX_VALUE);
            }
 
            @Override
            public void onNext(ChangeStreamDocument<Document> documentChangeStreamDocument) {
                onNextEvent(documentChangeStreamDocument);
            }
 
            @Override
            public void onError(final Throwable t) {
                System.out.println("Failed");
                System.out.println(t);
            }
 
            @Override
            public void onComplete() {
                System.out.println("Completed");
            }
        });
 
        Thread.sleep(Integer.MAX_VALUE);
    }
 
    private static void onNextEvent(ChangeStreamDocument<Document> changeStreamDocument) {
        switch (changeStreamDocument.getOperationType()) {
            case INSERT:
            case UPDATE:
                processInsertOrUpdateEvent(changeStreamDocument);
                break;
            case DELETE:
            case REPLACE:
                processDeleteOrReplaceEvent(changeStreamDocument);
                break;
            // For change streams opened up against a collection,
            // a drop event, rename event, or dropDatabase event
            // that affects the watched collection leads to an invalidate event.
            case INVALIDATE:
                break;
            default:
                break;
        }
    }
 
 
    private static void processInsertOrUpdateEvent(ChangeStreamDocument<Document> changeStreamDocument) {
        System.out.println("Insert or update event");
    }
 
    private static void processDeleteOrReplaceEvent(ChangeStreamDocument<Document> changeStreamDocument) {
        System.out.println("Delete or replace event");
    }
}

Additional Background

This bug was detected in version 4.3.3 (the one we are using) and reproduced in the others mentioned.



 Comments   
Comment by Jeffrey Yemin [ 23/Mar/22 ]

FYI, 4.5.1 has been released with a fix for this issue.

Comment by Githook User [ 28/Feb/22 ]

Author:

{'name': 'Ross Lawley', 'email': 'ross.lawley@gmail.com', 'username': 'rozza'}

Message: Ensure callback using cluster description is guarded (#878)

Getting the description from a cluster can throw a MongoTimeoutException
or a MongoInterruptedException, so in that case the exception should be
passed to the callback.

JAVA-4432

Co-authored-by: Valentin Kovalenko <valentin.kovalenko@mongodb.com>
Branch: 4.5.x
https://github.com/mongodb/mongo-java-driver/commit/59b61bc882b60d60247e00e385e41ef5d832e65d

Comment by Githook User [ 28/Feb/22 ]

Author:

{'name': 'Ross Lawley', 'email': 'ross.lawley@gmail.com', 'username': 'rozza'}

Message: Ensure callback using cluster description is guarded (#878)

Getting the description from a cluster can throw a MongoTimeoutException
or a MongoInterruptedException, so in that case the exception should be
passed to the callback.

JAVA-4432

Co-authored-by: Valentin Kovalenko <valentin.kovalenko@mongodb.com>
Branch: master
https://github.com/mongodb/mongo-java-driver/commit/ca95c53633de378e3f7f6e333adda62319e19030
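The commit message describes a general pattern: a call that can throw synchronously is wrapped so that the exception is handed to the callback rather than escaping. The following is only a dependency-free sketch of that pattern, not the code from the commit; `ResultCallback`, `supplyGuarded`, and `demoFailure` are hypothetical names, and an `IllegalStateException` stands in for the driver's `MongoTimeoutException`.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical names throughout; this is an illustration, not the driver's code.
class GuardedCallbackSketch {

    /** Minimal stand-in for an async result callback: either result or error is set. */
    interface ResultCallback<T> {
        void onResult(T result, Throwable error);
    }

    /**
     * Calls a supplier that may throw synchronously (as fetching a cluster
     * description may time out) and routes any exception to the callback
     * instead of letting it escape to the caller.
     */
    static <T> void supplyGuarded(Supplier<T> supplier, ResultCallback<T> callback) {
        T value;
        try {
            value = supplier.get();
        } catch (RuntimeException e) {
            callback.onResult(null, e);   // the failure is delivered, not lost
            return;
        }
        // Invoked outside the try block so a failing callback is not called twice.
        callback.onResult(value, null);
    }

    /** Runs the guarded call with a supplier that always fails; returns what the callback saw. */
    static Throwable demoFailure() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        GuardedCallbackSketch.<String>supplyGuarded(
                () -> { throw new IllegalStateException("timed out waiting for cluster description"); },
                (result, error) -> seen.set(error));
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("callback received: " + demoFailure());
    }
}
```

The success path calls the callback outside the `try` block on purpose: if the callback itself throws, the exception is not caught and fed back into the same callback a second time.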

Comment by Ross Lawley [ 11/Feb/22 ]

Hi filipe-m-tavares@alticelabs.com,

Thanks for the extra information and for persevering with this ticket. I've scheduled it for the next release.

Ross
 

Comment by Filipe Tavares [ 10/Feb/22 ]

Hi ross.lawley,

The problem still happens in the released version 4.4.0. I have tried three different things: 1. killing the Docker container, 2. killing the mongo process inside the Docker container, and 3. running the command db.shutdown() in the mongo CLI. In all three cases `onError` is never called.

This is the stacktrace for version 4.3.4

com.mongodb.MongoNodeIsRecoveringException: Command failed with error 11600 (InterruptedAtShutdown): 'interrupted at shutdown' on server 192.168.1.6:27017. The full response is {"operationTime": {"$timestamp": {"t": 1644488576, "i": 1}}, "ok": 0.0, "errmsg": "interrupted at shutdown", "code": 11600, "codeName": "InterruptedAtShutdown", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1644488576, "i": 1}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}}
	at com.mongodb.internal.connection.ProtocolHelper.createSpecialException(ProtocolHelper.java:263)
	at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:191)
	at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:400)
	at com.mongodb.internal.connection.InternalStreamConnection.receive(InternalStreamConnection.java:345)
	at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.lookupServerDescription(DefaultServerMonitor.java:226)
	at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:151)
	at java.lang.Thread.run(Thread.java:748)
[Thread-3] INFO org.mongodb.driver.cluster - Cluster description not yet available. Waiting for 5000 ms before timing out
[cluster-ClusterId{value='6204e7379ab49a3a92d5a855', description='null'}-192.168.1.6:27017] INFO org.mongodb.driver.cluster - Exception in monitor thread while connecting to server 192.168.1.6:27017
com.mongodb.MongoSocketOpenException: Exception opening socket
	at com.mongodb.internal.connection.AsynchronousSocketChannelStream$OpenCompletionHandler.failed(AsynchronousSocketChannelStream.java:124)
	at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:128)
	at sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishConnect(UnixAsynchronousSocketChannelImpl.java:279)
	at sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:198)
	at sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:213)
	at sun.nio.ch.EPollPort$EventHandlerTask.run(EPollPort.java:293)
	at sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Ligação recusada
	at sun.nio.ch.UnixAsynchronousSocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishConnect(UnixAsynchronousSocketChannelImpl.java:252)
	... 7 more
[Thread-3] ERROR org.mongodb.driver.operation - Callback onResult call produced an error
com.mongodb.MongoTimeoutException: Timed out after 5000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=192.168.1.6:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Ligação recusada}}]
	at com.mongodb.internal.connection.BaseCluster.getDescription(BaseCluster.java:177)
	at com.mongodb.internal.connection.SingleServerCluster.getDescription(SingleServerCluster.java:41)
	at com.mongodb.internal.async.client.ClientSessionBinding.isConnectionSourcePinningRequired(ClientSessionBinding.java:134)
	at com.mongodb.internal.async.client.ClientSessionBinding.getReadConnectionSource(ClientSessionBinding.java:59)
	at com.mongodb.internal.operation.OperationHelper.withAsyncReadConnection(OperationHelper.java:674)
	at com.mongodb.internal.operation.AsyncChangeStreamBatchCursor.retryOperation(AsyncChangeStreamBatchCursor.java:225)
	at com.mongodb.internal.operation.AsyncChangeStreamBatchCursor.access$400(AsyncChangeStreamBatchCursor.java:44)
	at com.mongodb.internal.operation.AsyncChangeStreamBatchCursor$2.onResult(AsyncChangeStreamBatchCursor.java:214)
	at com.mongodb.internal.operation.AsyncChangeStreamBatchCursor$2.onResult(AsyncChangeStreamBatchCursor.java:194)
	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
	at com.mongodb.internal.operation.AsyncQueryBatchCursor$CommandResultSingleResultCallback.onResult(AsyncQueryBatchCursor.java:395)
	at com.mongodb.internal.operation.AsyncQueryBatchCursor$CommandResultSingleResultCallback.onResult(AsyncQueryBatchCursor.java:375)
	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
	at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:282)
	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
	at com.mongodb.internal.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:92)
	at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:644)
	at com.mongodb.internal.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:156)
	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
	at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:510)
	at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:485)
	at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:791)
	at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:758)
	at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:627)
	at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:624)
	at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:250)
	at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:233)
	at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126)
	at sun.nio.ch.Invoker.invokeDirect(Invoker.java:157)
	at sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:555)
	at sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:277)
	at sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:298)
	at com.mongodb.internal.connection.AsynchronousSocketChannelStream$AsynchronousSocketChannelAdapter.read(AsynchronousSocketChannelStream.java:144)
	at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:118)
	at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:107)
	at com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:624)
	at com.mongodb.internal.connection.InternalStreamConnection.access$600(InternalStreamConnection.java:81)
	at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:748)
	at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:733)
	at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:627)
	at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:624)
	at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:250)
	at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:233)
	at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126)
	at sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishRead(UnixAsynchronousSocketChannelImpl.java:432)
	at sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:191)
	at sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:213)
	at sun.nio.ch.EPollPort$EventHandlerTask.run(EPollPort.java:293)
	at sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

And this is the stacktrace for the 4.2.3 version (in which the `onError` is called):

com.mongodb.MongoSocketReadException: Prematurely reached end of stream
	at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:247)
	at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:233)
	at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126)
	at sun.nio.ch.Invoker$2.run(Invoker.java:218)
	at sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[Thread-1] INFO org.mongodb.driver.connection - Closed connection [connectionId{localValue:3, serverValue:8}] to 192.168.1.6:27017 because there was a socket exception raised on another connection from this pool.
[cluster-ClusterId{value='6204e7e6b4b1261b7180aa9e', description='null'}-192.168.1.6:27017] INFO org.mongodb.driver.cluster - Exception in monitor thread while connecting to server 192.168.1.6:27017
com.mongodb.MongoSocketOpenException: Exception opening socket
	at com.mongodb.internal.connection.AsynchronousSocketChannelStream$OpenCompletionHandler.failed(AsynchronousSocketChannelStream.java:124)
	at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:128)
	at sun.nio.ch.Invoker$2.run(Invoker.java:218)
	at sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Ligação recusada
	at sun.nio.ch.UnixAsynchronousSocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishConnect(UnixAsynchronousSocketChannelImpl.java:252)
	at sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:198)
	at sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:213)
	at sun.nio.ch.EPollPort$EventHandlerTask.run(EPollPort.java:293)
	... 1 more
[Thread-1] INFO org.mongodb.driver.cluster - No server chosen by ReadPreferenceServerSelector{readPreference=primary} from cluster description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE, serverDescriptions=[ServerDescription{address=192.168.1.6:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Ligação recusada}}]}. Waiting for 5000 ms before timing out
[Thread-2] INFO org.mongodb.driver.connection - Closed connection [connectionId{localValue:5}] to 192.168.1.6:27017 because there was a socket exception raised on another connection from this pool.
Failed  <------ ON ERROR CALLED

I did a little debugging: in version 4.2.3 the following code inside the `recurseCursor()` method of `BatchCursorFlux` is triggered:

try {
    this.closeCursor();
} finally {
    this.sink.error(e); // <-- this triggers the `onError` of the subscriber
}

In version 4.3 and above this code snippet is never reached. As the stack traces show, the driver takes a completely different path when this error occurs (that is, when mongo goes down).
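One possible reading of the 4.3.4 trace, offered as an assumption rather than a confirmed diagnosis: the timeout is thrown from inside a callback, the wrapper around that callback only logs it ("Callback onResult call produced an error"), and so nothing downstream ever signals the subscriber. The dependency-free sketch below illustrates that failure mode. All names (`RecordingSubscriber`, `loggingWrapper`, `describeCluster`, `run`) are hypothetical and none of this is driver source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical names; illustrates the suspected failure mode only.
class SwallowedErrorSketch {

    /** Records whether the downstream subscriber was ever told about a failure. */
    static final class RecordingSubscriber {
        Throwable error;                       // stays null if onError is never called
        void onError(Throwable t) { error = t; }
    }

    /** Stand-in for a lookup that times out by throwing synchronously. */
    static String describeCluster() {
        throw new IllegalStateException("Timed out while waiting to connect");
    }

    /** Wrapper that logs exceptions thrown by the wrapped callback instead of propagating them. */
    static <T> Consumer<T> loggingWrapper(Consumer<T> delegate, List<String> log) {
        return value -> {
            try {
                delegate.accept(value);
            } catch (RuntimeException e) {
                log.add("Callback onResult call produced an error: " + e.getMessage());
            }
        };
    }

    /** Simulates a retry step that throws before it can notify the subscriber. */
    static RecordingSubscriber run(List<String> log) {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        Consumer<String> retry = previousFailure -> {
            String description = describeCluster();   // throws, so the next line never runs
            subscriber.onError(new IllegalStateException(previousFailure + " on " + description));
        };
        loggingWrapper(retry, log).accept("change stream read failed");
        return subscriber;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        RecordingSubscriber subscriber = run(log);
        System.out.println("log: " + log);
        System.out.println("subscriber error: " + subscriber.error);
    }
}
```

In this sketch the log records the failure but the subscriber's `error` field stays unset, which matches the symptom reported here: the exception appears in the logs while `onError` is never called.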

Thanks,

Filipe Tavares

Comment by Ross Lawley [ 08/Feb/22 ]

Hi filipe-m-tavares@alticelabs.com,

Thanks for the extra information - as the 4.4.x driver has been released there are no plans to backport fixes to the 4.3.x series of the driver. There is nothing immediately obvious in the changes between 4.2.3 and 4.3.0 to provide insight as to why this error started in the 4.3.0 release.

As I've not been able to replicate the issue can I ask for some more information:

Did you kill docker or just the mongod process? Does setting a socket timeout in the driver configuration impact the results? Could you paste in the full logging stacktrace?

Many Thanks,

Ross

Note: As this scenario can't be reproduced on a multi-node replica set, and because a single-node replica set is not a normal deployment strategy, I've reduced the priority to P4.

Comment by Filipe Tavares [ 08/Feb/22 ]

Hi ross.lawley,

 

But why did you not try a released driver version, such as 4.3.4? It is happening in this version, and it would be important to understand why it occurs, not least because it may be necessary to release a corrective version.

 

Just one note: I killed the mongo process (in this case running on a docker container) instead of doing a db.shutdown().

 

Thanks,
Filipe

Comment by Ross Lawley [ 07/Feb/22 ]

Hi filipe-m-tavares@alticelabs.com,

I'm unable to replicate on the master branch: doing a db.shutdown() on a single-node replica set outputs the server monitor errors, but it does trigger the onError callback as expected.

16:34:07.518 [main] INFO  org.mongodb.driver.cluster - Cluster created with settings {hosts=[localhost:27017], mode=MULTIPLE, requiredClusterType=REPLICA_SET, serverSelectionTimeout='5000 ms', requiredReplicaSetName='repl'}
16:34:07.520 [main] INFO  org.mongodb.driver.cluster - Adding discovered server localhost:27017 to client view of cluster
16:34:07.573 [cluster-ClusterId{value='620149ff816e98340e1033df', description='null'}-localhost:27017] INFO  org.mongodb.driver.connection - Opened connection [connectionId{localValue:2, serverValue:13}] to localhost:27017
16:34:07.573 [cluster-rtt-ClusterId{value='620149ff816e98340e1033df', description='null'}-localhost:27017] INFO  org.mongodb.driver.connection - Opened connection [connectionId{localValue:1, serverValue:14}] to localhost:27017
16:34:07.577 [cluster-ClusterId{value='620149ff816e98340e1033df', description='null'}-localhost:27017] INFO  org.mongodb.driver.cluster - Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=REPLICA_SET_PRIMARY, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=13, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=16633486, setName='repl', canonicalAddress=localhost:27017, hosts=[localhost:27017], passives=[], arbiters=[], primary='localhost:27017', tagSet=TagSet{[]}, electionId=7fffffff0000000000000001, setVersion=2, topologyVersion=TopologyVersion{processId=620149fcbcdcdc294558ac08, counter=7}, lastWriteDate=Mon Feb 07 16:34:05 GMT 2022, lastUpdateTimeNanos=24417038188142}
16:34:07.580 [cluster-ClusterId{value='620149ff816e98340e1033df', description='null'}-localhost:27017] INFO  org.mongodb.driver.cluster - Setting max election id to 7fffffff0000000000000001 from replica set primary localhost:27017
16:34:07.580 [cluster-ClusterId{value='620149ff816e98340e1033df', description='null'}-localhost:27017] INFO  org.mongodb.driver.cluster - Setting max set version to 2 from replica set primary localhost:27017
16:34:07.581 [cluster-ClusterId{value='620149ff816e98340e1033df', description='null'}-localhost:27017] INFO  org.mongodb.driver.cluster - Discovered replica set primary localhost:27017
16:34:07.644 [Thread-3] INFO  org.mongodb.driver.connection - Opened connection [connectionId{localValue:3, serverValue:15}] to localhost:27017
OperationType{value='insert'}
16:34:22.569 [cluster-ClusterId{value='620149ff816e98340e1033df', description='null'}-localhost:27017] INFO  org.mongodb.driver.cluster - Exception in monitor thread while connecting to server localhost:27017
com.mongodb.MongoNodeIsRecoveringException: Command failed with error 11600 (InterruptedAtShutdown): 'interrupted at shutdown' on server localhost:27017. The full response is {"ok": 0.0, "errmsg": "interrupted at shutdown", "code": 11600, "codeName": "InterruptedAtShutdown", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1644251657, "i": 1}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "operationTime": {"$timestamp": {"t": 1644251657, "i": 1}}}
	at com.mongodb.internal.connection.ProtocolHelper.createSpecialException(ProtocolHelper.java:266) ~[mongodb-driver-core-4.5.0-SNAPSHOT.jar:na]
	at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:194) ~[mongodb-driver-core-4.5.0-SNAPSHOT.jar:na]
	at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:418) ~[mongodb-driver-core-4.5.0-SNAPSHOT.jar:na]
	at com.mongodb.internal.connection.InternalStreamConnection.receive(InternalStreamConnection.java:374) ~[mongodb-driver-core-4.5.0-SNAPSHOT.jar:na]
	at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.lookupServerDescription(DefaultServerMonitor.java:216) ~[mongodb-driver-core-4.5.0-SNAPSHOT.jar:na]
	at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:152) ~[mongodb-driver-core-4.5.0-SNAPSHOT.jar:na]
	at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
16:34:22.585 [Thread-14] INFO  org.mongodb.driver.cluster - No server chosen by ReadPreferenceServerSelector{readPreference=primary} from cluster description ClusterDescription{type=REPLICA_SET, connectionMode=MULTIPLE, serverDescriptions=[ServerDescription{address=localhost:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoNodeIsRecoveringException: Command failed with error 11600 (InterruptedAtShutdown): 'interrupted at shutdown' on server localhost:27017. The full response is {"ok": 0.0, "errmsg": "interrupted at shutdown", "code": 11600, "codeName": "InterruptedAtShutdown", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1644251657, "i": 1}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "operationTime": {"$timestamp": {"t": 1644251657, "i": 1}}}}}]}. Waiting for 5000 ms before timing out
16:34:23.073 [cluster-ClusterId{value='620149ff816e98340e1033df', description='null'}-localhost:27017] INFO  org.mongodb.driver.cluster - Exception in monitor thread while connecting to server localhost:27017
com.mongodb.MongoSocketOpenException: Exception opening socket
	at com.mongodb.internal.connection.AsynchronousSocketChannelStream$OpenCompletionHandler.failed(AsynchronousSocketChannelStream.java:124) ~[mongodb-driver-core-4.5.0-SNAPSHOT.jar:na]
	at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:129) ~[na:na]
	at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishConnect(UnixAsynchronousSocketChannelImpl.java:285) ~[na:na]
	at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:200) ~[na:na]
	at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:215) ~[na:na]
	at java.base/sun.nio.ch.EPollPort$EventHandlerTask.run(EPollPort.java:306) ~[na:na]
	at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: java.net.ConnectException: Connection refused
	at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.checkConnect(Native Method) ~[na:na]
	at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishConnect(UnixAsynchronousSocketChannelImpl.java:254) ~[na:na]
	... 7 common frames omitted
 onError  <--- on Error triggered.

It may be that the issue has been fixed, or I'm simply unable to replicate it.

Ross

Comment by Filipe Tavares [ 24/Jan/22 ]

Hi ross.lawley,

 

The problem that I reported only occurs when there is a single node in the replica set (your example has three mongo instances); in my case I have only one, as I mentioned before.

 

I also tested your scenario (three nodes in the replica set), and when I stop the primary the `onError` is indeed called. But when I have only one node in the replica set and I shut that node down, `onError` is never called.

Thank you,
Filipe

Comment by Ross Lawley [ 19/Jan/22 ]

Hi filipe-m-tavares@alticelabs.com,

I tested on the latest 4.3 release (4.3.4) and on the latest release (4.4.1) and wasn't able to replicate the issue. I tweaked your test code like so:

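// Additional imports needed beyond the original reproduction:
// com.mongodb.ConnectionString, com.mongodb.MongoClientSettings,
// java.util.concurrent.CountDownLatch, java.util.concurrent.TimeUnit
public static void main(String[] args) throws InterruptedException {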
        MongoClient reactiveMongoClient = MongoClients.create(
                MongoClientSettings.builder().applyConnectionString(new ConnectionString("mongodb://localhost:27017,localhost:27018,localhost:27019"))
                        .applyToClusterSettings(b -> b.serverSelectionTimeout(5, TimeUnit.SECONDS))  // Faster server selection timeouts
                        .build()
                );
 
        Bson projection = Projections.include("operationType", "documentKey", "fullDocument", "updateDescription");
        List<? extends Bson> pipeline = Collections.singletonList(Aggregates.project(projection));
        final ChangeStreamPublisher<Document> publisher = reactiveMongoClient.getDatabase("test")
                .getCollection("collectionTest")
                .watch(pipeline)
                .fullDocument(FullDocument.UPDATE_LOOKUP);
 
        CountDownLatch latch = new CountDownLatch(1);  // Use a latch so the main program completes when the latch is counted down.
 
        publisher.subscribe(new Subscriber<ChangeStreamDocument<Document>>() {
            @Override
            public void onSubscribe(final Subscription s) {
                s.request(Long.MAX_VALUE);
            }
 
            @Override
            public void onNext(ChangeStreamDocument<Document> documentChangeStreamDocument) {
                onNextEvent(documentChangeStreamDocument);
            }
 
            @Override
            public void onError(final Throwable t) {
                latch.countDown();
                System.out.println(" onError");
                System.out.println(t.getMessage());
            }
 
            @Override
            public void onComplete() {
                latch.countDown();
                System.out.println("Completed");
            }
        });
 
        latch.await();
    }

 

The program worked correctly: when I stopped the replica set, onError was called after the 5-second server selection timeout and the latch was counted down.
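For anyone who wants to try the latch approach without a running replica set, here is a minimal sketch under stated assumptions: `java.util.concurrent.Flow` and `SubmissionPublisher` (Java 9+) stand in for the driver's publisher, the failure is simulated with `closeExceptionally`, and `awaitTerminalError` is a hypothetical helper name. It also bounds the wait, which the snippet above does not.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Stand-in for the reproduction: java.util.concurrent.Flow replaces the driver's publisher.
class LatchSubscriberSketch {

    /** Subscribes, fails the publisher, and returns the error seen by onError (null on timeout). */
    static Throwable awaitTerminalError(long timeout, TimeUnit unit) {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Throwable> seen = new AtomicReference<>();

        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { }
            @Override public void onError(Throwable t) { seen.set(t); latch.countDown(); }
            @Override public void onComplete() { latch.countDown(); }
        });

        // Simulate the server going away: the publisher terminates with an error.
        publisher.closeExceptionally(new IllegalStateException("simulated server selection timeout"));

        try {
            // A bounded wait turns "onError never called" into a visible timeout instead of a hang.
            boolean signalled = latch.await(timeout, unit);
            return signalled ? seen.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("terminal signal: " + awaitTerminalError(5, TimeUnit.SECONDS));
    }
}
```

A bounded `await` is a deliberate choice here: with an unbounded `latch.await()`, the behavior reported in this ticket would show up as a program that hangs forever, whereas a timed wait lets a test fail with a clear signal.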

Could you confirm that it works as expected for you?

Ross
 

Comment by Ross Lawley [ 04/Jan/22 ]

Hi filipe-m-tavares@alticelabs.com,

Thanks for the ticket, I'll review and update with any findings.

Ross
