diff --git a/filter_backtrace.py b/filter_backtrace.py new file mode 100644 index 0000000000..9227689fda --- /dev/null +++ b/filter_backtrace.py @@ -0,0 +1,37 @@ +import json +import os +import buildscripts.mongosymb as mongosymb + +beginBacktrace = '----- BEGIN BACKTRACE -----' +endBacktrace = '----- END BACKTRACE -----' + +class State(object): + NORMAL = 0 + JSON_BACKTRACE = 1 + FRAMES_BACKTRACE = 2 + +state = State.NORMAL +framesBacktrace = [] +while True: + line = os.sys.stdin.readline() + if not line: + break + + line = line.strip() + if state == State.NORMAL: + os.sys.stdout.write(line + "\n") + if line == beginBacktrace: + state = State.JSON_BACKTRACE + elif state == State.JSON_BACKTRACE: + traceDoc = json.JSONDecoder().raw_decode(line)[0] + frames = mongosymb.symbolize_frames(traceDoc, mongosymb.path_dbg_file_resolver('./mongod'), '/opt/mongodbtoolchain/v2/bin/llvm-symbolizer') + mongosymb.classic_output(frames, os.sys.stdout, indent=2) + state = State.FRAMES_BACKTRACE + elif state == State.FRAMES_BACKTRACE: + if line == endBacktrace: + os.sys.stdout.write(line + "\n") + state = State.NORMAL + else: + framesBacktrace.append(line) + else: + print("BUGF") diff --git a/restart.js b/restart.js new file mode 100644 index 0000000000..79f964a0ff --- /dev/null +++ b/restart.js @@ -0,0 +1,79 @@ +(function test() { + // db.setLogLevel(3, "storage"); + rs.initiate(); + + isMaster = false; + while (!isMaster) { + sleep(100); + isMaster = rs.isMaster()["ismaster"]; + } + + print("ReplSet initiated"); + db.createCollection("bla", {validator: {name: {$type: "string"}}, validationAction: "error"}); + db.bla2.ensureIndex({"a": 1}); + db.bla2.ensureIndex({"b": 1}); + for (var i = 0; i < 200; ++i) { + // Only ~40-50 of these be "stable" + db.bla2.insert({_id: i}); + } + + // Everything here should be rolled back. + + // `bla` should remain with an {validationAction: "error"} and no index on {a: 1} + db.runCommand({collMod: "bla", validationAction: "warn"}); + db.bla.ensureIndex({a: 1}) + + // `bla3`s creation should be undone. + db.bla3.insert({}); + + // `bla2`s index on {c: 1} should be removed. Its index on {b: 1} should be rebuilt. + db.bla2.ensureIndex({"c": 1}); + db.bla2.dropIndex({"b": 1}); + + let testLocal = true; + let local = db.getSiblingDB("local"); + if (testLocal) { + local.notReplicated.insert({}); + } + + // Should demonstrate all 200 writes were successful from _id: 0 -> 199 + print("Before recover. First item: " + db.bla2.find().sort({_id: 1}).limit(1).next()["_id"] + + " Last item: " + db.bla2.find().sort({_id: -1}).limit(1).next()["_id"] + + " db.bla2.count(): " + db.bla2.count() + " db.bla2.find().itcount(): " + + db.bla2.find().itcount()); + printjson(db.adminCommand("restartStorageEngine")); + + if (testLocal) { + print("After first recover. Local count should be 1. Count: " + + local.notReplicated.count() + " ItCount: " + local.notReplicated.find().itcount()); + local.notReplicated.insert({}); + } + + // Should demonstrate there are ~40-50 documents remain. The `count` is not correct, but + // `itcount` + // is. + print("After first recover. First item: " + + db.bla2.find().sort({_id: 1}).limit(1).next()["_id"] + " Last item: " + + db.bla2.find().sort({_id: -1}).limit(1).next()["_id"] + " db.bla2.count(): " + + db.bla2.count() + " db.bla2.find().itcount(): " + db.bla2.find().itcount()); + + print("bla3 should be missing."); + printjson(db.getCollectionNames()); + + print( + "bla2 should only have an {_id: 1}, {a: 1} and {b: 1} index. {b: 1} should have been rebuilt. {c: 1} should be missing."); + printjson(db.bla2.getIndexes()); + printjson(db.adminCommand("restartStorageEngine")); + + if (testLocal) { + print("After second recover. Local count should be 2. Count: " + + local.notReplicated.count() + " ItCount: " + local.notReplicated.find().itcount()); + } + print("After second recover. First item: " + + db.bla2.find().sort({_id: 1}).limit(1).next()["_id"] + " Last item: " + + db.bla2.find().sort({_id: -1}).limit(1).next()["_id"] + " db.bla2.count(): " + + db.bla2.count() + " db.bla2.find().itcount(): " + db.bla2.find().itcount()); + + print("bla should have an `{options.validatorAction: error}`."); + printjson(db.bla.exists()); +}()); diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index b74534df19..df626ba1eb 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -722,6 +722,17 @@ env.Library( ], ) +env.Library( + target="recover_to_timestamp", + source=[ + "recover_to_timestamp.cpp", + ], + LIBDEPS=[ + "catalog/database_holder", + "repair_database", + ], +) + env.Library( target="repair_database", source=[ @@ -829,6 +840,7 @@ env.Library( "pipeline/serveronly", "prefetch", "query/query", + "recover_to_timestamp", "repair_database", "repl/bgsync", "repl/oplog_buffer_blocking_queue", @@ -1545,4 +1557,3 @@ env.CppUnitTest( 'write_ops', ], ) - diff --git a/src/mongo/db/catalog/database_holder_impl.cpp b/src/mongo/db/catalog/database_holder_impl.cpp index af081c9d59..8757720935 100644 --- a/src/mongo/db/catalog/database_holder_impl.cpp +++ b/src/mongo/db/catalog/database_holder_impl.cpp @@ -241,6 +241,7 @@ bool DatabaseHolderImpl::closeAll(OperationContext* opCtx, } Database* db = _dbs[name]; + UUIDCatalog::get(opCtx).onCloseDatabase(db); db->close(opCtx, reason); delete db; diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 527ae13670..498cd00420 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -186,6 +186,7 @@ env.Library( "plan_cache_commands.cpp", "rename_collection_cmd.cpp", "repair_cursor.cpp", + "restart_store_engine.cpp", "resize_oplog.cpp", "run_aggregate.cpp", "set_feature_compatibility_version_command.cpp", @@ -203,27 +204,28 @@ env.Library( '$BUILD_DIR/mongo/db/auth/authmongod', '$BUILD_DIR/mongo/db/background', '$BUILD_DIR/mongo/db/catalog/catalog', + '$BUILD_DIR/mongo/db/catalog/catalog_helpers', '$BUILD_DIR/mongo/db/catalog/collection', '$BUILD_DIR/mongo/db/catalog/index_key_validate', - '$BUILD_DIR/mongo/db/catalog/catalog_helpers', '$BUILD_DIR/mongo/db/clientcursor', '$BUILD_DIR/mongo/db/cloner', '$BUILD_DIR/mongo/db/commands', '$BUILD_DIR/mongo/db/index/index_access_methods', '$BUILD_DIR/mongo/db/index_d', '$BUILD_DIR/mongo/db/lasterror', - '$BUILD_DIR/mongo/db/write_ops', '$BUILD_DIR/mongo/db/ops/write_ops_parsers', '$BUILD_DIR/mongo/db/pipeline/serveronly', + '$BUILD_DIR/mongo/db/recover_to_timestamp', '$BUILD_DIR/mongo/db/repair_database', - '$BUILD_DIR/mongo/db/repl/oplog', '$BUILD_DIR/mongo/db/repl/isself', + '$BUILD_DIR/mongo/db/repl/oplog', '$BUILD_DIR/mongo/db/repl/repl_coordinator_impl', '$BUILD_DIR/mongo/db/rw_concern_d', '$BUILD_DIR/mongo/db/server_options_core', '$BUILD_DIR/mongo/db/stats/serveronly', '$BUILD_DIR/mongo/db/storage/mmap_v1/storage_mmapv1', '$BUILD_DIR/mongo/db/views/views_mongod', + '$BUILD_DIR/mongo/db/write_ops', '$BUILD_DIR/mongo/s/client/parallel', 'core', 'current_op_common', diff --git a/src/mongo/db/commands/restart_store_engine.cpp b/src/mongo/db/commands/restart_store_engine.cpp new file mode 100644 index 0000000000..3142302a69 --- /dev/null +++ b/src/mongo/db/commands/restart_store_engine.cpp @@ -0,0 +1,88 @@ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand + +#include +#include + +#include "mongo/db/auth/action_set.h" +#include "mongo/db/auth/action_type.h" +#include "mongo/db/auth/privilege.h" +#include "mongo/db/background.h" +#include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/database.h" +#include "mongo/db/catalog/database_holder.h" +#include "mongo/db/commands.h" +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/curop.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/ftdc/ftdc_mongod.h" +#include "mongo/db/index_builder.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/recover_to_timestamp.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/util/log.h" + +namespace mongo { + +using std::string; +using std::stringstream; + +class RestartStorageEngineCmd : public BasicCommand { +public: + virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + return false; + } + virtual bool adminOnly() const { + return false; + } + virtual bool slaveOk() const { + return true; + } + virtual bool maintenanceMode() const { + return true; + } + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector* out) { + ActionSet actions; + actions.addAction(ActionType::compact); + out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions)); + } + virtual void help(stringstream& help) const { + help << "compact collection\n" + "warning: this operation locks the database and is slow. you can cancel with " + "killOp()\n" + "{ compact : , [force:], [validate:],\n" + " [paddingFactor:], [paddingBytes:] }\n" + " force - allows to run on a replica set primary\n" + " validate - check records are noncorrupt before adding to newly compacting " + "extents. slower but safer (defaults to true in this version)\n"; + } + RestartStorageEngineCmd() : BasicCommand("restartStorageEngine") {} + + virtual bool run(OperationContext* opCtx, + const string& db, + const BSONObj& cmdObj, + BSONObjBuilder& result) { + Lock::GlobalLock global(opCtx, MODE_X, UINT_MAX); + + // auto replCoordinator = repl::getGlobalReplicationCoordinator(); + // if (replCoordinator) { + // replCoordinator->stopNoopWriter(); + // } + // stopMongoDFTDC(); + + auto res = recoverToStableTimestamp(opCtx); + log() << "RecoverToStableTimestamp result " << res.getValue().asU64(); + result.appendTimestamp("snapshotName", res.getValue().asU64()); + + // if (replCoordinator) { + // replCoordinator->startNoopWriter(repl::OpTime()); + // } + // startMongoDFTDC(); + + return true; + } +}; +static RestartStorageEngineCmd restartCmd; +} diff --git a/src/mongo/db/concurrency/d_concurrency.cpp b/src/mongo/db/concurrency/d_concurrency.cpp index c79147f7ad..a84b472095 100644 --- a/src/mongo/db/concurrency/d_concurrency.cpp +++ b/src/mongo/db/concurrency/d_concurrency.cpp @@ -180,6 +180,10 @@ void Lock::GlobalLock::_unlock() { } } +bool Lock::GlobalLock::_inLastGlobal() { + return _opCtx->lockState()->inLastGlobal(); +} + Lock::DBLock::DBLock(OperationContext* opCtx, StringData db, LockMode mode) : _id(RESOURCE_DATABASE, db), diff --git a/src/mongo/db/concurrency/d_concurrency.h b/src/mongo/db/concurrency/d_concurrency.h index ca06084916..45e522a04e 100644 --- a/src/mongo/db/concurrency/d_concurrency.h +++ b/src/mongo/db/concurrency/d_concurrency.h @@ -188,10 +188,10 @@ public: EnqueueOnly enqueueOnly); ~GlobalLock() { - _unlock(); - if (!_opCtx->lockState()->isLocked()) { + if (isLocked() && _inLastGlobal()) { _opCtx->recoveryUnit()->abandonSnapshot(); } + _unlock(); } /** @@ -204,6 +204,13 @@ public: return _result == LOCK_OK; } + /** + * Only public for testing. Returns true if the next unlock call will release the global + * lock, i.e: the lock is not currently being held recursively. This method is illegal to + * call if `isLocked` is false. + */ + bool _inLastGlobal(); + private: void _enqueue(LockMode lockMode, unsigned timeoutMs); void _unlock(); diff --git a/src/mongo/db/concurrency/d_concurrency_test.cpp b/src/mongo/db/concurrency/d_concurrency_test.cpp index 11bda08b7f..b1608cd50a 100644 --- a/src/mongo/db/concurrency/d_concurrency_test.cpp +++ b/src/mongo/db/concurrency/d_concurrency_test.cpp @@ -905,5 +905,23 @@ TEST_F(DConcurrencyTestFixture, PerformanceMMAPv1CollectionExclusive) { kMaxPerfThreads); } +TEST_F(DConcurrencyTestFixture, TestGlobalLockInLastGlobal) { + auto clients = makeKClientsWithLockers(1); + auto opCtx = clients[0].second.get(); + + Lock::GlobalLock gw1(opCtx, MODE_IS, 0); + ASSERT(gw1.isLocked()); + ASSERT(gw1._inLastGlobal()); + + { + Lock::GlobalLock gw2(opCtx, MODE_S, 0); + ASSERT(gw2.isLocked()); + ASSERT_FALSE(gw2._inLastGlobal()); + } + + ASSERT(gw1.isLocked()); + ASSERT(gw1._inLastGlobal()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp index 721067e3aa..c03cd923ab 100644 --- a/src/mongo/db/concurrency/lock_state.cpp +++ b/src/mongo/db/concurrency/lock_state.cpp @@ -398,6 +398,13 @@ bool LockerImpl::unlockGlobal() { return true; } +template +bool LockerImpl::inLastGlobal() { + LockRequestsMap::Iterator request = _requests.find(resourceIdGlobal); + invariant(request); + return request->recursiveCount == 1; +} + template void LockerImpl::beginWriteUnitOfWork() { // Sanity check that write transactions under MMAP V1 have acquired the flush lock, so we diff --git a/src/mongo/db/concurrency/lock_state.h b/src/mongo/db/concurrency/lock_state.h index 04e700c9aa..6ffaeacdfa 100644 --- a/src/mongo/db/concurrency/lock_state.h +++ b/src/mongo/db/concurrency/lock_state.h @@ -110,6 +110,7 @@ public: virtual void downgradeGlobalXtoSForMMAPV1(); virtual bool unlockGlobal(); + virtual bool inLastGlobal(); virtual void beginWriteUnitOfWork(); virtual void endWriteUnitOfWork(); diff --git a/src/mongo/db/concurrency/locker.h b/src/mongo/db/concurrency/locker.h index b973da6d66..64bc618894 100644 --- a/src/mongo/db/concurrency/locker.h +++ b/src/mongo/db/concurrency/locker.h @@ -141,6 +141,12 @@ public: */ virtual bool unlockGlobal() = 0; + /** + * Returns true iff the reference count on the global lock is one. This can be used to perform + * cleanup before releasing lock resources. + */ + virtual bool inLastGlobal() = 0; + /** * This is only necessary for the MMAP V1 engine and in particular, the fsyncLock command * which needs to first acquire the global lock in X-mode for truncating the journal and diff --git a/src/mongo/db/concurrency/locker_noop.h b/src/mongo/db/concurrency/locker_noop.h index 2a84be3657..7d07aadc2d 100644 --- a/src/mongo/db/concurrency/locker_noop.h +++ b/src/mongo/db/concurrency/locker_noop.h @@ -77,6 +77,10 @@ public: invariant(false); } + virtual bool inLastGlobal() { + invariant(false); + } + virtual void downgradeGlobalXtoSForMMAPV1() { invariant(false); } diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 681d831922..f05d5f31c8 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -331,7 +331,7 @@ void repairDatabasesAndCheckVersion(OperationContext* opCtx) { if (!storageGlobalParams.readOnly) { StatusWith> swIndexesToRebuild = storageEngine->reconcileCatalogAndIdents(opCtx); - fassertStatusOK(40593, swIndexesToRebuild); + fassertStatusOK(40597, swIndexesToRebuild); for (auto&& collIndexPair : swIndexesToRebuild.getValue()) { const std::string& coll = collIndexPair.first; const std::string& indexName = collIndexPair.second; diff --git a/src/mongo/db/ftdc/SConscript b/src/mongo/db/ftdc/SConscript index 5e78036c3e..223d8fb9f4 100644 --- a/src/mongo/db/ftdc/SConscript +++ b/src/mongo/db/ftdc/SConscript @@ -23,6 +23,7 @@ ftdcEnv.Library( LIBDEPS=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/bson/util/bson_extract', + '$BUILD_DIR/mongo/db/concurrency/lock_manager', '$BUILD_DIR/mongo/db/server_options_core', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/third_party/s2/s2', # For VarInt diff --git a/src/mongo/db/ftdc/collector.cpp b/src/mongo/db/ftdc/collector.cpp index 78e4f0de60..a2d70ab86d 100644 --- a/src/mongo/db/ftdc/collector.cpp +++ b/src/mongo/db/ftdc/collector.cpp @@ -34,6 +34,7 @@ #include "mongo/bson/bsonmisc.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/client.h" +#include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/ftdc/constants.h" #include "mongo/db/ftdc/util.h" #include "mongo/db/jsobj.h" @@ -67,6 +68,7 @@ std::tuple FTDCCollectorCollection::collect(Client* client) { // batches that are taking a long time. auto opCtx = client->makeOperationContext(); opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false); + Lock::GlobalLock lk(opCtx.get(), LockMode::MODE_IS, 0); for (auto& collector : _collectors) { BSONObjBuilder subObjBuilder(builder.subobjStart(collector->name())); diff --git a/src/mongo/db/index_rebuilder.cpp b/src/mongo/db/index_rebuilder.cpp index 3fa674392c..b2bd2a0adb 100644 --- a/src/mongo/db/index_rebuilder.cpp +++ b/src/mongo/db/index_rebuilder.cpp @@ -100,7 +100,6 @@ void checkNS(OperationContext* opCtx, const std::list& nsToCheck) { log() << "found " << indexesToBuild.size() << " interrupted index build(s) on " << nss.ns(); - if (firstTime) { log() << "note: restart the server with --noIndexBuildRetry " << "to skip index rebuilds"; diff --git a/src/mongo/db/recover_to_timestamp.cpp b/src/mongo/db/recover_to_timestamp.cpp new file mode 100644 index 0000000000..1cc58333b0 --- /dev/null +++ b/src/mongo/db/recover_to_timestamp.cpp @@ -0,0 +1,124 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage + +#include "mongo/platform/basic.h" + +#include "recover_to_timestamp.h" + +#include + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/database.h" +#include "mongo/db/catalog/database_catalog_entry.h" +#include "mongo/db/catalog/database_holder.h" +#include "mongo/db/catalog/index_catalog.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/repair_database.h" +#include "mongo/db/storage/storage_engine.h" +#include "mongo/util/log.h" + +namespace mongo { + +class CollectionCatalogEntry; + +StatusWith recoverToStableTimestamp(OperationContext* opCtx) { + log() << "Call to recoverToStableTimestamp"; + invariant(opCtx->lockState()->isW()); + + StorageEngine* storageEngine = opCtx->getServiceContext()->getGlobalStorageEngine(); + StatusWith recoverStatus = storageEngine->recoverToStableTimestamp(opCtx); + if (!recoverStatus.isOK()) { + return recoverStatus; + } + + BSONObjBuilder result; + const bool force = true; + const std::string reason = "recover to timestamp"; + if (!dbHolder().closeAll(opCtx, result, force, reason)) { + return {ErrorCodes::InternalError, "Failed to close all databases."}; + } + + Status catalogStatus = storageEngine->loadCatalog(opCtx); + if (!catalogStatus.isOK()) { + return catalogStatus; + } + + auto swIndexesToRebuild = storageEngine->reconcileCatalogAndIdents(opCtx); + if (!swIndexesToRebuild.isOK()) { + return swIndexesToRebuild.getStatus(); + } + + for (const auto& collIndexPair : swIndexesToRebuild.getValue()) { + const std::string& coll = collIndexPair.first; + const std::string& indexName = collIndexPair.second; + DatabaseCatalogEntry* dbce = + storageEngine->getDatabaseCatalogEntry(opCtx, NamespaceString(coll).db()); + invariant(dbce); + CollectionCatalogEntry* cce = dbce->getCollectionCatalogEntry(coll); + invariant(cce); + + StatusWith swIndexToRebuild(getIndexNameObjs( + opCtx, dbce, cce, [&indexName](const std::string& str) { return str == indexName; })); + if (!swIndexToRebuild.isOK() || swIndexToRebuild.getValue().first.empty()) { + severe() << "Unable to get indexes for collection. Collection: " << coll; + fassertFailedNoTrace(40595); + } + + invariant(swIndexToRebuild.getValue().first.size() == 1 && + swIndexToRebuild.getValue().second.size() == 1); + log() << "Rebuilding index for recoverToStableTimestamp. Collection: " << coll + << " IndexName: " << indexName; + fassertStatusOK(40596, + rebuildIndexesOnCollection(opCtx, dbce, cce, swIndexToRebuild.getValue())); + } + + std::vector databases; + storageEngine->listDatabases(&databases); + for (auto&& dbName : databases) { + Database* database = dbHolder().openDb(opCtx, dbName); + invariant(database); + std::list collections; + database->getDatabaseCatalogEntry()->getCollectionNamespaces(&collections); + for (auto&& collNamespace : collections) { + Collection* collection = database->getCollection(opCtx, collNamespace); + invariant(collection); + IndexCatalog* indexCatalog = collection->getIndexCatalog(); + invariant(indexCatalog); + // WriteUnitOfWork wuow(opCtx); + // indexCatalog->getAndClearUnfinishedIndexes(opCtx); + // wuow.commit(); + } + }; + + return recoverStatus; +} +} diff --git a/src/mongo/db/recover_to_timestamp.h b/src/mongo/db/recover_to_timestamp.h new file mode 100644 index 0000000000..c08d030138 --- /dev/null +++ b/src/mongo/db/recover_to_timestamp.h @@ -0,0 +1,39 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/status_with.h" +#include "mongo/db/storage/snapshot_name.h" + +namespace mongo { +class OperationContext; + +StatusWith recoverToStableTimestamp(OperationContext* opCtx); +} diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp index 0d8de17e58..87897f094e 100644 --- a/src/mongo/db/repl/rollback_impl.cpp +++ b/src/mongo/db/repl/rollback_impl.cpp @@ -34,6 +34,7 @@ #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/operation_context.h" +#include "mongo/db/recover_to_timestamp.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_process.h" #include "mongo/db/repl/roll_back_local_operations.h" @@ -115,12 +116,12 @@ Status RollbackImpl::runRollback(OperationContext* opCtx) { } // Recover to the stable timestamp while holding the global exclusive lock. - auto serviceCtx = opCtx->getServiceContext(); + // auto serviceCtx = opCtx->getServiceContext(); { Lock::GlobalWrite globalWrite(opCtx); - status = _storageInterface->recoverToStableTimestamp(serviceCtx); - if (!status.isOK()) { - return status; + StatusWith swSnapshotName = recoverToStableTimestamp(opCtx); + if (!swSnapshotName.isOK()) { + return swSnapshotName.getStatus(); } } diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h index 27507e790e..499f49ea75 100644 --- a/src/mongo/db/repl/storage_interface.h +++ b/src/mongo/db/repl/storage_interface.h @@ -304,7 +304,7 @@ public: * * The 'stable' timestamp is set by calling StorageInterface::setStableTimestamp. */ - virtual Status recoverToStableTimestamp(ServiceContext* serviceCtx) = 0; + // virtual Status recoverToStableTimestamp(ServiceContext* serviceCtx) = 0; }; } // namespace repl diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index 35e67cbbe9..98ca2b4f67 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -906,10 +906,6 @@ void StorageInterfaceImpl::setInitialDataTimestamp(ServiceContext* serviceCtx, serviceCtx->getGlobalStorageEngine()->setInitialDataTimestamp(snapshotName); } -Status StorageInterfaceImpl::recoverToStableTimestamp(ServiceContext* serviceCtx) { - return serviceCtx->getGlobalStorageEngine()->recoverToStableTimestamp(); -} - Status StorageInterfaceImpl::isAdminDbValid(OperationContext* opCtx) { AutoGetDb autoDB(opCtx, "admin", MODE_X); auto adminDb = autoDB.getDb(); diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h index 572cf4eb99..1db4882430 100644 --- a/src/mongo/db/repl/storage_interface_impl.h +++ b/src/mongo/db/repl/storage_interface_impl.h @@ -138,8 +138,6 @@ public: void setInitialDataTimestamp(ServiceContext* serviceCtx, SnapshotName snapshotName) override; - Status recoverToStableTimestamp(ServiceContext* serviceCtx) override; - /** * Checks that the "admin" database contains a supported version of the auth data schema. */ diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h index 7526acfafa..8d0cb825b0 100644 --- a/src/mongo/db/repl/storage_interface_mock.h +++ b/src/mongo/db/repl/storage_interface_mock.h @@ -252,9 +252,9 @@ public: SnapshotName getInitialDataTimestamp() const; - Status recoverToStableTimestamp(ServiceContext* serviceCtx) override { - return Status{ErrorCodes::IllegalOperation, "recoverToStableTimestamp not implemented."}; - } + // Status recoverToStableTimestamp(ServiceContext* serviceCtx) override { + // return Status{ErrorCodes::IllegalOperation, "recoverToStableTimestamp not implemented."}; + // } Status isAdminDbValid(OperationContext* opCtx) override { return isAdminDbValidFn(opCtx); @@ -265,8 +265,8 @@ public: [](const NamespaceString& nss, const CollectionOptions& options, const BSONObj idIndexSpec, - const std::vector& - secondaryIndexSpecs) -> StatusWith> { + const std::vector& secondaryIndexSpecs) + -> StatusWith> { return Status{ErrorCodes::IllegalOperation, "CreateCollectionForBulkFn not implemented."}; }; InsertDocumentFn insertDocumentFn = diff --git a/src/mongo/db/storage/kv/kv_catalog.cpp b/src/mongo/db/storage/kv/kv_catalog.cpp index 5e23a27ff0..bd7e8653bb 100644 --- a/src/mongo/db/storage/kv/kv_catalog.cpp +++ b/src/mongo/db/storage/kv/kv_catalog.cpp @@ -299,6 +299,7 @@ void KVCatalog::init(OperationContext* opCtx) { auto cursor = _rs->getCursor(opCtx); while (auto record = cursor->next()) { BSONObj obj = record->data.releaseToBson(); + LOG(2) << "KVCatalog::init. Obj: " << obj; if (FeatureTracker::isFeatureDocument(obj)) { // There should be at most one version document in the catalog. @@ -316,6 +317,7 @@ void KVCatalog::init(OperationContext* opCtx) { _idents[ns] = Entry(ident, record->id); } + LOG(2) << "KVCatalog::init. _idents size: " << _idents.size(); if (!_featureTracker) { // If there wasn't a feature document, then just an initialize a feature tracker that // doesn't manage a feature document yet. diff --git a/src/mongo/db/storage/kv/kv_catalog_feature_tracker.h b/src/mongo/db/storage/kv/kv_catalog_feature_tracker.h index 549f453067..2c9dba62fa 100644 --- a/src/mongo/db/storage/kv/kv_catalog_feature_tracker.h +++ b/src/mongo/db/storage/kv/kv_catalog_feature_tracker.h @@ -168,10 +168,6 @@ public: _usedRepairableFeaturesMask = mask; } -private: - // Must go through FeatureTracker::get() or FeatureTracker::create(). - FeatureTracker(KVCatalog* catalog, RecordId rid) : _catalog(catalog), _rid(rid) {} - struct FeatureBits { NonRepairableFeatureMask nonRepairableFeatures; RepairableFeatureMask repairableFeatures; @@ -181,6 +177,10 @@ private: void putInfo(OperationContext* opCtx, const FeatureBits& versionInfo); +private: + // Must go through FeatureTracker::get() or FeatureTracker::create(). + FeatureTracker(KVCatalog* catalog, RecordId rid) : _catalog(catalog), _rid(rid) {} + KVCatalog* _catalog; RecordId _rid; diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h index 3350971bdd..28f1f8377e 100644 --- a/src/mongo/db/storage/kv/kv_engine.h +++ b/src/mongo/db/storage/kv/kv_engine.h @@ -35,6 +35,7 @@ #include #include "mongo/base/status.h" +#include "mongo/base/status_with.h" #include "mongo/base/string_data.h" #include "mongo/db/catalog/collection_options.h" #include "mongo/db/storage/kv/kv_prefix.h" @@ -263,6 +264,10 @@ public: return false; } + virtual StatusWith recoverToStableTimestamp() { + fassertFailed(40598); + } + /** * See `StorageEngine::replicationBatchIsComplete()` */ diff --git a/src/mongo/db/storage/kv/kv_storage_engine.cpp b/src/mongo/db/storage/kv/kv_storage_engine.cpp index 55d03fd0b5..d549cd45d7 100644 --- a/src/mongo/db/storage/kv/kv_storage_engine.cpp +++ b/src/mongo/db/storage/kv/kv_storage_engine.cpp @@ -33,6 +33,7 @@ #include #include "mongo/db/operation_context_noop.h" +#include "mongo/db/storage/kv/kv_catalog_feature_tracker.h" #include "mongo/db/storage/kv/kv_database_catalog_entry.h" #include "mongo/db/storage/kv/kv_engine.h" #include "mongo/util/assert_util.h" @@ -81,20 +82,25 @@ KVStorageEngine::KVStorageEngine( !(options.directoryPerDB && !engine->supportsDirectoryPerDB())); OperationContextNoop opCtx(_engine->newRecoveryUnit()); + fassertStatusOK(40615, loadCatalog(&opCtx)); +} - bool catalogExists = engine->hasIdent(&opCtx, catalogInfo); +Status KVStorageEngine::loadCatalog(OperationContext* opCtx) { + log() << "Loading catalog here."; + bool catalogExists = _engine->hasIdent(opCtx, catalogInfo); - if (options.forRepair && catalogExists) { + if (_options.forRepair && catalogExists) { log() << "Repairing catalog metadata"; // TODO should also validate all BSON in the catalog. - engine->repairIdent(&opCtx, catalogInfo).transitional_ignore(); + _engine->repairIdent(opCtx, catalogInfo).transitional_ignore(); } + // log() << "AAAA. Exists" << catalogExists; if (!catalogExists) { - WriteUnitOfWork uow(&opCtx); + WriteUnitOfWork uow(opCtx); Status status = _engine->createGroupedRecordStore( - &opCtx, catalogInfo, catalogInfo, CollectionOptions(), KVPrefix::kNotPrefixed); + opCtx, catalogInfo, catalogInfo, CollectionOptions(), KVPrefix::kNotPrefixed); // BadValue is usually caused by invalid configuration string. // We still fassert() but without a stack trace. if (status.code() == ErrorCodes::BadValue) { @@ -103,35 +109,48 @@ KVStorageEngine::KVStorageEngine( fassert(28520, status); uow.commit(); } + log() << "BBBBBB"; + + _catalog.reset(nullptr); + log() << "B11111"; + _catalogRecordStore.reset(nullptr); + log() << "B22222"; _catalogRecordStore = _engine->getGroupedRecordStore( - &opCtx, catalogInfo, catalogInfo, CollectionOptions(), KVPrefix::kNotPrefixed); + opCtx, catalogInfo, catalogInfo, CollectionOptions(), KVPrefix::kNotPrefixed); + log() << "CCCCCCCCCc"; _catalog.reset(new KVCatalog( _catalogRecordStore.get(), _options.directoryPerDB, _options.directoryForIndexes)); - _catalog->init(&opCtx); + log() << "DDDDDd"; + _catalog->init(opCtx); + // log() << "EEEEEEEEe"; std::vector collections; _catalog->getAllCollections(&collections); + LOG(3) << "Num collections: " << collections.size(); KVPrefix maxSeenPrefix = KVPrefix::kNotPrefixed; for (size_t i = 0; i < collections.size(); i++) { std::string coll = collections[i]; + LOG(4) << " Thing: " << coll; NamespaceString nss(coll); string dbName = nss.db().toString(); // No rollback since this is only for committed dbs. KVDatabaseCatalogEntryBase*& db = _dbs[dbName]; + LOG(3) << " DB? " << static_cast(db); if (!db) { db = _databaseCatalogEntryFactory(dbName, this).release(); } - db->initCollection(&opCtx, coll, options.forRepair); - auto maxPrefixForCollection = _catalog->getMetaData(&opCtx, coll).getMaxPrefix(); + // log() << "InitCollection. Ns: " << nss; + db->initCollection(opCtx, coll, _options.forRepair); + auto maxPrefixForCollection = _catalog->getMetaData(opCtx, coll).getMaxPrefix(); maxSeenPrefix = std::max(maxSeenPrefix, maxPrefixForCollection); } KVPrefix::setLargestPrefix(maxSeenPrefix); - opCtx.recoveryUnit()->abandonSnapshot(); + return Status::OK(); } /** @@ -144,8 +163,17 @@ KVStorageEngine::KVStorageEngine( * Second, a KVCatalog may have a collection ident that the KVEngine does not. This is an * illegal state and this method fasserts. * - * Third, a KVCatalog may have an index ident that the KVEngine does not. This method will - * rebuild the index. + * Third, a KVCatalog has a set of indexes it expects to exist that the KVEngine may not have + * built to completition. There are two (potentially overlapping) cases that are rolled + * together. Any index in the KVCatalog with {ready: false} or any index ident in the KVCatalog + * that the KVEngine does not have will be returned to the caller. Both varieties will be + * normalized into the following state: + * + * 1) The KVCatalog entry will have the index marked as {ready: true}. + * 2) The KVEngine will drop the underlying table. + * + * The caller is expected to rebuild or delete these returned indexes before the + * SortedDataInterfaces are requested to be constructed via `DatabaseCatalogEntry::getIndex`. */ StatusWith> KVStorageEngine::reconcileCatalogAndIdents(OperationContext* opCtx) { @@ -212,9 +240,23 @@ KVStorageEngine::reconcileCatalogAndIdents(OperationContext* opCtx) { // corresponding ident. The caller is expected to rebuild these indexes. std::vector ret; for (const auto& coll : collections) { - const BSONCollectionCatalogEntry::MetaData metaData = _catalog->getMetaData(opCtx, coll); + BSONCollectionCatalogEntry::MetaData metaData = _catalog->getMetaData(opCtx, coll); for (const auto& indexMetaData : metaData.indexes) { const std::string& indexName = indexMetaData.name(); + if (!indexMetaData.ready) { + log() << "Index is in unready state in storage recovery, rebuilding. NS: " << coll + << " Index: " << indexName; + WriteUnitOfWork wuow(opCtx); + NamespaceString nss(coll); + getDatabaseCatalogEntry(opCtx, nss.db()) + ->getCollectionCatalogEntry(nss.ns()) + ->indexBuildSuccess(opCtx, indexName); + wuow.commit(); + + ret.push_back(CollectionIndexNamePair(coll, indexName)); + continue; + } + std::string indexIdent = _catalog->getIndexIdent(opCtx, coll, indexName); if (engineIdents.find(indexIdent) != engineIdents.end()) { continue; @@ -276,7 +318,11 @@ KVDatabaseCatalogEntryBase* KVStorageEngine::getDatabaseCatalogEntry(OperationCo } Status KVStorageEngine::closeDatabase(OperationContext* opCtx, StringData db) { - // This is ok to be a no-op as there is no database layer in kv. + stdx::lock_guard lk(_dbsLock); + KVDatabaseCatalogEntryBase* entry = _dbs[db.toString()]; + delete entry; + _dbs.erase(db.toString()); + return Status::OK(); } @@ -379,4 +425,32 @@ bool KVStorageEngine::supportsRecoverToStableTimestamp() const { void KVStorageEngine::replicationBatchIsComplete() const { return _engine->replicationBatchIsComplete(); } + +StatusWith KVStorageEngine::recoverToStableTimestamp(OperationContext* opCtx) { + // When recovering to the stable timestamp, the feature tracking document will also be + // reverted. However, some tables won't be rolled back, e.g: user collections on the `local` + // database. Currently, it's not possible to know where a feature is being used and therefore + // not safe to roll back the feature tracking document. It's worth noting traditional rollback + // never updates the feature tracking document. + // + // Make a best effort to maintain the current state of the feature tracking document + // throughout the recover process. This process of fetching the data into memory and + // overwriting the recovered value is not resilient to crashes. + KVCatalog::FeatureTracker::FeatureBits featureInfo; + { + WriteUnitOfWork wuow(opCtx); + featureInfo = _catalog->getFeatureTracker()->getInfo(opCtx); + } + + StatusWith status = _engine->recoverToStableTimestamp(); + if (!status.isOK()) { + return status; + } + + WriteUnitOfWork wuow(opCtx); + _catalog->getFeatureTracker()->putInfo(opCtx, featureInfo); + wuow.commit(); + + return status; +} } // namespace mongo diff --git a/src/mongo/db/storage/kv/kv_storage_engine.h b/src/mongo/db/storage/kv/kv_storage_engine.h index 580e1c7d06..dc85e69c83 100644 --- a/src/mongo/db/storage/kv/kv_storage_engine.h +++ b/src/mongo/db/storage/kv/kv_storage_engine.h @@ -120,6 +120,8 @@ public: virtual void replicationBatchIsComplete() const override; + virtual StatusWith recoverToStableTimestamp(OperationContext* opCtx) override; + SnapshotManager* getSnapshotManager() const final; void setJournalListener(JournalListener* jl) final; @@ -146,7 +148,11 @@ public: StatusWith> reconcileCatalogAndIdents( OperationContext* opCtx) override; + Status loadCatalog(OperationContext* opCtx) override; + private: + // virtual Status loadCatalog(OperationContext* opCtx); + class RemoveDBChange; stdx::function _databaseCatalogEntryFactory; diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h index 7af4e45b94..b93f2205e3 100644 --- a/src/mongo/db/storage/storage_engine.h +++ b/src/mongo/db/storage/storage_engine.h @@ -312,7 +312,7 @@ public: * * fasserts if StorageEngine::supportsRecoverToStableTimestamp() would return false. */ - virtual Status recoverToStableTimestamp() { + virtual StatusWith recoverToStableTimestamp(OperationContext* opCtx) { fassertFailed(40547); } @@ -349,6 +349,10 @@ public: return std::vector(); }; + virtual Status loadCatalog(OperationContext* opCtx) { + fassertFailed(40599); + } + protected: /** * The destructor will never be called. See cleanShutdown instead. diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp index dcfc241e9a..cdd3ea5c21 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp @@ -57,6 +57,7 @@ #include "mongo/util/hex.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" +#include "wt_debug.h" #define TRACING_ENABLED 0 @@ -219,7 +220,7 @@ StatusWith WiredTigerIndex::generateCreateString(const std::string& << "formatVersion=" << keyStringVersion << ',' << "infoObj=" << desc.infoObj().jsonString() << "),"; - const bool keepOldLoggingSettings = true; + const bool keepOldLoggingSettings = false; if (keepOldLoggingSettings || WiredTigerUtil::useTableLogging(NamespaceString(desc.parentNS()), getGlobalReplSettings().usingReplSets())) { @@ -235,7 +236,7 @@ StatusWith WiredTigerIndex::generateCreateString(const std::string& int WiredTigerIndex::Create(OperationContext* opCtx, const std::string& uri, const std::string& config) { - // Don't use the session from the recovery unit: create should not be used in a transaction + // Use a dedicated session for create operations to avoid transaction issues. WiredTigerSession session(WiredTigerRecoveryUnit::get(opCtx)->getSessionCache()->conn()); WT_SESSION* s = session.getSession(); LOG(1) << "create uri: " << uri << " config: " << config; @@ -530,6 +531,10 @@ public: _prefix(prefix) {} ~BulkBuilder() { + WT_LOG("Closing cursor. Session: " << static_cast(_cursor->session) << " Cursor: " + << static_cast(_cursor) + << " Uri: " + << _cursor->uri); _cursor->close(_cursor); } @@ -549,13 +554,22 @@ protected: WT_SESSION* session = _session->getSession(); int err = session->open_cursor( session, idx->uri().c_str(), NULL, "bulk,checkpoint_wait=false", &cursor); - if (!err) + if (!err) { + WT_LOG("Opening bulk cursor. Session: " << static_cast(session) << " Cursor: " + << static_cast(cursor) + << " Uri: " + << idx->uri()); return cursor; + } warning() << "failed to create WiredTiger bulk cursor: " << wiredtiger_strerror(err); warning() << "falling back to non-bulk cursor for index " << idx->uri(); invariantWTOK(session->open_cursor(session, idx->uri().c_str(), NULL, NULL, &cursor)); + WT_LOG("Opening cursor. Session: " << static_cast(session) << " Cursor: " + << static_cast(cursor) + << " Uri: " + << idx->uri()); return cursor; } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index 9752fa11c0..0899e6b658 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -108,6 +108,7 @@ public: LOG(1) << "starting " << name() << " thread"; while (!_shuttingDown.load()) { + sleep(5); try { const bool forceCheckpoint = false; const bool stableCheckpoint = false; @@ -165,7 +166,7 @@ public: const SnapshotName stableTimestamp(_stableTimestamp.load()); const SnapshotName initialDataTimestamp(_initialDataTimestamp.load()); - const bool keepOldBehavior = true; + const bool keepOldBehavior = false; try { if (keepOldBehavior) { @@ -216,7 +217,7 @@ public: // `_initialDataTimestamp` in all necessary cases. This may be removed when replication // believes all sets of `_initialDataTimestamp` are correct. See SERVER-30184, // SERVER-30185, SERVER-30335. - const bool keepOldBehavior = true; + const bool keepOldBehavior = false; if (keepOldBehavior) { return false; } @@ -225,13 +226,16 @@ public: static_cast(Timestamp::kAllowUnstableCheckpointsSentinel.asULL()); const std::uint64_t initialDataTimestamp = _initialDataTimestamp.load(); // Illegal to be called when the dataset is incomplete. + log() << "supportsRecoverToStableTimestamp. InitialDataTimestamp: " + << Timestamp(initialDataTimestamp) + << " StableTimestamp: " << Timestamp(_stableTimestamp.load()); invariant(initialDataTimestamp > allowUnstableCheckpointsSentinel); // Must return false until `recoverToStableTimestamp` is implemented. See SERVER-29213. if (keepOldBehavior) { return false; } - return _stableTimestamp.load() > initialDataTimestamp; + return _stableTimestamp.load() >= initialDataTimestamp; } void setStableTimestamp(SnapshotName stableTimestamp) { @@ -242,6 +246,10 @@ public: _initialDataTimestamp.store(initialDataTimestamp.asU64()); } + SnapshotName getStableTimestamp() { + return SnapshotName(_stableTimestamp.load()); + } + void shutdown() { _shuttingDown.store(true); _condvar.notify_one(); @@ -494,7 +502,7 @@ void WiredTigerKVEngine::cleanShutdown() { closeConfig = "leak_memory=true"; } - const bool keepOldBehavior = true; + const bool keepOldBehavior = false; const bool needsDowngrade = !_readOnly && serverGlobalParams.featureCompatibility.version.load() == ServerGlobalParams::FeatureCompatibility::Version::k34; @@ -588,7 +596,7 @@ Status WiredTigerKVEngine::repairIdent(OperationContext* opCtx, StringData ident } Status WiredTigerKVEngine::_salvageIfNeeded(const char* uri) { - // Using a side session to avoid transactional issues + // Use a dedicates session for salave operations to avoid transaction issues. WiredTigerSession sessionWrapper(_conn); WT_SESSION* session = sessionWrapper.getSession(); @@ -681,6 +689,8 @@ Status WiredTigerKVEngine::createGroupedRecordStore(OperationContext* opCtx, const CollectionOptions& options, KVPrefix prefix) { _checkIdentPath(ident); + + // Use a dedicated session for create operations to avoid transaction issues. WiredTigerSession session(_conn); const bool prefixed = prefix.isPrefixed(); @@ -867,6 +877,7 @@ bool WiredTigerKVEngine::haveDropsQueued() const { void WiredTigerKVEngine::dropSomeQueuedIdents() { int numInQueue; + // Use a dedicated session for create operations to avoid transaction issues. WiredTigerSession session(_conn); { @@ -996,7 +1007,7 @@ bool WiredTigerKVEngine::initRsOplogBackgroundThread(StringData ns) { } void WiredTigerKVEngine::setStableTimestamp(SnapshotName stableTimestamp) { - const bool keepOldBehavior = true; + const bool keepOldBehavior = false; // Communicate to WiredTiger what the "stable timestamp" is. Timestamp-aware checkpoints will // only persist to disk transactions committed with a timestamp earlier than the "stable // timestamp". @@ -1012,7 +1023,20 @@ void WiredTigerKVEngine::setStableTimestamp(SnapshotName stableTimestamp) { // taking "stable checkpoints". In the transitioning case, it's imperative for the "stable // timestamp" to have first been communicated to WiredTiger. if (!keepOldBehavior) { - std::string conf = str::stream() << "stable_timestamp=" << stableTimestamp.toString(); + static int counter = 0; + static SnapshotName oneHundred; + if (counter == 100) { + oneHundred = stableTimestamp; + log() << "One hundredth write! Timestamp: " << oneHundred.asU64(); + } + if (counter > 100) { + stableTimestamp = oneHundred; + } + ++counter; + + std::string conf = str::stream() << "stable_timestamp=" << stableTimestamp.toString() + << ",oldest_timestamp=" << stableTimestamp.toString(); + log() << "Setting stable timestamp. Value: " << stableTimestamp.asU64(); _conn->set_timestamp(_conn, conf.c_str()); } if (_checkpointThread) { @@ -1022,6 +1046,7 @@ void WiredTigerKVEngine::setStableTimestamp(SnapshotName stableTimestamp) { void WiredTigerKVEngine::setInitialDataTimestamp(SnapshotName initialDataTimestamp) { if (_checkpointThread) { + log() << "Setting initial data timestamp. Value: " << initialDataTimestamp.asU64(); _checkpointThread->setInitialDataTimestamp(initialDataTimestamp); } } @@ -1058,4 +1083,56 @@ void WiredTigerKVEngine::replicationBatchIsComplete() const { } } +StatusWith WiredTigerKVEngine::recoverToStableTimestamp() { + // If `supportsRecoverToStableTimestamp` returns `true`, `_checkpointThread` must exist. + invariant(_checkpointThread); + invariant(supportsRecoverToStableTimestamp()); + + log() << "WiredTiger::RecoverToStableTimestamp shutting down journal and checkpoint threads"; + // Shutdown WiredTigerKVEngine owned accesses into the storage engine. + _journalFlusher->shutdown(); + _checkpointThread->shutdown(); + + auto journaledUriCountDataSizePair = + _sizeStorer->getNamespaceToCountMap([](const std::string ns) { + const bool usingReplSets = true; + return WiredTigerUtil::useTableLogging(NamespaceString(ns), usingReplSets); + }); + + log() << "WiredTiger::RecoverToStableTimestamp Saving journaled table sizes."; + for (auto it = journaledUriCountDataSizePair.begin(); it != journaledUriCountDataSizePair.end(); + it++) { + log() << "\tUri: " << it->first << " Count: " << it->second.first + << " DataSize: " << it->second.second; + } + + // Even if the stable timestamp has passed the initial data timestamp, a checkpoint may not + // yet have been taken. Doing a checkpoint here also minimizes the amount of oplog replication + // must replay. + const bool forceCheckpoint = true; + const bool stableCheckpoint = true; + _sessionCache->waitUntilDurable(forceCheckpoint, stableCheckpoint); + auto stableTimestamp = _checkpointThread->getStableTimestamp(); + + int ret = _conn->rollback_to_stable(_conn, nullptr); + if (ret) { + return {ErrorCodes::UnrecoverableRollbackError, + str::stream() << "Error rolling back to stable. Err: " << wiredtiger_strerror(ret)}; + } + + _journalFlusher = stdx::make_unique(_sessionCache.get()); + _journalFlusher->go(); + _checkpointThread = stdx::make_unique(_sessionCache.get()); + _checkpointThread->setInitialDataTimestamp(stableTimestamp); + _checkpointThread->setStableTimestamp(stableTimestamp); + _checkpointThread->go(); + + _sizeStorer->fillCache(); + for (auto it = journaledUriCountDataSizePair.begin(); it != journaledUriCountDataSizePair.end(); + it++) { + _sizeStorer->storeToCache(it->first, it->second.first, it->second.second); + } + + return stableTimestamp; +} } // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h index b9758b814e..b5ea29224a 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h @@ -168,6 +168,8 @@ public: virtual bool supportsRecoverToStableTimestamp() const override; + virtual StatusWith recoverToStableTimestamp() override; + // wiredtiger specific // Calls WT_CONNECTION::reconfigure on the underlying WT_CONNECTION // held by this class diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index d0d19578af..133d482f47 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -62,6 +62,7 @@ #include "mongo/util/mongoutils/str.h" #include "mongo/util/scopeguard.h" #include "mongo/util/time_support.h" +#include "wt_debug.h" namespace mongo { @@ -505,6 +506,11 @@ public: void save() final { if (_cursor && !wt_keeptxnopen()) { try { + WT_LOG("Resetting cursor. Session: " << static_cast(_cursor->session) + << " Cursor: " + << static_cast(_cursor) + << " Uri: " + << _cursor->uri); _cursor->reset(_cursor); } catch (const WriteConflictException& wce) { // Ignore since this is only called when we are about to kill our transaction @@ -521,6 +527,10 @@ public: invariantWTOK(session->open_cursor( session, _rs->_uri.c_str(), nullptr, _config.c_str(), &_cursor)); invariant(_cursor); + WT_LOG("Opening cursor. Session: " << static_cast(session) << " Cursor: " + << static_cast(_cursor) + << " Uri: " + << _cursor->uri); } return true; } @@ -529,6 +539,11 @@ public: invariant(_opCtx); _opCtx = nullptr; if (_cursor) { + WT_LOG("Closing cursor. Session: " << static_cast(_cursor->session) + << " Cursor: " + << static_cast(_cursor) + << " Uri: " + << _cursor->uri); invariantWTOK(_cursor->close(_cursor)); } _cursor = nullptr; @@ -606,10 +621,9 @@ StatusWith WiredTigerRecordStore::generateCreateString( } ss << ")"; - const bool keepOldLoggingSettings = true; - if (keepOldLoggingSettings || - WiredTigerUtil::useTableLogging(NamespaceString(ns), - getGlobalReplSettings().usingReplSets())) { + const bool keepOldBehavior = false; + if (keepOldBehavior || WiredTigerUtil::useTableLogging( + NamespaceString(ns), getGlobalReplSettings().usingReplSets())) { ss << ",log=(enabled=true)"; } else { ss << ",log=(enabled=false)"; @@ -642,8 +656,8 @@ WiredTigerRecordStore::WiredTigerRecordStore(WiredTigerKVEngine* kvEngine, Status versionStatus = WiredTigerUtil::checkApplicationMetadataFormatVersion( ctx, _uri, kMinimumRecordStoreVersion, kMaximumRecordStoreVersion) .getStatus(); + LOG(1) << "WiredTigerRecordStore::WiredTigerRecordStore. NS: " << ns(); if (!versionStatus.isOK()) { - std::cout << " Version: " << versionStatus.reason() << std::endl; if (versionStatus.code() == ErrorCodes::FailedToParse) { uasserted(28548, versionStatus.reason()); } else { diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp index f3a66ada68..8931b82399 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp @@ -40,6 +40,9 @@ #include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h" #include "mongo/db/storage/wiredtiger/wiredtiger_util.h" #include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" +#include "mongo/util/stacktrace.h" +#include "wt_debug.h" namespace mongo { namespace { @@ -190,9 +193,11 @@ void WiredTigerRecoveryUnit::_txnClose(bool commit) { } if (commit) { + WT_LOG("Commit transaction. Session: " << static_cast(s)); invariantWTOK(s->commit_transaction(s, NULL)); LOG(3) << "WT commit_transaction for snapshot id " << _mySnapshotId; } else { + WT_LOG("Rollback transaction. Session: " << static_cast(s)); invariantWTOK(s->rollback_transaction(s, NULL)); LOG(3) << "WT rollback_transaction for snapshot id " << _mySnapshotId; } @@ -241,6 +246,7 @@ void WiredTigerRecoveryUnit::_txnOpen() { _sessionCache->snapshotManager().beginTransactionOnOplog( _sessionCache->getKVEngine()->getOplogManager(), session); } else { + WT_LOG("Begin transaction. Session: " << static_cast(session)); invariantWTOK(session->begin_transaction(session, NULL)); } @@ -290,6 +296,10 @@ WiredTigerCursor::~WiredTigerCursor() { } void WiredTigerCursor::reset() { + WT_LOG("Resetting cursor. Session: " << static_cast(_cursor->session) << " Cursor: " + << static_cast(_cursor) + << " Uri: " + << _cursor->uri); invariantWTOK(_cursor->reset(_cursor)); } } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp index fdffe30c5e..6b45b879ca 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp @@ -42,9 +42,12 @@ #include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h" #include "mongo/db/storage/wiredtiger/wiredtiger_util.h" #include "mongo/stdx/memory.h" +#include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" +#include "mongo/util/stacktrace.h" +#include "wt_debug.h" namespace mongo { @@ -56,6 +59,8 @@ WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn, uint64_t epoch, uint64 _cursorsCached(0), _cursorsOut(0) { invariantWTOK(conn->open_session(conn, NULL, "isolation=snapshot", &_session)); + WT_LOG("Session opened. Session: " << static_cast(_session)); + // printStackTrace(); } WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn, @@ -70,10 +75,13 @@ WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn, _cursorsCached(0), _cursorsOut(0) { invariantWTOK(conn->open_session(conn, NULL, "isolation=snapshot", &_session)); + WT_LOG("Session opened. Session: " << static_cast(_session)); + // printStackTrace(); } WiredTigerSession::~WiredTigerSession() { if (_session) { + WT_LOG("Session closed. Session: " << static_cast(_session)); invariantWTOK(_session->close(_session, NULL)); } } @@ -86,6 +94,10 @@ WT_CURSOR* WiredTigerSession::getCursor(const std::string& uri, uint64_t id, boo _cursors.erase(i); _cursorsOut++; _cursorsCached--; + WT_LOG("Re-using cursor. Session: " << static_cast(_session) << " Cursor: " + << static_cast(c) + << " Uri: " + << c->uri); return c; } } @@ -93,6 +105,10 @@ WT_CURSOR* WiredTigerSession::getCursor(const std::string& uri, uint64_t id, boo WT_CURSOR* c = NULL; int ret = _session->open_cursor( _session, uri.c_str(), NULL, forRecordStore ? "" : "overwrite=false", &c); + WT_LOG("Opening cursor. Session: " << static_cast(_session) << " Cursor: " + << static_cast(c) + << " Uri: " + << c->uri); if (ret != ENOENT) invariantWTOK(ret); if (c) @@ -105,6 +121,10 @@ void WiredTigerSession::releaseCursor(uint64_t id, WT_CURSOR* cursor) { invariant(cursor); _cursorsOut--; + WT_LOG("Resetting cursor. Session: " << static_cast(_session) << " Cursor: " + << static_cast(cursor) + << " Uri: " + << cursor->uri); invariantWTOK(cursor->reset(cursor)); // Cursors are pushed to the front of the list and removed from the back @@ -120,6 +140,10 @@ void WiredTigerSession::releaseCursor(uint64_t id, WT_CURSOR* cursor) { cursor = _cursors.back()._cursor; _cursors.pop_back(); _cursorsCached--; + WT_LOG("Closing cursor. Session: " << static_cast(_session) << " Cursor: " + << static_cast(cursor) + << " Uri: " + << cursor->uri); invariantWTOK(cursor->close(cursor)); } } @@ -130,6 +154,10 @@ void WiredTigerSession::closeAllCursors(const std::string& uri) { for (auto i = _cursors.begin(); i != _cursors.end();) { WT_CURSOR* cursor = i->_cursor; if (cursor && uri == cursor->uri) { + WT_LOG("Closing cursor. Session: " << static_cast(_session) << " Cursor: " + << static_cast(cursor) + << " Uri: " + << cursor->uri); invariantWTOK(cursor->close(cursor)); i = _cursors.erase(i); } else @@ -146,6 +174,10 @@ void WiredTigerSession::closeCursorsForQueuedDrops(WiredTigerKVEngine* engine) { for (auto i = toDrop.begin(); i != toDrop.end(); i++) { WT_CURSOR* cursor = i->_cursor; if (cursor) { + WT_LOG("Closing cursor. Session: " << static_cast(_session) << " Cursor: " + << static_cast(cursor) + << " Uri: " + << cursor->uri); invariantWTOK(cursor->close(cursor)); } } @@ -223,7 +255,7 @@ void WiredTigerSessionCache::waitUntilDurable(bool forceCheckpoint, bool stableC { stdx::unique_lock lk(_journalListenerMutex); JournalListener::Token token = _journalListener->getToken(); - const bool keepOldBehavior = true; + const bool keepOldBehavior = false; if (keepOldBehavior) { invariantWTOK(s->checkpoint(s, nullptr)); } else { @@ -316,6 +348,8 @@ UniqueWiredTigerSession WiredTigerSessionCache::getSession() { // discarding older ones WiredTigerSession* cachedSession = _sessions.back(); _sessions.pop_back(); + WT_LOG( + "Re-using session. Session: " << static_cast(cachedSession->getSession())); return UniqueWiredTigerSession(cachedSession); } } @@ -352,6 +386,7 @@ void WiredTigerSessionCache::releaseSession(WiredTigerSession* session) { invariant(range == 0); // Release resources in the session we're about to cache. + WT_LOG("Session reset. Session: " << static_cast(ss)); invariantWTOK(ss->reset(ss)); } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp index 23a8c8f90f..78a038224f 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.cpp @@ -46,6 +46,7 @@ #include "mongo/stdx/thread.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" +#include "wt_debug.h" namespace mongo { @@ -63,10 +64,13 @@ WiredTigerSizeStorer::WiredTigerSizeStorer(WT_CONNECTION* conn, WT_SESSION* session = _session.getSession(); std::string config = WiredTigerCustomizationHooks::get(getGlobalServiceContext()) - ->getTableCreateConfig(storageUri); + ->getTableCreateConfig(storageUri) + + ",log=(enabled=false),"; if (!readOnly) { + log() << "Creating sizeStorer. Config: " << config; invariantWTOK(session->create(session, storageUri.c_str(), config.c_str())); - const bool keepOldLoggingSettings = true; + + const bool keepOldLoggingSettings = false; if (keepOldLoggingSettings) { logSizeStorerTable = true; } @@ -76,6 +80,10 @@ WiredTigerSizeStorer::WiredTigerSizeStorer(WT_CONNECTION* conn, invariantWTOK( session->open_cursor(session, storageUri.c_str(), NULL, "overwrite=true", &_cursor)); + WT_LOG("Opening cursor. Session: " << static_cast(session) << " Cursor: " + << static_cast(_cursor) + << " Uri: " + << storageUri); _magic = MAGIC; } @@ -85,6 +93,10 @@ WiredTigerSizeStorer::~WiredTigerSizeStorer() { stdx::lock_guard cursorLock(_cursorMutex); _magic = 11111; + WT_LOG("Closing cursor. Session: " << static_cast(_cursor->session) << " Cursor: " + << static_cast(_cursor) + << " Uri: " + << _cursor->uri); _cursor->close(_cursor); } @@ -110,6 +122,7 @@ void WiredTigerSizeStorer::onCreate(WiredTigerRecordStore* rs, void WiredTigerSizeStorer::onDestroy(WiredTigerRecordStore* rs) { _checkMagic(); stdx::lock_guard lk(_entriesMutex); + LOG(3) << "WiredTigerSizeStorer::onDestroy. NS: " << rs->ns() << " URI: " << rs->getURI(); Entry& entry = _entries[rs->getURI()]; entry.numRecords = rs->numRecords(NULL); entry.dataSize = rs->dataSize(NULL); @@ -149,11 +162,26 @@ void WiredTigerSizeStorer::fillCache() { Map m; { // Seek to beginning if needed. + WT_LOG("Resetting cursor. Session: " << static_cast(_cursor->session) << " Cursor: " + << static_cast(_cursor) + << " Uri: " + << _cursor->uri); invariantWTOK(_cursor->reset(_cursor)); // Intentionally ignoring return value. - ON_BLOCK_EXIT(_cursor->reset, _cursor); - + ON_BLOCK_EXIT([this] { + WT_LOG("Resetting cursor. Session: " << static_cast(_cursor->session) + << " Cursor: " + << static_cast(_cursor) + << " Uri: " + << _cursor->uri; + _cursor->reset(_cursor)); + }); + + WT_LOG("Re-using cursor. Session: " << static_cast(_cursor->session) << " Cursor: " + << static_cast(_cursor) + << " Uri: " + << _cursor->uri); int cursorNextRet; while ((cursorNextRet = _cursor->next(_cursor)) != WT_NOTFOUND) { invariantWTOK(cursorNextRet); @@ -210,8 +238,16 @@ void WiredTigerSizeStorer::syncCache(bool syncToDisk) { return; // Nothing to do. WT_SESSION* session = _session.getSession(); + ON_BLOCK_EXIT([session] { + WT_LOG("Session reset. Session: " << static_cast(session)); + session->reset(session); + }); + WT_LOG("Begin transaction. Session: " << static_cast(session)); invariantWTOK(session->begin_transaction(session, syncToDisk ? "sync=true" : "")); - ScopeGuard rollbacker = MakeGuard(session->rollback_transaction, session, ""); + ScopeGuard rollbacker = MakeGuard([session] { + WT_LOG("Rollback transaction. Session: " << static_cast(session)); + session->rollback_transaction(session, ""); + }); for (Map::iterator it = myMap.begin(); it != myMap.end(); ++it) { string uriKey = it->first; @@ -229,14 +265,23 @@ void WiredTigerSizeStorer::syncCache(bool syncToDisk) { WiredTigerItem key(uriKey.c_str(), uriKey.size()); WiredTigerItem value(data.objdata(), data.objsize()); + WT_LOG("Re-using cursor. Session: " << static_cast(_cursor->session) << " Cursor: " + << static_cast(_cursor) + << " Uri: " + << _cursor->uri); _cursor->set_key(_cursor, key.Get()); _cursor->set_value(_cursor, value.Get()); invariantWTOK(_cursor->insert(_cursor)); } + WT_LOG("Resetting cursor. Session: " << static_cast(_cursor->session) << " Cursor: " + << static_cast(_cursor) + << " Uri: " + << _cursor->uri); invariantWTOK(_cursor->reset(_cursor)); rollbacker.Dismiss(); + WT_LOG("Commit transaction. Session: " << static_cast(session)); invariantWTOK(session->commit_transaction(session, NULL)); { @@ -246,4 +291,22 @@ void WiredTigerSizeStorer::syncCache(bool syncToDisk) { } } } + +std::map +WiredTigerSizeStorer::getNamespaceToCountMap(stdx::function filter) { + std::map ret; + stdx::lock_guard lk(_entriesMutex); + for (auto it = _entries.begin(); it != _entries.end(); ++it) { + if (!it->second.rs) { + continue; + } + + const auto& collName = it->second.rs->ns(); + if (filter(collName)) { + ret[it->first] = CountDataSizePair(it->second.numRecords, it->second.dataSize); + } + } + + return ret; } +} // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h index 67ec4d25c6..e0a9b7eb68 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_size_storer.h @@ -33,6 +33,7 @@ #include #include +#include #include #include "mongo/base/string_data.h" @@ -69,6 +70,15 @@ public: */ void syncCache(bool syncToDisk); + /** + * Return the current count and data size for all "db.coll" namespaces that successfully match + * the filter. The key in the returned map is the `uri` that can be passed into `storeToCache` + * or `loadFromCache`. + */ + typedef std::pair CountDataSizePair; + std::map getNamespaceToCountMap( + stdx::function filter); + private: void _checkMagic() const; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.cpp index cd58ea18a1..c5a7495234 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.cpp @@ -41,6 +41,7 @@ #include "mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" +#include "wt_debug.h" namespace mongo { @@ -114,6 +115,7 @@ SnapshotName WiredTigerSnapshotManager::beginTransactionOnCommittedSnapshot( StringBuilder config; config << "snapshot=" << _committedSnapshot->asU64(); + WT_LOG("Create snapshot. Session: " << static_cast(session)); invariantWTOK(session->begin_transaction(session, config.str().c_str())); return *_committedSnapshot; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp index 862dd35966..b129e3e041 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp @@ -53,6 +53,31 @@ namespace mongo { using std::string; +StatusWith getMetadataRaw(WT_SESSION* session, StringData uri) { + WT_CURSOR* cursor; + invariantWTOK(session->open_cursor(session, "metadata:create", nullptr, "", &cursor)); + invariant(cursor); + ON_BLOCK_EXIT([cursor] { invariantWTOK(cursor->close(cursor)); }); + + std::string strUri = uri.toString(); + cursor->set_key(cursor, strUri.c_str()); + int ret = cursor->search(cursor); + if (ret == WT_NOTFOUND) { + return StatusWith(ErrorCodes::NoSuchKey, + str::stream() << "Unable to find metadata for " << uri); + } else if (ret != 0) { + return StatusWith(wtRCToStatus(ret)); + } + const char* metadata = NULL; + ret = cursor->get_value(cursor, &metadata); + if (ret != 0) { + return StatusWith(wtRCToStatus(ret)); + } + invariant(metadata); + return StatusWith(metadata); +} + + Status wtRCToStatus_slow(int retCode, const char* prefix) { if (retCode == 0) return Status::OK(); @@ -462,17 +487,42 @@ bool WiredTigerUtil::useTableLogging(NamespaceString ns, bool replEnabled) { } Status WiredTigerUtil::setTableLogging(OperationContext* opCtx, const std::string& uri, bool on) { - WiredTigerRecoveryUnit* recoveryUnit = WiredTigerRecoveryUnit::get(opCtx); - return setTableLogging(recoveryUnit->getSession(opCtx)->getSession(), uri, on); + // Try to close as much as possible to avoid EBUSY errors. + WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx)->closeAllCursors(uri); + WiredTigerSessionCache* sessionCache = WiredTigerRecoveryUnit::get(opCtx)->getSessionCache(); + sessionCache->closeAllCursors(uri); + + // Use a dedicated session for alter operations to avoid transaction issues. + WiredTigerSession session(sessionCache->conn()); + return setTableLogging(session.getSession(), uri, on); } Status WiredTigerUtil::setTableLogging(WT_SESSION* session, const std::string& uri, bool on) { - const bool keepOldBehavior = true; + const bool ensureRightLogging = true; + if (ensureRightLogging) { + auto swMetadata = getMetadataRaw(session, uri); + uassertStatusOK(swMetadata); + + if (on) { + if (swMetadata.getValue().find("log=(enabled=true)") != std::string::npos) { + return Status::OK(); + } + } else { + if (swMetadata.getValue().find("log=(enabled=false)") != std::string::npos) { + return Status::OK(); + } + } + return {ErrorCodes::InternalError, + str::stream() << "Wrong table logging settings. Expected on? " << on}; + } + + + const bool keepOldBehavior = false; if (keepOldBehavior) { return Status::OK(); } - LOG(3) << "Changing logging values. Uri: " << uri << " Enabled? " << on; + log() << "Changing logging values. Uri: " << uri << " Enabled? " << on; int ret; if (on) { ret = session->alter(session, uri.c_str(), "log=(enabled=true)"); @@ -481,6 +531,10 @@ Status WiredTigerUtil::setTableLogging(WT_SESSION* session, const std::string& u } if (ret) { + auto swMetadata = getMetadataRaw(session, uri); + uassertStatusOK(swMetadata); + log() << "Existing metadata: " << swMetadata.getValue(); + invariantWTOK(ret); return Status(ErrorCodes::WriteConflict, str::stream() << "Failed to update log setting. Uri: " << uri << " Enable? " << on diff --git a/src/mongo/db/storage/wiredtiger/wt_debug.h b/src/mongo/db/storage/wiredtiger/wt_debug.h new file mode 100644 index 0000000000..94422ae0cd --- /dev/null +++ b/src/mongo/db/storage/wiredtiger/wt_debug.h @@ -0,0 +1,3 @@ + +#define WT_LOG(x) +// #define WT_LOG(x) log() << "WT_DEBUG. " << x diff --git a/src/third_party/wiredtiger/meta.diff b/src/third_party/wiredtiger/meta.diff new file mode 100644 index 0000000000..579f843b37 --- /dev/null +++ b/src/third_party/wiredtiger/meta.diff @@ -0,0 +1,62 @@ +diff --git a/src/btree/bt_cursor.c b/src/btree/bt_cursor.c +index 6e1ab526e..4e01e380a 100644 +--- a/src/btree/bt_cursor.c ++++ b/src/btree/bt_cursor.c +@@ -729,6 +729,8 @@ __wt_btcur_insert(WT_CURSOR_BTREE *cbt) + retry: WT_ERR(__cursor_func_init(cbt, true)); + + if (btree->type == BTREE_ROW) { ++ __wt_verbose(session, WT_VERB_METADATA, "%s", ++ "BTCUR_INSERT: search first"); + WT_ERR(__cursor_row_search(session, cbt, NULL, true)); + /* + * If not overwriting, fail if the key exists, else insert the +@@ -739,6 +741,9 @@ retry: WT_ERR(__cursor_func_init(cbt, true)); + WT_ERR(WT_DUPLICATE_KEY); + + ret = __cursor_row_modify(session, cbt, WT_UPDATE_STANDARD); ++ if (ret != 0) ++ __wt_verbose(session, WT_VERB_METADATA, ++ "BTCUR_INSERT: ERR %d", ret); + } else { + /* + * Optionally insert a new record (ignoring the application's +diff --git a/src/cursor/cur_file.c b/src/cursor/cur_file.c +index b5a8e1353..b37b5c51f 100644 +--- a/src/cursor/cur_file.c ++++ b/src/cursor/cur_file.c +@@ -236,6 +236,8 @@ __curfile_insert(WT_CURSOR *cursor) + if (!F_ISSET(cursor, WT_CURSTD_APPEND)) + WT_ERR(__cursor_checkkey(cursor)); + WT_ERR(__cursor_checkvalue(cursor)); ++ __wt_verbose(session, WT_VERB_METADATA, "%s", ++ "CURFILE_INSERT: Call wt_btcur_insert"); + + WT_ERR(__wt_btcur_insert(cbt)); + +@@ -250,6 +252,9 @@ __curfile_insert(WT_CURSOR *cursor) + F_MASK(cursor, WT_CURSTD_KEY_SET) == 0)); + + err: CURSOR_UPDATE_API_END(session, ret); ++ if (ret != 0) ++ __wt_verbose(session, WT_VERB_METADATA, ++ "CURFILE_INSERT: ERR %d wt_btcur_insert", ret); + return (ret); + } + +diff --git a/src/meta/meta_table.c b/src/meta/meta_table.c +index 83688bbd5..e7a89ee3f 100644 +--- a/src/meta/meta_table.c ++++ b/src/meta/meta_table.c +@@ -209,6 +209,11 @@ __wt_metadata_update( + cursor->set_value(cursor, value); + WT_ERR(cursor->insert(cursor)); + err: WT_TRET(__wt_metadata_cursor_release(session, &cursor)); ++ if (ret != 0) ++ __wt_verbose(session, WT_VERB_METADATA, ++ "Update: INSERT ERR %d key: %s, value: %s, tracking: %s, %s" "turtle", ++ ret, key, value, WT_META_TRACKING(session) ? "true" : "false", ++ __metadata_turtle(key) ? "" : "not "); + return (ret); + } + diff --git a/src/third_party/wiredtiger/src/conn/conn_api.c b/src/third_party/wiredtiger/src/conn/conn_api.c index b29b6184ce..3372a338f6 100644 --- a/src/third_party/wiredtiger/src/conn/conn_api.c +++ b/src/third_party/wiredtiger/src/conn/conn_api.c @@ -2082,6 +2082,7 @@ wiredtiger_open(const char *home, WT_EVENT_HANDLER *event_handler, const char *enc_cfg[] = { NULL, NULL }, *merge_cfg; char version[64]; + printf("src/third_party/WiredTiger engaged\n"); /* Leave lots of space for optional additional configuration. */ const char *cfg[] = { NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL }; diff --git a/track_cursors.py b/track_cursors.py new file mode 100644 index 0000000000..bf6d959cc8 --- /dev/null +++ b/track_cursors.py @@ -0,0 +1,127 @@ +from pprint import pprint +import re +import os + +open_cursor = re.compile('Opening cursor. Session: 0x(.*?) Cursor: 0x(.*?) Uri: (.*)') +reset_cursor = re.compile('Resetting cursor. Session: 0x(.*?) Cursor: 0x(.*?) Uri: (.*)') +reuse_cursor = re.compile('Re-using cursor. Session: 0x(.*?) Cursor: 0x(.*?) Uri: (.*)') +close_cursor = re.compile('Closing cursor. Session: 0x(.*?) Cursor: 0x(.*?) Uri: (.*)') + +close_session = re.compile('Session closed. Session: 0x(.*)') +reset_session = re.compile('Session reset. Session: 0x(.*)') +open_session = re.compile('Session opened. Session: 0x(.*)') +reuse_session = re.compile('Re-using session. Session: 0x(.*)') + +begin_transaction = re.compile('Begin transaction. Session: 0x(.*)') +commit_transaction = re.compile('Commit transaction. Session: 0x(.*)') +rollback_transaction = re.compile('Rollback transaction. Session: 0x(.*)') + +coll_cursor_map = {} +session_cursor_map = {} + +cursors_map = {} +sessions_map = {} + +for line in os.sys.stdin: + line = line.strip() + + if 'WT_DEBUG. Opening cursor' in line: + items = open_cursor.findall(line)[0] + cursors = coll_cursor_map.get(items[-1], {}) + coll_cursor_map[items[-1]] = cursors + cursors[items[1]] = "Open" + cursors_map[items[1]] = ("Open", items[0], items[-1]) + + cursors = session_cursor_map.get(items[0], set([])) + session_cursor_map[items[0]] = cursors + cursors.add(items[1]) + continue + + if 'WT_DEBUG. Resetting cursor' in line: + items = reset_cursor.findall(line)[0] + cursors = coll_cursor_map.get(items[-1], {}) + coll_cursor_map[items[-1]] = cursors + cursors[items[1]] = "Reset" + cursors_map[items[1]] = ("Reset", items[0], items[-1]) + + continue + + if 'WT_DEBUG. Re-using cursor' in line: + items = reuse_cursor.findall(line)[0] + cursors = coll_cursor_map.get(items[-1], {}) + coll_cursor_map[items[-1]] = cursors + cursors[items[1]] = "Open" + cursors_map[items[1]] = ("Open", items[0], items[-1]) + + continue + + if 'WT_DEBUG. Session closed' in line: + items = close_session.findall(line) + for cursor in session_cursor_map.get(items[0], []): + existing = cursors_map[cursor] + cursors_map[cursor] = ("Closed via session close.", items[0], existing[-1]) + session_cursor_map[items[0]] = set([]) + sessions_map[items[0]] = 'Closed' + continue + + if 'WT_DEBUG. Session reset' in line: + items = reset_session.findall(line) + for cursor in session_cursor_map.get(items[0], []): + existing = cursors_map[cursor] + cursors_map[cursor] = ("Closed via session reset.", items[0], existing[-1]) + session_cursor_map[items[0]] = set([]) + sessions_map[items[0]] = 'Reset' + continue + + if 'WT_DEBUG. Closing cursor' in line: + items = close_cursor.findall(line)[0] + cursors = coll_cursor_map.get(items[-1], {}) + coll_cursor_map[items[-1]] = cursors + cursors[items[1]] = "Closed" + cursors_map[items[1]] = ("Closed", items[0], items[-1]) + continue + + if 'WT_DEBUG. Session opened' in line: + items = open_session.findall(line) + session = items[0] + sessions_map[session] = 'Opened' + continue + + if 'WT_DEBUG. Begin transaction' in line: + items = begin_transaction.findall(line) + session = items[0] + sessions_map[session] = 'In Transaction' + continue + + if 'WT_DEBUG. Commit transaction' in line: + items = commit_transaction.findall(line) + session = items[0] + sessions_map[session] = 'Committed Transaction' + continue + + if 'WT_DEBUG. Rollback transaction' in line: + items = rollback_transaction.findall(line) + session = items[0] + sessions_map[session] = 'Rolledback Transaction' + continue + + if 'WT_DEBUG. Re-using session' in line: + items = reuse_session.findall(line) + session = items[0] + sessions_map[session] = 'Re-using' + continue + + if 'WT_DEBUG.' in line: + print('Unknown WT_DEBUG.\n\t' + line) + os.sys.exit(1) + + if line == 'STOP HERE': + print('Early stop.') + break + +# print coll_cursor_map['table:_mdb_catalog'] +print("Cursor Map\n==========") +pprint(cursors_map) + +print("Session Map\n==========") +pprint(sessions_map)