Index: jstests/noPassthrough/explain_set_window_fields.js diff --git a/jstests/noPassthrough/explain_set_window_fields.js b/jstests/noPassthrough/explain_set_window_fields.js new file mode 100644 index 0000000000000000000000000000000000000000..d61019d6c74826590a1b46586646316eb3f21494 --- /dev/null +++ b/jstests/noPassthrough/explain_set_window_fields.js @@ -0,0 +1,177 @@ +/** + * Tests that $setWindowFields stage reports memory footprint per function when explain is run + * with verbosities "executionStats" and "allPlansExecution". + */ +(function() { +"use strict"; + +load("jstests/libs/analyze_plan.js"); // For getAggPlanStages(). + +const conn = MongoRunner.runMongod(); +const testDB = conn.getDB('test'); +const coll = testDB[jsTestName()]; +coll.drop(); +const bigStr = Array(1025).toString(); // 1KB of ',' +const nDocs = 1000; +const nPartitions = 50; +const docSize = 8 + 8 + 1024; + +const featureEnabled = + assert.commandWorked(testDB.adminCommand({getParameter: 1, featureFlagWindowFunctions: 1})) + .featureFlagWindowFunctions.value; +if (!featureEnabled) { + jsTestLog("Skipping test because the window function feature flag is disabled"); + MongoRunner.stopMongod(conn); + return; +} + +let bulk = coll.initializeUnorderedBulkOp(); +for (let i = 1; i <= nDocs; i++) { + bulk.insert({_id: i, key: i % nPartitions, bigStr: bigStr}); +} +assert.commandWorked(bulk.execute()); + +/** + * Checks that the execution stats in the explain output for a $setWindowFields stage are as + * expected. + * - 'stages' is an array of the explain output of $setWindowFields stages. + * - 'expectedFunctionMemUsages' is used to check the memory footprint stats for each function. + * - 'verbosity' indicates the explain verbosity used. + */ +function checkExplainResult(pipeline, expectedFunctionMemUsages, expectedTotalMemUsage, verbosity) { + const stages = + getAggPlanStages(coll.explain(verbosity).aggregate(pipeline), "$_internalSetWindowFields"); + for (let stage of stages) { + assert(stage.hasOwnProperty("$_internalSetWindowFields"), stage); + + if (verbosity === "executionStats" || verbosity === "allPlansExecution") { + assert(stage.hasOwnProperty("maxFunctionMemoryUsageBytes"), stage); + assert(stage.hasOwnProperty("maxTotalMemoryUsageBytes"), stage); + const maxFunctionMemUsages = stage["maxFunctionMemoryUsageBytes"]; + for (let field of Object.keys(maxFunctionMemUsages)) { + // Ensures that the expected functions are all included and the corresponding + // memory usage is in a reasonable range. + if (expectedFunctionMemUsages.hasOwnProperty(field)) { + assert.gt(maxFunctionMemUsages[field], + expectedFunctionMemUsages[field], + "mismatch for function '" + field + "': " + tojson(stage)); + assert.lt(maxFunctionMemUsages[field], + 2 * expectedFunctionMemUsages[field], + "mismatch for function '" + field + "': " + tojson(stage)); + } + } + + // TODO SERVER-55786: Fix memory tracking in PartitionIterator. + // assert.gt(stage["maxTotalMemoryUsageBytes"], + // expectedTotalMemUsage, + // "Incorrect total mem usage: " + tojson(stage)); + // assert.lt(stage["maxTotalMemoryUsageBytes"], + // 2 * expectedTotalMemUsage, + // "Incorrect total mem usage: " + tojson(stage)); + } else { + assert(!stage.hasOwnProperty("maxFunctionMemoryUsageBytes"), stage); + assert(!stage.hasOwnProperty("maxTotalMemoryUsageBytes"), stage); + } + } +} + +(function testQueryPlannerVerbosity() { + const pipeline = [ + { + $setWindowFields: + {output: {count: {$sum: 1}, push: {$push: "$bigStr"}, set: {$addToSet: "$bigStr"}}} + }, + ]; + const stages = getAggPlanStages(coll.explain("queryPlanner").aggregate(pipeline), + "$_internalSetWindowFields"); + checkExplainResult(stages, {}, {}, "queryPlanner"); +})(); + +(function testUnboundedMemUsage() { + let pipeline = [ + { + $setWindowFields: + {output: {count: {$sum: 1}, push: {$push: "$bigStr"}, set: {$addToSet: "$bigStr"}}} + }, + ]; + + // The $setWindowFields stage "streams" one partition at a time, so there's only one instance of + // each function. For the default [unbounded, unbounded] window type, each function uses memory + // usage comparable to it's $group counterpart. + let expectedFunctionMemUsages = { + count: 60, + push: nDocs * 1024, + set: 1024, + }; + + // The total mem usage for unbounded windows is the total from each function as well as the size + // of all documents in the partition. + let expectedTotal = nDocs * docSize; + for (let func in expectedFunctionMemUsages) { + expectedTotal += expectedFunctionMemUsages[func]; + } + + checkExplainResult(pipeline, expectedFunctionMemUsages, expectedTotal, "executionStats"); + checkExplainResult(pipeline, expectedFunctionMemUsages, expectedTotal, "allPlansExecution"); + + // Test that the memory footprint is reduced with partitioning. + pipeline = [ + { + $setWindowFields: { + partitionBy: "$key", + output: {count: {$sum: 1}, push: {$push: "$bigStr"}, set: {$addToSet: "$bigStr"}} + } + }, + ]; + expectedFunctionMemUsages = { + count: 60, + push: (nDocs / nPartitions) * 1024, + set: 1024, + }; + expectedTotal = (nDocs / nPartitions) * docSize; + for (let func in expectedFunctionMemUsages) { + expectedTotal += expectedFunctionMemUsages[func]; + } + + checkExplainResult(pipeline, expectedFunctionMemUsages, expectedTotal, "executionStats"); + checkExplainResult(pipeline, expectedFunctionMemUsages, expectedTotal, "allPlansExecution"); +})(); + +(function testSlidingWindowMemUsage() { + const windowSize = 10; + let pipeline = [ + { + $setWindowFields: { + sortBy: {_id: 1}, + output: {runningSum: {$sum: "$_id", window: {documents: [-5, 4]}}} + } + }, + ]; + const expectedFunctionMemUsages = { + runningSum: windowSize * 16 + 8, // 10 integer values per window, and 8 for the $sum state. + }; + + let expectedTotal = windowSize * docSize; + for (let func in expectedFunctionMemUsages) { + expectedTotal += expectedFunctionMemUsages[func]; + } + + checkExplainResult(pipeline, expectedFunctionMemUsages, expectedTotal, "executionStats"); + checkExplainResult(pipeline, expectedFunctionMemUsages, expectedTotal, "allPlansExecution"); + + // Adding partitioning doesn't change the peak memory usage. + pipeline = [ + { + $setWindowFields: { + partitionBy: "$key", + sortBy: {_id: 1}, + output: {runningSum: {$sum: "$_id", window: {documents: [-5, 4]}}} + } + }, + ]; + checkExplainResult(pipeline, expectedFunctionMemUsages, expectedTotal, "executionStats"); + checkExplainResult(pipeline, expectedFunctionMemUsages, expectedTotal, "allPlansExecution"); +})(); + +MongoRunner.stopMongod(conn); +}()); Index: src/mongo/db/pipeline/document_source_group.cpp diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index 51c7ccd3fdc3dab0f8ebf04da2df25bdbf9da1b3..a871061e7195f2e8e2d6296bf46b794cc24269ec 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -160,7 +160,8 @@ int DocumentSourceGroup::freeMemory() { auto memorySaved = prevMemUsage - group.second[i]->getMemUsage(); // Update the memory usage for this AccumulationStatement. - _memoryTracker.accumStatementMemoryBytes[i].currentMemoryBytes -= memorySaved; + _memoryTracker.functionMemoryTracker[_accumulatedFields[i].fieldName].update( + memorySaved * -1); // Update the memory usage for this group. totalMemorySaved += memorySaved; } @@ -306,16 +307,14 @@ Value DocumentSourceGroup::serialize(boost::optional MutableDocument out; out[getSourceName()] = Value(insides.freeze()); - if (explain >= ExplainOptions::Verbosity::kExecStats) { + if (explain && *explain >= ExplainOptions::Verbosity::kExecStats) { MutableDocument md; - invariant(_accumulatedFields.size() == _memoryTracker.accumStatementMemoryBytes.size()); + invariant(_accumulatedFields.size() == _memoryTracker.functionMemoryTracker.size()); for (size_t i = 0; i < _accumulatedFields.size(); i++) { - md[_accumulatedFields[i].fieldName] = _stats.usedDisk - ? Value(static_cast( - _memoryTracker.accumStatementMemoryBytes[i].maxMemoryBytes)) - : Value(static_cast( - _memoryTracker.accumStatementMemoryBytes[i].currentMemoryBytes)); + md[_accumulatedFields[i].fieldName] = Value(static_cast( + _memoryTracker.functionMemoryTracker.at(_accumulatedFields[i].fieldName) + .maxMemoryBytes)); } out["maxAccumulatorMemoryUsageBytes"] = Value(md.freezeToValue()); @@ -390,9 +389,8 @@ intrusive_ptr DocumentSourceGroup::create( groupStage->setIdExpression(groupByExpression); for (auto&& statement : accumulationStatements) { groupStage->addAccumulator(statement); + groupStage->_memoryTracker.functionMemoryTracker[statement.fieldName] = {0, 0}; } - groupStage->_memoryTracker.accumStatementMemoryBytes.resize(accumulationStatements.size(), - {0, 0}); return groupStage; } @@ -500,8 +498,8 @@ intrusive_ptr DocumentSourceGroup::createFromBson( AccumulationStatement::parseAccumulationStatement(expCtx.get(), groupField, vps)); } } - groupStage->_memoryTracker.accumStatementMemoryBytes.resize( - groupStage->getAccumulatedFields().size(), {0, 0}); + // groupStage->_memoryTracker.functionMemoryTracker.resize( + // groupStage->getAccumulatedFields().size(), {0, 0}); uassert( 15955, "a group specification must include an _id", !groupStage->_idExpressions.empty()); @@ -595,8 +593,8 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { _doingMerge); _memoryTracker.memoryUsageBytes += group[i]->getMemUsage(); - _memoryTracker.accumStatementMemoryBytes[i].currentMemoryBytes += - group[i]->getMemUsage() - oldAccumMemUsage[i]; + _memoryTracker.functionMemoryTracker[_accumulatedFields[i].fieldName].update( + group[i]->getMemUsage() - oldAccumMemUsage[i]); } if (kDebugBuild && !storageGlobalParams.readOnly) { @@ -700,11 +698,14 @@ shared_ptr::Iterator> DocumentSourceGroup::spill() { // Update the max memory consumption per accumulation statement if the previous max was exceeded // prior to spilling. Then zero out the current per-accumulation statement memory consumption, // as the memory has been freed by spilling. - for (size_t i = 0; i < _memoryTracker.accumStatementMemoryBytes.size(); i++) { - _memoryTracker.accumStatementMemoryBytes[i].maxMemoryBytes = - std::max(_memoryTracker.accumStatementMemoryBytes[i].maxMemoryBytes, - _memoryTracker.accumStatementMemoryBytes[i].currentMemoryBytes); - _memoryTracker.accumStatementMemoryBytes[i].currentMemoryBytes = 0; + for (size_t i = 0; i < _memoryTracker.functionMemoryTracker.size(); i++) { + // _memoryTracker.functionMemoryTracker[_accumulatedFields[i].fieldName].maxMemoryBytes = + // std::max(_memoryTracker.functionMemoryTracker[_accumulatedFields[i].fieldName] + // .maxMemoryBytes, + // _memoryTracker.functionMemoryTracker[_accumulatedFields[i].fieldName] + // .currentMemoryBytes); + _memoryTracker.functionMemoryTracker[_accumulatedFields[i].fieldName].currentMemoryBytes = + 0; } Sorter::Iterator* iteratorPtr = writer.done(); @@ -788,9 +789,8 @@ boost::optional DocumentSourceGroup::distr copiedAccumulatedField.expr.argument = ExpressionFieldPath::parse( pExpCtx.get(), "$$ROOT." + copiedAccumulatedField.fieldName, vps); mergingGroup->addAccumulator(copiedAccumulatedField); + mergingGroup->_memoryTracker.functionMemoryTracker[accumulatedField.fieldName] = {0, 0}; } - mergingGroup->_memoryTracker.accumStatementMemoryBytes.resize(_accumulatedFields.size(), - {0, 0}); // {shardsStage, mergingStage, sortPattern} return DistributedPlanLogic{this, mergingGroup, boost::none}; Index: src/mongo/db/pipeline/document_source_group.h diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h index 67fcd9f6e9e4cb4ca078593e9a883aaefac8d409..70e384f74cf88062a4fed9e5e3edf38c7febc185 100644 --- a/src/mongo/db/pipeline/document_source_group.h +++ b/src/mongo/db/pipeline/document_source_group.h @@ -35,6 +35,7 @@ #include "mongo/db/pipeline/accumulation_statement.h" #include "mongo/db/pipeline/accumulator.h" #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/memory_usage_tracker.h" #include "mongo/db/pipeline/transformer_interface.h" #include "mongo/db/sorter/sorter.h" @@ -188,24 +189,6 @@ protected: void doDispose() final; private: - struct MemoryUsageTracker { - struct AccumStatementMemoryTracker { - // Maximum memory consumption thus far observed. Only updated when data is spilled to - // disk during execution of the $group. - uint64_t maxMemoryBytes; - // Tracks the current memory footprint. - uint64_t currentMemoryBytes; - }; - - const bool allowDiskUse; - const size_t maxMemoryUsageBytes; - - // Tracks current memory used. This variable will be reset if data is spilled to disk. - size_t memoryUsageBytes = 0; - // Tracks memory consumption per accumulation statement. - std::vector accumStatementMemoryBytes; - }; - explicit DocumentSourceGroup(const boost::intrusive_ptr& expCtx, boost::optional maxMemoryUsageBytes = boost::none); Index: src/mongo/db/pipeline/document_source_set_window_fields.cpp diff --git a/src/mongo/db/pipeline/document_source_set_window_fields.cpp b/src/mongo/db/pipeline/document_source_set_window_fields.cpp index c3321aea81655775cb3d13e7bd328f713cb67b10..3edd70e86afdc98649716c20bcf9f66c1feb4e07 100644 --- a/src/mongo/db/pipeline/document_source_set_window_fields.cpp +++ b/src/mongo/db/pipeline/document_source_set_window_fields.cpp @@ -207,7 +207,11 @@ list> document_source_set_window_fields::create( // $_internalSetWindowFields result.push_back(make_intrusive( - expCtx, simplePartitionByExpr, sortBy, outputFields)); + expCtx, + simplePartitionByExpr, + sortBy, + outputFields, + internalDocumentSourceSetWindowFieldsMaxMemoryBytes.load())); // $unset if (complexPartitionBy) { @@ -235,7 +239,26 @@ Value DocumentSourceInternalSetWindowFields::serialize( } spec[SetWindowFieldsSpec::kOutputFieldName] = output.freezeToValue(); - return Value(DOC(kStageName << spec.freeze())); + MutableDocument out; + out[getSourceName()] = Value(spec.freeze()); + + if (explain && *explain >= ExplainOptions::Verbosity::kExecStats) { + MutableDocument md; + auto totalFunctionUsage = 0; + + for (auto&& [fieldName, function] : _executableOutputs) { + md[fieldName] = Value(static_cast( + _memoryTracker.functionMemoryTracker.at(fieldName).maxMemoryBytes)); + totalFunctionUsage += _memoryTracker.functionMemoryTracker.at(fieldName).maxMemoryBytes; + } + + out["maxFunctionMemoryUsageBytes"] = Value(md.freezeToValue()); + out["maxTotalMemoryUsageBytes"] = Value(static_cast( + std::max(_iterator.getApproximateSize(), _iterator.getMaxMemUsageBytes()) + + totalFunctionUsage)); + } + + return Value(out.freezeToValue()); } boost::intrusive_ptr DocumentSourceInternalSetWindowFields::createFromBson( @@ -267,13 +290,18 @@ boost::intrusive_ptr DocumentSourceInternalSetWindowFields::crea } return make_intrusive( - expCtx, partitionBy, sortBy, outputFields); + expCtx, + partitionBy, + sortBy, + outputFields, + internalDocumentSourceSetWindowFieldsMaxMemoryBytes.load()); } void DocumentSourceInternalSetWindowFields::initialize() { - _maxMemory = internalDocumentSourceSetWindowFieldsMaxMemoryBytes.load(); for (auto& wfs : _outputFields) { + _memoryTracker.functionMemoryTracker[wfs.fieldName] = {0, 0}; _executableOutputs[wfs.fieldName] = WindowFunctionExec::create(&_iterator, wfs); + _memoryTracker.memoryUsageBytes += _executableOutputs[wfs.fieldName]->getApproximateSize(); } _init = true; } @@ -295,13 +323,16 @@ DocumentSource::GetNextResult DocumentSourceInternalSetWindowFields::doGetNext() // Populate the output document with the result from each window function. MutableDocument addFieldsSpec; - size_t functionMemUsage = 0; for (auto&& [fieldName, function] : _executableOutputs) { + _memoryTracker.memoryUsageBytes -= function->getApproximateSize(); addFieldsSpec.addField(fieldName, function->getNext()); - functionMemUsage += function->getApproximateSize(); + _memoryTracker.functionMemoryTracker[fieldName].setCurrent(function->getApproximateSize()); + _memoryTracker.memoryUsageBytes += function->getApproximateSize(); uassert(5414201, "Exceeded memory limit in DocumentSourceSetWindowFields", - functionMemUsage + _iterator.getApproximateSize() < _maxMemory); + _memoryTracker.functionMemoryTracker[fieldName].currentMemoryBytes + + _iterator.getApproximateSize() < + _memoryTracker.maxMemoryUsageBytes); } // Advance the iterator and handle partition/EOF edge cases. Index: src/mongo/db/pipeline/document_source_set_window_fields.h diff --git a/src/mongo/db/pipeline/document_source_set_window_fields.h b/src/mongo/db/pipeline/document_source_set_window_fields.h index 2c89bc572ea443506889807fc595625ce29af265..6845cb8a3c01c5da3457fcaa15f7d0e6fe0502b8 100644 --- a/src/mongo/db/pipeline/document_source_set_window_fields.h +++ b/src/mongo/db/pipeline/document_source_set_window_fields.h @@ -33,6 +33,7 @@ #include "mongo/db/pipeline/accumulator.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_set_window_fields_gen.h" +#include "mongo/db/pipeline/memory_usage_tracker.h" #include "mongo/db/pipeline/window_function/partition_iterator.h" #include "mongo/db/pipeline/window_function/window_bounds.h" #include "mongo/db/pipeline/window_function/window_function_exec.h" @@ -85,17 +86,18 @@ public: static boost::intrusive_ptr createFromBson( BSONElement elem, const boost::intrusive_ptr& pExpCtx); - DocumentSourceInternalSetWindowFields( const boost::intrusive_ptr& expCtx, boost::optional> partitionBy, const boost::optional& sortBy, - std::vector outputFields) + std::vector outputFields, + size_t maxMemoryBytes) : DocumentSource(kStageName, expCtx), _partitionBy(partitionBy), _sortBy(std::move(sortBy)), _outputFields(std::move(outputFields)), - _iterator(expCtx.get(), pSource, std::move(partitionBy)) {} + _iterator(expCtx.get(), pSource, std::move(partitionBy)), + _memoryTracker{false /* allowDiskUse */, maxMemoryBytes} {}; StageConstraints constraints(Pipeline::SplitState pipeState) const final { return StageConstraints(StreamType::kBlocking, @@ -135,9 +137,9 @@ private: std::vector _outputFields; PartitionIterator _iterator; StringMap> _executableOutputs; + MemoryUsageTracker _memoryTracker; bool _init = false; bool _eof = false; - size_t _maxMemory; }; } // namespace mongo Index: src/mongo/db/pipeline/memory_usage_tracker.h diff --git a/src/mongo/db/pipeline/memory_usage_tracker.h b/src/mongo/db/pipeline/memory_usage_tracker.h new file mode 100644 index 0000000000000000000000000000000000000000..8e09c43260a579c7e741b0a097b12763869b4fd1 --- /dev/null +++ b/src/mongo/db/pipeline/memory_usage_tracker.h @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include +#include + +#include "mongo/util/string_map.h" + +namespace mongo { + +struct MemoryUsageTracker { + struct PerFunctionMemoryTracker { + // Maximum memory consumption thus far observed. Only updated when data is spilled to + // disk during execution of the $group. + uint64_t maxMemoryBytes; + // Tracks the current memory footprint. + uint64_t currentMemoryBytes; + + void setCurrent(uint64_t newCurrent) { + if (newCurrent > maxMemoryBytes) + maxMemoryBytes = newCurrent; + currentMemoryBytes = newCurrent; + } + + void update(size_t diff) { + setCurrent(currentMemoryBytes + diff); + } + }; + + const bool allowDiskUse; + const size_t maxMemoryUsageBytes; + + // Tracks current memory used. This variable will be reset if data is spilled to disk. + size_t memoryUsageBytes = 0; + // Tracks memory consumption per function using the output field name as a key. + StringMap functionMemoryTracker; +}; + +} // namespace mongo Index: src/mongo/db/pipeline/window_function/partition_iterator.cpp diff --git a/src/mongo/db/pipeline/window_function/partition_iterator.cpp b/src/mongo/db/pipeline/window_function/partition_iterator.cpp index 08d0b1f8e3038527a67a474691fbe9ca199bb954..c74f1d2b9df77a5d85a3c0d9e395ef2fc002e7d7 100644 --- a/src/mongo/db/pipeline/window_function/partition_iterator.cpp +++ b/src/mongo/db/pipeline/window_function/partition_iterator.cpp @@ -239,14 +239,14 @@ void PartitionIterator::getNextDocument() { advanceToNextPartition(); } else if (_expCtx->getValueComparator().compare(curKey, _partitionKey) != 0) { _nextPartition = NextPartitionState{std::move(doc), std::move(curKey)}; - _memUsageBytes += getNextPartitionStateSize(); + updateMemUsage(getNextPartitionStateSize()); _state = IteratorState::kAwaitingAdvanceToNext; } else { - _memUsageBytes += doc.getApproximateSize(); + updateMemUsage(doc.getApproximateSize()); _cache.emplace_back(std::move(doc)); } } else { - _memUsageBytes += doc.getApproximateSize(); + updateMemUsage(doc.getApproximateSize()); _cache.emplace_back(std::move(doc)); _state = IteratorState::kIntraPartition; } Index: src/mongo/db/pipeline/window_function/partition_iterator.h diff --git a/src/mongo/db/pipeline/window_function/partition_iterator.h b/src/mongo/db/pipeline/window_function/partition_iterator.h index c2021dca75c43775d228a7a2b407348f08d4383b..18472317077b48d8b1a7008e81fe845149afd63d 100644 --- a/src/mongo/db/pipeline/window_function/partition_iterator.h +++ b/src/mongo/db/pipeline/window_function/partition_iterator.h @@ -104,6 +104,10 @@ public: return _memUsageBytes; } + auto getMaxMemUsageBytes() const { + return _maxMemUsageBytes; + } + private: friend class PartitionAccessor; @@ -157,6 +161,7 @@ private: void resetCache() { _cache.clear(); // Everything should be empty at this point. + _maxMemUsageBytes = std::max(_maxMemUsageBytes, _memUsageBytes); _memUsageBytes = 0; _currentCacheIndex = 0; _currentPartitionIndex = 0; @@ -165,6 +170,12 @@ private: } } + void updateMemUsage(size_t diff) { + _memUsageBytes += diff; + if (_memUsageBytes > _maxMemUsageBytes) + _maxMemUsageBytes = _memUsageBytes; + } + /** * Resets the state of the iterator with the first document of the new partition. */ @@ -209,6 +220,7 @@ private: // The value in bytes of the data being held. Does not include the size of the constant size // data members or overhead of data structures. size_t _memUsageBytes = 0; + size_t _maxMemUsageBytes = 0; enum class IteratorState { // Default state, no documents have been pulled into the cache. Index: src/mongo/db/pipeline/window_function/window_function_exec.h diff --git a/src/mongo/db/pipeline/window_function/window_function_exec.h b/src/mongo/db/pipeline/window_function/window_function_exec.h index 531a863df65494c6ba2d840daa7535ffc3df1510..4fbdf98745be6c2e3206ea75eab458f35461edd9 100644 --- a/src/mongo/db/pipeline/window_function/window_function_exec.h +++ b/src/mongo/db/pipeline/window_function/window_function_exec.h @@ -117,7 +117,6 @@ public: return _function->getApproximateSize() + _memUsageBytes; } - protected: boost::intrusive_ptr _input; std::unique_ptr _function; Index: src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h index dc8327368ab77f15914530a6da80ce5100206875..5c885b34af32b9feb7f33068798d18c33828f637 100644 --- a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h +++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h @@ -58,6 +58,7 @@ public: _function->reset(); _values = std::queue(); _initialized = false; + _memUsageBytes = sizeof(this); } private: