From 23ad4d1a34b12f23a309c40949ab5f983398c8d5 Mon Sep 17 00:00:00 2001 From: Gregory Wlodarek Date: Fri, 18 Aug 2017 15:59:27 -0400 Subject: [PATCH] Background Validation --- jstests/concurrency/fsm_all.js | 11 +- .../background_validation.js | 27 ++ jstests/core/background_validation.js | 69 ++++ jstests/hooks/validate_collections.js | 11 +- src/mongo/db/catalog/collection.h | 11 +- src/mongo/db/catalog/collection_impl.cpp | 122 ++++++- src/mongo/db/catalog/collection_impl.h | 8 + src/mongo/db/catalog/collection_mock.h | 4 + src/mongo/db/catalog/database_impl.cpp | 5 + src/mongo/db/catalog/index_consistency.cpp | 139 ++++---- src/mongo/db/catalog/index_consistency.h | 88 ++--- .../private/record_store_validate_adaptor.cpp | 377 ++++++++++++++++++--- .../private/record_store_validate_adaptor.h | 4 + src/mongo/db/commands/validate.cpp | 48 ++- src/mongo/shell/utils.js | 1 + 15 files changed, 737 insertions(+), 188 deletions(-) create mode 100644 jstests/concurrency/fsm_background_workloads/background_validation.js create mode 100644 jstests/core/background_validation.js diff --git a/jstests/concurrency/fsm_all.js b/jstests/concurrency/fsm_all.js index 35031becb8..7a4d48b77c 100644 --- a/jstests/concurrency/fsm_all.js +++ b/jstests/concurrency/fsm_all.js @@ -8,6 +8,11 @@ var blacklist = [].map(function(file) { return dir + '/' + file; }); -runWorkloadsSerially(ls(dir).filter(function(file) { - return !Array.contains(blacklist, file); -})); +runWorkloadsSerially( + ls(dir).filter(function(file) { + return !Array.contains(blacklist, file); + }), + {sameCollection: true}, + { + backgroundWorkloads: ['jstests/concurrency/fsm_background_workloads/background_validation.js'] + }); diff --git a/jstests/concurrency/fsm_background_workloads/background_validation.js b/jstests/concurrency/fsm_background_workloads/background_validation.js new file mode 100644 index 0000000000..547fccdedb --- /dev/null +++ b/jstests/concurrency/fsm_background_workloads/background_validation.js @@ -0,0 +1,27 @@ +'use strict'; + +/** + * background_validation.js + * + * Runs background validation against a standalone server. + * + */ + +load('jstests/concurrency/fsm_libs/extend_workload.js'); // for extendWorkload +load('jstests/concurrency/fsm_background_workloads/background_base.js'); // for $config + +var $config = extendWorkload($config, function($config, $super) { + $config.teardown = function(db, collName, cluster) { + + var res = db[collName].validate({full: false, background: true}); + + if (res.ok) { + assertAlways.eq(true, res.valid, tojson(res)); + } + + $super.teardown.apply(this, arguments); + + }; + + return $config; +}); diff --git a/jstests/core/background_validation.js b/jstests/core/background_validation.js new file mode 100644 index 0000000000..6fcbdaf58c --- /dev/null +++ b/jstests/core/background_validation.js @@ -0,0 +1,69 @@ +(function() { + + "use strict"; + + const t = db.jstests_background_validation; + t.drop(); + + const s1 = startParallelShell(function() { + const t = db.jstests_background_validation; + Random.setRandomSeed(); + + function randomStr() { + var result = ""; + var chars = "abcdefghijklmnopqrstuvwxyz"; + + for (var i = 0; i < 25; i++) { + result += chars.charAt(Math.floor(Random.rand() * chars.length)); + } + + return result; + } + + for (var i = 0; i < 100; ++i) { + t.drop(); + var res = t.ensureIndex({x: -1}); + assert.eq(1, res.ok, tojson(res)); + + var res = t.ensureIndex({y: 1}); + assert.eq(1, res.ok, tojson(res)); + + var res = t.ensureIndex({z: 1}); + assert.eq(1, res.ok, tojson(res)); + + for (var j = 0; j < 50; ++j) { + // Insert a random document every iteration + t.save({x: j, y: randomStr(), z: [j, j * 3, j % 3, randomStr()]}); + + // Every 5 iterations remove a random document + if (j % 5 == 0) { + t.remove({x: Math.floor(Math.random() * j) + 1}); + } + + // Every 3 iterations update a random document + if (j % 3 == 0) { + t.update( + {x: Math.floor(Math.random() * j) + 1}, + {$set: {y: randomStr(), z: [randomStr(), randomStr(), j, randomStr()]}}); + } + } + } + }); + + const s2 = startParallelShell(function() { + const t = db.jstests_background_validation; + for (var i = 0; i < 800; ++i) { + var res = t.validate({full: false, background: true}); + + if (res.ok) { + assert.eq(true, res.valid); + } + sleep(50); + } + + }); + + s1(); + s2(); + +})(); diff --git a/jstests/hooks/validate_collections.js b/jstests/hooks/validate_collections.js index ac2382ae83..d2a1753457 100644 --- a/jstests/hooks/validate_collections.js +++ b/jstests/hooks/validate_collections.js @@ -34,12 +34,19 @@ function validateCollections(db, obj) { filter = {$or: [filter, {type: {$exists: false}}]}; } + var background = false; + if (jsTest.options().useBackgroundValidation) { + // Cannot run a full validation in background validation. + full = false; + background = true; + } + let collInfo = db.getCollectionInfos(filter); for (var collDocument of collInfo) { var coll = db.getCollection(collDocument["name"]); - var res = coll.validate(full); + var res = coll.validate({full: full, background: background}); - if (!res.ok || !res.valid) { + if ((!background && (!res.ok || !res.valid)) || (background && res.ok && !res.valid)) { if (jsTest.options().skipValidationOnNamespaceNotFound && res.errmsg === 'ns not found') { // During a 'stopStart' backup/restore on the secondary node, the actual list of diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index 4b25757186..bdf1477077 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -344,6 +344,8 @@ public: const IndexDescriptor* descriptor, const IndexKeyEntry& indexEntry, const ValidationOperation operation) const = 0; + + virtual void stopBackgroundValidation() const = 0; }; private: @@ -738,7 +740,7 @@ public: } /** - * Calls the Inforn function in the IndexObserver if it's hooked. + * Calls the Inform function in the IndexObserver if it's hooked. */ inline void informIndexObserver(OperationContext* opCtx, const IndexDescriptor* descriptor, @@ -747,6 +749,13 @@ public: return this->_impl().informIndexObserver(opCtx, descriptor, indexEntry, operation); } + /** + * Stops background validation if it was running on the collection. + */ + inline void stopBackgroundValidation() const { + return this->_impl().stopBackgroundValidation(); + } + private: inline DatabaseCatalogEntry* dbce() const { return this->_impl().dbce(); diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index 7333330c4c..b667e83176 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ b/src/mongo/db/catalog/collection_impl.cpp @@ -74,6 +74,8 @@ #include "mongo/rpc/object_check.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" + namespace mongo { @@ -440,6 +442,13 @@ Status CollectionImpl::insertDocument(OperationContext* opCtx, if (!loc.isOK()) return loc.getStatus(); + { + stdx::lock_guard lock(_indexConsistencyMutex); + if (_indexConsistency) { + _indexConsistency->processRecordBeforeCursor(loc.getValue(), doc.objsize(), true); + } + } + for (auto&& indexBlock : indexBlocks) { Status status = indexBlock->insert(doc, loc.getValue()); if (!status.isOK()) { @@ -501,6 +510,13 @@ Status CollectionImpl::_insertDocuments(OperationContext* opCtx, invariant(RecordId::min() < loc); invariant(loc < RecordId::max()); + { + stdx::lock_guard lock(_indexConsistencyMutex); + if (_indexConsistency) { + _indexConsistency->processRecordBeforeCursor(loc, it->doc.objsize(), true); + } + } + BsonRecord bsonRecord = {loc, &(it->doc)}; bsonRecords.push_back(bsonRecord); } @@ -573,6 +589,13 @@ void CollectionImpl::deleteDocument(OperationContext* opCtx, opDebug->keysDeleted += keysDeleted; } + { + stdx::lock_guard lock(_indexConsistencyMutex); + if (_indexConsistency) { + _indexConsistency->processRecordBeforeCursor(loc, doc.value().objsize(), false); + } + } + _recordStore->deleteRecord(opCtx, loc); getGlobalServiceContext()->getOpObserver()->onDelete( @@ -716,6 +739,14 @@ StatusWith CollectionImpl::_updateDocumentWithMove(OperationContext* o return newLocation; } + { + stdx::lock_guard lock(_indexConsistencyMutex); + if (_indexConsistency) { + _indexConsistency->processRecordBeforeCursor( + newLocation.getValue(), newDoc.objsize(), true); + } + } + invariant(newLocation.getValue() != oldLocation); _cursorManager.invalidateDocument(opCtx, oldLocation, INVALIDATION_DELETION); @@ -726,6 +757,14 @@ StatusWith CollectionImpl::_updateDocumentWithMove(OperationContext* o int64_t keysDeleted; _indexCatalog.unindexRecord(opCtx, oldDoc.value(), oldLocation, true, &keysDeleted); + { + stdx::lock_guard lock(_indexConsistencyMutex); + if (_indexConsistency) { + _indexConsistency->processRecordBeforeCursor( + oldLocation, oldDoc.value().objsize(), false); + } + } + // Remove old record. _recordStore->deleteRecord(opCtx, oldLocation); @@ -1015,6 +1054,14 @@ void CollectionImpl::informIndexObserver(OperationContext* opCtx, } } +void CollectionImpl::stopBackgroundValidation() const { + + stdx::lock_guard lock(_indexConsistencyMutex); + if (_indexConsistency) { + _indexConsistency->stop(); + } +} + void CollectionImpl::hookIndexObserver(IndexConsistency* consistency) { stdx::lock_guard lock(_indexObserverMutex); _indexObserver = stdx::make_unique(consistency); @@ -1031,18 +1078,23 @@ using ValidateResultsMap = std::map; void _validateRecordStore(OperationContext* opCtx, RecordStore* recordStore, + IndexConsistency* indexConsistency, ValidateCmdLevel level, bool background, + NamespaceString nss, RecordStoreValidateAdaptor* indexValidator, ValidateResults* results, BSONObjBuilder* output) { + log() << "validating records in " << nss; + // Validate RecordStore and, if `level == kValidateFull`, use the RecordStore's validate // function. if (background) { indexValidator->traverseRecordStore(recordStore, level, results, output); } else { auto status = recordStore->validate(opCtx, level, indexValidator, results, output); + indexConsistency->nextStage(); // RecordStore::validate always returns Status::OK(). Errors are reported through // `results`. dassert(status.isOK()); @@ -1053,6 +1105,8 @@ void _validateIndexes(OperationContext* opCtx, IndexCatalog* indexCatalog, BSONObjBuilder* keysPerIndex, RecordStoreValidateAdaptor* indexValidator, + IndexConsistency* indexConsistency, + bool background, ValidateCmdLevel level, ValidateResultsMap* indexNsResultsMap, ValidateResults* results) { @@ -1076,7 +1130,8 @@ void _validateIndexes(OperationContext* opCtx, } if (curIndexResults.valid) { - indexValidator->traverseIndex(iam, descriptor, &curIndexResults, &numTraversedKeys); + indexValidator->traverseIndex( + iam, descriptor, background, &curIndexResults, &numTraversedKeys); if (checkCounts && (numValidatedKeys != numTraversedKeys)) { curIndexResults.valid = false; @@ -1098,6 +1153,8 @@ void _validateIndexes(OperationContext* opCtx, results->valid = false; } } + + indexConsistency->nextStage(); } void _markIndexEntriesInvalid(ValidateResultsMap* indexNsResultsMap, ValidateResults* results) { @@ -1117,6 +1174,7 @@ void _markIndexEntriesInvalid(ValidateResultsMap* indexNsResultsMap, ValidateRes void _validateIndexKeyCount(OperationContext* opCtx, IndexCatalog* indexCatalog, + IndexConsistency* indexConsistency, RecordStore* recordStore, RecordStoreValidateAdaptor* indexValidator, ValidateResultsMap* indexNsResultsMap) { @@ -1125,10 +1183,11 @@ void _validateIndexKeyCount(OperationContext* opCtx, while (indexIterator.more()) { IndexDescriptor* descriptor = indexIterator.next(); ValidateResults& curIndexResults = (*indexNsResultsMap)[descriptor->indexNamespace()]; + int indexNumber = indexConsistency->getIndexNumber(descriptor->indexNamespace()); if (curIndexResults.valid) { indexValidator->validateIndexKeyCount( - descriptor, recordStore->numRecords(opCtx), curIndexResults); + descriptor, indexConsistency->getNumRecordKeys(indexNumber), curIndexResults); } } } @@ -1188,19 +1247,46 @@ Status CollectionImpl::validate(OperationContext* opCtx, BSONObjBuilder* output) { dassert(opCtx->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IS)); + // A validation on an oplog has to be done in MODE_X. + if (background && ns().isOplog()) { + log() << "not validating " << ns().toString() + << " in the background because it is the oplog"; + background = false; + } + + IndexConsistency indexConsistency( + opCtx, _this, ns(), _recordStore, std::move(collLk), background); + + { + stdx::lock_guard lock(_indexConsistencyMutex); + _indexConsistency = &indexConsistency; + } + + hookIndexObserver(&indexConsistency); + auto scopeGuard = MakeGuard([this] { + unhookIndexObserver(); + stdx::lock_guard lock(_indexConsistencyMutex); + _indexConsistency = nullptr; + }); + try { ValidateResultsMap indexNsResultsMap; BSONObjBuilder keysPerIndex; // not using subObjStart to be exception safe - IndexConsistency indexConsistency( - opCtx, _this, ns(), _recordStore, std::move(collLk), background); RecordStoreValidateAdaptor indexValidator = RecordStoreValidateAdaptor( - opCtx, &indexConsistency, level, &_indexCatalog, &indexNsResultsMap); - + opCtx, &indexConsistency, ns(), level, &_indexCatalog, &indexNsResultsMap); // Validate the record store log(LogComponent::kIndex) << "validating collection " << ns().toString() << endl; - _validateRecordStore( - opCtx, _recordStore, level, background, &indexValidator, results, output); + + _validateRecordStore(opCtx, + _recordStore, + &indexConsistency, + level, + background, + ns(), + &indexValidator, + results, + output); // Validate indexes and check for mismatches. if (results->valid) { @@ -1208,6 +1294,8 @@ Status CollectionImpl::validate(OperationContext* opCtx, &_indexCatalog, &keysPerIndex, &indexValidator, + &indexConsistency, + background, level, &indexNsResultsMap, results); @@ -1219,8 +1307,12 @@ Status CollectionImpl::validate(OperationContext* opCtx, // Validate index key count. if (results->valid) { - _validateIndexKeyCount( - opCtx, &_indexCatalog, _recordStore, &indexValidator, &indexNsResultsMap); + _validateIndexKeyCount(opCtx, + &_indexCatalog, + &indexConsistency, + _recordStore, + &indexValidator, + &indexNsResultsMap); } // Report the validation results for the user to see @@ -1234,7 +1326,15 @@ Status CollectionImpl::validate(OperationContext* opCtx, log(LogComponent::kIndex) << "validated collection " << ns().toString() << endl; } } catch (DBException& e) { - if (ErrorCodes::isInterruption(e.code())) { + if (ErrorCodes::NamespaceNotFound == e.code()) { + // Don't unhook the IndexObserver if the Namespace was not found because either the + // database or collection was dropped and CollectionImpl is non-existent anymore. + scopeGuard.Dismiss(); + return e.toStatus(); + } + + if (ErrorCodes::isInterruption(ErrorCodes::Error(e.code())) || + ErrorCodes::IndexModified == e.code()) { return e.toStatus(); } string err = str::stream() << "exception during index validation: " << e.toString(); diff --git a/src/mongo/db/catalog/collection_impl.h b/src/mongo/db/catalog/collection_impl.h index 9e5ca69270..8c2f68c0e4 100644 --- a/src/mongo/db/catalog/collection_impl.h +++ b/src/mongo/db/catalog/collection_impl.h @@ -364,6 +364,11 @@ public: const IndexKeyEntry& indexEntry, const ValidationOperation operation) const; + /** + * Stops background validation if it was running on the collection. + */ + void stopBackgroundValidation() const; + private: inline DatabaseCatalogEntry* dbce() const final { @@ -435,6 +440,9 @@ private: mutable stdx::mutex _indexObserverMutex; mutable std::unique_ptr _indexObserver; + mutable stdx::mutex _indexConsistencyMutex; + mutable IndexConsistency* _indexConsistency = nullptr; + // The default collation which is applied to operations and indices which have no collation of // their own. The collection's validator will respect this collation. // diff --git a/src/mongo/db/catalog/collection_mock.h b/src/mongo/db/catalog/collection_mock.h index ad836035f1..77645be45f 100644 --- a/src/mongo/db/catalog/collection_mock.h +++ b/src/mongo/db/catalog/collection_mock.h @@ -284,6 +284,10 @@ public: std::abort(); } + void stopBackgroundValidation() const { + std::abort(); + } + OptionalCollectionUUID uuid() const { std::abort(); } diff --git a/src/mongo/db/catalog/database_impl.cpp b/src/mongo/db/catalog/database_impl.cpp index f4f8897436..da3c7621d3 100644 --- a/src/mongo/db/catalog/database_impl.cpp +++ b/src/mongo/db/catalog/database_impl.cpp @@ -170,6 +170,8 @@ void DatabaseImpl::close(OperationContext* opCtx, const std::string& reason) { for (auto&& pair : _collections) { auto* coll = pair.second; + // inform IndexConsistency to stop upon coming back. + coll->stopBackgroundValidation(); coll->getCursorManager()->invalidateAll(opCtx, true, reason); } } @@ -461,6 +463,9 @@ Status DatabaseImpl::dropCollectionEvenIfSystem(OperationContext* opCtx, return Status::OK(); // Post condition already met. } + // If background validation is running, stop it. + collection->stopBackgroundValidation(); + massertNamespaceNotIndex(fullns.toString(), "dropCollection"); BackgroundOperation::assertNoBgOpInProgForNs(fullns); diff --git a/src/mongo/db/catalog/index_consistency.cpp b/src/mongo/db/catalog/index_consistency.cpp index 40a2065672..c978506589 100644 --- a/src/mongo/db/catalog/index_consistency.cpp +++ b/src/mongo/db/catalog/index_consistency.cpp @@ -28,9 +28,12 @@ #include +#include "mongo/db/catalog/private/record_store_validate_adaptor.h" + #include "mongo/platform/basic.h" #include "mongo/db/catalog/collection_catalog_entry.h" +#include "mongo/db/catalog/collection_options.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/catalog/index_consistency.h" @@ -61,6 +64,7 @@ IndexConsistency::IndexConsistency(OperationContext* opCtx, _recordStore(recordStore), _collLk(std::move(collLk)), _isBackground(background), + _uuid(collection->uuid()), _tracker(opCtx->getServiceContext()->getFastClockSource(), internalQueryExecYieldIterations.load(), Milliseconds(internalQueryExecYieldPeriodMS.load())) { @@ -88,8 +92,7 @@ IndexConsistency::IndexConsistency(OperationContext* opCtx, indexInfo.numKeys = 0; indexInfo.numLongKeys = 0; - indexInfo.numRecords = 0; - indexInfo.numExtraIndexKeys = 0; + indexInfo.numRecordKeys = 0; _indexesInfo[indexNumber] = indexInfo; @@ -144,7 +147,7 @@ void IndexConsistency::addLongIndexKey(int indexNumber) { return; } - _indexesInfo[indexNumber].numRecords++; + _indexesInfo[indexNumber].numRecordKeys++; _indexesInfo[indexNumber].numLongKeys++; } @@ -168,36 +171,47 @@ int64_t IndexConsistency::getNumLongKeys(int indexNumber) const { return _indexesInfo.at(indexNumber).numLongKeys; } -int64_t IndexConsistency::getNumRecords(int indexNumber) const { +int64_t IndexConsistency::getNumRecordKeys(int indexNumber) const { stdx::lock_guard lock(_classMutex); if (indexNumber < 0 || indexNumber >= static_cast(_indexesInfo.size())) { return 0; } - return _indexesInfo.at(indexNumber).numRecords; + return _indexesInfo.at(indexNumber).numRecordKeys; } -bool IndexConsistency::haveEntryMismatch() const { +int64_t IndexConsistency::getNumRecordChangesBeforeCursor(int64_t* totalDataSize) { stdx::lock_guard lock(_classMutex); - for (auto iterator = _indexKeyCount.begin(); iterator != _indexKeyCount.end(); iterator++) { - if (iterator->second != 0) { - return true; + *totalDataSize = *totalDataSize + _dataSizeChanges; + return _numRecordChanges; +} + +void IndexConsistency::processRecordBeforeCursor(const RecordId& loc, int size, bool increment) { + + stdx::lock_guard lock(_classMutex); + if (_isBeforeLastProcessedRecordId_inlock(loc)) { + if (increment) { + _numRecordChanges += 1; + _dataSizeChanges += size; + } else { + _numRecordChanges -= 1; + _dataSizeChanges -= size; } } - - return false; } -int64_t IndexConsistency::getNumExtraIndexKeys(int indexNumber) const { +bool IndexConsistency::haveEntryMismatch() const { stdx::lock_guard lock(_classMutex); - if (indexNumber < 0 || indexNumber >= static_cast(_indexesInfo.size())) { - return 0; + for (auto iterator = _indexKeyCount.begin(); iterator != _indexKeyCount.end(); iterator++) { + if (iterator->second != 0) { + return true; + } } - return _indexesInfo.at(indexNumber).numExtraIndexKeys; + return false; } void IndexConsistency::applyChange(const IndexDescriptor* descriptor, @@ -224,14 +238,14 @@ void IndexConsistency::applyChange(const IndexDescriptor* descriptor, KeyString ks(version, indexEntry->key, ord, indexEntry->loc); if (_stage == ValidationStage::DOCUMENT) { - _setYieldAtRecord_inlock(indexEntry->loc); + _setYieldAtRecordId_inlock(indexEntry->loc); if (_isBeforeLastProcessedRecordId_inlock(indexEntry->loc)) { if (operation == ValidationOperation::INSERT) { if (indexEntry->key.objsize() >= static_cast(KeyString::TypeBits::kMaxKeyBytes)) { // Index keys >= 1024 bytes are not indexed but are stored in the document key // set. - _indexesInfo[indexNumber].numRecords++; + _indexesInfo[indexNumber].numRecordKeys++; _indexesInfo[indexNumber].numLongKeys++; } else { _addDocKey_inlock(ks, indexNumber); @@ -239,7 +253,7 @@ void IndexConsistency::applyChange(const IndexDescriptor* descriptor, } else if (operation == ValidationOperation::REMOVE) { if (indexEntry->key.objsize() >= static_cast(KeyString::TypeBits::kMaxKeyBytes)) { - _indexesInfo[indexNumber].numRecords--; + _indexesInfo[indexNumber].numRecordKeys--; _indexesInfo[indexNumber].numLongKeys--; } else { _removeDocKey_inlock(ks, indexNumber); @@ -269,16 +283,13 @@ void IndexConsistency::applyChange(const IndexDescriptor* descriptor, // and an event occured after our cursor if (operation == ValidationOperation::INSERT) { _removeIndexKey_inlock(ks, indexNumber); - _indexesInfo.at(indexNumber).numExtraIndexKeys++; } else if (operation == ValidationOperation::REMOVE) { _addIndexKey_inlock(ks, indexNumber); - _indexesInfo.at(indexNumber).numExtraIndexKeys--; } } } } - void IndexConsistency::nextStage() { stdx::lock_guard lock(_classMutex); @@ -295,32 +306,6 @@ ValidationStage IndexConsistency::getStage() const { return _stage; } -void IndexConsistency::setLastProcessedRecordId(RecordId recordId) { - - stdx::lock_guard lock(_classMutex); - if (!recordId.isNormal()) { - _lastProcessedRecordId = boost::none; - } else { - _lastProcessedRecordId = recordId; - } -} - -void IndexConsistency::setLastProcessedIndexEntry( - const IndexDescriptor& descriptor, const boost::optional& indexEntry) { - - const auto& key = descriptor.keyPattern(); - const Ordering ord = Ordering::make(key); - KeyString::Version version = KeyString::kLatestVersion; - - stdx::lock_guard lock(_classMutex); - if (!indexEntry) { - _lastProcessedIndexEntry.reset(); - } else { - _lastProcessedIndexEntry.reset( - new KeyString(version, indexEntry->key, ord, indexEntry->loc)); - } -} - void IndexConsistency::notifyStartIndex(int indexNumber) { stdx::lock_guard lock(_classMutex); @@ -354,24 +339,30 @@ int IndexConsistency::getIndexNumber(const std::string& indexNs) { return -1; } -bool IndexConsistency::shouldGetNewSnapshot(const RecordId recordId) const { +bool IndexConsistency::shouldGetNext(const RecordId recordId) { stdx::lock_guard lock(_classMutex); - if (!_yieldAtRecordId) { + if (_yieldAtRecordId && _yieldAtRecordId <= recordId) { return false; } - return _yieldAtRecordId <= recordId; + _lastProcessedRecordId = recordId; + + return true; } -bool IndexConsistency::shouldGetNewSnapshot(const KeyString& keyString) const { +bool IndexConsistency::shouldGetNext(const KeyString& keyString) { stdx::lock_guard lock(_classMutex); - if (!_yieldAtIndexEntry) { + if (_yieldAtIndexEntry && *_yieldAtIndexEntry <= keyString) { return false; } - return *_yieldAtIndexEntry <= keyString; + KeyString::Version version = KeyString::kLatestVersion; + _lastProcessedIndexEntry.reset(new KeyString(version)); + _lastProcessedIndexEntry->resetFromBuffer(keyString.getBuffer(), keyString.getSize()); + + return true; } void IndexConsistency::relockCollectionWithMode(LockMode mode) { @@ -380,11 +371,11 @@ void IndexConsistency::relockCollectionWithMode(LockMode mode) { _collLk.reset(new Lock::CollectionLock(_opCtx->lockState(), _nss.toString(), mode)); invariant(_opCtx->lockState()->isCollectionLockedForMode(_nss.toString(), mode)); + // Ensure it is safe to continue. + uassertStatusOK(_canProceed()); + // Check if the operation was killed. _opCtx->checkForInterrupt(); - - // Ensure it is safe to continue. - uassertStatusOK(_throwExceptionIfError()); } bool IndexConsistency::scanLimitHit() { @@ -406,14 +397,20 @@ void IndexConsistency::yield() { QueryYield::yieldAllLocks(_opCtx, nullptr, _nss); lock.lock(); + // Ensure it is safe to continue. + uassertStatusOK(_canProceed()); + // Check if the operation was killed. _opCtx->checkForInterrupt(); _yieldAtRecordId = boost::none; _yieldAtIndexEntry.reset(); +} - // Ensure it is safe to continue. - uassertStatusOK(_throwExceptionIfError()); +void IndexConsistency::stop() { + + stdx::unique_lock lock(_classMutex); + _shouldStopValidation = true; } void IndexConsistency::_addDocKey_inlock(const KeyString& ks, int indexNumber) { @@ -425,7 +422,7 @@ void IndexConsistency::_addDocKey_inlock(const KeyString& ks, int indexNumber) { const uint32_t hash = _hashKeyString(ks, indexNumber); _indexKeyCount[hash]++; - _indexesInfo.at(indexNumber).numRecords++; + _indexesInfo.at(indexNumber).numRecordKeys++; } void IndexConsistency::_removeDocKey_inlock(const KeyString& ks, int indexNumber) { @@ -437,7 +434,7 @@ void IndexConsistency::_removeDocKey_inlock(const KeyString& ks, int indexNumber const uint32_t hash = _hashKeyString(ks, indexNumber); _indexKeyCount[hash]--; - _indexesInfo.at(indexNumber).numRecords--; + _indexesInfo.at(indexNumber).numRecordKeys--; } void IndexConsistency::_addIndexKey_inlock(const KeyString& ks, int indexNumber) { @@ -474,7 +471,7 @@ bool IndexConsistency::_isIndexScanning_inlock(int indexNumber) const { return indexNumber == _currentIndex; } -void IndexConsistency::_setYieldAtRecord_inlock(const RecordId recordId) { +void IndexConsistency::_setYieldAtRecordId_inlock(const RecordId recordId) { if (_isBeforeLastProcessedRecordId_inlock(recordId)) { return; @@ -525,7 +522,11 @@ uint32_t IndexConsistency::_hashKeyString(const KeyString& ks, int indexNumber) return indexNsHash % (1U << 22); } -Status IndexConsistency::_throwExceptionIfError() { +Status IndexConsistency::_canProceed() { + + if (_shouldStopValidation) { + return Status(ErrorCodes::NamespaceNotFound, "Background validation was interrupted"); + } Database* database = dbHolder().get(_opCtx, _nss.db()); @@ -543,6 +544,20 @@ Status IndexConsistency::_throwExceptionIfError() { "The collection was dropped during background validation"); } + OptionalCollectionUUID uuid = collection->uuid(); + + // The UUID's don't match, so the collection was dropped. + if (uuid && _uuid && (uuid != _uuid)) { + return Status(ErrorCodes::NamespaceNotFound, + "The collection was dropped during background validation"); + } + + // If only one variable has a UUID and the other doesn't the collection was dropped. + if ((uuid && !_uuid) || (!uuid && _uuid)) { + return Status(ErrorCodes::NamespaceNotFound, + "The collection was dropped during background validation"); + } + // Ensure no indexes were removed or added. IndexCatalog* indexCatalog = collection->getIndexCatalog(); IndexCatalog::IndexIterator indexIterator = indexCatalog->getIndexIterator(_opCtx, false); diff --git a/src/mongo/db/catalog/index_consistency.h b/src/mongo/db/catalog/index_consistency.h index 3768bd7be0..30b48ce3d0 100644 --- a/src/mongo/db/catalog/index_consistency.h +++ b/src/mongo/db/catalog/index_consistency.h @@ -28,6 +28,7 @@ #pragma once +#include "mongo/db/catalog/collection_options.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/storage/key_string.h" @@ -49,7 +50,7 @@ enum class ValidationStage { DOCUMENT, INDEX, NONE }; * The `UPDATE` operation can be seen as two independent operations (`REMOVE` operation followed * by an `INSERT` operation). */ -enum class ValidationOperation { INSERT, REMOVE }; +enum class ValidationOperation { INSERT, REMOVE, NONE }; /** * The IndexConsistency class is used to keep track of the index consistency. @@ -75,10 +76,7 @@ struct IndexInfo { int64_t numLongKeys; // The number of records that have a key in their document that referenced back to the // this index - int64_t numRecords; - // Keeps track of how many indexes were removed (-1) and added (+1) after the - // point of validity was set for this index. - int64_t numExtraIndexKeys; + int64_t numRecordKeys; }; class IndexConsistency final { @@ -118,22 +116,25 @@ public: /** * Return the number of records with keys for the given `indexNs`. */ - int64_t getNumRecords(int indexNumber) const; + int64_t getNumRecordKeys(int indexNumber) const; /** - * Returns true if any value in the `_indexKeyCount` map is not equal to 0, otherwise - * return false. + * Return the number of inserted and removed records before the cursor that happened + * during the collection scan. */ - bool haveEntryMismatch() const; + int64_t getNumRecordChangesBeforeCursor(int64_t* totalDataSize); + + /** + * Processes a record that was inserted or removed before the cursor during + * the collection scan. + */ + void processRecordBeforeCursor(const RecordId& loc, int size, bool increment); /** - * Index entries may be added or removed by concurrent writes during the index scan phase, - * after establishing the point of validity. We need to account for these additions and - * removals so that when we validate the index key count, we also have a pre-image of the - * index counts and won't get incorrect results because of the extra index entries we may or - * may not have scanned. + * Returns true if any value in the `_indexKeyCount` map is not equal to 0, otherwise + * return false. */ - int64_t getNumExtraIndexKeys(int indexNumber) const; + bool haveEntryMismatch() const; /** * This is the entry point for the IndexObserver to apply its observed changes @@ -182,17 +183,6 @@ public: ValidationStage getStage() const; /** - * Sets `_lastProcessedRecordId` to `recordId`. - */ - void setLastProcessedRecordId(RecordId recordId); - - /** - * Sets `_lastProcessedIndexEntry` to the KeyString of `indexEntry`. - */ - void setLastProcessedIndexEntry(const IndexDescriptor& descriptor, - const boost::optional& indexEntry); - - /** * Informs the IndexConsistency instance that the index scan is beginning to scan the index * with namespace `indexNs`. This gives us a chance to clean up after the previous index and * setup for the new index. @@ -212,20 +202,26 @@ public: int getIndexNumber(const std::string& indexNs); /** - * Returns true if a new snapshot should be accquired. - * If the `recordId` is equal to or greater than `_yieldAtRecordId` then we must get - * a new snapshot otherwise we will use stale data. - * Otherwise we do not need a new snapshot and can continue with the collection scan. + * Returns false if a new snapshot should be accquired. + * If the `recordId` is equal to or greater than `_yieldAtRecordId` then + * a new snapshot must be acquired otherwise stale data will be used. + * + * If a new snapshot doesn't need to be acquired, the `_lastProcessedRecordId` is set to + * `recordId` and returns true, informing that the caller should proceed their cursor. */ - bool shouldGetNewSnapshot(const RecordId recordId) const; + bool shouldGetNext(const RecordId recordId); /** - * Returns true if a new snapshot should be accquired. - * If the `keyString` is equal to or greater than `_yieldAtIndexEntry` then we must get - * a new snapshot otherwise we will use stale data. - * Otherwise we do not need a new snapshot and can continue with the index scan. + * Returns false if a new snapshot should be accquired. + * If the `keyString` is equal to or greater than `_yieldAtIndexEntry` then + * a new snapshot must be acquired otherwise stale data will be used. + * + * If a new snapshot doesn't need to be acquired, the `_lastProcessedIndexEntry` is set to + * `keyString` and returns true, informing that the caller should proceed their cursor. */ - bool shouldGetNewSnapshot(const KeyString& keyString) const; + bool shouldGetNext(const KeyString& keyString); + + ValidationOperation getYieldOperation(); /** * Gives up the lock that the collection is currently held in and requests the @@ -244,6 +240,11 @@ public: */ void yield(); + /** + * Stops the validation by throwing a uassert + */ + void stop(); + private: OperationContext* _opCtx; Collection* _collection; @@ -251,6 +252,7 @@ private: const RecordStore* _recordStore; std::unique_ptr _collLk; const bool _isBackground; + OptionalCollectionUUID _uuid; ElapsedTracker _tracker; // We map the hashed KeyString values to a bucket which contain the count of how many @@ -262,7 +264,7 @@ private: // are too few index entries. // - If the count is < 0 in the bucket at the end of the validation pass, then there // are too many index entries. - std::map _indexKeyCount; + std::map _indexKeyCount; // Contains the corresponding index number for each index namespace std::map _indexNumber; @@ -273,6 +275,11 @@ private: // RecordId of the last processed document during the collection scan. boost::optional _lastProcessedRecordId = boost::none; + // The number of records that were deleted and added before the cursor during the + // collection scan. + int64_t _numRecordChanges = 0; + int64_t _dataSizeChanges = 0; + // KeyString of the last processed index entry during the index scan. std::unique_ptr _lastProcessedIndexEntry = nullptr; @@ -297,6 +304,9 @@ private: // Only one thread can use the class at a time mutable stdx::mutex _classMutex; + // If true, stops validation when `_canProceed()` is called. + bool _shouldStopValidation = false; + /** * Given the document's key KeyString, increment the corresponding `_indexKeyCount` * by hashing it. @@ -338,7 +348,7 @@ private: * scan we must yield before processing the record. This is a preventive measure so the * collection scan doesn't scan stale records. */ - void _setYieldAtRecord_inlock(const RecordId recordId); + void _setYieldAtRecordId_inlock(const RecordId recordId); /** * Allows the IndexObserver to set a yield point at the KeyString of `indexEntry` so that @@ -373,7 +383,7 @@ private: * 3) An index was added or removed in the collection being validated. * 4) The operation was killed. */ - Status _throwExceptionIfError(); + Status _canProceed(); }; // IndexConsistency } // namespace mongo diff --git a/src/mongo/db/catalog/private/record_store_validate_adaptor.cpp b/src/mongo/db/catalog/private/record_store_validate_adaptor.cpp index cc52362502..e82345ca99 100644 --- a/src/mongo/db/catalog/private/record_store_validate_adaptor.cpp +++ b/src/mongo/db/catalog/private/record_store_validate_adaptor.cpp @@ -35,6 +35,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/catalog/index_consistency.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/index/index_access_method.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/matcher/expression.h" @@ -42,10 +43,70 @@ #include "mongo/db/storage/key_string.h" #include "mongo/db/storage/record_store.h" #include "mongo/rpc/object_check.h" +#include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" namespace mongo { +namespace { + +void processIndexEntry(IndexConsistency* indexConsistency, + const KeyString* indexKeyString, + const KeyString* prevIndexKeyString, + bool isFirstEntry, + int64_t* numKeys, + int64_t indexNumber, + ValidateResults* results) { + + // Ensure that the index entries are in increasing or decreasing order. + if (!isFirstEntry && *indexKeyString < *prevIndexKeyString) { + if (results->valid) { + results->errors.push_back( + "one or more indexes are not in strictly ascending or descending " + "order"); + } + results->valid = false; + } + + indexConsistency->addIndexKey(*indexKeyString, indexNumber); + (*numKeys)++; +} + +void processRecord(RecordStoreValidateAdaptor* adaptor, + const boost::optional& curRecord, + const RecordId& prevRecordId, + int64_t* nrecords, + int64_t* dataSizeTotal, + int64_t* nInvalid, + ValidateResults* results) { + + ++(*nrecords); + + auto dataSize = curRecord->data.size(); + (*dataSizeTotal) += dataSize; + size_t validatedSize; + Status status = adaptor->validate(curRecord->id, curRecord->data, &validatedSize); + + // Checks to ensure isInRecordIdOrder() is being used properly. + if (prevRecordId.isNormal()) { + invariant(prevRecordId < curRecord->id); + } + + // While some storage engines, such as MMAPv1, may use padding, we still require + // that they return the unpadded record data. + if (!status.isOK() || validatedSize != static_cast(dataSize)) { + if (results->valid) { + // Only log once. + results->errors.push_back("detected one or more invalid documents (see logs)"); + } + (*nInvalid)++; + results->valid = false; + log() << "document at location: " << curRecord->id << " is corrupted"; + } +} +} // namespace + Status RecordStoreValidateAdaptor::validate(const RecordId& recordId, const RecordData& record, size_t* dataSize) { @@ -119,43 +180,195 @@ Status RecordStoreValidateAdaptor::validate(const RecordId& recordId, void RecordStoreValidateAdaptor::traverseIndex(const IndexAccessMethod* iam, const IndexDescriptor* descriptor, + bool background, ValidateResults* results, int64_t* numTraversedKeys) { + + invariant(_opCtx->lockState()->isCollectionLockedForMode(_nss.toString(), LockMode::MODE_X)); + auto indexNs = descriptor->indexNamespace(); int indexNumber = _indexConsistency->getIndexNumber(indexNs); int64_t numKeys = 0; + if (background) { + // Notify the IndexConsistency instance that we're starting to scan a new index. + _indexConsistency->notifyStartIndex(indexNumber); + + // Yield to get the latest snapshot before we begin scanning through the index. + _indexConsistency->yield(); + } + const auto& key = descriptor->keyPattern(); const Ordering ord = Ordering::make(key); KeyString::Version version = KeyString::kLatestVersion; std::unique_ptr prevIndexKeyString = nullptr; bool isFirstEntry = true; + int interruptInterval = 4096; + boost::optional currentIndexEntry = boost::none; + std::unique_ptr cursor = iam->newCursor(_opCtx, true); - // Seeking to BSONObj() is equivalent to seeking to the first entry of an index. - for (auto indexEntry = cursor->seek(BSONObj(), true); indexEntry; indexEntry = cursor->next()) { - - // We want to use the latest version of KeyString here. - std::unique_ptr indexKeyString = - stdx::make_unique(version, indexEntry->key, ord, indexEntry->loc); - // Ensure that the index entries are in increasing or decreasing order. - if (!isFirstEntry && *indexKeyString < *prevIndexKeyString) { - if (results->valid) { - results->errors.push_back( - "one or more indexes are not in strictly ascending or descending " - "order"); + std::unique_ptr lookAheadCursor = iam->newCursor(_opCtx, true); + + if (background) { + // Allow concurrent read/writes to happen. + _indexConsistency->relockCollectionWithMode(LockMode::MODE_IX); + } + + // `shouldYield` will be true if we hit the periodic yield point to allow other operations + // requiring locks to run. + bool shouldYield = false; + + bool isEOF = false; + while (!isEOF) { + + std::unique_ptr indexKeyString = nullptr; + + // `consumeAndReposition` will move the `cursor` forward once we get a new snapshot and + // move the `lookAheadCursor` back to where `cursor` is. After that, we will process the + // index entry. + bool consumeAndReposition = false; + if (!shouldYield) { + + boost::optional lookAheadIndexEntry = boost::none; + if (isFirstEntry) { + // Seeking to BSONObj() is equivalent to seeking to the first entry of an index. + lookAheadIndexEntry = lookAheadCursor->seek(BSONObj(), true); + } else { + lookAheadIndexEntry = lookAheadCursor->next(); + } + + if (!lookAheadIndexEntry) { + isEOF = true; + + if (!background) { + // Non-background validation exits here. + continue; + } + } else { + + // We want to use the latest version of KeyString here. + indexKeyString.reset(new KeyString( + version, lookAheadIndexEntry->key, ord, lookAheadIndexEntry->loc)); + + // Checks to see if we should get a new snapshot or we can continue using this + // snapshot because of our cursor placement. + bool shouldGetNext = _indexConsistency->shouldGetNext(*indexKeyString); + if (!shouldGetNext) { + shouldYield = true; + consumeAndReposition = true; + } else { + // The look ahead cursor saw something, so we can safely move up our cursor up. + if (isFirstEntry) { + // Seeking to BSONObj() is equivalent to seeking to the first entry of an + // index. + currentIndexEntry = cursor->seek(BSONObj(), true); + } else { + currentIndexEntry = cursor->next(); + } + } + } + } + + if (background && (isEOF || shouldYield)) { + + // Switch to MODE_X to prohibit concurrency. + _indexConsistency->relockCollectionWithMode(LockMode::MODE_X); + + // Save the cursor as we'll be abandoning the snapshot. + cursor->save(); + lookAheadCursor->save(); + + // We yield to get a new snapshot to be able to gather any records that may + // have been added while we were waiting for get the MODE_X lock. + _indexConsistency->yield(); + + // Restore the cursor in the new snapshot. + // If the `prevRecordId` that the cursor is pointing to gets removed in the + // new snapshot, the call to next() will return the next closest position. + cursor->restore(); + lookAheadCursor->restore(); + + if (consumeAndReposition) { + + currentIndexEntry = cursor->next(); + if (!currentIndexEntry) { + isEOF = true; + } else { + indexKeyString.reset(new KeyString( + version, currentIndexEntry->key, ord, currentIndexEntry->loc)); + + // This will always return true as we just yielded and are in MODE_X. + _indexConsistency->shouldGetNext(*indexKeyString); + + boost::optional lookAheadIndexEntry = + lookAheadCursor->seekExact(currentIndexEntry->key); + while (currentIndexEntry->loc != lookAheadIndexEntry->loc) { + lookAheadIndexEntry = lookAheadCursor->next(); + } + } + } + + if (isEOF) { + continue; + } + + // Switch back to a MODE_IX lock to allow concurrency. + _indexConsistency->relockCollectionWithMode(LockMode::MODE_IX); + + if (shouldYield && !consumeAndReposition) { + shouldYield = false; + continue; } - results->valid = false; } - _indexConsistency->addIndexKey(*indexKeyString, indexNumber); + // Process the current index entry and prepare for the next iteration. + processIndexEntry(_indexConsistency, + indexKeyString.get(), + prevIndexKeyString.get(), + isFirstEntry, + &numKeys, + indexNumber, + results); - numKeys++; isFirstEntry = false; prevIndexKeyString.swap(indexKeyString); + + if (!(numKeys % interruptInterval)) { + _opCtx->checkForInterrupt(); + } + + // Called last to ensure we don't end up in the loop forever. + shouldYield = _indexConsistency->scanLimitHit(); } - *numTraversedKeys = numKeys; + invariant(_opCtx->lockState()->isCollectionLockedForMode(_nss.toString(), LockMode::MODE_X)); + + while (((isFirstEntry) ? currentIndexEntry = cursor->seek(BSONObj(), true) + : currentIndexEntry = cursor->next())) { + + std::unique_ptr indexKeyString = nullptr; + indexKeyString.reset( + new KeyString(version, currentIndexEntry->key, ord, currentIndexEntry->loc)); + + // Process the current index entry and prepare for the next iteration. + processIndexEntry(_indexConsistency, + indexKeyString.get(), + prevIndexKeyString.get(), + isFirstEntry, + &numKeys, + indexNumber, + results); + + isFirstEntry = false; + prevIndexKeyString.swap(indexKeyString); + } + + *numTraversedKeys = _indexConsistency->getNumKeys(indexNumber); + + if (background) { + _indexConsistency->notifyDoneIndex(indexNumber); + } } void RecordStoreValidateAdaptor::traverseRecordStore(RecordStore* recordStore, @@ -163,53 +376,131 @@ void RecordStoreValidateAdaptor::traverseRecordStore(RecordStore* recordStore, ValidateResults* results, BSONObjBuilder* output) { - long long nrecords = 0; - long long dataSizeTotal = 0; - long long nInvalid = 0; + invariant(_opCtx->lockState()->isCollectionLockedForMode(_nss.toString(), LockMode::MODE_X)); + + int64_t nrecords = 0; + int64_t dataSizeTotal = 0; + int64_t nInvalid = 0; results->valid = true; std::unique_ptr cursor = recordStore->getCursor(_opCtx, true); + std::unique_ptr lookAheadCursor = recordStore->getCursor(_opCtx, true); int interruptInterval = 4096; + + boost::optional curRecord = boost::none; RecordId prevRecordId; - while (auto record = cursor->next()) { - ++nrecords; + // Allow concurrent read/writes to happen. + _indexConsistency->relockCollectionWithMode(LockMode::MODE_IX); + + // `shouldYield` will be true if we hit the periodic yield point to allow other operations + // requiring locks to run. + bool shouldYield = false; + + bool isEOF = false; + while (!isEOF) { + + // `consumeAndReposition` will move the `cursor` forward once we get a new snapshot and + // move the `lookAheadCursor` back to where `cursor` is. After that, we will process the + // record. + bool consumeAndReposition = false; + if (!shouldYield) { + boost::optional lookAheadRecord = lookAheadCursor->next(); + if (!lookAheadRecord) { + isEOF = true; + } else { + + // Checks to see if we should get a new snapshot or we can continue using this + // snapshot because of our cursor placement. + bool shouldGetNext = _indexConsistency->shouldGetNext(lookAheadRecord->id); + if (!shouldGetNext) { + shouldYield = true; + consumeAndReposition = true; + } else { + // The look ahead cursor saw something, so we can safely move up our cursor up. + curRecord = cursor->next(); + } + } + } + + if (isEOF || shouldYield) { + + // Switch to MODE_X to prohibit concurrency. + _indexConsistency->relockCollectionWithMode(LockMode::MODE_X); + + // Save the cursor as we'll be abandoning the snapshot. + cursor->save(); + lookAheadCursor->save(); + + // We yield to get a new snapshot to be able to gather any records that may + // have been added while we were waiting for get the MODE_X lock. + _indexConsistency->yield(); + + // Restore the cursor in the new snapshot. + // If the `prevRecordId` that the cursor is pointing to gets removed in the + // new snapshot, the call to next() will return the next closest position. + // If the cursor returns false, then the colletion object has been changed. + bool cursorRestore = cursor->restore(); + uassert(40614, "Background validation was interrupted", cursorRestore); + bool lookAheadCursorRestore = lookAheadCursor->restore(); + uassert(40615, "Background validation was interrupted", lookAheadCursorRestore); + + if (consumeAndReposition) { + curRecord = cursor->next(); + if (!curRecord) { + isEOF = true; + } else { + // This will always return true as we just yielded and are in MODE_X. + _indexConsistency->shouldGetNext(curRecord->id); + + lookAheadCursor->seekExact(curRecord->id); + } + } + + if (isEOF) { + continue; + } + + // Switch back to a MODE_IX lock to allow concurrency. + _indexConsistency->relockCollectionWithMode(LockMode::MODE_IX); + + if (shouldYield && !consumeAndReposition) { + shouldYield = false; + continue; + } + } + + // Process the current record and prepare for the next iteration. + processRecord(this, curRecord, prevRecordId, &nrecords, &dataSizeTotal, &nInvalid, results); + prevRecordId = curRecord->id; if (!(nrecords % interruptInterval)) { _opCtx->checkForInterrupt(); } - auto dataSize = record->data.size(); - dataSizeTotal += dataSize; - size_t validatedSize; - Status status = validate(record->id, record->data, &validatedSize); + // Called last to ensure we don't end up in the loop forever. + shouldYield = _indexConsistency->scanLimitHit(); + } - // Checks to ensure isInRecordIdOrder() is being used properly. - if (prevRecordId.isNormal()) { - invariant(prevRecordId < record->id); - } + invariant(_opCtx->lockState()->isCollectionLockedForMode(_nss.toString(), LockMode::MODE_X)); - // While some storage engines, such as MMAPv1, may use padding, we still require - // that they return the unpadded record data. - if (!status.isOK() || validatedSize != static_cast(dataSize)) { - if (results->valid) { - // Only log once. - results->errors.push_back("detected one or more invalid documents (see logs)"); - } - nInvalid++; - results->valid = false; - log() << "document at location: " << record->id << " is corrupted"; - } + while ((curRecord = cursor->next())) { - prevRecordId = record->id; + // Process the current record and prepare for the next iteration. + processRecord(this, curRecord, prevRecordId, &nrecords, &dataSizeTotal, &nInvalid, results); + prevRecordId = curRecord->id; } + nrecords += _indexConsistency->getNumRecordChangesBeforeCursor(&dataSizeTotal); + + _indexConsistency->nextStage(); + if (results->valid) { recordStore->updateStatsAfterRepair(_opCtx, nrecords, dataSizeTotal); } - output->append("nInvalidDocuments", nInvalid); - output->appendNumber("nrecords", nrecords); + output->append("nInvalidDocuments", static_cast(nInvalid)); + output->appendNumber("nrecords", static_cast(nrecords)); } void RecordStoreValidateAdaptor::validateIndexKeyCount(IndexDescriptor* idx, diff --git a/src/mongo/db/catalog/private/record_store_validate_adaptor.h b/src/mongo/db/catalog/private/record_store_validate_adaptor.h index 60c08485fb..f227d94edf 100644 --- a/src/mongo/db/catalog/private/record_store_validate_adaptor.h +++ b/src/mongo/db/catalog/private/record_store_validate_adaptor.h @@ -52,12 +52,14 @@ class RecordStoreValidateAdaptor : public ValidateAdaptor { public: RecordStoreValidateAdaptor(OperationContext* opCtx, IndexConsistency* indexConsistency, + NamespaceString nss, ValidateCmdLevel level, IndexCatalog* ic, ValidateResultsMap* irm) : _opCtx(opCtx), _indexConsistency(indexConsistency), + _nss(std::move(nss)), _level(level), _indexCatalog(ic), _indexNsResultsMap(irm) {} @@ -74,6 +76,7 @@ public: */ void traverseIndex(const IndexAccessMethod* iam, const IndexDescriptor* descriptor, + bool background, ValidateResults* results, int64_t* numTraversedKeys); @@ -94,6 +97,7 @@ public: private: OperationContext* _opCtx; // Not owned. IndexConsistency* _indexConsistency; // Not owned. + const NamespaceString _nss; ValidateCmdLevel _level; IndexCatalog* _indexCatalog; // Not owned. ValidateResultsMap* _indexNsResultsMap; // Not owned. diff --git a/src/mongo/db/commands/validate.cpp b/src/mongo/db/commands/validate.cpp index d6cedeb95d..2e25200c15 100644 --- a/src/mongo/db/commands/validate.cpp +++ b/src/mongo/db/commands/validate.cpp @@ -102,6 +102,27 @@ public: const NamespaceString nss(parseNsCollectionRequired(dbname, cmdObj)); + // Only one validation per collection can be in progress, the rest wait until the current + // one is finished or the operation is interrupted. + { + stdx::unique_lock lock(_validationMutex); + try { + while (_validationsInProgress.find(nss.ns()) != _validationsInProgress.end()) { + opCtx->waitForConditionOrInterrupt(_validationNotifier, lock); + } + } catch (DBException& e) { + return appendCommandStatus(result, e.toStatus()); + } + + _validationsInProgress.insert(nss.ns()); + } + + ON_BLOCK_EXIT([&] { + stdx::lock_guard lock(_validationMutex); + _validationsInProgress.erase(nss.ns()); + _validationNotifier.notify_all(); + }); + const bool full = cmdObj["full"].trueValue(); const bool scanData = cmdObj["scandata"].trueValue(); @@ -162,35 +183,8 @@ public: return false; } - // Set it to false forcefully until it is fully implemented. - background = false; - result.append("ns", nss.ns()); - // Only one validation per collection can be in progress, the rest wait in order. - { - stdx::unique_lock lock(_validationMutex); - try { - while (_validationsInProgress.find(nss.ns()) != _validationsInProgress.end()) { - opCtx->waitForConditionOrInterrupt(_validationNotifier, lock); - } - } catch (AssertionException& e) { - appendCommandStatus( - result, - {ErrorCodes::CommandFailed, - str::stream() << "Exception during validation: " << e.toString()}); - return false; - } - - _validationsInProgress.insert(nss.ns()); - } - - ON_BLOCK_EXIT([&] { - stdx::lock_guard lock(_validationMutex); - _validationsInProgress.erase(nss.ns()); - _validationNotifier.notify_all(); - }); - ValidateResults results; Status status = collection->validate(opCtx, level, background, std::move(collLk), &results, &result); diff --git a/src/mongo/shell/utils.js b/src/mongo/shell/utils.js index fcf1a8c915..dd9a0a920c 100644 --- a/src/mongo/shell/utils.js +++ b/src/mongo/shell/utils.js @@ -231,6 +231,7 @@ jsTestOptions = function() { networkMessageCompressors: TestData.networkMessageCompressors, skipValidationOnInvalidViewDefinitions: TestData.skipValidationOnInvalidViewDefinitions, skipCollectionAndIndexValidation: TestData.skipCollectionAndIndexValidation, + useBackgroundValidation: TestData.useBackgroundValidation || false, // We default skipValidationOnNamespaceNotFound to true because mongod can end up // dropping a collection after calling listCollections (e.g. if a secondary applies an // oplog entry). -- 2.11.0 (Apple Git-81)