Type: Bug
Resolution: Won't Fix
Priority: Major - P3
Affects Version/s: None
Component/s: API, Performance
(copied to CRM)
Hi,
while skimming through the partitioner package, especially MongoShardedPartitioner, I ran into something weird.
What I think the MongoShardedPartitioner does is:
- query config.chunks.find({ns: "dbName.collectionName"}), projecting the output to include only the fields min, max, and shard;
- generate a MongoPartition containing the index and a boundary query for each chunk.
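The two steps above can be sketched roughly as follows (a simplified Python sketch, not the connector's actual Scala code: the chunks are assumed to be already fetched as plain dicts, BSON MinKey is stood in by a string, and the MongoPartition shape is approximated with a dict):

```python
def make_partitions(chunks, shard_key):
    """Build one partition per chunk, mirroring what MongoShardedPartitioner
    appears to do: each partition carries an index, a boundary query derived
    from the chunk's min/max, and the owning shard as a preferred location."""
    partitions = []
    for index, chunk in enumerate(chunks):
        bounds = {shard_key: {"$gte": chunk["min"][shard_key],
                              "$lt": chunk["max"][shard_key]}}
        partitions.append({"index": index,
                           "bounds": bounds,
                           "locations": [chunk["shard"]]})
    return partitions

# Example input shaped like config.chunks documents (values from this report;
# "MinKey" is a stand-in for the BSON MinKey sentinel).
chunks = [
    {"min": {"sh_key": "MinKey"},
     "max": {"sh_key": -9218136772714079893}, "shard": "rs0"},
    {"min": {"sh_key": -9218136772714079893},
     "max": {"sh_key": -8016443680476223100}, "shard": "rs1"},
]

for p in make_partitions(chunks, "sh_key"):
    print(p["index"], p["bounds"])
```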
When accessing the MongoRDD, the boundary query is attached to every query sent to MongoDB. Taken from a mongod log file:
2016-11-17T11:22:33.347+0100 I COMMAND [conn31] getmore fleetdata.data query: { aggregate: "data", pipeline: [ { $match: { $and: [ { sh_key: { $gte: MinKey, $lt: -9218136772714079893 } }, { signals: { $elemMatch: { signal: "SomeSignal", value: { $gt: 0, $lte: 100 } } } } ] } }, { $group: { _id: "$root_document", firstTimestamp: { $min: "$ts" }, lastTimestamp: { $max: "$ts" }, count: { $sum: { $const: 1 } } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:42920592668 ntoreturn:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:21431 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 42868 } }, Database: { acquireCount: { r: 21434 } }, Collection: { acquireCount: { r: 21434 } } } 42765ms
However, this operation is very slow.
I get 0 as the result when running db.data.find({ sh_key: { $gte: MinKey, $lt: -9218136772714079893 } }).count() – I'm not quite sure whether that's supposed to happen. I also ran the same query on the boundaries of a different chunk, but the result was also 0: db.data.find({ sh_key: { $gte: NumberLong("-8016443680476223100"), $lt: NumberLong("-7983401451479346525") } }).count()
When I explain() that query, I get this output:
{
"queryPlanner" : {
"mongosPlannerVersion" : 1,
"winningPlan" : {
"stage" : "SHARD_MERGE",
"shards" : [
{
"shardName" : "rs0",
"connectionString" : "rs0/hadoopb19:27018,hadoopb24:27018",
"serverInfo" : {
"host" : "Hadoopb24",
"port" : 27018,
"version" : "3.2.9",
"gitVersion" : "22ec9e93b40c85fc7cae7d56e7d6a02fd811088c"
},
"plannerVersion" : 1,
"namespace" : "fleetdata.data",
"indexFilterSet" : false,
"parsedQuery" : {
"$and" : [
{
"sh_key" : {
"$lt" : -9218136772714080000
}
},
{
"sh_key" : {
"$gte" : { "$minKey" : 1 }
}
}
]
},
"winningPlan" : {
"stage" : "SHARDING_FILTER",
"inputStage" : {
"stage" : "COLLSCAN",
"filter" : {
"$and" : [
{
"sh_key" : {
"$lt" : -9218136772714080000
}
},
{
"sh_key" : {
"$gte" : { "$minKey" : 1 }
}
}
]
},
"direction" : "forward"
}
},
"rejectedPlans" : [ ]
},
{
"shardName" : "rs1",
"connectionString" : "rs1/hadoopb28:27018,hadoopb30:27018",
"serverInfo" : {
"host" : "hadoopb28",
"port" : 27018,
"version" : "3.2.9",
"gitVersion" : "22ec9e93b40c85fc7cae7d56e7d6a02fd811088c"
},
"plannerVersion" : 1,
"namespace" : "fleetdata.data",
"indexFilterSet" : false,
"parsedQuery" : {
"$and" : [
{
"sh_key" : {
"$lt" : -9218136772714080000
}
},
{
"sh_key" : {
"$gte" : { "$minKey" : 1 }
}
}
]
},
"winningPlan" : {
"stage" : "SHARDING_FILTER",
"inputStage" : {
"stage" : "COLLSCAN",
"filter" : {
"$and" : [
{
"sh_key" : {
"$lt" : -9218136772714080000
}
},
{
"sh_key" : {
"$gte" : { "$minKey" : 1 }
}
}
]
},
"direction" : "forward"
}
},
"rejectedPlans" : [ ]
},
{
"shardName" : "rs2",
"connectionString" : "rs2/Hadoopb32:27018,hadoopb36:27018",
"serverInfo" : {
"host" : "Hadoopb36",
"port" : 27018,
"version" : "3.2.9",
"gitVersion" : "22ec9e93b40c85fc7cae7d56e7d6a02fd811088c"
},
"plannerVersion" : 1,
"namespace" : "fleetdata.data",
"indexFilterSet" : false,
"parsedQuery" : {
"$and" : [
{
"sh_key" : {
"$lt" : -9218136772714080000
}
},
{
"sh_key" : {
"$gte" : { "$minKey" : 1 }
}
}
]
},
"winningPlan" : {
"stage" : "SHARDING_FILTER",
"inputStage" : {
"stage" : "COLLSCAN",
"filter" : {
"$and" : [
{
"sh_key" : {
"$lt" : -9218136772714080000
}
},
{
"sh_key" : {
"$gte" : { "$minKey" : 1 }
}
}
]
},
"direction" : "forward"
}
},
"rejectedPlans" : [ ]
}
]
}
},
"ok" : 1
}
So apparently a COLLSCAN is performed even though an index exists on sh_key:
db.data.getIndexes()
[
{
"v" : 1,
"key" : {
"_id" : 1
},
"name" : "_id_",
"ns" : "fleetdata.data"
},
{
"v" : 1,
"key" : {
"sh_key" : "hashed"
},
"name" : "sh_key_hashed",
"ns" : "fleetdata.data"
},
{
"v" : 1,
"key" : {
"ts" : 1
},
"name" : "ts_1",
"ns" : "fleetdata.data"
},
{
"v" : 1,
"key" : {
"location" : "2dsphere"
},
"name" : "location_2dsphere",
"ns" : "fleetdata.data",
"2dsphereIndexVersion" : 3
},
{
"v" : 1,
"key" : {
"signals.signal" : 1
},
"name" : "signals.signal_1",
"ns" : "fleetdata.data"
}
]
Output of sh.status():
{ "_id" : "fleetdata", "primary" : "rs0", "partitioned" : true }
fleetdata.data
shard key: { "sh_key" : "hashed" }
unique: false
balancing: true
chunks:
rs0 389
rs1 388
rs2 392
too many chunks to print, use verbose if you want to force print
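My guess at what is going on (take this with a grain of salt): on a hashed shard key, the chunk boundaries stored in config.chunks are positions in 64-bit hash space, but the partitioner's $match compares the raw sh_key field against them; and since a hashed index only supports equality matches on the raw value, the range predicate can't use it, hence the COLLSCAN. A toy Python illustration of the mismatch, using the chunk bound from the log above (the raw values are made up for illustration):

```python
# Chunk upper bound taken from the getmore log line above. On a hashed
# shard key this is a value in 64-bit hash space, not a raw sh_key value.
CHUNK_UPPER_BOUND = -9218136772714079893

# Some plausible raw shard-key values (made up for illustration).
docs = [{"sh_key": v} for v in range(1000)]

# What the partitioner's filter effectively does: compare the RAW field
# against the hash-space bound. Realistic raw values sit far above this
# huge negative number, so nothing matches -- consistent with count()
# returning 0 in the queries above.
matched = [d for d in docs if d["sh_key"] < CHUNK_UPPER_BOUND]
print(len(matched))  # 0
```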
When I remove the .set("spark.mongodb.input.partitionerOptions.shardkey", "sh_key") line from my SparkConf, I see output like this in the log file:
2016-11-17T13:59:28.282+0100 I COMMAND [conn13] command fleetdata.data command: aggregate { aggregate: "data", pipeline: [ { $match: { $and: [ { _id: { $gte: ObjectId('58073d6fe5a82e03c315f340'), $lt: ObjectId('58073de7e5a82e03c3195bf4') } }, { signals: { $elemMatch: { signal: "SomeSignal", value: { $gt: 0, $lte: 100 } } } } ] } }, { $group: { _id: "$root_document", firstTimestamp: { $min: "$ts" }, lastTimestamp: { $max: "$ts" }, count: { $sum: { $const: 1 } } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:42428915938 keyUpdates:0 writeConflicts:0 numYields:435 reslen:180 locks:{ Global: { acquireCount: { r: 874 } }, Database: { acquireCount: { r: 437 } }, Collection: { acquireCount: { r: 437 } } } protocol:op_query 141ms
This slows down my entire aggregation pipeline by a lot. When I use the default "shardKey" (which resolves to _id), the execution times are fine (141 ms in the log above). However, when I set the configuration property to my collection's shard key ("sh_key"), the execution times explode (over 42 s for the query shown earlier).
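For reference, a minimal sketch of the two configurations being compared (option names as used above; the URI is a placeholder, and the settings are returned as a plain dict rather than a real SparkConf so the comparison is self-contained):

```python
def mongo_input_conf(shardkey=None):
    """Connector settings compared in this report. Each key/value pair
    would be passed to SparkConf.set(); the URI is a placeholder."""
    conf = {
        "spark.mongodb.input.uri": "mongodb://mongos:27017/fleetdata.data",
        "spark.mongodb.input.partitioner": "MongoShardedPartitioner",
    }
    if shardkey is not None:
        # Overriding the partitioner's shard key is the one-line change
        # that triggers the slow COLLSCAN boundary queries.
        conf["spark.mongodb.input.partitionerOptions.shardkey"] = shardkey
    return conf

fast = mongo_input_conf()           # default shardkey ("_id"): ~141 ms queries
slow = mongo_input_conf("sh_key")   # hashed shard key: ~42 s COLLSCAN queries
```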
- depends on
  - SERVER-14400 Using $min and $max on shard key doesn't target queries (Backlog)
- related to
  - SERVER-28667 Provide a way for the Aggregation framework to query against intervals of a hashed index (Backlog)