Description
When using the reactive client to query records more than batch size, if the connection failed when calling next batch cursor, IllegalStateException ("state should be: open") will be thrown but cannot propagate to the outer Flux onError and so the whole Flux will just hang and cannot complete.
Here is an example stacktrace:
Â
2019-09-07 14:02:29,605 ERROR [Thread-10 ] o.m.d.client : Callback onResult call produced an error |
java.lang.IllegalStateException: state should be: open
|
at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70) |
at com.mongodb.internal.connection.DefaultServer.getConnectionAsync(DefaultServer.java:104) |
at com.mongodb.binding.AsyncClusterBinding$AsyncClusterBindingConnectionSource.getConnection(AsyncClusterBinding.java:139) |
at com.mongodb.operation.AsyncQueryBatchCursor.killCursorOnClose(AsyncQueryBatchCursor.java:228) |
at com.mongodb.operation.AsyncQueryBatchCursor.close(AsyncQueryBatchCursor.java:112) |
at com.mongodb.async.client.MongoIterableSubscription.postTerminate(MongoIterableSubscription.java:69) |
at com.mongodb.async.client.AbstractSubscription.onError(AbstractSubscription.java:130) |
at com.mongodb.async.client.AbstractSubscription.tryProcessResultsQueue(AbstractSubscription.java:180) |
at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:101) |
at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:85) |
at com.mongodb.operation.AsyncQueryBatchCursor.next(AsyncQueryBatchCursor.java:169) |
at com.mongodb.operation.AsyncQueryBatchCursor.next(AsyncQueryBatchCursor.java:118) |
at com.mongodb.async.client.MongoIterableSubscription.requestMoreData(MongoIterableSubscription.java:85) |
at com.mongodb.async.client.MongoIterableSubscription$1.onResult(MongoIterableSubscription.java:53) |
at com.mongodb.async.client.MongoIterableSubscription$1.onResult(MongoIterableSubscription.java:46) |
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) |
at com.mongodb.async.client.OperationExecutorImpl$1$1$1.onResult(OperationExecutorImpl.java:94) |
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) |
at com.mongodb.operation.FindOperation$3.onResult(FindOperation.java:827) |
at com.mongodb.operation.OperationHelper$ReferenceCountedReleasingWrappedCallback.onResult(OperationHelper.java:412) |
at com.mongodb.operation.CommandOperationHelper$10.onResult(CommandOperationHelper.java:481) |
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) |
at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:245) |
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) |
at com.mongodb.internal.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:85) |
at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:467) |
at com.mongodb.internal.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:111) |
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) |
at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:399) |
at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:376) |
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:677) |
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:644) |
at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:514) |
at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:511) |
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220) |
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203) |
at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127) |
at java.base/sun.nio.ch.Invoker$2.run(Invoker.java:219) |
at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112) |
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) |
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) |
at java.base/java.lang.Thread.run(Thread.java:834) |
When you look at
com.mongodb.async.client.AbstractSubscription.onError(AbstractSubscription.java:130)
com.mongodb.async.client.MongoIterableSubscription.postTerminate(MongoIterableSubscription.java:69)
It is possible that `postTerminate` can throw exception and thus `observer.onError(t)` cannot be called and so the Flux chain cannot complete.
Actually the root cause of the problem is on `AbstractSubscription#onError` , which can make the exception to be thrown out of the observer
Also sometimes similar connection errors are logged via reactor Hooks (i.e. not ErrorHandlingResultCallback):
2019-09-06 19:21:07,787 ERROR [Thread-397 ] o.m.d.client : Calling onNext threw an exception |
reactor.core.Exceptions$BubblingException: com.mongodb.MongoException: state should be: open
|
at reactor.core.Exceptions.bubble(Exceptions.java:154) |
at reactor.core.publisher.Operators.onErrorDropped(Operators.java:520) |
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:120) |
at com.mongodb.reactivestreams.client.internal.ObservableToPublisher$1$1.request(ObservableToPublisher.java:50) |
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:155) |
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:742) |
at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:524) |
at reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:943) |
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) |
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) |
at com.mongodb.reactivestreams.client.internal.ObservableToPublisher$1.onNext(ObservableToPublisher.java:66) |
at com.mongodb.async.client.AbstractSubscription.onNext(AbstractSubscription.java:148) |
at com.mongodb.async.client.AbstractSubscription.processResultsQueue(AbstractSubscription.java:223) |
at com.mongodb.async.client.AbstractSubscription.tryProcessResultsQueue(AbstractSubscription.java:178) |
at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:101) |
at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:85) |
at com.mongodb.operation.AsyncQueryBatchCursor.next(AsyncQueryBatchCursor.java:169) |
at com.mongodb.operation.AsyncQueryBatchCursor.next(AsyncQueryBatchCursor.java:118) |
at com.mongodb.async.client.MongoIterableSubscription.requestMoreData(MongoIterableSubscription.java:85) |
at com.mongodb.async.client.MongoIterableSubscription$1.onResult(MongoIterableSubscription.java:53) |
at com.mongodb.async.client.MongoIterableSubscription$1.onResult(MongoIterableSubscription.java:46) |
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) |
at com.mongodb.async.client.OperationExecutorImpl$1$1$1.onResult(OperationExecutorImpl.java:94) |
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) |
|
|
at com.mongodb.operation.FindOperation$3.onResult(FindOperation.java:827) |
at com.mongodb.operation.OperationHelper$ReferenceCountedReleasingWrappedCallback.onResult(OperationHelper.java:412) |
at com.mongodb.operation.CommandOperationHelper$10.onResult(CommandOperationHelper.java:481) |
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) |
at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:245) |
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) |
at com.mongodb.internal.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:85) |
at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:467) |
at com.mongodb.internal.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:111) |
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) |
at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:399) |
at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:376) |
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:677) |
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:644) |
at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:514) |
at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:511) |
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220) |
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203) |
at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126) |
at sun.nio.ch.Invoker.invokeDirect(Invoker.java:157) |
at sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:553) |
at sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:276) |
at sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:297) |
at com.mongodb.internal.connection.AsynchronousSocketChannelStream$AsynchronousSocketChannelAdapter.read(AsynchronousSocketChannelStream.java:137) |
at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:105) |
at com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:511) |
at com.mongodb.internal.connection.InternalStreamConnection.access$1000(InternalStreamConnection.java:76) |
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:634) |
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:619) |
at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:514) |
at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:511) |
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220) |
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203) |
at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126) |
at sun.nio.ch.Invoker$2.run(Invoker.java:218) |
at sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112) |
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) |
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) |
at java.lang.Thread.run(Thread.java:748) |
Caused by: com.mongodb.MongoException: state should be: open
|
at com.mongodb.MongoException.fromThrowableNonNull(MongoException.java:79) |
at com.mongodb.async.client.AbstractSubscription.onError(AbstractSubscription.java:138) |
at com.mongodb.async.client.AbstractSubscription.tryProcessResultsQueue(AbstractSubscription.java:180) |
at com.mongodb.async.client.AbstractSubscription.request(AbstractSubscription.java:90) |
at com.mongodb.reactivestreams.client.internal.ObservableToPublisher$1$1.request(ObservableToPublisher.java:48) |
... 59 common frames omitted |
Caused by: java.lang.IllegalStateException: state should be: open
|
at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70) |
at com.mongodb.operation.AsyncQueryBatchCursor.setBatchSize(AsyncQueryBatchCursor.java:128) |
at com.mongodb.async.client.MongoIterableSubscription.requestMoreData(MongoIterableSubscription.java:84) |
at com.mongodb.async.client.AbstractSubscription.processResultsQueue(AbstractSubscription.java:233) |
at com.mongodb.async.client.AbstractSubscription.tryProcessResultsQueue(AbstractSubscription.java:178) |
... 61 common frames omitted |
|
At the meantime, a quick workarond is to add a timeout on the flux/mono chain, but hope there is a better way to deal with this problem