[JAVA-3467] Connection failure after onNext will skip onError and makes the Flux cannot complete Created: 07/Sep/19  Updated: 28/Oct/23  Resolved: 18/Oct/19

Status: Closed
Project: Java Driver
Component/s: Async
Affects Version/s: None
Fix Version/s: 3.11.2

Type: Bug Priority: Major - P3
Reporter: Wing Tsang Assignee: John Stewart (Inactive)
Resolution: Fixed Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Case:
Backwards Compatibility: Minor Change

 Description   

When using the reactive client to query records more than batch size, if the connection failed when calling next batch cursor, IllegalStateException ("state should be: open") will be thrown but cannot propagate to the outer Flux onError and so the whole Flux will just hang and cannot complete.

Here is an example stacktrace:

 

2019-09-07 14:02:29,605 ERROR [Thread-10      ] o.m.d.client                   : Callback onResult call produced an error
java.lang.IllegalStateException: state should be: open
	at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70)
	at com.mongodb.internal.connection.DefaultServer.getConnectionAsync(DefaultServer.java:104)
	at com.mongodb.binding.AsyncClusterBinding$AsyncClusterBindingConnectionSource.getConnection(AsyncClusterBinding.java:139)
	at com.mongodb.operation.AsyncQueryBatchCursor.killCursorOnClose(AsyncQueryBatchCursor.java:228)
	at com.mongodb.operation.AsyncQueryBatchCursor.close(AsyncQueryBatchCursor.java:112)
	at com.mongodb.async.client.MongoIterableSubscription.postTerminate(MongoIterableSubscription.java:69)
	at com.mongodb.async.client.AbstractSubscription.onError(AbstractSubscription.java:130)
	at com.mongodb.async.client.AbstractSubscription.tryProcessResultsQueue(AbstractSubscription.java:180)
	at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:101)
	at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:85)
	at com.mongodb.operation.AsyncQueryBatchCursor.next(AsyncQueryBatchCursor.java:169)
	at com.mongodb.operation.AsyncQueryBatchCursor.next(AsyncQueryBatchCursor.java:118)
	at com.mongodb.async.client.MongoIterableSubscription.requestMoreData(MongoIterableSubscription.java:85)
	at com.mongodb.async.client.MongoIterableSubscription$1.onResult(MongoIterableSubscription.java:53)
	at com.mongodb.async.client.MongoIterableSubscription$1.onResult(MongoIterableSubscription.java:46)
	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
	at com.mongodb.async.client.OperationExecutorImpl$1$1$1.onResult(OperationExecutorImpl.java:94)
	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
	at com.mongodb.operation.FindOperation$3.onResult(FindOperation.java:827)
	at com.mongodb.operation.OperationHelper$ReferenceCountedReleasingWrappedCallback.onResult(OperationHelper.java:412)
	at com.mongodb.operation.CommandOperationHelper$10.onResult(CommandOperationHelper.java:481)
	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
	at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:245)
	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
	at com.mongodb.internal.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:85)
	at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:467)
	at com.mongodb.internal.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:111)
	at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
	at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:399)
	at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:376)
	at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:677)
	at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:644)
	at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:514)
	at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:511)
	at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220)
	at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203)
	at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127)
	at java.base/sun.nio.ch.Invoker$2.run(Invoker.java:219)
	at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

When you look at

com.mongodb.async.client.AbstractSubscription.onError(AbstractSubscription.java:130)

com.mongodb.async.client.MongoIterableSubscription.postTerminate(MongoIterableSubscription.java:69)

It is possible that `postTerminate` can throw exception and thus `observer.onError(t)` cannot be called and so the Flux chain cannot complete.

Actually the root cause of the problem is on `AbstractSubscription#onError` , which can make the exception to be thrown out of the observer

Also sometimes similar connection errors are logged via reactor Hooks (i.e. not ErrorHandlingResultCallback):

