-
Type: Task
-
Resolution: Fixed
-
Priority: Major - P3
-
Affects Version/s: None
-
Component/s: Aggregation Framework
-
None
-
Fully Compatible
-
Query 2018-07-16, Query 2018-07-30, Query 2018-08-13, Query 2018-08-27, Query 2018-09-10
For pipelines like the following:
db.teams.aggregate([
  {$unwind: "$users"},
  {$group: {_id: "$users.username", teams: {$push: "$_id"}}},
  {$project: {username: "$_id", teams: 1, _id: 0}},
  {$out: {to: "users", mode: "replace", uniqueKey: {username: 1}}}
])
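Made-up sample data (not from this ticket) makes the example concrete; with documents like these in "teams", the pipeline writes one document per username to "users":

db.teams.insertMany([
  {_id: "engineering", users: [{username: "alice"}, {username: "bob"}]},
  {_id: "platform",    users: [{username: "alice"}]}
]);
// After running the pipeline, "users" contains documents shaped like:
//   {username: "alice", teams: ["engineering", "platform"]}
//   {username: "bob",   teams: ["engineering"]}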
The ideal execution plan would be to perform the $unwind and a partial group on each shard, then scatter the partial groups by _id so the merge can happen in parallel on each shard. Supposing "users" were sharded with the shard key pattern {username: 1}, we should distribute each partial group to the shard that owns the range of usernames containing its _id.
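As a rough illustration of that plan (hand-written, not the partial/merge split the server would actually generate), the pipeline could be cut into a shards part and a merging part like this:

// Shards part: runs on every shard over the "teams" chunks it owns.
const shardsPart = [
  {$unwind: "$users"},
  // Produce partial groups keyed by username, e.g. {_id: "alice", teams: [...]}.
  {$group: {_id: "$users.username", teams: {$push: "$_id"}}}
];
// Merging part: runs on the shard that owns the username's range after the
// partial groups have been scattered by _id.
const mergingPart = [
  {$unwind: "$teams"},                               // flatten the partial arrays
  {$group: {_id: "$_id", teams: {$push: "$teams"}}}, // combine partial groups with the same _id
  {$project: {username: "$_id", teams: 1, _id: 0}},
  {$out: {to: "users", mode: "replace", uniqueKey: {username: 1}}}
];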
For example, suppose we had two shards: A and B. The "teams" collection is sharded and has some chunks on each shard. The "users" collection is sharded and has the chunk [MinKey, "example") on shard A and ["example", MaxKey] on shard B. Then we should execute this plan by having each shard perform a partial group on the teams collection, then send the partial groups with _id in the range [MinKey, "example") to shard A, and the partial groups with _id in the range ["example", MaxKey] to shard B. That way, in the absence of any chunk migrations, the final $out stage will perform a local write.
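A minimal sketch of that routing decision, with the routing table reduced to an array of ranges (MinKey/MaxKey shown as null for simplicity; this is not the server's ChunkManager interface):

const routingTable = [
  {min: null,      max: "example", shard: "A"},   // [MinKey, "example")
  {min: "example", max: null,      shard: "B"}    // ["example", MaxKey]
];

// Return the shard whose chunk range contains this partial group's _id.
function shardFor(username) {
  for (const chunk of routingTable) {
    const aboveMin = chunk.min === null || username >= chunk.min;
    const belowMax = chunk.max === null || username < chunk.max;
    if (aboveMin && belowMax) return chunk.shard;
  }
  throw new Error("no owning chunk for " + username);
}

shardFor("aardvark");  // "A"
shardFor("zebra");     // "B"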
In order to detect that partitioning by the shard key in this way is a good idea, we'll need to do some analysis of the merging half of the pipeline. It'll look something like this (a rough sketch follows the list):
- Check if the last stage is a $out to a sharded collection.
- If so, consult the chunk manager to find the shard key and the routing table.
- Work backwards through the merging half of the pipeline, tracking whether any of the shard key fields are modified. If a field is renamed, remember its new name.
- If we get to the beginning of the merging pipeline without any stage modifying those fields, output the routing table and which fields should be used to partition the data.
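A rough sketch of that backwards walk (illustration only, not the server's implementation; it assumes the caller has already verified the $out target is sharded and fetched its shard key fields, and it only understands simple top-level renames):

// Returns the fields of the merging pipeline's *input* documents to partition
// on, or null if the analysis can't prove the shard key fields survive.
function exchangeKeyForMergingPipeline(mergingPipeline, shardKeyFields) {
  let fields = [...shardKeyFields];               // e.g. ["username"]
  const stages = mergingPipeline.slice(0, -1);    // drop the trailing $out
  for (let i = stages.length - 1; i >= 0; i--) {
    const stage = stages[i];
    if (stage.$project) {
      fields = fields.map(f => {
        const spec = stage.$project[f];
        if (spec === 1 || spec === true) return f;          // passed through untouched
        if (typeof spec === "string" && /^\$[^.]+$/.test(spec)) {
          return spec.slice(1);                             // simple rename, e.g. username: "$_id"
        }
        return null;                                        // computed, dotted, or dropped
      });
    } else if (stage.$group) {
      fields = fields.map(f => {
        if (f !== "_id") return null;                       // only the group key is traceable
        const idSpec = stage.$group._id;
        return (typeof idSpec === "string" && /^\$[^.]+$/.test(idSpec))
          ? idSpec.slice(1)                                 // group key is a simple field path
          : null;
      });
    } else if (stage.$unwind) {
      const path = typeof stage.$unwind === "string" ? stage.$unwind : stage.$unwind.path;
      // $unwind only touches the unwound field; anything else passes through.
      fields = fields.map(f => (path === "$" + f || path.startsWith("$" + f + ".")) ? null : f);
    } else {
      return null;                                          // unknown stage: assume it modifies fields
    }
    if (fields.includes(null)) return null;
  }
  return fields;
}

// With a hand-written merging half of the pipeline above and the {username: 1}
// shard key of "users", the analysis says to partition the partial groups on _id:
const mergingHalf = [
  {$unwind: "$teams"},
  {$group: {_id: "$_id", teams: {$push: "$teams"}}},
  {$project: {username: "$_id", teams: 1, _id: 0}},
  {$out: {to: "users", mode: "replace", uniqueKey: {username: 1}}}
];
exchangeKeyForMergingPipeline(mergingHalf, ["username"]);  // ["_id"]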
Once we do this, we can take advantage of this machinery and the work in SERVER-35899 to set up an $exchange across all the shards.
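For the example above, the exchange we'd want is a key-range partition of the partial groups on _id at the chunk boundary "example". Sketched as a spec-like object (the field names here are assumptions for illustration, not the server's actual exchange specification):

const exchangeSpec = {
  policy: "keyRange",
  key: {_id: 1},                                                 // output of the analysis above
  boundaries: [{_id: MinKey}, {_id: "example"}, {_id: MaxKey}],  // mirrors the "users" chunk ranges
  consumers: 2,
  consumerIds: [0, 1]                                            // 0 -> shard A, 1 -> shard B
};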
- is depended on by
- SERVER-35905 Plug pieces together to perform a distributed $exchange when applicable - Closed
- SERVER-36364 For sharded $out, select merging shard only from set of shards that might accept writes - Closed