[JAVA-4303] Improve map/flatmap of Publisher[Void] Created: 16/Sep/21  Updated: 05/Jan/24  Resolved: 05/Jan/24

Status: Closed
Project: Java Driver
Component/s: Reactive Streams
Affects Version/s: None
Fix Version/s: 5.0.0

Type: Improvement Priority: Minor - P4
Reporter: Colin Fairless Assignee: Valentin Kavalenka
Resolution: Done Votes: 0
Labels: external-user
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Documented
Duplicate
is duplicated by JAVA-4311 Observable[Void] cannot be composed u... Closed
Related
related to JAVA-4345 Add .completeWithUnit helper for Obse... Closed
related to JAVA-4322 Observable<T> toFuture() implicit ret... Backlog
is related to JAVA-3398 Consider Publisher<Void> instead of P... Closed
Epic Link: 5.0 Breaking Changes
Quarter: FY24Q4
Backwards Compatibility: Major Change
Documentation Changes: Needed
Documentation Changes Summary:

There are multiple changes that break compile-time compatibility (the source code will likely have to be changed to work the the upgraded driver version). Let's mention them in What's new:

  • `org.reactivestreams.Publisher[Void]` is no longer automatically convertible to `org.mongodb.scala.SingleObservable[Void]`, because we removed the `org.mongodb.scala.ObservableImplicits.ToSingleObservableVoid` implicit class.
  • Our API no longer exposes `org.mongodb.scala.Observable[Void]`. Instead, it exposes `org.mongodb.scala.Observable[Unit]`. The documentation of the `Observable` trait explains the difference.

There is also a non-breaking change that we should mention in What's new: we deprecated the method `org.mongodb.scala.Observable.completeWithUnit`, and plan to remove it in a major release.


 Description   

In 5.0.0 for the Scala driver use Observable[Unit] instead of Observable[Void] to allow for composable Observables.

-------

Was:

mongo-scala-driver: 4.3.1

The implementation of MapObservable and FlatMapObservable in the scala driver can lead to unexpected results with `Publisher[Void]`.

Since Void is not instantiable, the impact on the monadic functions can be missed and lead to bugs. i.e. map/flatmap called on a `Publisher[Void]` will never be invoked.

This may lead to bugs in clients which are not aware of this. For example

val obs: Observable[List] = 
  for {
     _ <- session.commitTransaction() // Publisher[Void]
     _ = println("Transaction committed") // never seen
 } yield List(1, 2, 3) // never reached

will result in an empty List.

while we can work around this by wrapping every Publisher[Void] with our `completeWith(obs, ()): Observable[Unit]`
where

def completeWith[A](obs: Observable[Void], f: => A): Observable[A] =
   new Observable[A] {
    override def subscribe(observer: Observer[_ >: A]): Unit =
       obs.subscribe(
          new Observer[Void] {
            override def onError(throwable: Throwable): Unit =
            observer.onError(throwable)
 
            override def onSubscribe(subscription: Subscription): Unit =
               observer.onSubscribe(subscription)
 
             override def onComplete(): Unit = {
               observer.onNext(f)
               observer.onComplete()
            }
 
            override def onNext(tResult: Void): Unit =
              ??? // by definition never called
     }
   )

It would seem safer for clients if the MapObservable/FlatMapObservables threw exceptions if onComplete was called without calling onNext.

An alternative may be to replace occurences of `Publisher[Void]` with `Publisher[Unit]` so that clients can use map/flatMap to react to completion (rather than being forced to wrap or use the reactive API (e.g. `obs.subscribe` )

 



 Comments   
Comment by Githook User [ 05/Jan/24 ]

Author:

{'name': 'Valentin Kovalenko', 'email': 'valentin.kovalenko@mongodb.com', 'username': 'stIncMale'}

Message: Expose `Observable[Unit]` instead of `Observable[Void]` (#1282)

JAVA-4303
Branch: master
https://github.com/mongodb/mongo-java-driver/commit/e78a2dcf73b6eec2c77fe647899c8d29a46e5216

Comment by Ross Lawley [ 12/Oct/21 ]

Hi colin.fairless@digital.hmrc.gov.uk,

Marked this for 5.0.0 and added JAVA-4345 for the next minor release.

Ross

Comment by Colin Fairless [ 12/Oct/21 ]

Hi Ross,

I'm glad this will be considered for the 5.0.0 major release.

I think an observable.toUnit (or .completeWithUnit) helper could be useful in the meantime. Thanks.

Comment by Ross Lawley [ 11/Oct/21 ]

Hi colin.fairless@digital.hmrc.gov.uk,

I agree and think with hindsight the Observable[Unit] api makes more sense. That said without introducing a breaking change I'm unsure how best to solve this before the next major release (5.0) which may be some time out.

Would a Observable.toUnit helper be of any use? I tried to see if I could implicitly handle Observable[Void] in the composable helper but was unable to find a quick solution.

Ross

Comment by Colin Fairless [ 28/Sep/21 ]

Hi Ross Lawley,

Thanks for looking at this.

I think that the functions being called which return Observable[Void] (e.g. `commitTransaction`) are being called for their side-effects, not for short-circuiting behaviour, and the short-circuiting is surprising if not looking closely at the type. It currently requires wrapping with a custom completeWith function in order to compose with other Observables (with map/flatMap). I think using Observable[Unit] instead would be more intuitive, and lead to less bugs.

Comment by Ross Lawley [ 28/Sep/21 ]

Hi colin.fairless@digital.hmrc.gov.uk,

This is a tricky one, an Observable[Void] is essentially an empty Observable and an Observable can be thought of as an asynchronous sequence.

Take the following code:

    val syncSequences = for {
      _ <- Seq(3, 2, 1)
      _ <- Seq()
      x <- Seq(1, 2, 3)
    } yield x
 
    println(s"syncSequences:  $syncSequences")
 
outputs: syncSequences:  List()

The argument can be made that the async observable with an empty observable acts as expected. eg:

    val resultObservable: Observable[_] = for {
      _ <- obsInteger
      _ <- obsVoid
      y <- obsInteger
    } yield y
 
    resultObservable.subscribe(x => println(s"onNext: -> $x"), e => println(s"Error: $e"), () => println("Complete"))
 
outputs: Complete

However, I can also see that having an Observable[Unit] signature would be more user friendly and could be proposed for the next major version of the driver: 5.0.0

Comment by Ross Lawley [ 24/Sep/21 ]

Hi colin.fairless@digital.hmrc.gov.uk,

Thanks for the ticket. I'm investigating the best path forward here.

Just to note:

It would seem safer for clients if the MapObservable/FlatMapObservables threw exceptions if onComplete was called without calling onNext.

 
Would be against the reactive streams specification. So will have to look for an alternative approach.

Comment by Colin Fairless [ 16/Sep/21 ]

I can't edit the ticket to escape the { in the code examples. (it's added in "Unknown macro: ").. Hope they are still intelligible.

Generated at Thu Feb 08 09:01:44 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.