[JAVA-4304] Propagate exceptions thrown in map or flatmap for Observable Created: 16/Sep/21  Updated: 28/Oct/23  Resolved: 04/Oct/21

Status: Closed
Project: Java Driver
Component/s: Scala
Affects Version/s: None
Fix Version/s: 4.3.3

Type: Bug Priority: Unknown
Reporter: Colin Fairless Assignee: Ross Lawley
Resolution: Fixed Votes: 0
Labels: external-user
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Related
is related to JAVA-4524 Observable.map calls onComplete after... Closed

 Description   

mongo-scala-driver: 4.3.1

Exceptions thrown in `map` or `flatMap` on an Observable are not propagated to the subscriber.

Take, for example:

val obs: Future[Unit] =
  (for {
      _ <- collection.insert(BsonDocument())
      _ = sys.error("Boom")
    } yield ()
   ).toFuture()

This future never completes, which leads to a timeout.

Only errors signalled by an Observable itself are propagated, e.g. by using a `raiseError` defined as follows:
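For comparison, the same shape of for-comprehension over a standard-library `scala.concurrent.Future` fails the future instead of hanging, because the `_ = sys.error("Boom")` binding desugars into the body of a `map` call. The sketch below uses only the standard library; `FutureComparison`, `failingComprehension` and `outcome` are illustrative names and not part of the driver, and the example says nothing about how the driver implements `toFuture()`.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object FutureComparison {
  // Same shape as the Observable example, but over a plain Future.
  // The `_ = sys.error("Boom")` binding runs inside the function passed to `map`,
  // so the exception is captured and the resulting Future fails.
  def failingComprehension(): Future[Unit] =
    for {
      _ <- Future.successful(())
      _ = sys.error("Boom")
    } yield ()

  // Waits for completion and captures the outcome instead of rethrowing it.
  def outcome(): Try[Unit] =
    Try(Await.result(failingComprehension(), 5.seconds))
}
```

Here `FutureComparison.outcome()` is a `Failure` carrying the `RuntimeException` raised by `sys.error`, which is the behaviour the report expects from the Observable version.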

def raiseError[A](e: Throwable) = new Observable[A] {
  val delegate: Observable[Unit] = Observable[Unit](Seq(()))

  override def subscribe(observer: Observer[_ >: A]): Unit =
    delegate.subscribe(
      new Observer[Unit] {
        override def onError(throwable: Throwable): Unit =
          observer.onError(throwable)

        override def onSubscribe(sub: Subscription): Unit =
          observer.onSubscribe(sub)

        override def onComplete(): Unit =
          () // not reached

        override def onNext(tResult: Unit): Unit =
          onError(e)
      }
    )
}

 

This looks resolvable by changing `onNext` in `MapObservable` to:

override def onNext(tResult: T): Unit =
  try {
    observer.onNext(s(tResult))
  } catch {
    case t: Throwable => observer.onError(f(t))
  }

and by changing `FlatMapObservable` in the same way.
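The idea can be sketched in a self-contained form, without the driver on the classpath. Everything below is an assumption made for illustration: `SimpleObserver`, `SimpleObservable`, `fromSeq` and `run` are hypothetical stand-ins, there is no `Subscription` or back-pressure, and this is not the code that was merged. The sketch also adds a `done` flag so that nothing is forwarded after the error, since a terminal signal should be the last signal an observer receives; a real operator would additionally cancel the upstream subscription.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}

object MapSketch {
  // Hypothetical, simplified stand-ins for the driver's Observer/Observable.
  trait SimpleObserver[T] {
    def onNext(value: T): Unit
    def onError(t: Throwable): Unit
    def onComplete(): Unit
  }

  trait SimpleObservable[T] { self =>
    def subscribe(observer: SimpleObserver[T]): Unit

    def map[S](s: T => S): SimpleObservable[S] = new SimpleObservable[S] {
      def subscribe(observer: SimpleObserver[S]): Unit =
        self.subscribe(new SimpleObserver[T] {
          // Set once a terminal signal has been forwarded downstream.
          private var done = false

          def onNext(value: T): Unit =
            if (!done) {
              // Try captures non-fatal exceptions thrown by the user function only.
              Try(s(value)) match {
                case Success(mapped) => observer.onNext(mapped)
                case Failure(t) =>
                  done = true
                  observer.onError(t)
              }
            }

          def onError(t: Throwable): Unit =
            if (!done) { done = true; observer.onError(t) }

          def onComplete(): Unit =
            if (!done) { done = true; observer.onComplete() }
        })
    }
  }

  // Emits every element synchronously, then completes.
  def fromSeq[T](items: Seq[T]): SimpleObservable[T] = new SimpleObservable[T] {
    def subscribe(observer: SimpleObserver[T]): Unit = {
      items.foreach(observer.onNext)
      observer.onComplete()
    }
  }

  // Records the signals seen by a subscriber when the mapping function throws on 2.
  def run(): List[String] = {
    val events = ListBuffer.empty[String]
    fromSeq(Seq(1, 2, 3))
      .map(i => if (i % 2 == 0) sys.error("Boom") else i.toString)
      .subscribe(new SimpleObserver[String] {
        def onNext(value: String): Unit = { events += s"onNext($value)"; () }
        def onError(t: Throwable): Unit = { events += s"onError(${t.getMessage})"; () }
        def onComplete(): Unit = { events += "onComplete"; () }
      })
    events.toList
  }
}
```

With this sketch, `MapSketch.run()` records `onNext(1)` followed by `onError(Boom)` and nothing else: the element `3` and the upstream completion are dropped once the error has been delivered.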



 Comments   
Comment by Githook User [ 04/Oct/21 ]

Author: Ross Lawley <ross.lawley@gmail.com> (rozza)

Message: Scala Observables fix (#787)

Should handle errors from user functions gracefully

JAVA-4304
Branch: 4.3.x
https://github.com/mongodb/mongo-java-driver/commit/8a3e4db7325eb071f939fa0438d31d53910092fa

Comment by Githook User [ 04/Oct/21 ]

Author: Ross Lawley <ross.lawley@gmail.com> (rozza)

Message: Scala Observables fix (#787)

Should handle errors from user functions gracefully

JAVA-4304
Branch: master
https://github.com/mongodb/mongo-java-driver/commit/ecaddcb14c48c9528542230554472b7d656c117e

Comment by Ross Lawley [ 24/Sep/21 ]

Thanks colin.fairless@digital.hmrc.gov.uk for reporting this. I've put a fix in for code review.

Thanks again,

Ross

Comment by Ross Lawley [ 24/Sep/21 ]

PR: https://github.com/mongodb/mongo-java-driver/pull/787

Comment by Ross Lawley [ 24/Sep/21 ]

Section 2.13 of the Reactive Streams specification states:

Calling onSubscribe, onNext, onError or onComplete MUST return normally except when any provided parameter is null in which case it MUST throw a java.lang.NullPointerException to the caller, for all other situations the only legal way for a Subscriber to signal failure is by cancelling its Subscription. In the case that this rule is violated, any associated Subscription to the Subscriber MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment.

A Subscriber's `onNext` must therefore not throw, so it is the responsibility of the Observable (Publisher) to behave correctly: if the mapping function fails, the Observable should call `onError`.

The following example, using Reactor, behaves that way:

 
Flux.fromIterable(asList(1, 2, 3))
        .map(i -> {
            if (i % 2 == 0) {
                throw new IllegalArgumentException("Even numbers are banned");
            }
            return i.toString();
        })
        .doOnNext(s -> System.out.println("onNext called: " + s))
        .doOnError(e -> System.out.println("onError called: " + e.getMessage()))
        .doOnComplete(() -> System.out.println("OnComplete called"))
        .collectList()
        .block();
