- Type: Bug
- Resolution: Fixed
- Priority: Major - P3
- Affects Version/s: 2.2.1
- Component/s: Partitioners
Conditions (Spark conf):
sparkConf.set("spark.mongodb.input.uri", inDBUrl + ".session");
// partitioner config
sparkConf.set("spark.mongodb.input.partitioner", "MongoPaginateByCountPartitioner");
sparkConf.set("spark.mongodb.input.partitionerOptions.partitionKey", "_id");
sparkConf.set("spark.mongodb.input.partitionerOptions.numberOfPartitions", String.valueOf(partitionCnt)); // I tried the values 1 and 10 for numberOfPartitions
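Presumably the context is then created from this sparkConf and the base RDD is loaded roughly as in the sketch below; only MongoSpark.load(sparkContext) appears later in this report, the rest is assumed boilerplate:

import org.apache.spark.api.java.JavaSparkContext;
import org.bson.Document;

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;

// Create the context from the sparkConf configured above and load the
// "session" collection as the base RDD used throughout this report.
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
JavaMongoRDD<Document> preaggData = MongoSpark.load(sparkContext);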
Using the connector, I am:
- getting data for a wide time period (for example, the whole day),
- then looping over that whole-day data to get subsets for short time periods (for example, for every 5 minutes of the day).
If the whole-day period contains matching data (filtered by a Mongo aggregation pipeline) but a certain 5-minute period does not, I get an error when calling any JavaRDD<Document> action, for example rdd.count():
java.util.NoSuchElementException: next on empty iterator
at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
at scala.collection.IterableLike$class.head(IterableLike.scala:107)
at scala.collection.mutable.ArrayOps$ofRef.scala$collection$IndexedSeqOptimized$$super$head(ArrayOps.scala:186)
at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)
at scala.collection.mutable.ArrayOps$ofRef.head(ArrayOps.scala:186)
at com.mongodb.spark.rdd.partitioner.PartitionerHelper$.setLastBoundaryToLessThanOrEqualTo(PartitionerHelper.scala:127)
at com.mongodb.spark.rdd.partitioner.MongoPaginateByCountPartitioner.partitions(MongoPaginateByCountPartitioner.scala:85)
at com.mongodb.spark.rdd.MongoRDD.getPartitions(MongoRDD.scala:137)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.Partitioner$$anonfun$defaultPartitioner$2.apply(Partitioner.scala:66)
at org.apache.spark.Partitioner$$anonfun$defaultPartitioner$2.apply(Partitioner.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:66)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$groupByKey$3.apply(PairRDDFunctions.scala:644)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$groupByKey$3.apply(PairRDDFunctions.scala:644)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.groupByKey(PairRDDFunctions.scala:643)
at org.apache.spark.api.java.JavaPairRDD.groupByKey(JavaPairRDD.scala:559)
at MyCode: rdd.count() // any JavaRDD<Document> action causes this error
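The stack trace suggests the failure happens inside MongoPaginateByCountPartitioner.partitions() whenever the aggregation pipeline matches no documents. A minimal sketch of that pattern, assuming the sparkContext/preaggData setup shown above (the $match field name and window are taken from the example below):

import java.util.ArrayList;
import java.util.List;

import org.bson.Document;

import com.mongodb.spark.rdd.api.java.JavaMongoRDD;

// Window chosen so that no documents match:
// 2018-03-11T00:00:00+03 .. 2018-03-11T00:05:00+03 in epoch millis.
List<Document> pipeline = new ArrayList<>();
pipeline.add(Document.parse(
        "{$match: {start: {$gte: {$date: 1520715600000}, $lte: {$date: 1520715900000}}}}"));

JavaMongoRDD<Document> emptySlice = preaggData.withPipeline(pipeline);

// Any action forces partition computation and fails with
// java.util.NoSuchElementException: next on empty iterator
long n = emptySlice.count();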
Example of how the method getDataReadyToAgg (see the code stub below) is used to reproduce the error:
- Get the whole-day data. Input:
  JavaMongoRDD<Document> preaggData = MongoSpark.load(sparkContext);
  periodStart=2018-03-11T00:00:00+03, periodEnd=2018-03-12T00:00:00+03
  If I call resultRdd.collect(), I see a few records.
- Call the same method for a specific time window. Input:
  JavaMongoRDD<Document> preaggData = the result of the previous call (the whole-day data);
  periodStart=2018-03-11T00:00:00+03, periodEnd=2018-03-11T00:05:00+03.
  If no data exist for this period, I get the NoSuchElementException. A sketch of this calling pattern is given after the code stub below.
getDataReadyToAgg.java
import java.util.ArrayList;
import java.util.List;

import org.bson.Document;

import com.mongodb.spark.rdd.api.java.JavaMongoRDD;

/**
 * Gets from the pre-aggregated data only the rows that fall into the periodStart-periodEnd interval.
 *
 * @param preaggData  pre-aggregated data (by session)
 * @param attemptItem attempt info
 */
private JavaMongoRDD<Document> getDataReadyToAgg(JavaMongoRDD<Document> preaggData,
                                                 OttAggAttemptItem attemptItem) {
    long periodStart = attemptItem.getPeriodStart().getTime();
    long periodEnd = attemptItem.getPeriodEnd().getTime();
    List<Document> pipeline = new ArrayList<>();
    // match by date: sessions that start or end within the period
    String matchDate = "{$match:{$or:[" +
            "{start : {$gte:{$date:" + periodStart + "}, $lte:{$date:" + periodEnd + "}}}," +
            "{end : {$gte:{$date:" + periodStart + "}, $lte:{$date:" + periodEnd + "}}}";
    // `period` and PERIOD_MONTH are a field/constant of the enclosing class (not shown here)
    if (period != PERIOD_MONTH) {
        // also match sessions that span the whole period
        matchDate = matchDate + ",\n" + "{start : {$lt:{$date:" + periodStart + "}}, end: {$gt:{$date:" + periodEnd + "}}}";
    }
    matchDate = matchDate + "]}}";
    pipeline.add(Document.parse(matchDate));
    log.debug("pipeline: {}", pipeline);
    return preaggData.withPipeline(pipeline);
}
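For completeness, the calling pattern from the reproduction steps above might look roughly like this sketch; the 5-minute loop variables, wholeDayAttempt, and the OttAggAttemptItem constructor are assumptions based on the description, not code from the actual job:

// Step 1: whole-day data; wholeDayAttempt is assumed to cover
// 2018-03-11T00:00:00+03 .. 2018-03-12T00:00:00+03.
JavaMongoRDD<Document> dayData = getDataReadyToAgg(MongoSpark.load(sparkContext), wholeDayAttempt);
System.out.println(dayData.collect().size()); // a few records are returned

// Step 2: reuse the whole-day RDD as input and slice it into 5-minute windows.
long fiveMinutes = 5L * 60 * 1000;
for (long start = dayStartMillis; start < dayEndMillis; start += fiveMinutes) {
    // hypothetical constructor taking java.util.Date boundaries
    OttAggAttemptItem attempt = new OttAggAttemptItem(new Date(start), new Date(start + fiveMinutes));
    JavaMongoRDD<Document> slice = getDataReadyToAgg(dayData, attempt);
    // Any action on a window with no matching documents fails with
    // java.util.NoSuchElementException: next on empty iterator
    long cnt = slice.count();
}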
related to:
- SPARK-157 Exception when iterating over collection with single record (Closed)