[SERVER-18940] Optimise sharded aggregations that are targeted to a single shard Created: 12/Jun/15  Updated: 26/Jul/18  Resolved: 02/Aug/17

Status: Closed
Project: Core Server
Component/s: Aggregation Framework
Affects Version/s: 3.1.4
Fix Version/s: 3.5.11

Type: Bug Priority: Major - P3
Reporter: Kevin Pulo Assignee: Bernard Gorman
Resolution: Done Votes: 0
Labels: optimization, performance
Remaining Estimate: Not Specified
Time Spent: Not Specified
Original Estimate: Not Specified

Issue Links:
Depends
depends on SERVER-27937 pull apart the AsyncResultsMerger log... Closed
Duplicate
is duplicated by SERVER-23330 Optimize aggregation on sharded setup... Closed
is duplicated by SERVER-30216 Rename AggregationRequest::isFromRout... Closed
Related
related to SERVER-30899 Adapting meaning of 'fromRouter' caus... Closed
related to SERVER-23955 Return what host the Shard::runComman... Backlog
related to SERVER-27283 Sharded aggregations that need mergin... Backlog
related to SERVER-7656 Optimize aggregation on sharded setup... Closed
related to SERVER-22760 Sharded aggregation pipelines which i... Closed
is related to SERVER-22671 Implement serverStatus section with a... Closed
Backwards Compatibility: Fully Compatible
Operating System: ALL
Steps To Reproduce:

sh.enableSharding("test")
sh.shardCollection("test.test", { a: 1, b: 1, _id: 1 } )
sh.splitAt("test.test", { a: 0, b: 0, _id: MinKey } )

Then observe that the following aggs are each targeted to just one shard, but still have an unnecessarily split pipeline:

> db.test.aggregate([ { $match: { a: -1, b: -1 } } ], {explain: true})
{
        "splitPipeline" : {
                "shardsPart" : [
                        {
                                "$match" : {
                                        "a" : -1,
                                        "b" : -1
                                }
                        }
                ],
                "mergerPart" : [ ]
        },
        "shards" : {
                "shard02" : {
                        "host" : "genique:20085",
                        "stages" : [
                                {
                                        "$cursor" : {
                                                "query" : {
                                                        "a" : -1,
                                                        "b" : -1
                                                },
                                                "queryPlanner" : {
                                                        "plannerVersion" : 1,
                                                        "namespace" : "test.test",
                                                        "indexFilterSet" : false,
                                                        "parsedQuery" : {
                                                                "$and" : [
                                                                        {
                                                                                "a" : {
                                                                                        "$eq" : -1
                                                                                }
                                                                        },
                                                                        {
                                                                                "b" : {
                                                                                        "$eq" : -1
                                                                                }
                                                                        }
                                                                ]
                                                        },
                                                        "winningPlan" : {
                                                                "stage" : "FETCH",
                                                                "inputStage" : {
                                                                        "stage" : "SHARDING_FILTER",
                                                                        "inputStage" : {
                                                                                "stage" : "IXSCAN",
                                                                                "keyPattern" : {
                                                                                        "a" : 1,
                                                                                        "b" : 1,
                                                                                        "_id" : 1
                                                                                },
                                                                                "indexName" : "a_1_b_1__id_1",
                                                                                "isMultiKey" : false,
                                                                                "indexVersion" : 1,
                                                                                "direction" : "forward",
                                                                                "indexBounds" : {
                                                                                        "a" : [
                                                                                                "[-1.0, -1.0]"
                                                                                        ],
                                                                                        "b" : [
                                                                                                "[-1.0, -1.0]"
                                                                                        ],
                                                                                        "_id" : [
                                                                                                "[MinKey, MaxKey]"
                                                                                        ]
                                                                                }
                                                                        }
                                                                }
                                                        },
                                                        "rejectedPlans" : [ ]
                                                }
                                        }
                                }
                        ]
                }
        },
        "ok" : 1
}
> db.test.aggregate([ { $match: { a: 1, b: 1 } } ], {explain: true})
{
        "splitPipeline" : {
                "shardsPart" : [
                        {
                                "$match" : {
                                        "a" : 1,
                                        "b" : 1
                                }
                        }
                ],
                "mergerPart" : [ ]
        },
        "shards" : {
                "shard01" : {
                        "host" : "genique:20084",
                        "stages" : [
                                {
                                        "$cursor" : {
                                                "query" : {
                                                        "a" : 1,
                                                        "b" : 1
                                                },
                                                "queryPlanner" : {
                                                        "plannerVersion" : 1,
                                                        "namespace" : "test.test",
                                                        "indexFilterSet" : false,
                                                        "parsedQuery" : {
                                                                "$and" : [
                                                                        {
                                                                                "a" : {
                                                                                        "$eq" : 1
                                                                                }
                                                                        },
                                                                        {
                                                                                "b" : {
                                                                                        "$eq" : 1
                                                                                }
                                                                        }
                                                                ]
                                                        },
                                                        "winningPlan" : {
                                                                "stage" : "FETCH",
                                                                "inputStage" : {
                                                                        "stage" : "SHARDING_FILTER",
                                                                        "inputStage" : {
                                                                                "stage" : "IXSCAN",
                                                                                "keyPattern" : {
                                                                                        "a" : 1,
                                                                                        "b" : 1,
                                                                                        "_id" : 1
                                                                                },
                                                                                "indexName" : "a_1_b_1__id_1",
                                                                                "isMultiKey" : false,
                                                                                "indexVersion" : 1,
                                                                                "direction" : "forward",
                                                                                "indexBounds" : {
                                                                                        "a" : [
                                                                                                "[1.0, 1.0]"
                                                                                        ],
                                                                                        "b" : [
                                                                                                "[1.0, 1.0]"
                                                                                        ],
                                                                                        "_id" : [
                                                                                                "[MinKey, MaxKey]"
                                                                                        ]
                                                                                }
                                                                        }
                                                                }
                                                        },
                                                        "rejectedPlans" : [ ]
                                                }
                                        }
                                }
                        ]
                }
        },
        "ok" : 1
}

And indeed, if the aggregations are run without explain, $mergeCursors aggregations can be observed in the verbose logs on the primary shard.
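You can check for this symptom without reading the whole explain document by using a small predicate over the explain output. The sketch below is hypothetical: `isUnnecessarilySplit` is not a shell builtin. It relies only on the `splitPipeline` and `shards` fields visible above, and explain output from other server versions may be shaped differently. Here it runs against a trimmed copy of the first explain result.

```javascript
// Hypothetical helper (not part of the mongo shell): flags the symptom in this
// ticket, an aggregation explain that reaches exactly one shard yet still
// reports a split pipeline. Field names follow the explain output above;
// other server versions may lay out explain output differently.
function isUnnecessarilySplit(explain) {
  const shardNames = Object.keys(explain.shards || {});
  return shardNames.length === 1 && explain.splitPipeline != null;
}

// Trimmed copy of the first explain result above (per-shard stages omitted).
const firstExplain = {
  splitPipeline: { shardsPart: [{ $match: { a: -1, b: -1 } }], mergerPart: [] },
  shards: { shard02: { host: "genique:20085", stages: [] } },
  ok: 1,
};

console.log(isUnnecessarilySplit(firstExplain)); // true
```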

Sprint: Query 2017-07-10, Query 2017-07-31, Query 2017-08-21

 Description   

The fix implemented in SERVER-7656 doesn't fully address the issue, because it only works when there is an exact $match on the full shard key. However, an aggregation can be directed to a single shard even without such a match. For example:

  1. a range query that is wholly contained within a single chunk
  2. an equality match on a prefix of the shard key, where the full range of the trailing fields for that prefix is wholly contained within a single chunk

This means that an aggregation may be perfectly targeted to a single shard, yet its pipeline is still split instead of being passed through unchanged to that shard.

Case #2 is particularly problematic when _id is appended to the shard key to guarantee granularity but is never actually used in queries or aggregations, while the rest of the shard key is usually enough on its own to target a single shard.
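A small model of shard targeting illustrates the two cases. This is a sketch built on simplifying assumptions. It is not the actual mongos targeting code. Shard-key values are assumed to be plain numbers, so -Infinity and Infinity stand in for MinKey and MaxKey. A query is reduced to inclusive bounds per field. The routing table mirrors the reproduction steps, with shard names taken from the explain output. `queryToKeyRange`, `targetShards`, `CHUNKS` and `SHARD_KEY` are hypothetical names. The explain examples above are instances of case #2, with equality on `a` and `b` and `_id` left unconstrained.

```javascript
// Stand-ins for BSON MinKey/MaxKey; valid only because this sketch
// assumes every shard-key value is a finite number.
const MIN_KEY = -Infinity;
const MAX_KEY = Infinity;

// Lexicographic comparison of two compound shard-key values (arrays).
function compareKeys(x, y) {
  for (let i = 0; i < x.length; i++) {
    if (x[i] < y[i]) return -1;
    if (x[i] > y[i]) return 1;
  }
  return 0;
}

// Turn per-field inclusive bounds into one [lower, upper] shard-key range.
// Equality fields extend the prefix; the first range or unconstrained field
// ends it, and every later field spans MinKey..MaxKey.
function queryToKeyRange(shardKeyFields, bounds) {
  const lower = [];
  const upper = [];
  let open = false; // true once a non-equality field has been seen
  for (const field of shardKeyFields) {
    const b = bounds[field];
    if (open || b === undefined) {
      lower.push(MIN_KEY);
      upper.push(MAX_KEY);
      open = true;
    } else {
      lower.push(b.min);
      upper.push(b.max);
      if (b.min !== b.max) open = true;
    }
  }
  return { lower, upper };
}

// Sorted, de-duplicated shards owning a chunk that overlaps the range.
// Chunks are half-open [min, max), as in a sharded cluster.
function targetShards(chunks, range) {
  const shards = new Set();
  for (const c of chunks) {
    if (compareKeys(c.min, range.upper) <= 0 && compareKeys(range.lower, c.max) < 0) {
      shards.add(c.shard);
    }
  }
  return [...shards].sort();
}

// Hypothetical routing table for the reproduction: one split at
// { a: 0, b: 0, _id: MinKey }, shards assigned as in the explain output.
const SHARD_KEY = ["a", "b", "_id"];
const CHUNKS = [
  { min: [MIN_KEY, MIN_KEY, MIN_KEY], max: [0, 0, MIN_KEY], shard: "shard02" },
  { min: [0, 0, MIN_KEY], max: [MAX_KEY, MAX_KEY, MAX_KEY], shard: "shard01" },
];

// Case 1: a range on a that lies inside one chunk.
console.log(targetShards(CHUNKS, queryToKeyRange(SHARD_KEY, { a: { min: 1, max: 5 } }))); // [ 'shard01' ]
// Case 2: a prefix match; all values of b and _id for a = -1 sit in one chunk.
console.log(targetShards(CHUNKS, queryToKeyRange(SHARD_KEY, { a: { min: -1, max: -1 } }))); // [ 'shard02' ]
// A prefix that straddles the split point still needs both shards.
console.log(targetShards(CHUNKS, queryToKeyRange(SHARD_KEY, { a: { min: 0, max: 0 } }))); // [ 'shard01', 'shard02' ]
```

Any query whose range resolves to a single shard is a candidate for pass-through, whether or not it names the full shard key.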



 Comments   
Comment by Githook User [ 10/Aug/17 ]

Author: Bernard Gorman (bernard.gorman@gmail.com)

Message: SERVER-18940 Optimise sharded aggregations that are targeted to a single shard
Branch: master
https://github.com/mongodb/mongo/commit/27d43e300e292043fefd7634de99160157955a17

Comment by Bernard Gorman [ 02/Aug/17 ]

JIRA githook was down. Commit: https://github.com/mongodb/mongo/commit/27d43e300e292043fefd7634de99160157955a17

Comment by Asya Kamsky [ 26/Jan/17 ]

This could be alleviated if we only considered as merge candidates the shards that have data to contribute to the merge (SERVER-27283). If the data lived on only one shard, that would be the only shard involved.

It might not be as fast as rewriting the pipeline to skip the merge entirely, but it would be faster than moving the data to another shard for merging.
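The following hypothetical sketch shows the idea in this comment. Merge candidates are restricted to the targeted shards, so when only one shard holds the data, no partial results need to cross the network. `chooseMerger` and `shardsSendingRemotely` are illustrative names. The primary-shard default reflects the $mergeCursors behaviour observed in the reproduction steps and is not a full account of how the merger is chosen.

```javascript
// Hypothetical: pick the shard that runs the merging half of a split pipeline.
// primaryShard: the database's primary shard (where $mergeCursors was observed).
// onlyContributors: if true, the merger must be one of the targeted shards,
// as proposed in SERVER-27283.
function chooseMerger(targetedShards, primaryShard, onlyContributors) {
  if (!onlyContributors || targetedShards.includes(primaryShard)) {
    return primaryShard;
  }
  return [...targetedShards].sort()[0]; // any contributing shard will do
}

// Number of shards that must ship their partial results to the merger.
function shardsSendingRemotely(targetedShards, merger) {
  return targetedShards.filter((s) => s !== merger).length;
}

// Data only on shard02, primary shard is shard01.
console.log(shardsSendingRemotely(["shard02"], chooseMerger(["shard02"], "shard01", false))); // 1
console.log(shardsSendingRemotely(["shard02"], chooseMerger(["shard02"], "shard01", true))); // 0
```

Even with this change the pipeline is still split, which is why it may remain slower than passing the unsplit pipeline straight through.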

Comment by Charlie Swanson [ 28/Jul/16 ]

Related to SERVER-22671 in that both tickets will probably require some work to move away from using Strategy::commandOp for targeting. This ticket requires customized retry logic (we need to know whether targeting failed and the aggregation is now potentially targeted to two shards), whereas SERVER-22671, if implemented as an aggregation stage, requires forced targeting of all shards.
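The retry requirement can be sketched as follows. This is not the actual mongos code path. Routing lookups, routing-table refreshes and shard dispatch are injected as plain callbacks. The `"StaleConfig"` string only stands for whatever stale-routing error the shard reports. `runTargetedAggregation` and its parameters are hypothetical. The point is the branch after a refresh: if the query now maps to more than one shard, the pass-through plan must be abandoned in favour of a split pipeline.

```javascript
// Hypothetical retry loop for a single-shard (pass-through) aggregation.
// getTargets():          shards the current routing table targets
// refreshRoutingTable(): reload chunk ownership after a stale-routing error
// sendToShard(shard):    run the unsplit pipeline on one shard, returning { ok, code }
function runTargetedAggregation(getTargets, refreshRoutingTable, sendToShard, maxAttempts = 3) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const shards = getTargets();
    if (shards.length !== 1) {
      // Targeting now spans several shards (e.g. after a chunk migration),
      // so the pipeline must be split and merged after all.
      return { mode: "split", shards };
    }
    const reply = sendToShard(shards[0]);
    if (reply.ok) {
      return { mode: "passthrough", shard: shards[0], reply };
    }
    if (reply.code !== "StaleConfig") {
      throw new Error(`aggregation failed on ${shards[0]}: ${reply.code}`);
    }
    refreshRoutingTable(); // stale routing: reload and re-target on the next pass
  }
  throw new Error(`routing still stale after ${maxAttempts} attempts`);
}

// Simulated chunk migration: the first attempt hits a stale shard, and the
// refreshed routing table shows the query now spans two shards.
let routing = ["shard02"];
const result = runTargetedAggregation(
  () => routing,
  () => { routing = ["shard01", "shard02"]; },
  () => ({ ok: false, code: "StaleConfig" }),
);
console.log(result.mode); // split
```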

Generated at Thu Feb 08 03:49:18 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.