  Spark Connector / SPARK-170

MongoPaginateByCountPartitioner NoSuchElementException for any spark action

    • Type: Bug
    • Resolution: Fixed
    • Priority: Major - P3
    • Fix Version/s: 2.2.2
    • Affects Version/s: 2.2.1
    • Component/s: Partitioners

      Conditions (Spark conf):

      sparkConf.set("spark.mongodb.input.uri", inDBUrl + ".session");
      // partitioner config
      sparkConf.set("spark.mongodb.input.partitioner", "MongoPaginateByCountPartitioner");
      sparkConf.set("spark.mongodb.input.partitionerOptions.partitionKey", "_id");
      sparkConf.set("spark.mongodb.input.partitionerOptions.numberOfPartitions",String.valueOf(partitionCnt)); // I tried 1 and 10 value for numberOfPartitions
      

      Using the Connector, I am:

      1. getting data for a wide time period (for example, the whole day),
      2. looping over that whole-day data to extract subsets for short time periods (for example, every 5 minutes of the day).

      If the whole-day period contains matching data (filtered by a MongoDB aggregation pipeline) but a particular 5-minute period does not, I get an error when calling any JavaRDD<Document> action, for example rdd.count():

          java.util.NoSuchElementException: next on empty iterator
      	at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
      	at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
      	at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
      	at scala.collection.IterableLike$class.head(IterableLike.scala:107)
      	at scala.collection.mutable.ArrayOps$ofRef.scala$collection$IndexedSeqOptimized$$super$head(ArrayOps.scala:186)
      	at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)
      	at scala.collection.mutable.ArrayOps$ofRef.head(ArrayOps.scala:186)
      	at com.mongodb.spark.rdd.partitioner.PartitionerHelper$.setLastBoundaryToLessThanOrEqualTo(PartitionerHelper.scala:127)
      	at com.mongodb.spark.rdd.partitioner.MongoPaginateByCountPartitioner.partitions(MongoPaginateByCountPartitioner.scala:85)
      	at com.mongodb.spark.rdd.MongoRDD.getPartitions(MongoRDD.scala:137)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
      	at scala.Option.getOrElse(Option.scala:121)
      	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
      	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
      	at scala.Option.getOrElse(Option.scala:121)
      	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
      	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
      	at scala.Option.getOrElse(Option.scala:121)
      	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
      	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
      	at scala.Option.getOrElse(Option.scala:121)
      	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
      	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
      	at scala.Option.getOrElse(Option.scala:121)
      	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
      	at org.apache.spark.Partitioner$$anonfun$defaultPartitioner$2.apply(Partitioner.scala:66)
      	at org.apache.spark.Partitioner$$anonfun$defaultPartitioner$2.apply(Partitioner.scala:66)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.immutable.List.foreach(List.scala:381)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      	at scala.collection.immutable.List.map(List.scala:285)
      	at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:66)
      	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$groupByKey$3.apply(PairRDDFunctions.scala:644)
      	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$groupByKey$3.apply(PairRDDFunctions.scala:644)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
      	at org.apache.spark.rdd.PairRDDFunctions.groupByKey(PairRDDFunctions.scala:643)
      	at org.apache.spark.api.java.JavaPairRDD.groupByKey(JavaPairRDD.scala:559)
      	at MyCode: rdd.count() // any JavaRDD<Document> action causes this error
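
      Judging from the trace, PartitionerHelper.setLastBoundaryToLessThanOrEqualTo (PartitionerHelper.scala:127) takes the head of the computed boundary array, which is presumably empty when the aggregation pipeline matches no documents. Until that case is handled, one hypothetical guard is to probe the pipeline with the plain MongoDB Java driver (3.7+ for MongoClients) before triggering any Spark action; the method below and all its names are illustrative, not part of the reported code:

      import com.mongodb.client.MongoClient;
      import com.mongodb.client.MongoClients;
      import com.mongodb.client.MongoCollection;
      import org.bson.Document;
      import java.util.List;

      // Hypothetical workaround, not the connector's fix: lets the caller skip the
      // Spark action when the aggregation pipeline matches no documents at all.
      private boolean pipelineMatchesAnything(String mongoUri, String db, String coll,
                                              List<Document> pipeline) {
          try (MongoClient client = MongoClients.create(mongoUri)) {
              MongoCollection<Document> collection = client.getDatabase(db).getCollection(coll);
              // first() returns null when the pipeline yields no documents
              return collection.aggregate(pipeline).first() != null;
          }
      }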
      

      Example of how to use the method getDataReadyToAgg (see the code stub below) to reproduce the error:

      1. Get the whole-day data. Input:
        JavaMongoRDD<Document> preaggData = MongoSpark.load(sparkContext);
        periodStart=2018-03-11T00:00:00+03, periodEnd=2018-03-12T00:00:00+03
        

        If I call resultRdd.collect(), I see a few records.

      2. Use the same method for a specific short period. Input:
        JavaMongoRDD<Document> preaggData = result of previous call (the whole day data)
        periodStart=2018-03-11T00:00:00+03, periodEnd=2018-03-11T00:05:00+03
        

        If no data exist for this period, I get the NoSuchElementException.

      getDataReadyToAgg.java
          /**
           * Keeps, from the pre-aggregated data, only the rows that fall into the
           * periodStart-periodEnd window.
           *
           * @param preaggData  pre-aggregated data (by session)
           * @param attemptItem attempt info (provides the period boundaries)
           * @return the input RDD with the $match pipeline applied
           */
          private JavaMongoRDD<Document> getDataReadyToAgg(JavaMongoRDD<Document> preaggData,
                                                           OttAggAttemptItem attemptItem) {
      
              long periodStart = attemptItem.getPeriodStart()
                      .getTime();
              long periodEnd = attemptItem.getPeriodEnd()
                      .getTime();
      
              List<Document> pipeline = new ArrayList<>();
      
              // where by date
              String matchDate = "{$match:{$or:[" +
                      "{start : {$gte:{$date:" + periodStart + "}, $lte:{$date:" + periodEnd + "}}}," +
                      "{end : {$gte:{$date:" + periodStart + "}, $lte:{$date:" + periodEnd + "}}}";
      
              // period, PERIOD_MONTH and log are fields of the enclosing class (not shown in this stub)
              if (period != PERIOD_MONTH) {
                  matchDate = matchDate + ",\n" + "{start : {$lt:{$date:" + periodStart + "}}, end: {$gt:{$date:" + periodEnd + "}}}";
              }
      
              matchDate = matchDate + "]}}";
      
              pipeline.add(Document.parse(matchDate));
      
              log.debug("pipeline: {}", pipeline);
      
              return preaggData.withPipeline(pipeline);
      
          }
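
      Condensed, the two steps above then look roughly like this (a sketch: the attemptFor(...) helper that builds an OttAggAttemptItem is illustrative and not part of the reported code):

          // Step 1: whole-day window, matches documents; collect() returns a few records.
          JavaMongoRDD<Document> preaggData = MongoSpark.load(sparkContext);
          JavaMongoRDD<Document> wholeDay = getDataReadyToAgg(preaggData,
                  attemptFor("2018-03-11T00:00:00+03", "2018-03-12T00:00:00+03")); // illustrative helper
          wholeDay.collect(); // OK, returns a few records

          // Step 2: 5-minute window with no matching documents; any action fails while
          // MongoPaginateByCountPartitioner computes the partitions.
          JavaMongoRDD<Document> emptyWindow = getDataReadyToAgg(wholeDay,
                  attemptFor("2018-03-11T00:00:00+03", "2018-03-11T00:05:00+03")); // illustrative helper
          emptyWindow.count(); // java.util.NoSuchElementException: next on empty iterator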
      

            Assignee: Ross Lawley (ross@mongodb.com)
            Reporter: Alex Ryasn (aryasn)
            Votes: 0
            Watchers: 1
