Subscription.request throws exception when state is open

XMLWordPrintableJSON

    • Type: Bug
    • Resolution: Duplicate
    • Priority: Major - P3
    • 3.2.0
    • Affects Version/s: None
    • Component/s: Async
    • None
    • None
    • None
    • None
    • None
    • None
    • None
    • None

      I'm working with Reactive Streams driver and Akka Streams. In my application I have stream which iterates over collections and databases.
      But once in a while my application is reporting error from stream (and driver):

      akka.stream.impl.ReactiveStreamsCompliance$SignalThrewException: It is illegal to throw exceptions from request(), rule 3.16
      	at akka.stream.impl.ReactiveStreamsCompliance$.tryRequest(ReactiveStreamsCompliance.scala:111) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
      	at akka.stream.impl.fusing.BatchingActorInputBoundary.akka$stream$impl$fusing$BatchingActorInputBoundary$$dequeue(ActorInterpreter.scala:55) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
      	at akka.stream.impl.fusing.BatchingActorInputBoundary$$anonfun$upstreamRunning$1.applyOrElse(ActorInterpreter.scala:157) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
      	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) ~[org.scala-lang.scala-library-2.11.7.jar:na]
      	at akka.stream.impl.SubReceive.apply(Transfer.scala:16) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
      	at akka.stream.impl.SubReceive.apply(Transfer.scala:12) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
      	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[org.scala-lang.scala-library-2.11.7.jar:na]
      	at akka.stream.impl.SubReceive.applyOrElse(Transfer.scala:12) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[org.scala-lang.scala-library-2.11.7.jar:na]
      	at akka.actor.Actor$class.aroundReceive(Actor.scala:467) ~[com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na]
      	at akka.stream.impl.fusing.ActorInterpreter.aroundReceive(ActorInterpreter.scala:366) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) [com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na]
      	at akka.actor.ActorCell.invoke(ActorCell.scala:487) [com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na]
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) [com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na]
      	at akka.dispatch.Mailbox.run(Mailbox.scala:220) [com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na]
      	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) [com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na]
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [org.scala-lang.scala-library-2.11.7.jar:na]
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [org.scala-lang.scala-library-2.11.7.jar:na]
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [org.scala-lang.scala-library-2.11.7.jar:na]
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [org.scala-lang.scala-library-2.11.7.jar:na]
      Caused by: java.lang.IllegalStateException: state should be: open
      	at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70) ~[org.mongodb.mongodb-driver-core-3.1.0.jar:na]
      	at com.mongodb.operation.AsyncQueryBatchCursor.setBatchSize(AsyncQueryBatchCursor.java:104) ~[org.mongodb.mongodb-driver-core-3.1.0.jar:na]
      	at com.mongodb.async.client.MappingAsyncBatchCursor.setBatchSize(MappingAsyncBatchCursor.java:62) ~[org.mongodb.mongodb-driver-async-3.1.0.jar:na]
      	at com.mongodb.async.client.MongoIterableSubscription.requestMoreData(MongoIterableSubscription.java:86) ~[org.mongodb.mongodb-driver-async-3.1.0.jar:na]
      	at com.mongodb.async.client.AbstractSubscription.processResultsQueue(AbstractSubscription.java:197) ~[org.mongodb.mongodb-driver-async-3.1.0.jar:na]
      	at com.mongodb.async.client.AbstractSubscription.request(AbstractSubscription.java:84) ~[org.mongodb.mongodb-driver-async-3.1.0.jar:na]
      	at com.mongodb.reactivestreams.client.ObservableToPublisher$1$1.request(ObservableToPublisher.java:48) ~[org.mongodb.mongodb-driver-reactivestreams-1.1.0.jar:na]
      	at akka.stream.impl.ReactiveStreamsCompliance$.tryRequest(ReactiveStreamsCompliance.scala:110) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
      	... 19 common frames omitted
      

      It looks like error from closed BatchCursor is not emitted as onError but thrown in Subscription.request.
      This is violation of rule 3.16 of Reactive Streams and results in failure of whole stream (I don't see any possible way to catch it).

      Unfortunately I can't reproduce this BatchCursor problem with test (I'm not sure why cursor was closed). But the same problem can occur when I try to run stream on publisher created from closed mongo client.

            Assignee:
            Unassigned
            Reporter:
            Michal B.
            None
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

              Created:
              Updated:
              Resolved: