I'm working with Reactive Streams driver and Akka Streams. In my application I have stream which iterates over collections and databases.
But once in a while my application is reporting error from stream (and driver):
akka.stream.impl.ReactiveStreamsCompliance$SignalThrewException: It is illegal to throw exceptions from request(), rule 3.16 at akka.stream.impl.ReactiveStreamsCompliance$.tryRequest(ReactiveStreamsCompliance.scala:111) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na] at akka.stream.impl.fusing.BatchingActorInputBoundary.akka$stream$impl$fusing$BatchingActorInputBoundary$$dequeue(ActorInterpreter.scala:55) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na] at akka.stream.impl.fusing.BatchingActorInputBoundary$$anonfun$upstreamRunning$1.applyOrElse(ActorInterpreter.scala:157) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na] at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) ~[org.scala-lang.scala-library-2.11.7.jar:na] at akka.stream.impl.SubReceive.apply(Transfer.scala:16) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na] at akka.stream.impl.SubReceive.apply(Transfer.scala:12) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na] at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[org.scala-lang.scala-library-2.11.7.jar:na] at akka.stream.impl.SubReceive.applyOrElse(Transfer.scala:12) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[org.scala-lang.scala-library-2.11.7.jar:na] at akka.actor.Actor$class.aroundReceive(Actor.scala:467) ~[com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na] at akka.stream.impl.fusing.ActorInterpreter.aroundReceive(ActorInterpreter.scala:366) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) [com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na] at akka.actor.ActorCell.invoke(ActorCell.scala:487) [com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) [com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na] at akka.dispatch.Mailbox.run(Mailbox.scala:220) [com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na] at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) [com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [org.scala-lang.scala-library-2.11.7.jar:na] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [org.scala-lang.scala-library-2.11.7.jar:na] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [org.scala-lang.scala-library-2.11.7.jar:na] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [org.scala-lang.scala-library-2.11.7.jar:na] Caused by: java.lang.IllegalStateException: state should be: open at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70) ~[org.mongodb.mongodb-driver-core-3.1.0.jar:na] at com.mongodb.operation.AsyncQueryBatchCursor.setBatchSize(AsyncQueryBatchCursor.java:104) ~[org.mongodb.mongodb-driver-core-3.1.0.jar:na] at com.mongodb.async.client.MappingAsyncBatchCursor.setBatchSize(MappingAsyncBatchCursor.java:62) ~[org.mongodb.mongodb-driver-async-3.1.0.jar:na] at com.mongodb.async.client.MongoIterableSubscription.requestMoreData(MongoIterableSubscription.java:86) ~[org.mongodb.mongodb-driver-async-3.1.0.jar:na] at com.mongodb.async.client.AbstractSubscription.processResultsQueue(AbstractSubscription.java:197) ~[org.mongodb.mongodb-driver-async-3.1.0.jar:na] at com.mongodb.async.client.AbstractSubscription.request(AbstractSubscription.java:84) ~[org.mongodb.mongodb-driver-async-3.1.0.jar:na] at com.mongodb.reactivestreams.client.ObservableToPublisher$1$1.request(ObservableToPublisher.java:48) ~[org.mongodb.mongodb-driver-reactivestreams-1.1.0.jar:na] at akka.stream.impl.ReactiveStreamsCompliance$.tryRequest(ReactiveStreamsCompliance.scala:110) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na] ... 19 common frames omitted
It looks like error from closed BatchCursor is not emitted as onError but thrown in Subscription.request.
This is violation of rule 3.16 of Reactive Streams and results in failure of whole stream (I don't see any possible way to catch it).
Unfortunately I can't reproduce this BatchCursor problem with test (I'm not sure why cursor was closed). But the same problem can occur when I try to run stream on publisher created from closed mongo client.