-
Type: Bug
-
Resolution: Fixed
-
Priority: Major - P3
-
Affects Version/s: 4.3.0
-
Component/s: Change Streams
Summary
After version 4.2.3 of mongodb java driver a bug was inserted that makes the `onError` method of a subscriber subscribed to a `ChangeStreamPublisher` never being called when a `MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches ReadPreferenceServerSelector{readPreference=primary}.` occurs.
It is expected that the `Subscriber` method `onError` is called after the exception timeout (30sec) occurs when waiting for the mongo service (one mongo node in replicaset mode) to be up again.
This is extremaly critical because we have code based on this behavior.
How to Reproduce
You can easily reproduce this bug by using the code below. You only need to start the main program and then stop the mongo (in replica) service. For version 4.2.3 or bellow the `onError` will be called after the default timeout (30sec), for versions above this the same method in never called.
import com.mongodb.client.model.Aggregates; import com.mongodb.client.model.Projections; import com.mongodb.client.model.changestream.ChangeStreamDocument; import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.reactivestreams.client.ChangeStreamPublisher; import com.mongodb.reactivestreams.client.MongoClient; import com.mongodb.reactivestreams.client.MongoClients; import org.bson.BsonDocument; import org.bson.Document; import org.bson.conversions.Bson; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import java.util.Collections; import java.util.List; public class Main { public static void main(String[] args) throws InterruptedException { MongoClient reactiveMongoClient = MongoClients.create("mongodb://localhost:27017"); Bson projection = Projections.include("operationType", "documentKey", "fullDocument", "updateDescription"); List<? extends Bson> pipeline = Collections.singletonList(Aggregates.project(projection)); final ChangeStreamPublisher<Document> publisher = reactiveMongoClient.getDatabase("test") .getCollection("collectionTest") .watch(pipeline) .fullDocument(FullDocument.UPDATE_LOOKUP); publisher.subscribe(new Subscriber<ChangeStreamDocument<Document>>() { @Override public void onSubscribe(final Subscription s) { s.request(Long.MAX_VALUE); } @Override public void onNext(ChangeStreamDocument<Document> documentChangeStreamDocument) { onNextEvent(documentChangeStreamDocument); } @Override public void onError(final Throwable t) { System.out.println("Failed"); System.out.println(t); } @Override public void onComplete() { System.out.println("Completed"); } }); Thread.sleep(Integer.MAX_VALUE); } private static void onNextEvent(ChangeStreamDocument<Document> changeStreamDocument) { switch (changeStreamDocument.getOperationType()) { case INSERT: case UPDATE: processInsertOrUpdateEvent(changeStreamDocument); break; case DELETE: case REPLACE: processDeleteOrReplaceEvent(changeStreamDocument); break; // For change streams opened up against a collection, // a drop event, rename event, or dropDatabase event // that affects the watched collection leads to an invalidate event. case INVALIDATE: break; default: break; } } private static void processInsertOrUpdateEvent(ChangeStreamDocument<Document> changeStreamDocument) { System.out.println("Insert or update event"); } private static void processDeleteOrReplaceEvent(ChangeStreamDocument<Document> changeStreamDocument) { System.out.println("Delete or replace event"); } }
Additional Background
This bug was detected in version 4.3.3 (the one we are using) and reproduced in the others mentioned.
- is duplicated by
-
JAVA-4546 Reactive change stream stuck after connection lost for longer than server selection timeout
- Closed