Uploaded image for project: 'Java Driver'
  1. Java Driver
  2. JAVA-4432

The Subscriber of a publisher never gets its onError method called after an MongoTimeoutException

      Summary

      After version 4.2.3 of mongodb java driver a bug was inserted that makes the `onError` method of a subscriber subscribed to a `ChangeStreamPublisher` never being called when a `MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches ReadPreferenceServerSelector{readPreference=primary}.` occurs.

      It is expected that the `Subscriber` method `onError` is called after the exception timeout (30sec) occurs when waiting for the mongo service (one mongo node in replicaset mode) to be up again.

      This is extremaly critical because we have code based on this behavior. 

       

      How to Reproduce

      You can easily reproduce this bug by using the code below. You only need to start the main program and then stop the mongo (in replica) service. For version 4.2.3 or bellow the `onError` will be called after the default timeout (30sec), for versions above this the same method in never called.

       

      import com.mongodb.client.model.Aggregates;
      import com.mongodb.client.model.Projections;
      import com.mongodb.client.model.changestream.ChangeStreamDocument;
      import com.mongodb.client.model.changestream.FullDocument;
      import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
      import com.mongodb.reactivestreams.client.MongoClient;
      import com.mongodb.reactivestreams.client.MongoClients;
      import org.bson.BsonDocument;
      import org.bson.Document;
      import org.bson.conversions.Bson;
      import org.reactivestreams.Subscriber;
      import org.reactivestreams.Subscription;
      
      import java.util.Collections;
      import java.util.List;
      
      public class Main {
      
          public static void main(String[] args) throws InterruptedException {
              MongoClient reactiveMongoClient = MongoClients.create("mongodb://localhost:27017");
      
              Bson projection = Projections.include("operationType", "documentKey", "fullDocument", "updateDescription");
              List<? extends Bson> pipeline = Collections.singletonList(Aggregates.project(projection));
              final ChangeStreamPublisher<Document> publisher = reactiveMongoClient.getDatabase("test")
                      .getCollection("collectionTest")
                      .watch(pipeline)
                      .fullDocument(FullDocument.UPDATE_LOOKUP);
      
              publisher.subscribe(new Subscriber<ChangeStreamDocument<Document>>() {
                  @Override
                  public void onSubscribe(final Subscription s) {
                      s.request(Long.MAX_VALUE);
                  }
      
                  @Override
                  public void onNext(ChangeStreamDocument<Document> documentChangeStreamDocument) {
                      onNextEvent(documentChangeStreamDocument);
                  }
      
                  @Override
                  public void onError(final Throwable t) {
                      System.out.println("Failed");
                      System.out.println(t);
                  }
      
                  @Override
                  public void onComplete() {
                      System.out.println("Completed");
                  }
              });
      
              Thread.sleep(Integer.MAX_VALUE);
          }
      
          private static void onNextEvent(ChangeStreamDocument<Document> changeStreamDocument) {
              switch (changeStreamDocument.getOperationType()) {
                  case INSERT:
                  case UPDATE:
                      processInsertOrUpdateEvent(changeStreamDocument);
                      break;
                  case DELETE:
                  case REPLACE:
                      processDeleteOrReplaceEvent(changeStreamDocument);
                      break;
                  // For change streams opened up against a collection,
                  // a drop event, rename event, or dropDatabase event
                  // that affects the watched collection leads to an invalidate event.
                  case INVALIDATE:
                      break;
                  default:
                      break;
              }
          }
      
      
          private static void processInsertOrUpdateEvent(ChangeStreamDocument<Document> changeStreamDocument) {
              System.out.println("Insert or update event");
          }
      
          private static void processDeleteOrReplaceEvent(ChangeStreamDocument<Document> changeStreamDocument) {
              System.out.println("Delete or replace event");
          }
      }

      Additional Background

      This bug was detected in version 4.3.3 (the one we are using) and reproduced in the others mentioned.

            Assignee:
            ross@mongodb.com Ross Lawley
            Reporter:
            filipe-m-tavares@alticelabs.com Filipe Tavares
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

              Created:
              Updated:
              Resolved: