From fc2976376ebd845c3d5f778ec011221120ca7d6a Mon Sep 17 00:00:00 2001 From: Jordi Serra Torrens Date: Tue, 3 Aug 2021 08:54:35 +0000 Subject: [PATCH] SERVER-58416 POC: Implementing a 2-phase commit protocol on the moveChunk operation --- .../db/s/migration_destination_manager.cpp | 18 ++++++- .../db/s/migration_destination_manager.h | 4 +- ...on_destination_manager_legacy_commands.cpp | 49 ++++++++++++++++++- src/mongo/db/s/migration_source_manager.cpp | 14 ++++-- 4 files changed, 78 insertions(+), 7 deletions(-) diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 1451fd2ece..a58fb7acd1 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -60,6 +60,8 @@ #include "mongo/db/s/move_timing_helper.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/range_deletion_task_gen.h" +#include "mongo/db/s/recoverable_critical_section_service.h" +#include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/db/s/sharding_runtime_d_params_gen.h" #include "mongo/db/s/sharding_statistics.h" #include "mongo/db/s/start_chunk_clone_request.h" @@ -548,7 +550,8 @@ void MigrationDestinationManager::abortWithoutSessionIdCheck() { _errmsg = "aborted without session id check"; } -Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessionId) { +Status MigrationDestinationManager::startCommit(OperationContext* opCtx, + const MigrationSessionId& sessionId) { stdx::unique_lock lock(_mutex); @@ -611,6 +614,19 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio return {ErrorCodes::CommandFailed, "startCommit failed, final data failed to transfer"}; } + // For simplicity of this POC, using the recoverable critical section instead of the regular one + // because the regular one requires the acquiring opCtx to stay alive until it is released. This + // didn't fit the current implementation of the recvChunk commands. + RecoverableCriticalSectionService::get(opCtx)->acquireRecoverableCriticalSectionBlockWrites( + opCtx, _nss, BSON("recvChunk" << 1), ShardingCatalogClient::kMajorityWriteConcern); + + return Status::OK(); +} + +Status MigrationDestinationManager::commitPhase2(OperationContext* opCtx) { + forceShardFilteringMetadataRefresh(opCtx, _nss); + RecoverableCriticalSectionService::get(opCtx)->releaseRecoverableCriticalSection( + opCtx, _nss, BSON("recvChunk" << 1), ShardingCatalogClient::kMajorityWriteConcern); return Status::OK(); } diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index a76e4468f7..3d86833fb5 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -132,7 +132,9 @@ public: */ void abortWithoutSessionIdCheck(); - Status startCommit(const MigrationSessionId& sessionId); + Status startCommit(OperationContext* opCtx, const MigrationSessionId& sessionId); + + Status commitPhase2(OperationContext* opCtx); /** * Gets the collection indexes from fromShardId. If given a chunk manager, will fetch the diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp index 1f6ac6ad58..a3e2446bbc 100644 --- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp +++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp @@ -211,7 +211,7 @@ public: BSONObjBuilder& result) override { auto const sessionId = uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)); auto const mdm = MigrationDestinationManager::get(opCtx); - Status const status = mdm->startCommit(sessionId); + Status const status = mdm->startCommit(opCtx, sessionId); mdm->report(result, opCtx, false); if (!status.isOK()) { LOGV2(22014, @@ -282,5 +282,52 @@ public: } recvChunkAbortCommand; +class RecvChunkCommitPhase2Command : public BasicCommand { +public: + RecvChunkCommitPhase2Command() : BasicCommand("_recvChunkCommitPhase2") {} + + std::string help() const override { + return "internal"; + } + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kNever; + } + + bool adminOnly() const override { + return true; + } + + + bool supportsWriteConcern(const BSONObj& cmd) const override { + return false; + } + + void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector* out) const override { + ActionSet actions; + actions.addAction(ActionType::internal); + out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); + } + + bool run(OperationContext* opCtx, + const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder& result) override { + auto const mdm = MigrationDestinationManager::get(opCtx); + Status const status = mdm->commitPhase2(opCtx); + if (!status.isOK()) { + LOGV2(22014, + "_recvChunkCommitPhase2 failed: {error}", + "_recvChunkCommitPhase2 failed", + "error"_attr = redact(status)); + uassertStatusOK(status); + } + return true; + } + +} recvChunkCommitPhase2Command; + } // namespace } // namespace mongo diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 13deb093da..5df0b24a4e 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -52,6 +52,7 @@ #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/sharding_state_recovery.h" #include "mongo/db/s/sharding_statistics.h" +#include "mongo/db/s/sharding_util.h" #include "mongo/db/s/type_shard_collection.h" #include "mongo/db/vector_clock.h" #include "mongo/executor/task_executor.h" @@ -528,10 +529,15 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig() { const ChunkRange range(_args.getMinKey(), _args.getMaxKey()); if (!MONGO_unlikely(doNotRefreshRecipientAfterCommit.shouldFail())) { - // Best-effort make the recipient refresh its routing table to the new collection - // version. - refreshRecipientRoutingTable( - _opCtx, getNss(), _recipientHost, refreshedMetadata.getCollVersion()); + auto recipientShardId = + Grid::get(_opCtx)->shardRegistry()->getShardForHostNoReload(_recipientHost)->getId(); + + auto executor = Grid::get(_opCtx)->getExecutorPool()->getFixedExecutor(); + sharding_util::sendCommandToShards(_opCtx, + "admin"_sd, + BSON("_recvChunkCommitPhase2" << _args.getNss().ns()), + {recipientShardId}, + executor); } std::string orphanedRangeCleanUpErrMsg = str::stream() -- 2.17.1