[SERVER-10118] Parallel MR jobs with output going to the same DB does not increase performance Created: 06/Jul/13  Updated: 06/Dec/22  Resolved: 04/Feb/22

Status: Closed
Project: Core Server
Component/s: Concurrency, MapReduce
Affects Version/s: 2.4.4
Fix Version/s: None

Type: Improvement Priority: Major - P3
Reporter: Antoine Girbal Assignee: Backlog - Query Execution
Resolution: Done Votes: 4
Labels: None
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Related
Assigned Teams:
Query Execution
Participants:

 Description   

Inserting 10m docs with a random value between 0 and 1m:

> for (var i = 0; i < 10000000; ++i){ db.uniques.insert({ dim0: Math.floor(Math.random()*1000000) });}
> db.uniques.findOne()
{ "_id" : ObjectId("51d3c386acd412e22c188dec"), "dim0" : 570859 }
> db.uniques.ensureIndex({dim0: 1})

Running a regular MR to calculate the unique counts takes 192s:

> db.runCommand({ mapreduce: "uniques", map: function () { emit(this.dim0, 1); }, reduce: function (key, values) { return Array.sum(values); }, out: "mrout", sort: {dim0: 1} })
{
        "result" : "mrout",
        "timeMillis" : 192589,
        "counts" : {
                "input" : 10000000,
                "emit" : 10000000,
                "reduce" : 1000372,
                "output" : 999961
        },
        "ok" : 1
}

Now let's run 4 MR jobs, each on 1/4 of the collection. It uses over 200% CPU, but the overall time is about the same at 190s:

> var res = db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32 *1024 * 1024 })
> var keys = res.splitKeys
> keys.length
39
> var mapred = function(min, max) { return db.runCommand({ mapreduce: "uniques", map: function () { emit(this.dim0, 1); }, reduce: function (key, values) { return Array.sum(values); }, out: "mrout" + min, sort: {dim0: 1}, query: { dim0: { $gte: min, $lt: max } } }) }
> var numThreads = 4
> var inc = Math.floor(keys.length / numThreads) + 1
> threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }
min:0 max:274736
min:274736 max:524997
min:524997 max:775025
min:775025 max:{ "$maxKey" : 1 }
connecting to: test
connecting to: test
connecting to: test
connecting to: test
> for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
{ 
        "result" : "mrout0",
        "timeMillis" : 205790,
        "counts" : {
                "input" : 2750002,
                "emit" : 2750002,
                "reduce" : 274828,
                "output" : 274723
        },
        "ok" : 1
}
...

If each job instead writes its output to a separate database, it is considerably better, at about 100s:

> var mapred = function(min, max) { return db.runCommand({ mapreduce: "uniques", map: function () { emit(this.dim0, 1); }, reduce: function (key, values) { return Array.sum(values); }, out: { replace: "mrout" + min, db: "mrdb" + min }, sort: {dim0: 1}, query: { dim0: { $gte: min, $lt: max } } }) }
> threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }
min:0 max:274736
min:274736 max:524997
min:524997 max:775025
min:775025 max:{ "$maxKey" : 1 }
connecting to: test
connecting to: test
connecting to: test
connecting to: test
> for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
{ 
        "result" : {
                "db" : "mrdb0",
                "collection" : "mrout0"
        },
        "timeMillis" : 108790,
        "counts" : {
                "input" : 2750002,
                "emit" : 2750002,
                "reduce" : 274828,
                "output" : 274723
        },
        "ok" : 1
}
...

It seems that MR jobs do not parallelize well when their output goes to the same db.
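
For completeness, a hypothetical merge step (not part of the test above) could stitch the per-database outputs back into one collection; since the per-thread query ranges are disjoint on dim0, the outputs have disjoint keys and a straight copy is enough. The name "mrcombined" and the reuse of keys, inc and numThreads from the snippets above are illustrative only:

// Hypothetical merge step (not part of the original test): the per-thread query
// ranges are disjoint on dim0, so the per-database outputs have disjoint keys and
// can simply be copied into one combined collection.
var combined = db.getSiblingDB("test").getCollection("mrcombined");
for (var i = 0; i < numThreads; ++i) {
    var min = (i == 0) ? 0 : keys[i * inc].dim0;
    var src = db.getSiblingDB("mrdb" + min).getCollection("mrout" + min);
    src.find().forEach(function(doc) { combined.insert(doc); });
}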



 Comments   
Comment by Esha Bhargava [ 04/Feb/22 ]

Closing these tickets as part of the deprecation of mapReduce.
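
For reference (not part of this ticket): the suggested replacement for mapReduce is the aggregation pipeline, and the unique-count workload in the description maps onto a single $group stage. A minimal sketch, assuming a 2.6+ server for $out and allowDiskUse, with "aggout" as an arbitrary output collection name:

// Hypothetical aggregation equivalent of the MR job in the description:
// group on dim0 and count occurrences, writing the result to "aggout".
// allowDiskUse guards against $group exceeding the in-memory limit on ~1M groups.
db.uniques.aggregate(
    [
        { $group: { _id: "$dim0", value: { $sum: 1 } } },
        { $out: "aggout" }
    ],
    { allowDiskUse: true }
)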

Comment by Antoine Girbal [ 02/May/14 ]

One quick test would be to change the yielding interval and see if it improves concurrency.
Maybe yielding every 100 is too large an interval when JS is being called for each document?
Did we do that test?

This is not just a matter of making parallel MRs run smoothly; it probably also means that regular reads / writes are affected by a running MR.
Further, it doesn't really help that V8 can run jobs in parallel if each job then becomes slower and the overall time is the same as running them in a single thread.
Few folks think about using multiple output dbs, though we could mention it in our docs if we're not planning on improving this.

Comment by Davide Italiano [ 12/Nov/13 ]

I'm able to reproduce the problem reported by Antoine on recent trunk (pulled today).
I did some analysis with perf:
tl;dr: perf stat --sync shows about 10x more context switches and CPU migrations than in the multiple-database case.

For the single-database case I collected the main sources of context switching via perf record. It looks like sys_futex() is responsible for about 1/5 of the context switches. This is partly explained by the fact that running against a single database introduces a lot more lock contention, but the information gathered is too coarse to guess much more. I think the next step is to instrument the lock acquisition code path in order to collect contention statistics and get a more precise picture of what's going on there (see the sketch after the perf output below).

## perf stat --sync

Single Database:

 Performance counter stats for './mongod':
 
     183378.738037 task-clock                #    1.706 CPUs utilized
         3,024,284 context-switches          #    0.016 M/sec
               314 cpu-migrations            #    0.002 K/sec
         2,387,381 page-faults               #    0.013 M/sec
   557,531,447,280 cycles                    #    3.040 GHz                     [83.39%]
   337,188,411,467 stalled-cycles-frontend   #   60.48% frontend cycles idle    [83.33%]
   230,256,262,117 stalled-cycles-backend    #   41.30% backend  cycles idle    [66.60%]
   496,692,434,197 instructions              #    0.89  insns per cycle
                                             #    0.68  stalled cycles per insn [83.27%]
   107,253,631,166 branches                  #  584.875 M/sec                   [83.36%]
     1,542,258,987 branch-misses             #    1.44% of all branches         [83.32%]
 
     107.469412762 seconds time elapsed

Multiple Databases:

 Performance counter stats for './mongod':
 
     151019.341559 task-clock                #    2.416 CPUs utilized
           375,927 context-switches          #    0.002 M/sec
                95 cpu-migrations            #    0.001 K/sec
         2,325,120 page-faults               #    0.015 M/sec
   521,053,938,356 cycles                    #    3.450 GHz                     [83.35%]
   311,290,334,790 stalled-cycles-frontend   #   59.74% frontend cycles idle    [83.34%]
   200,719,811,908 stalled-cycles-backend    #   38.52% backend  cycles idle    [66.68%]
   477,622,933,457 instructions              #    0.92  insns per cycle
                                             #    0.65  stalled cycles per insn [83.34%]
   103,774,421,613 branches                  #  687.160 M/sec                   [83.32%]
     1,389,569,505 branch-misses             #    1.34% of all branches         [83.32%]
 
      62.497470661 seconds time elapsed

Context switch analysis (single database):

[root@localhost mongo]# perf record -e sched:sched_stat_sleep -e sched:sched_switch  -e sched:sched_process_exit -g -o perf.data.raw ./mongod
[root@localhost mongo]# perf inject -v -s -i perf.data.raw -o perf.data
[root@localhost mongo]# perf report --stdio --show-total-period -i perf.data

# Overhead        Period  Command      Shared Object          Symbol
# ........  ............  .......  .................  ..............
#
   100.00%   501376467995   mongod  [kernel.kallsyms]  [k] __schedule
             |
             --- __schedule
                 schedule
                |
                |--60.16%-- do_nanosleep
                |          hrtimer_nanosleep
                |          sys_nanosleep
                |          system_call_fastpath
                |--20.18%-- futex_wait_queue_me
                |          futex_wait
                |          do_futex
                |          sys_futex
                |          system_call_fastpath
                |--19.65%-- schedule_hrtimeout_range_clock
                |          schedule_hrtimeout_range
                |          poll_schedule_timeout
                |          do_select
                |          core_sys_select
                |          sys_select
                |          system_call_fastpath
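
A minimal shell-side sketch of the kind of contention statistics mentioned above, short of instrumenting the lock code itself: sample the per-database lock counters that 2.4-era serverStatus() exposes before and after the parallel run. The field layout (locks.<db>.timeLockedMicros.w / timeAcquiringMicros.w) and the choice of the "test" database are assumptions, not something verified in this ticket:

// Hypothetical helper (not from this ticket): snapshot the write-lock counters
// for one database from serverStatus(). Field layout assumed from 2.4-era builds.
var lockSnapshot = function(dbName) {
    var l = db.serverStatus().locks[dbName];
    return { locked: l.timeLockedMicros.w, acquiring: l.timeAcquiringMicros.w };
};

var before = lockSnapshot("test");
// ... run the 4 parallel MR jobs here, as in the description ...
var after = lockSnapshot("test");

print("write lock held (s):      " + (after.locked - before.locked) / 1e6);
print("write lock waited on (s): " + (after.acquiring - before.acquiring) / 1e6);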

Comment by Daniel Pasette (Inactive) [ 30/Jul/13 ]

Oops, missed the third test (and point of the ticket) completely on my laptop screen!

Comment by Antoine Girbal [ 30/Jul/13 ]

Right, as stated in the description, using separate dbs makes the jobs much faster.
But I don't think the write lock was anywhere close to 100% when running against a single db (to verify).
It could instead be that each MR job yields only occasionally and requires quite a bit of writing, which makes them step on each other.
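
One hypothetical way to do that verification from the shell (not something run in this ticket): while the 4 jobs are active, list the in-progress mapreduce commands and their per-op lock statistics. The lockStats fields shown are the 2.4-era currentOp format; that layout is an assumption here.

// Hypothetical check for the "(to verify)" above: print lock statistics for the
// mapreduce commands currently running.
db.currentOp().inprog.forEach(function(op) {
    if (op.query && op.query.mapreduce) {
        printjson({
            opid: op.opid,
            secs_running: op.secs_running,
            lockStats: op.lockStats
        });
    }
});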

Comment by Daniel Pasette (Inactive) [ 30/Jul/13 ]

Antoine, I think the performance difference can be attributed to writing the result sets back to the same database. I tried using the latest master branch, which includes the further performance improvement SERVER-9907, and modified your example to use different output databases:

db.runCommand({ mapreduce: "uniques", map: function () { emit(this.dim0, 1); }, reduce: function (key, values) { return Array.sum(values); }, out: "mrout", sort: {dim0: 1} })

I found the overall job ran in roughly 1/3 of the time of the unpartitioned job.
