[SERVER-19643] Aggregation on unversioned connection should not use SHARDING_FILTER execution stage
Created: 29/Jul/15  Updated: 07/Mar/16  Resolved: 08/Feb/16

Status: Closed
Project: Core Server
Component/s: Aggregation Framework
Affects Version/s: 3.0.4
Fix Version/s: 3.3.2

Type: Bug
Priority: Minor - P4
Reporter: Kevin Pulo
Assignee: Benjamin Murphy
Resolution: Done
Votes: 0
Labels: grab-bag, optimization, usability

Backwards Compatibility: Fully Compatible
Operating System: ALL
Sprint: Query F (02/01/16), Query 10 (02/22/16)

 Description   

The SHARDING_FILTER stage seems to always be included in aggregations on shard servers, even when the connection is not versioned.

This is inconsistent with find(), which omits the filter on an unversioned connection, and so defies user expectations. It also means that it is not possible to run an aggregation over orphaned documents (an unusual thing to do, but one that could be desirable).

I'm not sure, but this may also mean that an in-progress aggregation does not properly notice a concurrent migration. That depends on how shard versioning is handled, and I would expect the problem much more on an unversioned connection. It could make results wrong, either by not noticing that some documents have moved away or by not noticing that others have arrived. On an unversioned direct connection this isn't entirely unexpected, since such connections normally see orphans anyway, but it could mean that aggregations there sometimes exclude orphans and sometimes do not, which wouldn't be great.

Observed in 3.0.4.
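The reported behaviour can be summarized with a small model, using the chunk layout and the single shard01 document from the reproduction that follows. This is a sketch in plain JavaScript, not MongoDB source: `ownsKey`, `includesShardingFilter` and `runOnShard` are hypothetical names, chunk ranges are treated as [min, max) on a numeric `_id`, and MinKey/MaxKey are approximated by -Infinity/Infinity, which only holds for numeric shard keys.

```javascript
// Simplified model of the 3.0.4 behaviour reported here (not MongoDB source).
// Chunk ranges are [min, max) on a numeric _id shard key; MinKey/MaxKey are
// approximated by -Infinity/Infinity (an assumption valid for numbers only).
const SHARD01_CHUNKS = [{ min: 0, max: Infinity }]; // { _id: 0 } -->> { _id: MaxKey }

// True if this shard owns the chunk containing `key`.
function ownsKey(chunks, key) {
  return chunks.some((c) => key >= c.min && key < c.max);
}

// Does the plan get a SHARDING_FILTER stage?
// find(): only on a versioned connection (e.g. a request routed via mongos).
// aggregate(): always, which is the inconsistency this ticket reports.
function includesShardingFilter(command, versioned) {
  if (command === "find") return versioned;
  if (command === "aggregate") return true;
  throw new Error(`unknown command: ${command}`);
}

// Run a match-all query against the documents stored locally on the shard,
// dropping unowned (orphaned) documents only when the filter is in the plan.
function runOnShard(localDocs, chunks, command, versioned) {
  if (!includesShardingFilter(command, versioned)) return localDocs;
  return localDocs.filter((doc) => ownsKey(chunks, doc._id));
}

// Direct (unversioned) connection to shard01, whose only local document,
// { _id: -1 }, falls in shard02's chunk and is therefore an orphan.
const shard01Docs = [{ _id: -1 }];
console.log(runOnShard(shard01Docs, SHARD01_CHUNKS, "find", false));      // [ { _id: -1 } ]
console.log(runOnShard(shard01Docs, SHARD01_CHUNKS, "aggregate", false)); // []
```

The two calls mirror the transcript on shard01: find() returns the orphan, while aggregate([]) returns nothing because the filter is applied regardless of connection versioning.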

mongos

mongos> sh.status()
--- Sharding Status ---
  sharding version: {
        "_id" : 1,
        "minCompatibleVersion" : 5,
        "currentVersion" : 6,
        "clusterId" : ObjectId("55b749b3a7717d4ea1a796fa")
}
  shards:
        {  "_id" : "shard01",  "host" : "shard01/genique:11112" }
        {  "_id" : "shard02",  "host" : "shard02/genique:11113" }
  databases:
        {  "_id" : "admin",  "partitioned" : false,  "primary" : "config" }
        {  "_id" : "test",  "partitioned" : true,  "primary" : "shard01" }
                test.test
                        shard key: { "_id" : 1 }
                        chunks:
                                shard02 1
                                shard01 1
                        { "_id" : { "$minKey" : 1 } } -->> { "_id" : 0 } on : shard02 Timestamp(2, 0)
                        { "_id" : 0 } -->> { "_id" : { "$maxKey" : 1 } } on : shard01 Timestamp(2, 1)
 
mongos> db.test.find()
{ "_id" : -2, "not_an_orphan" : 1 }
{ "_id" : -3, "not_an_orphan" : 2 }
mongos> db.test.aggregate([])
{ "_id" : -2, "not_an_orphan" : 1 }
{ "_id" : -3, "not_an_orphan" : 2 }

direct to shard01

shard01:PRIMARY> db.test.find()
{ "_id" : -1 }
shard01:PRIMARY> db.test.aggregate([])
shard01:PRIMARY> db.test.find().explain(true)
{
        "queryPlanner" : {
                "plannerVersion" : 1,
                "namespace" : "test.test",
                "indexFilterSet" : false,
                "parsedQuery" : {
                        "$and" : [ ]
                },
                "winningPlan" : {
                        "stage" : "COLLSCAN",
                        "filter" : {
                                "$and" : [ ]
                        },
                        "direction" : "forward"
                },
                "rejectedPlans" : [ ]
        },
        "executionStats" : {
                "executionSuccess" : true,
                "nReturned" : 1,
                "executionTimeMillis" : 0,
                "totalKeysExamined" : 0,
                "totalDocsExamined" : 1,
                "executionStages" : {
                        "stage" : "COLLSCAN",
                        "filter" : {
                                "$and" : [ ]
                        },
                        "nReturned" : 1,
                        "executionTimeMillisEstimate" : 0,
                        "works" : 3,
                        "advanced" : 1,
                        "needTime" : 1,
                        "needFetch" : 0,
                        "saveState" : 0,
                        "restoreState" : 0,
                        "isEOF" : 1,
                        "invalidates" : 0,
                        "direction" : "forward",
                        "docsExamined" : 1
                },
                "allPlansExecution" : [ ]
        },
        "serverInfo" : {
                "host" : "genique",
                "port" : 11112,
                "version" : "3.0.4",
                "gitVersion" : "0481c958daeb2969800511e7475dc66986fa9ed5"
        }
}
shard01:PRIMARY> db.test.aggregate([], {explain:true})
{
        "stages" : [
                {
                        "$cursor" : {
                                "query" : {
 
                                },
                                "queryPlanner" : {
                                        "plannerVersion" : 1,
                                        "namespace" : "test.test",
                                        "indexFilterSet" : false,
                                        "parsedQuery" : {
                                                "$and" : [ ]
                                        },
                                        "winningPlan" : {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>               "stage" : "SHARDING_FILTER",
                                                "inputStage" : {
                                                        "stage" : "COLLSCAN",
                                                        "filter" : {
                                                                "$and" : [ ]
                                                        },
                                                        "direction" : "forward"
                                                }
                                        },
                                        "rejectedPlans" : [ ]
                                }
                        }
                }
        ],
        "ok" : 1,
        "$gleStats" : {
                "lastOpTime" : Timestamp(1438078475, 1),
                "electionId" : ObjectId("55b749b326a7d1afefbbfebb")
        }
}
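To check for the stage programmatically instead of by eye, an explain document can be searched recursively for any object whose `stage` field matches. The sketch below is plain JavaScript written for a modern runtime such as Node.js; `collectStages` and `hasStage` are hypothetical helpers, not shell built-ins, and they are exercised on trimmed copies of the two explain outputs captured on shard01 above.

```javascript
// Return every plan stage in an explain() document whose "stage" field equals
// `name`. The walk is generic, so it covers find() explain ("winningPlan",
// "executionStages") and aggregate explain ("stages[].$cursor.queryPlanner").
function collectStages(node, name, found = []) {
  if (Array.isArray(node)) {
    node.forEach((child) => collectStages(child, name, found));
  } else if (node !== null && typeof node === "object") {
    if (node.stage === name) found.push(node);
    Object.keys(node).forEach((key) => collectStages(node[key], name, found));
  }
  return found;
}

function hasStage(explainDoc, name) {
  return collectStages(explainDoc, name).length > 0;
}

// Trimmed copies of the two explain outputs captured on shard01 above.
const findExplain = {
  queryPlanner: { winningPlan: { stage: "COLLSCAN", direction: "forward" } },
  executionStats: { executionStages: { stage: "COLLSCAN", nReturned: 1 } },
};
const aggExplain = {
  stages: [{
    $cursor: {
      query: {},
      queryPlanner: {
        winningPlan: {
          stage: "SHARDING_FILTER",
          inputStage: { stage: "COLLSCAN", direction: "forward" },
        },
      },
    },
  }],
};

console.log(hasStage(findExplain, "SHARDING_FILTER")); // false
console.log(hasStage(aggExplain, "SHARDING_FILTER"));  // true
```

In the mongo shell the same helper could, in principle, be applied directly to the documents returned by db.test.find().explain(true) and db.test.aggregate([], {explain:true}), assuming the shell's JavaScript engine supports the ES2015 syntax used here.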



 Comments   
Comment by Githook User [ 08/Feb/16 ]

Author: Benjamin Murphy (benjaminmurphy) <benjamin_murphy@me.com>

Message: SERVER-19643 Aggregation directly from a shard does not use sharding filter.
Branch: master
https://github.com/mongodb/mongo/commit/c3a232ef38480f317cd42cc00755fd9032c107d2

Comment by Charlie Swanson [ 29/Jul/15 ]

This does look intentional, stemming from this line. It is not clear why the filter is always included, but my guess is that it was assumed a user would never want to see orphan documents. redbeard0531, any insights?

Generated at Thu Feb 08 03:51:38 UTC 2024 using Jira 9.7.1#970001-sha1:2222b88b221c4928ef0de3161136cc90c8356a66.