diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp index 8a76189a52c..dea5b3b5ebf 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier.cpp @@ -49,7 +49,13 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/db/repl/session_update_tracker.h" +#include "mongo/db/repl/tenant_migration_access_blocker_registry.h" +#include "mongo/db/repl/tenant_migration_access_blocker_util.h" +#include "mongo/db/repl/tenant_migration_access_blocker_registry.h" +#include "mongo/db/repl/tenant_migration_donor_access_blocker.h" +#include "mongo/db/repl/tenant_migration_access_blocker_util.h" #include "mongo/db/repl/tenant_migration_decoration.h" +#include "mongo/db/repl/tenant_migration_donor_access_blocker.h" #include "mongo/db/repl/tenant_migration_recipient_service.h" #include "mongo/db/repl/tenant_oplog_batcher.h" #include "mongo/db/session_catalog_mongod.h" @@ -57,15 +63,25 @@ #include "mongo/logv2/log.h" #include "mongo/util/concurrency/thread_pool.h" +using namespace tenant_migration_access_blocker; + namespace mongo { namespace repl { +using namespace tenant_migration_access_blocker; + + MigrationProtocolEnum protocol, + StringData donorConnectionString, MONGO_FAIL_POINT_DEFINE(hangInTenantOplogApplication); MONGO_FAIL_POINT_DEFINE(fpBeforeTenantOplogApplyingBatch); TenantOplogApplier::TenantOplogApplier(const UUID& migrationUuid, const std::string& tenantId, + MigrationProtocolEnum protocol, + StringData donorConnectionString, OpTime applyFromOpTime, + _protocol(protocol), + _donorConnectionString(donorConnectionString), RandomAccessOplogBuffer* oplogBuffer, std::shared_ptr executor, ThreadPool* writerPool, @@ -73,6 +89,8 @@ TenantOplogApplier::TenantOplogApplier(const UUID& migrationUuid, : AbstractAsyncComponent(executor.get(), std::string("TenantOplogApplier_") + tenantId), _migrationUuid(migrationUuid), _tenantId(tenantId), + _protocol(protocol), + _donorConnectionString(donorConnectionString), _beginApplyingAfterOpTime(applyFromOpTime), _oplogBuffer(oplogBuffer), _executor(std::move(executor)), @@ -961,6 +979,30 @@ Status TenantOplogApplier::_applyOplogEntryOrGroupedInserts( invariant(oplogApplicationMode == OplogApplication::Mode::kInitialSync); auto op = entryOrGroupedInserts.getOp(); + if (_protocol == MigrationProtocolEnum::kShardMerge && + op.getCommandType() == OplogEntry::CommandType::kCreate) { + if (auto tenantId = parseTenantIdFromDB(op.getNss().db())) { + if (!tenant_migration_access_blocker::getTenantMigrationRecipientAccessBlocker( + opCtx->getServiceContext(), *tenantId)) { + LOGV2_DEBUG( + 6114101, + 1, + "Adding shard merge recipient access blocker due to 'create' oplog entry", + "migrationId"_attr = _migrationUuid, + "nss"_attr = op.getNss(), + "tenantId"_attr = *tenantId, + "op"_attr = redact(op.toBSONForLogging())); + auto mtab = std::make_shared( + opCtx->getServiceContext(), + _migrationUuid, + *tenantId, + MigrationProtocolEnum::kShardMerge, + _donorConnectionString); + TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) + .add(*tenantId, mtab); + } + } + } if (op.isIndexCommandType() && op.getCommandType() != OplogEntry::CommandType::kCreateIndexes && op.getCommandType() != OplogEntry::CommandType::kDropIndexes) { LOGV2_ERROR(488610, diff --git a/src/mongo/db/repl/tenant_oplog_applier.h b/src/mongo/db/repl/tenant_oplog_applier.h index 58c4533b0d6..666c33fd63d 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.h +++ b/src/mongo/db/repl/tenant_oplog_applier.h @@ -37,6 +37,7 @@ #include "mongo/db/repl/oplog_buffer.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/tenant_oplog_batcher.h" +#include "mongo/db/serverless/serverless_types_gen.h" #include "mongo/util/future.h" namespace mongo { @@ -73,6 +74,8 @@ public: TenantOplogApplier(const UUID& migrationUuid, const std::string& tenantId, + MigrationProtocolEnum protocol, + StringData donorConnectionString, OpTime applyFromOpTime, RandomAccessOplogBuffer* oplogBuffer, std::shared_ptr executor, @@ -160,6 +163,8 @@ private: std::shared_ptr _oplogBatcher; // (R) const UUID _migrationUuid; // (R) const std::string _tenantId; // (R) + const MigrationProtocolEnum _protocol; // (R) + std::string _donorConnectionString; // (R) const OpTime _beginApplyingAfterOpTime; // (R) RandomAccessOplogBuffer* _oplogBuffer; // (R) std::shared_ptr _executor; // (R) diff --git a/src/mongo/db/repl/tenant_oplog_applier_test.cpp b/src/mongo/db/repl/tenant_oplog_applier_test.cpp index 1592a825083..43e929b463a 100644 --- a/src/mongo/db/repl/tenant_oplog_applier_test.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier_test.cpp @@ -181,11 +181,19 @@ public: return StorageInterface::get(_opCtx->getServiceContext()); } + std::shared_ptr makeApplier() { + auto writerPool = makeTenantMigrationWriterPool(); + return std::make_shared( + _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + } + protected: OplogBufferMock _oplogBuffer; executor::NetworkInterfaceMock* _net; std::shared_ptr _executor; std::string _tenantId = "tenant"; + MigrationProtocolEnum _protocol = MigrationProtocolEnum::kMultitenantMigrations; + std::string _donorConnectionString = "host:1234"; UUID _migrationUuid = UUID::gen(); ServiceContext::UniqueOperationContext _opCtx; TenantOplogApplierTestOpObserver* _opObserver; // Owned by service context opObserverRegistry @@ -203,10 +211,7 @@ TEST_F(TenantOplogApplierTest, NoOpsForSingleBatch) { srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar"), UUID::gen())); pushOps(srcOps); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); // Even if we wait for the first op in a batch, it is the last op we should be notified on. auto lastBatchTimes = applier->getNotificationForOpTime(srcOps.front().getOpTime()).get(); @@ -229,10 +234,7 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeBatch) { } pushOps(srcOps); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); // Even if we wait for the first op in a batch, it is the last op we should be notified on. auto lastBatchTimes = applier->getNotificationForOpTime(srcOps.front().getOpTime()).get(); @@ -255,10 +257,7 @@ TEST_F(TenantOplogApplierTest, NoOpsForMultipleBatches) { srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "baz"), UUID::gen())); srcOps.push_back(makeInsertOplogEntry(4, NamespaceString(dbName, "bif"), UUID::gen())); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); tenantApplierBatchSizeBytes.store(100 * 1024 /* bytes */); tenantApplierBatchSizeOps.store(2 /* ops */); @@ -297,10 +296,7 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeTransaction) { 2, dbName, /* prepared */ false, {innerOps1, innerOps2, innerOps3}); pushOps(srcOps); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); // The first two ops should come in the first batch. auto firstBatchFuture = applier->getNotificationForOpTime(srcOps[0].getOpTime()); @@ -355,10 +351,7 @@ TEST_F(TenantOplogApplierTest, CommitUnpreparedTransaction_DataPartiallyApplied) ASSERT_FALSE(docExists(_opCtx.get(), nss, doc2)); pushOps({partialOp, commitOp}); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(commitOp.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); @@ -378,10 +371,7 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_DatabaseMissing) { const NamespaceString&, const std::vector&) { onInsertsCalled = true; }; pushOps({entry}); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); @@ -400,10 +390,7 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_CollectionMissing) { const NamespaceString&, const std::vector&) { onInsertsCalled = true; }; pushOps({entry}); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); @@ -432,10 +419,7 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_InsertExisting) { onUpdateCalled = true; }; pushOps({entry}); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); @@ -468,10 +452,7 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_UniqueKey_InsertExisting) { const NamespaceString&, const std::vector&) { onInsertsCalled = true; }; pushOps({entry}); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); @@ -497,10 +478,7 @@ TEST_F(TenantOplogApplierTest, ApplyInsert_Success) { ASSERT_BSONOBJ_EQ(docs[0], entry.getObject()); }; pushOps({entry}); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); @@ -553,9 +531,7 @@ TEST_F(TenantOplogApplierTest, ApplyInserts_Grouped) { pushOps(entries); // Make sure all ops end up in a single thread so they can be batched. auto writerPool = makeTenantMigrationWriterPool(1); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entries.back().getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); @@ -580,10 +556,7 @@ TEST_F(TenantOplogApplierTest, ApplyUpdate_MissingDocument) { onUpdateCalled = true; }; pushOps({entry}); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); @@ -608,10 +581,7 @@ TEST_F(TenantOplogApplierTest, ApplyUpdate_Success) { ASSERT_EQUALS(uuid, args.uuid); }; pushOps({entry}); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); @@ -630,10 +600,7 @@ TEST_F(TenantOplogApplierTest, ApplyDelete_DatabaseMissing) { StmtId, const OplogDeleteEntryArgs&) { onDeleteCalled = true; }; pushOps({entry}); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); @@ -654,10 +621,7 @@ TEST_F(TenantOplogApplierTest, ApplyDelete_CollectionMissing) { StmtId, const OplogDeleteEntryArgs&) { onDeleteCalled = true; }; pushOps({entry}); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); @@ -679,10 +643,7 @@ TEST_F(TenantOplogApplierTest, ApplyDelete_DocumentMissing) { StmtId, const OplogDeleteEntryArgs&) { onDeleteCalled = true; }; pushOps({entry}); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); @@ -715,10 +676,7 @@ TEST_F(TenantOplogApplierTest, ApplyDelete_Success) { ASSERT_EQUALS(uuid, observer_uuid); }; pushOps({entry}); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); @@ -743,10 +701,7 @@ TEST_F(TenantOplogApplierTest, ApplyCreateCollCommand_CollExisting) { const BSONObj&) { applyCmdCalled = true; }; auto entry = OplogEntry(op); pushOps({entry}); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); @@ -777,10 +732,7 @@ TEST_F(TenantOplogApplierTest, ApplyRenameCollCommand_CollExisting) { bool stayTemp) { applyCmdCalled = true; }; auto entry = OplogEntry(op); pushOps({entry}); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); @@ -812,10 +764,7 @@ TEST_F(TenantOplogApplierTest, ApplyCreateCollCommand_Success) { }; auto entry = OplogEntry(op); pushOps({entry}); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); @@ -853,10 +802,7 @@ TEST_F(TenantOplogApplierTest, ApplyCreateIndexesCommand_Success) { }; auto entry = OplogEntry(op); pushOps({entry}); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); @@ -878,10 +824,7 @@ TEST_F(TenantOplogApplierTest, ApplyStartIndexBuildCommand_Failure) { << "ts" << Timestamp(1, 1) << "ui" << uuid); auto entry = OplogEntry(op); pushOps({entry}); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_EQUALS(opAppliedFuture.getNoThrow().getStatus().code(), 5434700); @@ -906,10 +849,7 @@ TEST_F(TenantOplogApplierTest, ApplyCreateCollCommand_WrongNSS) { const BSONObj&) { applyCmdCalled = true; }; auto entry = OplogEntry(op); pushOps({entry}); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_NOT_OK(opAppliedFuture.getNoThrow().getStatus()); @@ -937,10 +877,7 @@ TEST_F(TenantOplogApplierTest, ApplyDropIndexesCommand_IndexNotFound) { auto entry = OplogEntry(op); pushOps({entry}); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); @@ -974,10 +911,7 @@ TEST_F(TenantOplogApplierTest, ApplyCollModCommand_IndexNotFound) { auto entry = OplogEntry(op); pushOps({entry}); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); @@ -1012,10 +946,7 @@ TEST_F(TenantOplogApplierTest, ApplyCollModCommand_CollectionMissing) { auto entry = OplogEntry(op); pushOps({entry}); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_OK(opAppliedFuture.getNoThrow().getStatus()); @@ -1036,10 +967,7 @@ TEST_F(TenantOplogApplierTest, ApplyCRUD_WrongNSS) { const NamespaceString& nss, const std::vector& docs) { onInsertsCalled = true; }; pushOps({entry}); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_NOT_OK(opAppliedFuture.getNoThrow().getStatus()); @@ -1061,10 +989,7 @@ TEST_F(TenantOplogApplierTest, ApplyCRUD_WrongUUID) { const NamespaceString& nss, const std::vector& docs) { onInsertsCalled = true; }; pushOps({entry}); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(entry.getOpTime()); ASSERT_NOT_OK(opAppliedFuture.getNoThrow().getStatus()); @@ -1078,10 +1003,7 @@ TEST_F(TenantOplogApplierTest, ApplyNoop_Success) { std::vector srcOps; srcOps.push_back(makeNoopOplogEntry(1, "foo")); pushOps(srcOps); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[0].getOpTime()); auto futureRes = opAppliedFuture.getNoThrow(); @@ -1102,10 +1024,7 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoop_Success) { std::vector srcOps; srcOps.push_back(makeNoopOplogEntry(1, TenantMigrationRecipientService::kNoopMsg)); pushOps(srcOps); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[0].getOpTime()); auto futureRes = opAppliedFuture.getNoThrow(); @@ -1127,10 +1046,7 @@ TEST_F(TenantOplogApplierTest, ApplyInsertThenResumeTokenNoopInDifferentBatch_Su srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "foo"), UUID::gen())); srcOps.push_back(makeNoopOplogEntry(2, TenantMigrationRecipientService::kNoopMsg)); pushOps(srcOps); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); tenantApplierBatchSizeBytes.store(100 * 1024 /* bytes */); tenantApplierBatchSizeOps.store(1 /* ops */); @@ -1157,10 +1073,7 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoopThenInsertInSameBatch_Success srcOps.push_back(makeNoopOplogEntry(1, TenantMigrationRecipientService::kNoopMsg)); srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "foo"), UUID::gen())); pushOps(srcOps); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[1].getOpTime()); auto futureRes = opAppliedFuture.getNoThrow(); @@ -1184,10 +1097,7 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoopSameTimestamp_Succe srcOps.push_back(makeNoopOplogEntry(1, TenantMigrationRecipientService::kNoopMsg)); pushOps(srcOps); ASSERT_EQ(srcOps[0].getOpTime(), srcOps[1].getOpTime()); - auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[1].getOpTime()); auto futureRes = opAppliedFuture.getNoThrow(); @@ -1211,9 +1121,7 @@ TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoop_Success) { srcOps.push_back(makeNoopOplogEntry(2, TenantMigrationRecipientService::kNoopMsg)); pushOps(srcOps); auto writerPool = makeTenantMigrationWriterPool(); - - auto applier = std::make_shared( - _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get()); + auto applier = makeApplier(); ASSERT_OK(applier->startup()); auto opAppliedFuture = applier->getNotificationForOpTime(srcOps[1].getOpTime()); auto futureRes = opAppliedFuture.getNoThrow();