[JAVA-2039] Subscription.request throws exception when state is open Created: 17/Nov/15  Updated: 13/Apr/16  Resolved: 13/Apr/16

Status: Closed
Project: Java Driver
Component/s: Async
Affects Version/s: None
Fix Version/s: 3.2.0

Type: Bug Priority: Major - P3
Reporter: Michal B. Assignee: Unassigned
Resolution: Duplicate Votes: 0
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Duplicate
Related

 Description   

I'm working with the Reactive Streams driver and Akka Streams. My application has a stream that iterates over collections and databases, but once in a while it reports an error from the stream (and the driver):

akka.stream.impl.ReactiveStreamsCompliance$SignalThrewException: It is illegal to throw exceptions from request(), rule 3.16
	at akka.stream.impl.ReactiveStreamsCompliance$.tryRequest(ReactiveStreamsCompliance.scala:111) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
	at akka.stream.impl.fusing.BatchingActorInputBoundary.akka$stream$impl$fusing$BatchingActorInputBoundary$$dequeue(ActorInterpreter.scala:55) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
	at akka.stream.impl.fusing.BatchingActorInputBoundary$$anonfun$upstreamRunning$1.applyOrElse(ActorInterpreter.scala:157) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) ~[org.scala-lang.scala-library-2.11.7.jar:na]
	at akka.stream.impl.SubReceive.apply(Transfer.scala:16) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
	at akka.stream.impl.SubReceive.apply(Transfer.scala:12) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[org.scala-lang.scala-library-2.11.7.jar:na]
	at akka.stream.impl.SubReceive.applyOrElse(Transfer.scala:12) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[org.scala-lang.scala-library-2.11.7.jar:na]
	at akka.actor.Actor$class.aroundReceive(Actor.scala:467) ~[com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na]
	at akka.stream.impl.fusing.ActorInterpreter.aroundReceive(ActorInterpreter.scala:366) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) [com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na]
	at akka.actor.ActorCell.invoke(ActorCell.scala:487) [com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) [com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na]
	at akka.dispatch.Mailbox.run(Mailbox.scala:220) [com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na]
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) [com.typesafe.akka.akka-actor_2.11-2.3.14.jar:na]
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [org.scala-lang.scala-library-2.11.7.jar:na]
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [org.scala-lang.scala-library-2.11.7.jar:na]
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [org.scala-lang.scala-library-2.11.7.jar:na]
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [org.scala-lang.scala-library-2.11.7.jar:na]
Caused by: java.lang.IllegalStateException: state should be: open
	at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70) ~[org.mongodb.mongodb-driver-core-3.1.0.jar:na]
	at com.mongodb.operation.AsyncQueryBatchCursor.setBatchSize(AsyncQueryBatchCursor.java:104) ~[org.mongodb.mongodb-driver-core-3.1.0.jar:na]
	at com.mongodb.async.client.MappingAsyncBatchCursor.setBatchSize(MappingAsyncBatchCursor.java:62) ~[org.mongodb.mongodb-driver-async-3.1.0.jar:na]
	at com.mongodb.async.client.MongoIterableSubscription.requestMoreData(MongoIterableSubscription.java:86) ~[org.mongodb.mongodb-driver-async-3.1.0.jar:na]
	at com.mongodb.async.client.AbstractSubscription.processResultsQueue(AbstractSubscription.java:197) ~[org.mongodb.mongodb-driver-async-3.1.0.jar:na]
	at com.mongodb.async.client.AbstractSubscription.request(AbstractSubscription.java:84) ~[org.mongodb.mongodb-driver-async-3.1.0.jar:na]
	at com.mongodb.reactivestreams.client.ObservableToPublisher$1$1.request(ObservableToPublisher.java:48) ~[org.mongodb.mongodb-driver-reactivestreams-1.1.0.jar:na]
	at akka.stream.impl.ReactiveStreamsCompliance$.tryRequest(ReactiveStreamsCompliance.scala:110) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0.jar:na]
	... 19 common frames omitted

It looks like the error from the closed BatchCursor is not emitted via onError but is thrown from Subscription.request.
This violates rule 3.16 of the Reactive Streams specification and fails the whole stream (I don't see any way to catch it).

Unfortunately I can't reproduce this BatchCursor problem in a test (I'm not sure why the cursor was closed), but the same problem occurs when I run a stream on a publisher created from a closed Mongo client.
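
For reference, rule 3.16 requires that request() never throws; failures inside it should instead be reported to the subscriber via onError. A minimal sketch of a wrapper with that behaviour (illustrative only, not the driver's actual code):

import org.reactivestreams.{Subscriber, Subscription}

// Hypothetical wrapper: delegates request() but converts any exception thrown
// by the underlying subscription into an onError signal, as rule 3.16 expects.
class SafeSubscription[T](underlying: Subscription, subscriber: Subscriber[_ >: T]) extends Subscription {
  override def request(n: Long): Unit =
    try underlying.request(n)
    catch {
      case t: Throwable =>
        underlying.cancel()   // stop requesting from the faulty source
        subscriber.onError(t) // surface the failure as a stream error instead of throwing
    }

  override def cancel(): Unit = underlying.cancel()
}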



 Comments   
Comment by Ross Lawley [ 10/Dec/15 ]

Thanks michalbogacz@gmail.com,

Good news: I'm marking this as fixed, as I believe the protection we put in place will make the errors described here invoke the onError callback.

Thanks for your help on this ticket.

Ross

Comment by Michal B. [ 10/Dec/15 ]

I upgraded the driver to the newest version and haven't seen this problem since I last reported it.
I think you can close this ticket, because the main issue ("Subscription.request throws exception") is resolved.
If the closed-cursor problem occurs again, I will open a new ticket.

Thank you for your help.

Best regards,
Michal

Comment by Ross Lawley [ 01/Dec/15 ]

I think JAVA-2048 will have fixed this issue, although manually changing the batchSize on a closed AsyncBatchCursor will still throw an exception.

I'll await any further information about the cause of the closed cursor before proceeding further on this ticket.

Comment by Michal B. [ 26/Nov/15 ]

It was exactly the same error, but not with rc0; I only upgraded the driver to rc0 today.
As soon as I get some results (error logs), I will post them here.

Michal

Comment by Ross Lawley [ 26/Nov/15 ]

Thanks michalbogacz@gmail.com, just to double-check: was it the same batchCursor error? Also, did you upgrade to rc0?

Ross

Comment by Michal B. [ 25/Nov/15 ]

I wrote my last comment too early: the problem occurred today in one of our environments.
Still, it seems somehow connected to the server upgrade, since we went 5 days without it after upgrading.

Comment by Michal B. [ 25/Nov/15 ]

Hi Ross,
I think I've found the real problem. It wasn't the driver; the problem was with the server.
We were using an old MongoDB server version, 3.0.3. After upgrading to 3.0.7 I no longer see any errors in the logs. Since the stream runs every 5 minutes and everything has looked fine for the last 5 days, I'm fairly confident this was the root cause.

Thank you for all the support, and sorry for taking your time.

Best regards,
Michal

Comment by Ross Lawley [ 24/Nov/15 ]

michalbogacz@gmail.com,

I've updated the Reactive Streams driver to ensure that the first case is handled - see JAVARS-15. It has been released in 1.2.0-rc0.

I'd still like to understand why the cursor was closed, and the resulting error, before taking further action on this ticket.

Ross

Comment by Michal B. [ 18/Nov/15 ]

About the first example:
I see a real possibility that many people will run into problems with closing the client too early. For example, you create a Mongo client per stream and close it once the stream completes; if the stream is badly designed, it's possible to close the client prematurely and get this error.
Another usage pattern is to share one client across many processes and streams; if some actor accidentally closes the client, any new stream will start with this error. So it would be nice to have a method to check whether the client is closed (I didn't find one); a rough workaround is sketched below.
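
As an illustration only (the wrapper and its isClosed method are hypothetical application code, not driver API), closure could be tracked on the application side:

import java.util.concurrent.atomic.AtomicBoolean
import com.mongodb.reactivestreams.client.MongoClient

// Hypothetical application-side wrapper; the driver itself exposes no isClosed().
class TrackedMongoClient(val underlying: MongoClient) {
  private val closed = new AtomicBoolean(false)

  def isClosed: Boolean = closed.get()

  def close(): Unit =
    if (closed.compareAndSet(false, true)) underlying.close()
}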

About the second:
No other process has access to the client. It's created before the stream starts and closed at the end.
What are the possible ways a cursor can be closed? Is there some configuration that could close it, maybe a timeout or too many open connections/cursors?

My client configuration is:
serverSelectionTimeout = 1s
maxConnectionThreadPool = 100
maxWaitQueueSize = 5000

The rest of the configuration is unchanged.
The total number of collections is about 4000.

Michal

Comment by Ross Lawley [ 18/Nov/15 ]

Thanks for the examples.

The first example reproduces the issue, but that is currently expected behaviour: you can't close the client and then expect to use it. That said, the notNull checks are done outside of callbacks, and this could potentially be improved upon.

Your second example looks fine, but something is closing the client and I'd like to understand that better, to see whether another issue is hidden here. Please let me know if you can identify it in your tests. Does any other process have access to the client?

Ross

Comment by Michal B. [ 17/Nov/15 ]

Hi Ross,

I only managed to reproduce a similar problem with this (Scala) code:

val client = MongoClients.create("mongodb://localhost:27017")
client.close()
Source(client.listDatabaseNames()).runForeach { s =>
  println(s)
}

Resulting stack trace:

akka.stream.impl.ReactiveStreamsCompliance$SignalThrewException: It is illegal to throw exceptions from request(), rule 3.16
	at akka.stream.impl.ReactiveStreamsCompliance$.tryRequest(ReactiveStreamsCompliance.scala:111)
	at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary.onSubscribe(ActorGraphInterpreter.scala:175)
	at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:374)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
	at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:291)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: state should be: open
	at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70)
	at com.mongodb.connection.BaseCluster.selectServerAsync(BaseCluster.java:127)
	at com.mongodb.binding.AsyncClusterBinding.getAsyncClusterBindingConnectionSource(AsyncClusterBinding.java:75)
	at com.mongodb.binding.AsyncClusterBinding.getReadConnectionSource(AsyncClusterBinding.java:65)
	at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:211)
	at com.mongodb.operation.ListDatabasesOperation.executeAsync(ListDatabasesOperation.java:111)
	at com.mongodb.async.client.MongoClientImpl$2.execute(MongoClientImpl.java:98)
	at com.mongodb.async.client.OperationIterable.batchCursor(OperationIterable.java:125)
	at com.mongodb.async.client.ListDatabasesIterableImpl.batchCursor(ListDatabasesIterableImpl.java:85)
	at com.mongodb.async.client.MappingIterable.batchCursor(MappingIterable.java:100)
	at com.mongodb.async.client.MongoIterableSubscription.requestInitialData(MongoIterableSubscription.java:48)
	at com.mongodb.async.client.AbstractSubscription.request(AbstractSubscription.java:82)
	at com.mongodb.reactivestreams.client.ObservableToPublisher$1$1.request(ObservableToPublisher.java:48)
	at akka.stream.impl.ReactiveStreamsCompliance$.tryRequest(ReactiveStreamsCompliance.scala:110)
	... 13 more

This code doesn't reproduce the batch cursor exception, but it shows that a violation of the rule is possible.
As you can see, I don't interfere with the creation of the stream at all; I just use Source(somePublisher).

What I'm trying to get in my code is the status of all collections in all databases. I can't show the actual code because of restrictions, but it is based on the approach below:

val client = MongoClients.create("mongodb://localhost:27017")

Source(client.listDatabaseNames())
  .map(dbName => client.getDatabase(dbName))
  .flatMapConcat(db => Source(db.listCollectionNames()))
  .mapAsync(5) {
    // runCommand collStats
  }
  .runForeach { s =>
    println(s)
  }

The environment where it runs is nondeterministic and can change very fast (databases and collections are created and removed).
In the logs I can see that this solution works most of the time, but sometimes I only get around 70% of the status data plus this exception, which ends all processing.

I'm still trying to figure out why the BatchCursor gets closed. Even if something like this happens, I would like my application to be resilient and not stop the whole stream because of one error (and currently this error stops the whole stream); one possible mitigation is sketched below.
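
What I have in mind, once such failures are delivered as onError signals instead of being thrown from request(), is to isolate each inner source so that one failing publisher does not take down the outer stream. A rough sketch, assuming an akka-stream version that provides Source.fromPublisher and recoverWithRetries (the collectionNames helper is purely illustrative):

import akka.NotUsed
import akka.stream.scaladsl.Source
import com.mongodb.reactivestreams.client.MongoDatabase

// Illustrative helper: if listing one database's collections fails (for example
// because its cursor was closed), fall back to an empty source so that only this
// database is skipped and the outer stream of databases keeps running.
def collectionNames(db: MongoDatabase): Source[String, NotUsed] =
  Source.fromPublisher(db.listCollectionNames())
    .recoverWithRetries(attempts = 1, { case _: Throwable => Source.empty })

// Used in place of the inner source above:
//   .flatMapConcat(db => collectionNames(db))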

Comment by Ross Lawley [ 17/Nov/15 ]

Thanks michalbogacz@gmail.com for the ticket.

We're looking into it. Can you provide an example of how you are using the BatchCursor? I'm trying to understand how it was closed in the first place and whether that is an issue.

Until the cursor is turned into a reactive stream by making it a Publisher, it does check its state and, as you've seen, it can throw exceptions when certain expectations aren't met. Do you have any sample code reproducing this issue?

Ross

Comment by Jeffrey Yemin [ 17/Nov/15 ]

Moved to the JAVA project until we determine whether the issue is actually with the reactive streams code itself.
