[JAVA-3976] If Subscriber.onNext throws an exception, then BatchCursor.close is not called, AsyncQueryBatchCursor does not kill the server cursor Created: 29/Jan/21  Updated: 15/Feb/21  Resolved: 15/Feb/21

Status: Closed
Project: Java Driver
Component/s: Reactive Streams
Affects Version/s: None
Fix Version/s: None

Type: Improvement
Priority: Major - P3
Reporter: Valentin Kavalenka
Assignee: Ross Lawley
Resolution: Done
Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Issue split
split from JAVA-3907 AsyncQueryBatchCursor does not releas... Closed

 Description   

ChangeStreamPublisher<Document> changeStreamPublisher = mongoCollection.watch();
changeStreamPublisher.subscribe(new Subscriber<ChangeStreamDocument<Document>>() {
    private Subscription s;
 
    @Override
    public void onSubscribe(final Subscription s) {
        this.s = s;
        s.request(1);
    }
 
    @Override
    public void onNext(final ChangeStreamDocument<Document> doc) {
        // Simulate a subscriber that fails while handling an element.
        if (true) throw new OutOfMemoryError(); // or RuntimeException
    }
    ...
});

Note that AsyncQueryBatchCursor always releases the connection before calling Subscriber.onNext. If Subscriber.onNext throws an exception, whether before or after it calls Subscription.request, then:

  • Reactor logs the exception in some cases (e.g., OOM is logged, RuntimeException is not). Neither Subscriber.onNext nor Subscriber.onError is called afterwards (this is fine according to Subscriber rule 13).
  • BatchCursor.close is not called - this is a problem.
  • The server cursor is not killed by AsyncQueryBatchCursor - this is a problem.

The problem was initially discovered as JAVA-3907 scenario 3).
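
The desired cleanup can be illustrated with a minimal sketch. It uses the JDK's java.util.concurrent.Flow interfaces instead of Reactor or the driver, and every name in it (OnNextThrowsDemo, FakeCursor, CursorPublisher) is hypothetical. It assumes a synchronous publisher that treats a throwing onNext as a cancelled subscription, closes its cursor, and then re-raises the error. Completion signalling is left out for brevity, and this is not how AsyncQueryBatchCursor is implemented.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

final class OnNextThrowsDemo {

    /** Hypothetical stand-in for a batch cursor; not the driver's BatchCursor. */
    static final class FakeCursor implements AutoCloseable {
        private final Iterator<String> docs;
        final AtomicBoolean closed = new AtomicBoolean();

        FakeCursor(List<String> docs) { this.docs = docs.iterator(); }
        boolean hasNext() { return !closed.get() && docs.hasNext(); }
        String next() { return docs.next(); }
        // A real cursor would also kill the server cursor here.
        @Override public void close() { closed.set(true); }
    }

    /** Hypothetical synchronous publisher that owns a FakeCursor. */
    static final class CursorPublisher implements Flow.Publisher<String> {
        private final FakeCursor cursor;

        CursorPublisher(FakeCursor cursor) { this.cursor = cursor; }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) {
                    for (long i = 0; i < n && cursor.hasNext(); i++) {
                        try {
                            subscriber.onNext(cursor.next());
                        } catch (Throwable t) {
                            // Treat the subscription as cancelled: release resources first,
                            // then surface the failure to the caller.
                            cursor.close();
                            throw t;
                        }
                    }
                }

                @Override
                public void cancel() { cursor.close(); }
            });
        }
    }

    /** Subscribes with a subscriber whose onNext throws and reports whether the cursor was closed. */
    static boolean cursorClosedAfterThrow() {
        FakeCursor cursor = new FakeCursor(List.of("a", "b"));
        try {
            new CursorPublisher(cursor).subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
                @Override public void onNext(String item) { throw new IllegalStateException("boom"); }
                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            });
        } catch (IllegalStateException expected) {
            // The publisher re-raised the subscriber's failure after cleaning up.
        }
        return cursor.closed.get();
    }

    public static void main(String[] args) {
        System.out.println("cursor closed: " + cursorClosedAfterThrow());
    }
}
```

In this sketch the cleanup lives on the publisher side, so it does not depend on the subscriber behaving well. Whether the driver can do the same depends on whether Reactor lets the exception reach driver code, which is the open question in this issue.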



 Comments   
Comment by Githook User [ 15/Feb/21 ]

Author: Ross Lawley (rozza) <ross.lawley@gmail.com>

Message: BatchCursor close regression tests

Ensure sessions are returned if the batch cursor onNext call results in
an error.

JAVA-3976

Comment by Valentin Kavalenka [ 29/Jan/21 ]

We need Reactor to notify the driver when it decides that a subscription is terminated as a result of Subscriber.onNext throwing an exception. The Flux.onError* methods (and maybe Flux.doOnTerminate and Flux.doOnError) appear to be the levers for achieving the desired behavior. However, I had no success experimenting with these methods in BatchCursorPublisher.subscribe.
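
A different technique from the Reactor operators mentioned above is to wrap the user's subscriber so that a throwing onNext turns into an explicit cancel, which the publisher can observe. The sketch below is written against the JDK's java.util.concurrent.Flow interfaces rather than Reactor. GuardedSubscriberDemo and GuardedSubscriber are hypothetical names, and nothing here is taken from BatchCursorPublisher.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

final class GuardedSubscriberDemo {

    /** Hypothetical wrapper: cancels the upstream subscription if the delegate's onNext throws. */
    static final class GuardedSubscriber<T> implements Flow.Subscriber<T> {
        private final Flow.Subscriber<? super T> delegate;
        private Flow.Subscription upstream;
        private boolean done;

        GuardedSubscriber(Flow.Subscriber<? super T> delegate) { this.delegate = delegate; }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            delegate.onSubscribe(s);
        }

        @Override
        public void onNext(T item) {
            if (done) {
                return; // ignore elements delivered after the failure
            }
            try {
                delegate.onNext(item);
            } catch (Throwable t) {
                done = true;
                upstream.cancel(); // gives the publisher a chance to close its cursor
                throw t;           // still surface the failure to the caller
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!done) { done = true; delegate.onError(t); }
        }

        @Override
        public void onComplete() {
            if (!done) { done = true; delegate.onComplete(); }
        }
    }

    /** Drives the wrapper by hand and reports whether cancel() was invoked. */
    static boolean cancelledAfterThrow() {
        AtomicBoolean cancelled = new AtomicBoolean();
        Flow.Subscription subscription = new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { cancelled.set(true); }
        };
        GuardedSubscriber<String> guarded = new GuardedSubscriber<>(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(String item) { throw new IllegalStateException("boom"); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        guarded.onSubscribe(subscription);
        try {
            guarded.onNext("doc");
        } catch (IllegalStateException expected) {
            // The wrapper rethrows after cancelling.
        }
        return cancelled.get();
    }

    public static void main(String[] args) {
        System.out.println("upstream cancelled: " + cancelledAfterThrow());
    }
}
```

The wrapper rethrows after cancelling so the failure is still visible to whoever invoked onNext. Swallowing the exception instead would be an equally valid choice for a sketch like this.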
