Uploaded image for project: 'Core Server'
  1. Core Server
  2. SERVER-57345

Retryable write pre-fetch missing documents

    XMLWordPrintable

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major - P3
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 5.1.0-rc0
    • Component/s: Replication
    • Labels:
      None
    • Backwards Compatibility:
      Fully Compatible
    • Operating System:
      ALL
    • Sprint:
      Repl 2021-06-14

      Description

      During the investigation of SERVER-56631, another issue with retryable write pre-fetch popped up. The issue happens when we pause after the recipient connects to the donor secondary as the sync source, then perform retryable writes on the donor primary and wait for the sync-source secondaries to replicate and advance the committed snapshot to the last retryable write statement. Unlike SERVER-56631, where the committed snapshot is not at the secondary's batch boundary, this time the committed snapshot is at the boundary and is the last retryable write statement. Then we continue the migration and pre-fetch retryable write statements. However, the pre-fetch result is randomly missing some documents in the middle. As a result, after the tenant migration completes and we retry the previous retryable writes on the recipient, we expect none of these statements to be executed; but due to the documents missing from the pre-fetch, those missing statements are re-executed, which is not correct. However, if we advance the timestamp by writing something else after the retryable write on the donor primary (essentially advancing the committed snapshot past the last retryable write statement), then the pre-fetched result is correct. We suspect there is a problem in the aggregation pipeline we use to pre-fetch retryable writes.

       

      Here is the test script to repro this issue:

      /**
       * @tags: [
       *   requires_fcv_50,
       *   requires_majority_read_concern,
       *   incompatible_with_eft,
       *   incompatible_with_windows_tls,
       *   incompatible_with_macos, requires_persistence
       * ]
       */
       
       (function() {
          load("jstests/replsets/libs/tenant_migration_test.js");
          load("jstests/replsets/libs/tenant_migration_util.js");
          load("jstests/libs/fail_point_util.js"); // For configureFailPoint().
          load("jstests/libs/uuid_util.js");       // For extractUUIDFromObject().
          load("jstests/libs/write_concern_util.js");
          
          // Returns the single currentOp entry for the tenant recipient migration on `conn`,
          // asserting that exactly one such operation exists and that it matches `migrationId`.
          const getRecipientCurrOp = function(conn, migrationId) {
              const result = conn.adminCommand({currentOp: true, desc: "tenant recipient migration"});
              assert.eq(result.inprog.length, 1);
              const [migrationOp] = result.inprog;
              assert.eq(bsonWoCompare(migrationOp.instanceID, migrationId), 0);
              return migrationOp;
          };
          
          // Returns the donor node the recipient selected as its sync source, as reported
          // in the recipient's currentOp entry for this migration.
          const getDonorSyncSource = function(conn, migrationId) {
              return getRecipientCurrOp(conn, migrationId).donorSyncSource;
          };
          
          // Returns the recipient's startFetchingDonorOpTime from its currentOp entry.
          // Fix: terminate the assignment with a semicolon instead of relying on ASI,
          // matching the sibling helpers above.
          const getStartFetchingDonorOpTime = function(conn, migrationId) {
              const currOp = getRecipientCurrOp(conn, migrationId);
              return currOp.startFetchingDonorOpTime;
          };
          
          // Number of retryable write statements to issue; named after the oplog applier
          // batch size so the last statement lands on a secondary batch boundary
          // (counterTotal below is set to this value) — TODO confirm server batch size.
          const oplogApplierBatchSize = 50;
          
          const donorRst = new ReplSetTest({
              nodes: 3,
              // Force secondaries to sync from the primary to guarantee replication progress with the
              // stopReplProducerOnDocument failpoint. Also disable primary catchup because some
              // replicated retryable write statements are intentionally not being made majority
              // committed.
              settings: {chainingAllowed: false, catchUpTimeoutMillis: 0},
              nodeOptions: Object.assign(TenantMigrationUtil.makeX509OptionsForTest().donor, {
                  setParameter: {
                      tenantMigrationExcludeDonorHostTimeoutMS: 30 * 1000,
                      // Allow non-timestamped reads on donor after migration completes for testing.
                      'failpoint.tenantMigrationDonorAllowsNonTimestampedReads': tojson({mode: 'alwaysOn'}),
                  }
              }),
          });
          donorRst.startSet();
          donorRst.initiateWithHighElectionTimeout();
          const donorPrimary = donorRst.getPrimary();
          
          // Tenant migrations are gated behind a feature flag; bail out early when disabled.
          if (!TenantMigrationUtil.isFeatureFlagEnabled(donorPrimary)) {
              jsTestLog("Skipping test because the tenant migrations feature flag is disabled");
              donorRst.stopSet();
              return;
          }
          
          const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst: donorRst});
          
          // Handles and namespace constants used throughout the repro.
          const recipientPrimary = tenantMigrationTest.getRecipientPrimary();
          const kTenantId = "testTenantId";
          const migrationId = UUID();
          const kDbName = tenantMigrationTest.tenantDB(kTenantId, "testDB");
          const kCollName = "retryable_write_secondary_oplog_application";
          const kNs = `${kDbName}.${kCollName}`;
          
          const migrationOpts = {
              migrationIdString: extractUUIDFromObject(migrationId),
              tenantId: kTenantId,
              // Use secondary as sync source.
              readPreference: {mode: 'secondary'},
          };
          
          // Pause the recipient right after it connects to the donor sync source, so the
          // retryable writes below happen on the donor before fetching starts.
          const fpAfterConnectingTenantMigrationRecipientInstance = configureFailPoint(
              recipientPrimary,
              "fpAfterConnectingTenantMigrationRecipientInstance",
              {action: "hang"});
          
          // Start tenant migration and hang after recipient connects to donor sync source.
          jsTestLog("Starting tenant migration.");
          assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
          fpAfterConnectingTenantMigrationRecipientInstance.wait();
          
          // Seed the document that the retryable write statements below will increment,
          // and make sure it is on all three donor nodes before proceeding.
          assert.commandWorked(
              donorPrimary.getCollection(kNs).insert({_id: 0, counter: 0}, {writeConcern: {w: 3}}));
          donorRst.awaitReplication();
          
          const counterTotal = oplogApplierBatchSize;
          // Used below to locate the oplog entry two statements before the last one, for
          // logging and for the commented-out startFetchingDonorOpTime assertion.
          const counterMajorityCommitted = counterTotal - 2;
          jsTestLog(`counterTotal: ${counterTotal}, counterMajorityCommitted: ${counterMajorityCommitted}`);
          
          // Perform all the retryable write statements on donor primary: one retryable
          // write (single lsid/txnNumber) containing counterTotal $inc update statements.
          const lsid = ({id: UUID()});
          assert.commandWorked(donorPrimary.getCollection(kNs).runCommand("update", {
              updates: Array.from({length: counterTotal}, () => ({q: {_id: 0}, u: {$inc: {counter: 1}}})),
              lsid,
              txnNumber: NumberLong(1),
          }));
       
          donorRst.awaitReplication();
          
          // Locate the oplog entries for the last statement (counter reaches counterTotal)
          // and the statement two before it, by matching the update entry's diff on `counter`.
          const stmtTotal = donorPrimary.getCollection("local.oplog.rs")
                                .findOne({"o.diff.u.counter": counterTotal});
          const stmtMajorityCommitted = donorPrimary.getCollection("local.oplog.rs")
                                            .findOne({"o.diff.u.counter": counterMajorityCommitted});
          
          assert.neq(null, stmtTotal);
          assert.neq(null, stmtMajorityCommitted);
          jsTestLog(`stmtTotal timestamp: ${tojson(stmtTotal.ts)}`);
          jsTestLog(`stmtMajorityCommitted timestamp: ${tojson(stmtMajorityCommitted.ts)}`);
       
          // Un-commenting this would make the pre-fetch result correct.
          // assert.commandWorked(
          //     donorPrimary.getCollection(kNs).insert({_id: 1, data: 0}, {writeConcern: {w: 3}}));
          // donorRst.awaitReplication();
          
          // Wait until every donor secondary has both applied and made durable the last
          // retryable write statement, so the committed snapshot can advance to it.
          for (const s of donorRst.getSecondaries()) {
              assert.soon(() => {
                  const {optimes: {appliedOpTime, durableOpTime}} =
                      assert.commandWorked(s.adminCommand({replSetGetStatus: 1}));
          
                  print(`${s.host}: ${tojsononeline({
                      appliedOpTime,
                      durableOpTime,
                      stmtMajorityCommittedTimestamp: stmtMajorityCommitted.ts
                  })}`);
          
                  return bsonWoCompare(appliedOpTime.ts, stmtTotal.ts) >= 0 &&
                      bsonWoCompare(durableOpTime.ts, stmtTotal.ts) >= 0;
              });
          }
          
          // Also wait for each secondary's lastStableRecoveryTimestamp to reach the last
          // statement's timestamp.
          for (const s of donorRst.getSecondaries()) {
              assert.soon(() => {
                  const {lastStableRecoveryTimestamp} =
                      assert.commandWorked(s.adminCommand({replSetGetStatus: 1}));
              
                  print(`${s.host}: ${tojsononeline({
                      lastStableRecoveryTimestamp,
                      stmtTotalTimestamp: stmtTotal.ts
                  })}`);
              
                  return bsonWoCompare(lastStableRecoveryTimestamp, stmtTotal.ts) >= 0;
              });
          }
       
          // Diagnostics: dump session/transaction state and the tenant migration oplog view
          // on the sync-source secondary, to compare against the recipient's pre-fetch result.
          const secondary1 = donorRst.getSecondaries()[0];
          const primaryTxRes = donorPrimary.getCollection("config.transactions").find().toArray();
          jsTestLog(`donor primary txRes: ${tojson(primaryTxRes)}`);
          const secondaryTxRes = secondary1.getCollection("config.transactions").find().toArray();
          jsTestLog(`donor secondary txRes: ${tojson(secondaryTxRes)}`);
       
          const secondaryViewRes = secondary1.getCollection("local.system.tenantMigration.oplogView").find().readConcern("majority").toArray();
          jsTestLog(`donor secondary secondaryViewRes: ${tojson(secondaryViewRes)}`);
          const secondaryCounterRes = secondary1.getCollection(kNs).find().readConcern("majority").toArray();
          jsTestLog(`donor secondary secondaryCounterRes: ${tojson(secondaryCounterRes)}`);
       
          // Pause the recipient after it has pre-fetched the retryable writes oplog entries,
          // then release the earlier failpoint so fetching actually runs.
          const fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime = configureFailPoint(
              recipientPrimary,
              "fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime",
              {action: "hang"});
          
          fpAfterConnectingTenantMigrationRecipientInstance.off();
          fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime.wait();
          
          // The committed snapshot should have advanced to (at least) the last retryable
          // write statement, so startFetchingDonorOpTime must be >= its timestamp.
          const startFetchingDonorOpTime = getStartFetchingDonorOpTime(recipientPrimary, migrationId);
          // assert.eq(startFetchingDonorOpTime.ts, stmtMajorityCommitted.ts);
          assert.gte(startFetchingDonorOpTime.ts, stmtTotal.ts);
          
          fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime.off();
          
          // Let the migration run to completion and clean up its state machine.
          TenantMigrationTest.assertCommitted(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
          assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString));
          
          const findRes = recipientPrimary.getCollection(kNs).findOne({_id: 0});
          jsTestLog(`findRes 1: ${tojson(findRes)}`);
          
          // Retry the same retryable write (same lsid/txnNumber) on the recipient. Every
          // statement already executed on the donor, so none should be re-executed here;
          // with the bug, statements missing from the pre-fetch get re-applied.
          assert.commandWorked(recipientPrimary.getCollection(kNs).runCommand("update", {
              updates: Array.from({length: counterTotal}, () => ({q: {_id: 0}, u: {$inc: {counter: 1}}})),
              lsid,
              txnNumber: NumberLong(1),
          }));
          
          // Expect the counter's result to be the same as the previous result, i.e. no statement is re-executed.
          const findRes2 = recipientPrimary.getCollection(kNs).findOne({_id: 0});
          jsTestLog(`findRes 2: ${tojson(findRes2)}`);
          assert.eq(findRes, findRes2);
       
          // Dump the recipient oplog for post-mortem inspection of what was applied.
          const recipientLogs = recipientPrimary.getCollection("local.oplog.rs").find().toArray();
          jsTestLog(`recipient logs: ${tojson(recipientLogs)}`);
          
          donorRst.stopSet();
          tenantMigrationTest.stop();
      })();
      

        Attachments

          Issue Links

            Activity

              People

              Assignee:
              wenbin.zhu Wenbin Zhu
              Reporter:
              wenbin.zhu Wenbin Zhu
              Participants:
              Votes:
              0 Vote for this issue
              Watchers:
              7 Start watching this issue

                Dates

                Created:
                Updated:
                Resolved: