[JAVA-3266] Log exception in user's code Created: 12/Apr/19  Updated: 28/Oct/23  Resolved: 27/Jun/19

Status: Closed
Project: Java Driver
Component/s: Async, Error Handling
Affects Version/s: None
Fix Version/s: 3.11.0

Type: Improvement Priority: Major - P3
Reporter: Boris Petrov Assignee: Ross Lawley
Resolution: Fixed Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified


 Description   

Ensure that exceptions thrown while calling onError or onComplete are logged if the Publisher already considers itself terminal.

If onComplete throws, onError won't be called because the Publisher has already marked itself as terminated. The error should be logged to help debugging.
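
The scenario above can be illustrated with a minimal sketch. It is not the driver's code: the class, the Observer interface and the in-memory log are hypothetical stand-ins, and only the terminalAction name is borrowed from the report below. It shows how a terminal guard swallows an exception thrown by onComplete, and where a log call would make the failure visible.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a subscription; all names here are illustrative assumptions.
final class TerminalGuardSketch {

    /** Minimal observer shape, assumed for this sketch. */
    interface Observer {
        void onComplete();
        void onError(Throwable t);
    }

    private final Observer observer;
    private final List<String> log = new ArrayList<>(); // stands in for a real logger
    private boolean terminated;

    TerminalGuardSketch(Observer observer) {
        this.observer = observer;
    }

    /** Returns true only for the first terminal signal; later ones are refused. */
    private boolean terminalAction() {
        if (terminated) {
            return false;
        }
        terminated = true;
        return true;
    }

    void complete() {
        try {
            if (terminalAction()) {
                observer.onComplete(); // user code; may throw
            }
        } catch (Throwable t) {
            error(t);
        }
    }

    void error(Throwable t) {
        if (terminalAction()) {
            observer.onError(t);
        } else {
            // Already terminal: onError cannot be delivered, so record the failure instead.
            log.add("Exception from subscriber after terminal signal: " + t);
        }
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        TerminalGuardSketch subscription = new TerminalGuardSketch(new Observer() {
            @Override public void onComplete() { throw new IllegalStateException("boom in user code"); }
            @Override public void onError(Throwable t) { System.out.println("onError: " + t); }
        });
        subscription.complete();
        subscription.log().forEach(System.out::println);
    }
}
```

Without the else branch in error, the exception would disappear silently, which matches the behavior reported in this ticket.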

Was:

I'm using the "mongodb-driver-reactivestreams" driver version 1.11.0.

An exception is thrown in our code which propagates all the way to "AbstractSubscription.tryProcessResultsQueue" on line 161 where "onError" is called but in it the "terminalAction()" check returns "false" so "observer.onError(t)" is not called. Actually neither is "observer.onComplete()".

This is definitely a bug: at least one of the two should be called and, of course, in this case it should be "onError".

If it is not obvious to you what's going on, I could try creating a reproduction but this will take a while.



 Comments   
Comment by Githook User [ 01/Aug/19 ]

Author:

{'name': 'Ross Lawley', 'email': 'ross.lawley@gmail.com', 'username': 'rozza'}

Message: Log exceptions cause by bad Observer implementations

Ensure any exceptions from userland code are logged if they
cannot be processed by onError.

JAVA-3266
Branch: mongot
https://github.com/mongodb/mongo-java-driver/commit/25a4f878c15a42c353e9a0cb855f7c1356797c53

Comment by Githook User [ 27/Jun/19 ]

Author:

{'name': 'Ross Lawley', 'username': 'rozza', 'email': 'ross.lawley@gmail.com'}

Message: Log exceptions cause by bad Observer implementations

Ensure any exceptions from userland code are logged if they
cannot be processed by onError.

JAVA-3266
Branch: master
https://github.com/mongodb/mongo-java-driver/commit/25a4f878c15a42c353e9a0cb855f7c1356797c53

Comment by Ross Lawley [ 25/Jun/19 ]

PR: https://github.com/rozza/mongo-java-driver/pull/325

Comment by Ross Lawley [ 24/Apr/19 ]

Sounds like rethrowing it might also make sense. I'd need to consider the impact and see if that would be caught / handled by the RxJava catch all.

Comment by Boris Petrov [ 24/Apr/19 ]

Hi,

Yes, logging this error would definitely make debugging easier. Otherwise - you're right that the spec says that, but - I'm using RxJava and they have an Exceptions.throwIfFatal method that they use in all their "catch" clauses - which practically rethrows the exception if it is a "fatal" one. Otherwise, they call their own handlers and everything is fine (because we've handled those). But in this case we had exactly that - some LinkageError was thrown and that's why it got to Mongo's handling code which was just quiet.

Not sure what the correct behavior should be. But logging is a good start.
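
The rethrow-if-fatal pattern described in this comment can be sketched as follows. The throwIfFatal helper below is a simplified stand-in written for this example, not RxJava's implementation, and the set of error types it treats as fatal is an assumption based on the LinkageError mentioned above.

```java
// Hypothetical illustration of a "rethrow fatal errors" catch clause.
final class FatalRethrowSketch {

    /** Assumed simplification: JVM-level errors are fatal and are rethrown unchanged. */
    static void throwIfFatal(Throwable t) {
        if (t instanceof VirtualMachineError) {
            throw (VirtualMachineError) t;
        } else if (t instanceof LinkageError) {
            throw (LinkageError) t;
        }
    }

    /** Runs a user callback and reports how a failure, if any, was handled. */
    static String runCallback(Runnable userCallback) {
        try {
            userCallback.run();
            return "ok";
        } catch (Throwable t) {
            throwIfFatal(t);        // fatal errors escape to whoever called us
            return "handled: " + t; // everything else goes to the library's own handler
        }
    }

    public static void main(String[] args) {
        System.out.println(runCallback(() -> { throw new IllegalStateException("recoverable"); }));
        try {
            runCallback(() -> { throw new LinkageError("fatal"); });
        } catch (LinkageError e) {
            // In the ticket's scenario, this is the point where the driver's code received the error.
            System.out.println("escaped to caller: " + e);
        }
    }
}
```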

Comment by Ross Lawley [ 24/Apr/19 ]

Hi alien,

Thanks for the stacktrace. OK, it does look like it is as I thought: the supplied onComplete method throws an error.

CallbackCompletableObserver.onComplete() line: 53

According to the reactive stream spec, specifically the subscriber code:

Calling onSubscribe, onNext, onError or onComplete MUST return normally except when any provided parameter is null in which case it MUST throw a java.lang.NullPointerException to the caller, for all other situations the only legal way for a Subscriber to signal failure is by cancelling its Subscription. In the case that this rule is violated, any associated Subscription to the Subscriber MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment.

I think a subscriber's onComplete method throwing an error can be considered a breach of the spec.

I think an improvement would be for the driver to log this error, so that debugging is easier. I propose repurposing this ticket to make that change.

Ross
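
To illustrate the rule quoted above from the subscriber's side, here is a minimal sketch of a wrapper that keeps user callbacks from throwing back at the Publisher and cancels the Subscription instead. It uses java.util.concurrent.Flow (Java 9+) in place of the org.reactivestreams interfaces so that it has no dependencies. The wrapper class and its lastFailure accessor are hypothetical and are not part of the driver or of RxJava. It catches only RuntimeException, so an Error such as the LinkageError discussed in this ticket would still propagate.

```java
import java.util.Objects;
import java.util.concurrent.Flow;

// Hypothetical wrapper written for this sketch.
final class SafeSubscriberSketch<T> implements Flow.Subscriber<T> {
    private final Flow.Subscriber<T> delegate;
    private Flow.Subscription subscription;
    private Throwable lastFailure;

    SafeSubscriberSketch(Flow.Subscriber<T> delegate) {
        this.delegate = Objects.requireNonNull(delegate);
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = Objects.requireNonNull(s); // null parameters must still throw NPE
        guard(() -> delegate.onSubscribe(s));
    }

    @Override
    public void onNext(T item) {
        Objects.requireNonNull(item);
        guard(() -> delegate.onNext(item));
    }

    @Override
    public void onError(Throwable t) {
        Objects.requireNonNull(t);
        guard(() -> delegate.onError(t));
    }

    @Override
    public void onComplete() {
        guard(delegate::onComplete);
    }

    /** Runs user code; on failure, cancels instead of throwing back at the Publisher. */
    private void guard(Runnable userCode) {
        try {
            userCode.run();
        } catch (RuntimeException e) {
            lastFailure = e;
            if (subscription != null) {
                subscription.cancel();
            }
        }
    }

    Throwable lastFailure() {
        return lastFailure;
    }

    public static void main(String[] args) {
        SafeSubscriberSketch<String> safe = new SafeSubscriberSketch<>(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(String item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { throw new IllegalStateException("boom"); }
        });
        safe.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { System.out.println("subscription cancelled"); }
        });
        safe.onComplete(); // returns normally even though the delegate throws
        System.out.println("recorded failure: " + safe.lastFailure());
    }
}
```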

Comment by Boris Petrov [ 24/Apr/19 ]

Thank you for the support! Here is the stacktrace:

Daemon Thread [my-thread] (Suspended)
 ... user code
 CallbackCompletableObserver.onComplete() line: 53
 CompletableFromPublisher$FromPublisherSubscriber<T>.onComplete() line: 68
 ObservableToPublisher$1.onComplete() line: 78
 SingleResultCallbackSubscription<TResult>(AbstractSubscription<TResult>).onComplete() line: 145
 SingleResultCallbackSubscription<TResult>(AbstractSubscription<TResult>).processResultsQueue() line: 211
 SingleResultCallbackSubscription<TResult>(AbstractSubscription<TResult>).tryProcessResultsQueue() line: 159
 SingleResultCallbackSubscription$1.onResult(TResult, Throwable) line: 48
 MongoCollectionImpl$4.onResult(BulkWriteResult, Throwable) line: 646
 MongoCollectionImpl$4.onResult(Object, Throwable) line: 640
 MongoCollectionImpl$9.onResult(BulkWriteResult, Throwable) line: 1043
 MongoCollectionImpl$9.onResult(Object, Throwable) line: 1027
 ErrorHandlingResultCallback<T>.onResult(T, Throwable) line: 49
 OperationExecutorImpl$2$1.onResult(T, Throwable) line: 117
 ErrorHandlingResultCallback<T>.onResult(T, Throwable) line: 49
 OperationHelper$ConnectionReleasingWrappedCallback<T>.onResult(T, Throwable) line: 384
 MixedBulkWriteOperation.addBatchResult(BsonDocument, AsyncWriteBinding, AsyncConnection, BulkWriteBatch, boolean, ConnectionReleasingWrappedCallback<BulkWriteResult>) line: 506
 MixedBulkWriteOperation.access$1500(MixedBulkWriteOperation, BsonDocument, AsyncWriteBinding, AsyncConnection, BulkWriteBatch, boolean, OperationHelper$ConnectionReleasingWrappedCallback) line: 70
 MixedBulkWriteOperation$6.onResult(BsonDocument, Throwable) line: 486
 MixedBulkWriteOperation$6.onResult(Object, Throwable) line: 458
 ErrorHandlingResultCallback<T>.onResult(T, Throwable) line: 49
 DefaultServer$DefaultServerProtocolExecutor$2.onResult(T, Throwable) line: 227
 ErrorHandlingResultCallback<T>.onResult(T, Throwable) line: 49
 CommandProtocolImpl$1.onResult(T, Throwable) line: 85
 DefaultConnectionPool$PooledConnection$1.onResult(T, Throwable) line: 461
 UsageTrackingInternalConnection$2.onResult(T, Throwable) line: 111
 ErrorHandlingResultCallback<T>.onResult(T, Throwable) line: 49
 InternalStreamConnection$2$1.onResult(ResponseBuffers, Throwable) line: 395
 InternalStreamConnection$2$1.onResult(Object, Throwable) line: 372
 InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(ByteBuf, Throwable) line: 667
 InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(Object, Throwable) line: 634
 InternalStreamConnection$5.completed(ByteBuf) line: 510
 InternalStreamConnection$5.completed(Object) line: 507
 AsynchronousChannelStream$BasicCompletionHandler.completed(Integer, Void) line: 220
 AsynchronousChannelStream$BasicCompletionHandler.completed(Object, Object) line: 203
 Invoker.invokeUnchecked(CompletionHandler<V,? super A>, A, V, Throwable) line: 127
 Invoker.invokeDirect(GroupAndInvokeCount, CompletionHandler<V,? super A>, A, V, Throwable) line: 158
 UnixAsynchronousSocketChannelImpl.implRead(boolean, ByteBuffer, ByteBuffer[], long, TimeUnit, A, CompletionHandler<V,? super A>) line: 560
 UnixAsynchronousSocketChannelImpl(AsynchronousSocketChannelImpl).read(boolean, ByteBuffer, ByteBuffer[], long, TimeUnit, A, CompletionHandler<V,? super A>) line: 277
 UnixAsynchronousSocketChannelImpl(AsynchronousSocketChannelImpl).read(ByteBuffer, long, TimeUnit, A, CompletionHandler<Integer,? super A>) line: 298
 AsynchronousSocketChannelStream$AsynchronousSocketChannelAdapter.read(ByteBuffer, long, TimeUnit, A, CompletionHandler<Integer,? super A>) line: 137
 AsynchronousSocketChannelStream(AsynchronousChannelStream).readAsync(int, AsyncCompletionHandler<ByteBuf>) line: 105
 InternalStreamConnection.readAsync(int, SingleResultCallback<ByteBuf>) line: 507
 InternalStreamConnection.access$1000(InternalStreamConnection, int, SingleResultCallback) line: 74
 InternalStreamConnection$MessageHeaderCallback.onResult(ByteBuf, Throwable) line: 624
 InternalStreamConnection$MessageHeaderCallback.onResult(Object, Throwable) line: 609
 InternalStreamConnection$5.completed(ByteBuf) line: 510
 InternalStreamConnection$5.completed(Object) line: 507
 AsynchronousChannelStream$BasicCompletionHandler.completed(Integer, Void) line: 220
 AsynchronousChannelStream$BasicCompletionHandler.completed(Object, Object) line: 203
 Invoker.invokeUnchecked(CompletionHandler<V,? super A>, A, V, Throwable) line: 127
 UnixAsynchronousSocketChannelImpl.finishRead(boolean) line: 437
 UnixAsynchronousSocketChannelImpl.finish(boolean, boolean, boolean) line: 191
 UnixAsynchronousSocketChannelImpl.onEvent(int, boolean) line: 213
 EPollPort$EventHandlerTask.run() line: 306
 AsynchronousChannelGroupImpl$1.run() line: 112
 ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1128
 ThreadPoolExecutor$Worker.run() line: 628
 Thread.run() line: 834

And stepping over from there gets to the point in my original post:

Daemon Thread [my-thread] (Suspended)
 SingleResultCallbackSubscription<TResult>(AbstractSubscription<TResult>).tryProcessResultsQueue() line: 161
 SingleResultCallbackSubscription$1.onResult(TResult, Throwable) line: 48
 MongoCollectionImpl$4.onResult(BulkWriteResult, Throwable) line: 646
 MongoCollectionImpl$4.onResult(Object, Throwable) line: 640
 MongoCollectionImpl$9.onResult(BulkWriteResult, Throwable) line: 1043
 MongoCollectionImpl$9.onResult(Object, Throwable) line: 1027
 ErrorHandlingResultCallback<T>.onResult(T, Throwable) line: 49
 OperationExecutorImpl$2$1.onResult(T, Throwable) line: 117
 ErrorHandlingResultCallback<T>.onResult(T, Throwable) line: 49
 OperationHelper$ConnectionReleasingWrappedCallback<T>.onResult(T, Throwable) line: 384
 MixedBulkWriteOperation.addBatchResult(BsonDocument, AsyncWriteBinding, AsyncConnection, BulkWriteBatch, boolean, ConnectionReleasingWrappedCallback<BulkWriteResult>) line: 506
 MixedBulkWriteOperation.access$1500(MixedBulkWriteOperation, BsonDocument, AsyncWriteBinding, AsyncConnection, BulkWriteBatch, boolean, OperationHelper$ConnectionReleasingWrappedCallback) line: 70
 MixedBulkWriteOperation$6.onResult(BsonDocument, Throwable) line: 486
 MixedBulkWriteOperation$6.onResult(Object, Throwable) line: 458
 ErrorHandlingResultCallback<T>.onResult(T, Throwable) line: 49
 DefaultServer$DefaultServerProtocolExecutor$2.onResult(T, Throwable) line: 227
 ErrorHandlingResultCallback<T>.onResult(T, Throwable) line: 49
 CommandProtocolImpl$1.onResult(T, Throwable) line: 85
 DefaultConnectionPool$PooledConnection$1.onResult(T, Throwable) line: 461
 UsageTrackingInternalConnection$2.onResult(T, Throwable) line: 111
 ErrorHandlingResultCallback<T>.onResult(T, Throwable) line: 49
 InternalStreamConnection$2$1.onResult(ResponseBuffers, Throwable) line: 395
 InternalStreamConnection$2$1.onResult(Object, Throwable) line: 372
 InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(ByteBuf, Throwable) line: 667
 InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(Object, Throwable) line: 634
 InternalStreamConnection$5.completed(ByteBuf) line: 510
 InternalStreamConnection$5.completed(Object) line: 507
 AsynchronousChannelStream$BasicCompletionHandler.completed(Integer, Void) line: 220
 AsynchronousChannelStream$BasicCompletionHandler.completed(Object, Object) line: 203
 Invoker.invokeUnchecked(CompletionHandler<V,? super A>, A, V, Throwable) line: 127
 Invoker.invokeDirect(GroupAndInvokeCount, CompletionHandler<V,? super A>, A, V, Throwable) line: 158
 UnixAsynchronousSocketChannelImpl.implRead(boolean, ByteBuffer, ByteBuffer[], long, TimeUnit, A, CompletionHandler<V,? super A>) line: 560
 UnixAsynchronousSocketChannelImpl(AsynchronousSocketChannelImpl).read(boolean, ByteBuffer, ByteBuffer[], long, TimeUnit, A, CompletionHandler<V,? super A>) line: 277
 UnixAsynchronousSocketChannelImpl(AsynchronousSocketChannelImpl).read(ByteBuffer, long, TimeUnit, A, CompletionHandler<Integer,? super A>) line: 298
 AsynchronousSocketChannelStream$AsynchronousSocketChannelAdapter.read(ByteBuffer, long, TimeUnit, A, CompletionHandler<Integer,? super A>) line: 137
 AsynchronousSocketChannelStream(AsynchronousChannelStream).readAsync(int, AsyncCompletionHandler<ByteBuf>) line: 105
 InternalStreamConnection.readAsync(int, SingleResultCallback<ByteBuf>) line: 507
 InternalStreamConnection.access$1000(InternalStreamConnection, int, SingleResultCallback) line: 74
 InternalStreamConnection$MessageHeaderCallback.onResult(ByteBuf, Throwable) line: 624
 InternalStreamConnection$MessageHeaderCallback.onResult(Object, Throwable) line: 609
 InternalStreamConnection$5.completed(ByteBuf) line: 510
 InternalStreamConnection$5.completed(Object) line: 507
 AsynchronousChannelStream$BasicCompletionHandler.completed(Integer, Void) line: 220
 AsynchronousChannelStream$BasicCompletionHandler.completed(Object, Object) line: 203
 Invoker.invokeUnchecked(CompletionHandler<V,? super A>, A, V, Throwable) line: 127
 UnixAsynchronousSocketChannelImpl.finishRead(boolean) line: 437
 UnixAsynchronousSocketChannelImpl.finish(boolean, boolean, boolean) line: 191
 UnixAsynchronousSocketChannelImpl.onEvent(int, boolean) line: 213
 EPollPort$EventHandlerTask.run() line: 306
 AsynchronousChannelGroupImpl$1.run() line: 112
 ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1128
 ThreadPoolExecutor$Worker.run() line: 628
 Thread.run() line: 834

I don't know if that will work for you. Please tell me if you need anything more. As I said, if this is not enough and a reproduction is needed, this will take longer. However, debugging is easy for me so if you want, I could tell you the values of variables, etc, if that would help you figure it out.

Comment by Ross Lawley [ 23/Apr/19 ]

Hi alien,

This is a strange one and I can't reproduce the issue. Could it be that onError is not being called because onComplete throws an error? That would set isTerminated to true, so when the exception is caught onError wouldn't actually trigger.

Otherwise a stack trace and/or a repro case would be super helpful.

Ross
 

Comment by Boris Petrov [ 16/Apr/19 ]

Thanks! A new version of the "mongodb-driver-reactivestreams" driver should also be released I guess, right?

Comment by Ross Lawley [ 16/Apr/19 ]

Hi alien,

Thanks for the ticket. I'll review in due course and a fix should be released with the next version of the Java driver.

Ross
