Java Driver / JAVA-3467

Connection failure after onNext skips onError and prevents the Flux from completing

    • Type: Bug
    • Resolution: Fixed
    • Priority: Major - P3
    • Fix Version/s: 3.11.2
    • Affects Version/s: None
    • Component/s: Async
    • Labels:
      None
    • Minor Change

      When the reactive client queries more records than the batch size and the connection fails while fetching the next cursor batch, an IllegalStateException ("state should be: open") is thrown. The exception does not propagate to the outer Flux's onError, so the whole Flux hangs and never completes.

      Here is an example stack trace:

       

      2019-09-07 14:02:29,605 ERROR [Thread-10      ] o.m.d.client                   : Callback onResult call produced an error
      java.lang.IllegalStateException: state should be: open
      	at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70)
      	at com.mongodb.internal.connection.DefaultServer.getConnectionAsync(DefaultServer.java:104)
      	at com.mongodb.binding.AsyncClusterBinding$AsyncClusterBindingConnectionSource.getConnection(AsyncClusterBinding.java:139)
      	at com.mongodb.operation.AsyncQueryBatchCursor.killCursorOnClose(AsyncQueryBatchCursor.java:228)
      	at com.mongodb.operation.AsyncQueryBatchCursor.close(AsyncQueryBatchCursor.java:112)
      	at com.mongodb.async.client.MongoIterableSubscription.postTerminate(MongoIterableSubscription.java:69)
      	at com.mongodb.async.client.AbstractSubscription.onError(AbstractSubscription.java:130)
      	at com.mongodb.async.client.AbstractSubscription.tryProcessResultsQueue(AbstractSubscription.java:180)
      	at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:101)
      	at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:85)
      	at com.mongodb.operation.AsyncQueryBatchCursor.next(AsyncQueryBatchCursor.java:169)
      	at com.mongodb.operation.AsyncQueryBatchCursor.next(AsyncQueryBatchCursor.java:118)
      	at com.mongodb.async.client.MongoIterableSubscription.requestMoreData(MongoIterableSubscription.java:85)
      	at com.mongodb.async.client.MongoIterableSubscription$1.onResult(MongoIterableSubscription.java:53)
      	at com.mongodb.async.client.MongoIterableSubscription$1.onResult(MongoIterableSubscription.java:46)
      	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
      	at com.mongodb.async.client.OperationExecutorImpl$1$1$1.onResult(OperationExecutorImpl.java:94)
      	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
      	at com.mongodb.operation.FindOperation$3.onResult(FindOperation.java:827)
      	at com.mongodb.operation.OperationHelper$ReferenceCountedReleasingWrappedCallback.onResult(OperationHelper.java:412)
      	at com.mongodb.operation.CommandOperationHelper$10.onResult(CommandOperationHelper.java:481)
      	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
      	at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:245)
      	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
      	at com.mongodb.internal.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:85)
      	at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:467)
      	at com.mongodb.internal.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:111)
      	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
      	at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:399)
      	at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:376)
      	at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:677)
      	at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:644)
      	at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:514)
      	at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:511)
      	at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220)
      	at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203)
      	at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127)
      	at java.base/sun.nio.ch.Invoker$2.run(Invoker.java:219)
      	at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
      	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      	at java.base/java.lang.Thread.run(Thread.java:834)
      

      Consider these two frames from the stack trace:

      com.mongodb.async.client.AbstractSubscription.onError(AbstractSubscription.java:130)

      com.mongodb.async.client.MongoIterableSubscription.postTerminate(MongoIterableSubscription.java:69)

      `postTerminate` can throw an exception. When it does, `observer.onError(t)` is never called, so the Flux chain cannot complete.

      The root cause appears to be in `AbstractSubscription#onError`, which can cause the exception to be thrown rather than delivered to the observer.
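
      The failure mode described above can be sketched in isolation. This is a minimal illustration, not the driver's code and not the fix that was eventually applied: `SimpleObserver`, `SketchSubscription`, `onErrorUnguarded`, and `onErrorGuarded` are hypothetical names, and `postTerminate` here is only a stand-in that always throws. The guarded variant shows one way to make sure the observer is notified even when cleanup fails.

```java
import java.util.ArrayList;
import java.util.List;

/** Demo entry point; all types in this file are hypothetical stand-ins. */
class GuardedOnErrorDemo {
    public static void main(String[] args) {
        List<Throwable> received = new ArrayList<>();
        SketchSubscription subscription = new SketchSubscription(received::add);

        // Unguarded: the cleanup exception escapes and the observer sees nothing.
        try {
            subscription.onErrorUnguarded(new RuntimeException("connection failed"));
        } catch (IllegalStateException escaped) {
            System.out.println("escaped: " + escaped.getMessage());
        }
        System.out.println("observer notified after unguarded call: " + !received.isEmpty());

        // Guarded: the observer receives the original error.
        subscription.onErrorGuarded(new RuntimeException("connection failed"));
        System.out.println("observer notified after guarded call: " + !received.isEmpty());
    }
}

/** Minimal stand-in for an observer; not the driver's interface. */
interface SimpleObserver {
    void onError(Throwable t);
}

/** Hypothetical subscription showing the failure mode and a guarded variant. */
class SketchSubscription {
    private final SimpleObserver observer;

    SketchSubscription(SimpleObserver observer) {
        this.observer = observer;
    }

    /** Stand-in for cleanup (e.g. closing a cursor) that fails on a closed server. */
    void postTerminate() {
        throw new IllegalStateException("state should be: open");
    }

    /** If postTerminate throws, observer.onError is never reached. */
    void onErrorUnguarded(Throwable t) {
        postTerminate();
        observer.onError(t);
    }

    /** Cleanup failures are attached as suppressed; the observer is always notified. */
    void onErrorGuarded(Throwable t) {
        try {
            postTerminate();
        } catch (RuntimeException cleanupFailure) {
            t.addSuppressed(cleanupFailure);
        }
        observer.onError(t);
    }
}
```

      Attaching the cleanup failure as a suppressed exception is a design choice for this sketch only; logging it would work as well.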

      Similar connection errors are sometimes logged via Reactor hooks instead (i.e. not via ErrorHandlingResultCallback):

      2019-09-06 19:21:07,787 ERROR [Thread-397     ] o.m.d.client                   : Calling onNext threw an exception
      reactor.core.Exceptions$BubblingException: com.mongodb.MongoException: state should be: open
              at reactor.core.Exceptions.bubble(Exceptions.java:154)
              at reactor.core.publisher.Operators.onErrorDropped(Operators.java:520)
              at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:120)
              at com.mongodb.reactivestreams.client.internal.ObservableToPublisher$1$1.request(ObservableToPublisher.java:50)
              at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:155)
              at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:742)
              at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:524)
              at reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:943)
              at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
              at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76)
              at com.mongodb.reactivestreams.client.internal.ObservableToPublisher$1.onNext(ObservableToPublisher.java:66)
              at com.mongodb.async.client.AbstractSubscription.onNext(AbstractSubscription.java:148)
              at com.mongodb.async.client.AbstractSubscription.processResultsQueue(AbstractSubscription.java:223)
              at com.mongodb.async.client.AbstractSubscription.tryProcessResultsQueue(AbstractSubscription.java:178)
              at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:101)
              at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:85)
              at com.mongodb.operation.AsyncQueryBatchCursor.next(AsyncQueryBatchCursor.java:169)
              at com.mongodb.operation.AsyncQueryBatchCursor.next(AsyncQueryBatchCursor.java:118)
              at com.mongodb.async.client.MongoIterableSubscription.requestMoreData(MongoIterableSubscription.java:85)
              at com.mongodb.async.client.MongoIterableSubscription$1.onResult(MongoIterableSubscription.java:53)
              at com.mongodb.async.client.MongoIterableSubscription$1.onResult(MongoIterableSubscription.java:46)
              at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
              at com.mongodb.async.client.OperationExecutorImpl$1$1$1.onResult(OperationExecutorImpl.java:94)
              at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
      
              at com.mongodb.operation.FindOperation$3.onResult(FindOperation.java:827)
              at com.mongodb.operation.OperationHelper$ReferenceCountedReleasingWrappedCallback.onResult(OperationHelper.java:412)
              at com.mongodb.operation.CommandOperationHelper$10.onResult(CommandOperationHelper.java:481)
              at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
              at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:245)
              at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
              at com.mongodb.internal.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:85)
              at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:467)
              at com.mongodb.internal.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:111)
              at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
              at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:399)
              at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:376)
              at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:677)
              at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:644)
              at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:514)
              at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:511)
              at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220)
              at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203)
              at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126)
              at sun.nio.ch.Invoker.invokeDirect(Invoker.java:157)
              at sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:553)
              at sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:276)
              at sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:297)
              at com.mongodb.internal.connection.AsynchronousSocketChannelStream$AsynchronousSocketChannelAdapter.read(AsynchronousSocketChannelStream.java:137)
              at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:105)
              at com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:511)
              at com.mongodb.internal.connection.InternalStreamConnection.access$1000(InternalStreamConnection.java:76)
              at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:634)
              at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:619)
              at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:514)
              at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:511)
              at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220)
              at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203)
              at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126)
              at sun.nio.ch.Invoker$2.run(Invoker.java:218)
              at sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
      Caused by: com.mongodb.MongoException: state should be: open
              at com.mongodb.MongoException.fromThrowableNonNull(MongoException.java:79)
              at com.mongodb.async.client.AbstractSubscription.onError(AbstractSubscription.java:138)
              at com.mongodb.async.client.AbstractSubscription.tryProcessResultsQueue(AbstractSubscription.java:180)
              at com.mongodb.async.client.AbstractSubscription.request(AbstractSubscription.java:90)
              at com.mongodb.reactivestreams.client.internal.ObservableToPublisher$1$1.request(ObservableToPublisher.java:48)
              ... 59 common frames omitted
      Caused by: java.lang.IllegalStateException: state should be: open
              at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70)
              at com.mongodb.operation.AsyncQueryBatchCursor.setBatchSize(AsyncQueryBatchCursor.java:128)
              at com.mongodb.async.client.MongoIterableSubscription.requestMoreData(MongoIterableSubscription.java:84)
              at com.mongodb.async.client.AbstractSubscription.processResultsQueue(AbstractSubscription.java:233)
              at com.mongodb.async.client.AbstractSubscription.tryProcessResultsQueue(AbstractSubscription.java:178)
              ... 61 common frames omitted
      
      

      In the meantime, a quick workaround is to add a timeout to the Flux/Mono chain, but hopefully there is a better way to deal with this problem.
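
      The idea behind the timeout workaround can be shown without the driver or Reactor on the classpath. The sketch below uses a JDK `CompletableFuture` that never completes as a stand-in for the hanging Flux; `hangingQuery` and `withTimeout` are hypothetical helper names. In a Reactor chain the corresponding operator would be `timeout(Duration)` on the Flux or Mono.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/** Demo of bounding a wait so that a hang surfaces as an error (requires Java 9+). */
class TimeoutWorkaroundDemo {

    /** Simulates a result that never arrives, like the hanging Flux. */
    static CompletableFuture<String> hangingQuery() {
        return new CompletableFuture<>(); // intentionally never completed
    }

    /** Completes the future exceptionally with a TimeoutException after the given delay. */
    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> source, long millis) {
        return source.orTimeout(millis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        try {
            withTimeout(hangingQuery(), 200).get();
        } catch (ExecutionException e) {
            // The caller now sees a failure instead of waiting forever.
            System.out.println("failed with: " + e.getCause().getClass().getSimpleName());
        }
    }
}
```

      A timeout only bounds how long the caller waits; it does not surface the underlying connection error.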

            Assignee:
            john.stewart@mongodb.com John Stewart (Inactive)
            Reporter:
            ywtsang@gmail.com Wing Tsang
            Votes:
            0
            Watchers:
            3