2019-09-06 19:21:07,787 ERROR [Thread-397     ] o.m.d.client                   : Calling onNext threw an exception
reactor.core.Exceptions$BubblingException: com.mongodb.MongoException: state should be: open
        at reactor.core.Exceptions.bubble(Exceptions.java:154)
        at reactor.core.publisher.Operators.onErrorDropped(Operators.java:520)
        at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:120)
        at com.mongodb.reactivestreams.client.internal.ObservableToPublisher$1$1.request(ObservableToPublisher.java:50)
        at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:155)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:742)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:524)
        at reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:943)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
        at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76)
        at com.mongodb.reactivestreams.client.internal.ObservableToPublisher$1.onNext(ObservableToPublisher.java:66)
        at com.mongodb.async.client.AbstractSubscription.onNext(AbstractSubscription.java:148)
        at com.mongodb.async.client.AbstractSubscription.processResultsQueue(AbstractSubscription.java:223)
        at com.mongodb.async.client.AbstractSubscription.tryProcessResultsQueue(AbstractSubscription.java:178)
        at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:101)
        at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:85)
        at com.mongodb.operation.AsyncQueryBatchCursor.next(AsyncQueryBatchCursor.java:169)
        at com.mongodb.operation.AsyncQueryBatchCursor.next(AsyncQueryBatchCursor.java:118)
        at com.mongodb.async.client.MongoIterableSubscription.requestMoreData(MongoIterableSubscription.java:85)
        at com.mongodb.async.client.MongoIterableSubscription$1.onResult(MongoIterableSubscription.java:53)
        at com.mongodb.async.client.MongoIterableSubscription$1.onResult(MongoIterableSubscription.java:46)
        at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
        at com.mongodb.async.client.OperationExecutorImpl$1$1$1.onResult(OperationExecutorImpl.java:94)
        at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
 
        at com.mongodb.operation.FindOperation$3.onResult(FindOperation.java:827)
        at com.mongodb.operation.OperationHelper$ReferenceCountedReleasingWrappedCallback.onResult(OperationHelper.java:412)
        at com.mongodb.operation.CommandOperationHelper$10.onResult(CommandOperationHelper.java:481)
        at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
        at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:245)
        at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
        at com.mongodb.internal.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:85)
        at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:467)
        at com.mongodb.internal.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:111)
        at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
        at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:399)
        at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:376)
        at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:677)
        at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:644)
        at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:514)
        at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:511)
        at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220)
        at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203)
        at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126)
        at sun.nio.ch.Invoker.invokeDirect(Invoker.java:157)
        at sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:553)
        at sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:276)
        at sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:297)
        at com.mongodb.internal.connection.AsynchronousSocketChannelStream$AsynchronousSocketChannelAdapter.read(AsynchronousSocketChannelStream.java:137)
        at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:105)
        at com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:511)
        at com.mongodb.internal.connection.InternalStreamConnection.access$1000(InternalStreamConnection.java:76)
        at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:634)
        at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:619)
        at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:514)
        at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:511)
        at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220)
        at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203)
        at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126)
        at sun.nio.ch.Invoker$2.run(Invoker.java:218)
        at sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.mongodb.MongoException: state should be: open
        at com.mongodb.MongoException.fromThrowableNonNull(MongoException.java:79)
        at com.mongodb.async.client.AbstractSubscription.onError(AbstractSubscription.java:138)
        at com.mongodb.async.client.AbstractSubscription.tryProcessResultsQueue(AbstractSubscription.java:180)
        at com.mongodb.async.client.AbstractSubscription.request(AbstractSubscription.java:90)
        at com.mongodb.reactivestreams.client.internal.ObservableToPublisher$1$1.request(ObservableToPublisher.java:48)
        ... 59 common frames omitted
Caused by: java.lang.IllegalStateException: state should be: open
        at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70)
        at com.mongodb.operation.AsyncQueryBatchCursor.setBatchSize(AsyncQueryBatchCursor.java:128)
        at com.mongodb.async.client.MongoIterableSubscription.requestMoreData(MongoIterableSubscription.java:84)
        at com.mongodb.async.client.AbstractSubscription.processResultsQueue(AbstractSubscription.java:233)
        at com.mongodb.async.client.AbstractSubscription.tryProcessResultsQueue(AbstractSubscription.java:178)
        ... 61 common frames omitted

At the meantime, a quick workarond is to add a timeout on the flux/mono chain, but hope there is a better way to deal with this problem



 Comments   
Comment by Githook User [ 04/Nov/19 ]

Author:

{'name': 'John Stewart', 'username': 'jstewart-mongo', 'email': 'john.stewart@mongodb.com'}

Message: AbstractSubscription#onError must call the onError method of the observer

JAVARS-216
Branch: 3.11.x
https://github.com/mongodb/mongo-java-driver/commit/6e0658503e946644261dc8dbc1a8aae38bdb0441

Comment by Githook User [ 18/Oct/19 ]

Author:

{'name': 'John Stewart', 'username': 'jstewart-mongo', 'email': 'john.stewart@mongodb.com'}

Message: AbstractSubscription#onError must call the onError method of the observer

JAVARS-216
Branch: master
https://github.com/mongodb/mongo-java-driver/commit/5dca6d9d27e95d7cd2024db4a9a41f2bd78363f9

Generated at Thu Feb 08 08:59:41 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.