- Type: Bug
- Resolution: Fixed
- Priority: Major - P3
- Affects Version/s: 2.2.1
- Component/s: Partitioners
Conditions (Spark conf):
sparkConf.set("spark.mongodb.input.uri", inDBUrl + ".session"); // partitioner config sparkConf.set("spark.mongodb.input.partitioner", "MongoPaginateByCountPartitioner"); sparkConf.set("spark.mongodb.input.partitionerOptions.partitionKey", "_id"); sparkConf.set("spark.mongodb.input.partitionerOptions.numberOfPartitions",String.valueOf(partitionCnt)); // I tried 1 and 10 value for numberOfPartitions
Using the Connector, I am:
- loading data for a wide time period (for example, a whole day),
- then looping over that whole-day data to extract subsets for short time periods (for example, each 5 minutes of the day).
If the whole-day period contains matching data (filtered by a Mongo aggregation pipeline) but a particular 5-minute period contains none, I get an error when calling any JavaRDD<Document> action, for example rdd.count():
java.util.NoSuchElementException: next on empty iterator
    at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
    at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
    at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
    at scala.collection.IterableLike$class.head(IterableLike.scala:107)
    at scala.collection.mutable.ArrayOps$ofRef.scala$collection$IndexedSeqOptimized$$super$head(ArrayOps.scala:186)
    at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)
    at scala.collection.mutable.ArrayOps$ofRef.head(ArrayOps.scala:186)
    at com.mongodb.spark.rdd.partitioner.PartitionerHelper$.setLastBoundaryToLessThanOrEqualTo(PartitionerHelper.scala:127)
    at com.mongodb.spark.rdd.partitioner.MongoPaginateByCountPartitioner.partitions(MongoPaginateByCountPartitioner.scala:85)
    at com.mongodb.spark.rdd.MongoRDD.getPartitions(MongoRDD.scala:137)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.Partitioner$$anonfun$defaultPartitioner$2.apply(Partitioner.scala:66)
    at org.apache.spark.Partitioner$$anonfun$defaultPartitioner$2.apply(Partitioner.scala:66)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:66)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$groupByKey$3.apply(PairRDDFunctions.scala:644)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$groupByKey$3.apply(PairRDDFunctions.scala:644)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.PairRDDFunctions.groupByKey(PairRDDFunctions.scala:643)
    at org.apache.spark.api.java.JavaPairRDD.groupByKey(JavaPairRDD.scala:559)
    at MyCode: rdd.count() // any JavaRDD<Document> action causes this error
Example of how to use the method getDataReadyToAgg (see the code stub below) to reproduce the error:
- getting the whole-day data. Input:
JavaMongoRDD<Document> preaggData = MongoSpark.load(sparkContext);
periodStart=2018-03-11T00:00:00+03, periodEnd=2018-03-12T00:00:00+03
If I call resultRdd.collect(), I see a few records.
- using the same method for a particular short period. Input:
JavaMongoRDD<Document> preaggData = result of the previous call (the whole-day data);
periodStart=2018-03-11T00:00:00+03, periodEnd=2018-03-11T00:05:00+03
If no data exist for this particular period, I get the NoSuchElementException above (see also the call sequence sketched after the code stub below).
getDataReadyToAgg.java
/**
 * Gets from the pre-aggregated data only the rows that fall into the periodStart-periodEnd intersection.
 *
 * @param preaggData  pre-aggregated data (by session)
 * @param attemptItem attempt info
 */
private JavaMongoRDD<Document> getDataReadyToAgg(JavaMongoRDD<Document> preaggData, OttAggAttemptItem attemptItem) {
    long periodStart = attemptItem.getPeriodStart().getTime();
    long periodEnd = attemptItem.getPeriodEnd().getTime();
    List<Document> pipeline = new ArrayList<>();
    // where by date
    String matchDate = "{$match:{$or:["
            + "{start : {$gte:{$date:" + periodStart + "}, $lte:{$date:" + periodEnd + "}}},"
            + "{end : {$gte:{$date:" + periodStart + "}, $lte:{$date:" + periodEnd + "}}}";
    if (period != PERIOD_MONTH) {
        matchDate = matchDate + ",\n"
                + "{start : {$lt:{$date:" + periodStart + "}}, end: {$gt:{$date:" + periodEnd + "}}}";
    }
    matchDate = matchDate + "]}}";
    pipeline.add(Document.parse(matchDate));
    log.debug("pipeline: {}", pipeline);
    return preaggData.withPipeline(pipeline);
}
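For completeness, a sketch of the call sequence that reproduces the problem using the method above. The two-argument OttAggAttemptItem constructor and the toDate(...) helper are just shorthand for this report; the rest matches the steps described above.

// step 1: whole-day window 2018-03-11T00:00:00+03 .. 2018-03-12T00:00:00+03
JavaMongoRDD<Document> preaggData = MongoSpark.load(sparkContext);
JavaMongoRDD<Document> wholeDay = getDataReadyToAgg(preaggData,
        new OttAggAttemptItem(toDate("2018-03-11T00:00:00+03"), toDate("2018-03-12T00:00:00+03"))); // shorthand constructor
wholeDay.collect(); // returns a few records, as expected

// step 2: 5-minute window 2018-03-11T00:00:00+03 .. 2018-03-11T00:05:00+03, taken from the whole-day RDD
JavaMongoRDD<Document> fiveMinutes = getDataReadyToAgg(wholeDay,
        new OttAggAttemptItem(toDate("2018-03-11T00:00:00+03"), toDate("2018-03-11T00:05:00+03"))); // shorthand constructor
fiveMinutes.count(); // any action throws java.util.NoSuchElementException when this window matches no documents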
- Related to: SPARK-157 Exception when iterating over collection with single record (Closed)