Spark Connector / SPARK-98

MongoShardedPartitioner and hashed shard keys not working correctly

    • Type: Bug
    • Resolution: Won't Fix
    • Priority: Major - P3
    • Affects Version/s: None
    • Component/s: API, Performance


      While skimming through the partitioner package, especially MongoShardedPartitioner, I ran into something odd.
      As I understand it, MongoShardedPartitioner does the following:

      1. Queries config.chunks.find({ns: "dbName.collectionName"}), projecting only the min, max and shard fields.
      2. Generates a MongoPartition containing the index and a boundary query for each chunk.
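      The two steps above can be sketched in Python as a simplified, hypothetical model. It is not the connector's own (Scala) code: the Partition class, chunks_to_partitions and the MIN_KEY/MAX_KEY placeholders are illustrative names, and the {$gte: min, $lt: max} bound shape is taken from the mongod log line quoted in this report.

```python
# Hypothetical, simplified model of MongoShardedPartitioner's two steps.
# Not the connector's actual (Scala) implementation.
from dataclasses import dataclass
from typing import Any, Dict, List

# Placeholders for BSON MinKey/MaxKey (illustrative only).
MIN_KEY = "MinKey"
MAX_KEY = "MaxKey"


@dataclass
class Partition:
    index: int                    # position of the chunk in the result set
    query_bounds: Dict[str, Any]  # boundary query attached to this partition


def chunks_to_partitions(chunks: List[Dict[str, Any]], shard_key: str) -> List[Partition]:
    """Map each config.chunks document (projected to min, max, shard)
    to a partition bounded by {shard_key: {$gte: min, $lt: max}}."""
    partitions = []
    for i, chunk in enumerate(chunks):
        lower = chunk["min"][shard_key]
        upper = chunk["max"][shard_key]
        partitions.append(Partition(i, {shard_key: {"$gte": lower, "$lt": upper}}))
    return partitions


# Two made-up chunk documents; the first boundary value is taken from the log line.
chunks = [
    {"min": {"sh_key": MIN_KEY}, "max": {"sh_key": -9218136772714079893}, "shard": "rs0"},
    {"min": {"sh_key": -9218136772714079893}, "max": {"sh_key": MAX_KEY}, "shard": "rs1"},
]
partitions = chunks_to_partitions(chunks, "sh_key")
print(partitions[0].query_bounds)
```

      Note that the bounds are copied verbatim from the chunk documents; nothing in this step knows whether the shard key is hashed.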

      When the MongoRDD is accessed, the boundary query is combined with every query sent to MongoDB. From a mongod log file:

      2016-11-17T11:22:33.347+0100 I COMMAND  [conn31] getmore fleetdata.data query: { aggregate: "data", pipeline: [ { $match: { $and: [ { sh_key: { $gte: MinKey, $lt: -9218136772714079893 } }, { signals: { $elemMatch: { signal: "SomeSignal", value: { $gt: 0, $lte: 100 } } } } ] } }, { $group: { _id: "$root_document", firstTimestamp: { $min: "$ts" }, lastTimestamp: { $max: "$ts" }, count: { $sum: { $const: 1 } } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:42920592668 ntoreturn:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:21431 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 42868 } }, Database: { acquireCount: { r: 21434 } }, Collection: { acquireCount: { r: 21434 } } } 42765ms
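      The log line shows the chunk bounds and the job's own $match combined under a single $and. The Python sketch below produces that shape from a partition's bounds and a user pipeline. with_partition_bounds is a hypothetical name, the $group stage is trimmed from the logged one, and the log does not reveal whether the connector builds the $and itself or prepends a separate $match stage that the server then coalesces.

```python
# Hypothetical sketch: restrict a user's aggregation pipeline to one partition,
# producing the {$match: {$and: [bounds, userMatch]}} shape seen in the log.
from typing import Any, Dict, List

Stage = Dict[str, Any]


def with_partition_bounds(bounds: Dict[str, Any], pipeline: List[Stage]) -> List[Stage]:
    """Return a new pipeline whose first stage limits documents to `bounds`."""
    if pipeline and "$match" in pipeline[0]:
        # Fold the bounds into the user's leading $match under one $and.
        merged = {"$match": {"$and": [bounds, pipeline[0]["$match"]]}}
        return [merged] + pipeline[1:]
    # No leading $match: just prepend one for the bounds.
    return [{"$match": bounds}] + list(pipeline)


bounds = {"sh_key": {"$gte": "MinKey", "$lt": -9218136772714079893}}  # "MinKey" is a placeholder
user_pipeline = [
    {"$match": {"signals": {"$elemMatch": {"signal": "SomeSignal", "value": {"$gt": 0, "$lte": 100}}}}},
    {"$group": {"_id": "$root_document", "count": {"$sum": 1}}},
]
partition_pipeline = with_partition_bounds(bounds, user_pipeline)
```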

      However, this operation is very slow (42765 ms in the log line above).
      Running db.data.find({ sh_key: { $gte: MinKey, $lt: -9218136772714079893 } }).count() returns 0, and I'm not sure whether that is expected. Running the same query against the boundaries of a different chunk also returns 0: db.data.find({ sh_key: { $gte: NumberLong("-8016443680476223100"), $lt: NumberLong("-7983401451479346525") } }).count()
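      With a hashed shard key, the min/max values stored in config.chunks are bounds on the hash of sh_key, not on sh_key itself. Assuming the partitioner passes them through unchanged as a range on the raw field, that range selects documents unrelated to the chunk, quite possibly none, which would be consistent with both counts being 0. The Python sketch below illustrates the mismatch using stand_in_hash, a hypothetical MD5-based 64-bit hash that is NOT MongoDB's hashed-index function, and hypothetical raw key values 1 to 10000.

```python
# Illustration only: compares selecting documents by a chunk range in *hash*
# space (what config.chunks stores for a hashed key) with applying the same
# numbers as a range on the raw field value.
import hashlib
import struct

INT64_MIN = -(2 ** 63)


def stand_in_hash(value: object) -> int:
    """Hypothetical signed 64-bit hash (first 8 bytes of MD5 of repr(value)).
    NOT MongoDB's hashed-index function; values won't match a real cluster."""
    digest = hashlib.md5(repr(value).encode("utf-8")).digest()
    return struct.unpack("<q", digest[:8])[0]


# Hypothetical raw sh_key values.
raw_values = range(1, 10_001)

# A made-up chunk covering the lowest eighth of the int64 hash space.
chunk_min, chunk_max = INT64_MIN, INT64_MIN + 2 ** 61

# Documents that really belong to the chunk: their *hash* falls in the range.
by_hash = [v for v in raw_values if chunk_min <= stand_in_hash(v) < chunk_max]
# What a query like {sh_key: {$gte: chunk_min, $lt: chunk_max}} selects instead.
by_raw = [v for v in raw_values if chunk_min <= v < chunk_max]

print(len(by_hash), len(by_raw))
```

      Here by_raw is always empty, because every hypothetical raw value is positive while the chunk's upper bound is a large negative number.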

      Running explain() on that query gives this output:

        {
            "queryPlanner" : {
                "mongosPlannerVersion" : 1,
                "winningPlan" : {
                    "stage" : "SHARD_MERGE",
                    "shards" : [
                        {
                            "shardName" : "rs0",
                            "connectionString" : "rs0/hadoopb19:27018,hadoopb24:27018",
                            "serverInfo" : {
                                "host" : "Hadoopb24",
                                "port" : 27018,
                                "version" : "3.2.9",
                                "gitVersion" : "22ec9e93b40c85fc7cae7d56e7d6a02fd811088c"
                            },
                            "plannerVersion" : 1,
                            "namespace" : "fleetdata.data",
                            "indexFilterSet" : false,
                            "parsedQuery" : {
                                "$and" : [
                                    { "sh_key" : { "$lt" : -9218136772714080000 } },
                                    { "sh_key" : { "$gte" : { "$minKey" : 1 } } }
                                ]
                            },
                            "winningPlan" : {
                                "stage" : "SHARDING_FILTER",
                                "inputStage" : {
                                    "stage" : "COLLSCAN",
                                    "filter" : {
                                        "$and" : [
                                            { "sh_key" : { "$lt" : -9218136772714080000 } },
                                            { "sh_key" : { "$gte" : { "$minKey" : 1 } } }
                                        ]
                                    },
                                    "direction" : "forward"
                                }
                            },
                            "rejectedPlans" : [ ]
                        },
                        {
                            "shardName" : "rs1",
                            "connectionString" : "rs1/hadoopb28:27018,hadoopb30:27018",
                            "serverInfo" : {
                                "host" : "hadoopb28",
                                "port" : 27018,
                                "version" : "3.2.9",
                                "gitVersion" : "22ec9e93b40c85fc7cae7d56e7d6a02fd811088c"
                            },
                            "plannerVersion" : 1,
                            "namespace" : "fleetdata.data",
                            "indexFilterSet" : false,
                            "parsedQuery" : {
                                "$and" : [
                                    { "sh_key" : { "$lt" : -9218136772714080000 } },
                                    { "sh_key" : { "$gte" : { "$minKey" : 1 } } }
                                ]
                            },
                            "winningPlan" : {
                                "stage" : "SHARDING_FILTER",
                                "inputStage" : {
                                    "stage" : "COLLSCAN",
                                    "filter" : {
                                        "$and" : [
                                            { "sh_key" : { "$lt" : -9218136772714080000 } },
                                            { "sh_key" : { "$gte" : { "$minKey" : 1 } } }
                                        ]
                                    },
                                    "direction" : "forward"
                                }
                            },
                            "rejectedPlans" : [ ]
                        },
                        {
                            "shardName" : "rs2",
                            "connectionString" : "rs2/Hadoopb32:27018,hadoopb36:27018",
                            "serverInfo" : {
                                "host" : "Hadoopb36",
                                "port" : 27018,
                                "version" : "3.2.9",
                                "gitVersion" : "22ec9e93b40c85fc7cae7d56e7d6a02fd811088c"
                            },
                            "plannerVersion" : 1,
                            "namespace" : "fleetdata.data",
                            "indexFilterSet" : false,
                            "parsedQuery" : {
                                "$and" : [
                                    { "sh_key" : { "$lt" : -9218136772714080000 } },
                                    { "sh_key" : { "$gte" : { "$minKey" : 1 } } }
                                ]
                            },
                            "winningPlan" : {
                                "stage" : "SHARDING_FILTER",
                                "inputStage" : {
                                    "stage" : "COLLSCAN",
                                    "filter" : {
                                        "$and" : [
                                            { "sh_key" : { "$lt" : -9218136772714080000 } },
                                            { "sh_key" : { "$gte" : { "$minKey" : 1 } } }
                                        ]
                                    },
                                    "direction" : "forward"
                                }
                            },
                            "rejectedPlans" : [ ]
                        }
                    ]
                }
            },
            "ok" : 1
        }

      So each shard apparently performs a COLLSCAN, even though sh_key is indexed. The collection's indexes:

        [
            {
                "v" : 1,
                "key" : { "_id" : 1 },
                "name" : "_id_",
                "ns" : "fleetdata.data"
            },
            {
                "v" : 1,
                "key" : { "sh_key" : "hashed" },
                "name" : "sh_key_hashed",
                "ns" : "fleetdata.data"
            },
            {
                "v" : 1,
                "key" : { "ts" : 1 },
                "name" : "ts_1",
                "ns" : "fleetdata.data"
            },
            {
                "v" : 1,
                "key" : { "location" : "2dsphere" },
                "name" : "location_2dsphere",
                "ns" : "fleetdata.data",
                "2dsphereIndexVersion" : 3
            },
            {
                "v" : 1,
                "key" : { "signals.signal" : 1 },
                "name" : "signals.signal_1",
                "ns" : "fleetdata.data"
            }
        ]

      Output of sh.status():

        {  "_id" : "fleetdata",  "primary" : "rs0",  "partitioned" : true }
            fleetdata.data
                shard key: { "sh_key" : "hashed" }
                unique: false
                balancing: true
                chunks:
                    rs0     389
                    rs1     388
                    rs2     392
                too many chunks to print, use verbose if you want to force print
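      The chunk counts above allow a rough estimate of the work involved. Assuming one partition per chunk, and assuming each partition's range query on the hashed key is broadcast to every shard (as the SHARD_MERGE over rs0, rs1 and rs2 in the explain output suggests), one pass over the RDD issues this many per-shard queries, each of which the explain output shows as a COLLSCAN:

```python
# Rough count derived from the sh.status() output above.
chunks_per_shard = {"rs0": 389, "rs1": 388, "rs2": 392}

# Assumption: one Spark partition per chunk.
num_partitions = sum(chunks_per_shard.values())

# Assumption: each partition's range query on the hashed key cannot be
# targeted by mongos, so it is sent to every shard.
num_shards = len(chunks_per_shard)
per_shard_queries = num_partitions * num_shards

print(num_partitions, per_shard_queries)  # 1169 3507
```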

      When I remove the .set("spark.mongodb.input.partitionerOptions.shardkey", "sh_key") line from my SparkConf, the log file shows entries like this instead:

      2016-11-17T13:59:28.282+0100 I COMMAND  [conn13] command fleetdata.data command: aggregate { aggregate: "data", pipeline: [ { $match: { $and: [ { _id: { $gte: ObjectId('58073d6fe5a82e03c315f340'), $lt: ObjectId('58073de7e5a82e03c3195bf4') } }, { signals: { $elemMatch: { signal: "SomeSignal", value: { $gt: 0, $lte: 100 } } } } ] } }, { $group: { _id: "$root_document", firstTimestamp: { $min: "$ts" }, lastTimestamp: { $max: "$ts" }, count: { $sum: { $const: 1 } } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:42428915938 keyUpdates:0 writeConflicts:0 numYields:435 reslen:180 locks:{ Global: { acquireCount: { r: 874 } }, Database: { acquireCount: { r: 437 } }, Collection: { acquireCount: { r: 437 } } } protocol:op_query 141ms

      This slows down my entire aggregation pipeline considerably. With the default shardkey option (which resolves to _id), execution times are fine (141 ms for the command above). However, when I set the option to my collection's actual shard key (sh_key), execution times explode (42765 ms for the earlier getmore).
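      For a rough sense of scale, the two quoted log lines can be compared directly. They are single samples from different partitions and different commands (a getmore versus an aggregate), so the ratios below are indicative only:

```python
# Figures copied from the two mongod log lines quoted in this report.
sh_key_run = {"millis": 42765, "numYields": 21431}  # bounds on sh_key (getmore)
id_run = {"millis": 141, "numYields": 435}          # bounds on _id (aggregate)

time_ratio = sh_key_run["millis"] / id_run["millis"]
yield_ratio = sh_key_run["numYields"] / id_run["numYields"]

print(f"{time_ratio:.0f}x the time, {yield_ratio:.0f}x the yields")  # 303x the time, 49x the yields
```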

      Assignee: Ross Lawley (ross@mongodb.com)
      Reporter: F H (j9dy)
      Votes: 2
      Watchers: 8
