When using the reactive client to query records more than batch size, if the connection failed when calling next batch cursor, IllegalStateException ("state should be: open") will be thrown but cannot propagate to the outer Flux onError and so the whole Flux will just hang and cannot complete.
Here is an example stacktrace:
2019-09-07 14:02:29,605 ERROR [Thread-10 ] o.m.d.client : Callback onResult call produced an error java.lang.IllegalStateException: state should be: open at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70) at com.mongodb.internal.connection.DefaultServer.getConnectionAsync(DefaultServer.java:104) at com.mongodb.binding.AsyncClusterBinding$AsyncClusterBindingConnectionSource.getConnection(AsyncClusterBinding.java:139) at com.mongodb.operation.AsyncQueryBatchCursor.killCursorOnClose(AsyncQueryBatchCursor.java:228) at com.mongodb.operation.AsyncQueryBatchCursor.close(AsyncQueryBatchCursor.java:112) at com.mongodb.async.client.MongoIterableSubscription.postTerminate(MongoIterableSubscription.java:69) at com.mongodb.async.client.AbstractSubscription.onError(AbstractSubscription.java:130) at com.mongodb.async.client.AbstractSubscription.tryProcessResultsQueue(AbstractSubscription.java:180) at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:101) at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:85) at com.mongodb.operation.AsyncQueryBatchCursor.next(AsyncQueryBatchCursor.java:169) at com.mongodb.operation.AsyncQueryBatchCursor.next(AsyncQueryBatchCursor.java:118) at com.mongodb.async.client.MongoIterableSubscription.requestMoreData(MongoIterableSubscription.java:85) at com.mongodb.async.client.MongoIterableSubscription$1.onResult(MongoIterableSubscription.java:53) at com.mongodb.async.client.MongoIterableSubscription$1.onResult(MongoIterableSubscription.java:46) at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) at com.mongodb.async.client.OperationExecutorImpl$1$1$1.onResult(OperationExecutorImpl.java:94) at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) at com.mongodb.operation.FindOperation$3.onResult(FindOperation.java:827) at com.mongodb.operation.OperationHelper$ReferenceCountedReleasingWrappedCallback.onResult(OperationHelper.java:412) at com.mongodb.operation.CommandOperationHelper$10.onResult(CommandOperationHelper.java:481) at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:245) at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) at com.mongodb.internal.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:85) at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:467) at com.mongodb.internal.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:111) at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:399) at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:376) at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:677) at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:644) at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:514) at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:511) at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220) at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203) at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127) at java.base/sun.nio.ch.Invoker$2.run(Invoker.java:219) at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834)
When you look at
com.mongodb.async.client.AbstractSubscription.onError(AbstractSubscription.java:130)
com.mongodb.async.client.MongoIterableSubscription.postTerminate(MongoIterableSubscription.java:69)
It is possible that `postTerminate` can throw exception and thus `observer.onError(t)` cannot be called and so the Flux chain cannot complete.
Actually the root cause of the problem is on `AbstractSubscription#onError` , which can make the exception to be thrown out of the observer
Also sometimes similar connection errors are logged via reactor Hooks (i.e. not ErrorHandlingResultCallback):
2019-09-06 19:21:07,787 ERROR [Thread-397 ] o.m.d.client : Calling onNext threw an exception reactor.core.Exceptions$BubblingException: com.mongodb.MongoException: state should be: open at reactor.core.Exceptions.bubble(Exceptions.java:154) at reactor.core.publisher.Operators.onErrorDropped(Operators.java:520) at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:120) at com.mongodb.reactivestreams.client.internal.ObservableToPublisher$1$1.request(ObservableToPublisher.java:50) at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:155) at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:742) at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:524) at reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:943) at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) at com.mongodb.reactivestreams.client.internal.ObservableToPublisher$1.onNext(ObservableToPublisher.java:66) at com.mongodb.async.client.AbstractSubscription.onNext(AbstractSubscription.java:148) at com.mongodb.async.client.AbstractSubscription.processResultsQueue(AbstractSubscription.java:223) at com.mongodb.async.client.AbstractSubscription.tryProcessResultsQueue(AbstractSubscription.java:178) at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:101) at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:85) at com.mongodb.operation.AsyncQueryBatchCursor.next(AsyncQueryBatchCursor.java:169) at com.mongodb.operation.AsyncQueryBatchCursor.next(AsyncQueryBatchCursor.java:118) at com.mongodb.async.client.MongoIterableSubscription.requestMoreData(MongoIterableSubscription.java:85) at com.mongodb.async.client.MongoIterableSubscription$1.onResult(MongoIterableSubscription.java:53) at com.mongodb.async.client.MongoIterableSubscription$1.onResult(MongoIterableSubscription.java:46) at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) at com.mongodb.async.client.OperationExecutorImpl$1$1$1.onResult(OperationExecutorImpl.java:94) at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) at com.mongodb.operation.FindOperation$3.onResult(FindOperation.java:827) at com.mongodb.operation.OperationHelper$ReferenceCountedReleasingWrappedCallback.onResult(OperationHelper.java:412) at com.mongodb.operation.CommandOperationHelper$10.onResult(CommandOperationHelper.java:481) at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:245) at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) at com.mongodb.internal.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:85) at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:467) at com.mongodb.internal.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:111) at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:399) at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:376) at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:677) at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:644) at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:514) at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:511) at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220) at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203) at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126) at sun.nio.ch.Invoker.invokeDirect(Invoker.java:157) at sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:553) at sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:276) at sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:297) at com.mongodb.internal.connection.AsynchronousSocketChannelStream$AsynchronousSocketChannelAdapter.read(AsynchronousSocketChannelStream.java:137) at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:105) at com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:511) at com.mongodb.internal.connection.InternalStreamConnection.access$1000(InternalStreamConnection.java:76) at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:634) at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:619) at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:514) at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:511) at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220) at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203) at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126) at sun.nio.ch.Invoker$2.run(Invoker.java:218) at sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: com.mongodb.MongoException: state should be: open at com.mongodb.MongoException.fromThrowableNonNull(MongoException.java:79) at com.mongodb.async.client.AbstractSubscription.onError(AbstractSubscription.java:138) at com.mongodb.async.client.AbstractSubscription.tryProcessResultsQueue(AbstractSubscription.java:180) at com.mongodb.async.client.AbstractSubscription.request(AbstractSubscription.java:90) at com.mongodb.reactivestreams.client.internal.ObservableToPublisher$1$1.request(ObservableToPublisher.java:48) ... 59 common frames omitted Caused by: java.lang.IllegalStateException: state should be: open at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70) at com.mongodb.operation.AsyncQueryBatchCursor.setBatchSize(AsyncQueryBatchCursor.java:128) at com.mongodb.async.client.MongoIterableSubscription.requestMoreData(MongoIterableSubscription.java:84) at com.mongodb.async.client.AbstractSubscription.processResultsQueue(AbstractSubscription.java:233) at com.mongodb.async.client.AbstractSubscription.tryProcessResultsQueue(AbstractSubscription.java:178) ... 61 common frames omitted
At the meantime, a quick workarond is to add a timeout on the flux/mono chain, but hope there is a better way to deal with this problem