ChangeStreamPublisher<Document> changeStreamPublisher = mongoCollection.watch();
changeStreamPublisher.subscribe(new Subscriber<ChangeStreamDocument<Document>>() {
    private Subscription s;

    @Override
    public void onSubscribe(final Subscription s) {
        this.s = s;
        s.request(1);
    }

    @Override
    public void onNext(final ChangeStreamDocument<Document> doc) {
        if (true) throw new OutOfMemoryError(); // or RuntimeException
    }
    ...
});

Note that AsyncQueryBatchCursor always releases the connection before calling Subscriber.onNext. If Subscriber.onNext throws an exception, whether before or after calling Subscription.request, then:
- Reactor logs the exception in some cases (e.g., an OutOfMemoryError is logged, a RuntimeException is not), and neither Subscriber.onNext nor Subscriber.onError is called afterwards (this is fine according to Subscriber rule 13, i.e. rule 2.13 of the Reactive Streams specification).
- BatchCursor.close is not called. This is a problem.
- The server cursor is not killed by AsyncQueryBatchCursor. This is a problem.
The problem was initially discovered as scenario 3 of JAVA-3907.
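
For context, Subscriber rule 13 says a Subscriber must return normally from onNext and may signal failure only by cancelling its Subscription. The following is a minimal sketch of a subscriber that follows that rule. It is an illustration, not the driver's code and not a fix for this issue. It uses the JDK's java.util.concurrent.Flow interfaces, which mirror the org.reactivestreams ones, so that it compiles without extra dependencies. CancelOnFailureSubscriber, RecordingSubscription, and CursorCancelDemo are hypothetical names. Whether Subscription.cancel leads to BatchCursor.close inside the driver is not something this sketch shows.

```java
import java.util.concurrent.Flow;
import java.util.function.Consumer;

/** Hypothetical subscriber: a failing handler cancels the subscription instead of throwing. */
final class CancelOnFailureSubscriber<T> implements Flow.Subscriber<T> {
    private final Consumer<T> handler;      // hypothetical user callback
    private Flow.Subscription subscription;
    private Throwable failure;              // first failure observed, if any

    CancelOnFailureSubscriber(final Consumer<T> handler) {
        this.handler = handler;
    }

    @Override
    public void onSubscribe(final Flow.Subscription s) {
        this.subscription = s;
        s.request(1);
    }

    @Override
    public void onNext(final T item) {
        try {
            handler.accept(item);
            subscription.request(1);        // ask for the next item only on success
        } catch (RuntimeException e) {
            // Errors such as OutOfMemoryError are deliberately not caught here.
            failure = e;
            subscription.cancel();          // the legal way to signal failure
        }
    }

    @Override
    public void onError(final Throwable t) {
        failure = t;
    }

    @Override
    public void onComplete() {
    }

    Throwable failure() {
        return failure;
    }
}

/** Hypothetical stand-in for a publisher's Subscription; it only records the calls it receives. */
final class RecordingSubscription implements Flow.Subscription {
    long requested;
    boolean cancelled;

    @Override
    public void request(final long n) {
        requested += n;
    }

    @Override
    public void cancel() {
        cancelled = true;
    }
}

final class CursorCancelDemo {
    public static void main(final String[] args) {
        RecordingSubscription subscription = new RecordingSubscription();
        CancelOnFailureSubscriber<String> subscriber =
                new CancelOnFailureSubscriber<>(item -> {
                    throw new IllegalStateException("simulated failure");
                });
        subscriber.onSubscribe(subscription);
        subscriber.onNext("event");         // handler throws, subscriber cancels
        System.out.println("cancelled: " + subscription.cancelled);
    }
}
```

With a subscriber written this way the exception never reaches the publisher, so the scenario described above (onNext throwing) does not arise. The report is about what the driver should do when a subscriber does throw.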