Details
- Type: Improvement
- Status: Closed
- Priority: Major - P3
- Resolution: Gone away
- Affects Version/s: 2.4.4
- Fix Version/s: None
- Labels: None
- Component/s: Query Execution
Description
Inserting 10 million docs with a random value between 0 and 1 million:

> for (var i = 0; i < 10000000; ++i){ db.uniques.insert({ dim0: Math.floor(Math.random()*1000000) });}
> db.uniques.findOne()
{ "_id" : ObjectId("51d3c386acd412e22c188dec"), "dim0" : 570859 }
> db.uniques.ensureIndex({dim0: 1})
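As a quick sanity check of the setup (illustrative extra commands, not part of the timed runs), the collection size and the dim0 index can be verified before running MR:

> // illustrative check: expect 10000000 docs and an index on dim0
> db.uniques.count()
> db.uniques.getIndexes()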
Running a regular MR job to calculate the unique counts takes 192s:

> db.runCommand({ mapreduce: "uniques", map: function () { emit(this.dim0, 1); }, reduce: function (key, values) { return Array.sum(values); }, out: "mrout", sort: {dim0: 1} })
{
    "result" : "mrout",
    "timeMillis" : 192589,
    "counts" : {
        "input" : 10000000,
        "emit" : 10000000,
        "reduce" : 1000372,
        "output" : 999961
    },
    "ok" : 1
}
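Since each distinct dim0 value produces exactly one output document, the size of the output collection gives the unique count directly. A minimal check (a sketch using the mrout collection named above; the result should match counts.output):

> // number of documents in the MR output collection = number of distinct dim0 values
> db.mrout.count()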
Now let's run 4 MR jobs in parallel, each over 1/4 of the collection. CPU usage goes above 200%, but the total time is about the same, at 190s:

> var res = db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32 *1024 * 1024 })
> var keys = res.splitKeys
> keys.length
39
> var mapred = function(min, max) { return db.runCommand({ mapreduce: "uniques", map: function () { emit(this.dim0, 1); }, reduce: function (key, values) { return Array.sum(values); }, out: "mrout" + min, sort: {dim0: 1}, query: { dim0: { $gte: min, $lt: max } } }) }
> var numThreads = 4
> var inc = Math.floor(keys.length / numThreads) + 1
> threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }
min:0 max:274736
min:274736 max:524997
min:524997 max:775025
min:775025 max:{ "$maxKey" : 1 }
connecting to: test
connecting to: test
connecting to: test
connecting to: test
> for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
{
    "result" : "mrout0",
    "timeMillis" : 205790,
    "counts" : {
        "input" : 2750002,
        "emit" : 2750002,
        "reduce" : 274828,
        "output" : 274723
    },
    "ok" : 1
}
...
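Because the query ranges are disjoint ($gte/$lt on the split boundaries), each dim0 value is counted by exactly one job, so the per-range output counts should add up to the overall number of uniques. A minimal sketch of that check, assuming the four min boundaries printed above:

> // hypothetical check: sum counts across the per-range output collections mrout0, mrout274736, ...
> var mins = [0, 274736, 524997, 775025];
> mins.map(function(min) { return db.getCollection("mrout" + min).count(); }).reduce(function(a, b) { return a + b; })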
Using a separate output db for each job is quite a bit better, at about 100s:

> var mapred = function(min, max) { return db.runCommand({ mapreduce: "uniques", map: function () { emit(this.dim0, 1); }, reduce: function (key, values) { return Array.sum(values); }, out: { replace: "mrout" + min, db: "mrdb" + min }, sort: {dim0: 1}, query: { dim0: { $gte: min, $lt: max } } }) }
> threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }
min:0 max:274736
min:274736 max:524997
min:524997 max:775025
min:775025 max:{ "$maxKey" : 1 }
connecting to: test
connecting to: test
connecting to: test
connecting to: test
> for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
{
    "result" : {
        "db" : "mrdb0",
        "collection" : "mrout0"
    },
    "timeMillis" : 108790,
    "counts" : {
        "input" : 2750002,
        "emit" : 2750002,
        "reduce" : 274828,
        "output" : 274723
    },
    "ok" : 1
}
...
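With the per-db output variant the partial results end up spread over mrdb0, mrdb274736, etc. If a single result collection is needed, a minimal merge sketch (a hypothetical follow-up step; since the ranges are disjoint, a plain copy is enough and no further reduce pass is required):

> // hypothetical merge: copy each partial output into one collection ("merged" is an illustrative name) in the current db
> var mins = [0, 274736, 524997, 775025];
> mins.forEach(function(min) { db.getSiblingDB("mrdb" + min).getCollection("mrout" + min).find().forEach(function(doc) { db.merged.insert(doc); }); })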
It seems that MR jobs do not run well in parallel on the same db.