From 328223af88b55662ac7791042b1094af52a92217 Mon Sep 17 00:00:00 2001 From: Jordi Serra Torrens Date: Thu, 7 Sep 2023 10:33:17 +0000 Subject: [PATCH] Repro SERVER-80853 --- .../noPassthrough/agg_out_stepdown_repro.js | 52 +++++++++++++++++++ src/mongo/db/pipeline/document_source_out.cpp | 24 +++++++++ src/mongo/db/pipeline/document_source_out.h | 20 +------ 3 files changed, 77 insertions(+), 19 deletions(-) create mode 100644 jstests/noPassthrough/agg_out_stepdown_repro.js diff --git a/jstests/noPassthrough/agg_out_stepdown_repro.js b/jstests/noPassthrough/agg_out_stepdown_repro.js new file mode 100644 index 00000000000..843c2859f59 --- /dev/null +++ b/jstests/noPassthrough/agg_out_stepdown_repro.js @@ -0,0 +1,52 @@ +/** + * Repro SERVER-80853. + */ + +import {configureFailPoint} from "jstests/libs/fail_point_util.js"; + +var rst = new ReplSetTest({nodes: 2}); +rst.startSet(); +rst.initiate(); + +const replSetConn = new Mongo(rst.getURL()); +replSetConn.setReadPref("primary"); +let db = replSetConn.getDB('test'); + +// Insert a few large documents to the input collections. Documents are large in order to make them +// not fit in a single $out write batch (up to 16 MB). +let largeVal = 'a'.repeat(15 * 1024 * 1024); // 15 MB +db['foo'].insert({x: 1, a: largeVal}); +db['foo'].insert({x: 2, a: largeVal}); +db['foo'].insert({x: 3, a: largeVal}); + +// Set fp. +const nodeRunningAggConn = rst.getSecondary(); +let fp = configureFailPoint(nodeRunningAggConn, 'hangDollarOutAfterInsert'); + +// Start $out. Make it hang after writing the first batch (which consists of one single document). +const awaitAgg = startParallelShell(() => { + let testDb = db.getSiblingDB('test'); + testDb.setSecondaryOk(true); + const aggRes = testDb['foo'].aggregate([{$out: {db: 'test', coll: 'out'}}]).toArray(); + assert.eq(0, aggRes.length); +}, nodeRunningAggConn.port); + +// Wait fp to be hit. +fp.wait(); + +// Stepdown the primary. +let initialPrimary = rst.getPrimary(); +assert.commandWorked(initialPrimary.adminCommand({replSetStepDown: 60, force: true})); + +// Wait for a new primary to be elected. +const newPrimary = rst.getPrimary(); +assert.neq(newPrimary.port, initialPrimary.port); + +// Unblock the fp and wait for $out to finish (Note it succeeds). +fp.off(); +awaitAgg(); + +// Check the output collection documents. +assert.eq(3, replSetConn.getDB('test')['out'].countDocuments({})); + +rst.stopSet(); diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp index 3d99e7860b8..7c7bd0bb64c 100644 --- a/src/mongo/db/pipeline/document_source_out.cpp +++ b/src/mongo/db/pipeline/document_source_out.cpp @@ -71,6 +71,8 @@ using namespace fmt::literals; MONGO_FAIL_POINT_DEFINE(hangWhileBuildingDocumentSourceOutBatch); MONGO_FAIL_POINT_DEFINE(outWaitAfterTempCollectionCreation); +MONGO_FAIL_POINT_DEFINE(hangDollarOutAfterInsert); + REGISTER_DOCUMENT_SOURCE(out, DocumentSourceOut::LiteParsed::parse, DocumentSourceOut::createFromBson, @@ -307,6 +309,28 @@ void DocumentSourceOut::finalize() { _timeseriesStateConsistent = true; } +void DocumentSourceOut::flush(BatchedCommandRequest bcr, BatchedObjects batch) { + DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx); + + auto insertCommand = bcr.extractInsertRequest(); + insertCommand->setDocuments(std::move(batch)); + auto targetEpoch = boost::none; + + if (_timeseries) { + uassertStatusOK( + pExpCtx->mongoProcessInterface->insertTimeseries(pExpCtx, + _tempNs.getTimeseriesViewNamespace(), + std::move(insertCommand), + _writeConcern, + targetEpoch)); + } else { + uassertStatusOK(pExpCtx->mongoProcessInterface->insert( + pExpCtx, _tempNs, std::move(insertCommand), _writeConcern, targetEpoch)); + } + + hangDollarOutAfterInsert.pauseWhileSet(); +} + BatchedCommandRequest DocumentSourceOut::makeBatchedWriteRequest() const { // Note that our insert targets '_tempNs' (or the associated timeseries view) since we will // never write to 'outputNs' directly. diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index e48a48ab9fb..58e3b782573 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -179,25 +179,7 @@ private: void finalize() override; - void flush(BatchedCommandRequest bcr, BatchedObjects batch) override { - DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx); - - auto insertCommand = bcr.extractInsertRequest(); - insertCommand->setDocuments(std::move(batch)); - auto targetEpoch = boost::none; - - if (_timeseries) { - uassertStatusOK(pExpCtx->mongoProcessInterface->insertTimeseries( - pExpCtx, - _tempNs.getTimeseriesViewNamespace(), - std::move(insertCommand), - _writeConcern, - targetEpoch)); - } else { - uassertStatusOK(pExpCtx->mongoProcessInterface->insert( - pExpCtx, _tempNs, std::move(insertCommand), _writeConcern, targetEpoch)); - } - } + void flush(BatchedCommandRequest bcr, BatchedObjects batch) override; std::pair makeBatchObject(Document doc) const override { auto obj = doc.toBson(); -- 2.34.1