Core Server / SERVER-10118

Parallel MR jobs with output going to the same DB do not increase performance

    • Type: Improvement
    • Resolution: Done
    • Priority: Major - P3
    • None
    • Affects Version/s: 2.4.4
    • Component/s: Concurrency, MapReduce
    • Labels: None
    • Query Execution

      Insert 10M docs, each with a random integer dim0 value between 0 and 1M:

      > for (var i = 0; i < 10000000; ++i){ db.uniques.insert({ dim0: Math.floor(Math.random()*1000000) });}
      > db.uniques.findOne()
      { "_id" : ObjectId("51d3c386acd412e22c188dec"), "dim0" : 570859 }
      > db.uniques.ensureIndex({dim0: 1})
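Because dim0 is drawn uniformly from 1,000,000 possible integers, not every value is guaranteed to appear. As a back-of-the-envelope sanity check (not part of the original report), the expected number of distinct values after n uniform draws from N values is N * (1 - (1 - 1/N)^n), roughly 999,955 for n = 10M and N = 1M, which is close to the MR "output" count of 999,961 reported for the single-job run. The function name expectedDistinct is illustrative only.

```javascript
// Expected number of distinct values when drawing n times uniformly
// (with replacement) from N possible values: N * (1 - (1 - 1/N)^n).
function expectedDistinct(n, N) {
  return N * (1 - Math.pow(1 - 1 / N, n));
}

// Parameters from the insert loop: 10M docs, dim0 = Math.floor(Math.random() * 1000000).
const expected = expectedDistinct(10000000, 1000000);
console.log("expected distinct dim0 values: " + expected.toFixed(1));
```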
      

      A regular MR job that counts occurrences of each unique dim0 value takes 192s:

      > db.runCommand({ mapreduce: "uniques", map: function () { emit(this.dim0, 1); }, reduce: function (key, values) { return Array.sum(values); }, out: "mrout", sort: {dim0: 1} })
      {
              "result" : "mrout",
              "timeMillis" : 192589,
              "counts" : {
                      "input" : 10000000,
                      "emit" : 10000000,
                      "reduce" : 1000372,
                      "output" : 999961
              },
              "ok" : 1
      }
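For readers less familiar with MongoDB map/reduce, the job above amounts to counting how many documents share each dim0 value. The plain JavaScript sketch below mimics the map/reduce pair on a made-up in-memory array instead of the collection: emit(this.dim0, 1) groups a 1 under each key, and the reduce function sums them (standing in for the shell's Array.sum). It illustrates the semantics only, not how the server executes the job; for example, the server does not call reduce for a key with a single emitted value and may call it more than once per key, which this sketch ignores. The helper mapReduce is hypothetical.

```javascript
// Hypothetical in-memory simulation of map/reduce over an array of documents.
function mapReduce(docs, map, reduce) {
  const groups = new Map(); // key -> array of values emitted for that key
  const emit = (key, value) => {
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(value);
  };
  // Call map with `this` bound to each document, as the server does.
  for (const doc of docs) map.call(doc, emit);
  const out = new Map();
  for (const [key, values] of groups) out.set(key, reduce(key, values));
  return out;
}

// Same logic as the shell job; emit is passed in because there is no global emit here.
const map = function (emit) { emit(this.dim0, 1); };
const reduce = (key, values) => values.reduce((a, b) => a + b, 0); // like Array.sum(values)

const docs = [{ dim0: 5 }, { dim0: 7 }, { dim0: 5 }, { dim0: 5 }];
const counts = mapReduce(docs, map, reduce);
console.log([...counts]); // counts: 5 -> 3, 7 -> 1
```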
      

      Now run 4 MR jobs in parallel, each over about 1/4 of the collection (ranges computed from splitVector keys). CPU usage goes over 200%, but the time stays about the same, at ~190s:

      > var res = db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32 *1024 * 1024 })
      > var keys = res.splitKeys
      > keys.length
      39
      > var mapred = function(min, max) { return db.runCommand({ mapreduce: "uniques", map: function () { emit(this.dim0, 1); }, reduce: function (key, values) { return Array.sum(values); }, out: "mrout" + min, sort: {dim0: 1}, query: { dim0: { $gte: min, $lt: max } } }) }
      > var numThreads = 4
      > var inc = Math.floor(keys.length / numThreads) + 1
      > threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }
      min:0 max:274736
      min:274736 max:524997
      min:524997 max:775025
      min:775025 max:{ "$maxKey" : 1 }
      connecting to: test
      connecting to: test
      connecting to: test
      connecting to: test
      > for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
      { 
              "result" : "mrout0",
              "timeMillis" : 205790,
              "counts" : {
                      "input" : 2750002,
                      "emit" : 2750002,
                      "reduce" : 274828,
                      "output" : 274723
              },
              "ok" : 1
      }
      ...
      
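The one-line loop in the session splits the 39 splitVector keys into numThreads contiguous [min, max) ranges of inc keys each, with the first range starting at 0 and the last one open-ended. Pulled out into a plain function so it can be checked outside the shell, the same logic might look like this. MAX_KEY is a hypothetical stand-in for the shell's MaxKey, partitionRanges is an illustrative name, and sampleKeys is a made-up array shaped like res.splitKeys (not the real split points).

```javascript
// Hypothetical stand-in for the mongo shell's MaxKey: an upper bound above every dim0.
const MAX_KEY = Infinity;

// Split splitVector-style keys ([{ dim0: ... }, ...]) into numThreads contiguous
// [min, max) ranges, mirroring the loop in the shell session.
function partitionRanges(splitKeys, numThreads) {
  const inc = Math.floor(splitKeys.length / numThreads) + 1; // split keys per range
  const ranges = [];
  for (let i = 0; i < numThreads; ++i) {
    const start = i * inc;
    if (i > 0 && start >= splitKeys.length) {
      // The shell loop would read past the end of splitKeys here.
      throw new RangeError("too few split keys for " + numThreads + " ranges");
    }
    const min = i === 0 ? 0 : splitKeys[start].dim0; // first range starts at 0
    const max = start + inc >= splitKeys.length ? MAX_KEY : splitKeys[start + inc].dim0;
    ranges.push({ min, max });
  }
  return ranges;
}

// Made-up keys shaped like res.splitKeys: 39 boundaries, 25,000 apart.
const sampleKeys = Array.from({ length: 39 }, (_, k) => ({ dim0: (k + 1) * 25000 }));
for (const { min, max } of partitionRanges(sampleKeys, 4)) {
  console.log("min:" + min + " max:" + max);
}
```

With 39 keys and 4 threads, inc is 10, so the ranges start at key indices 0, 10, 20 and 30, matching the four min/max pairs printed in the session.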

      Writing each job's output to a separate database is much faster, at about 100s:

      > var mapred = function(min, max) { return db.runCommand({ mapreduce: "uniques", map: function () { emit(this.dim0, 1); }, reduce: function (key, values) { return Array.sum(values); }, out: { replace: "mrout" + min, db: "mrdb" + min }, sort: {dim0: 1}, query: { dim0: { $gte: min, $lt: max } } }) }
      > threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }
      min:0 max:274736
      min:274736 max:524997
      min:524997 max:775025
      min:775025 max:{ "$maxKey" : 1 }
      connecting to: test
      connecting to: test
      connecting to: test
      connecting to: test
      > for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
      { 
              "result" : {
                      "db" : "mrdb0",
                      "collection" : "mrout0"
              },
              "timeMillis" : 108790,
              "counts" : {
                      "input" : 2750002,
                      "emit" : 2750002,
                      "reduce" : 274828,
                      "output" : 274723
              },
              "ok" : 1
      }
      ...
      
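To put the timings side by side: only the first job's result is shown for each parallel run, so its timeMillis is used here as a rough proxy for that run's duration (an approximation, since the other jobs' results are elided). A small helper, with the illustrative name speedup, compares each run against the single-job baseline of 192,589 ms.

```javascript
// Speedup of a run relative to a baseline: baseline time / run time (> 1 means faster).
function speedup(baselineMillis, runMillis) {
  return baselineMillis / runMillis;
}

const baseline = 192589;    // single MR job, output to "mrout"
const sameDb = 205790;      // first of 4 parallel jobs, all outputs in db "test"
const separateDbs = 108790; // first of 4 parallel jobs, each output in its own db

console.log(speedup(baseline, sameDb).toFixed(2));      // 0.94
console.log(speedup(baseline, separateDbs).toFixed(2)); // 1.77
```

A value below 1 for the same-db run means the 4 parallel jobs gained nothing over a single job, while separate output databases give roughly a 1.8x improvement.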

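      It appears that MR jobs do not run well in parallel when their output goes to the same db, possibly because they contend for the same database-level write lock (2.4 locks per database).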

            Assignee:
            backlog-query-execution [DO NOT USE] Backlog - Query Execution
            Reporter:
            antoine Antoine Girbal
            Votes:
            4
            Watchers:
            16
