/*
 * Demo script for the mongo shell (relies on the shell globals db, print and
 * printjson).  It loads two synthetic portfolios into testX.foo and computes
 * the net amount per portfolio, (A + B) - (C + D), twice: once with the
 * aggregation pipeline and once with mapReduce, so the results can be compared.
 *
 * NOTE(review): map-reduce is reported as deprecated in recent MongoDB
 * releases; confirm against the target server version before relying on it.
 */
db = db.getSiblingDB("testX");

/**
 * Walks a cursor, optionally printing every document, then prints the count.
 *
 * @param c     anything exposing forEach(callback), e.g. an aggregation cursor
 * @param emit  print each document when true (default); only count otherwise
 */
function show(c, emit = true) {
    let found = 0;
    c.forEach(function (doc) {
        found++;
        // Loose equality is kept on purpose so callers passing 1 still print.
        if (emit == true) {
            printjson(doc);
        }
    });
    print("found " + found);
}

/**
 * Inserts `count` securities S0..S(count-1) for one portfolio.  The security
 * type cycles through `typeCycle` and amt is 10 times the security index.
 */
function insertPortfolio(portfolio, count, typeCycle) {
    for (let i = 0; i < count; i++) {
        db.foo.insert({
            portfolio: portfolio,
            sec: "S" + i,
            type: typeCycle[i % typeCycle.length],
            amt: i * 10
        });
    }
}

/**
 * Populates db.foo with p1cnt documents for portfolio P1 and p2cnt documents
 * for portfolio P2.
 */
function loadEm(p1cnt, p2cnt) {
    // 1/3 A and B, 1/6 C and D
    insertPortfolio("P1", p1cnt, ["A", "A", "B", "B", "C", "D"]);
    // 20% A,B,D, 40% C
    insertPortfolio("P2", p2cnt, ["A", "B", "C", "C", "D"]);
}

/**
 * Prints two aggregation views of db.foo:
 *   1. count and total amt per (portfolio, type), sorted by portfolio and type;
 *   2. count and net amt per portfolio, where types A/B add amt and types C/D
 *      subtract it.
 */
function showEm() {
    const byPortfolioAndType = db.foo.aggregate([
        {$group: {_id: {p: "$portfolio", t: "$type"}, n: {$sum: 1}, totAmt: {$sum: "$amt"}}},
        {$sort: {"_id.p": 1, "_id.t": 1}}
    ]);
    show(byPortfolioAndType, true);

    const netByPortfolio = db.foo.aggregate([
        {$addFields: {qq: {$switch: {
            branches: [
                {case: {$in: ["$type", ["A", "B"]]}, then: "$amt"},
                {case: {$in: ["$type", ["C", "D"]]}, then: {$multiply: ["$amt", -1]}}
            ],
            default: {$sum: 0}
        }}}},
        {$group: {_id: "$portfolio", n: {$sum: 1}, totAmt: {$sum: "$qq"}}}
    ]);
    show(netByPortfolio, true);
}

/*
 * Map step: emits one value per document, keyed by portfolio.
 *
 * The emitted value deliberately has exactly the same shape as the object
 * returned by reduceFunction1.  The engine may call reduce several times for
 * one key and feed earlier reduce results back in alongside fresh emits, in
 * any position and in any number.  When both kinds of input look the same,
 * reduce needs no special cases and cannot confuse a subtotal with a document.
 *
 * The body only uses `this` and emit(); it is executed by the map-reduce
 * engine, not in this script's scope, so it must not reference script
 * variables.
 */
var mapFunction1 = function () {
    var type = this.type;
    var isAB = (type === "A" || type === "B");
    var isCD = (type === "C" || type === "D");

    // A/B contribute +amt, C/D contribute -amt, any other type contributes 0
    // but is still counted in n and recorded in all.
    var signedAmt = 0;
    if (isAB) {
        signedAmt = this.amt;
    } else if (isCD) {
        signedAmt = 0 - this.amt;
    }

    emit(this.portfolio, {
        totAmt: signedAmt,
        n: 1,
        nAB: isAB ? 1 : 0,
        nCD: isCD ? 1 : 0,
        all: [type],
        n_all: 1
    });
};

/*
 * Reduce step: merges any mix of freshly emitted values and earlier reduce
 * results for one portfolio by adding the counters and concatenating the
 * type lists.
 *
 * Every input is ADDED to the accumulator.  Assigning a subtotal over the
 * accumulator would discard whatever had already been summed earlier in the
 * same call, and would lose the subtotal's type list.
 *
 * @param portId  the portfolio key (unused; the merge does not depend on it)
 * @param data    array of values shaped like the object emitted by mapFunction1
 * @returns       {totAmt, n, nAB, nCD, all, n_all} covering every input value
 */
var reduceFunction1 = function (portId, data) {
    var acc = {totAmt: 0, n: 0, nAB: 0, nCD: 0, all: [], n_all: 0};

    for (var k = 0; k < data.length; k++) {
        var part = data[k];
        acc.totAmt += part.totAmt;
        acc.n += part.n;
        acc.nAB += part.nAB;
        acc.nCD += part.nCD;
        // Copy the elements instead of keeping a reference to part.all, so the
        // result never shares an array with the engine-owned input.
        for (var j = 0; j < part.all.length; j++) {
            acc.all.push(part.all[j]);
        }
    }

    // Recomputed from the merged list so it always matches all.length.
    acc.n_all = acc.all.length;
    return acc;
};

/**
 * Runs the map-reduce over db.foo with inline output, prints the result and
 * the elapsed wall-clock time before and after printing it.
 */
function mapEm() {
    const start = Date.now();
    const result = db.foo.mapReduce(
        mapFunction1,
        reduceFunction1,
        {out: {inline: 1}}
    );
    print((Date.now() - start) + " millis to setup agg");

    printjson(result);
    print((Date.now() - start) + " millis for agg + walking cursor to fetch results");
}

/**
 * Drops db.foo, reloads it with the requested portfolio sizes and prints the
 * aggregation results followed by the map-reduce results.
 *
 * @param banner  text printed in front of the portfolio sizes
 * @param p1cnt   number of P1 documents to load
 * @param p2cnt   number of P2 documents to load
 */
function runScenario(banner, p1cnt, p2cnt) {
    db.foo.drop();
    print(banner + " " + p1cnt + " P1 and " + p2cnt + " P2");
    loadEm(p1cnt, p2cnt);
    print("total items: " + db.foo.count());
    showEm();
    print("");
    print("If you look at (A+B)-(C+D), you will see the numbers above tally correctly");
    print("Now let's do the same thing with MR");
    mapEm();
}

/*
    S T A R T   O F   P R O G R A M
*/
runScenario("TEST with", 100, 50);
print("OK. It all ties out and aggregate = mapreduce");
print("");
print("");

// The larger P1 set is the case that used to come out wrong: the reducer
// overwrote its running totals whenever it met an earlier subtotal.
runScenario("NOW let's try with", 342, 50);
print("OK. P1 ties out here too: reduce now adds re-reduced subtotals instead of overwriting with them");