Type: Bug
Resolution: Won't Fix
Priority: Major - P3
Affects Version/s: None
Component/s: API, Performance
(copied to CRM)
Hi,
while skimming through the partitioner package, especially the MongoShardedPartitioner, I ran into something weird.
What I think the MongoShardedPartitioner does is the following (see the sketch after the list):
- Query config.chunks.find({ns: "dbName.collectionName"}), projecting only the fields min, max, and shard
- Generate a MongoPartition for each chunk, containing the partition index and a boundary query
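Here is that sketch (a simplified reconstruction on my part, not the connector's actual implementation; the connection string is a placeholder and my shard key "sh_key" is hard-coded):

import org.bson.Document
import com.mongodb.client.MongoClients
import com.mongodb.client.model.{Filters, Projections}
import scala.collection.JavaConverters._

// Read the chunk metadata for the collection (placeholder connection string).
val client = MongoClients.create("mongodb://mongos-host:27017")
val chunks = client.getDatabase("config").getCollection("chunks")
  .find(Filters.eq("ns", "fleetdata.data"))
  .projection(Projections.include("min", "max", "shard"))
  .asScala
  .toSeq

// One partition per chunk: the chunk's min/max become a $gte/$lt boundary query on the shard key.
val boundaries = chunks.zipWithIndex.map { case (chunk, idx) =>
  val min = chunk.get("min", classOf[Document]).get("sh_key")
  val max = chunk.get("max", classOf[Document]).get("sh_key")
  (idx, new Document("sh_key", new Document("$gte", min).append("$lt", max)))
}
boundaries.take(3).foreach(println)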
When accessing the MongoRDD, the boundary query is attached to every query that gets sent to MongoDB. Taken from a mongod logfile:
2016-11-17T11:22:33.347+0100 I COMMAND [conn31] getmore fleetdata.data query: { aggregate: "data", pipeline: [ { $match: { $and: [ { sh_key: { $gte: MinKey, $lt: -9218136772714079893 } }, { signals: { $elemMatch: { signal: "SomeSignal", value: { $gt: 0, $lte: 100 } } } } ] } }, { $group: { _id: "$root_document", firstTimestamp: { $min: "$ts" }, lastTimestamp: { $max: "$ts" }, count: { $sum: { $const: 1 } } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:42920592668 ntoreturn:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:21431 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 42868 } }, Database: { acquireCount: { r: 21434 } }, Collection: { acquireCount: { r: 21434 } } } 42765ms
However, this operation is very slow (42765ms in the log entry above).
I get 0 as the result when running db.data.find({ sh_key: { $gte: MinKey, $lt: -9218136772714079893 } }).count() – I'm not quite sure if that's supposed to happen. I also ran the same query on the boundaries of a different chunk, but the result was also 0: db.data.find({ sh_key: { $gte: NumberLong("-8016443680476223100"), $lt: NumberLong("-7983401451479346525") } }).count()
When I explain() that query, I get this output:
{ "queryPlanner" : { "mongosPlannerVersion" : 1, "winningPlan" : { "stage" : "SHARD_MERGE", "shards" : [ { "shardName" : "rs0", "connectionString" : "rs0/hadoopb19:27018,hadoopb24:27018", "serverInfo" : { "host" : "Hadoopb24", "port" : 27018, "version" : "3.2.9", "gitVersion" : "22ec9e93b40c85fc7cae7d56e7d6a02fd811088c" }, "plannerVersion" : 1, "namespace" : "fleetdata.data", "indexFilterSet" : false, "parsedQuery" : { "$and" : [ { "sh_key" : { "$lt" : -9218136772714080000 } }, { "sh_key" : { "$gte" : { "$minKey" : 1 } } } ] }, "winningPlan" : { "stage" : "SHARDING_FILTER", "inputStage" : { "stage" : "COLLSCAN", "filter" : { "$and" : [ { "sh_key" : { "$lt" : -9218136772714080000 } }, { "sh_key" : { "$gte" : { "$minKey" : 1 } } } ] }, "direction" : "forward" } }, "rejectedPlans" : [ ] }, { "shardName" : "rs1", "connectionString" : "rs1/hadoopb28:27018,hadoopb30:27018", "serverInfo" : { "host" : "hadoopb28", "port" : 27018, "version" : "3.2.9", "gitVersion" : "22ec9e93b40c85fc7cae7d56e7d6a02fd811088c" }, "plannerVersion" : 1, "namespace" : "fleetdata.data", "indexFilterSet" : false, "parsedQuery" : { "$and" : [ { "sh_key" : { "$lt" : -9218136772714080000 } }, { "sh_key" : { "$gte" : { "$minKey" : 1 } } } ] }, "winningPlan" : { "stage" : "SHARDING_FILTER", "inputStage" : { "stage" : "COLLSCAN", "filter" : { "$and" : [ { "sh_key" : { "$lt" : -9218136772714080000 } }, { "sh_key" : { "$gte" : { "$minKey" : 1 } } } ] }, "direction" : "forward" } }, "rejectedPlans" : [ ] }, { "shardName" : "rs2", "connectionString" : "rs2/Hadoopb32:27018,hadoopb36:27018", "serverInfo" : { "host" : "Hadoopb36", "port" : 27018, "version" : "3.2.9", "gitVersion" : "22ec9e93b40c85fc7cae7d56e7d6a02fd811088c" }, "plannerVersion" : 1, "namespace" : "fleetdata.data", "indexFilterSet" : false, "parsedQuery" : { "$and" : [ { "sh_key" : { "$lt" : -9218136772714080000 } }, { "sh_key" : { "$gte" : { "$minKey" : 1 } } } ] }, "winningPlan" : { "stage" : "SHARDING_FILTER", "inputStage" : { "stage" : "COLLSCAN", "filter" : { "$and" : [ { "sh_key" : { "$lt" : -9218136772714080000 } }, { "sh_key" : { "$gte" : { "$minKey" : 1 } } } ] }, "direction" : "forward" } }, "rejectedPlans" : [ ] } ] } }, "ok" : 1 }
So apparently a COLLSCAN is being done even though there is an index on sh_key:
db.data.getIndexes() [ { "v" : 1, "key" : { "_id" : 1 }, "name" : "_id_", "ns" : "fleetdata.data" }, { "v" : 1, "key" : { "sh_key" : "hashed" }, "name" : "sh_key_hashed", "ns" : "fleetdata.data" }, { "v" : 1, "key" : { "ts" : 1 }, "name" : "ts_1", "ns" : "fleetdata.data" }, { "v" : 1, "key" : { "location" : "2dsphere" }, "name" : "location_2dsphere", "ns" : "fleetdata.data", "2dsphereIndexVersion" : 3 }, { "v" : 1, "key" : { "signals.signal" : 1 }, "name" : "signals.signal_1", "ns" : "fleetdata.data" } ]
Output of sh.status():
{ "_id" : "fleetdata", "primary" : "rs0", "partitioned" : true } fleetdata.data shard key: { "sh_key" : "hashed" } unique: false balancing: true chunks: rs0 389 rs1 388 rs2 392 too many chunks to print, use verbose if you want to force print
When I remove the .set("spark.mongodb.input.partitionerOptions.shardkey", "sh_key") line from my SparkConf, I see log entries like this instead:
2016-11-17T13:59:28.282+0100 I COMMAND [conn13] command fleetdata.data command: aggregate { aggregate: "data", pipeline: [ { $match: { $and: [ { _id: { $gte: ObjectId('58073d6fe5a82e03c315f340'), $lt: ObjectId('58073de7e5a82e03c3195bf4') } }, { signals: { $elemMatch: { signal: "SomeSignal", value: { $gt: 0, $lte: 100 } } } } ] } }, { $group: { _id: "$root_document", firstTimestamp: { $min: "$ts" }, lastTimestamp: { $max: "$ts" }, count: { $sum: { $const: 1 } } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:42428915938 keyUpdates:0 writeConflicts:0 numYields:435 reslen:180 locks:{ Global: { acquireCount: { r: 874 } }, Database: { acquireCount: { r: 437 } }, Collection: { acquireCount: { r: 437 } } } protocol:op_query 141ms
Using the shard key for partitioning slows down my entire aggregation pipeline by a lot. It seems that with the default shardkey option (which resolves to _id) the execution times are fine, but as soon as I set the property to my collection's shard key ("sh_key"), the execution times explode.
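For reference, my setup looks roughly like this (a simplified sketch; the app name, the mongos URI and the exact pipeline literals are placeholders, only the connector options and the shape of the pipeline match what I actually run):

import org.apache.spark.{SparkConf, SparkContext}
import org.bson.Document
import com.mongodb.spark.MongoSpark

// Placeholder app name and mongos URI; the connector options are the relevant part.
val conf = new SparkConf()
  .setAppName("fleetdata-aggregation")
  .set("spark.mongodb.input.uri", "mongodb://mongos-host:27017/fleetdata.data")
  .set("spark.mongodb.input.partitioner", "MongoShardedPartitioner")
  // Fast when this line is removed (default shardkey resolves to _id),
  // very slow when it is set to the collection's shard key:
  .set("spark.mongodb.input.partitionerOptions.shardkey", "sh_key")

val sc = new SparkContext(conf)

// The aggregation, shaped like the pipeline visible in the log entries above;
// the connector prepends each partition's boundary query to the $match stage.
val pipeline = Seq(
  Document.parse("""{ "$match": { "signals": { "$elemMatch": { "signal": "SomeSignal", "value": { "$gt": 0, "$lte": 100 } } } } }"""),
  Document.parse("""{ "$group": { "_id": "$root_document", "firstTimestamp": { "$min": "$ts" }, "lastTimestamp": { "$max": "$ts" }, "count": { "$sum": 1 } } }""")
)
val results = MongoSpark.load(sc).withPipeline(pipeline).collect()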
- depends on: SERVER-14400 Using $min and $max on shard key doesn't target queries (Backlog)
- related to: SERVER-28667 Provide a way for the Aggregation framework to query against intervals of a hashed index (Backlog)