[JAVA-3791] onNext errors should not throw if the subscription has been cancelled. Created: 17/Jul/20  Updated: 12/Apr/21  Resolved: 19/Nov/20

Status: Closed
Project: Java Driver
Component/s: Reactive Streams
Affects Version/s: 4.0.3, 4.0.4
Fix Version/s: None

Type: Bug Priority: Major - P3
Reporter: Yue Gu Assignee: Ross Lawley
Resolution: Duplicate Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Depends
depends on JAVA-3813 Replace internal use of callbacks wit... Closed
Duplicate
is duplicated by JAVA-3813 Replace internal use of callbacks wit... Closed

 Description   

Below is a short description of the problem. If it would help, I can write some code to reproduce it.

When a MongoConverter threw a fatal exception (see `reactor.core#Exceptions.throwIfFatal`), `com.mongodb.reactivestreams.client.internal#AbstractSubscription.onNext` caught the throwable and checked whether the subscriber was terminated before invoking the `onError` method. However, the subscription had already been cancelled in `reactor.core.publisher#onNext`, so nothing was passed to userland code.

I'm not sure whether this is a bug in the reactive MongoDB driver or in Reactor, but I believe it is not what users expect. Please check it and at least tell me how to resolve it.



 Comments   
Comment by Yue Gu [ 12/Apr/21 ]

I retried the case above with the dependencies updated to:

<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-mongodb</artifactId>
    <version>3.1.7</version>
    <exclusions>
        <exclusion>
            <artifactId>mongodb-driver-core</artifactId>
            <groupId>org.mongodb</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-reactivestreams</artifactId>
    <version>4.2.3</version>
    <exclusions>
        <exclusion>
            <artifactId>reactor-core</artifactId>
            <groupId>io.projectreactor</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.4.4</version>
</dependency>

It looks like nothing was fixed.

Comment by Ross Lawley [ 19/Nov/20 ]

The work in JAVA-3813 changes how the reactive streams driver works. As a result, this issue has been refactored away: there is no longer an AbstractSubscription, and internally the driver now uses Mono/Flux adapters.

Comment by Ross Lawley [ 05/Aug/20 ]

Thanks h1023263204@gmail.com, I was able to reproduce the issue and can see that we can improve this and ensure onNext doesn't throw an exception.

Comment by Ross Lawley [ 05/Aug/20 ]

I think the issue here is that MonoNext has the following sequence of events:

@Override
public void onNext(T t) {
    if (done) {
        Operators.onNextDropped(t, actual.currentContext());
        return;
    }
 
    s.cancel();
    actual.onNext(t);
    onComplete();
}

A Subscriber MUST be prepared to receive one or more onNext signals after having called Subscription.cancel() if there are still requested elements pending [see 3.12]. Subscription.cancel() does not guarantee to perform the underlying cleaning operations immediately.

The intent of this rule is to highlight that there may be a delay between calling cancel and the Publisher observing that cancellation.

Calling onNext causes a failure, but because the subscription has already been cancelled, onError throws rather than calling the subscriber's onError.
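The sequence described above can be modelled with a minimal sketch built on the JDK's `java.util.concurrent.Flow` interfaces. All class names here (`CancelledErrorDemo`, `SketchSubscription`, `CancelThenFail`) are hypothetical and are not the driver's or Reactor's real classes. As a simplification, the sketch records the throwable in a `dropped` list where the driver is described as rethrowing, so the outcome is easy to inspect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class CancelledErrorDemo {

    /** Hypothetical stand-in for a publisher-side subscription; not the driver's real class. */
    static final class SketchSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super String> subscriber;
        private boolean cancelled;
        final List<Throwable> dropped = new ArrayList<>();

        SketchSubscription(Flow.Subscriber<? super String> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            // For brevity, emit exactly one item no matter how many were requested.
            try {
                subscriber.onNext("document");
            } catch (Throwable t) {
                if (cancelled) {
                    // Assumption: once cancelled, the error can no longer reach onError.
                    dropped.add(t);
                } else {
                    subscriber.onError(t);
                }
            }
        }

        @Override
        public void cancel() {
            cancelled = true;
        }
    }

    /** Mimics the ordering quoted above: cancel first, then process the item. */
    static final class CancelThenFail implements Flow.Subscriber<String> {
        private Flow.Subscription subscription;
        final List<Throwable> errors = new ArrayList<>();

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
        }

        @Override
        public void onNext(String item) {
            subscription.cancel();
            // Stands in for a converter failing while mapping the document.
            throw new IllegalStateException("conversion failed for " + item);
        }

        @Override
        public void onError(Throwable t) {
            errors.add(t);
        }

        @Override
        public void onComplete() {
        }
    }

    public static void main(String[] args) {
        CancelThenFail sub = new CancelThenFail();
        SketchSubscription subscription = new SketchSubscription(sub);
        sub.onSubscribe(subscription);
        subscription.request(1);
        System.out.println("onError calls: " + sub.errors.size()
                + ", dropped: " + subscription.dropped.size());
    }
}
```

In this model the subscriber's `onError` is never reached, which matches the reported symptom that user code receives no signal.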

Comment by Ross Lawley [ 27/Jul/20 ]

Thanks for the repo - will review.

Comment by Yue Gu [ 24/Jul/20 ]

import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import java.util.HashMap;
import java.util.Map;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
 
/**
 * @since 2020/7/24, Fri
 */
public class ReactiveMongodbSubscribe {
 
    public static void main(String... args) {
        try (MongoClient client = MongoClients.create()) {
            ReactiveMongoTemplate template = new ReactiveMongoTemplate(client, "test");
 
            String tel = "111111";
            var data = new HashMap<>(Map.of("tel", tel, "child", Map.of("name", "Lily")));
            template.save(data, template.getCollectionName(Family.class)).block();
 
            template.findOne(Query.query(Criteria.where("tel").is(tel)), Family.class)
                .doOnError(Throwable::printStackTrace)
                .block();
        }
    }
 
    public static abstract class Child {
        private String name = "Unknown";
 
        public String getName() {
            return name;
        }
 
        public void setName(String name) {
            this.name = name;
        }
    }
 
    public static final class Son extends Child {
 
    }
 
    public static final class Family {
 
        private String tel = "000000";
 
        private Child child = new Son();
 
        public String getTel() {
            return tel;
        }
 
        public void setTel(String tel) {
            this.tel = tel;
        }
 
        public Child getChild() {
            return child;
        }
 
        public void setChild(Child child) {
            this.child = child;
        }
    }
}

This is a sample that reproduces my problem. The application hangs and my doOnError callback is never invoked.

If you use Maven:

<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-mongodb</artifactId>
    <version>3.0.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-reactivestreams</artifactId>
    <version>4.0.3</version>
</dependency>

Comment by Ross Lawley [ 22/Jul/20 ]

Hi h1023263204@gmail.com,

You seem to be using a layer above the reactive streams driver. All I can comment on is that the reactive streams driver appears to be working as designed and according to the specification.

Without a general reproduction case of the error, I can't see where the fault is or where the bug actually is.

Ross

 

Comment by Yue Gu [ 22/Jul/20 ]

Thanks for your reply.

Actually, a signal was sent by the publisher but was not delivered to its subscriber, and onError wasn't called because something went wrong. When the MappingMongoConverter throws an InstantiationError while converting a MongoDB document to a Java object, the error is logged but the subscriber does not receive any signal.

If this behavior works as designed, please tell me how to receive the error signal when an error happens.

 

Comment by Ross Lawley [ 21/Jul/20 ]

For the reasons stated above, I'm closing this ticket as "works as designed"

 

Comment by Ross Lawley [ 21/Jul/20 ]

I believe this behaviour is correct according to the specification:

In response to a call to Publisher.subscribe(Subscriber) the possible invocation sequences for methods on the Subscriber are given by the following protocol:

onSubscribe onNext* (onError | onComplete)?

This means that onSubscribe is always signalled, followed by a possibly unbounded number of onNext signals (as requested by Subscriber) followed by an onError signal if there is a failure, or an onComplete signal when no more elements are available—all as long as the Subscription is not cancelled.
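As an illustration of the protocol quoted above, the grammar `onSubscribe onNext* (onError | onComplete)?` can be written as a regular expression over signal names. This is only a sketch: the class `SignalProtocolCheck` and its `isValid` method are hypothetical helpers, not part of any Reactive Streams library.

```java
import java.util.List;
import java.util.regex.Pattern;

final class SignalProtocolCheck {

    // onSubscribe, then any number of onNext, then at most one terminal signal.
    private static final Pattern PROTOCOL =
            Pattern.compile("onSubscribe( onNext)*( (onError|onComplete))?");

    /** Returns true if the recorded signal names follow the quoted protocol. */
    static boolean isValid(List<String> signals) {
        return PROTOCOL.matcher(String.join(" ", signals)).matches();
    }

    public static void main(String[] args) {
        // A cancelled subscription may legitimately end without a terminal signal.
        System.out.println(isValid(List.of("onSubscribe", "onNext")));
        // A signal after a terminal signal violates the protocol.
        System.out.println(isValid(List.of("onSubscribe", "onError", "onNext")));
    }
}
```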

I think you may need to look at the logic in your application.

Calling onSubscribe, onNext, onError or onComplete MUST return normally except when any provided parameter is null in which case it MUST throw a java.lang.NullPointerException to the caller, for all other situations the only legal way for a Subscriber to signal failure is by cancelling its Subscription.

So the only way to signal an error should be via onError.
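One way a subscriber can follow the quoted rule is to catch failures inside onNext, cancel its subscription, and keep the failure for its own error handling instead of letting the exception escape. The sketch below uses the JDK's `java.util.concurrent.Flow` interfaces. `SafeSubscriberSketch` and `SafeSubscriber` are hypothetical names, and the one-at-a-time request strategy is an assumption made for brevity.

```java
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class SafeSubscriberSketch {

    /** Hypothetical subscriber whose onNext always returns normally. */
    static final class SafeSubscriber<T> implements Flow.Subscriber<T> {
        private final Consumer<? super T> handler;
        private Flow.Subscription subscription;
        final AtomicReference<Throwable> failure = new AtomicReference<>();

        SafeSubscriber(Consumer<? super T> handler) {
            this.handler = handler;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = Objects.requireNonNull(s);
            s.request(1);
        }

        @Override
        public void onNext(T item) {
            Objects.requireNonNull(item);
            if (failure.get() != null) {
                return; // tolerate signals that arrive after cancel()
            }
            try {
                handler.accept(item);
                subscription.request(1);
            } catch (RuntimeException e) {
                // Signal failure by cancelling, then keep the cause for the caller.
                failure.set(e);
                subscription.cancel();
            }
        }

        @Override
        public void onError(Throwable t) {
            failure.compareAndSet(null, Objects.requireNonNull(t));
        }

        @Override
        public void onComplete() {
        }
    }
}
```

Callers would inspect `failure` after the stream stops. A real application would more likely forward the cause to a callback or a future.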

I hope that helps,

Ross

Generated at Thu Feb 08 09:00:26 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.