Spark Connector / SPARK-243

“state should be: open” when using mapPartitions

    • Type: Bug
    • Resolution: Works as Designed
    • Priority: Major - P3
    • None
    • Affects Version/s: None
    • Component/s: None
    • Labels: None

      The setup
      =========
      I have a simple Spark application that uses `mapPartitions` to transform an RDD. As part of this transformation, I retrieve some necessary data from a Mongo database. The connection from the Spark worker to the Mongo database is managed using the MongoDB Connector for Spark (https://docs.mongodb.com/spark-connector/current/).

      I'm using `mapPartitions` instead of the simpler `map` because there is some relatively expensive setup that is only required once for all elements in a partition. If I were to use `map` instead, this setup would have to be repeated for every element individually.
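
To illustrate the difference in setup cost described above, here is a minimal sketch in plain Scala, without Spark. The names `SetupCostSketch`, `ExpensiveSetup`, `perElement` and `perPartition` are hypothetical and exist only for this illustration; nested `Seq`s stand in for the partitions of an RDD.

```scala
import java.util.concurrent.atomic.AtomicInteger

object SetupCostSketch {
  // Counts how often the (hypothetical) expensive setup runs.
  val setupRuns = new AtomicInteger(0)

  // Hypothetical stand-in for the expensive per-partition setup.
  final class ExpensiveSetup {
    setupRuns.incrementAndGet()
    def transform(id: Long): String = s"doc-$id"
  }

  // `map` style: the setup is repeated for every element.
  def perElement(partitions: Seq[Seq[Long]]): Seq[String] =
    partitions.flatMap(_.map(id => new ExpensiveSetup().transform(id)))

  // `mapPartitions` style: the setup runs once per partition.
  def perPartition(partitions: Seq[Seq[Long]]): Seq[String] =
    partitions.flatMap { partition =>
      val setup = new ExpensiveSetup()
      partition.map(id => setup.transform(id))
    }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(1L, 2L, 3L), Seq(4L, 5L))

    setupRuns.set(0)
    perElement(partitions)
    println(s"per element: ${setupRuns.get()} setups")   // 5, one per element

    setupRuns.set(0)
    perPartition(partitions)
    println(s"per partition: ${setupRuns.get()} setups") // 2, one per partition
  }
}
```

With two partitions holding five elements in total, the per-element variant runs the setup five times and the per-partition variant runs it twice.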

      The problem
      ===========
      When one of the partitions in the source RDD becomes large enough, the transformation fails with the message

      IllegalStateException: state should be: open

      or, occasionally, with

      IllegalStateException: The pool is closed

      The code
      ========
      Below is the code of a simple Scala application with which I can reproduce the issue:

      package my.package
      
      import com.mongodb.spark.MongoConnector
      import org.apache.spark.rdd.RDD
      import org.apache.spark.sql.SparkSession
      import org.bson.Document
      
      object MySparkApplication {
         def main(args: Array[String]): Unit = {
            val sparkSession: SparkSession = SparkSession.builder()
               .appName("MySparkApplication")
               .master(???) // The Spark master URL
               .config("spark.jars", ???) // The path at which the application's fat JAR is located.
               .config("spark.scheduler.mode", "FAIR")
               .config("spark.mongodb.keep_alive_ms", "86400000")
               .getOrCreate()
      
            val mongoConnector: MongoConnector = MongoConnector(Map(
               "uri" -> ??? // The MongoDB URI.
               , "spark.mongodb.keep_alive_ms" -> "86400000"
               , "keep_alive_ms" -> "86400000"
            ))
      
            val localDocumentIds: Seq[Long] = Seq.range(1L, 100L)
            val documentIdsRdd: RDD[Long] = sparkSession.sparkContext.parallelize(localDocumentIds)
      
            val result: RDD[Document] = documentIdsRdd.mapPartitions { documentIdsIterator =>
               mongoConnector.withMongoClientDo { mongoClient =>
                  val collection = mongoClient.getDatabase("databaseName").getCollection("collectionName")
                  // Some expensive query that should only be performed once for every partition.
                  collection.find(new Document("_id", 99999L)).first()
      
                  documentIdsIterator.map { documentId =>
                     // An expensive operation that does not interact with the Mongo database.
                     Thread.sleep(1000)
                     collection.find(new Document("_id", documentId)).first()
                  }
               }
            }
      
            val resultLocal = result.collect()
         }
      }
       

      The stack trace
      ===============
      Below is the stack trace returned by Spark when I run the application above:

       Driver stacktrace:
       [...]
       at my.package.MySparkApplication.main(MySparkApplication.scala:41)
       at my.package.MySparkApplication.main(MySparkApplication.scala)
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.lang.reflect.Method.invoke(Method.java:498)
       at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:775)
       at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
       at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
       at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
       at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
       Caused by: java.lang.IllegalStateException: state should be: open
       at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70)
       at com.mongodb.connection.BaseCluster.getDescription(BaseCluster.java:152)
       at com.mongodb.Mongo.getConnectedClusterDescription(Mongo.java:885)
       at com.mongodb.Mongo.createClientSession(Mongo.java:877)
       at com.mongodb.Mongo$3.getClientSession(Mongo.java:866)
       at com.mongodb.Mongo$3.execute(Mongo.java:823)
       at com.mongodb.FindIterableImpl.first(FindIterableImpl.java:193)
       at my.package.MySparkApplication$$anonfun$1$$anonfun$apply$1$$anonfun$apply$2.apply(MySparkApplication.scala:36)
       at my.package.MySparkApplication$$anonfun$1$$anonfun$apply$1$$anonfun$apply$2.apply(MySparkApplication.scala:33)
       at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
       at scala.collection.Iterator$class.foreach(Iterator.scala:893)
       at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
       at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
       at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
       at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
       at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
       at scala.collection.AbstractIterator.to(Iterator.scala:1336)
       at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
       at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
       at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
       at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
       at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
       at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
       at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
       at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
       at org.apache.spark.scheduler.Task.run(Task.scala:108)
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
       at java.lang.Thread.run(Thread.java:748)
      

      The research I have done
      ========================
      I have found several people asking about this issue, and in every case the cause turned out to be use of the Mongo client after it had been closed. As far as I can tell, this is not happening in my application: opening and closing the connection should be handled by the Mongo-Spark connector, and I would expect the client to only be closed after the function passed to `mongoConnector.withMongoClientDo` returns.

      I did manage to discover that the issue does not arise for the very first element in the RDD. It seems instead that a number of elements are being processed successfully, and that the failure only occurs once the process has taken a certain amount of time. This amount of time seems to be on the order of 5 to 15 seconds.

      The above leads me to believe that something is automatically closing the client once it has been active for a certain amount of time, even though it is still being used.

      As my code shows, I found that the Mongo-Spark connector exposes a setting, `spark.mongodb.keep_alive_ms`, that according to the connector documentation controls "The length of time to keep a MongoClient available for sharing". Its default value is 5 seconds, so changing it seemed worth trying. In the application above, I attempt to set it to an entire day (86400000 ms) in three different ways, with no effect. The documentation does state that this specific property "can only be configured via a System Property". I think that this is what I'm doing (by setting the property when initialising the Spark session and/or the Mongo connector), but I'm not entirely sure. There seems to be no way to verify the setting once the Mongo connector has been initialised.
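
For reference, a JVM system property is distinct from an entry in an options map. The sketch below, in plain Scala without Spark or the connector, shows how such a property can be set and read back in the current JVM. The property name is the one quoted from the connector documentation above; whether the connector reads exactly this name, and at what point during start-up, is an assumption that this report does not confirm. `KeepAlivePropertySketch` and `current` are hypothetical names.

```scala
object KeepAlivePropertySketch {
  // Property name as quoted from the connector documentation in this report.
  val Key = "spark.mongodb.keep_alive_ms"

  // Reads the system property of the *current* JVM, if it is set.
  def current(): Option[String] = Option(System.getProperty(Key))

  def main(args: Array[String]): Unit = {
    // An ordinary options map: building it does not touch system properties.
    val connectorOptions = Map(Key -> "86400000")
    println(s"after building a map: ${current()}")

    // Setting the property programmatically affects this JVM only.
    System.setProperty(Key, connectorOptions(Key))
    println(s"after System.setProperty: ${current()}")

    // Assumption: executors run in separate JVMs, so they would need the
    // flag on their own command line, for example through Spark's
    // spark.executor.extraJavaOptions setting:
    //   -Dspark.mongodb.keep_alive_ms=86400000
  }
}
```

Calling `System.getProperty` with the same key from inside a task would be one way to check what a given executor JVM actually sees.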

      Another Stack Overflow question suggests setting the `maxConnectionIdleTime` option in `MongoClientOptions`, but as far as I can tell, these options cannot be set through the connector.

      As a sanity check, I tried replacing the use of `mapPartitions` with a functionally equivalent use of `map`. The issue disappeared, which is probably because the connection to the Mongo database is re-initialised for each individual element of the RDD. However, as mentioned above, this approach would have significantly worse performance because I would end up repeating expensive setup work for every element in the RDD.

      Out of curiosity I also tried replacing the call to `mapPartitions` with a call to `foreachPartition`, also replacing the call to `documentIdsIterator.map` with `documentIdsIterator.foreach`. The issue also disappeared in this case. I have no idea why this would be, but because I need to transform my RDD, this is also not an acceptable approach.
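
One difference between the two variants is that `Iterator.map` is lazy, while `foreach` consumes the iterator immediately. The sketch below, in plain Scala without Spark or the connector, shows how a lazy iterator can outlive the block that lent it a client. `FakeClient` and `withClient` are hypothetical stand-ins, and the assumption that the connector releases its client once the function passed to `withMongoClientDo` returns is not confirmed anywhere in this report.

```scala
object LazyIteratorSketch {
  // Hypothetical stand-in for a client that rejects calls once closed.
  final class FakeClient {
    private var open = true
    def find(id: Long): String = {
      if (!open) throw new IllegalStateException("state should be: open")
      s"doc-$id"
    }
    def close(): Unit = { open = false }
  }

  // Hypothetical loan helper: closes the client as soon as `body` returns.
  def withClient[T](body: FakeClient => T): T = {
    val client = new FakeClient
    try body(client) finally client.close()
  }

  // Returns a lazy iterator: `find` only runs when the caller consumes it,
  // which happens after `withClient` has already closed the client.
  def lazyLookup(ids: Iterator[Long]): Iterator[String] =
    withClient(client => ids.map(id => client.find(id)))

  // Forces every element while the client is still open, then hands the
  // materialised results back as an iterator.
  def eagerLookup(ids: Iterator[Long]): Iterator[String] =
    withClient(client => ids.map(id => client.find(id)).toVector).iterator
}
```

In this sketch, consuming the result of `lazyLookup` throws `IllegalStateException`, while `eagerLookup` succeeds. The trade-off is that the eager variant holds all results of a partition in memory at once.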

      The kind of answer I am looking for
      ===================================

      • "You actually are closing the client prematurely, and here's where: [...]"
      • "This is a known issue in the Mongo-Spark connector, and here's a link to their issue tracker: [...]"
      • "You are setting the `spark.mongodb.keep_alive_ms` property incorrectly, this is how you should do it: [...]"
      • "It is possible to verify the value of `spark.mongodb.keep_alive_ms` on your Mongo connector, and here's how: [...]"
      • "It is possible to set `MongoClientOptions` such as `maxConnectionIdleTime` through the Mongo connector, and here's how: [...]"

            Assignee:
            ross@mongodb.com Ross Lawley
            Reporter:
            ross@mongodb.com Ross Lawley
            Votes:
            1
            Watchers:
            2
