[SERVER-14324] MapReduce does not respect existing shard key on output:sharded Created: 20/Jun/14  Updated: 06/Dec/22  Resolved: 12/Nov/19

Status: Closed
Project: Core Server
Component/s: MapReduce, Sharding
Affects Version/s: None
Fix Version/s: None

Type: Bug Priority: Major - P3
Reporter: Scott Hernandez (Inactive) Assignee: Backlog - Query Team (Inactive)
Resolution: Won't Fix Votes: 6
Labels: MongosFewerPipelines, mapreduce, open_todo_in_code, qopt-team
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Duplicate
duplicates SERVER-16605 Mapreduce into sharded collection wit... Closed
is duplicated by SERVER-22060 Sharded mapReduce output option {repl... Closed
is duplicated by SERVER-28321 In mapReduce map function emitting a ... Closed
Related
related to SERVER-12261 Map Reduce with sharded output collec... Closed
is related to SERVER-12581 mapReduce sharded output replace does... Closed
Assigned Teams:
Query
Operating System: ALL

 Description   

If you output into an existing sharded collection, the shard key is not changed but data is still written to a shard – if the shard key is anything but {_id: 1}, this is a problem. It results in all of the data being "unowned" and filtered out of queries, making the sharded collection look empty.

There are probably many more nuances here, and more places where things are broken, so more time should be taken to investigate all behaviors when there is an existing sharded collection – or that option should simply be disallowed until we can define and fix all behaviors.



 Comments   
Comment by Craig Homa [ 12/Nov/19 ]

MapReduce output requires that the collection be sharded on _id.

Comment by Rui Ribeiro [ 07/Feb/19 ]

Hi @asya 

Thank you for your answer.

For me the question is not how to execute the commands, but how to calculate the "mid" points when I have a compound key for "_id", like this:

"_id" : { "utc" : ISODate("2018-11-01T00:00:00.000+0000"), "bi" : "00cdc09c9aec631b584cca058645e48b44ebf0d4e441f50fdeer517dfa360c8afb", "lac" : "USASD", "dac" : "USAPT" }

The examples in the documentation only use fields that are numbers or ids.

Thank you for your help

Cheers

Comment by Asya Kamsky [ 07/Feb/19 ]

rribeiro all you need is to call the split command with approximate "mid" points (presumably you have an idea of how the data will be distributed in this range?). Take a look at https://github.com/asya999/bits-n-pieces/blob/master/scripts/presplit.js – I haven't touched it in a long time, but the basic idea of how to pre-split is there.

This ticket probably isn’t the best place to follow up but the MongoDB-users google group might be a better place to discuss any issues related to this.
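
For illustration, a minimal sketch of such a split call, assuming a hypothetical namespace "mydb.mrout" whose output collection is sharded on {_id: 1}. With a compound _id, the "mid" point is itself a document (BSON documents compare field by field), so the placeholder values below stand in for points you expect to divide the data roughly evenly:

// Hypothetical example: split at an approximate mid point. The namespace
// and the values inside _id are illustrative placeholders, not values
// taken from this ticket.
db.adminCommand({
    split: "mydb.mrout",
    middle: {
        _id: {
            utc: ISODate("2018-11-15T00:00:00Z"),
            bi: "80000000",
            lac: "USASD",
            dac: "USAPT"
        }
    }
});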

Comment by Rui Ribeiro [ 07/Feb/19 ]

Thank you @asya  for your reply.

In the Mongo documentation I see that we can pre-split, for example using tagRange, but in the case of MapReduce my _id will be a compound key:

"_id" : { "utc" : ISODate("2018-11-01T00:00:00.000+0000"), "bi" : "00cdc09c9aec631b584cca058645e48b44ebf0d4e441f50fdeer517dfa360c8afb", "lac" : "USASD", "dac" : "USAPT" }

I don't see how I can create these tag ranges with this _id to achieve an even data distribution.

Do you have a small example (code snippet) of how I can create this pre-splitting in an empty collection?

If I could use the shard key {_id: "hashed"}, that would make the data well distributed.

The options that I use for the Map Reduce are merge and reduce, with {sharded: true}.

Cheers

Comment by Asya Kamsky [ 06/Feb/19 ]

The way to do this would be to create a collection with shard key {_id: 1} and then pre-split and distribute it across the shards. I believe that should work as long as you are running mapReduce with {sharded: true}, but it would also depend on which action you are running: if you are trying to populate a new collection that's pre-split and empty, merge mode should work.
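
A rough sketch of that approach, with placeholder database and collection names; the split point and chunk destination are illustrative:

// 1. Shard the empty output collection on {_id: 1}.
sh.enableSharding("mydb");
sh.shardCollection("mydb.mrout", { _id: 1 });

// 2. Pre-split at approximate mid points and distribute the chunks.
db.adminCommand({ split: "mydb.mrout", middle: { _id: "m" } });
db.adminCommand({ moveChunk: "mydb.mrout", find: { _id: "m" }, to: "shard0001" });

// 3. Run mapReduce in merge mode against the pre-split, empty collection.
db.source.mapReduce(map, reduce, { out: { merge: "mrout", sharded: true } });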

Comment by Rui Ribeiro [ 06/Feb/19 ]

@asya  

Thank you for your reply. I am glad that the plan is to keep the ability to run JavaScript.

Regarding this issue, I would like to know if there is a workaround or other solution so that I can run map reduce with sharded output and end up with the data evenly distributed over the different shards. I tried to create an empty collection, shard the output collection with sh.shardCollection("database.collection", { "_id": "hashed" }), and then run the map reduce into this output collection, but this solution does not work. How can I have the data evenly distributed after I run a map reduce?


Comment by Asya Kamsky [ 06/Feb/19 ]

rribeiro our intent is to make aggregation fully compatible with mapReduce (including the ability to run JavaScript), so I believe it will have the same flexibility once we finish our work on this. In addition, the goal is to make it possible to transition without having to re-write any of your code.

Hope that helps,
Asya

Comment by Rui Ribeiro [ 06/Feb/19 ]

Hello

@asya  

In my opinion, not solving these problems and telling people to use the aggregation framework, when the aggregation framework does not support the same flexibility as Map Reduce, is not a good idea. I believe a lot of people are using Map Reduce, and migrating to the aggregation framework is a lot of effort. For example, the code I write in JavaScript with Map Reduce would be a nightmare to reproduce in the aggregation framework, and it would not be maintainable.

Why the decision not to fix problems with Map Reduce?

Thank you

Comment by mils [ 21/Jan/18 ]

Thanks @asya, that seems like a good idea. Unfortunately in my case I perform some tricky regex on my documents. I guess when the aggregation framework implements regex I will be switching.
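
For anyone finding this later: expression-level regex did eventually land in aggregation ($regexMatch and friends in MongoDB 4.2). A minimal sketch, with an assumed field name and pattern:

// Hypothetical example: filter documents whose "description" field matches
// a pattern inside an aggregation pipeline (requires MongoDB 4.2+).
db.docs.aggregate([
    { $match: { $expr: { $regexMatch: { input: "$description", regex: /^error/i } } } }
]);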
Thanks

Comment by Asya Kamsky [ 18/Jan/18 ]

mlis since you are getting results back to mongos, is there a reason you cannot use the aggregation framework? That wouldn't have the restriction on the format of the output documents you create.

Note that if you have only one chunk, the collection is in effect equivalent to being unsharded.
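
A minimal sketch of the aggregation alternative, assuming the map-reduce is a simple group-and-count (collection and field names are illustrative):

// Equivalent of a basic map-reduce that groups by a key and counts,
// returning results through mongos with no restriction on output format.
db.source.aggregate([
    { $group: { _id: "$user_id", count: { $sum: 1 } } }
]);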

Comment by mils [ 17/Jan/18 ]

One last thing: I am not observing this behaviour if there is only one chunk. That is to say, when there is only one chunk, map-reduce can insert into an existing sharded collection with shard key

{ "value.user_id" : 1 }

Thanks

Comment by mils [ 17/Jan/18 ]

Thanks Asya, I guess I see this as more of a bottleneck than others do. In case anyone else comes across this, this is how I solve it:

My current setup is 4x shards (i.e. 4x replica sets) and a dedicated lightweight mongos server. I have a sharded input collection, and a sharded collection where I would like to output M/R results.
My current solution is:

  1. trigger M/R from my application running on the mongos server (in batches of 10k docs)
    1. M/R will read from the sharded collection, and use the shard CPUs
  2. return results to my application on the mongos server
  3. bulk-write into the sharded collection from my application (see the sketch below)

In this scenario the bottleneck is the bandwidth to/from the mongos server, and the RAM on the mongos server (for holding a result set of 10k documents). But I can't see a faster way to do this.
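
A rough sketch of one batch of that loop, assuming placeholder collection names, a batch selected by an _id range, and the { "value.user_id" : 1 } shard key mentioned below:

// One batch: run M/R inline on the shards, then bulk-write the results
// into the sharded target from the application on the mongos server.
var batch = db.source.mapReduce(map, reduce, {
    query: { _id: { $gte: batchLo, $lt: batchHi } },  // ~10k docs per batch
    out: { inline: 1 }
});
var ops = batch.results.map(function(r) {
    // The shard key must be in the filter for an upsert on a sharded collection.
    return { replaceOne: {
        filter: { _id: r._id, "value.user_id": r.value.user_id },
        replacement: r,
        upsert: true
    } };
});
db.target.bulkWrite(ops);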

Thanks,

Comment by Asya Kamsky [ 16/Jan/18 ]

mlis that's correct, mapReduce only outputs to a sharded collection that's sharded on the _id field.

From docs:

If the out field for mapReduce has the sharded value, MongoDB shards the output collection using the _id field as the shard key.

Because the result of mapReduce has a fixed schema (two fields, _id and value), support for a different shard key is not possible.

This ticket is to track adding a check of the output collection's shard key (erroring out if it's not _id), not to make it possible to output to a collection sharded on a different field.

That work will be done only for aggregation framework output, and it's tracked by ticket SERVER-18027.
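
For reference, the aggregation-side work has since shipped: the $merge stage (MongoDB 4.2+) can write into a collection sharded on a key other than _id, provided the "on" fields cover the shard key and are backed by a unique index. A sketch with illustrative data names, shaped like the mydb.users example below:

// Hypothetical example: aggregate and merge into a collection sharded
// (uniquely) on user_id; the stage and options are real, the names are not.
db.events.aggregate([
    { $group: { _id: "$user_id", count: { $sum: 1 } } },
    { $project: { _id: 0, user_id: "$_id", count: 1 } },
    { $merge: { into: "users", on: "user_id", whenMatched: "merge", whenNotMatched: "insert" } }
]);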

Comment by mils [ 16/Jan/18 ]

Hi, does this mean that we cannot send the output of M/R into an existing collection that is sharded on a key other than _id?

For example, my existing collection is sharded like so (where true makes the shard key unique):

sh.shardCollection("mydb.users", { "user_id" : 1 }, true)

Comment by Scott Hernandez (Inactive) [ 20/Jun/14 ]

Simple jstest to repro, but needs more work.

/**
 * Simple repro: a sharded map-reduce into an output collection that already
 * exists and is sharded on a non-_id key should error out; currently the
 * final asserts show it replacing the data instead.
 */

jsTest.log("Setting up new ShardingTest");
var st = new ShardingTest({name: "mrSOWESC", shards: 2, mongos: 1, other: {chunkSize: 1}});

var config = st.getDB("config");
st.adminCommand({enablesharding: "mrSOWESC"});

var testDB = st.getDB("mrSOWESC");
var sColl = testDB.source;
var dColl = testDB.dest;
// Destination is sharded on {a: 1}, i.e. anything but {_id: 1}.
st.adminCommand({shardcollection: dColl.getFullName(), key: {a: 1}});

function map() {
    emit({k: 1}, {b: 1});
}

function reduce(key, vals) {
    return vals[0];
}

// Seed start data for source and dest collections (dest is sharded).
sColl.insert([{_id: 1}, {_id: 2}, {_id: 3}, {_id: 4}]);
dColl.insert([{_id: 1, a: 1}, {_id: 2, a: 2}, {_id: 3, a: 3}, {_id: 4, a: 4}]);

assert.eq(4, dColl.count(), tojson(dColl.find().toArray()));
assert.eq(4, sColl.count(), tojson(sColl.find().toArray()));
print("******* before mapreduce *****");
db = testDB; // the sh helpers use the global "db"
sh.status();
var res = sColl.mapReduce(map, reduce, {out: {replace: dColl.getName(), sharded: true}});
printjson(res);

print("******* dest docs *******");
printjson(dColl.find().toArray());

sh.status();
printjson(dColl.find().explain());
// Current (broken) behavior: the pre-existing docs are gone and only the
// single map-reduce result remains, despite the non-_id shard key.
assert.eq(1, dColl.count(), tojson(res));
assert.docEq({_id: {k: 1}, value: {b: 1}}, dColl.findOne(), dColl.findOne());

st.stop();
