Spark Connector 2.2.0
Spark 2.2
MongoDB 3.4.6
The default SamplePartitioner gives unexpected behavior on an aggregation with:
- A $match clause that returns N documents from the collection, and
- N * avgObjSize > partitionSize
Steps to reproduce:
- Create a standalone MongoDB instance
- Create a test collection whose data size is larger than the default partitionSize of 64 MB:
use test
var tdoc = 'a'.repeat(1000)
for(ii=0;ii<1024*64;ii++) { db.over64mb.insert({"a": tdoc}); }
// count = 65536
// avgObjSize = 1030
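For reference, 65,536 documents at an avgObjSize of roughly 1,030 bytes works out to about 67.5 MB (64.4 MiB), so the collection is just over the default 64 MB partitionSize.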
- Run the following aggregation in Spark
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.ReadConfig;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;

import static java.util.Collections.singletonList;

public final class DoAggregation {
    public static void main(final String[] args) throws InterruptedException {
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("DoAggregation")
                .config("spark.mongodb.input.uri", "mongodb://localhost:24000/test.over64mb")
                .config("spark.mongodb.output.uri", "mongodb://localhost:24000/test.over64mb")
                .config("spark.mongodb.input.partitionerOptions.samplesPerPartition", "200")
                .getOrCreate();

        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        ReadConfig readConfig = ReadConfig.create(jsc);

        JavaMongoRDD<Document> rdd = MongoSpark.load(jsc, readConfig).withPipeline(
                singletonList(Document.parse("{ $match: { \"a\": {\"$ne\": \"b\"} }}")));

        System.out.println("rdd.count(): " + rdd.count());
        jsc.close();
    }
}
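As an aside: if I am reading the connector documentation correctly, the partitioner can be overridden via the spark.mongodb.input.partitioner setting. The sketch below only changes the SparkSession builder from the repro above; MongoPaginateBySizePartitioner is taken from the documentation and I have not verified whether it avoids this problem, so treat it as an untested assumption rather than a confirmed workaround.

// Untested workaround sketch: select a non-sample-based partitioner explicitly.
// Same imports and surrounding code as the repro class above.
SparkSession spark = SparkSession.builder()
        .master("local")
        .appName("DoAggregation")
        .config("spark.mongodb.input.uri", "mongodb://localhost:24000/test.over64mb")
        .config("spark.mongodb.output.uri", "mongodb://localhost:24000/test.over64mb")
        // Assumption: documented config key and partitioner name; not verified against this bug.
        .config("spark.mongodb.input.partitioner", "MongoPaginateBySizePartitioner")
        .getOrCreate();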
The $match clause here should find all 65536 documents (the behavior is the same if only a subset is matched, as long as the result set is larger than the partition size).
The unexpected behavior is that different counts will be produced with each run, such as:
rdd.count(): 64781
rdd.count(): 64689
Logging all queries in the mongod log, it seems only one partition was used, and it does not cover the full result set. Summing nreturned over the single partition's aggregate and its getMore commands below (101 + 16,189 + 16,189 + 16,189 + 16,021) gives 64,689, which matches the second count above:
2017-10-24T16:19:16.759-0700 I COMMAND [conn6] command test.over64mb command: collStats { collStats: "over64mb" } numYields:0 reslen:11363 locks:{ Global: { acquireCount: { r: 2 } }, Database: { acquireCount: { r: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 1ms
2017-10-24T16:19:16.813-0700 I COMMAND [conn6] command test.over64mb command: count { count: "over64mb", query: { a: { $ne: "b" } } } planSummary: COLLSCAN keysExamined:0 docsExamined:65536 numYields:512 reslen:44 locks:{ Global: { acquireCount: { r: 1026 } }, Database: { acquireCount: { r: 513 } }, Collection: { acquireCount: { r: 513 } } } protocol:op_query 34ms
2017-10-24T16:19:16.883-0700 I COMMAND [conn6] command test.over64mb command: aggregate { aggregate: "over64mb", pipeline: [ { $match: { a: { $ne: "b" } } }, { $sample: { size: 201 } }, { $project: { _id: 1 } }, { $sort: { _id: 1 } } ], cursor: {}, allowDiskUse: true } planSummary: COLLSCAN cursorid:20327439006 keysExamined:0 docsExamined:65536 hasSortStage:1 numYields:513 nreturned:101 reslen:2718 locks:{ Global: { acquireCount: { r: 1038 } }, Database: { acquireCount: { r: 519 } }, Collection: { acquireCount: { r: 518 } } } protocol:op_query 59ms
2017-10-24T16:19:16.892-0700 I COMMAND [conn6] command test.over64mb command: getMore { getMore: 20327439006, collection: "over64mb" } originatingCommand: { aggregate: "over64mb", pipeline: [ { $match: { a: { $ne: "b" } } }, { $sample: { size: 201 } }, { $project: { _id: 1 } }, { $sort: { _id: 1 } } ], cursor: {}, allowDiskUse: true } planSummary: COLLSCAN cursorid:20327439006 keysExamined:0 docsExamined:0 hasSortStage:1 cursorExhausted:1 numYields:0 nreturned:100 reslen:2690 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 1ms
2017-10-24T16:19:17.217-0700 I COMMAND [conn6] command test.over64mb command: aggregate { aggregate: "over64mb", pipeline: [ { $match: { _id: { $gte: ObjectId('59ee2c389de662bb1e05a9fc'), $lte: ObjectId('59ee2c4c9de662bb1e06a6ac') } } }, { $match: { a: { $ne: "b" } } } ], cursor: {}, allowDiskUse: true } planSummary: IXSCAN { _id: 1 } cursorid:18114994321 keysExamined:3450 docsExamined:3450 numYields:26 nreturned:101 reslen:104526 locks:{ Global: { acquireCount: { r: 60 } }, Database: { acquireCount: { r: 30 } }, Collection: { acquireCount: { r: 29 } } } protocol:op_query 6ms
2017-10-24T16:19:17.256-0700 I COMMAND [conn6] command test.over64mb command: getMore { getMore: 18114994321, collection: "over64mb" } originatingCommand: { aggregate: "over64mb", pipeline: [ { $match: { _id: { $gte: ObjectId('59ee2c389de662bb1e05a9fc'), $lte: ObjectId('59ee2c4c9de662bb1e06a6ac') } } }, { $match: { a: { $ne: "b" } } } ], cursor: {}, allowDiskUse: true } planSummary: IXSCAN { _id: 1 } cursorid:18114994321 keysExamined:13800 docsExamined:13800 numYields:108 nreturned:16189 reslen:16776983 locks:{ Global: { acquireCount: { r: 228 } }, Database: { acquireCount: { r: 114 } }, Collection: { acquireCount: { r: 114 } } } protocol:op_query 35ms
2017-10-24T16:19:17.348-0700 I COMMAND [conn6] command test.over64mb command: getMore { getMore: 18114994321, collection: "over64mb" } originatingCommand: { aggregate: "over64mb", pipeline: [ { $match: { _id: { $gte: ObjectId('59ee2c389de662bb1e05a9fc'), $lte: ObjectId('59ee2c4c9de662bb1e06a6ac') } } }, { $match: { a: { $ne: "b" } } } ], cursor: {}, allowDiskUse: true } planSummary: IXSCAN { _id: 1 } cursorid:18114994321 keysExamined:17250 docsExamined:17250 numYields:136 nreturned:16189 reslen:16776983 locks:{ Global: { acquireCount: { r: 286 } }, Database: { acquireCount: { r: 143 } }, Collection: { acquireCount: { r: 143 } } } protocol:op_query 39ms
2017-10-24T16:19:17.553-0700 I COMMAND [conn6] command test.over64mb command: getMore { getMore: 18114994321, collection: "over64mb" } originatingCommand: { aggregate: "over64mb", pipeline: [ { $match: { _id: { $gte: ObjectId('59ee2c389de662bb1e05a9fc'), $lte: ObjectId('59ee2c4c9de662bb1e06a6ac') } } }, { $match: { a: { $ne: "b" } } } ], cursor: {}, allowDiskUse: true } planSummary: IXSCAN { _id: 1 } cursorid:18114994321 keysExamined:17250 docsExamined:17250 numYields:135 nreturned:16189 reslen:16776983 locks:{ Global: { acquireCount: { r: 284 } }, Database: { acquireCount: { r: 142 } }, Collection: { acquireCount: { r: 142 } } } protocol:op_query 39ms
2017-10-24T16:19:17.615-0700 I COMMAND [conn6] command test.over64mb command: getMore { getMore: 18114994321, collection: "over64mb" } originatingCommand: { aggregate: "over64mb", pipeline: [ { $match: { _id: { $gte: ObjectId('59ee2c389de662bb1e05a9fc'), $lte: ObjectId('59ee2c4c9de662bb1e06a6ac') } } }, { $match: { a: { $ne: "b" } } } ], cursor: {}, allowDiskUse: true } planSummary: IXSCAN { _id: 1 } cursorid:18114994321 keysExamined:12939 docsExamined:12939 cursorExhausted:1 numYields:101 nreturned:16021 reslen:16602767 locks:{ Global: { acquireCount: { r: 214 } }, Database: { acquireCount: { r: 107 } }, Collection: { acquireCount: { r: 107 } } } protocol:op_query 33ms
Also, the MongoSamplePartitioner starts by doing a full count of the $match query (visible above as the count command that collection-scans all 65,536 documents), which seems unexpected to me and may be expensive in some cases.
In my tests, when a lower samplesPerPartition value, such as the default of 10, is used with the example code above, the Spark run aborts with a NoSuchElementException thrown from PartitionerHelper.setLastBoundaryToLessThanOrEqualTo (apparently head called on an empty array of boundary documents):
Exception in thread "main" java.util.NoSuchElementException: next on empty iterator at scala.collection.Iterator$$anon$2.next(Iterator.scala:39) at scala.collection.Iterator$$anon$2.next(Iterator.scala:37) at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63) at scala.collection.IterableLike$class.head(IterableLike.scala:107) at scala.collection.mutable.ArrayOps$ofRef.scala$collection$IndexedSeqOptimized$$super$head(ArrayOps.scala:186) at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126) at scala.collection.mutable.ArrayOps$ofRef.head(ArrayOps.scala:186) at com.mongodb.spark.rdd.partitioner.PartitionerHelper$.setLastBoundaryToLessThanOrEqualTo(PartitionerHelper.scala:127) at com.mongodb.spark.rdd.partitioner.MongoSamplePartitioner.partitions(MongoSamplePartitioner.scala:111) at com.mongodb.spark.rdd.partitioner.DefaultMongoPartitioner.partitions(DefaultMongoPartitioner.scala:34) at com.mongodb.spark.rdd.MongoRDD.getPartitions(MongoRDD.scala:137) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:250) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087) at org.apache.spark.rdd.RDD.count(RDD.scala:1158) at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:455) at org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:45) at DoAggregation.main(DoAggregation.java:33) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
The behavior is the same with 2 or 4 workers.