Uploaded image for project: 'Java Driver'
  1. Java Driver
  2. JAVA-2039

Subscription.request throws exception when state is open

    • Type: Icon: Bug Bug
    • Resolution: Duplicate
    • Priority: Icon: Major - P3 Major - P3
    • 3.2.0
    • Affects Version/s: None
    • Component/s: Async
    • None

      I'm working with Reactive Streams driver and Akka Streams. In my application I have stream which iterates over collections and databases.
      But once in a while my application is reporting error from stream (and driver):

      akka.stream.impl.ReactiveStreamsCompliance$SignalThrewException: It is illegal to throw exceptions from request(), rule 3.16
      	at akka.stream.impl.ReactiveStreamsCompliance$.tryRequest(ReactiveStreamsCompliance.scala:111) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
      	at akka.stream.impl.fusing.BatchingActorInputBoundary.akka$stream$impl$fusing$BatchingActorInputBoundary$$dequeue(ActorInterpreter.scala:55) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
      	at akka.stream.impl.fusing.BatchingActorInputBoundary$$anonfun$upstreamRunning$1.applyOrElse(ActorInterpreter.scala:157) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
      	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) ~[org.scala-lang.scala-library-2.11.7.jar:na]
      	at akka.stream.impl.SubReceive.apply(Transfer.scala:16) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
      	at akka.stream.impl.SubReceive.apply(Transfer.scala:12) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
      	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[org.scala-lang.scala-library-2.11.7.jar:na]
      	at akka.stream.impl.SubReceive.applyOrElse(Transfer.scala:12) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[org.scala-lang.scala-library-2.11.7.jar:na]
      	at akka.actor.Actor$class.aroundReceive(Actor.scala:467) ~[com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na]
      	at akka.stream.impl.fusing.ActorInterpreter.aroundReceive(ActorInterpreter.scala:366) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) [com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na]
      	at akka.actor.ActorCell.invoke(ActorCell.scala:487) [com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na]
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) [com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na]
      	at akka.dispatch.Mailbox.run(Mailbox.scala:220) [com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na]
      	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) [com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na]
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [org.scala-lang.scala-library-2.11.7.jar:na]
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [org.scala-lang.scala-library-2.11.7.jar:na]
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [org.scala-lang.scala-library-2.11.7.jar:na]
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [org.scala-lang.scala-library-2.11.7.jar:na]
      Caused by: java.lang.IllegalStateException: state should be: open
      	at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70) ~[org.mongodb.mongodb-driver-core-3.1.0.jar:na]
      	at com.mongodb.operation.AsyncQueryBatchCursor.setBatchSize(AsyncQueryBatchCursor.java:104) ~[org.mongodb.mongodb-driver-core-3.1.0.jar:na]
      	at com.mongodb.async.client.MappingAsyncBatchCursor.setBatchSize(MappingAsyncBatchCursor.java:62) ~[org.mongodb.mongodb-driver-async-3.1.0.jar:na]
      	at com.mongodb.async.client.MongoIterableSubscription.requestMoreData(MongoIterableSubscription.java:86) ~[org.mongodb.mongodb-driver-async-3.1.0.jar:na]
      	at com.mongodb.async.client.AbstractSubscription.processResultsQueue(AbstractSubscription.java:197) ~[org.mongodb.mongodb-driver-async-3.1.0.jar:na]
      	at com.mongodb.async.client.AbstractSubscription.request(AbstractSubscription.java:84) ~[org.mongodb.mongodb-driver-async-3.1.0.jar:na]
      	at com.mongodb.reactivestreams.client.ObservableToPublisher$1$1.request(ObservableToPublisher.java:48) ~[org.mongodb.mongodb-driver-reactivestreams-1.1.0.jar:na]
      	at akka.stream.impl.ReactiveStreamsCompliance$.tryRequest(ReactiveStreamsCompliance.scala:110) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
      	... 19 common frames omitted
      

      It looks like error from closed BatchCursor is not emitted as onError but thrown in Subscription.request.
      This is violation of rule 3.16 of Reactive Streams and results in failure of whole stream (I don't see any possible way to catch it).

      Unfortunately I can't reproduce this BatchCursor problem with test (I'm not sure why cursor was closed). But the same problem can occur when I try to run stream on publisher created from closed mongo client.

            Assignee:
            Unassigned Unassigned
            Reporter:
            michalbogacz@gmail.com Michal B.
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

              Created:
              Updated:
              Resolved: