- Type: Improvement
- Resolution: Done
- Priority: Major - P3
- Affects Version/s: 2.4.4
- Component/s: Concurrency, MapReduce
- Query Execution
Inserting 10M docs, each with a random dim0 value between 0 and 1M:
> for (var i = 0; i < 10000000; ++i) { db.uniques.insert({ dim0: Math.floor(Math.random() * 1000000) }); }
> db.uniques.findOne()
{ "_id" : ObjectId("51d3c386acd412e22c188dec"), "dim0" : 570859 }
> db.uniques.ensureIndex({ dim0: 1 })
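As an optional sanity check before timing anything, one can verify the collection and index (a minimal sketch; the expected count follows from the insert loop above, and the index name assumes MongoDB's default naming):
// optional sanity check before running the timings
db.uniques.count()        // expect 10000000
db.uniques.getIndexes()   // should list _id_ and the default-named dim0_1
db.uniques.stats().size   // rough data size, useful when choosing maxChunkSizeBytes below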
A regular MR to calculate unique counts takes 192s:
> db.runCommand({
    mapreduce: "uniques",
    map: function () { emit(this.dim0, 1); },
    reduce: function (key, values) { return Array.sum(values); },
    out: "mrout",
    sort: { dim0: 1 }
  })
{
    "result" : "mrout",
    "timeMillis" : 192589,
    "counts" : {
        "input" : 10000000,
        "emit" : 10000000,
        "reduce" : 1000372,
        "output" : 999961
    },
    "ok" : 1
}
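To confirm the output shape (one document per distinct dim0 value, with its frequency as the value), a quick check against the result collection; the 999961 figure is the "output" count above:
// MR output documents have the form { _id: <dim0>, value: <count> }
db.mrout.count()                               // expect 999961, matching "output" above
db.mrout.findOne()                             // sample output document
db.mrout.find().sort({ value: -1 }).limit(1)   // most frequent dim0 value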
Now let's run 4 MR jobs in parallel, each over 1/4 of the collection (ranges obtained from splitVector). CPU usage goes above 200%, but the total time is about the same, around 190-205s:
> var res = db.runCommand({ splitVector: "test.uniques", keyPattern: { dim0: 1 }, maxChunkSizeBytes: 32 * 1024 * 1024 })
> var keys = res.splitKeys
> keys.length
39
> var mapred = function(min, max) {
    return db.runCommand({
        mapreduce: "uniques",
        map: function () { emit(this.dim0, 1); },
        reduce: function (key, values) { return Array.sum(values); },
        out: "mrout" + min,
        sort: { dim0: 1 },
        query: { dim0: { $gte: min, $lt: max } }
    })
  }
> var numThreads = 4
> var inc = Math.floor(keys.length / numThreads) + 1
> threads = []; for (var i = 0; i < numThreads; ++i) {
    var min = (i == 0) ? 0 : keys[i * inc].dim0;
    var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0;
    print("min:" + min + " max:" + max);
    var t = new ScopedThread(mapred, min, max);
    threads.push(t);
    t.start()
  }
min:0 max:274736
min:274736 max:524997
min:524997 max:775025
min:775025 max:{ "$maxKey" : 1 }
connecting to: test
connecting to: test
connecting to: test
connecting to: test
> for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
{
    "result" : "mrout0",
    "timeMillis" : 205790,
    "counts" : {
        "input" : 2750002,
        "emit" : 2750002,
        "reduce" : 274828,
        "output" : 274723
    },
    "ok" : 1
}
...
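The four partial results land in separate collections named "mrout" + min (per the out option above), and because the query ranges are disjoint their key sets do not overlap. So folding them back into one result is just a copy; a minimal sketch, where the partial collection names are derived from the printed min values and the merged collection name mrout_merged is just an example:
// minimal merge sketch: ranges are disjoint, so plain inserts suffice
var partials = ["mrout0", "mrout274736", "mrout524997", "mrout775025"];
db.mrout_merged.drop();
partials.forEach(function (name) {
    db.getCollection(name).find().forEach(function (doc) {
        db.mrout_merged.insert(doc);
    });
});
db.mrout_merged.count();   // should match the single-job "output" count of 999961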
If each job instead writes its output to a collection in its own separate database, it's considerably better, at about 100s:
> var mapred = function(min, max) {
    return db.runCommand({
        mapreduce: "uniques",
        map: function () { emit(this.dim0, 1); },
        reduce: function (key, values) { return Array.sum(values); },
        out: { replace: "mrout" + min, db: "mrdb" + min },
        sort: { dim0: 1 },
        query: { dim0: { $gte: min, $lt: max } }
    })
  }
> threads = []; for (var i = 0; i < numThreads; ++i) {
    var min = (i == 0) ? 0 : keys[i * inc].dim0;
    var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0;
    print("min:" + min + " max:" + max);
    var t = new ScopedThread(mapred, min, max);
    threads.push(t);
    t.start()
  }
min:0 max:274736
min:274736 max:524997
min:524997 max:775025
min:775025 max:{ "$maxKey" : 1 }
connecting to: test
connecting to: test
connecting to: test
connecting to: test
> for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
{
    "result" : {
        "db" : "mrdb0",
        "collection" : "mrout0"
    },
    "timeMillis" : 108790,
    "counts" : {
        "input" : 2750002,
        "emit" : 2750002,
        "reduce" : 274828,
        "output" : 274723
    },
    "ok" : 1
}
...
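The partial outputs now live in separate databases ("mrdb" + min), so collecting them into one place means reading across databases; a minimal sketch, again assuming the db/collection names implied by the printed min values and an example target collection mrout_merged:
// minimal cross-db merge sketch (names follow "mrdb" + min / "mrout" + min)
var mins = [0, 274736, 524997, 775025];
db.mrout_merged.drop();
mins.forEach(function (min) {
    var src = db.getSiblingDB("mrdb" + min).getCollection("mrout" + min);
    src.find().forEach(function (doc) { db.mrout_merged.insert(doc); });
});
db.mrout_merged.count();   // again should match the single-job "output" count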
It seems that MR jobs do not run well in parallel when they write their output to the same db.