commit bebbf2f269699e95167b6519ba40543426c1ab49 Author: Charlie Swanson Date: Fri Jan 10 15:10:24 2020 -0500 SERVER-40909 Absorb skip into the query layer when possible diff --git a/jstests/aggregation/bugs/skip_limit_overflow.js b/jstests/aggregation/bugs/skip_limit_overflow.js index 00cdc936b0..962d279ace 100644 --- a/jstests/aggregation/bugs/skip_limit_overflow.js +++ b/jstests/aggregation/bugs/skip_limit_overflow.js @@ -59,27 +59,24 @@ function testPipeline(pipeline, expectedResult, optimizedAwayStages) { assert.eq(coll.aggregate(pipeline).toArray(), []); } -// Case where overflow of limit + skip prevents limit stage from being absorbed. Values -// are specified as integrals > MAX_LONG. Note that we cannot specify this huge value as -// a NumberLong, as we get a number conversion error (even if it's passed as a string). testPipeline([{$sort: {x: -1}}, {$skip: 18446744073709552000}, {$limit: 6}], { $limit: {path: "$limit", expectedValue: [NumberLong(6)]}, - $skip: {path: "$skip", expectedValue: [NumberLong("9223372036854775807")]} + SKIP: {path: "skipAmount", expectedValue: [NumberLong("9223372036854775807")]} }); testPipeline([{$sort: {x: -1}}, {$skip: 6}, {$limit: 18446744073709552000}], { $limit: {path: "$limit", expectedValue: [NumberLong("9223372036854775807")]}, - $skip: {path: "$skip", expectedValue: [NumberLong(6)]} + SKIP: {path: "skipAmount", expectedValue: [6]} }); // Case where overflow of limit + skip prevents limit stage from being absorbed. One of the // values == MAX_LONG, another one is 1. 
testPipeline([{$sort: {x: -1}}, {$skip: NumberLong("9223372036854775807")}, {$limit: 1}], { $limit: {path: "$limit", expectedValue: [NumberLong(1)]}, - $skip: {path: "$skip", expectedValue: [NumberLong("9223372036854775807")]} + SKIP: {path: "skipAmount", expectedValue: [NumberLong("9223372036854775807")]} }); testPipeline([{$sort: {x: -1}}, {$skip: 1}, {$limit: NumberLong("9223372036854775807")}], { $limit: {path: "$limit", expectedValue: [NumberLong("9223372036854775807")]}, - $skip: {path: "$skip", expectedValue: [NumberLong(1)]} + SKIP: {path: "skipAmount", expectedValue: [1]} }); // Case where limit + skip do not overflow. Limit == MAX_LONG and skip is 0. Should be able to @@ -94,13 +91,13 @@ testPipeline([{$sort: {x: -1}}, {$skip: 0}, {$limit: NumberLong("922337203685477 testPipeline([{$sort: {x: -1}}, {$skip: NumberLong("9223372036854775806")}, {$limit: 1}], { SORT: {path: "limitAmount", expectedValue: [NumberLong("9223372036854775807")]}, - $skip: {path: "$skip", expectedValue: [NumberLong("9223372036854775806")]} + SKIP: {path: "skipAmount", expectedValue: [NumberLong("9223372036854775806")]} }, ["$limit"]); testPipeline([{$sort: {x: -1}}, {$skip: 1}, {$limit: NumberLong("9223372036854775806")}], { SORT: {path: "limitAmount", expectedValue: [NumberLong("9223372036854775807")]}, - $skip: {path: "$skip", expectedValue: [NumberLong(1)]} + SKIP: {path: "skipAmount", expectedValue: [1]} }, ["$limit"]); @@ -116,6 +113,8 @@ testPipeline( ], { SORT: {path: "limitAmount", expectedValue: [NumberLong("9223372036854775807")]}, + SKIP: {path: "skipAmount", expectedValue: [NumberLong("9223372036854775800")]}, + $skip: {path: "$skip", expectedValue: [NumberLong(10)]}, $limit: {path: "$limit", expectedValue: [NumberLong(1)]} }); @@ -131,40 +130,41 @@ testPipeline( ], { SORT: {path: "limitAmount", expectedValue: [NumberLong("9223372036854775804")]}, - $skip: {path: "$skip", expectedValue: [NumberLong("9223372036854775803")]} + SKIP: {path: "skipAmount", expectedValue: 
[NumberLong("9223372036854775803")]} }); // Case where limit + skip do not overflow. Both values are < MAX_LONG. testPipeline([{$sort: {x: -1}}, {$skip: 674761616283}, {$limit: 35361718}], { SORT: {path: "limitAmount", expectedValue: [NumberLong(674796978001)]}, - $skip: {path: "$skip", expectedValue: [NumberLong(674761616283)]} + SKIP: {path: "skipAmount", expectedValue: [NumberLong(674761616283)]} }, ["$limit"]); testPipeline([{$sort: {x: -1}}, {$skip: 35361718}, {$limit: 674761616283}], { SORT: {path: "limitAmount", expectedValue: [NumberLong(674796978001)]}, - $skip: {path: "$skip", expectedValue: [NumberLong(35361718)]} + SKIP: {path: "skipAmount", expectedValue: [35361718]} }, ["$limit"]); // Case where where overflow of limit + skip + skip prevents limit stage from being absorbed. -// One skip == MAX_LONG - 1, another one is 1. Should merge two skip stages into one. +// One skip == MAX_LONG - 1, another one is 1. Should merge two skip stages into one and push down. testPipeline( [{$sort: {x: -1}}, {$skip: 1}, {$skip: NumberLong("9223372036854775806")}, {$limit: 1}], { $limit: {path: "$limit", expectedValue: [NumberLong(1)]}, - $skip: {path: "$skip", expectedValue: [NumberLong("9223372036854775807")]} + SKIP: {path: "skipAmount", expectedValue: [NumberLong("9223372036854775807")]} }, ["$sort"]); -// Case where where overflow of limit + skip + skip prevents limit stage from being absorbed. -// One skip == MAX_LONG, another one is 1. Should not absorb or merge any stages. +// Case where overflow of limit + skip + skip prevents limit stage and one of the skip stages +// from being absorbed. One skip == MAX_LONG, another one is 1. Should absorb the first skip. 
testPipeline( [{$sort: {x: -1}}, {$skip: 1}, {$skip: NumberLong("9223372036854775807")}, {$limit: 1}], { $limit: {path: "$limit", expectedValue: [NumberLong(1)]}, - $skip: {path: "$skip", expectedValue: [NumberLong(1), NumberLong("9223372036854775807")]} + SKIP: {path: "skipAmount", expectedValue: [1]}, + $skip: {path: "$skip", expectedValue: [NumberLong("9223372036854775807")]} }, ["$sort"]); diff --git a/jstests/aggregation/extras/limitskip.js b/jstests/aggregation/extras/limitskip.js deleted file mode 100644 index 3644282b3c..0000000000 --- a/jstests/aggregation/extras/limitskip.js +++ /dev/null @@ -1,79 +0,0 @@ - -var coll = "numbers"; - -db[coll].drop(); -for (i = 0; i < 100; i++) { - db[coll].save({_id: i, mod: [i % 2, i % 3, i % 5]}); -} - -print("-----LIMIT-----"); - -print("normal limit"); -var doc = db.runCommand({aggregate: coll, pipeline: [{$limit: 2}]}); -assert.eq(doc.result.length, 2, tojson(doc)); - -print("limit larger than result size"); -doc = db.runCommand({aggregate: coll, pipeline: [{$limit: 200}]}); -assert.eq(doc.result.length, 100, tojson(doc)); - -print("limit on sort"); -doc = db.runCommand({aggregate: coll, pipeline: [{$sort: {_id: -1}}, {$limit: 3}]}); -r = doc.result; -assert.eq(doc.result.length, 3); -for (var i = 0; i < r; i++) { - assert.eq(100 - r[i]._id, i, tojson(doc)); -} - -print("TODO: invalid limit"); // once assert has been replaced with uassert - -print("-----SKIP------"); - -print("normal skip"); -doc = db.runCommand({aggregate: coll, pipeline: [{$skip: 95}]}); -assert.eq(doc.result.length, 5, tojson(doc)); - -print("skip larger than result size"); -doc = db.runCommand({aggregate: coll, pipeline: [{$skip: 102}]}); -assert.eq(doc.result.length, 0, tojson(doc)); - -print("check skip results"); -doc = db.runCommand({aggregate: coll, pipeline: [{$sort: {_id: 1}}, {$skip: 6}, {$limit: 3}]}); -assert.eq(doc.result.length, 3, tojson(doc)); -for (var i = 0; i < 3; i++) { - assert.eq(i + 6, doc.result[i]._id, tojson(doc)); -} - 
-print("TODO: invalid skip"); // once assert has been replaced with uassert - -print("on virtual collection"); -doc = db.runCommand({ - aggregate: coll, - pipeline: [ - {$unwind: "$mod"}, - {$project: {m: "$mod"}}, - {$sort: {m: 1, _id: -1}}, - {$skip: 150}, - {$limit: 5} - ] -}); - -assert.eq(doc.result.length, 5); -for (var i = 0; i < 5; i++) { - assert.eq(1, doc.result[i].m, tojson(doc)); -} -assert.eq(doc.result[0]._id, 55, tojson(doc)); -assert.eq(doc.result[1]._id, 53, tojson(doc)); -assert.eq(doc.result[2]._id, 52, tojson(doc)); -assert.eq(doc.result[3]._id, 51, tojson(doc)); -assert.eq(doc.result[4]._id, 51, tojson(doc)); - -print("size 0 collection"); -db[coll].drop(); - -doc = db.runCommand({aggregate: coll, pipeline: [{$skip: 6}]}); -assert.eq(doc.ok, 1); -assert.eq(doc.result.length, 0); - -doc = db.runCommand({aggregate: coll, pipeline: [{$limit: 3}]}); -assert.eq(doc.ok, 1); -assert.eq(doc.result.length, 0); diff --git a/jstests/aggregation/optimize_away_pipeline.js b/jstests/aggregation/optimize_away_pipeline.js index ab4be0995a..5a4ece07bc 100644 --- a/jstests/aggregation/optimize_away_pipeline.js +++ b/jstests/aggregation/optimize_away_pipeline.js @@ -210,14 +210,13 @@ assertPipelineDoesNotUseAggregation({ }); assert.commandWorked(coll.deleteOne({_id: 4})); -// Pipelines which cannot be optimized away. - -// TODO SERVER-40909: $skip stage is not supported yet. -assertPipelineUsesAggregation({ +assertPipelineDoesNotUseAggregation({ pipeline: [{$match: {x: {$gte: 20}}}, {$skip: 1}], - expectedStages: ["COLLSCAN"], + expectedStages: ["COLLSCAN", "SKIP"], expectedResult: [{_id: 3, x: 30}] }); + +// Pipelines which cannot be optimized away. // We cannot optimize away a pipeline if there are stages which have no equivalent in the // find command. 
assertPipelineUsesAggregation({ @@ -389,10 +388,7 @@ let limitStage = getAggPlanStage(explain, "LIMIT"); assert.neq(null, limitStage, explain); assert.eq(1, limitStage.limitAmount, explain); -// We can optimize away interleaved $limit and $skip after a project. The $limits can be collapsed -// into a single $limit:35 prior to the $skip stages. We currently do not push down $skip into the -// PlanStage layer (see SERVER-40909), which prevents this pipeline from being entirely optimized -// away. +// We can optimize away interleaved $limit and $skip after a project. pipeline = [ {$match: {x: {$gte: 0}}}, {$project: {_id: 0, x: 1}}, @@ -401,18 +397,20 @@ pipeline = [ {$skip: 10}, {$limit: 7} ]; -assertPipelineUsesAggregation({ +assertPipelineDoesNotUseAggregation({ pipeline: pipeline, - expectedStages: ["IXSCAN", "PROJECTION_COVERED", "LIMIT"], - optimizedAwayStages: ["$match", "$limit"], + expectedStages: ["IXSCAN", "PROJECTION_COVERED", "LIMIT", "SKIP"], + optimizedAwayStages: ["$match", "$limit", "$skip"], }); explain = coll.explain().aggregate(pipeline); +let skipStage = getAggPlanStage(explain, "SKIP"); +assert.neq(null, skipStage, explain); +assert.eq(30, skipStage.skipAmount, explain); limitStage = getAggPlanStage(explain, "LIMIT"); assert.neq(null, limitStage, explain); -assert.eq(35, limitStage.limitAmount, explain); -let skipStage = getAggPlanStage(explain, "$skip"); -assert.neq(null, skipStage, explain); -assert.eq(30, skipStage.$skip, explain); +let expectedLimit = + (Math.min(20 + 15 /* first limit */, 20 + 10 + 7 /* second limit */) - (20 + 10)); +assert.eq(expectedLimit, limitStage.limitAmount, explain); assert.commandWorked(coll.dropIndexes()); diff --git a/jstests/aggregation/stages/skip_with_limit.js b/jstests/aggregation/stages/skip_with_limit.js index d0bad0ed03..c3644dbf2d 100644 --- a/jstests/aggregation/stages/skip_with_limit.js +++ b/jstests/aggregation/stages/skip_with_limit.js @@ -47,4 +47,27 @@ assert.eq(count, 2); count = 
coll.aggregate([{$match: {x: 4}}, {$skip: 18}, {$group: {_id: '$y'}}, {$limit: 5}]).itcount(); assert.eq(count, 2); + +// Now add some pipelines that have multiple consecutive skips to test that our logic to swap a +// limit in front of a skip adds the correct total to the limit. For example, in the first test the +// limit should end up being 23. Here we also throw in some tests with $sort stages, because $sort +// stages will try to pull limits forward. +count = coll.aggregate([{$match: {x: 4}}, {$sort: {x: 1}}, {$skip: 10}, {$skip: 8}, {$limit: 5}]) + .itcount(); +assert.eq(count, 2); + +count = + coll.aggregate([{$match: {x: 4}}, {$skip: 5}, {$limit: 10}, {$skip: 5}, {$limit: 4}]).itcount(); +assert.eq(count, 4); + +count = coll.aggregate([{$match: {x: 4}}, {$skip: 7}, {$skip: 4}, {$limit: 4}]).itcount(); +assert.eq(count, 4); +count = coll.aggregate([{$match: {x: 4}}, {$sort: {y: -1}}, {$skip: 7}, {$skip: 4}, {$limit: 4}]) + .itcount(); +assert.eq(count, 4); +count = coll.aggregate([{$match: {x: 4}}, {$skip: 7}, {$skip: 10}, {$limit: 4}]).itcount(); +assert.eq(count, 3); +count = coll.aggregate([{$match: {x: 4}}, {$sort: {y: -1}}, {$skip: 7}, {$skip: 10}, {$limit: 4}]) + .itcount(); +assert.eq(count, 3); }()); diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp index 91e5c77f5a..e69d5a20a4 100644 --- a/src/mongo/db/pipeline/document_source.cpp +++ b/src/mongo/db/pipeline/document_source.cpp @@ -189,7 +189,7 @@ bool DocumentSource::pushMatchBefore(Pipeline::SourceContainer::iterator itr, bool DocumentSource::pushSampleBefore(Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { auto nextSample = dynamic_cast((*std::next(itr)).get()); - if (constraints().canSwapWithLimitAndSample && nextSample) { 
container->insert(itr, std::move(nextSample)); container->erase(std::next(itr)); diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index c30c3a9d6c..00ae37ba50 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -203,7 +203,7 @@ StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const { LookupRequirement::kAllowed); constraints.canSwapWithMatch = true; - constraints.canSwapWithLimitAndSample = !_unwindSrc; + constraints.canSwapWithSkippingOrLimitingStage = !_unwindSrc; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index 8ef435bd77..e984cfaa3d 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -65,7 +65,7 @@ public: LookupRequirement::kAllowed, ChangeStreamRequirement::kWhitelist); constraints.canSwapWithMatch = true; - constraints.canSwapWithLimitAndSample = true; + constraints.canSwapWithSkippingOrLimitingStage = true; constraints.isAllowedWithinUpdatePipeline = true; // This transformation could be part of a 'collectionless' change stream on an entire // database or cluster, mark as independent of any collection if so. 
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index 605d5022c3..2cc5e40f67 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -120,9 +120,11 @@ boost::optional DocumentSourceSort::getLimit() const { : boost::none; } -boost::optional DocumentSourceSort::extractLimitForPushdown( - Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { - int64_t skipSum = 0; +LimitThenSkip DocumentSourceSort::extractSkipAndLimitForPushdown( + Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container, + bool extractSkips) { + long long skipSum = 0; boost::optional minLimit; while (itr != container->end()) { auto nextStage = (*itr).get(); @@ -134,13 +136,16 @@ boost::optional DocumentSourceSort::extractLimitForPushdown( // overflow before applying an optimization to swap the $limit with the $skip. if (nextSkip && !overflow::add(skipSum, nextSkip->getSkip(), &safeSum)) { skipSum = safeSum; - ++itr; - } else if (nextLimit && !overflow::add(nextLimit->getLimit(), skipSum, &safeSum)) { - if (!minLimit) { - minLimit = safeSum; + if (extractSkips) { + itr = container->erase(itr); } else { - minLimit = std::min(static_cast(safeSum), *minLimit); + ++itr; } + } else if (nextLimit && !overflow::add(nextLimit->getLimit(), skipSum, &safeSum)) { + // Unlike skipping, limiting is not additive. So simply preserve the minimum. For + // example, [{$limit: 10}, {$limit: 5}] is the same as [{$limit: 5}]. + minLimit = std::min(static_cast(safeSum), + minLimit.value_or(std::numeric_limits::max())); itr = container->erase(itr); // If the removed stage wasn't the last in the pipeline, make sure that the stage @@ -148,14 +153,17 @@ boost::optional DocumentSourceSort::extractLimitForPushdown( if (itr != container->end()) { (*itr)->setSource(itr != container->begin() ? 
std::prev(itr)->get() : nullptr); } - } else if (!nextStage->constraints().canSwapWithLimitAndSample) { + } else if (!nextStage->constraints().canSwapWithSkippingOrLimitingStage) { break; } else { ++itr; } } - return minLimit; + if (skipSum > 0) { + return {minLimit, skipSum}; + } + return {minLimit, boost::none}; } Pipeline::SourceContainer::iterator DocumentSourceSort::doOptimizeAt( @@ -163,9 +171,9 @@ Pipeline::SourceContainer::iterator DocumentSourceSort::doOptimizeAt( invariant(*itr == this); auto stageItr = std::next(itr); - auto limit = extractLimitForPushdown(stageItr, container); - if (limit) { - _sortExecutor->setLimit(*limit); + auto limitThenSkip = extractSkipAndLimitForPushdown(stageItr, container, false); + if (limitThenSkip.limit) { + _sortExecutor->setLimit(*limitThenSkip.limit); } return std::next(itr); diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index 69ff2b8abb..53c9d830c3 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -34,6 +34,7 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/limit_then_skip.h" #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/query/sort_pattern.h" #include "mongo/db/sorter/sorter.h" @@ -45,16 +46,18 @@ public: static constexpr StringData kStageName = "$sort"_sd; /** - * If there are any $limit stages that could be logically swapped forward to the position of the - * pipeline pointed to by 'itr' without changing the meaning of the query, removes these $limit - * stages from the Pipeline and returns the resulting limit. A single limit value is computed by - * taking the minimum after swapping each individual $limit stage forward. 
+ * If there are any $skip and/or $limit stages that could be logically swapped forward to the + * position of the pipeline pointed to by 'itr' without changing the meaning of the query, + * removes those stages from the Pipeline and returns the resulting skip and limit. The skip + * value is the sum of all eligible skip values. A single limit value is computed by taking the + * minimum of each $limit stage encountered, factoring in any additional padding to preserve + * documents to be skipped. * - * This method also implements the ability to swap a $limit before a $skip, by adding the value - * of the $skip to the value of the $limit. + * If 'extractSkips' is set to false, the skips will stay in the pipeline unmodified. */ - static boost::optional extractLimitForPushdown( - Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container); + static LimitThenSkip extractSkipAndLimitForPushdown(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container, + bool extractSkips = true); const char* getSourceName() const final { return kStageName.rawData(); diff --git a/src/mongo/db/pipeline/limit_then_skip.h b/src/mongo/db/pipeline/limit_then_skip.h new file mode 100644 index 0000000000..91f142dfe8 --- /dev/null +++ b/src/mongo/db/pipeline/limit_then_skip.h @@ -0,0 +1,39 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include + +namespace mongo { +// A struct representing a limit and a skip, with the limit to be applied before the skip. +struct LimitThenSkip { + boost::optional limit; + boost::optional skip; +}; +} // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index f8a3132b06..a89d879efe 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -174,7 +174,7 @@ StatusWith> attemptToGetExe BSONObj projectionObj, const QueryMetadataBitSet& metadataRequested, BSONObj sortObj, - boost::optional limit, + LimitThenSkip limitThenSkip, boost::optional groupIdForDistinctScan, const AggregationRequest* aggRequest, const size_t plannerOpts, @@ -184,7 +184,16 @@ StatusWith> attemptToGetExe qr->setFilter(queryObj); qr->setProj(projectionObj); qr->setSort(sortObj); - qr->setLimit(limit); + qr->setSkip(limitThenSkip.skip); + // The query semantics are to apply the skip before the limit. The struct we have here is in the + // opposite order though. Consequently, if there is both a skip and limit, we have to subtract + // the skip from the limit. 
+ // For example, this struct might say limit 10 then skip 5. Which is the same as skip 5 then + // limit 5. + if (limitThenSkip.limit) { + invariant(!limitThenSkip.skip || *limitThenSkip.skip < *limitThenSkip.limit); + qr->setLimit(*limitThenSkip.limit - limitThenSkip.skip.value_or(0)); + } if (aggRequest) { qr->setExplain(static_cast(aggRequest->getExplain())); qr->setHint(aggRequest->getHint()); @@ -429,19 +438,20 @@ getSortAndGroupStagesFromPipeline(const Pipeline::SourceContainer& sources) { return std::make_pair(sortStage, groupStage); } -boost::optional extractLimitForPushdown(Pipeline* pipeline) { +LimitThenSkip extractSkipAndLimitForPushdown(Pipeline* pipeline) { // If the disablePipelineOptimization failpoint is enabled, then do not attempt the limit // pushdown optimization. if (MONGO_unlikely(disablePipelineOptimization.shouldFail())) { - return boost::none; + return {boost::none, boost::none}; } auto&& sources = pipeline->getSources(); - auto limit = DocumentSourceSort::extractLimitForPushdown(sources.begin(), &sources); - if (limit) { - // Removing $limit stages may have produced the opportunity for additional optimizations. + auto limitThenSkip = + DocumentSourceSort::extractSkipAndLimitForPushdown(sources.begin(), &sources); + if (limitThenSkip.skip || limitThenSkip.limit) { + // Removing stages may have produced the opportunity for additional optimizations. pipeline->optimizePipeline(); } - return limit; + return limitThenSkip; } /** @@ -525,7 +535,7 @@ PipelineD::buildInnerQueryExecutorGeneric(Collection* collection, // This only handles the case in which the the $limit can logically be swapped to the front of // the pipeline. We can also push down a $limit which comes after a $sort into the PlanStage // layer, but that is handled elsewhere. - const auto limit = extractLimitForPushdown(pipeline); + const auto limitThenSkip = extractSkipAndLimitForPushdown(pipeline); auto unavailableMetadata = DocumentSourceMatch::isTextQuery(queryObj) ? 
DepsTracker::kDefaultUnavailableMetadata & ~DepsTracker::kOnlyTextScore @@ -541,7 +551,7 @@ PipelineD::buildInnerQueryExecutorGeneric(Collection* collection, std::move(rewrittenGroupStage), unavailableMetadata, queryObj, - limit, + limitThenSkip, aggRequest, Pipeline::kAllowedMatcherFeatures, &shouldProduceEmptyDocs)); @@ -598,7 +608,7 @@ PipelineD::buildInnerQueryExecutorGeoNear(Collection* collection, nullptr, /* rewrittenGroupStage */ DepsTracker::kDefaultUnavailableMetadata & ~DepsTracker::kAllGeoNearData, std::move(fullQuery), - boost::none, /* limit */ + LimitThenSkip{boost::none, boost::none}, aggRequest, Pipeline::kGeoNearMatcherFeatures, &shouldProduceEmptyDocs)); @@ -633,7 +643,7 @@ StatusWith> PipelineD::prep std::unique_ptr rewrittenGroupStage, QueryMetadataBitSet unavailableMetadata, const BSONObj& queryObj, - boost::optional limit, + LimitThenSkip limitThenSkip, const AggregationRequest* aggRequest, const MatchExpressionParser::AllowedFeatureSet& matcherFeatures, bool* hasNoRequirements) { @@ -656,12 +666,18 @@ StatusWith> PipelineD::prep .serialize(SortPattern::SortKeySerialization::kForPipelineSerialization) .toBson(); - // If the $sort has a coalesced $limit, then we push it down as well. Since the $limit was - // after a $sort in the pipeline, it should not have been provided by the caller. - invariant(!limit); - limit = sortStage->getLimit(); - pipeline->popFrontWithName(DocumentSourceSort::kStageName); + + // Now that we've pushed down the sort, see if there is a $limit and $skip to push down + // also. We should not already have a limit or skip here, otherwise it would be incorrect + // for the caller to pass us a sort stage to push down, since the order matters. + invariant(!limitThenSkip.limit); + invariant(!limitThenSkip.skip); + if (sortStage->getLimit()) { + // Temporarily add it back to participate in our analysis. 
+ pipeline->addInitialSource(DocumentSourceLimit::create(expCtx, *sortStage->getLimit())); + } + limitThenSkip = extractSkipAndLimitForPushdown(pipeline); } // Perform dependency analysis. In order to minimize the dependency set, we only analyze the @@ -698,7 +714,7 @@ StatusWith> PipelineD::prep projObj, deps.metadataDeps(), sortObj, - boost::none, /* limit */ + LimitThenSkip{boost::none, boost::none}, rewrittenGroupStage->groupId(), aggRequest, plannerOpts, @@ -738,7 +754,7 @@ StatusWith> PipelineD::prep projObj, deps.metadataDeps(), sortObj, - limit, + limitThenSkip, boost::none, /* groupIdForDistinctScan */ aggRequest, plannerOpts, diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index 9eab5f93b3..47cd2216d7 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -38,6 +38,7 @@ #include "mongo/db/pipeline/dependencies.h" #include "mongo/db/pipeline/document_source_cursor.h" #include "mongo/db/pipeline/document_source_group.h" +#include "mongo/db/pipeline/limit_then_skip.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/query/plan_executor.h" @@ -186,7 +187,7 @@ private: std::unique_ptr rewrittenGroupStage, QueryMetadataBitSet metadataAvailable, const BSONObj& queryObj, - boost::optional limit, + LimitThenSkip, const AggregationRequest* aggRequest, const MatchExpressionParser::AllowedFeatureSet& matcherFeatures, bool* hasNoRequirements); diff --git a/src/mongo/db/pipeline/stage_constraints.h b/src/mongo/db/pipeline/stage_constraints.h index da6133f11d..2e4650e5d9 100644 --- a/src/mongo/db/pipeline/stage_constraints.h +++ b/src/mongo/db/pipeline/stage_constraints.h @@ -306,12 +306,9 @@ struct StageConstraints { // $match predicates be swapped before itself. bool canSwapWithMatch = false; - // Neither a $sample nor a $limit can be moved before any stage which will possibly change the - // number of documents in the stream. 
Further, no stage which will change the order of documents - // can be swapped with a $limit or $sample, and no stage which will change behavior based on the - // order of documents can be swapped with a $sample because our implementation of sample will do - // a random sort which shuffles the order. - bool canSwapWithLimitAndSample = false; + // True if this stage can be safely swapped with a stage which alters the number of documents in + // the stream. For example, a $project can be safely swapped with a $skip, $limit, or $sample. + bool canSwapWithSkippingOrLimitingStage = false; // Indicates that a stage is allowed within a pipeline-stlye update. bool isAllowedWithinUpdatePipeline = false; diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 5e3f2e38d5..259f5ccb8f 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -139,7 +139,7 @@ boost::optional getPipelineLimit(Pipeline* pipeline) { // If this stage is one that can swap with a $limit stage, then we can look at the previous // stage to see if it includes a limit. Otherwise, we give up trying to find a limit on this // stage's output. - if (!source->constraints().canSwapWithLimitAndSample) { + if (!source->constraints().canSwapWithSkippingOrLimitingStage) { break; } } @@ -190,7 +190,7 @@ void propagateDocLimitToShards(Pipeline* shardPipe, Pipeline* mergePipe) { // If there are any stages in the merge pipeline before the $skip and $limit stages, then we // cannot use the $limit to determine an upper bound, unless those stages could be swapped // with the $limit. - if (!source->constraints().canSwapWithLimitAndSample) { + if (!source->constraints().canSwapWithSkippingOrLimitingStage) { return; } }